commonware_p2p/authenticated/lookup/actors/router/
ingress.rs1use crate::{
2 authenticated::{data::Data, lookup::channels::Channels, relay::Relay, Mailbox},
3 utils::limited::Connected,
4 Channel, Recipients,
5};
6use bytes::Bytes;
7use commonware_cryptography::PublicKey;
8use commonware_utils::channels::ring;
9use futures::channel::oneshot;
10
11#[derive(Debug)]
13pub enum Message<P: PublicKey> {
14 Ready {
16 peer: P,
17 relay: Relay<Data>,
18 channels: oneshot::Sender<Channels<P>>,
19 },
20 Release { peer: P },
22 Content {
24 recipients: Recipients<P>,
25 channel: Channel,
26 message: Bytes,
27 priority: bool,
28 success: oneshot::Sender<Vec<P>>,
29 },
30 SubscribePeers {
32 response: oneshot::Sender<ring::Receiver<Vec<P>>>,
33 },
34}
35
36impl<P: PublicKey> Mailbox<Message<P>> {
37 pub async fn ready(&mut self, peer: P, relay: Relay<Data>) -> Channels<P> {
39 let (response, receiver) = oneshot::channel();
40 self.send(Message::Ready {
41 peer,
42 relay,
43 channels: response,
44 })
45 .await
46 .unwrap();
47 receiver.await.unwrap()
48 }
49
50 pub async fn release(&mut self, peer: P) {
55 let _ = self.send(Message::Release { peer }).await;
56 }
57}
58
59#[derive(Clone, Debug)]
60pub struct Messenger<P: PublicKey> {
62 sender: Mailbox<Message<P>>,
63}
64
65impl<P: PublicKey> Messenger<P> {
66 pub const fn new(sender: Mailbox<Message<P>>) -> Self {
69 Self { sender }
70 }
71
72 pub async fn content(
74 &mut self,
75 recipients: Recipients<P>,
76 channel: Channel,
77 message: Bytes,
78 priority: bool,
79 ) -> Vec<P> {
80 let (sender, receiver) = oneshot::channel();
81 self.sender
82 .send(Message::Content {
83 recipients,
84 channel,
85 message,
86 priority,
87 success: sender,
88 })
89 .await
90 .unwrap();
91 receiver.await.unwrap()
92 }
93}
94
95impl<P: PublicKey> Connected for Messenger<P> {
96 type PublicKey = P;
97
98 async fn subscribe(&mut self) -> ring::Receiver<Vec<P>> {
99 let (sender, receiver) = oneshot::channel();
100 self.sender
101 .send(Message::SubscribePeers { response: sender })
102 .await
103 .unwrap();
104 receiver.await.unwrap()
105 }
106}