bolic-network 0.0.1

Modern network abstraction and tooling for building distributed systems
Documentation
use async_trait::async_trait;
use mio::net::TcpStream as MIOTcpStream;
use parking_lot::Mutex;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream as TokioTcpStream};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use super::pool::{AddressedStreamFactory, NonBlockingStream};
use super::TransportError;
use crate::hub::event::IOSource;
use crate::hub::utils::error;
use bytes::Bytes;

/// Shorthand for the project's `MessageStream` instantiated over a mio `TcpStream`
/// and parameterized with `SimpleTokenizer`; this is the concrete stream type this
/// module hands out.
type MIOMessageStream = super::stream::MessageStream<MIOTcpStream, super::stream::SimpleTokenizer>;

/// Adapts `MIOMessageStream` to the `NonBlockingStream` trait so it can be handed out
/// as a boxed trait object.
///
/// NOTE(review): `try_recv` and `try_send` call a method of the same name on `self`.
/// This presumably resolves to inherent methods defined on `MessageStream` (inherent
/// methods take priority over trait methods); if no such inherent methods exist the
/// calls would recurse forever — confirm against `super::stream`.
impl NonBlockingStream for MIOMessageStream {
    /// Forwards to `self.try_recv()` and returns its result unchanged: the received
    /// `Bytes` or a `TransportError`.
    fn try_recv(&mut self) -> Result<Bytes, TransportError> {
        self.try_recv()
    }

    /// Forwards `data` to `self.try_send(data)` and returns its result unchanged.
    ///
    /// The meaning of `None` and of the returned `bool` is defined by the callee —
    /// presumably `None` continues flushing pending data and the flag reports whether
    /// everything was written; TODO confirm.
    fn try_send(&mut self, data: Option<Bytes>) -> Result<bool, TransportError> {
        self.try_send(data)
    }

    /// Wraps whatever `as_inner_mut()` returns (presumably `&mut MIOTcpStream`) in
    /// `IOSource::MIO`.
    fn source(&mut self) -> IOSource {
        IOSource::MIO(self.as_inner_mut())
    }

    /// Calls `shutdown(how)` on the value returned by `as_inner()` (presumably the
    /// underlying mio TCP stream) and returns its `io::Result`.
    fn shutdown(&mut self, how: std::net::Shutdown) -> io::Result<()> {
        self.as_inner().shutdown(how)
    }
}

struct FactoryInner {
    listen_handle: Option<JoinHandle<()>>,
    accepted_stream: Mutex<mpsc::Receiver<MIOMessageStream>>,
}

impl Drop for FactoryInner {
    fn drop(&mut self) {
        if let Some(h) = self.listen_handle.take() {
            h.abort()
        }
    }
}

#[derive(Clone)]
pub struct Factory(Arc<FactoryInner>);

impl Factory {
    pub fn new(listen_addr: Option<SocketAddr>) -> Self {
        let (tx, accepted_stream) = mpsc::channel(1);
        let listen_handle = listen_addr.map(|listen_addr| {
            tokio::spawn(async move {
                let listener = match TcpListener::bind(listen_addr).await {
                    Ok(l) => l,
                    Err(e) => {
                        error!("[Tcp] failed to bind to address {}: {}", listen_addr, e);
                        return
                    }
                };
                loop {
                    if let Ok((stream, _)) = listener.accept().await {
                        tx.send(MIOMessageStream::new(tokio_to_mio_stream(stream), 65536))
                            .await
                            .ok();
                    }
                }
            })
        });
        Self(Arc::new(FactoryInner {
            accepted_stream: Mutex::new(accepted_stream),
            listen_handle,
        }))
    }
}

#[async_trait]
impl AddressedStreamFactory for Factory {
    async fn create_stream(&self, addr: &str) -> Option<Box<dyn NonBlockingStream>> {
        let addr: SocketAddr = addr.parse().ok()?;
        let tcp_stream = TokioTcpStream::connect(addr).await.ok()?;
        Some(Box::new(MIOMessageStream::new(tokio_to_mio_stream(tcp_stream), 65536)))
    }

    async fn discover_stream(&self) -> Box<dyn NonBlockingStream> {
        match self.0.accepted_stream.lock().recv().await {
            None => futures::future::pending().await,
            Some(s) => Box::new(s),
        }
    }
}

/// Converts a Tokio TCP stream into a mio TCP stream by going through the standard
/// library stream type (`into_std` followed by `from_std`).
///
/// # Panics
///
/// Panics if `TokioTcpStream::into_std` returns an error.
pub(crate) fn tokio_to_mio_stream(src: TokioTcpStream) -> MIOTcpStream {
    MIOTcpStream::from_std(src.into_std().unwrap())
}