use crate::{
authenticated::{
data::EncodedData,
lookup::{channels::Channels, types},
relay::Relay,
},
utils::limited::Connected,
Channel, Recipients,
};
use commonware_actor::{
mailbox::{self, UnreliablePolicy},
Feedback, Unreliable,
};
use commonware_cryptography::PublicKey;
use commonware_runtime::{BufferPool, IoBufs};
use commonware_utils::{
channel::{oneshot, ring},
NZUsize,
};
use std::{collections::VecDeque, fmt};
/// Messages that can be enqueued on the router's [`Mailbox`].
pub enum Message<P: PublicKey> {
/// Sent by [`Mailbox::ready`]: names a `peer`, hands over its `relay`, and
/// carries the oneshot sender on which a [`Channels`] value can be returned.
Ready {
peer: P,
relay: Relay<EncodedData>,
channels: oneshot::Sender<Channels<P>>,
},
/// Sent by [`Mailbox::release`] for the given `peer`.
// NOTE(review): presumably undoes a prior `Ready` for that peer — confirm
// against the router actor that consumes these messages.
Release { peer: P },
/// Already-encoded data addressed to `recipients`, with a `priority` flag.
Content {
recipients: Recipients<P>,
encoded: EncodedData,
priority: bool,
},
/// Sent by `Messenger::subscribe`: carries the sending half of a ring
/// channel of `Vec<P>`.
// NOTE(review): presumably used to publish peer lists — confirm with the consumer.
SubscribePeers { sender: ring::Sender<Vec<P>> },
}
impl<P: PublicKey> UnreliablePolicy for Message<P> {
    /// Overflow storage: a FIFO queue of retained messages.
    type Overflow = VecDeque<Self>;

    /// Decides what happens to `message` when this policy is consulted.
    ///
    /// `Content` messages are not stored and `false` is returned. Every other
    /// variant is appended to the back of `overflow` and `true` is returned.
    // NOTE(review): presumably `false` surfaces to the caller as
    // `Unreliable::Rejected` — confirm against the `UnreliablePolicy` contract.
    fn handle(overflow: &mut Self::Overflow, message: Self) -> bool {
        // Content is the only variant that is never retained.
        if matches!(message, Self::Content { .. }) {
            return false;
        }
        overflow.push_back(message);
        true
    }
}
/// Handle for enqueueing [`Message`]s; a thin wrapper around a
/// `mailbox::UnreliableSender`.
pub struct Mailbox<P: PublicKey>(mailbox::UnreliableSender<Message<P>>);
impl<P: PublicKey> Clone for Mailbox<P> {
    /// Produces a new `Mailbox` wrapping a clone of the inner sender.
    fn clone(&self) -> Self {
        let Self(inner) = self;
        Self(inner.clone())
    }
}
impl<P: PublicKey> fmt::Debug for Mailbox<P> {
    /// Formats as a tuple struct named `Mailbox` with the inner sender as its
    /// single field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        let mut tuple = f.debug_tuple("Mailbox");
        tuple.field(inner);
        tuple.finish()
    }
}
impl<P: PublicKey> Mailbox<P> {
    /// Wraps `sender` in a `Mailbox`.
    pub const fn new(sender: mailbox::UnreliableSender<Message<P>>) -> Self {
        Self(sender)
    }

    /// Enqueues a [`Message::Ready`] for `peer` with its `relay` and waits for
    /// the reply.
    ///
    /// The result of the enqueue itself is deliberately ignored. Returns
    /// `Some(channels)` when a value arrives on the oneshot channel, or `None`
    /// when the receiving half resolves with an error.
    pub async fn ready(&self, peer: P, relay: Relay<EncodedData>) -> Option<Channels<P>> {
        let (reply, response) = oneshot::channel();
        let request = Message::Ready {
            peer,
            relay,
            channels: reply,
        };
        let _ = self.0.enqueue(request);
        match response.await {
            Ok(channels) => Some(channels),
            Err(_) => None,
        }
    }

    /// Enqueues a [`Message::Release`] for `peer` and returns the resulting
    /// [`Feedback`].
    ///
    /// # Panics
    ///
    /// Panics if the enqueue reports `Unreliable::Rejected`. The policy in this
    /// file retains every non-`Content` message, which is presumably why
    /// rejection is treated as impossible here.
    pub fn release(&self, peer: P) -> Feedback {
        let outcome = self.0.enqueue(Message::Release { peer });
        let Unreliable::Outcome(feedback) = outcome else {
            unreachable!("router release cannot be rejected")
        };
        feedback
    }
}
/// Encodes outgoing content and enqueues it on a [`Mailbox`].
#[derive(Clone, Debug)]
pub struct Messenger<P: PublicKey> {
// Pool passed to `types::Message::encode_data` when building outgoing content.
pool: BufferPool,
// Mailbox on which `Content` and `SubscribePeers` messages are enqueued.
sender: Mailbox<P>,
}
impl<P: PublicKey> Messenger<P> {
    /// Builds a `Messenger` from a buffer `pool` and the `sender` mailbox.
    pub const fn new(pool: BufferPool, sender: Mailbox<P>) -> Self {
        Self { pool, sender }
    }

    /// Encodes `message` for `channel` and enqueues it as a
    /// [`Message::Content`] addressed to `recipients` with the given
    /// `priority`.
    ///
    /// Returns whatever the enqueue reports. `Content` is the one variant the
    /// policy in this file declines to retain, so callers should be prepared
    /// for a non-`Outcome` result.
    pub fn content(
        &self,
        recipients: Recipients<P>,
        channel: Channel,
        message: IoBufs,
        priority: bool,
    ) -> Unreliable<Feedback> {
        let Self { pool, sender } = self;
        let payload = types::Message::encode_data(pool, channel, message);
        let request = Message::Content {
            recipients,
            encoded: payload,
            priority,
        };
        sender.0.enqueue(request)
    }
}
impl<P: PublicKey> Connected for Messenger<P> {
    type PublicKey = P;

    /// Creates a ring channel (constructed with `NZUsize!(1)`), enqueues its
    /// sending half in a [`Message::SubscribePeers`], and returns the
    /// receiving half.
    ///
    /// The result of the enqueue is deliberately ignored.
    fn subscribe(&self) -> ring::Receiver<Vec<Self::PublicKey>> {
        let (tx, rx) = ring::channel(NZUsize!(1));
        let request = Message::SubscribePeers { sender: tx };
        let _ = self.sender.0.enqueue(request);
        rx
    }
}