use super::{actors::Messenger, Error};
use crate::{
utils::limited::{CheckedSender, LimitedSender},
Channel, Message as NetworkMessage, Recipients,
};
use commonware_actor::{
mailbox::{self, UnreliablePolicy},
Feedback, Unreliable,
};
use commonware_cryptography::PublicKey;
use commonware_runtime::{Clock, IoBufs, Metrics, Quota};
use std::{
collections::{BTreeMap, VecDeque},
fmt::Debug,
num::NonZeroUsize,
time::SystemTime,
};
pub(crate) struct Inbound<P: PublicKey>(pub(crate) NetworkMessage<P>);
impl<P: PublicKey> UnreliablePolicy for Inbound<P> {
type Overflow = VecDeque<Self>;
fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool {
false
}
}
#[derive(Debug, Clone)]
pub struct UnlimitedSender<P: PublicKey> {
channel: Channel,
max_size: u32,
messenger: Messenger<P>,
}
impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
type PublicKey = P;
fn send(
&mut self,
recipients: Recipients<Self::PublicKey>,
message: impl Into<IoBufs> + Send,
priority: bool,
) -> Unreliable<Feedback> {
let message = message.into();
assert!(
message.len() <= self.max_size as usize,
"message too large: {} > {}",
message.len(),
self.max_size
);
self.messenger
.content(recipients, self.channel, message, priority)
}
}
pub struct Sender<P: PublicKey, C: Clock> {
limited_sender: LimitedSender<C, UnlimitedSender<P>, Messenger<P>>,
}
impl<P: PublicKey, C: Clock> Clone for Sender<P, C> {
fn clone(&self) -> Self {
Self {
limited_sender: self.limited_sender.clone(),
}
}
}
impl<P: PublicKey, C: Clock> Sender<P, C> {
pub(super) fn new(
channel: Channel,
max_size: u32,
messenger: Messenger<P>,
clock: C,
quota: Quota,
) -> Self {
let master_sender = UnlimitedSender {
channel,
max_size,
messenger: messenger.clone(),
};
let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger);
Self { limited_sender }
}
}
impl<P, C> crate::LimitedSender for Sender<P, C>
where
P: PublicKey,
C: Clock + Send + 'static,
{
type PublicKey = P;
type Checked<'a>
= CheckedSender<'a, UnlimitedSender<P>>
where
Self: 'a;
fn check(
&mut self,
recipients: Recipients<Self::PublicKey>,
) -> Result<Self::Checked<'_>, SystemTime> {
self.limited_sender.check(recipients)
}
}
pub struct Receiver<P: PublicKey> {
receiver: mailbox::UnreliableReceiver<Inbound<P>>,
}
impl<P: PublicKey> Debug for Receiver<P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Receiver").finish_non_exhaustive()
}
}
impl<P: PublicKey> Receiver<P> {
pub(super) const fn new(receiver: mailbox::UnreliableReceiver<Inbound<P>>) -> Self {
Self { receiver }
}
}
impl<P: PublicKey> crate::Receiver for Receiver<P> {
type Error = Error;
type PublicKey = P;
async fn recv(&mut self) -> Result<NetworkMessage<Self::PublicKey>, Error> {
let Inbound((sender, message)) = self.receiver.recv().await.ok_or(Error::NetworkClosed)?;
Ok((sender, message))
}
}
#[derive(Clone, Debug)]
pub struct Channels<P: PublicKey> {
messenger: Messenger<P>,
max_size: u32,
receivers: BTreeMap<Channel, (Quota, mailbox::UnreliableSender<Inbound<P>>)>,
}
impl<P: PublicKey> Channels<P> {
pub const fn new(messenger: Messenger<P>, max_size: u32) -> Self {
Self {
messenger,
max_size,
receivers: BTreeMap::new(),
}
}
pub fn register<C: Clock + Metrics>(
&mut self,
channel: Channel,
rate: Quota,
backlog: usize,
context: C,
) -> (Sender<P, C>, Receiver<P>) {
let backlog = NonZeroUsize::new(backlog).expect("message backlog must be non-zero");
let (sender, receiver) = mailbox::new_unreliable(context.child("mailbox"), backlog);
if self.receivers.insert(channel, (rate, sender)).is_some() {
panic!("duplicate channel registration: {channel}");
}
(
Sender::new(
channel,
self.max_size,
self.messenger.clone(),
context,
rate,
),
Receiver::new(receiver),
)
}
pub fn collect(self) -> BTreeMap<u64, (Quota, mailbox::UnreliableSender<Inbound<P>>)> {
self.receivers
}
}