Skip to main content

commonware_p2p/authenticated/lookup/
channels.rs

1use super::Error;
2use crate::{
3    authenticated::lookup::actors::router::{self, Messenger},
4    utils::limited::{CheckedSender, LimitedSender},
5    Channel, Message as NetworkMessage, Recipients,
6};
7use commonware_actor::{
8    mailbox::{self, UnreliablePolicy},
9    Feedback, Unreliable,
10};
11use commonware_cryptography::PublicKey;
12use commonware_runtime::{Clock, IoBufs, Metrics, Quota};
13use std::{
14    collections::{BTreeMap, VecDeque},
15    fmt::Debug,
16    num::NonZeroUsize,
17    time::SystemTime,
18};
19
20pub(crate) struct Inbound<P: PublicKey>(pub(crate) NetworkMessage<P>);
21
22impl<P: PublicKey> UnreliablePolicy for Inbound<P> {
23    type Overflow = VecDeque<Self>;
24
25    fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool {
26        false
27    }
28}
29
30/// An interior sender that enforces message size limits and
31/// supports sending arbitrary bytes to a set of recipients over
32/// a pre-defined [`Channel`].
33#[derive(Debug, Clone)]
34pub struct UnlimitedSender<P: PublicKey> {
35    channel: Channel,
36    max_size: u32,
37    messenger: Messenger<P>,
38}
39
40impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
41    type PublicKey = P;
42
43    fn send(
44        &mut self,
45        recipients: Recipients<Self::PublicKey>,
46        message: impl Into<IoBufs> + Send,
47        priority: bool,
48    ) -> Unreliable<Feedback> {
49        let message = message.into();
50        assert!(
51            message.len() <= self.max_size as usize,
52            "message too large: {} > {}",
53            message.len(),
54            self.max_size
55        );
56
57        self.messenger
58            .content(recipients, self.channel, message, priority)
59    }
60}
61
62/// Sender is the mechanism used to send arbitrary bytes to a set of recipients over a pre-defined channel.
63pub struct Sender<P: PublicKey, C: Clock> {
64    limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
65}
66
67impl<P: PublicKey, C: Clock> Clone for Sender<P, C> {
68    fn clone(&self) -> Self {
69        Self {
70            limited_sender: self.limited_sender.clone(),
71        }
72    }
73}
74
75impl<P: PublicKey, C: Clock> Sender<P, C> {
76    pub(super) fn new(
77        channel: Channel,
78        max_size: u32,
79        messenger: Messenger<P>,
80        clock: C,
81        quota: Quota,
82    ) -> Self {
83        let master_sender = UnlimitedSender {
84            channel,
85            max_size,
86            messenger: messenger.clone(),
87        };
88        let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger);
89        Self { limited_sender }
90    }
91}
92
93impl<P, C> crate::LimitedSender for Sender<P, C>
94where
95    P: PublicKey,
96    C: Clock + Send + 'static,
97{
98    type PublicKey = P;
99    type Checked<'a>
100        = CheckedSender<'a, UnlimitedSender<P>>
101    where
102        Self: 'a;
103
104    fn check(
105        &mut self,
106        recipients: Recipients<Self::PublicKey>,
107    ) -> Result<Self::Checked<'_>, SystemTime> {
108        self.limited_sender.check(recipients)
109    }
110}
111
112/// Channel to asynchronously receive messages from a channel.
113pub struct Receiver<P: PublicKey> {
114    receiver: mailbox::UnreliableReceiver<Inbound<P>>,
115}
116
117impl<P: PublicKey> Debug for Receiver<P> {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        f.debug_struct("Receiver").finish_non_exhaustive()
120    }
121}
122
123impl<P: PublicKey> Receiver<P> {
124    pub(super) const fn new(receiver: mailbox::UnreliableReceiver<Inbound<P>>) -> Self {
125        Self { receiver }
126    }
127}
128
129impl<P: PublicKey> crate::Receiver for Receiver<P> {
130    type Error = Error;
131    type PublicKey = P;
132
133    /// Receives a message from the channel.
134    ///
135    /// This method will block until a message is received or the underlying
136    /// network shuts down.
137    async fn recv(&mut self) -> Result<NetworkMessage<Self::PublicKey>, Error> {
138        let Inbound((sender, message)) = self.receiver.recv().await.ok_or(Error::NetworkClosed)?;
139
140        // We don't check that the message is too large here because we already enforce
141        // that on the network layer.
142        Ok((sender, message))
143    }
144}
145
146#[derive(Clone, Debug)]
147pub struct Channels<P: PublicKey> {
148    messenger: router::Messenger<P>,
149    max_size: u32,
150    receivers: BTreeMap<Channel, (Quota, mailbox::UnreliableSender<Inbound<P>>)>,
151}
152
153impl<P: PublicKey> Channels<P> {
154    pub const fn new(messenger: router::Messenger<P>, max_size: u32) -> Self {
155        Self {
156            messenger,
157            max_size,
158            receivers: BTreeMap::new(),
159        }
160    }
161
162    pub fn register<C: Clock + Metrics>(
163        &mut self,
164        channel: Channel,
165        rate: Quota,
166        backlog: usize,
167        context: C,
168    ) -> (Sender<P, C>, Receiver<P>) {
169        let backlog = NonZeroUsize::new(backlog).expect("message backlog must be non-zero");
170        let (sender, receiver) = mailbox::new_unreliable(context.child("mailbox"), backlog);
171        if self.receivers.insert(channel, (rate, sender)).is_some() {
172            panic!("duplicate channel registration: {channel}");
173        }
174        (
175            Sender::new(
176                channel,
177                self.max_size,
178                self.messenger.clone(),
179                context,
180                rate,
181            ),
182            Receiver::new(receiver),
183        )
184    }
185
186    pub fn collect(self) -> BTreeMap<u64, (Quota, mailbox::UnreliableSender<Inbound<P>>)> {
187        self.receivers
188    }
189}