bolic-network 0.0.1

Modern network abstraction and tooling for building distributed systems
Documentation
use serde::{de::DeserializeOwned, Serialize};
use std::collections::HashMap;

use super::endpoint::{Endpoint, EndpointError, Receiver, Sender};
pub use super::transport::pool::AcceptAll;
use super::transport::pool::{AccessControl, Config, ConfigBuilder, DelayGenerator, Handshake, Pool, ID};
use super::Driver;
use std::hash::Hash;

#[derive(PartialEq, Eq, Hash, Clone, Copy)]
pub enum Transport {
    Tcp,
    Udp,
    WebSocket,
}

#[derive(Debug)]
pub enum HubError {
    InvalidAddr,
    PeerNotExist,
    FailedToCreateTransport,
    TransportNotExist,
    Pool(super::transport::pool::PoolError),
    Driver(super::DriverError),
    Endpoint(EndpointError),
}

impl From<super::DriverError> for HubError {
    fn from(err: super::DriverError) -> Self {
        HubError::Driver(err)
    }
}

impl From<EndpointError> for HubError {
    fn from(err: EndpointError) -> Self {
        HubError::Endpoint(err)
    }
}

impl From<super::transport::pool::PoolError> for HubError {
    fn from(err: super::transport::pool::PoolError) -> Self {
        HubError::Pool(err)
    }
}

#[derive(Clone)]
pub struct SharedReceiver(Endpoint);

impl SharedReceiver {
    pub fn lock(&self) -> Receiver {
        self.0.inbound()
    }

    pub async fn is_disconnected(&self) -> std::result::Result<bool, EndpointError> {
        self.0.is_disconnected().await
    }
}

pub type Result<T> = std::result::Result<T, HubError>;

pub struct Hub<I: ID + Serialize + DeserializeOwned> {
    id: I,
    pools: HashMap<Transport, Box<Pool<I>>>,
    endpoints: HashMap<(I, Transport), Endpoint>,
    driver: Driver,
}

impl<I: ID + Serialize + DeserializeOwned> Hub<I> {
    /// Enable one transport with the given pool. If transport exists, it will not be changed.
    pub fn enable_transport(&mut self, transport_id: Transport, pool: Box<Pool<I>>) -> &mut Self {
        self.pools.entry(transport_id).or_insert(pool);
        self
    }

    /// Add a new peer. If peer already exists, it functions the same ways as [set_peer_addr].
    pub async fn add_peer(&mut self, peer_id: I, transport: Transport, addrs: impl AsRef<[String]>) -> Result<()> {
        let pool = self.pools.get_mut(&transport).ok_or(HubError::TransportNotExist)?;
        pool.set_addr(peer_id.clone(), addrs.as_ref());
        let key = (peer_id.clone(), transport);
        let endpoints = &mut self.endpoints;
        if !endpoints.contains_key(&key) {
            let ptp = pool
                .new_remote(peer_id)
                .await
                .ok_or(HubError::FailedToCreateTransport)?;
            let ep = self.driver.create_endpoint().await?;
            ep.set_transport(ptp).await?;
            endpoints.insert(key, ep);
        }
        Ok(())
    }

    /// Remove a peer. This will disconnect to the peer in all transports and the peer cannot
    /// establish communication unless it is added back by `add_peer`.
    pub async fn remove_peer(&mut self, peer_id: I, transport: Transport) -> Result<()> {
        let endpoints = &mut self.endpoints;
        match endpoints.remove(&(peer_id, transport)) {
            Some(_) => Ok(()),
            None => Err(HubError::PeerNotExist),
        }
    }

    pub fn set_peer_addr(&mut self, peer_id: I, transport: Transport, addrs: impl AsRef<[String]>) -> Result<()> {
        let pool = self.pools.get_mut(&transport).ok_or(HubError::TransportNotExist)?;
        pool.set_addr(peer_id, addrs.as_ref());
        Ok(())
    }

    pub async fn set_retry_delay(
        &mut self, peer_id: I, transport: Transport, delay: Box<dyn DelayGenerator>,
    ) -> Result<()> {
        let pool = self.pools.get_mut(&transport).ok_or(HubError::TransportNotExist)?;
        Ok(pool.set_retry_delay(&peer_id, delay).await?)
    }

    /// Get the endpoint handle for the given peer, through the transport. `None` if the transport
    /// does not exist for the peer.
    #[inline(always)]
    fn endpoint(&self, peer_id: I, transport: Transport) -> Result<&Endpoint> {
        self.endpoints.get(&(peer_id, transport)).ok_or(HubError::PeerNotExist)
    }

    pub fn inbound(&self, peer_id: I, transport: Transport) -> Result<Receiver<'_>> {
        Ok(self.endpoint(peer_id, transport)?.inbound())
    }

    pub fn inbound_shared(&self, peer_id: I, transport: Transport) -> Result<SharedReceiver> {
        Ok(SharedReceiver(self.endpoint(peer_id, transport)?.clone()))
    }

    pub fn outbound(&self, peer_id: I, transport: Transport) -> Result<Sender> {
        Ok(self.endpoint(peer_id, transport)?.outbound())
    }

    pub async fn reset(&self, peer_id: I, transport: Transport) -> Result<()> {
        self.endpoint(peer_id, transport)?.reset().await?;
        Ok(())
    }

    pub fn get_id(&self) -> &I {
        &self.id
    }

    pub fn get_driver(&self) -> &Driver {
        &self.driver
    }
}

#[cfg(not(target_arch = "wasm32"))] use super::transport::TcpFactory;
#[cfg(not(target_arch = "wasm32"))] use super::transport::UdpFactory;
use super::transport::WSFactory;

pub struct HubBuilder<I: ID + Serialize + DeserializeOwned> {
    id: I,
    pools: HashMap<Transport, Box<Pool<I>>>,
}

impl<I: ID + Serialize + DeserializeOwned> HubBuilder<I> {
    pub fn new(id: I) -> Self {
        Self {
            id,
            pools: HashMap::new(),
        }
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub fn default_driver<H: Handshake<I>>(self) -> Result<HubBuilderWithDriver<I, H>> {
        Ok(HubBuilderWithDriver::new(Driver::new(1)?, self))
    }

    #[cfg(target_arch = "wasm32")]
    pub async fn default_driver(self) -> Result<HubBuilderWithDriver<I, super::transport::pool::PlainHandshake<I>>> {
        Ok(HubBuilderWithDriver::new(Driver::new(1).await?, self))
    }

    pub fn driver<H: Handshake<I>>(self, driver: &Driver) -> HubBuilderWithDriver<I, H> {
        HubBuilderWithDriver::new(driver.clone(), self)
    }
}

pub struct HubBuilderWithDriver<I: ID + Serialize + DeserializeOwned, H: Handshake<I>> {
    inner: HubBuilder<I>,
    driver: crate::hub::Driver,
    config: Config,
    _handshake: std::marker::PhantomData<H>,
}

impl<I: ID + Serialize + DeserializeOwned, H: Handshake<I>> HubBuilderWithDriver<I, H> {
    fn new(driver: Driver, inner: HubBuilder<I>) -> Self {
        Self {
            driver,
            inner,
            config: ConfigBuilder::default().build().unwrap(),
            _handshake: std::marker::PhantomData,
        }
    }

    pub fn config(mut self, config: Config) -> Self {
        self.config = config;
        self
    }

    pub fn retry_delay(mut self, delay: std::ops::Range<std::time::Duration>) -> Self {
        self.config.retry_delay = delay;
        self
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub fn tcp<A: AccessControl<I>>(mut self, bind_addr: Option<&str>, access_control: A) -> Result<Self> {
        // channel between pool and peer to communicate discovered but unknown remotes
        let pool = Box::new(Pool::new::<H, _, A>(
            self.inner.id.clone(),
            TcpFactory::new(match bind_addr {
                Some(a) => Some(a.parse().map_err(|_| HubError::InvalidAddr)?),
                None => None,
            }),
            access_control,
            &self.driver,
            self.config.clone(),
        ));
        self.inner.pools.insert(Transport::Tcp, pool);
        Ok(self)
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub fn udp<A: AccessControl<I>>(mut self, bind_addr: Option<&str>, access_control: A) -> Result<Self> {
        let pool = Box::new(Pool::new::<H, _, A>(
            self.inner.id.clone(),
            UdpFactory::new(match bind_addr {
                Some(a) => Some(a.parse().map_err(|_| HubError::InvalidAddr)?),
                None => None,
            }),
            access_control,
            &self.driver,
            self.config.clone(),
        ));
        self.inner.pools.insert(Transport::Udp, pool);
        Ok(self)
    }

    pub fn websocket<A: AccessControl<I>>(mut self, bind_addr: Option<&str>, access_control: A) -> Result<Self> {
        let pool = Box::new(Pool::new::<H, _, A>(
            self.inner.id.clone(),
            WSFactory::new(match bind_addr {
                Some(a) => Some(a.parse().map_err(|_| HubError::InvalidAddr)?),
                None => None,
            }),
            access_control,
            &self.driver,
            self.config.clone(),
        ));
        self.inner.pools.insert(Transport::WebSocket, pool);
        Ok(self)
    }

    pub fn build(self) -> Result<Hub<I>> {
        Ok(Hub {
            id: self.inner.id,
            endpoints: HashMap::new(),
            driver: self.driver,
            pools: self.inner.pools,
        })
    }
}