use std::{sync::Arc, time::Duration};
use futures::lock::Mutex;
use multiaddr::Multiaddr;
use crate::{
book::{peerbook_syscall::DriverPeerBook, MemoryPeerBook, PeerBook},
connector_syscall::DriverConnector,
event_syscall::DriverEventMediator,
keystore::{keystore_syscall::DriverKeyStore, KeyStore, MemoryKeyStore},
stream_syscall::DriverStreamDispatcher,
transport::{transport_syscall::DriverTransport, Transport},
ConnPool, Connector, Error, EventMediator, MutexStreamDispatcher, Result, StreamDispatcher,
SyncEventMediator,
};
use super::{mutable::MutableSwitch, Switch};
pub struct SwitchOptions {
pub max_observed_addrs_len: usize,
pub connect_replication: usize,
pub timeout: Duration,
pub agent_version: String,
pub max_packet_size: usize,
pub transports: Vec<Transport>,
pub keystore: KeyStore,
pub peer_book: PeerBook,
pub connector: Connector,
pub stream_dispatcher: StreamDispatcher,
pub event_mediator: EventMediator,
}
impl SwitchOptions {
pub(super) fn new(agent_version: String) -> Self {
Self {
max_observed_addrs_len: 5,
connect_replication: 3,
agent_version,
timeout: Duration::from_secs(5),
max_packet_size: 1024 * 1024 * 4,
transports: vec![],
keystore: MemoryKeyStore::random().into(),
peer_book: MemoryPeerBook::default().into(),
connector: ConnPool::default().into(),
stream_dispatcher: MutexStreamDispatcher::default().into(),
event_mediator: SyncEventMediator::default().into(),
}
}
pub(super) fn get_transport_by_address(&self, laddr: &Multiaddr) -> Option<&Transport> {
self.transports
.iter()
.find(|transport| transport.multiaddr_hint(laddr))
}
}
struct SwitchBuilderInner {
laddrs: Vec<Multiaddr>,
immutable: SwitchOptions,
}
pub struct SwitchBuilder {
ops: Result<SwitchBuilderInner>,
}
impl SwitchBuilder {
pub(super) fn new(agent_version: String) -> Self {
Self {
ops: Ok(SwitchBuilderInner {
laddrs: Default::default(),
immutable: SwitchOptions::new(agent_version),
}),
}
}
pub fn connector<C>(self, value: C) -> Self
where
C: DriverConnector + 'static,
{
self.and_then(|mut cfg| {
cfg.immutable.connector = value.into();
Ok(cfg)
})
}
pub fn event_mediator<K>(self, value: K) -> Self
where
K: DriverEventMediator + 'static,
{
self.and_then(|mut cfg| {
cfg.immutable.event_mediator = value.into();
Ok(cfg)
})
}
pub fn keystore<K>(self, value: K) -> Self
where
K: DriverKeyStore + 'static,
{
self.and_then(|mut cfg| {
cfg.immutable.keystore = value.into();
Ok(cfg)
})
}
pub fn stream_dispatcher<K>(self, value: K) -> Self
where
K: DriverStreamDispatcher + 'static,
{
self.and_then(|mut cfg| {
cfg.immutable.stream_dispatcher = value.into();
Ok(cfg)
})
}
pub fn peer_book<R>(self, value: R) -> Self
where
R: DriverPeerBook + 'static,
{
self.and_then(|mut cfg| {
cfg.immutable.peer_book = value.into();
Ok(cfg)
})
}
pub fn timeout(self, duration: Duration) -> Self {
self.and_then(|mut cfg| {
cfg.immutable.timeout = duration;
Ok(cfg)
})
}
pub fn max_observed_addrs_len(self, value: usize) -> Self {
self.and_then(|mut cfg| {
cfg.immutable.max_observed_addrs_len = value;
Ok(cfg)
})
}
pub fn max_packet_size(self, value: usize) -> Self {
self.and_then(|mut cfg| {
cfg.immutable.max_packet_size = value;
Ok(cfg)
})
}
pub fn connect_replication(self, value: usize) -> Self {
self.and_then(|mut cfg| {
cfg.immutable.connect_replication = value;
Ok(cfg)
})
}
pub fn transport<T>(self, value: T) -> Self
where
T: DriverTransport + 'static,
{
self.and_then(|mut cfg| {
cfg.immutable.transports.push(value.into());
Ok(cfg)
})
}
pub fn transport_bind<I, E>(self, laddrs: I) -> Self
where
I: IntoIterator,
I::Item: TryInto<Multiaddr, Error = E>,
Error: From<E>,
{
self.and_then(|mut cfg| {
cfg.laddrs = laddrs
.into_iter()
.map(|item| item.try_into().map_err(|err| err.into()))
.collect::<Result<Vec<Multiaddr>>>()?;
Ok(cfg)
})
}
pub async fn create(self) -> Result<Switch> {
let ops = self.ops?;
if ops.immutable.transports.is_empty() {
return Err(Error::NullTransportStack);
}
let public_key = ops.immutable.keystore.public_key().await?;
let switch = Switch {
local_peer_id: Arc::new(public_key.to_peer_id()),
public_key: Arc::new(public_key),
mutable: Arc::new(Mutex::new(MutableSwitch::new(
ops.immutable.max_observed_addrs_len,
))),
ops: Arc::new(ops.immutable),
};
for laddr in ops.laddrs {
switch.transport_bind(&laddr).await?;
}
Ok(switch)
}
fn and_then<F>(self, func: F) -> Self
where
F: FnOnce(SwitchBuilderInner) -> Result<SwitchBuilderInner>,
{
SwitchBuilder {
ops: self.ops.and_then(func),
}
}
}