Skip to main content

// commonware_p2p/authenticated/discovery/actors/router/ingress.rs

1use crate::{
2    authenticated::{
3        data::{Data, EncodedData},
4        discovery::{channels::Channels, types},
5        relay::Relay,
6        Mailbox,
7    },
8    utils::limited::Connected,
9    Channel, Recipients,
10};
11use commonware_codec::Encode;
12use commonware_cryptography::PublicKey;
13use commonware_runtime::IoBufMut;
14use commonware_utils::{
15    channel::{fallible::AsyncFallibleExt, oneshot, ring},
16    NZUsize,
17};
18
19/// Messages that can be processed by the router.
20#[derive(Debug)]
21pub enum Message<P: PublicKey> {
22    /// Notify the router that a peer is ready to communicate.
23    Ready {
24        peer: P,
25        relay: Relay<EncodedData>,
26        channels: oneshot::Sender<Channels<P>>,
27    },
28    /// Notify the router that a peer is no longer available.
29    Release { peer: P },
30    /// Send pre-encoded data to one or more recipients.
31    Content {
32        recipients: Recipients<P>,
33        encoded: EncodedData,
34        priority: bool,
35        success: oneshot::Sender<Vec<P>>,
36    },
37    /// Get a subscription to peers known by the router.
38    SubscribePeers {
39        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
40    },
41}
42
43impl<P: PublicKey> Mailbox<Message<P>> {
44    /// Notify the router that a peer is ready to communicate.
45    ///
46    /// Returns `None` if the router has shut down.
47    pub async fn ready(&mut self, peer: P, relay: Relay<EncodedData>) -> Option<Channels<P>> {
48        self.0
49            .request(|channels| Message::Ready {
50                peer,
51                relay,
52                channels,
53            })
54            .await
55    }
56
57    /// Notify the router that a peer is no longer available.
58    ///
59    /// This may fail during shutdown if the router has already exited,
60    /// which is harmless since the router no longer tracks any peers.
61    pub async fn release(&mut self, peer: P) {
62        self.0.send_lossy(Message::Release { peer }).await;
63    }
64}
65
66/// Sends messages containing content to the router to send to peers.
67#[derive(Clone, Debug)]
68pub struct Messenger<P: PublicKey> {
69    sender: Mailbox<Message<P>>,
70}
71
72impl<P: PublicKey> Messenger<P> {
73    /// Returns a new [Messenger] with the given sender.
74    /// (The router has the corresponding receiver.)
75    pub const fn new(sender: Mailbox<Message<P>>) -> Self {
76        Self { sender }
77    }
78
79    /// Sends a message to the given `recipients`.
80    ///
81    /// Encodes the message once and shares the encoded bytes across all recipients.
82    /// Returns an empty list if the router has shut down.
83    pub async fn content(
84        &mut self,
85        recipients: Recipients<P>,
86        channel: Channel,
87        message: IoBufMut,
88        priority: bool,
89    ) -> Vec<P> {
90        // Build Data and encode Payload::Data once for all recipients
91        let data = Data {
92            channel,
93            message: message.freeze(),
94        };
95        let payload_bytes = types::Payload::<P>::Data(data).encode();
96        let encoded = EncodedData {
97            channel,
98            payload: payload_bytes.into(),
99        };
100
101        self.sender
102            .0
103            .request_or_default(|success| Message::Content {
104                recipients,
105                encoded,
106                priority,
107                success,
108            })
109            .await
110    }
111}
112
113impl<P: PublicKey> Connected for Messenger<P> {
114    type PublicKey = P;
115
116    async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
117        self.sender
118            .0
119            .request(|response| Message::SubscribePeers { response })
120            .await
121            .unwrap_or_else(|| {
122                let (_, rx) = ring::channel(NZUsize!(1));
123                rx
124            })
125    }
126}