commonware_p2p/authenticated/lookup/actors/router/
ingress.rs

1use crate::{
2    authenticated::{data::Data, lookup::channels::Channels, relay::Relay, Mailbox},
3    Channel, Recipients,
4};
5use bytes::Bytes;
6use commonware_cryptography::PublicKey;
7use futures::{
8    channel::{mpsc, oneshot},
9    SinkExt,
10};
11
12/// Messages that can be processed by the router.
13pub enum Message<P: PublicKey> {
14    /// Notify the router that a peer is ready to communicate.
15    Ready {
16        peer: P,
17        relay: Relay<Data>,
18        channels: oneshot::Sender<Channels<P>>,
19    },
20    /// Notify the router that a peer is no longer available.
21    Release { peer: P },
22    /// Send a message to one or more recipients.
23    Content {
24        recipients: Recipients<P>,
25        channel: Channel,
26        message: Bytes,
27        priority: bool,
28        success: oneshot::Sender<Vec<P>>,
29    },
30}
31
32impl<P: PublicKey> Mailbox<Message<P>> {
33    /// Notify the router that a peer is ready to communicate.
34    pub async fn ready(&mut self, peer: P, relay: Relay<Data>) -> Channels<P> {
35        let (response, receiver) = oneshot::channel();
36        self.send(Message::Ready {
37            peer,
38            relay,
39            channels: response,
40        })
41        .await
42        .unwrap();
43        receiver.await.unwrap()
44    }
45
46    /// Notify the router that a peer is no longer available.
47    pub async fn release(&mut self, peer: P) {
48        self.send(Message::Release { peer }).await.unwrap()
49    }
50}
51
52#[derive(Clone, Debug)]
53/// Sends messages containing content to the router to send to peers.
54pub struct Messenger<P: PublicKey> {
55    sender: mpsc::Sender<Message<P>>,
56}
57
58impl<P: PublicKey> Messenger<P> {
59    /// Returns a new [Messenger] with the given sender.
60    /// (The router has the corresponding receiver.)
61    pub fn new(sender: mpsc::Sender<Message<P>>) -> Self {
62        Self { sender }
63    }
64
65    /// Sends a message to the given `recipients`.
66    pub async fn content(
67        &mut self,
68        recipients: Recipients<P>,
69        channel: Channel,
70        message: Bytes,
71        priority: bool,
72    ) -> Vec<P> {
73        let (sender, receiver) = oneshot::channel();
74        self.sender
75            .send(Message::Content {
76                recipients,
77                channel,
78                message,
79                priority,
80                success: sender,
81            })
82            .await
83            .unwrap();
84        receiver.await.unwrap()
85    }
86}