use super::{
ingress::{Mailbox, Message, Messenger},
Config,
};
use crate::{
authenticated::{data::EncodedData, discovery::channels::Channels, relay::Relay},
Recipients,
};
use commonware_actor::mailbox;
use commonware_cryptography::PublicKey;
use commonware_macros::select_loop;
use commonware_runtime::{spawn_cell, BufferPooler, ContextCell, Handle, Metrics, Spawner};
use commonware_utils::channel::ring;
use futures::Sink;
use std::{collections::BTreeMap, pin::Pin};
use tracing::debug;
/// Router actor: tracks the relay for every connected peer and forwards
/// encoded messages to them.
///
/// It is driven by [`Message`]s received on `control` and additionally keeps a
/// list of subscribers that are sent the set of connected peers whenever that
/// set is updated.
pub struct Actor<E, P>
where
    E: Spawner + BufferPooler + Metrics,
    P: PublicKey,
{
    /// Runtime context, held in a cell so it can be handed to the spawned task.
    context: ContextCell<E>,
    /// Receiving half of the control mailbox.
    control: mailbox::UnreliableReceiver<Message<P>>,
    /// Relay for each currently connected peer, keyed by the peer's public key.
    connections: BTreeMap<P, Relay<EncodedData>>,
    /// Senders that accepted their last peer-set update and should receive the next one.
    open_subscriptions: Vec<ring::Sender<Vec<P>>>,
}
impl<E, P> Actor<E, P>
where
    E: Spawner + BufferPooler + Metrics,
    P: PublicKey,
{
    /// Builds a new router actor.
    ///
    /// Returns the actor together with a [`Mailbox`] and a [`Messenger`]; both
    /// handles wrap clones of the same control sender, so everything they
    /// submit is received on the actor's `control` mailbox. The mailbox is
    /// created with `cfg.mailbox_size`, and the messenger is given a clone of
    /// the context's network buffer pool.
    pub fn new(context: E, cfg: Config) -> (Self, Mailbox<P>, Messenger<P>) {
        let (tx, rx) =
            mailbox::new_unreliable::<Message<P>>(context.child("mailbox"), cfg.mailbox_size);
        let buffer_pool = context.network_buffer_pool().clone();

        let actor = Self {
            context: ContextCell::new(context),
            control: rx,
            connections: BTreeMap::new(),
            open_subscriptions: Vec::new(),
        };
        let control_mailbox = Mailbox::new(tx.clone());
        let messenger = Messenger::new(buffer_pool, Mailbox::new(tx));

        (actor, control_mailbox, messenger)
    }

    /// Hands `encoded` to the relay registered for `recipient`.
    ///
    /// Nothing happens when no relay is registered for `recipient`, and the
    /// result of the relay send is ignored.
    fn send(&mut self, recipient: P, encoded: EncodedData, priority: bool) {
        let Some(relay) = self.connections.get_mut(&recipient) else {
            return;
        };
        let _ = relay.send(encoded, priority);
    }

    /// Delivers `encoded` to every peer selected by `recipients`.
    ///
    /// * `Recipients::One` - sends to that single peer.
    /// * `Recipients::Some` - sends a clone to each listed peer.
    /// * `Recipients::All` - sends a clone to every connected peer.
    ///
    /// Results of individual relay sends are ignored.
    fn route(&mut self, recipients: Recipients<P>, encoded: EncodedData, priority: bool) {
        match recipients {
            Recipients::One(peer) => self.send(peer, encoded, priority),
            Recipients::Some(peers) => peers
                .into_iter()
                .for_each(|peer| self.send(peer, encoded.clone(), priority)),
            Recipients::All => self.connections.values_mut().for_each(|relay| {
                let _ = relay.send(encoded.clone(), priority);
            }),
        }
    }

    /// Spawns the actor on its context and returns the handle of the spawned task.
    ///
    /// `routing` is the set of channels cloned out to every peer that becomes ready.
    pub fn start(mut self, routing: Channels<P>) -> Handle<()> {
        spawn_cell!(self.context, self.run(routing))
    }

    /// Main loop: processes control messages one at a time.
    ///
    /// The loop breaks when the mailbox yields no further message; the
    /// `on_stopped` arm logs that the context is shutting down.
    async fn run(mut self, routing: Channels<P>) {
        select_loop! {
            self.context,
            on_stopped => {
                debug!("context shutdown, stopping router");
            },
            Some(msg) = self.control.recv() else {
                debug!("mailbox closed, stopping router");
                break;
            } => match msg {
                // Register the peer's relay, hand it the routing channels, and
                // publish the updated peer set.
                Message::Ready {
                    peer,
                    relay,
                    channels,
                } => {
                    debug!(?peer, "peer ready");
                    self.connections.insert(peer, relay);
                    let _ = channels.send(routing.clone());
                    self.notify_subscribers();
                }
                // Forget the peer's relay and publish the updated peer set.
                Message::Release { peer } => {
                    debug!(?peer, "peer released");
                    self.connections.remove(&peer);
                    self.notify_subscribers();
                }
                // Forward already-encoded content to the requested recipients.
                Message::Content {
                    recipients,
                    encoded,
                    priority,
                } => {
                    self.route(recipients, encoded, priority);
                }
                // Register a new subscriber for peer-set updates.
                Message::SubscribePeers { sender } => {
                    self.subscribe_peers(sender);
                }
            },
        }
    }

    /// Registers `sender` as a peer-set subscriber.
    ///
    /// The current set of connected peers is sent immediately; the sender is
    /// stored for later updates only if that first send succeeds.
    fn subscribe_peers(&mut self, mut sender: ring::Sender<Vec<P>>) {
        let snapshot: Vec<P> = self.connections.keys().cloned().collect();
        if Pin::new(&mut sender).start_send(snapshot).is_err() {
            return;
        }
        self.open_subscriptions.push(sender);
    }

    /// Sends the current set of connected peers to every subscriber.
    ///
    /// Subscribers whose send fails are removed; the rest keep their relative order.
    fn notify_subscribers(&mut self) {
        let snapshot: Vec<P> = self.connections.keys().cloned().collect();
        self.open_subscriptions
            .retain_mut(|subscriber| Pin::new(subscriber).start_send(snapshot.clone()).is_ok());
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::ed25519::PublicKey;
    use commonware_runtime::{deterministic, Runner as _};
    use commonware_utils::NZUsize;

    /// A subscriber is stored only when its initial peer-set send succeeds.
    #[test]
    fn subscribe_retains_only_open_initial_sender() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                mailbox_size: NZUsize!(1),
            };
            let (mut actor, _, _) = Actor::<deterministic::Context, PublicKey>::new(context, cfg);

            // The receiver is dropped before subscribing, so the sender must not be stored.
            let (closed_tx, closed_rx) = ring::channel(NZUsize!(1));
            drop(closed_rx);
            actor.subscribe_peers(closed_tx);
            assert!(actor.open_subscriptions.is_empty());

            // The receiver is still alive, so the sender is kept.
            let (open_tx, _open_rx) = ring::channel(NZUsize!(1));
            actor.subscribe_peers(open_tx);
            assert_eq!(actor.open_subscriptions.len(), 1);
        });
    }
}