use super::{ingress::Message, Config};
use crate::authenticated::{
lookup::{
actors::{peer, router, tracker},
metrics,
},
mailbox::UnboundedMailbox,
Mailbox,
};
use commonware_cryptography::PublicKey;
use commonware_macros::select_loop;
use commonware_runtime::{
spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Sink, Spawner, Stream,
};
use commonware_utils::channel::mpsc;
use prometheus_client::metrics::{counter::Counter, family::Family};
use rand_core::CryptoRngCore;
use std::num::NonZeroUsize;
use tracing::debug;
pub struct Actor<
E: Spawner + BufferPooler + Clock + CryptoRngCore + Metrics,
Si: Sink,
St: Stream,
C: PublicKey,
> {
context: ContextCell<E>,
mailbox_size: usize,
send_batch_size: NonZeroUsize,
ping_frequency: std::time::Duration,
receiver: mpsc::Receiver<Message<Si, St, C>>,
sent_messages: Family<metrics::Message, Counter>,
received_messages: Family<metrics::Message, Counter>,
dropped_messages: Family<metrics::Message, Counter>,
rate_limited: Family<metrics::Message, Counter>,
}
impl<
E: Spawner + BufferPooler + Clock + CryptoRngCore + Metrics,
Si: Sink,
St: Stream,
C: PublicKey,
> Actor<E, Si, St, C>
{
pub fn new(context: E, cfg: Config) -> (Self, Mailbox<Message<Si, St, C>>) {
let sent_messages = Family::<metrics::Message, Counter>::default();
let received_messages = Family::<metrics::Message, Counter>::default();
let dropped_messages = Family::<metrics::Message, Counter>::default();
let rate_limited = Family::<metrics::Message, Counter>::default();
context.register("messages_sent", "messages sent", sent_messages.clone());
context.register(
"messages_received",
"messages received",
received_messages.clone(),
);
context.register(
"messages_dropped",
"messages dropped due to full application buffer",
dropped_messages.clone(),
);
context.register(
"messages_rate_limited",
"messages rate limited",
rate_limited.clone(),
);
let (sender, receiver) = Mailbox::new(cfg.mailbox_size);
(
Self {
context: ContextCell::new(context),
mailbox_size: cfg.mailbox_size,
send_batch_size: cfg.send_batch_size,
ping_frequency: cfg.ping_frequency,
receiver,
sent_messages,
received_messages,
dropped_messages,
rate_limited,
},
sender,
)
}
pub fn start(
mut self,
tracker: UnboundedMailbox<tracker::Message<C>>,
router: Mailbox<router::Message<C>>,
) -> Handle<()> {
spawn_cell!(self.context, self.run(tracker, router).await)
}
async fn run(
mut self,
tracker: UnboundedMailbox<tracker::Message<C>>,
router: Mailbox<router::Message<C>>,
) {
select_loop! {
self.context,
on_stopped => {
debug!("context shutdown, stopping spawner");
},
Some(msg) = self.receiver.recv() else {
debug!("mailbox closed, stopping spawner");
break;
} => {
match msg {
Message::Spawn {
peer,
connection,
reservation,
} => {
let sent_messages = self.sent_messages.clone();
let received_messages = self.received_messages.clone();
let dropped_messages = self.dropped_messages.clone();
let rate_limited = self.rate_limited.clone();
let mut tracker = tracker.clone();
let mut router = router.clone();
self.context
.with_label("peer")
.spawn(move |context| async move {
debug!(?peer, "peer started");
let (peer_actor, peer_mailbox, messenger) = peer::Actor::new(
context,
peer::Config {
ping_frequency: self.ping_frequency,
sent_messages,
received_messages,
dropped_messages,
rate_limited,
mailbox_size: self.mailbox_size,
send_batch_size: self.send_batch_size,
},
);
let Some(channels) = router.ready(peer.clone(), messenger).await
else {
debug!(?peer, "router shut down during peer setup");
return;
};
tracker.connect(peer.clone(), peer_mailbox);
let result =
peer_actor.run(peer.clone(), connection, channels).await;
match result {
Ok(()) => debug!(?peer, "peer shutdown gracefully"),
Err(e) => debug!(error = ?e, ?peer, "peer shutdown"),
}
router.release(peer).await;
drop(reservation)
});
}
}
},
}
}
}