use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
use anyhow::Result;
use futures_lite::future::Boxed as BoxedFuture;
use futures_util::future::join_all;
use iroh_blobs::{provider::EventSender, util::local_pool::LocalPoolHandle};
use iroh_net::endpoint::Connecting;
/// A handler for connections arriving on a particular protocol.
///
/// Implementors are shared behind an [`Arc`], must be `Send + Sync + 'static` and
/// `Debug`, and can be type-erased through the [`IntoArcAny`] supertrait so that a
/// concrete handler can later be recovered by downcasting.
pub trait ProtocolHandler: Send + Sync + IntoArcAny + fmt::Debug + 'static {
    /// Handles one incoming connection.
    ///
    /// `conn` is the in-progress connection. The returned boxed future resolves to
    /// `Ok(())` when handling finished successfully and to an error otherwise.
    fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;

    /// Hook invoked when the handler is asked to shut down.
    ///
    /// The default implementation does nothing: it returns a future that is
    /// immediately ready.
    fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
        Box::pin(std::future::ready(()))
    }
}
/// Conversion of a shared value into a type-erased `Arc<dyn Any + Send + Sync>`.
///
/// The erased pointer can afterwards be turned back into its concrete type with
/// [`Arc::downcast`].
pub trait IntoArcAny {
    /// Consumes the `Arc` and returns it as an `Arc<dyn Any + Send + Sync>`.
    fn into_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}

/// Blanket implementation covering every sized `Send + Sync + 'static` type.
impl<H: Send + Sync + 'static> IntoArcAny for H {
    fn into_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        // Pure unsizing cast: the same `Arc` is returned, only its static type changes.
        self as Arc<dyn Any + Send + Sync>
    }
}
/// Registry of protocol handlers, keyed by ALPN byte string.
///
/// Backed by a `BTreeMap`, so each ALPN maps to at most one handler and keys
/// iterate in sorted order. Handlers are stored as `Arc<dyn ProtocolHandler>`,
/// so cloning the map clones the `Arc`s rather than the handlers themselves.
#[derive(Debug, Clone, Default)]
pub(super) struct ProtocolMap(BTreeMap<&'static [u8], Arc<dyn ProtocolHandler>>);
impl ProtocolMap {
pub(super) fn get_typed<P: ProtocolHandler>(&self, alpn: &[u8]) -> Option<Arc<P>> {
let protocol: Arc<dyn ProtocolHandler> = self.0.get(alpn)?.clone();
let protocol_any: Arc<dyn Any + Send + Sync> = protocol.into_arc_any();
let protocol_ref = Arc::downcast(protocol_any).ok()?;
Some(protocol_ref)
}
pub(super) fn get(&self, alpn: &[u8]) -> Option<Arc<dyn ProtocolHandler>> {
self.0.get(alpn).cloned()
}
pub(super) fn insert(&mut self, alpn: &'static [u8], handler: Arc<dyn ProtocolHandler>) {
self.0.insert(alpn, handler);
}
pub(super) fn alpns(&self) -> impl Iterator<Item = &&[u8]> {
self.0.keys()
}
pub(super) async fn shutdown(&self) {
let handlers = self.0.values().cloned().map(ProtocolHandler::shutdown);
join_all(handlers).await;
}
}
/// State needed to serve incoming blobs connections.
///
/// Bundles the three values that are handed to
/// `iroh_blobs::provider::handle_connection` for every accepted connection.
#[derive(Debug)]
pub(crate) struct BlobsProtocol<S> {
/// Local pool handle passed to the blobs provider when handling a connection.
rt: LocalPoolHandle,
/// Blob store passed (by clone) to the blobs provider.
store: S,
/// Event sender passed (by clone) to the blobs provider.
events: EventSender,
}
impl<S: iroh_blobs::store::Store> BlobsProtocol<S> {
pub fn new_with_events(store: S, rt: LocalPoolHandle, events: EventSender) -> Self {
Self { rt, store, events }
}
}
impl<S: iroh_blobs::store::Store> ProtocolHandler for BlobsProtocol<S> {
    /// Completes the incoming connection and serves it with the blobs provider.
    ///
    /// Resolves to an error if establishing the connection fails. Otherwise it
    /// resolves to `Ok(())` once `iroh_blobs::provider::handle_connection` has
    /// finished.
    fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
        let serve = async move {
            // Finish the handshake first; a failure here is reported to the caller.
            let connection = conn.await?;
            let store = self.store.clone();
            let events = self.events.clone();
            let rt = self.rt.clone();
            iroh_blobs::provider::handle_connection(connection, store, events, rt).await;
            Ok(())
        };
        Box::pin(serve)
    }
}
impl ProtocolHandler for iroh_gossip::net::Gossip {
    /// Completes the incoming connection and hands it to the gossip instance.
    ///
    /// Resolves to an error if establishing the connection fails; otherwise it
    /// resolves to whatever `Gossip::handle_connection` returns.
    fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
        Box::pin(async move {
            let connection = conn.await?;
            self.handle_connection(connection).await
        })
    }
}