commonware_p2p/authenticated/discovery/
channels.rs1use super::{actors::Messenger, Error};
2use crate::{Channel, Message, Recipients};
3use bytes::Bytes;
4use commonware_cryptography::PublicKey;
5use futures::{channel::mpsc, StreamExt};
6use governor::Quota;
7use std::collections::BTreeMap;
8
9#[derive(Clone, Debug)]
12pub struct Sender<P: PublicKey> {
13 channel: Channel,
14 max_size: usize,
15 messenger: Messenger<P>,
16}
17
18impl<P: PublicKey> Sender<P> {
19 pub(super) fn new(channel: Channel, max_size: usize, messenger: Messenger<P>) -> Self {
20 Self {
21 channel,
22 max_size,
23 messenger,
24 }
25 }
26}
27
28impl<P: PublicKey> crate::Sender for Sender<P> {
29 type Error = Error;
30 type PublicKey = P;
31
32 async fn send(
52 &mut self,
53 recipients: Recipients<Self::PublicKey>,
54 message: Bytes,
55 priority: bool,
56 ) -> Result<Vec<Self::PublicKey>, Error> {
57 let message_len = message.len();
59 if message_len > self.max_size {
60 return Err(Error::MessageTooLarge(message_len));
61 }
62
63 Ok(self
65 .messenger
66 .content(recipients, self.channel, message, priority)
67 .await)
68 }
69}
70
71#[derive(Debug)]
73pub struct Receiver<P: PublicKey> {
74 receiver: mpsc::Receiver<Message<P>>,
75}
76
77impl<P: PublicKey> Receiver<P> {
78 pub(super) fn new(receiver: mpsc::Receiver<Message<P>>) -> Self {
79 Self { receiver }
80 }
81}
82
83impl<P: PublicKey> crate::Receiver for Receiver<P> {
84 type Error = Error;
85 type PublicKey = P;
86
87 async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
92 let (sender, message) = self.receiver.next().await.ok_or(Error::NetworkClosed)?;
93
94 Ok((sender, message))
97 }
98}
99
100#[derive(Clone)]
101pub struct Channels<P: PublicKey> {
102 messenger: Messenger<P>,
103 max_size: usize,
104 receivers: BTreeMap<Channel, (Quota, mpsc::Sender<Message<P>>)>,
105}
106
107impl<P: PublicKey> Channels<P> {
108 pub fn new(messenger: Messenger<P>, max_size: usize) -> Self {
109 Self {
110 messenger,
111 max_size,
112 receivers: BTreeMap::new(),
113 }
114 }
115
116 pub fn register(
117 &mut self,
118 channel: Channel,
119 rate: governor::Quota,
120 backlog: usize,
121 ) -> (Sender<P>, Receiver<P>) {
122 let (sender, receiver) = mpsc::channel(backlog);
123 if self.receivers.insert(channel, (rate, sender)).is_some() {
124 panic!("duplicate channel registration: {channel}");
125 }
126 (
127 Sender::new(channel, self.max_size, self.messenger.clone()),
128 Receiver::new(receiver),
129 )
130 }
131
132 pub fn collect(self) -> BTreeMap<u64, (Quota, mpsc::Sender<Message<P>>)> {
133 self.receivers
134 }
135}