commonware_p2p/authenticated/discovery/actors/router/
ingress.rs1use crate::{
2 authenticated::{
3 data::{Data, EncodedData},
4 discovery::{channels::Channels, types},
5 relay::Relay,
6 Mailbox,
7 },
8 utils::limited::Connected,
9 Channel, Recipients,
10};
11use commonware_codec::Encode;
12use commonware_cryptography::PublicKey;
13use commonware_runtime::IoBufMut;
14use commonware_utils::{
15 channel::{fallible::AsyncFallibleExt, oneshot, ring},
16 NZUsize,
17};
18
19#[derive(Debug)]
21pub enum Message<P: PublicKey> {
22 Ready {
24 peer: P,
25 relay: Relay<EncodedData>,
26 channels: oneshot::Sender<Channels<P>>,
27 },
28 Release { peer: P },
30 Content {
32 recipients: Recipients<P>,
33 encoded: EncodedData,
34 priority: bool,
35 success: oneshot::Sender<Vec<P>>,
36 },
37 SubscribePeers {
39 response: oneshot::Sender<ring::Receiver<Vec<P>>>,
40 },
41}
42
43impl<P: PublicKey> Mailbox<Message<P>> {
44 pub async fn ready(&mut self, peer: P, relay: Relay<EncodedData>) -> Option<Channels<P>> {
48 self.0
49 .request(|channels| Message::Ready {
50 peer,
51 relay,
52 channels,
53 })
54 .await
55 }
56
57 pub async fn release(&mut self, peer: P) {
62 self.0.send_lossy(Message::Release { peer }).await;
63 }
64}
65
66#[derive(Clone, Debug)]
68pub struct Messenger<P: PublicKey> {
69 sender: Mailbox<Message<P>>,
70}
71
72impl<P: PublicKey> Messenger<P> {
73 pub const fn new(sender: Mailbox<Message<P>>) -> Self {
76 Self { sender }
77 }
78
79 pub async fn content(
84 &mut self,
85 recipients: Recipients<P>,
86 channel: Channel,
87 message: IoBufMut,
88 priority: bool,
89 ) -> Vec<P> {
90 let data = Data {
92 channel,
93 message: message.freeze(),
94 };
95 let payload_bytes = types::Payload::<P>::Data(data).encode();
96 let encoded = EncodedData {
97 channel,
98 payload: payload_bytes.into(),
99 };
100
101 self.sender
102 .0
103 .request_or_default(|success| Message::Content {
104 recipients,
105 encoded,
106 priority,
107 success,
108 })
109 .await
110 }
111}
112
113impl<P: PublicKey> Connected for Messenger<P> {
114 type PublicKey = P;
115
116 async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
117 self.sender
118 .0
119 .request(|response| Message::SubscribePeers { response })
120 .await
121 .unwrap_or_else(|| {
122 let (_, rx) = ring::channel(NZUsize!(1));
123 rx
124 })
125 }
126}