p2panda_net/
protocols.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::any::Any;
4use std::collections::BTreeMap;
5use std::fmt;
6use std::sync::Arc;
7
8use anyhow::Result;
9use futures_lite::future::Boxed as BoxedFuture;
10use futures_util::future::join_all;
11use iroh::endpoint::Connecting;
12use tracing::debug;
13
14/// Interface to accept incoming connections for custom protocol implementations.
15///
16/// A node can accept connections for custom protocols. By default, the node only accepts
17/// connections for the core protocols (gossip and optionally sync or blobs).
18pub trait ProtocolHandler: Send + Sync + IntoArcAny + fmt::Debug + 'static {
19    /// Handle an incoming connection.
20    ///
21    /// This runs on a freshly spawned tokio task so this can be long-running.
22    fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;
23
24    /// Called when the node shuts down.
25    fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
26        Box::pin(async move {})
27    }
28}
29
/// Conversion helper which turns an `Arc<dyn T>` into an `Arc<dyn Any>`.
///
/// There is no need to implement this trait by hand: a blanket implementation covers every type
/// which is `Send + Sync + 'static`.
pub trait IntoArcAny {
    /// Consumes the `Arc` and returns it as a type-erased `Arc<dyn Any + Send + Sync>`.
    fn into_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}

impl<T> IntoArcAny for T
where
    T: Send + Sync + 'static,
{
    fn into_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        // Unsized coercion from the concrete `Arc<T>` to the trait object; no data is copied.
        let erased: Arc<dyn Any + Send + Sync> = self;
        erased
    }
}
42
43#[derive(Debug, Clone, Default)]
44pub(super) struct ProtocolMap(BTreeMap<&'static [u8], Arc<dyn ProtocolHandler>>);
45
46impl ProtocolMap {
47    /// Returns the registered protocol handler for an ALPN as a [`Arc<dyn ProtocolHandler>`].
48    pub(super) fn get(&self, alpn: &[u8]) -> Option<Arc<dyn ProtocolHandler>> {
49        self.0.get(alpn).cloned()
50    }
51
52    /// Inserts a protocol handler.
53    pub(super) fn insert(&mut self, alpn: &'static [u8], handler: Arc<dyn ProtocolHandler>) {
54        self.0.insert(alpn, handler);
55    }
56
57    /// Returns an iterator of all registered ALPN protocol identifiers.
58    pub(super) fn alpns(&self) -> Vec<Vec<u8>> {
59        self.0.keys().map(|alpn| alpn.to_vec()).collect::<Vec<_>>()
60    }
61
62    /// Shuts down all protocol handlers.
63    ///
64    /// Calls and awaits [`ProtocolHandler::shutdown`] for all registered handlers concurrently.
65    pub(super) async fn shutdown(&self) {
66        let handlers = self.0.values().cloned().map(ProtocolHandler::shutdown);
67        debug!("await all handler shutdown handles");
68        join_all(handlers).await;
69        debug!("all handlers closed");
70    }
71}
72
73impl ProtocolHandler for iroh_gossip::net::Gossip {
74    fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
75        Box::pin(async move {
76            self.handle_connection(conn.await?).await?;
77            Ok(())
78        })
79    }
80}