commonware_p2p/authenticated/lookup/actors/router/
ingress.rs

1use crate::{
2    authenticated::{data::Data, lookup::channels::Channels, relay::Relay, Mailbox},
3    utils::limited::Connected,
4    Channel, Recipients,
5};
6use bytes::Bytes;
7use commonware_cryptography::PublicKey;
8use commonware_utils::channels::ring;
9use futures::channel::oneshot;
10
11/// Messages that can be processed by the router.
12#[derive(Debug)]
13pub enum Message<P: PublicKey> {
14    /// Notify the router that a peer is ready to communicate.
15    Ready {
16        peer: P,
17        relay: Relay<Data>,
18        channels: oneshot::Sender<Channels<P>>,
19    },
20    /// Notify the router that a peer is no longer available.
21    Release { peer: P },
22    /// Send a message to one or more recipients.
23    Content {
24        recipients: Recipients<P>,
25        channel: Channel,
26        message: Bytes,
27        priority: bool,
28        success: oneshot::Sender<Vec<P>>,
29    },
30    /// Get a subscription to peers known by the router.
31    SubscribePeers {
32        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
33    },
34}
35
36impl<P: PublicKey> Mailbox<Message<P>> {
37    /// Notify the router that a peer is ready to communicate.
38    pub async fn ready(&mut self, peer: P, relay: Relay<Data>) -> Channels<P> {
39        let (response, receiver) = oneshot::channel();
40        self.send(Message::Ready {
41            peer,
42            relay,
43            channels: response,
44        })
45        .await
46        .unwrap();
47        receiver.await.unwrap()
48    }
49
50    /// Notify the router that a peer is no longer available.
51    ///
52    /// This may fail during shutdown if the router has already exited,
53    /// which is harmless since the router no longer tracks any peers.
54    pub async fn release(&mut self, peer: P) {
55        let _ = self.send(Message::Release { peer }).await;
56    }
57}
58
59#[derive(Clone, Debug)]
60/// Sends messages containing content to the router to send to peers.
61pub struct Messenger<P: PublicKey> {
62    sender: Mailbox<Message<P>>,
63}
64
65impl<P: PublicKey> Messenger<P> {
66    /// Returns a new [Messenger] with the given sender.
67    /// (The router has the corresponding receiver.)
68    pub const fn new(sender: Mailbox<Message<P>>) -> Self {
69        Self { sender }
70    }
71
72    /// Sends a message to the given `recipients`.
73    pub async fn content(
74        &mut self,
75        recipients: Recipients<P>,
76        channel: Channel,
77        message: Bytes,
78        priority: bool,
79    ) -> Vec<P> {
80        let (sender, receiver) = oneshot::channel();
81        self.sender
82            .send(Message::Content {
83                recipients,
84                channel,
85                message,
86                priority,
87                success: sender,
88            })
89            .await
90            .unwrap();
91        receiver.await.unwrap()
92    }
93}
94
95impl<P: PublicKey> Connected for Messenger<P> {
96    type PublicKey = P;
97
98    async fn subscribe(&mut self) -> ring::Receiver<Vec<P>> {
99        let (sender, receiver) = oneshot::channel();
100        self.sender
101            .send(Message::SubscribePeers { response: sender })
102            .await
103            .unwrap();
104        receiver.await.unwrap()
105    }
106}