commonware_p2p/authenticated/lookup/
channels.rs

1use super::Error;
2use crate::{authenticated::lookup::actors::router, Channel, Message, Recipients};
3use bytes::Bytes;
4use commonware_cryptography::PublicKey;
5use futures::{channel::mpsc, StreamExt};
6use governor::Quota;
7use std::collections::BTreeMap;
8
9/// Sender is the mechanism used to send arbitrary bytes to
10/// a set of recipients over a pre-defined channel.
11#[derive(Clone, Debug)]
12pub struct Sender<P: PublicKey> {
13    channel: Channel,
14    max_size: usize,
15    messenger: router::Messenger<P>,
16}
17
18impl<P: PublicKey> Sender<P> {
19    pub(super) fn new(channel: Channel, max_size: usize, messenger: router::Messenger<P>) -> Self {
20        Self {
21            channel,
22            max_size,
23            messenger,
24        }
25    }
26}
27
28impl<P: PublicKey> crate::Sender for Sender<P> {
29    type Error = Error;
30    type PublicKey = P;
31
32    /// Sends a message to a set of recipients.
33    ///
34    /// # Offline Recipients
35    ///
36    /// If a recipient is offline at the time a message is sent, the message will be dropped.
37    /// It is up to the application to handle retries (if necessary).
38    ///
39    /// # Parameters
40    ///
41    /// * `recipients` - The set of recipients to send the message to.
42    /// * `message` - The message to send.
43    /// * `priority` - Whether the message should be sent with priority (across
44    ///   all channels).
45    ///
46    /// # Returns
47    ///
48    /// A vector of recipients that the message was sent to, or an error if the message is too large.
49    ///
50    /// Note: a successful send does not guarantee that the recipient will receive the message.
51    async fn send(
52        &mut self,
53        recipients: Recipients<Self::PublicKey>,
54        message: Bytes,
55        priority: bool,
56    ) -> Result<Vec<Self::PublicKey>, Error> {
57        // Ensure message isn't too large
58        let message_len = message.len();
59        if message_len > self.max_size {
60            return Err(Error::MessageTooLarge(message_len));
61        }
62
63        // Wait for messenger to let us know who we sent to
64        Ok(self
65            .messenger
66            .content(recipients, self.channel, message, priority)
67            .await)
68    }
69}
70
71/// Channel to asynchronously receive messages from a channel.
72#[derive(Debug)]
73pub struct Receiver<P: PublicKey> {
74    receiver: mpsc::Receiver<Message<P>>,
75}
76
77impl<P: PublicKey> Receiver<P> {
78    pub(super) fn new(receiver: mpsc::Receiver<Message<P>>) -> Self {
79        Self { receiver }
80    }
81}
82
83impl<P: PublicKey> crate::Receiver for Receiver<P> {
84    type Error = Error;
85    type PublicKey = P;
86
87    /// Receives a message from the channel.
88    ///
89    /// This method will block until a message is received or the underlying
90    /// network shuts down.
91    async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
92        let (sender, message) = self.receiver.next().await.ok_or(Error::NetworkClosed)?;
93
94        // We don't check that the message is too large here because we already enforce
95        // that on the network layer.
96        Ok((sender, message))
97    }
98}
99
100#[derive(Clone)]
101pub struct Channels<P: PublicKey> {
102    messenger: router::Messenger<P>,
103    max_size: usize,
104    receivers: BTreeMap<Channel, (Quota, mpsc::Sender<Message<P>>)>,
105}
106
107impl<P: PublicKey> Channels<P> {
108    pub fn new(messenger: router::Messenger<P>, max_size: usize) -> Self {
109        Self {
110            messenger,
111            max_size,
112            receivers: BTreeMap::new(),
113        }
114    }
115
116    pub fn register(
117        &mut self,
118        channel: Channel,
119        rate: governor::Quota,
120        backlog: usize,
121    ) -> (Sender<P>, Receiver<P>) {
122        let (sender, receiver) = mpsc::channel(backlog);
123        if self.receivers.insert(channel, (rate, sender)).is_some() {
124            panic!("duplicate channel registration: {channel}");
125        }
126        (
127            Sender::new(channel, self.max_size, self.messenger.clone()),
128            Receiver::new(receiver),
129        )
130    }
131
132    pub fn collect(self) -> BTreeMap<u64, (Quota, mpsc::Sender<Message<P>>)> {
133        self.receivers
134    }
135}