commonware_p2p/authenticated/discovery/actors/router/
ingress.rs1use crate::{
2 authenticated::{data::Data, discovery::channels::Channels, relay::Relay, Mailbox},
3 utils::limited::Connected,
4 Channel, Recipients,
5};
6use bytes::{Buf, Bytes};
7use commonware_cryptography::PublicKey;
8use commonware_utils::{
9 channels::{fallible::AsyncFallibleExt, ring},
10 NZUsize,
11};
12use futures::channel::oneshot;
13
14#[derive(Debug)]
16pub enum Message<P: PublicKey> {
17 Ready {
19 peer: P,
20 relay: Relay<Data>,
21 channels: oneshot::Sender<Channels<P>>,
22 },
23 Release { peer: P },
25 Content {
27 recipients: Recipients<P>,
28 channel: Channel,
29 message: Bytes,
30 priority: bool,
31 success: oneshot::Sender<Vec<P>>,
32 },
33 SubscribePeers {
35 response: oneshot::Sender<ring::Receiver<Vec<P>>>,
36 },
37}
38
39impl<P: PublicKey> Mailbox<Message<P>> {
40 pub async fn ready(&mut self, peer: P, relay: Relay<Data>) -> Option<Channels<P>> {
44 self.0
45 .request(|channels| Message::Ready {
46 peer,
47 relay,
48 channels,
49 })
50 .await
51 }
52
53 pub async fn release(&mut self, peer: P) {
58 self.0.send_lossy(Message::Release { peer }).await;
59 }
60}
61
62#[derive(Clone, Debug)]
64pub struct Messenger<P: PublicKey> {
65 sender: Mailbox<Message<P>>,
66}
67
68impl<P: PublicKey> Messenger<P> {
69 pub const fn new(sender: Mailbox<Message<P>>) -> Self {
72 Self { sender }
73 }
74
75 pub async fn content(
79 &mut self,
80 recipients: Recipients<P>,
81 channel: Channel,
82 mut message: impl Buf + Send,
83 priority: bool,
84 ) -> Vec<P> {
85 let message = message.copy_to_bytes(message.remaining());
86 self.sender
87 .0
88 .request_or_default(|success| Message::Content {
89 recipients,
90 channel,
91 message,
92 priority,
93 success,
94 })
95 .await
96 }
97}
98
99impl<P: PublicKey> Connected for Messenger<P> {
100 type PublicKey = P;
101
102 async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
103 self.sender
104 .0
105 .request(|response| Message::SubscribePeers { response })
106 .await
107 .unwrap_or_else(|| {
108 let (_, rx) = ring::channel(NZUsize!(1));
109 rx
110 })
111 }
112}