commonware_p2p/authenticated/discovery/
channels.rs

1use super::{actors::Messenger, Error};
2use crate::{
3    utils::limited::{CheckedSender, LimitedSender},
4    Channel, Message, Recipients,
5};
6use bytes::Bytes;
7use commonware_cryptography::PublicKey;
8use commonware_runtime::{Clock, Quota};
9use futures::{channel::mpsc, StreamExt};
10use std::{collections::BTreeMap, fmt::Debug, time::SystemTime};
11
12/// An interior sender that enforces message size limits and
13/// supports sending arbitrary bytes to a set of recipients over
14/// a pre-defined [`Channel`].
15#[derive(Debug, Clone)]
16pub struct UnlimitedSender<P: PublicKey> {
17    channel: Channel,
18    max_size: u32,
19    messenger: Messenger<P>,
20}
21
22impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
23    type PublicKey = P;
24    type Error = Error;
25
26    async fn send(
27        &mut self,
28        recipients: Recipients<Self::PublicKey>,
29        message: Bytes,
30        priority: bool,
31    ) -> Result<Vec<Self::PublicKey>, Self::Error> {
32        if message.len() > self.max_size as usize {
33            return Err(Error::MessageTooLarge(message.len()));
34        }
35
36        Ok(self
37            .messenger
38            .content(recipients, self.channel, message, priority)
39            .await)
40    }
41}
42
43/// Sender is the mechanism used to send arbitrary bytes to a set of recipients over a pre-defined channel.
44#[derive(Clone)]
45pub struct Sender<P: PublicKey, C: Clock> {
46    limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
47}
48
49impl<P: PublicKey, C: Clock> Sender<P, C> {
50    pub(super) fn new(
51        channel: Channel,
52        max_size: u32,
53        messenger: Messenger<P>,
54        clock: C,
55        quota: Quota,
56    ) -> Self {
57        let master_sender = UnlimitedSender {
58            channel,
59            max_size,
60            messenger: messenger.clone(),
61        };
62        let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger);
63        Self { limited_sender }
64    }
65}
66
67impl<P, C> crate::LimitedSender for Sender<P, C>
68where
69    P: PublicKey,
70    C: Clock + Clone + Send + 'static,
71{
72    type PublicKey = P;
73    type Checked<'a>
74        = CheckedSender<'a, UnlimitedSender<P>>
75    where
76        Self: 'a;
77
78    async fn check(
79        &mut self,
80        recipients: Recipients<Self::PublicKey>,
81    ) -> Result<Self::Checked<'_>, SystemTime> {
82        self.limited_sender.check(recipients).await
83    }
84}
85
86/// Channel to asynchronously receive messages from a channel.
87#[derive(Debug)]
88pub struct Receiver<P: PublicKey> {
89    receiver: mpsc::Receiver<Message<P>>,
90}
91
92impl<P: PublicKey> Receiver<P> {
93    pub(super) const fn new(receiver: mpsc::Receiver<Message<P>>) -> Self {
94        Self { receiver }
95    }
96}
97
98impl<P: PublicKey> crate::Receiver for Receiver<P> {
99    type Error = Error;
100    type PublicKey = P;
101
102    /// Receives a message from the channel.
103    ///
104    /// This method will block until a message is received or the underlying
105    /// network shuts down.
106    async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
107        let (sender, message) = self.receiver.next().await.ok_or(Error::NetworkClosed)?;
108
109        // We don't check that the message is too large here because we already enforce
110        // that on the network layer.
111        Ok((sender, message))
112    }
113}
114
115#[derive(Clone)]
116pub struct Channels<P: PublicKey> {
117    messenger: Messenger<P>,
118    max_size: u32,
119    receivers: BTreeMap<Channel, (Quota, mpsc::Sender<Message<P>>)>,
120}
121
122impl<P: PublicKey> Channels<P> {
123    pub const fn new(messenger: Messenger<P>, max_size: u32) -> Self {
124        Self {
125            messenger,
126            max_size,
127            receivers: BTreeMap::new(),
128        }
129    }
130
131    pub fn register<C: Clock>(
132        &mut self,
133        channel: Channel,
134        rate: Quota,
135        backlog: usize,
136        clock: C,
137    ) -> (Sender<P, C>, Receiver<P>) {
138        let (sender, receiver) = mpsc::channel(backlog);
139        if self.receivers.insert(channel, (rate, sender)).is_some() {
140            panic!("duplicate channel registration: {channel}");
141        }
142        (
143            Sender::new(channel, self.max_size, self.messenger.clone(), clock, rate),
144            Receiver::new(receiver),
145        )
146    }
147
148    pub fn collect(self) -> BTreeMap<u64, (Quota, mpsc::Sender<Message<P>>)> {
149        self.receivers
150    }
151}