use super::Error;
use crate::{
authenticated::lookup::actors::router::{self, Messenger},
utils::limited::{CheckedSender, LimitedSender},
Channel, Message, Recipients,
};
use commonware_cryptography::PublicKey;
use commonware_runtime::{Clock, IoBufs, Quota};
use commonware_utils::channel::mpsc;
use std::{collections::BTreeMap, fmt::Debug, time::SystemTime};
/// Sender bound to a single [`Channel`] that hands messages to the router's
/// [`Messenger`] after a size check (see the `crate::UnlimitedSender` impl).
///
/// NOTE(review): the name suggests no rate limiting happens at this layer —
/// confirm against `LimitedSender`.
#[derive(Clone, Debug)]
pub struct UnlimitedSender<P>
where
    P: PublicKey,
{
    /// Channel passed to the messenger with every message.
    channel: Channel,
    /// Largest message length (as reported by `IoBufs::len`) accepted by `send`.
    max_size: u32,
    /// Handle used to forward message content.
    messenger: Messenger<P>,
}
impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
type Error = Error;
type PublicKey = P;
async fn send(
&mut self,
recipients: Recipients<Self::PublicKey>,
message: impl Into<IoBufs> + Send,
priority: bool,
) -> Result<Vec<Self::PublicKey>, Self::Error> {
let message = message.into();
if message.len() > self.max_size as usize {
return Err(Error::MessageTooLarge(message.len()));
}
Ok(self
.messenger
.content(recipients, self.channel, message, priority)
.await)
}
}
/// Channel sender that wraps an [`UnlimitedSender`] in a [`LimitedSender`]
/// configured with a [`Quota`] and a [`Clock`].
#[derive(Clone)]
pub struct Sender<P, C>
where
    P: PublicKey,
    C: Clock,
{
    /// Wrapper that `check` delegates to.
    limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
}
impl<P, C> Sender<P, C>
where
    P: PublicKey,
    C: Clock,
{
    /// Builds a sender for `channel`.
    ///
    /// An [`UnlimitedSender`] is created from `channel`, `max_size` and a clone
    /// of `messenger`; it is then wrapped in a [`LimitedSender`] together with
    /// `quota`, `clock` and the original `messenger`.
    pub(super) fn new(
        channel: Channel,
        max_size: u32,
        messenger: Messenger<P>,
        clock: C,
        quota: Quota,
    ) -> Self {
        // The messenger is needed twice: once by the inner sender and once by
        // the wrapper, hence the clone.
        let unlimited = UnlimitedSender {
            channel,
            max_size,
            messenger: messenger.clone(),
        };
        Self {
            limited_sender: LimitedSender::new(unlimited, quota, clock, messenger),
        }
    }
}
impl<P: PublicKey, C: Clock + Clone + Send + 'static> crate::LimitedSender for Sender<P, C> {
    type PublicKey = P;
    type Checked<'a>
        = CheckedSender<'a, UnlimitedSender<P>>
    where
        Self: 'a;

    /// Delegates to the wrapped [`LimitedSender`]'s `check` for `recipients`.
    ///
    /// On success yields a [`CheckedSender`] borrowing from `self`; on failure
    /// yields a [`SystemTime`] (presumably the earliest time a retry may
    /// succeed — confirm against `LimitedSender::check`).
    async fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        let inner = &mut self.limited_sender;
        inner.check(recipients).await
    }
}
/// Receiving half of a registered channel; yields [`Message`]s taken from an
/// underlying `mpsc` receiver.
#[derive(Debug)]
pub struct Receiver<P>
where
    P: PublicKey,
{
    /// Queue that incoming messages are read from.
    receiver: mpsc::Receiver<Message<P>>,
}
impl<P: PublicKey> Receiver<P> {
pub(super) const fn new(receiver: mpsc::Receiver<Message<P>>) -> Self {
Self { receiver }
}
}
impl<P: PublicKey> crate::Receiver for Receiver<P> {
type Error = Error;
type PublicKey = P;
async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Error> {
let (sender, message) = self.receiver.recv().await.ok_or(Error::NetworkClosed)?;
Ok((sender, message))
}
}
/// Registry of channels: records, per [`Channel`], the [`Quota`] and the
/// `mpsc` sender supplied at registration time.
#[derive(Debug, Clone)]
pub struct Channels<P>
where
    P: PublicKey,
{
    /// Messenger cloned into every [`Sender`] created by `register`.
    messenger: router::Messenger<P>,
    /// Maximum message size handed to every [`Sender`] created by `register`.
    max_size: u32,
    /// Registered channels, keyed by channel identifier.
    receivers: BTreeMap<Channel, (Quota, mpsc::Sender<Message<P>>)>,
}
impl<P: PublicKey> Channels<P> {
    /// Creates an empty registry.
    ///
    /// `messenger` and `max_size` are passed on to every [`Sender`] created by
    /// [`Channels::register`].
    pub const fn new(messenger: router::Messenger<P>, max_size: u32) -> Self {
        Self {
            messenger,
            max_size,
            receivers: BTreeMap::new(),
        }
    }

    /// Registers `channel` and returns its sending and receiving halves.
    ///
    /// A new queue is created with `mpsc::channel(backlog)`. Its sending side
    /// is stored in the registry together with `rate`; its receiving side is
    /// wrapped in the returned [`Receiver`]. The returned [`Sender`] is built
    /// from `channel`, the registry's `max_size`, a clone of its messenger,
    /// `clock` and `rate`.
    ///
    /// # Panics
    ///
    /// Panics if `channel` has already been registered.
    pub fn register<C: Clock>(
        &mut self,
        channel: Channel,
        rate: Quota,
        backlog: usize,
        clock: C,
    ) -> (Sender<P, C>, Receiver<P>) {
        let (sender, receiver) = mpsc::channel(backlog);

        // `insert` hands back the previous entry, if any; a previous entry
        // means the same channel was registered twice.
        let previous = self.receivers.insert(channel, (rate, sender));
        assert!(
            previous.is_none(),
            "duplicate channel registration: {channel}"
        );

        (
            Sender::new(channel, self.max_size, self.messenger.clone(), clock, rate),
            Receiver::new(receiver),
        )
    }

    /// Consumes the registry and returns every registered channel with its
    /// quota and the sending side of its queue.
    pub fn collect(self) -> BTreeMap<Channel, (Quota, mpsc::Sender<Message<P>>)> {
        self.receivers
    }
}