commonware_p2p/authenticated/discovery/
channels.rs1use super::{actors::Messenger, Error};
2use crate::{
3 utils::limited::{CheckedSender, LimitedSender},
4 Channel, Message as NetworkMessage, Recipients,
5};
6use commonware_actor::{
7 mailbox::{self, UnreliablePolicy},
8 Feedback, Unreliable,
9};
10use commonware_cryptography::PublicKey;
11use commonware_runtime::{Clock, IoBufs, Metrics, Quota};
12use std::{
13 collections::{BTreeMap, VecDeque},
14 fmt::Debug,
15 num::NonZeroUsize,
16 time::SystemTime,
17};
18
19pub(crate) struct Inbound<P: PublicKey>(pub(crate) NetworkMessage<P>);
20
21impl<P: PublicKey> UnreliablePolicy for Inbound<P> {
22 type Overflow = VecDeque<Self>;
23
24 fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool {
25 false
26 }
27}
28
29#[derive(Debug, Clone)]
33pub struct UnlimitedSender<P: PublicKey> {
34 channel: Channel,
35 max_size: u32,
36 messenger: Messenger<P>,
37}
38
39impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
40 type PublicKey = P;
41
42 fn send(
43 &mut self,
44 recipients: Recipients<Self::PublicKey>,
45 message: impl Into<IoBufs> + Send,
46 priority: bool,
47 ) -> Unreliable<Feedback> {
48 let message = message.into();
49 assert!(
50 message.len() <= self.max_size as usize,
51 "message too large: {} > {}",
52 message.len(),
53 self.max_size
54 );
55
56 self.messenger
57 .content(recipients, self.channel, message, priority)
58 }
59}
60
61pub struct Sender<P: PublicKey, C: Clock> {
63 limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
64}
65
66impl<P: PublicKey, C: Clock> Clone for Sender<P, C> {
67 fn clone(&self) -> Self {
68 Self {
69 limited_sender: self.limited_sender.clone(),
70 }
71 }
72}
73
74impl<P: PublicKey, C: Clock> Sender<P, C> {
75 pub(super) fn new(
76 channel: Channel,
77 max_size: u32,
78 messenger: Messenger<P>,
79 clock: C,
80 quota: Quota,
81 ) -> Self {
82 let master_sender = UnlimitedSender {
83 channel,
84 max_size,
85 messenger: messenger.clone(),
86 };
87 let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger);
88 Self { limited_sender }
89 }
90}
91
92impl<P, C> crate::LimitedSender for Sender<P, C>
93where
94 P: PublicKey,
95 C: Clock + Send + 'static,
96{
97 type PublicKey = P;
98 type Checked<'a>
99 = CheckedSender<'a, UnlimitedSender<P>>
100 where
101 Self: 'a;
102
103 fn check(
104 &mut self,
105 recipients: Recipients<Self::PublicKey>,
106 ) -> Result<Self::Checked<'_>, SystemTime> {
107 self.limited_sender.check(recipients)
108 }
109}
110
111pub struct Receiver<P: PublicKey> {
113 receiver: mailbox::UnreliableReceiver<Inbound<P>>,
114}
115
116impl<P: PublicKey> Debug for Receiver<P> {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct("Receiver").finish_non_exhaustive()
119 }
120}
121
122impl<P: PublicKey> Receiver<P> {
123 pub(super) const fn new(receiver: mailbox::UnreliableReceiver<Inbound<P>>) -> Self {
124 Self { receiver }
125 }
126}
127
128impl<P: PublicKey> crate::Receiver for Receiver<P> {
129 type Error = Error;
130 type PublicKey = P;
131
132 async fn recv(&mut self) -> Result<NetworkMessage<Self::PublicKey>, Error> {
137 let Inbound((sender, message)) = self.receiver.recv().await.ok_or(Error::NetworkClosed)?;
138
139 Ok((sender, message))
142 }
143}
144
145#[derive(Clone, Debug)]
146pub struct Channels<P: PublicKey> {
147 messenger: Messenger<P>,
148 max_size: u32,
149 receivers: BTreeMap<Channel, (Quota, mailbox::UnreliableSender<Inbound<P>>)>,
150}
151
152impl<P: PublicKey> Channels<P> {
153 pub const fn new(messenger: Messenger<P>, max_size: u32) -> Self {
154 Self {
155 messenger,
156 max_size,
157 receivers: BTreeMap::new(),
158 }
159 }
160
161 pub fn register<C: Clock + Metrics>(
162 &mut self,
163 channel: Channel,
164 rate: Quota,
165 backlog: usize,
166 context: C,
167 ) -> (Sender<P, C>, Receiver<P>) {
168 let backlog = NonZeroUsize::new(backlog).expect("message backlog must be non-zero");
169 let (sender, receiver) = mailbox::new_unreliable(context.child("mailbox"), backlog);
170 if self.receivers.insert(channel, (rate, sender)).is_some() {
171 panic!("duplicate channel registration: {channel}");
172 }
173 (
174 Sender::new(
175 channel,
176 self.max_size,
177 self.messenger.clone(),
178 context,
179 rate,
180 ),
181 Receiver::new(receiver),
182 )
183 }
184
185 pub fn collect(self) -> BTreeMap<u64, (Quota, mailbox::UnreliableSender<Inbound<P>>)> {
186 self.receivers
187 }
188}