use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix::{
Actor, ActorFutureExt, Addr, ArbiterHandle, AsyncContext, Context, Handler, Message, WrapFuture,
};
use calimero_network_primitives::stream::Stream;
use calimero_primitives::context::ContextId;
use dashmap::DashMap;
use libp2p::PeerId;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry;
use tokio::sync::{mpsc, Semaphore};
use tracing::{debug, info, warn};
use calimero_node_primitives::sync::SyncProtocol;
use crate::sync::SyncManager;
/// Mailbox capacity for the sync-session actor's job channel.
pub const SYNC_SESSION_CHANNEL_CAPACITY: usize = 256;
/// How often the actor emits its rolling counter summary log line.
const SUMMARY_INTERVAL: Duration = Duration::from_secs(60);
/// Why a `SyncSessionJob` was dropped without ever running.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DropReason {
    /// The actor's mailbox was at capacity when `try_send` was attempted.
    MailboxFull,
    /// The actor had stopped, so its mailbox was closed.
    ActorClosed,
    /// An initiator session for the same context was already in flight.
    ContextBusy,
}
impl DropReason {
fn label(self) -> &'static str {
match self {
DropReason::MailboxFull => "mailbox_full",
DropReason::ActorClosed => "actor_closed",
DropReason::ContextBusy => "context_busy",
}
}
}
/// Prometheus label set carrying the drop reason as a string label.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct DropReasonLabel {
    // Stable value produced by `DropReason::label`.
    reason: String,
}
/// Handles to the per-reason drop counters of the `sync_session` metric family.
/// `Counter` clones share underlying state, so this struct can be cloned freely
/// (the sender and the actor each hold a clone and both record into the same
/// counters).
#[derive(Clone, Debug)]
pub struct SyncSessionMetrics {
    mailbox_full: Counter,
    actor_closed: Counter,
    context_busy: Counter,
}
impl SyncSessionMetrics {
    /// Creates the metrics and registers the drop-counter family under the
    /// `sync_session` prefix of `registry` as `jobs_dropped_total`.
    pub fn new(registry: &mut Registry) -> Self {
        let family = Family::<DropReasonLabel, Counter>::default();
        registry.sub_registry_with_prefix("sync_session").register(
            "jobs_dropped_total",
            "SyncSessionJobs dropped without running, by reason",
            family.clone(),
        );
        Self::from_family(&family)
    }

    /// Test-only constructor that skips registry registration.
    #[cfg(test)]
    pub fn new_unregistered() -> Self {
        let family = Family::<DropReasonLabel, Counter>::default();
        Self::from_family(&family)
    }

    /// Pulls one counter handle per reason out of the shared family.
    fn from_family(family: &Family<DropReasonLabel, Counter>) -> Self {
        let for_reason = |reason: DropReason| -> Counter {
            let label = DropReasonLabel {
                reason: reason.label().to_owned(),
            };
            family.get_or_create(&label).clone()
        };
        Self {
            mailbox_full: for_reason(DropReason::MailboxFull),
            actor_closed: for_reason(DropReason::ActorClosed),
            context_busy: for_reason(DropReason::ContextBusy),
        }
    }

    /// Maps a reason to its dedicated counter handle.
    fn counter(&self, reason: DropReason) -> &Counter {
        match reason {
            DropReason::MailboxFull => &self.mailbox_full,
            DropReason::ActorClosed => &self.actor_closed,
            DropReason::ContextBusy => &self.context_busy,
        }
    }

    /// Bumps the counter for `reason` by one.
    pub fn record_drop(&self, reason: DropReason) {
        let _prev = self.counter(reason).inc();
    }

    /// Total drops across every reason.
    pub fn dropped_total(&self) -> u64 {
        [&self.mailbox_full, &self.actor_closed, &self.context_busy]
            .iter()
            .map(|counter| counter.get())
            .sum()
    }

    /// Current value of a single reason's counter.
    fn count(&self, reason: DropReason) -> u64 {
        self.counter(reason).get()
    }
}
/// Outcome of one initiator sync session, delivered over the actor's optional
/// result channel.
#[derive(Debug)]
pub struct SyncSessionResult {
    pub context_id: ContextId,
    /// Peer the session ran against, or a random placeholder when the session
    /// never chose one (e.g. a dropped duplicate initiator).
    pub peer_id: PeerId,
    /// Wall-clock duration of the session.
    pub took: Duration,
    /// Outer `Err` means the session exceeded the configured timeout; the
    /// inner result is the sync outcome itself.
    pub result: Result<Result<SyncProtocol, eyre::Error>, tokio::time::error::Elapsed>,
}
/// RAII guard that keeps the shared in-flight gauge incremented while alive:
/// `new` bumps the counter, `Drop` undoes it.
struct InFlightGuard {
    counter: Arc<AtomicU64>,
}
impl InFlightGuard {
    /// Increments `counter` and returns a guard that decrements it on drop.
    fn new(counter: Arc<AtomicU64>) -> Self {
        let guard = Self { counter };
        let _prev = guard.counter.fetch_add(1, Ordering::Relaxed);
        guard
    }
}
impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // Balance the increment performed in `InFlightGuard::new`.
        let _prev = self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}
/// Guard marking a context's initiator slot as occupied; dropping it removes
/// the map entry so the next initiator job for that context can run.
struct ContextGuard {
    map: Arc<DashMap<ContextId, ()>>,
    context_id: ContextId,
}
impl Drop for ContextGuard {
    fn drop(&mut self) {
        // Free the context slot regardless of how the session ended.
        let _removed = self.map.remove(&self.context_id);
    }
}
/// Unit of work accepted by the sync-session actor's mailbox.
#[derive(Message)]
#[rtype(result = "()")]
pub enum SyncSessionJob {
    /// Serve a sync stream a remote peer has already opened towards us.
    Responder {
        peer_id: PeerId,
        // Boxed — presumably to keep the enum variant small; confirm.
        stream: Box<Stream>,
    },
    /// Start an outgoing sync for a context.
    Initiator {
        context_id: ContextId,
        /// When `None`, peer selection is left to the sync manager (the
        /// actually-used peer comes back in the session result).
        peer_id: Option<PeerId>,
    },
}
/// Cloneable handle for submitting jobs to the actor. Carries its own copy of
/// the metrics so failed submissions can be recorded as drops.
#[derive(Clone, Debug)]
pub struct SyncSessionSender {
    addr: Addr<SyncSessionActor>,
    metrics: SyncSessionMetrics,
}
/// Error returned by `SyncSessionSender::try_send`.
#[derive(Debug)]
pub enum SyncSessionSendError {
    /// The actor's mailbox was at capacity.
    Full,
    /// The actor has stopped and its mailbox is closed.
    Closed,
}
impl SyncSessionSender {
    /// Submits `job` without blocking. On failure the corresponding drop
    /// metric is recorded and the mailbox condition is reported back.
    pub fn try_send(&self, job: SyncSessionJob) -> Result<(), SyncSessionSendError> {
        self.addr.try_send(job).map_err(|err| match err {
            actix::dev::SendError::Full(_) => {
                self.metrics.record_drop(DropReason::MailboxFull);
                SyncSessionSendError::Full
            }
            actix::dev::SendError::Closed(_) => {
                self.metrics.record_drop(DropReason::ActorClosed);
                SyncSessionSendError::Closed
            }
        })
    }
}
/// Actor that executes sync sessions with bounded concurrency and
/// per-context initiator deduplication.
pub struct SyncSessionActor {
    sync_manager: SyncManager,
    /// Hard wall-clock limit applied to each session via `tokio::time::timeout`.
    session_timeout: Duration,
    /// Caps how many sessions run concurrently.
    concurrency: Arc<Semaphore>,
    /// Contexts with an initiator currently in flight; entries are removed by
    /// `ContextGuard` when the session's future is dropped.
    in_flight_initiators: Arc<DashMap<ContextId, ()>>,
    /// Optional channel for reporting initiator results to an observer.
    result_tx: Option<mpsc::UnboundedSender<SyncSessionResult>>,
    metrics: SyncSessionMetrics,
    /// Gauge of sessions currently running, maintained by `InFlightGuard`.
    in_flight: Arc<AtomicU64>,
    // Rolling totals reported by `log_summary`.
    processed_total: Arc<AtomicU64>,
    error_total: Arc<AtomicU64>,
    timeout_total: Arc<AtomicU64>,
}
impl SyncSessionActor {
fn new(
sync_manager: SyncManager,
session_timeout: Duration,
max_concurrent: usize,
result_tx: Option<mpsc::UnboundedSender<SyncSessionResult>>,
metrics: SyncSessionMetrics,
) -> Self {
Self {
sync_manager,
session_timeout,
concurrency: Arc::new(Semaphore::new(max_concurrent)),
in_flight_initiators: Arc::new(DashMap::new()),
result_tx,
metrics,
in_flight: Arc::new(AtomicU64::new(0)),
processed_total: Arc::new(AtomicU64::new(0)),
error_total: Arc::new(AtomicU64::new(0)),
timeout_total: Arc::new(AtomicU64::new(0)),
}
}
fn log_summary(&self) {
let processed = self.processed_total.load(Ordering::Relaxed);
let errors = self.error_total.load(Ordering::Relaxed);
let timeouts = self.timeout_total.load(Ordering::Relaxed);
let dropped = self.metrics.dropped_total();
let per_context_busy = self.metrics.count(DropReason::ContextBusy);
let in_flight = self.in_flight.load(Ordering::Relaxed);
let in_flight_contexts = self.in_flight_initiators.len();
info!(
processed_total = processed,
error_total = errors,
timeout_total = timeouts,
dropped_total = dropped,
per_context_busy_total = per_context_busy,
in_flight,
in_flight_contexts,
"SyncSession actor summary"
);
}
}
impl Actor for SyncSessionActor {
    type Context = Context<Self>;

    /// Starts the periodic counter-summary log when the actor spins up.
    fn started(&mut self, ctx: &mut Self::Context) {
        info!("SyncSession actor started on dedicated Arbiter");
        // The interval handle is intentionally dropped; the interval itself
        // lives as long as the actor's context.
        let _handle = ctx.run_interval(SUMMARY_INTERVAL, |actor, _ctx| {
            actor.log_summary();
        });
    }

    /// Logs one final summary on shutdown.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        self.log_summary();
        info!("SyncSession actor stopped");
    }
}
impl Handler<SyncSessionJob> for SyncSessionActor {
    type Result = ();

    /// Dispatches one sync job, spawning the session as a future on the
    /// actor's own context so the handler itself returns immediately.
    ///
    /// Both variants share the same skeleton: an `InFlightGuard` tracks the
    /// live-session gauge, the whole session runs under
    /// `tokio::time::timeout(session_timeout, ..)`, and the semaphore permit
    /// is acquired *inside* the timed future — so time spent waiting for a
    /// permit counts against the deadline.
    fn handle(&mut self, job: SyncSessionJob, ctx: &mut Self::Context) {
        let session_timeout = self.session_timeout;
        let sync_manager = self.sync_manager.clone();
        let result_tx = self.result_tx.clone();
        let concurrency = Arc::clone(&self.concurrency);
        match job {
            SyncSessionJob::Responder { peer_id, stream } => {
                let in_flight_guard = InFlightGuard::new(Arc::clone(&self.in_flight));
                let processed_total = Arc::clone(&self.processed_total);
                let timeout_total = Arc::clone(&self.timeout_total);
                let work = async move {
                    // Guard lives for the whole async block so the in-flight
                    // gauge stays accurate until the session finishes.
                    let _guard = in_flight_guard;
                    let started = Instant::now();
                    let outcome = tokio::time::timeout(session_timeout, async move {
                        // NOTE(review): `.ok()` means a closed semaphore yields
                        // no permit and the session proceeds unthrottled —
                        // confirm this is the intended degradation.
                        let _permit = concurrency.acquire_owned().await.ok();
                        sync_manager.handle_opened_stream(peer_id, stream).await
                    })
                    .await;
                    // The inner call yields `()`, so the responder path only
                    // distinguishes completion from timeout.
                    match &outcome {
                        Ok(()) => {
                            let _prev = processed_total.fetch_add(1, Ordering::Relaxed);
                        }
                        Err(_elapsed) => {
                            let _prev = timeout_total.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                    (outcome, started)
                };
                // Spawn on the actor context; logging runs back on the actor
                // once the future resolves.
                let _spawn_handle = ctx.spawn(work.into_actor(self).map(
                    move |(outcome, started), _act, _ctx| match outcome {
                        Ok(()) => {
                            debug!(
                                %peer_id,
                                elapsed_ms = started.elapsed().as_millis(),
                                "SyncSession responder completed"
                            );
                        }
                        Err(_elapsed) => {
                            warn!(
                                %peer_id,
                                timeout_secs = session_timeout.as_secs(),
                                elapsed_ms = started.elapsed().as_millis(),
                                "SyncSession responder exceeded timeout — dropping; peer will retry"
                            );
                        }
                    },
                ));
            }
            SyncSessionJob::Initiator {
                context_id,
                peer_id,
            } => {
                // Per-context dedup: if an initiator for this context is
                // already in flight, drop the duplicate instead of queueing.
                if self.in_flight_initiators.insert(context_id, ()).is_some() {
                    self.metrics.record_drop(DropReason::ContextBusy);
                    debug!(
                        %context_id,
                        "SyncSession actor already running an initiator for this context — dropping duplicate (#2319)"
                    );
                    // Still emit a result so observers aren't left waiting.
                    if let Some(tx) = &result_tx {
                        let _ignored = tx.send(SyncSessionResult {
                            context_id,
                            // The requested peer if one was given; otherwise a
                            // random placeholder fills the field.
                            peer_id: peer_id.unwrap_or_else(PeerId::random),
                            took: Duration::ZERO,
                            result: Ok(Err(eyre::eyre!(
                                "initiator skipped — a sync session for this context is already in flight on the actor (#2319)"
                            ))),
                        });
                    }
                    return;
                }
                // Removes the map entry on drop, even if the session times
                // out or the future is cancelled.
                let context_guard = ContextGuard {
                    map: Arc::clone(&self.in_flight_initiators),
                    context_id,
                };
                let in_flight_guard = InFlightGuard::new(Arc::clone(&self.in_flight));
                let processed_total = Arc::clone(&self.processed_total);
                let error_total = Arc::clone(&self.error_total);
                let timeout_total = Arc::clone(&self.timeout_total);
                let work = async move {
                    let _context_guard = context_guard;
                    let _guard = in_flight_guard;
                    let started = Instant::now();
                    let outcome = tokio::time::timeout(session_timeout, async move {
                        let _permit = concurrency.acquire_owned().await.ok();
                        sync_manager
                            .perform_interval_sync(context_id, peer_id)
                            .await
                    })
                    .await;
                    // Prefer the peer the sync actually used; fall back to the
                    // requested peer, then a random placeholder.
                    let chosen_peer = outcome
                        .as_ref()
                        .ok()
                        .and_then(|r| r.as_ref().ok())
                        .map(|(p, _)| *p)
                        .or(peer_id)
                        .unwrap_or_else(PeerId::random);
                    match &outcome {
                        Ok(Ok(_)) => {
                            let _prev = processed_total.fetch_add(1, Ordering::Relaxed);
                        }
                        Ok(Err(_)) => {
                            let _prev = error_total.fetch_add(1, Ordering::Relaxed);
                        }
                        Err(_elapsed) => {
                            let _prev = timeout_total.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                    let took = started.elapsed();
                    // Strip the peer from the payload; it travels separately
                    // as `chosen_peer`.
                    let result = outcome.map(|r| r.map(|(_, proto)| proto));
                    (result, took, chosen_peer)
                };
                let _spawn_handle = ctx.spawn(work.into_actor(self).map(
                    move |(result, took, chosen_peer), _act, _ctx| {
                        match &result {
                            Ok(Ok(_)) => debug!(
                                %context_id,
                                %chosen_peer,
                                took_ms = took.as_millis(),
                                "SyncSession initiator completed"
                            ),
                            Ok(Err(err)) => debug!(
                                %context_id,
                                %chosen_peer,
                                took_ms = took.as_millis(),
                                error = %err,
                                "SyncSession initiator failed"
                            ),
                            Err(_elapsed) => warn!(
                                %context_id,
                                %chosen_peer,
                                took_ms = took.as_millis(),
                                "SyncSession initiator exceeded timeout — dropping; periodic-sync will retry"
                            ),
                        }
                        if let Some(tx) = result_tx {
                            let session_result = SyncSessionResult {
                                context_id,
                                peer_id: chosen_peer,
                                took,
                                result,
                            };
                            let _ignored = tx.send(session_result);
                        }
                    },
                ));
            }
        }
    }
}
/// Spawns a `SyncSessionActor` on the given arbiter and returns a sender
/// handle. Drop metrics are registered under the `sync_session` prefix of
/// `registry`; the sender and the actor share the same metric handles.
pub fn start_sync_session_actor(
    arbiter: &ArbiterHandle,
    capacity: usize,
    max_concurrent: usize,
    sync_manager: SyncManager,
    session_deadline: Duration,
    result_tx: Option<mpsc::UnboundedSender<SyncSessionResult>>,
    registry: &mut Registry,
) -> SyncSessionSender {
    let metrics = SyncSessionMetrics::new(registry);
    let actor_metrics = metrics.clone();
    let addr = SyncSessionActor::start_in_arbiter(arbiter, move |actor_ctx| {
        // Bound the mailbox before the actor begins accepting jobs.
        actor_ctx.set_mailbox_capacity(capacity);
        SyncSessionActor::new(
            sync_manager,
            session_deadline,
            max_concurrent,
            result_tx,
            actor_metrics,
        )
    });
    SyncSessionSender { addr, metrics }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every counter of a fresh metrics instance reads zero.
    #[test]
    fn metrics_start_at_zero() {
        let metrics = SyncSessionMetrics::new_unregistered();
        assert_eq!(metrics.dropped_total(), 0);
        for reason in [
            DropReason::MailboxFull,
            DropReason::ActorClosed,
            DropReason::ContextBusy,
        ] {
            assert_eq!(metrics.count(reason), 0);
        }
    }

    /// Each drop reason increments only its own counter.
    #[test]
    fn record_drop_routes_each_reason_to_its_own_counter() {
        let metrics = SyncSessionMetrics::new_unregistered();
        for reason in [
            DropReason::MailboxFull,
            DropReason::MailboxFull,
            DropReason::ActorClosed,
            DropReason::ContextBusy,
        ] {
            metrics.record_drop(reason);
        }
        assert_eq!(metrics.count(DropReason::MailboxFull), 2);
        assert_eq!(metrics.count(DropReason::ActorClosed), 1);
        assert_eq!(metrics.count(DropReason::ContextBusy), 1);
        assert_eq!(metrics.dropped_total(), 4);
    }

    /// `dropped_total` equals the sum of the individual reason counters.
    #[test]
    fn dropped_total_is_the_sum_of_per_reason_counters() {
        let metrics = SyncSessionMetrics::new_unregistered();
        for _ in 0..3 {
            metrics.record_drop(DropReason::ContextBusy);
        }
        metrics.record_drop(DropReason::MailboxFull);
        let per_reason_sum = metrics.count(DropReason::MailboxFull)
            + metrics.count(DropReason::ActorClosed)
            + metrics.count(DropReason::ContextBusy);
        assert_eq!(metrics.dropped_total(), per_reason_sum);
        assert_eq!(metrics.dropped_total(), 4);
    }
}