// commonware_p2p/authenticated/discovery/actors/router/ingress.rs

1use crate::{
2    authenticated::{
3        data::EncodedData,
4        discovery::{channels::Channels, types},
5        relay::Relay,
6        Mailbox,
7    },
8    utils::limited::Connected,
9    Channel, Recipients,
10};
11use commonware_cryptography::PublicKey;
12use commonware_runtime::{BufferPool, IoBufs};
13use commonware_utils::{
14    channel::{fallible::AsyncFallibleExt, oneshot, ring},
15    NZUsize,
16};
17
18/// Messages that can be processed by the router.
19#[derive(Debug)]
20pub enum Message<P: PublicKey> {
21    /// Notify the router that a peer is ready to communicate.
22    Ready {
23        peer: P,
24        relay: Relay<EncodedData>,
25        channels: oneshot::Sender<Channels<P>>,
26    },
27    /// Notify the router that a peer is no longer available.
28    Release { peer: P },
29    /// Send pre-encoded data to one or more recipients.
30    Content {
31        recipients: Recipients<P>,
32        encoded: EncodedData,
33        priority: bool,
34        success: oneshot::Sender<Vec<P>>,
35    },
36    /// Get a subscription to peers known by the router.
37    SubscribePeers {
38        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
39    },
40}
41
42impl<P: PublicKey> Mailbox<Message<P>> {
43    /// Notify the router that a peer is ready to communicate.
44    ///
45    /// Returns `None` if the router has shut down.
46    pub async fn ready(&mut self, peer: P, relay: Relay<EncodedData>) -> Option<Channels<P>> {
47        self.0
48            .request(|channels| Message::Ready {
49                peer,
50                relay,
51                channels,
52            })
53            .await
54    }
55
56    /// Notify the router that a peer is no longer available.
57    ///
58    /// This may fail during shutdown if the router has already exited,
59    /// which is harmless since the router no longer tracks any peers.
60    pub async fn release(&mut self, peer: P) {
61        self.0.send_lossy(Message::Release { peer }).await;
62    }
63}
64
65/// Sends messages containing content to the router to send to peers.
66#[derive(Clone, Debug)]
67pub struct Messenger<P: PublicKey> {
68    pool: BufferPool,
69    sender: Mailbox<Message<P>>,
70}
71
72impl<P: PublicKey> Messenger<P> {
73    /// Returns a new [Messenger] with the given sender.
74    /// (The router has the corresponding receiver.)
75    pub const fn new(pool: BufferPool, sender: Mailbox<Message<P>>) -> Self {
76        Self { pool, sender }
77    }
78
79    /// Sends a message to the given `recipients`.
80    ///
81    /// Encodes the message once and shares the encoded bytes across all recipients.
82    /// Returns an empty list if the router has shut down.
83    pub async fn content(
84        &mut self,
85        recipients: Recipients<P>,
86        channel: Channel,
87        message: IoBufs,
88        priority: bool,
89    ) -> Vec<P> {
90        // Build Data and encode Payload::Data once for all recipients
91        let encoded = types::Payload::<P>::encode_data(&self.pool, channel, message);
92
93        self.sender
94            .0
95            .request_or_default(|success| Message::Content {
96                recipients,
97                encoded,
98                priority,
99                success,
100            })
101            .await
102    }
103}
104
105impl<P: PublicKey> Connected for Messenger<P> {
106    type PublicKey = P;
107
108    async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
109        self.sender
110            .0
111            .request(|response| Message::SubscribePeers { response })
112            .await
113            .unwrap_or_else(|| {
114                let (_, rx) = ring::channel(NZUsize!(1));
115                rx
116            })
117    }
118}