commonware_p2p/authenticated/lookup/
channels.rs1use super::Error;
2use crate::{
3 authenticated::lookup::actors::router::{self, Messenger},
4 utils::limited::{CheckedSender, LimitedSender},
5 Channel, Message, Recipients,
6};
7use bytes::Bytes;
8use commonware_cryptography::PublicKey;
9use commonware_runtime::{Clock, Quota};
10use futures::{channel::mpsc, StreamExt};
11use std::{collections::BTreeMap, fmt::Debug, time::SystemTime};
12
13#[derive(Debug, Clone)]
17pub struct UnlimitedSender<P: PublicKey> {
18 channel: Channel,
19 max_size: u32,
20 messenger: Messenger<P>,
21}
22
23impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
24 type Error = Error;
25 type PublicKey = P;
26
27 async fn send(
28 &mut self,
29 recipients: Recipients<Self::PublicKey>,
30 message: Bytes,
31 priority: bool,
32 ) -> Result<Vec<Self::PublicKey>, Self::Error> {
33 if message.len() > self.max_size as usize {
34 return Err(Error::MessageTooLarge(message.len()));
35 }
36
37 Ok(self
38 .messenger
39 .content(recipients, self.channel, message, priority)
40 .await)
41 }
42}
43
44#[derive(Clone)]
46pub struct Sender<P: PublicKey, C: Clock> {
47 limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
48}
49
50impl<P: PublicKey, C: Clock> Sender<P, C> {
51 pub(super) fn new(
52 channel: Channel,
53 max_size: u32,
54 messenger: Messenger<P>,
55 clock: C,
56 quota: Quota,
57 ) -> Self {
58 let master_sender = UnlimitedSender {
59 channel,
60 max_size,
61 messenger: messenger.clone(),
62 };
63 let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger);
64 Self { limited_sender }
65 }
66}
67
68impl<P, C> crate::LimitedSender for Sender<P, C>
69where
70 P: PublicKey,
71 C: Clock + Clone + Send + 'static,
72{
73 type PublicKey = P;
74 type Checked<'a>
75 = CheckedSender<'a, UnlimitedSender<P>>
76 where
77 Self: 'a;
78
79 async fn check(
80 &mut self,
81 recipients: Recipients<Self::PublicKey>,
82 ) -> Result<Self::Checked<'_>, SystemTime> {
83 self.limited_sender.check(recipients).await
84 }
85}
86
87#[derive(Debug)]
89pub struct Receiver<P: PublicKey> {
90 receiver: mpsc::Receiver<Message<P>>,
91}
92
93impl<P: PublicKey> Receiver<P> {
94 pub(super) const fn new(receiver: mpsc::Receiver<Message<P>>) -> Self {
95 Self { receiver }
96 }
97}
98
99impl<P: PublicKey> crate::Receiver for Receiver<P> {
100 type Error = Error;
101 type PublicKey = P;
102
103 async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
108 let (sender, message) = self.receiver.next().await.ok_or(Error::NetworkClosed)?;
109
110 Ok((sender, message))
113 }
114}
115
116#[derive(Clone)]
117pub struct Channels<P: PublicKey> {
118 messenger: router::Messenger<P>,
119 max_size: u32,
120 receivers: BTreeMap<Channel, (Quota, mpsc::Sender<Message<P>>)>,
121}
122
123impl<P: PublicKey> Channels<P> {
124 pub const fn new(messenger: router::Messenger<P>, max_size: u32) -> Self {
125 Self {
126 messenger,
127 max_size,
128 receivers: BTreeMap::new(),
129 }
130 }
131
132 pub fn register<C: Clock>(
133 &mut self,
134 channel: Channel,
135 rate: Quota,
136 backlog: usize,
137 clock: C,
138 ) -> (Sender<P, C>, Receiver<P>) {
139 let (sender, receiver) = mpsc::channel(backlog);
140 if self.receivers.insert(channel, (rate, sender)).is_some() {
141 panic!("duplicate channel registration: {channel}");
142 }
143 (
144 Sender::new(channel, self.max_size, self.messenger.clone(), clock, rate),
145 Receiver::new(receiver),
146 )
147 }
148
149 pub fn collect(self) -> BTreeMap<u64, (Quota, mpsc::Sender<Message<P>>)> {
150 self.receivers
151 }
152}