use super::{ingress::Message, Config};
use crate::authenticated::{
discovery::{
actors::{
peer, router,
tracker::{self, Metadata},
},
metrics,
types::InfoVerifier,
},
mailbox::UnboundedMailbox,
Mailbox,
};
use commonware_cryptography::PublicKey;
use commonware_macros::select_loop;
use commonware_runtime::{
spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Sink, Spawner, Stream,
};
use commonware_utils::channel::mpsc;
use prometheus_client::metrics::{counter::Counter, family::Family};
use rand_core::CryptoRngCore;
use std::{num::NonZeroUsize, time::Duration};
use tracing::debug;
/// Spawner actor: owns the mailbox on which spawn requests arrive and the
/// settings and metric handles that are handed to every peer task it launches.
///
/// Type parameters:
/// - `E`: runtime context used to run this actor and to spawn peer tasks.
/// - `O` / `I`: sink and stream types carried by incoming [`Message`]s.
/// - `C`: public key type identifying peers.
pub struct Actor<E, O, I, C>
where
    E: Spawner + BufferPooler + Clock + CryptoRngCore + Metrics,
    O: Sink,
    I: Stream,
    C: PublicKey,
{
    /// Runtime context this actor runs on; peer tasks are spawned from it
    /// under the `peer` label.
    context: ContextCell<E>,

    // Values copied from `Config` and forwarded unchanged into the
    // `peer::Config` of every spawned peer.
    /// Also used as the size of this actor's own mailbox in `new`.
    mailbox_size: usize,
    send_batch_size: NonZeroUsize,
    gossip_bit_vec_frequency: Duration,
    max_peer_set_size: u64,
    peer_gossip_max_count: usize,
    /// Cloned once per spawned peer.
    info_verifier: InfoVerifier<C>,

    /// Receiving half of the mailbox created in `new`.
    receiver: mpsc::Receiver<Message<O, I, C>>,

    // Metric families; a clone of each is given to every spawned peer.
    /// Registered as `messages_sent`.
    sent_messages: Family<metrics::Message, Counter>,
    /// Registered as `messages_received`.
    received_messages: Family<metrics::Message, Counter>,
    /// Registered as `messages_dropped`.
    dropped_messages: Family<metrics::Message, Counter>,
    /// Registered as `messages_rate_limited`.
    rate_limited: Family<metrics::Message, Counter>,
}
impl<
E: Spawner + BufferPooler + Clock + CryptoRngCore + Metrics,
O: Sink,
I: Stream,
C: PublicKey,
> Actor<E, O, I, C>
{
#[allow(clippy::type_complexity)]
pub fn new(context: E, cfg: Config<C>) -> (Self, Mailbox<Message<O, I, C>>) {
let sent_messages = Family::<metrics::Message, Counter>::default();
let received_messages = Family::<metrics::Message, Counter>::default();
let dropped_messages = Family::<metrics::Message, Counter>::default();
let rate_limited = Family::<metrics::Message, Counter>::default();
context.register("messages_sent", "messages sent", sent_messages.clone());
context.register(
"messages_received",
"messages received",
received_messages.clone(),
);
context.register(
"messages_dropped",
"messages dropped due to full application buffer",
dropped_messages.clone(),
);
context.register(
"messages_rate_limited",
"messages rate limited",
rate_limited.clone(),
);
let (sender, receiver) = Mailbox::new(cfg.mailbox_size);
(
Self {
context: ContextCell::new(context),
mailbox_size: cfg.mailbox_size,
send_batch_size: cfg.send_batch_size,
gossip_bit_vec_frequency: cfg.gossip_bit_vec_frequency,
max_peer_set_size: cfg.max_peer_set_size,
peer_gossip_max_count: cfg.peer_gossip_max_count,
info_verifier: cfg.info_verifier,
receiver,
sent_messages,
received_messages,
dropped_messages,
rate_limited,
},
sender,
)
}
/// Consumes the actor and launches `run` through `spawn_cell!` on the
/// actor's own context.
///
/// `tracker` and `router` are passed to `run` unchanged; `run` clones them
/// for every peer it spawns.
///
/// Returns the `Handle<()>` produced by `spawn_cell!`.
pub fn start(
mut self,
tracker: UnboundedMailbox<tracker::Message<C>>,
router: Mailbox<router::Message<C>>,
) -> Handle<()> {
spawn_cell!(self.context, self.run(tracker, router).await)
}
/// Event loop of the spawner.
///
/// Receives messages from the mailbox until the context is stopped or the
/// mailbox is closed. Each `Message::Spawn` starts a task labelled `peer`
/// that:
/// 1. asks the tracker for a greeting via `connect` (stops if none is returned),
/// 2. builds a `peer::Actor` and registers its messenger with the router
///    via `ready` (stops if the router returns nothing),
/// 3. runs the peer actor to completion and logs the outcome,
/// 4. tells the router the peer is gone via `release`, then drops the
///    reservation.
async fn run(
    mut self,
    tracker: UnboundedMailbox<tracker::Message<C>>,
    router: Mailbox<router::Message<C>>,
) {
    select_loop! {
        self.context,
        on_stopped => {
            debug!("context shutdown, stopping spawner");
        },
        Some(msg) = self.receiver.recv() else {
            debug!("mailbox closed, stopping spawner");
            break;
        } => {
            match msg {
                Message::Spawn {
                    peer,
                    connection,
                    reservation,
                } => {
                    // Context the peer task will be spawned from.
                    let peer_context = self.context.with_label("peer");

                    // Per-peer copies of the shared metric handles and mailboxes.
                    let sent = self.sent_messages.clone();
                    let received = self.received_messages.clone();
                    let dropped = self.dropped_messages.clone();
                    let limited = self.rate_limited.clone();
                    let mut peer_tracker = tracker.clone();
                    let mut peer_router = router.clone();

                    // Whether the reservation's metadata is the `Dialer` variant.
                    let dialer = matches!(reservation.metadata(), Metadata::Dialer(..));
                    let info_verifier = self.info_verifier.clone();

                    // Plain-value settings copied out so the task owns them.
                    let mailbox_size = self.mailbox_size;
                    let send_batch_size = self.send_batch_size;
                    let gossip_bit_vec_frequency = self.gossip_bit_vec_frequency;
                    let max_peer_set_size = self.max_peer_set_size;
                    let peer_gossip_max_count = self.peer_gossip_max_count;

                    peer_context.spawn(move |context| async move {
                        // No greeting from the tracker: give the reservation
                        // back and stop.
                        let greeting = match peer_tracker.connect(peer.clone(), dialer).await {
                            Some(greeting) => greeting,
                            None => {
                                debug!(?peer, "peer not eligible");
                                drop(reservation);
                                return;
                            }
                        };
                        debug!(?peer, "peer started");

                        let (peer_actor, messenger) = peer::Actor::new(
                            context,
                            peer::Config {
                                sent_messages: sent,
                                received_messages: received,
                                dropped_messages: dropped,
                                rate_limited: limited,
                                mailbox_size,
                                send_batch_size,
                                gossip_bit_vec_frequency,
                                max_peer_set_size,
                                peer_gossip_max_count,
                                info_verifier,
                            },
                        );

                        // Hand the messenger to the router; without channels
                        // the peer cannot run.
                        let channels = match peer_router.ready(peer.clone(), messenger).await {
                            Some(channels) => channels,
                            None => {
                                debug!(?peer, "router shut down during peer setup");
                                return;
                            }
                        };

                        // Drive the peer until it exits, then report how it ended.
                        let outcome = peer_actor
                            .run(peer.clone(), greeting, connection, peer_tracker, channels)
                            .await;
                        if let Err(err) = outcome {
                            debug!(error = ?err, ?peer, "peer shutdown");
                        } else {
                            debug!(?peer, "peer shutdown gracefully");
                        }

                        // Notify the router first; the reservation is dropped
                        // only afterwards.
                        peer_router.release(peer).await;
                        drop(reservation);
                    });
                }
            }
        },
    }
}
}