commonware_p2p/authenticated/discovery/
channels.rs1use super::{actors::Messenger, Error};
2use crate::{
3 utils::limited::{CheckedSender, LimitedSender},
4 Channel, Message, Recipients,
5};
6use bytes::Bytes;
7use commonware_cryptography::PublicKey;
8use commonware_runtime::{Clock, Quota};
9use futures::{channel::mpsc, StreamExt};
10use std::{collections::BTreeMap, fmt::Debug, time::SystemTime};
11
12#[derive(Debug, Clone)]
16pub struct UnlimitedSender<P: PublicKey> {
17 channel: Channel,
18 max_size: u32,
19 messenger: Messenger<P>,
20}
21
22impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
23 type PublicKey = P;
24 type Error = Error;
25
26 async fn send(
27 &mut self,
28 recipients: Recipients<Self::PublicKey>,
29 message: Bytes,
30 priority: bool,
31 ) -> Result<Vec<Self::PublicKey>, Self::Error> {
32 if message.len() > self.max_size as usize {
33 return Err(Error::MessageTooLarge(message.len()));
34 }
35
36 Ok(self
37 .messenger
38 .content(recipients, self.channel, message, priority)
39 .await)
40 }
41}
42
43#[derive(Clone)]
45pub struct Sender<P: PublicKey, C: Clock> {
46 limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
47}
48
49impl<P: PublicKey, C: Clock> Sender<P, C> {
50 pub(super) fn new(
51 channel: Channel,
52 max_size: u32,
53 messenger: Messenger<P>,
54 clock: C,
55 quota: Quota,
56 ) -> Self {
57 let master_sender = UnlimitedSender {
58 channel,
59 max_size,
60 messenger: messenger.clone(),
61 };
62 let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger);
63 Self { limited_sender }
64 }
65}
66
67impl<P, C> crate::LimitedSender for Sender<P, C>
68where
69 P: PublicKey,
70 C: Clock + Clone + Send + 'static,
71{
72 type PublicKey = P;
73 type Checked<'a>
74 = CheckedSender<'a, UnlimitedSender<P>>
75 where
76 Self: 'a;
77
78 async fn check(
79 &mut self,
80 recipients: Recipients<Self::PublicKey>,
81 ) -> Result<Self::Checked<'_>, SystemTime> {
82 self.limited_sender.check(recipients).await
83 }
84}
85
86#[derive(Debug)]
88pub struct Receiver<P: PublicKey> {
89 receiver: mpsc::Receiver<Message<P>>,
90}
91
92impl<P: PublicKey> Receiver<P> {
93 pub(super) const fn new(receiver: mpsc::Receiver<Message<P>>) -> Self {
94 Self { receiver }
95 }
96}
97
98impl<P: PublicKey> crate::Receiver for Receiver<P> {
99 type Error = Error;
100 type PublicKey = P;
101
102 async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
107 let (sender, message) = self.receiver.next().await.ok_or(Error::NetworkClosed)?;
108
109 Ok((sender, message))
112 }
113}
114
115#[derive(Clone)]
116pub struct Channels<P: PublicKey> {
117 messenger: Messenger<P>,
118 max_size: u32,
119 receivers: BTreeMap<Channel, (Quota, mpsc::Sender<Message<P>>)>,
120}
121
122impl<P: PublicKey> Channels<P> {
123 pub const fn new(messenger: Messenger<P>, max_size: u32) -> Self {
124 Self {
125 messenger,
126 max_size,
127 receivers: BTreeMap::new(),
128 }
129 }
130
131 pub fn register<C: Clock>(
132 &mut self,
133 channel: Channel,
134 rate: Quota,
135 backlog: usize,
136 clock: C,
137 ) -> (Sender<P, C>, Receiver<P>) {
138 let (sender, receiver) = mpsc::channel(backlog);
139 if self.receivers.insert(channel, (rate, sender)).is_some() {
140 panic!("duplicate channel registration: {channel}");
141 }
142 (
143 Sender::new(channel, self.max_size, self.messenger.clone(), clock, rate),
144 Receiver::new(receiver),
145 )
146 }
147
148 pub fn collect(self) -> BTreeMap<u64, (Quota, mpsc::Sender<Message<P>>)> {
149 self.receivers
150 }
151}