commonware_p2p/authenticated/lookup/actors/router/
ingress.rs1use crate::{
2 authenticated::{
3 data::EncodedData,
4 lookup::{channels::Channels, types},
5 relay::Relay,
6 Mailbox,
7 },
8 utils::limited::Connected,
9 Channel, Recipients,
10};
11use commonware_cryptography::PublicKey;
12use commonware_runtime::{BufferPool, IoBufs};
13use commonware_utils::{
14 channel::{fallible::AsyncFallibleExt, oneshot, ring},
15 NZUsize,
16};
17
18#[derive(Debug)]
20pub enum Message<P: PublicKey> {
21 Ready {
23 peer: P,
24 relay: Relay<EncodedData>,
25 channels: oneshot::Sender<Channels<P>>,
26 },
27 Release { peer: P },
29 Content {
31 recipients: Recipients<P>,
32 encoded: EncodedData,
33 priority: bool,
34 success: oneshot::Sender<Vec<P>>,
35 },
36 SubscribePeers {
38 response: oneshot::Sender<ring::Receiver<Vec<P>>>,
39 },
40}
41
42impl<P: PublicKey> Mailbox<Message<P>> {
43 pub async fn ready(&mut self, peer: P, relay: Relay<EncodedData>) -> Option<Channels<P>> {
47 self.0
48 .request(|channels| Message::Ready {
49 peer,
50 relay,
51 channels,
52 })
53 .await
54 }
55
56 pub async fn release(&mut self, peer: P) {
61 self.0.send_lossy(Message::Release { peer }).await;
62 }
63}
64
65#[derive(Clone, Debug)]
67pub struct Messenger<P: PublicKey> {
68 pool: BufferPool,
69 sender: Mailbox<Message<P>>,
70}
71
72impl<P: PublicKey> Messenger<P> {
73 pub const fn new(pool: BufferPool, sender: Mailbox<Message<P>>) -> Self {
76 Self { pool, sender }
77 }
78
79 pub async fn content(
84 &mut self,
85 recipients: Recipients<P>,
86 channel: Channel,
87 message: IoBufs,
88 priority: bool,
89 ) -> Vec<P> {
90 let encoded = types::Message::encode_data(&self.pool, channel, message);
92
93 self.sender
94 .0
95 .request_or_default(|success| Message::Content {
96 recipients,
97 encoded,
98 priority,
99 success,
100 })
101 .await
102 }
103}
104
105impl<P: PublicKey> Connected for Messenger<P> {
106 type PublicKey = P;
107
108 async fn subscribe(&mut self) -> ring::Receiver<Vec<P>> {
109 self.sender
110 .0
111 .request(|response| Message::SubscribePeers { response })
112 .await
113 .unwrap_or_else(|| {
114 let (_, rx) = ring::channel(NZUsize!(1));
115 rx
116 })
117 }
118}