use serde::{de::DeserializeOwned, Serialize};
use std::collections::HashMap;
use super::endpoint::{Endpoint, EndpointError, Receiver, Sender};
pub use super::transport::pool::AcceptAll;
use super::transport::pool::{AccessControl, Config, ConfigBuilder, DelayGenerator, Handshake, Pool, ID};
use super::Driver;
use std::hash::Hash;
/// Identifies which kind of transport a pool or endpoint belongs to.
///
/// The value is used as (part of) a `HashMap` key, hence the `Eq`/`Hash`
/// derives; it is `Copy` so it can be passed around by value. `Debug` is
/// derived so the value can appear in logs and assertion messages.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum Transport {
    /// TCP transport.
    Tcp,
    /// UDP transport.
    Udp,
    /// WebSocket transport.
    WebSocket,
}
/// Errors returned by the hub and hub-builder operations in this file.
#[derive(Debug)]
pub enum HubError {
/// A bind address string could not be parsed.
InvalidAddr,
/// No endpoint is registered for the requested `(peer, transport)` pair.
PeerNotExist,
/// The pool returned no transport when asked to create one for a remote peer.
FailedToCreateTransport,
/// No pool is registered for the requested transport.
TransportNotExist,
/// An error reported by a transport pool.
Pool(super::transport::pool::PoolError),
/// An error reported by the driver.
Driver(super::DriverError),
/// An error reported by an endpoint.
Endpoint(EndpointError),
}
/// Lets `?` lift a driver error into [`HubError::Driver`].
impl From<super::DriverError> for HubError {
    fn from(err: super::DriverError) -> Self {
        Self::Driver(err)
    }
}
impl From<EndpointError> for HubError {
fn from(err: EndpointError) -> Self {
HubError::Endpoint(err)
}
}
/// Lets `?` lift a pool error into [`HubError::Pool`].
impl From<super::transport::pool::PoolError> for HubError {
    fn from(err: super::transport::pool::PoolError) -> Self {
        Self::Pool(err)
    }
}
/// Cloneable handle around an [`Endpoint`] clone, handed out by `Hub::inbound_shared`.
///
/// NOTE(review): presumably clones of an `Endpoint` share the same underlying
/// connection state — confirm against the `Endpoint` implementation.
#[derive(Clone)]
pub struct SharedReceiver(Endpoint);
impl SharedReceiver {
    /// Returns the wrapped endpoint's inbound [`Receiver`].
    ///
    /// The receiver borrows from `self`; the explicit `'_` makes that borrow
    /// visible in the signature, matching how `Hub::inbound` is written.
    pub fn lock(&self) -> Receiver<'_> {
        self.0.inbound()
    }

    /// Asks the wrapped endpoint whether it is disconnected.
    ///
    /// # Errors
    /// Forwards whatever [`EndpointError`] the endpoint's own check returns.
    pub async fn is_disconnected(&self) -> std::result::Result<bool, EndpointError> {
        self.0.is_disconnected().await
    }
}
/// Result alias whose error type is [`HubError`].
pub type Result<T> = std::result::Result<T, HubError>;
/// Owns the per-transport pools and the per-peer endpoints of one node.
pub struct Hub<I: ID + Serialize + DeserializeOwned> {
/// This hub's own identifier (returned by `get_id`).
id: I,
/// At most one pool per transport kind.
pools: HashMap<Transport, Box<Pool<I>>>,
/// Endpoints keyed by `(peer id, transport)`.
endpoints: HashMap<(I, Transport), Endpoint>,
/// Driver used to create new endpoints.
driver: Driver,
}
impl<I: ID + Serialize + DeserializeOwned> Hub<I> {
pub fn enable_transport(&mut self, transport_id: Transport, pool: Box<Pool<I>>) -> &mut Self {
self.pools.entry(transport_id).or_insert(pool);
self
}
pub async fn add_peer(&mut self, peer_id: I, transport: Transport, addrs: impl AsRef<[String]>) -> Result<()> {
let pool = self.pools.get_mut(&transport).ok_or(HubError::TransportNotExist)?;
pool.set_addr(peer_id.clone(), addrs.as_ref());
let key = (peer_id.clone(), transport);
let endpoints = &mut self.endpoints;
if !endpoints.contains_key(&key) {
let ptp = pool
.new_remote(peer_id)
.await
.ok_or(HubError::FailedToCreateTransport)?;
let ep = self.driver.create_endpoint().await?;
ep.set_transport(ptp).await?;
endpoints.insert(key, ep);
}
Ok(())
}
pub async fn remove_peer(&mut self, peer_id: I, transport: Transport) -> Result<()> {
let endpoints = &mut self.endpoints;
match endpoints.remove(&(peer_id, transport)) {
Some(_) => Ok(()),
None => Err(HubError::PeerNotExist),
}
}
pub fn set_peer_addr(&mut self, peer_id: I, transport: Transport, addrs: impl AsRef<[String]>) -> Result<()> {
let pool = self.pools.get_mut(&transport).ok_or(HubError::TransportNotExist)?;
pool.set_addr(peer_id, addrs.as_ref());
Ok(())
}
pub async fn set_retry_delay(
&mut self, peer_id: I, transport: Transport, delay: Box<dyn DelayGenerator>,
) -> Result<()> {
let pool = self.pools.get_mut(&transport).ok_or(HubError::TransportNotExist)?;
Ok(pool.set_retry_delay(&peer_id, delay).await?)
}
#[inline(always)]
fn endpoint(&self, peer_id: I, transport: Transport) -> Result<&Endpoint> {
self.endpoints.get(&(peer_id, transport)).ok_or(HubError::PeerNotExist)
}
pub fn inbound(&self, peer_id: I, transport: Transport) -> Result<Receiver<'_>> {
Ok(self.endpoint(peer_id, transport)?.inbound())
}
pub fn inbound_shared(&self, peer_id: I, transport: Transport) -> Result<SharedReceiver> {
Ok(SharedReceiver(self.endpoint(peer_id, transport)?.clone()))
}
pub fn outbound(&self, peer_id: I, transport: Transport) -> Result<Sender> {
Ok(self.endpoint(peer_id, transport)?.outbound())
}
pub async fn reset(&self, peer_id: I, transport: Transport) -> Result<()> {
self.endpoint(peer_id, transport)?.reset().await?;
Ok(())
}
pub fn get_id(&self) -> &I {
&self.id
}
pub fn get_driver(&self) -> &Driver {
&self.driver
}
}
#[cfg(not(target_arch = "wasm32"))] use super::transport::TcpFactory;
#[cfg(not(target_arch = "wasm32"))] use super::transport::UdpFactory;
use super::transport::WSFactory;
/// First builder stage for a `Hub`: holds the hub id and the pools collected so far.
///
/// A driver must be chosen (`default_driver` or `driver`) to move on to
/// `HubBuilderWithDriver`, where transports are added.
pub struct HubBuilder<I: ID + Serialize + DeserializeOwned> {
/// Identifier the resulting hub (and each of its pools) is created with.
id: I,
/// Pools registered so far, one per transport kind.
pools: HashMap<Transport, Box<Pool<I>>>,
}
impl<I: ID + Serialize + DeserializeOwned> HubBuilder<I> {
    /// Starts a builder for a hub identified by `id`, with no pools registered.
    pub fn new(id: I) -> Self {
        let pools = HashMap::new();
        Self { id, pools }
    }

    /// Moves to the next builder stage using a freshly created driver
    /// (`Driver::new(1)`).
    ///
    /// # Errors
    /// Returns the driver's creation error, converted into `HubError`.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn default_driver<H: Handshake<I>>(self) -> Result<HubBuilderWithDriver<I, H>> {
        let driver = Driver::new(1)?;
        Ok(HubBuilderWithDriver::new(driver, self))
    }

    /// wasm32 variant: moves to the next builder stage using a freshly created
    /// driver (`Driver::new(1)`, awaited) and the `PlainHandshake` handshake.
    ///
    /// # Errors
    /// Returns the driver's creation error, converted into `HubError`.
    #[cfg(target_arch = "wasm32")]
    pub async fn default_driver(
        self,
    ) -> Result<HubBuilderWithDriver<I, super::transport::pool::PlainHandshake<I>>> {
        let driver = Driver::new(1).await?;
        Ok(HubBuilderWithDriver::new(driver, self))
    }

    /// Moves to the next builder stage using a clone of an existing `driver`.
    pub fn driver<H: Handshake<I>>(self, driver: &Driver) -> HubBuilderWithDriver<I, H> {
        let driver = driver.clone();
        HubBuilderWithDriver::new(driver, self)
    }
}
/// Second builder stage for a `Hub`: a driver has been chosen and transports can be added.
///
/// `H` selects the handshake type passed to every pool created by this builder.
pub struct HubBuilderWithDriver<I: ID + Serialize + DeserializeOwned, H: Handshake<I>> {
/// The first-stage builder, carrying the hub id and the pools registered so far.
inner: HubBuilder<I>,
/// Driver handed to each created pool and finally to the built hub.
driver: crate::hub::Driver,
/// Pool configuration; cloned into each pool at the time the pool is created.
config: Config,
/// Marker only: ties the handshake type `H` to the builder without storing a value.
_handshake: std::marker::PhantomData<H>,
}
impl<I: ID + Serialize + DeserializeOwned, H: Handshake<I>> HubBuilderWithDriver<I, H> {
    /// Creates the second builder stage with the default pool configuration.
    ///
    /// # Panics
    /// Panics if the default `ConfigBuilder` fails to build, which would be a
    /// bug in the configuration defaults rather than a caller error.
    fn new(driver: Driver, inner: HubBuilder<I>) -> Self {
        Self {
            driver,
            inner,
            config: ConfigBuilder::default()
                .build()
                .expect("default ConfigBuilder is expected to produce a valid Config"),
            _handshake: std::marker::PhantomData,
        }
    }

    /// Parses an optional bind address string into the address type the
    /// calling transport factory expects.
    ///
    /// `None` stays `None`; a string that fails to parse becomes
    /// [`HubError::InvalidAddr`]. Shared by `tcp`, `udp` and `websocket` so the
    /// three transports cannot drift apart in how they treat bad input.
    fn parse_bind_addr<Addr: std::str::FromStr>(bind_addr: Option<&str>) -> Result<Option<Addr>> {
        bind_addr
            .map(str::parse::<Addr>)
            .transpose()
            .map_err(|_| HubError::InvalidAddr)
    }

    /// Replaces the whole pool configuration.
    ///
    /// Each pool clones the configuration when it is created, so call this
    /// before `tcp`/`udp`/`websocket` for it to affect those pools.
    pub fn config(mut self, config: Config) -> Self {
        self.config = config;
        self
    }

    /// Overrides only the `retry_delay` range of the current configuration.
    ///
    /// Like `config`, this affects only pools created afterwards.
    pub fn retry_delay(mut self, delay: std::ops::Range<std::time::Duration>) -> Self {
        self.config.retry_delay = delay;
        self
    }

    /// Creates a TCP pool and registers it under [`Transport::Tcp`], replacing
    /// any TCP pool registered earlier on this builder.
    ///
    /// `bind_addr`, when given, is parsed and passed to `TcpFactory::new`.
    ///
    /// # Errors
    /// [`HubError::InvalidAddr`] if `bind_addr` cannot be parsed.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn tcp<A: AccessControl<I>>(mut self, bind_addr: Option<&str>, access_control: A) -> Result<Self> {
        let pool = Box::new(Pool::new::<H, _, A>(
            self.inner.id.clone(),
            TcpFactory::new(Self::parse_bind_addr(bind_addr)?),
            access_control,
            &self.driver,
            self.config.clone(),
        ));
        self.inner.pools.insert(Transport::Tcp, pool);
        Ok(self)
    }

    /// Creates a UDP pool and registers it under [`Transport::Udp`], replacing
    /// any UDP pool registered earlier on this builder.
    ///
    /// `bind_addr`, when given, is parsed and passed to `UdpFactory::new`.
    ///
    /// # Errors
    /// [`HubError::InvalidAddr`] if `bind_addr` cannot be parsed.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn udp<A: AccessControl<I>>(mut self, bind_addr: Option<&str>, access_control: A) -> Result<Self> {
        let pool = Box::new(Pool::new::<H, _, A>(
            self.inner.id.clone(),
            UdpFactory::new(Self::parse_bind_addr(bind_addr)?),
            access_control,
            &self.driver,
            self.config.clone(),
        ));
        self.inner.pools.insert(Transport::Udp, pool);
        Ok(self)
    }

    /// Creates a WebSocket pool and registers it under [`Transport::WebSocket`],
    /// replacing any WebSocket pool registered earlier on this builder.
    ///
    /// `bind_addr`, when given, is parsed and passed to `WSFactory::new`.
    ///
    /// # Errors
    /// [`HubError::InvalidAddr`] if `bind_addr` cannot be parsed.
    pub fn websocket<A: AccessControl<I>>(mut self, bind_addr: Option<&str>, access_control: A) -> Result<Self> {
        let pool = Box::new(Pool::new::<H, _, A>(
            self.inner.id.clone(),
            WSFactory::new(Self::parse_bind_addr(bind_addr)?),
            access_control,
            &self.driver,
            self.config.clone(),
        ));
        self.inner.pools.insert(Transport::WebSocket, pool);
        Ok(self)
    }

    /// Finishes the builder, producing a hub with the registered pools, the
    /// chosen driver and no endpoints.
    ///
    /// As written this always returns `Ok`; the `Result` is part of the
    /// existing signature.
    pub fn build(self) -> Result<Hub<I>> {
        Ok(Hub {
            id: self.inner.id,
            endpoints: HashMap::new(),
            driver: self.driver,
            pools: self.inner.pools,
        })
    }
}