use crate::{
authenticated::{
data::EncodedData,
discovery::{channels::Channels, types},
relay::Relay,
Mailbox,
},
utils::limited::Connected,
Channel, Recipients,
};
use commonware_cryptography::PublicKey;
use commonware_runtime::{BufferPool, IoBufs};
use commonware_utils::{
channel::{fallible::AsyncFallibleExt, oneshot, ring},
NZUsize,
};
/// Messages carried by a `Mailbox<Message<P>>`.
///
/// NOTE(review): the consumer of these messages is not visible in this file;
/// presumably it is the router actor — confirm.
#[derive(Debug)]
pub enum Message<P: PublicKey> {
/// Announces `peer` along with its `relay`; sent by `Mailbox::ready`, which
/// waits for a `Channels<P>` reply on `channels`.
Ready {
/// Peer being announced.
peer: P,
/// Relay associated with `peer` (carries `EncodedData`).
relay: Relay<EncodedData>,
/// Oneshot sender on which the `Channels<P>` reply is delivered.
channels: oneshot::Sender<Channels<P>>,
},
/// Withdraws `peer`; sent by `Mailbox::release`, which does not wait for a reply.
Release { peer: P },
/// Requests delivery of already-encoded data; sent by `Messenger::content`.
Content {
/// Intended recipients of the data.
recipients: Recipients<P>,
/// Payload produced by `types::Payload::encode_data`.
encoded: EncodedData,
/// Priority flag forwarded unchanged from the caller.
priority: bool,
/// Oneshot sender on which a list of peers is returned
/// (presumably the peers the data was sent to — confirm).
success: oneshot::Sender<Vec<P>>,
},
/// Requests a subscription; sent by `Messenger::subscribe`.
SubscribePeers {
/// Oneshot sender on which the ring receiver of peer lists is delivered.
response: oneshot::Sender<ring::Receiver<Vec<P>>>,
},
}
impl<P: PublicKey> Mailbox<Message<P>> {
pub async fn ready(&mut self, peer: P, relay: Relay<EncodedData>) -> Option<Channels<P>> {
self.0
.request(|channels| Message::Ready {
peer,
relay,
channels,
})
.await
}
pub async fn release(&mut self, peer: P) {
self.0.send_lossy(Message::Release { peer }).await;
}
}
/// Handle that encodes outgoing messages and submits them through a
/// `Mailbox<Message<P>>`.
#[derive(Clone, Debug)]
pub struct Messenger<P: PublicKey> {
// Buffer pool handed to `types::Payload::encode_data` when encoding a message.
pool: BufferPool,
// Mailbox used to submit `Message::Content` and `Message::SubscribePeers`.
sender: Mailbox<Message<P>>,
}
impl<P: PublicKey> Messenger<P> {
pub const fn new(pool: BufferPool, sender: Mailbox<Message<P>>) -> Self {
Self { pool, sender }
}
pub async fn content(
&mut self,
recipients: Recipients<P>,
channel: Channel,
message: IoBufs,
priority: bool,
) -> Vec<P> {
let encoded = types::Payload::<P>::encode_data(&self.pool, channel, message);
self.sender
.0
.request_or_default(|success| Message::Content {
recipients,
encoded,
priority,
success,
})
.await
}
}
impl<P: PublicKey> Connected for Messenger<P> {
    type PublicKey = P;

    /// Requests a peer subscription via [`Message::SubscribePeers`].
    ///
    /// Returns the receiver delivered on the reply channel. When the request
    /// produces no reply, returns the receiving half of a fresh capacity-1
    /// ring channel whose sending half is discarded immediately.
    async fn subscribe(&mut self) -> ring::Receiver<Vec<Self::PublicKey>> {
        let reply = self
            .sender
            .0
            .request(|response| Message::SubscribePeers { response })
            .await;

        match reply {
            Some(subscription) => subscription,
            None => {
                // `_` leaves the sender unbound, so it is dropped right away.
                let (_, orphaned) = ring::channel(NZUsize!(1));
                orphaned
            }
        }
    }
}