commonware_p2p/authenticated/lookup/actors/router/
ingress.rs

1use crate::{
2    authenticated::{data::Data, lookup::channels::Channels, relay::Relay, Mailbox},
3    utils::limited::Connected,
4    Channel, Recipients,
5};
6use bytes::{Buf, Bytes};
7use commonware_cryptography::PublicKey;
8use commonware_utils::{
9    channels::{fallible::AsyncFallibleExt, ring},
10    NZUsize,
11};
12use futures::channel::oneshot;
13
14/// Messages that can be processed by the router.
15#[derive(Debug)]
16pub enum Message<P: PublicKey> {
17    /// Notify the router that a peer is ready to communicate.
18    Ready {
19        peer: P,
20        relay: Relay<Data>,
21        channels: oneshot::Sender<Channels<P>>,
22    },
23    /// Notify the router that a peer is no longer available.
24    Release { peer: P },
25    /// Send a message to one or more recipients.
26    Content {
27        recipients: Recipients<P>,
28        channel: Channel,
29        message: Bytes,
30        priority: bool,
31        success: oneshot::Sender<Vec<P>>,
32    },
33    /// Get a subscription to peers known by the router.
34    SubscribePeers {
35        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
36    },
37}
38
39impl<P: PublicKey> Mailbox<Message<P>> {
40    /// Notify the router that a peer is ready to communicate.
41    ///
42    /// Returns `None` if the router has shut down.
43    pub async fn ready(&mut self, peer: P, relay: Relay<Data>) -> Option<Channels<P>> {
44        self.0
45            .request(|channels| Message::Ready {
46                peer,
47                relay,
48                channels,
49            })
50            .await
51    }
52
53    /// Notify the router that a peer is no longer available.
54    ///
55    /// This may fail during shutdown if the router has already exited,
56    /// which is harmless since the router no longer tracks any peers.
57    pub async fn release(&mut self, peer: P) {
58        self.0.send_lossy(Message::Release { peer }).await;
59    }
60}
61
62#[derive(Clone, Debug)]
63/// Sends messages containing content to the router to send to peers.
64pub struct Messenger<P: PublicKey> {
65    sender: Mailbox<Message<P>>,
66}
67
68impl<P: PublicKey> Messenger<P> {
69    /// Returns a new [Messenger] with the given sender.
70    /// (The router has the corresponding receiver.)
71    pub const fn new(sender: Mailbox<Message<P>>) -> Self {
72        Self { sender }
73    }
74
75    /// Sends a message to the given `recipients`.
76    ///
77    /// Returns an empty list if the router has shut down.
78    pub async fn content(
79        &mut self,
80        recipients: Recipients<P>,
81        channel: Channel,
82        mut message: impl Buf + Send,
83        priority: bool,
84    ) -> Vec<P> {
85        let message = message.copy_to_bytes(message.remaining());
86        self.sender
87            .0
88            .request_or_default(|success| Message::Content {
89                recipients,
90                channel,
91                message,
92                priority,
93                success,
94            })
95            .await
96    }
97}
98
99impl<P: PublicKey> Connected for Messenger<P> {
100    type PublicKey = P;
101
102    async fn subscribe(&mut self) -> ring::Receiver<Vec<P>> {
103        self.sender
104            .0
105            .request(|response| Message::SubscribePeers { response })
106            .await
107            .unwrap_or_else(|| {
108                let (_, rx) = ring::channel(NZUsize!(1));
109                rx
110            })
111    }
112}