commonware_p2p/authenticated/lookup/
channels.rs1use super::Error;
2use crate::{
3 authenticated::lookup::actors::router::{self, Messenger},
4 utils::limited::{CheckedSender, LimitedSender},
5 Channel, Message as NetworkMessage, Recipients,
6};
7use commonware_actor::{
8 mailbox::{self, UnreliablePolicy},
9 Feedback, Unreliable,
10};
11use commonware_cryptography::PublicKey;
12use commonware_runtime::{Clock, IoBufs, Metrics, Quota};
13use std::{
14 collections::{BTreeMap, VecDeque},
15 fmt::Debug,
16 num::NonZeroUsize,
17 time::SystemTime,
18};
19
20pub(crate) struct Inbound<P: PublicKey>(pub(crate) NetworkMessage<P>);
21
22impl<P: PublicKey> UnreliablePolicy for Inbound<P> {
23 type Overflow = VecDeque<Self>;
24
25 fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool {
26 false
27 }
28}
29
30#[derive(Debug, Clone)]
34pub struct UnlimitedSender<P: PublicKey> {
35 channel: Channel,
36 max_size: u32,
37 messenger: Messenger<P>,
38}
39
40impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
41 type PublicKey = P;
42
43 fn send(
44 &mut self,
45 recipients: Recipients<Self::PublicKey>,
46 message: impl Into<IoBufs> + Send,
47 priority: bool,
48 ) -> Unreliable<Feedback> {
49 let message = message.into();
50 assert!(
51 message.len() <= self.max_size as usize,
52 "message too large: {} > {}",
53 message.len(),
54 self.max_size
55 );
56
57 self.messenger
58 .content(recipients, self.channel, message, priority)
59 }
60}
61
62pub struct Sender<P: PublicKey, C: Clock> {
64 limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
65}
66
67impl<P: PublicKey, C: Clock> Clone for Sender<P, C> {
68 fn clone(&self) -> Self {
69 Self {
70 limited_sender: self.limited_sender.clone(),
71 }
72 }
73}
74
75impl<P: PublicKey, C: Clock> Sender<P, C> {
76 pub(super) fn new(
77 channel: Channel,
78 max_size: u32,
79 messenger: Messenger<P>,
80 clock: C,
81 quota: Quota,
82 ) -> Self {
83 let master_sender = UnlimitedSender {
84 channel,
85 max_size,
86 messenger: messenger.clone(),
87 };
88 let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger);
89 Self { limited_sender }
90 }
91}
92
93impl<P, C> crate::LimitedSender for Sender<P, C>
94where
95 P: PublicKey,
96 C: Clock + Send + 'static,
97{
98 type PublicKey = P;
99 type Checked<'a>
100 = CheckedSender<'a, UnlimitedSender<P>>
101 where
102 Self: 'a;
103
104 fn check(
105 &mut self,
106 recipients: Recipients<Self::PublicKey>,
107 ) -> Result<Self::Checked<'_>, SystemTime> {
108 self.limited_sender.check(recipients)
109 }
110}
111
112pub struct Receiver<P: PublicKey> {
114 receiver: mailbox::UnreliableReceiver<Inbound<P>>,
115}
116
117impl<P: PublicKey> Debug for Receiver<P> {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 f.debug_struct("Receiver").finish_non_exhaustive()
120 }
121}
122
123impl<P: PublicKey> Receiver<P> {
124 pub(super) const fn new(receiver: mailbox::UnreliableReceiver<Inbound<P>>) -> Self {
125 Self { receiver }
126 }
127}
128
129impl<P: PublicKey> crate::Receiver for Receiver<P> {
130 type Error = Error;
131 type PublicKey = P;
132
133 async fn recv(&mut self) -> Result<NetworkMessage<Self::PublicKey>, Error> {
138 let Inbound((sender, message)) = self.receiver.recv().await.ok_or(Error::NetworkClosed)?;
139
140 Ok((sender, message))
143 }
144}
145
146#[derive(Clone, Debug)]
147pub struct Channels<P: PublicKey> {
148 messenger: router::Messenger<P>,
149 max_size: u32,
150 receivers: BTreeMap<Channel, (Quota, mailbox::UnreliableSender<Inbound<P>>)>,
151}
152
153impl<P: PublicKey> Channels<P> {
154 pub const fn new(messenger: router::Messenger<P>, max_size: u32) -> Self {
155 Self {
156 messenger,
157 max_size,
158 receivers: BTreeMap::new(),
159 }
160 }
161
162 pub fn register<C: Clock + Metrics>(
163 &mut self,
164 channel: Channel,
165 rate: Quota,
166 backlog: usize,
167 context: C,
168 ) -> (Sender<P, C>, Receiver<P>) {
169 let backlog = NonZeroUsize::new(backlog).expect("message backlog must be non-zero");
170 let (sender, receiver) = mailbox::new_unreliable(context.child("mailbox"), backlog);
171 if self.receivers.insert(channel, (rate, sender)).is_some() {
172 panic!("duplicate channel registration: {channel}");
173 }
174 (
175 Sender::new(
176 channel,
177 self.max_size,
178 self.messenger.clone(),
179 context,
180 rate,
181 ),
182 Receiver::new(receiver),
183 )
184 }
185
186 pub fn collect(self) -> BTreeMap<u64, (Quota, mailbox::UnreliableSender<Inbound<P>>)> {
187 self.receivers
188 }
189}