#![allow(unused_imports)]
#![allow(dead_code)]
use async_trait::async_trait;
use bytes::Bytes;
use mio::net::UdpSocket as MIOUdpStream;
use parking_lot::{Mutex, RwLock};
use ring::rand::*;
use serde::{de::DeserializeOwned, Serialize};
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket as TokioUdpStream;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use super::pool::{NonBlockingStream, PoolCore, PoolError, StreamFactory, ID};
use super::udp::{create_udp_socket, tokio_to_mio_stream};
use super::{Transport, TransportError};
use crate::hub::event::IOSource;
use crate::hub::utils::error;
const MAX_UDP_PAYLOAD_SIZE: usize = 65507;
const INIT_MESSAGE: &[u8] = b"init";
struct QuicStream {
conn: quiche::Connection,
udp_socket: MIOUdpStream,
}
impl NonBlockingStream for QuicStream {
fn try_recv(&mut self) -> Result<Bytes, TransportError> {
let mut buffer = [0; MAX_UDP_PAYLOAD_SIZE];
match self.conn.dgram_recv(&mut buffer) {
Ok(n) => Ok(Bytes::copy_from_slice(&buffer[..n])),
Err(_) => Err(TransportError::NotReady),
}
}
fn try_send(&mut self, data: Option<Bytes>) -> Result<bool, TransportError> {
let data = match data {
Some(d) => d,
None => return Ok(false),
};
let slice = &data;
let mut vec = slice.to_vec();
println!("Is established: {:?}", self.conn.is_established());
match self.conn.send(&mut vec) {
Ok(_) => {
println!("sent");
Ok(true)
}
Err(e) => {
println!("err: {:?}", e);
Err(TransportError::NotReady)
}
}
}
fn source(&mut self) -> IOSource {
IOSource::MIO(&mut self.udp_socket)
}
fn shutdown(&mut self, _how: std::net::Shutdown) -> io::Result<()> {
Ok(())
}
}
struct FactoryInner<I: ID> {
address_book: RwLock<HashMap<I, Vec<SocketAddr>>>,
listen_handle: Option<JoinHandle<()>>,
accepted_stream: Arc<Mutex<mpsc::Receiver<QuicStream>>>,
}
impl<I: ID> Drop for FactoryInner<I> {
fn drop(&mut self) {
if let Some(h) = self.listen_handle.take() {
h.abort()
}
}
}
#[derive(Clone)]
struct Factory<I: ID>(Arc<FactoryInner<I>>);
impl<I: ID> Factory<I> {
fn new(listen_addr: Option<SocketAddr>) -> Self {
let (tx, accepted_stream) = mpsc::channel(1);
let listen_handle = listen_addr.map(|listen_addr| {
tokio::spawn(async move {
let listener = match TokioUdpStream::bind(listen_addr).await {
Ok(l) => l,
Err(e) => {
error!("[Quic] failed to bind to address {}: {}", listen_addr, e);
return
}
};
println!("listening on {}", listener.local_addr().unwrap());
loop {
let mut buf = [0u8; INIT_MESSAGE.len()];
if let Ok((_, client_addr)) = listener.recv_from(&mut buf).await {
println!("received: {:?}", &buf[..]);
if INIT_MESSAGE != &buf[..] {
continue
}
let udp_socket = match create_udp_socket().await {
Ok(s) => s,
Err(e) => {
error!("[Quic] failed to create udp socket: {}", e);
return
}
};
udp_socket.connect(client_addr).await.unwrap();
udp_socket.send(INIT_MESSAGE).await.unwrap();
let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
config.set_max_recv_udp_payload_size(1200);
config
.load_cert_chain_from_pem_file("src/hub/transport/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("src/hub/transport/cert.key")
.unwrap();
config.set_application_protos(&[b"bolic/1.0"]).unwrap();
config.enable_dgram(true, 1000, 1000);
let mut buffer = [0u8; MAX_UDP_PAYLOAD_SIZE];
if let Ok((len, client_addr)) = udp_socket.recv_from(&mut buffer).await {
println!("server {}: Received {} bytes from {} ", listen_addr, len, client_addr);
let hdr = match quiche::Header::from_slice(&mut buffer, quiche::MAX_CONN_ID_LEN) {
Ok(v) => v,
Err(e) => {
error!("[Quic] fail to parsing packet header: {:?}", e);
continue
}
};
if hdr.ty != quiche::Type::Initial {
error!("[Quic server] Packet type is not Initial");
continue
}
let conn = match quiche::accept(&hdr.scid, None, listen_addr, client_addr, &mut config) {
Ok(v) => {
println!("accepted");
v
}
Err(e) => {
error!("[Quic] failed to create quiche connection: {:?}", e);
return
}
};
let stream = QuicStream {
conn,
udp_socket: tokio_to_mio_stream(udp_socket),
};
tx.send(stream).await.ok();
}
}
}
})
});
Self(Arc::new(FactoryInner {
address_book: RwLock::new(HashMap::new()),
accepted_stream: Arc::new(Mutex::new(accepted_stream)),
listen_handle,
}))
}
fn set_addr(&self, id: I, addr: Vec<SocketAddr>) {
self.0.address_book.write().insert(id, addr);
}
fn remove_addr(&self, id: &I) {
self.0.address_book.write().remove(id);
}
}
#[async_trait]
impl<I: ID> StreamFactory<I> for Factory<I> {
async fn create_stream(&self, remote_id: I) -> Option<Box<dyn NonBlockingStream>> {
use futures::stream::StreamExt;
let addrs = self.0.address_book.read().get(&remote_id)?.clone();
let mut stream = Box::pin(futures::stream::iter(addrs).then(
|remote_addr| async move {
let udp_socket = match create_udp_socket().await {
Ok(s) => s,
Err(e) => {
error!("[Quic] failed to create udp socket: {}", e);
return None
}
};
let mut buf = [0u8; INIT_MESSAGE.len()];
if let Ok(_) = udp_socket.send_to(INIT_MESSAGE, remote_addr).await {
match udp_socket.recv_from(&mut buf).await {
Ok((_, server_socket)) => {
if INIT_MESSAGE == &buf[..] {
match udp_socket.connect(server_socket).await {
Ok(_) => {
let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
config.verify_peer(false);
config.set_application_protos(&[b"bolic/1.0"]).unwrap();
config.enable_dgram(true, 1000, 1000);
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
let rng = SystemRandom::new();
rng.fill(&mut scid[..]).unwrap();
let scid = quiche::ConnectionId::from_ref(&scid);
let mut conn = match quiche::connect(
None,
&scid,
udp_socket.local_addr().unwrap(),
server_socket,
&mut config,
) {
Ok(v) => v,
Err(e) => {
error!("[Quic] failed to create quiche connection: {:?}", e);
return None
}
};
println!("quic client: {:?}", conn.stats());
let mut out = [0; 65507];
let (write, send_info) = match conn.send(&mut out) {
Ok(v) => v,
Err(quiche::Error::Done) => return None,
Err(e) => {
println!("send failed: {:?}", e);
return None
}
};
println!("quic sent info: {:?}", send_info);
println!(
"[Client] connecting to {:} from {:} with scid {:?}",
remote_addr,
udp_socket.local_addr().unwrap(),
scid,
);
println!("[Client] status of connection: {:?}", conn.stats());
udp_socket.send(&out[..write]).await.unwrap();
Some(QuicStream {
conn,
udp_socket: tokio_to_mio_stream(udp_socket),
})
}
Err(_) => None,
}
} else {
None
}
}
Err(_) => None,
}
} else {
None
}
},
));
loop {
match stream.next().await {
Some(s) => return Some(Box::new(s?)),
None => (),
}
}
}
async fn discover_stream(&self) -> Box<dyn NonBlockingStream> {
match self.0.accepted_stream.lock().recv().await {
None => futures::future::pending().await,
Some(s) => Box::new(s),
}
}
}
pub struct Pool<I: ID + DeserializeOwned + Serialize> {
factory: Factory<I>,
registry: PoolCore<I>,
}
impl<I: ID + DeserializeOwned + Serialize> Pool<I> {
pub fn new<H: super::pool::Handshake<I>>(
local_id: I, listen_addr: Option<&str>, driver: &crate::hub::Driver,
) -> Result<Self, PoolError> {
let factory = Factory::new(match listen_addr {
Some(a) => Some(a.parse().map_err(|_| PoolError::InvalidAddr)?),
None => None,
});
let config = super::pool::ConfigBuilder::default().build().unwrap();
Ok(Self {
factory: factory.clone(),
registry: PoolCore::new::<H, Factory<I>>(local_id, factory, driver, config),
})
}
}