Skip to main content

commonware_p2p/authenticated/discovery/
channels.rs

1use super::{actors::Messenger, Error};
2use crate::{
3    utils::limited::{CheckedSender, LimitedSender},
4    Channel, Message as NetworkMessage, Recipients,
5};
6use commonware_actor::{
7    mailbox::{self, UnreliablePolicy},
8    Feedback, Unreliable,
9};
10use commonware_cryptography::PublicKey;
11use commonware_runtime::{Clock, IoBufs, Metrics, Quota};
12use std::{
13    collections::{BTreeMap, VecDeque},
14    fmt::Debug,
15    num::NonZeroUsize,
16    time::SystemTime,
17};
18
19pub(crate) struct Inbound<P: PublicKey>(pub(crate) NetworkMessage<P>);
20
21impl<P: PublicKey> UnreliablePolicy for Inbound<P> {
22    type Overflow = VecDeque<Self>;
23
24    fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool {
25        false
26    }
27}
28
29/// An interior sender that enforces message size limits and
30/// supports sending arbitrary bytes to a set of recipients over
31/// a pre-defined [`Channel`].
32#[derive(Debug, Clone)]
33pub struct UnlimitedSender<P: PublicKey> {
34    channel: Channel,
35    max_size: u32,
36    messenger: Messenger<P>,
37}
38
39impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
40    type PublicKey = P;
41
42    fn send(
43        &mut self,
44        recipients: Recipients<Self::PublicKey>,
45        message: impl Into<IoBufs> + Send,
46        priority: bool,
47    ) -> Unreliable<Feedback> {
48        let message = message.into();
49        assert!(
50            message.len() <= self.max_size as usize,
51            "message too large: {} > {}",
52            message.len(),
53            self.max_size
54        );
55
56        self.messenger
57            .content(recipients, self.channel, message, priority)
58    }
59}
60
61/// Sender is the mechanism used to send arbitrary bytes to a set of recipients over a pre-defined channel.
62pub struct Sender<P: PublicKey, C: Clock> {
63    limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
64}
65
66impl<P: PublicKey, C: Clock> Clone for Sender<P, C> {
67    fn clone(&self) -> Self {
68        Self {
69            limited_sender: self.limited_sender.clone(),
70        }
71    }
72}
73
74impl<P: PublicKey, C: Clock> Sender<P, C> {
75    pub(super) fn new(
76        channel: Channel,
77        max_size: u32,
78        messenger: Messenger<P>,
79        clock: C,
80        quota: Quota,
81    ) -> Self {
82        let master_sender = UnlimitedSender {
83            channel,
84            max_size,
85            messenger: messenger.clone(),
86        };
87        let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger);
88        Self { limited_sender }
89    }
90}
91
92impl<P, C> crate::LimitedSender for Sender<P, C>
93where
94    P: PublicKey,
95    C: Clock + Send + 'static,
96{
97    type PublicKey = P;
98    type Checked<'a>
99        = CheckedSender<'a, UnlimitedSender<P>>
100    where
101        Self: 'a;
102
103    fn check(
104        &mut self,
105        recipients: Recipients<Self::PublicKey>,
106    ) -> Result<Self::Checked<'_>, SystemTime> {
107        self.limited_sender.check(recipients)
108    }
109}
110
111/// Channel to asynchronously receive messages from a channel.
112pub struct Receiver<P: PublicKey> {
113    receiver: mailbox::UnreliableReceiver<Inbound<P>>,
114}
115
116impl<P: PublicKey> Debug for Receiver<P> {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        f.debug_struct("Receiver").finish_non_exhaustive()
119    }
120}
121
122impl<P: PublicKey> Receiver<P> {
123    pub(super) const fn new(receiver: mailbox::UnreliableReceiver<Inbound<P>>) -> Self {
124        Self { receiver }
125    }
126}
127
128impl<P: PublicKey> crate::Receiver for Receiver<P> {
129    type Error = Error;
130    type PublicKey = P;
131
132    /// Receives a message from the channel.
133    ///
134    /// This method will block until a message is received or the underlying
135    /// network shuts down.
136    async fn recv(&mut self) -> Result<NetworkMessage<Self::PublicKey>, Error> {
137        let Inbound((sender, message)) = self.receiver.recv().await.ok_or(Error::NetworkClosed)?;
138
139        // We don't check that the message is too large here because we already enforce
140        // that on the network layer.
141        Ok((sender, message))
142    }
143}
144
145#[derive(Clone, Debug)]
146pub struct Channels<P: PublicKey> {
147    messenger: Messenger<P>,
148    max_size: u32,
149    receivers: BTreeMap<Channel, (Quota, mailbox::UnreliableSender<Inbound<P>>)>,
150}
151
152impl<P: PublicKey> Channels<P> {
153    pub const fn new(messenger: Messenger<P>, max_size: u32) -> Self {
154        Self {
155            messenger,
156            max_size,
157            receivers: BTreeMap::new(),
158        }
159    }
160
161    pub fn register<C: Clock + Metrics>(
162        &mut self,
163        channel: Channel,
164        rate: Quota,
165        backlog: usize,
166        context: C,
167    ) -> (Sender<P, C>, Receiver<P>) {
168        let backlog = NonZeroUsize::new(backlog).expect("message backlog must be non-zero");
169        let (sender, receiver) = mailbox::new_unreliable(context.child("mailbox"), backlog);
170        if self.receivers.insert(channel, (rate, sender)).is_some() {
171            panic!("duplicate channel registration: {channel}");
172        }
173        (
174            Sender::new(
175                channel,
176                self.max_size,
177                self.messenger.clone(),
178                context,
179                rate,
180            ),
181            Receiver::new(receiver),
182        )
183    }
184
185    pub fn collect(self) -> BTreeMap<u64, (Quota, mailbox::UnreliableSender<Inbound<P>>)> {
186        self.receivers
187    }
188}