commonware_p2p/authenticated/lookup/
channels.rs

1use super::Error;
2use crate::{
3    authenticated::lookup::actors::router::{self, Messenger},
4    utils::limited::{CheckedSender, LimitedSender},
5    Channel, Message, Recipients,
6};
7use bytes::Bytes;
8use commonware_cryptography::PublicKey;
9use commonware_runtime::{Clock, Quota};
10use futures::{channel::mpsc, StreamExt};
11use std::{collections::BTreeMap, fmt::Debug, time::SystemTime};
12
13/// An interior sender that enforces message size limits and
14/// supports sending arbitrary bytes to a set of recipients over
15/// a pre-defined [`Channel`].
16#[derive(Debug, Clone)]
17pub struct UnlimitedSender<P: PublicKey> {
18    channel: Channel,
19    max_size: u32,
20    messenger: Messenger<P>,
21}
22
23impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
24    type Error = Error;
25    type PublicKey = P;
26
27    async fn send(
28        &mut self,
29        recipients: Recipients<Self::PublicKey>,
30        message: Bytes,
31        priority: bool,
32    ) -> Result<Vec<Self::PublicKey>, Self::Error> {
33        if message.len() > self.max_size as usize {
34            return Err(Error::MessageTooLarge(message.len()));
35        }
36
37        Ok(self
38            .messenger
39            .content(recipients, self.channel, message, priority)
40            .await)
41    }
42}
43
44/// Sender is the mechanism used to send arbitrary bytes to a set of recipients over a pre-defined channel.
45#[derive(Clone)]
46pub struct Sender<P: PublicKey, C: Clock> {
47    limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
48}
49
50impl<P: PublicKey, C: Clock> Sender<P, C> {
51    pub(super) fn new(
52        channel: Channel,
53        max_size: u32,
54        messenger: Messenger<P>,
55        clock: C,
56        quota: Quota,
57    ) -> Self {
58        let master_sender = UnlimitedSender {
59            channel,
60            max_size,
61            messenger: messenger.clone(),
62        };
63        let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger);
64        Self { limited_sender }
65    }
66}
67
68impl<P, C> crate::LimitedSender for Sender<P, C>
69where
70    P: PublicKey,
71    C: Clock + Clone + Send + 'static,
72{
73    type PublicKey = P;
74    type Checked<'a>
75        = CheckedSender<'a, UnlimitedSender<P>>
76    where
77        Self: 'a;
78
79    async fn check(
80        &mut self,
81        recipients: Recipients<Self::PublicKey>,
82    ) -> Result<Self::Checked<'_>, SystemTime> {
83        self.limited_sender.check(recipients).await
84    }
85}
86
87/// Channel to asynchronously receive messages from a channel.
88#[derive(Debug)]
89pub struct Receiver<P: PublicKey> {
90    receiver: mpsc::Receiver<Message<P>>,
91}
92
93impl<P: PublicKey> Receiver<P> {
94    pub(super) const fn new(receiver: mpsc::Receiver<Message<P>>) -> Self {
95        Self { receiver }
96    }
97}
98
99impl<P: PublicKey> crate::Receiver for Receiver<P> {
100    type Error = Error;
101    type PublicKey = P;
102
103    /// Receives a message from the channel.
104    ///
105    /// This method will block until a message is received or the underlying
106    /// network shuts down.
107    async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
108        let (sender, message) = self.receiver.next().await.ok_or(Error::NetworkClosed)?;
109
110        // We don't check that the message is too large here because we already enforce
111        // that on the network layer.
112        Ok((sender, message))
113    }
114}
115
116#[derive(Clone)]
117pub struct Channels<P: PublicKey> {
118    messenger: router::Messenger<P>,
119    max_size: u32,
120    receivers: BTreeMap<Channel, (Quota, mpsc::Sender<Message<P>>)>,
121}
122
123impl<P: PublicKey> Channels<P> {
124    pub const fn new(messenger: router::Messenger<P>, max_size: u32) -> Self {
125        Self {
126            messenger,
127            max_size,
128            receivers: BTreeMap::new(),
129        }
130    }
131
132    pub fn register<C: Clock>(
133        &mut self,
134        channel: Channel,
135        rate: Quota,
136        backlog: usize,
137        clock: C,
138    ) -> (Sender<P, C>, Receiver<P>) {
139        let (sender, receiver) = mpsc::channel(backlog);
140        if self.receivers.insert(channel, (rate, sender)).is_some() {
141            panic!("duplicate channel registration: {channel}");
142        }
143        (
144            Sender::new(channel, self.max_size, self.messenger.clone(), clock, rate),
145            Receiver::new(receiver),
146        )
147    }
148
149    pub fn collect(self) -> BTreeMap<u64, (Quota, mpsc::Sender<Message<P>>)> {
150        self.receivers
151    }
152}