bolic-network 0.0.1

Modern network abstraction and tooling for building distributed systems
Documentation
#![allow(unused_imports)]
#![allow(dead_code)]
use async_trait::async_trait;
use bytes::Bytes;
use mio::net::UdpSocket as MIOUdpStream;
use parking_lot::{Mutex, RwLock};
use ring::rand::*;
use serde::{de::DeserializeOwned, Serialize};
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket as TokioUdpStream;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use super::pool::{NonBlockingStream, PoolCore, PoolError, StreamFactory, ID};
use super::udp::{create_udp_socket, tokio_to_mio_stream};
use super::{Transport, TransportError};
use crate::hub::event::IOSource;
use crate::hub::utils::error;

/// Largest payload a single UDP datagram can carry over IPv4
/// (65,535-byte IP packet minus the 20-byte IP header and the 8-byte UDP header).
const MAX_UDP_PAYLOAD_SIZE: usize = 65507;
/// Greeting exchanged in both directions over plain UDP before the QUIC handshake starts.
const INIT_MESSAGE: &[u8] = b"init";

/// A quiche connection paired with the UDP socket that belongs to it.
struct QuicStream {
    /// QUIC connection state; datagrams are read from it in `try_recv`.
    conn: quiche::Connection,
    /// Socket handed to the event loop through `source()`.
    udp_socket: MIOUdpStream,
}

impl NonBlockingStream for QuicStream {
    fn try_recv(&mut self) -> Result<Bytes, TransportError> {
        let mut buffer = [0; MAX_UDP_PAYLOAD_SIZE];
        match self.conn.dgram_recv(&mut buffer) {
            Ok(n) => Ok(Bytes::copy_from_slice(&buffer[..n])),
            Err(_) => Err(TransportError::NotReady),
        }
    }

    fn try_send(&mut self, data: Option<Bytes>) -> Result<bool, TransportError> {
        let data = match data {
            Some(d) => d,
            None => return Ok(false),
        };

        let slice = &data;
        let mut vec = slice.to_vec();

        println!("Is established: {:?}", self.conn.is_established());
        match self.conn.send(&mut vec) {
            Ok(_) => {
                println!("sent");
                Ok(true)
            }
            Err(e) => {
                println!("err: {:?}", e);
                Err(TransportError::NotReady)
            }
        }
    }

    fn source(&mut self) -> IOSource {
        IOSource::MIO(&mut self.udp_socket)
    }

    fn shutdown(&mut self, _how: std::net::Shutdown) -> io::Result<()> {
        Ok(())
    }
}

/// Shared state behind a [`Factory`] handle.
struct FactoryInner<I: ID> {
    /// Candidate socket addresses for each known remote ID, consulted by `create_stream`.
    address_book: RwLock<HashMap<I, Vec<SocketAddr>>>,
    /// Handle of the background listener task; `None` when no listen address was given.
    /// The task is aborted when this struct is dropped.
    listen_handle: Option<JoinHandle<()>>,
    /// Receiving end of the channel on which the listener task delivers accepted streams.
    accepted_stream: Arc<Mutex<mpsc::Receiver<QuicStream>>>,
}

impl<I: ID> Drop for FactoryInner<I> {
    /// Aborts the background listener task, if one was spawned, when this value is dropped.
    fn drop(&mut self) {
        match self.listen_handle.take() {
            Some(listener_task) => listener_task.abort(),
            None => {}
        }
    }
}

/// Reference-counted handle to the shared [`FactoryInner`] state; cloning it only clones
/// the `Arc`.
#[derive(Clone)]
struct Factory<I: ID>(Arc<FactoryInner<I>>);

impl<I: ID> Factory<I> {
    fn new(listen_addr: Option<SocketAddr>) -> Self {
        let (tx, accepted_stream) = mpsc::channel(1);
        let listen_handle = listen_addr.map(|listen_addr| {
            tokio::spawn(async move {
                let listener = match TokioUdpStream::bind(listen_addr).await {
                    Ok(l) => l,
                    Err(e) => {
                        error!("[Quic] failed to bind to address {}: {}", listen_addr, e);
                        return
                    }
                };
                println!("listening on {}", listener.local_addr().unwrap());

                loop {
                    // Create a new udp socket for each incoming udp client
                    let mut buf = [0u8; INIT_MESSAGE.len()];

                    if let Ok((_, client_addr)) = listener.recv_from(&mut buf).await {
                        println!("received: {:?}", &buf[..]);
                        if INIT_MESSAGE != &buf[..] {
                            continue
                        }
                        let udp_socket = match create_udp_socket().await {
                            Ok(s) => s,
                            Err(e) => {
                                error!("[Quic] failed to create udp socket: {}", e);
                                return
                            }
                        };
                        udp_socket.connect(client_addr).await.unwrap();
                        udp_socket.send(INIT_MESSAGE).await.unwrap();
                        let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
                        config.set_max_recv_udp_payload_size(1200);
                        config
                            .load_cert_chain_from_pem_file("src/hub/transport/cert.crt")
                            .unwrap();
                        config
                            .load_priv_key_from_pem_file("src/hub/transport/cert.key")
                            .unwrap();
                        config.set_application_protos(&[b"bolic/1.0"]).unwrap();
                        config.enable_dgram(true, 1000, 1000);

                        let mut buffer = [0u8; MAX_UDP_PAYLOAD_SIZE];

                        if let Ok((len, client_addr)) = udp_socket.recv_from(&mut buffer).await {
                            println!("server {}: Received {} bytes from {} ", listen_addr, len, client_addr);

                            let hdr = match quiche::Header::from_slice(&mut buffer, quiche::MAX_CONN_ID_LEN) {
                                Ok(v) => v,

                                Err(e) => {
                                    error!("[Quic] fail to parsing packet header: {:?}", e);
                                    continue
                                }
                            };

                            if hdr.ty != quiche::Type::Initial {
                                error!("[Quic server] Packet type is not Initial");
                                continue
                            }

                            let conn = match quiche::accept(&hdr.scid, None, listen_addr, client_addr, &mut config) {
                                Ok(v) => {
                                    println!("accepted");
                                    v
                                }
                                Err(e) => {
                                    error!("[Quic] failed to create quiche connection: {:?}", e);
                                    return
                                }
                            };

                            let stream = QuicStream {
                                conn,
                                udp_socket: tokio_to_mio_stream(udp_socket),
                            };

                            tx.send(stream).await.ok();
                        }
                    }
                }
            })
        });
        Self(Arc::new(FactoryInner {
            address_book: RwLock::new(HashMap::new()),
            accepted_stream: Arc::new(Mutex::new(accepted_stream)),
            listen_handle,
        }))
    }

    fn set_addr(&self, id: I, addr: Vec<SocketAddr>) {
        self.0.address_book.write().insert(id, addr);
    }

    fn remove_addr(&self, id: &I) {
        self.0.address_book.write().remove(id);
    }
}

#[async_trait]
impl<I: ID> StreamFactory<I> for Factory<I> {
    async fn create_stream(&self, remote_id: I) -> Option<Box<dyn NonBlockingStream>> {
        use futures::stream::StreamExt;
        let addrs = self.0.address_book.read().get(&remote_id)?.clone();
        let mut stream = Box::pin(futures::stream::iter(addrs).then(
            |remote_addr| async move {
                let udp_socket = match create_udp_socket().await {
                    Ok(s) => s,
                    Err(e) => {
                        error!("[Quic] failed to create udp socket: {}", e);
                        return None
                    }
                };

                let mut buf = [0u8; INIT_MESSAGE.len()];
                if let Ok(_) = udp_socket.send_to(INIT_MESSAGE, remote_addr).await {
                    match udp_socket.recv_from(&mut buf).await {
                        Ok((_, server_socket)) => {
                            if INIT_MESSAGE == &buf[..] {
                                match udp_socket.connect(server_socket).await {
                                    Ok(_) => {
                                        let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
                                        config.verify_peer(false);
                                        config.set_application_protos(&[b"bolic/1.0"]).unwrap();
                                        config.enable_dgram(true, 1000, 1000);
                                        // Generate a random source connection ID for the connection.
                                        let mut scid = [0; quiche::MAX_CONN_ID_LEN];
                                        let rng = SystemRandom::new();
                                        rng.fill(&mut scid[..]).unwrap();
                                        let scid = quiche::ConnectionId::from_ref(&scid);

                                        let mut conn = match quiche::connect(
                                            None,
                                            &scid,
                                            udp_socket.local_addr().unwrap(),
                                            server_socket,
                                            &mut config,
                                        ) {
                                            Ok(v) => v,
                                            Err(e) => {
                                                error!("[Quic] failed to create quiche connection: {:?}", e);
                                                return None
                                            }
                                        };

                                        println!("quic client: {:?}", conn.stats());

                                        let mut out = [0; 65507];
                                        let (write, send_info) = match conn.send(&mut out) {
                                            Ok(v) => v,
                                            Err(quiche::Error::Done) => return None,
                                            Err(e) => {
                                                println!("send failed: {:?}", e);
                                                return None
                                            }
                                        };

                                        println!("quic sent info: {:?}", send_info);
                                        println!(
                                            "[Client] connecting to {:} from {:} with scid {:?}",
                                            remote_addr,
                                            udp_socket.local_addr().unwrap(),
                                            scid,
                                        );
                                        println!("[Client] status of connection: {:?}", conn.stats());

                                        udp_socket.send(&out[..write]).await.unwrap();

                                        Some(QuicStream {
                                            conn,
                                            udp_socket: tokio_to_mio_stream(udp_socket),
                                        })
                                    }
                                    Err(_) => None,
                                }
                            } else {
                                None
                            }
                        }
                        Err(_) => None,
                    }
                } else {
                    None
                }
            },
        ));
        loop {
            match stream.next().await {
                Some(s) => return Some(Box::new(s?)),
                None => (),
            }
        }
    }

    async fn discover_stream(&self) -> Box<dyn NonBlockingStream> {
        match self.0.accepted_stream.lock().recv().await {
            None => futures::future::pending().await,
            Some(s) => Box::new(s),
        }
    }
}

/// [Pool] manages a pool of QUIC streams (carried over UDP sockets) indexed by a custom
/// [ID] type.
pub struct Pool<I: ID + DeserializeOwned + Serialize> {
    /// Creates outgoing streams and hands out streams accepted by the listener.
    factory: Factory<I>,
    /// Pool core built from the same factory (see `Pool::new`).
    registry: PoolCore<I>,
}

impl<I: ID + DeserializeOwned + Serialize> Pool<I> {
    pub fn new<H: super::pool::Handshake<I>>(
        local_id: I, listen_addr: Option<&str>, driver: &crate::hub::Driver,
    ) -> Result<Self, PoolError> {
        let factory = Factory::new(match listen_addr {
            Some(a) => Some(a.parse().map_err(|_| PoolError::InvalidAddr)?),
            None => None,
        });
        let config = super::pool::ConfigBuilder::default().build().unwrap();
        Ok(Self {
            factory: factory.clone(),
            registry: PoolCore::new::<H, Factory<I>>(local_id, factory, driver, config),
        })
    }
}