use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix::{
Actor, ActorFutureExt, Addr, ArbiterHandle, AsyncContext, Context, Handler, Message, WrapFuture,
};
use calimero_network_primitives::stream::Stream;
use calimero_primitives::context::ContextId;
use libp2p::PeerId;
use tokio::sync::{mpsc, Semaphore};
use tracing::{debug, info, warn};
use calimero_node_primitives::sync::SyncProtocol;
use crate::sync::SyncManager;
/// Capacity constant for the sync-session job channel.
///
/// NOTE(review): not referenced anywhere in this file; presumably the caller passes it as
/// `capacity` to `start_sync_session_actor` — confirm at the call site.
pub const SYNC_SESSION_CHANNEL_CAPACITY: usize = 256;
/// Period between the counter summaries that `SyncSessionActor` logs while it is running.
const SUMMARY_INTERVAL: Duration = Duration::from_secs(60);
/// Outcome of one initiator sync session, sent on the actor's optional result channel.
#[derive(Debug)]
pub struct SyncSessionResult {
/// Context the sync session was run for (copied from the `Initiator` job).
pub context_id: ContextId,
/// Peer reported for the session: the peer returned by a successful sync, otherwise the
/// peer named in the job, otherwise a randomly generated `PeerId`.
///
/// NOTE(review): in the last case the id does not identify a real peer — consumers cannot
/// tell the difference from this field alone.
pub peer_id: PeerId,
/// Wall-clock time from just before the sync call started until its outcome was recorded.
pub took: Duration,
/// Outer `Err` means the session timeout elapsed; the inner `Result` is the sync outcome.
pub result: Result<Result<SyncProtocol, eyre::Error>, tokio::time::error::Elapsed>,
}
/// Scope guard for the in-flight gauge: constructing it adds one to the shared counter and
/// dropping it subtracts one again.
struct InFlightGuard {
    counter: Arc<AtomicU64>,
}

impl InFlightGuard {
    /// Takes ownership of `counter`, bumps it by one and returns the guard that will undo
    /// the bump when dropped.
    fn new(counter: Arc<AtomicU64>) -> Self {
        let guard = Self { counter };
        let _before = guard.counter.fetch_add(1, Ordering::Relaxed);
        guard
    }
}

impl Drop for InFlightGuard {
    /// Reverts the increment performed in [`InFlightGuard::new`].
    fn drop(&mut self) {
        let _before = self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}
/// Unit of work accepted by `SyncSessionActor`; the actix message result type is `()`.
#[derive(Message)]
#[rtype(result = "()")]
pub enum SyncSessionJob {
/// Serve a stream opened by a remote peer; the actor passes both fields to
/// `SyncManager::handle_opened_stream`.
Responder {
/// Peer on the other end of `stream`.
peer_id: PeerId,
/// The already-opened stream to serve.
stream: Box<Stream>,
},
/// Run a sync for `context_id`; the actor passes both fields to
/// `SyncManager::perform_interval_sync`.
Initiator {
/// Context to synchronize.
context_id: ContextId,
/// Peer to sync with, if the caller has one in mind.
/// NOTE(review): `None` presumably lets the sync manager pick a peer — confirm.
peer_id: Option<PeerId>,
},
}
/// Cloneable handle used to submit `SyncSessionJob`s to the running `SyncSessionActor`.
#[derive(Clone, Debug)]
pub struct SyncSessionSender {
// Address of the actor that receives the jobs.
addr: Addr<SyncSessionActor>,
// Number of jobs `try_send` failed to enqueue; the same counter is handed to the actor by
// `start_sync_session_actor` so it can appear in the actor's log summary.
dropped_total: Arc<AtomicU64>,
}
/// Reason `SyncSessionSender::try_send` could not enqueue a job.
#[derive(Debug)]
pub enum SyncSessionSendError {
/// actix reported `SendError::Full` for the actor's mailbox.
Full,
/// actix reported `SendError::Closed` for the actor's mailbox.
Closed,
}
impl SyncSessionSender {
    /// Attempts to enqueue `job` on the actor's mailbox without waiting.
    ///
    /// # Errors
    ///
    /// Returns [`SyncSessionSendError::Full`] or [`SyncSessionSendError::Closed`], mirroring
    /// the actix send error. In both cases the job is discarded and the shared
    /// `dropped_total` counter is incremented by one.
    pub fn try_send(&self, job: SyncSessionJob) -> Result<(), SyncSessionSendError> {
        self.addr.try_send(job).map_err(|rejected| {
            // Every rejection counts as a drop, whatever the reason.
            let _before = self.dropped_total.fetch_add(1, Ordering::Relaxed);
            match rejected {
                actix::dev::SendError::Full(_) => SyncSessionSendError::Full,
                actix::dev::SendError::Closed(_) => SyncSessionSendError::Closed,
            }
        })
    }
}
/// Actor that runs sync sessions (responder and initiator jobs) as futures spawned on its
/// own context, each bounded by a timeout and gated by a shared semaphore.
pub struct SyncSessionActor {
// Cloned into every spawned session; performs the actual sync work.
sync_manager: SyncManager,
// Upper bound applied to each session via `tokio::time::timeout`.
session_timeout: Duration,
// One permit is acquired per session before its sync call starts.
concurrency: Arc<Semaphore>,
// When present, every finished initiator session is reported here.
result_tx: Option<mpsc::UnboundedSender<SyncSessionResult>>,
// Jobs accepted by `handle` whose future has not finished yet (includes jobs still
// waiting for a semaphore permit).
in_flight: Arc<AtomicU64>,
// Sessions that finished without timing out (and, for initiators, without an error).
processed_total: Arc<AtomicU64>,
// Initiator sessions whose sync call returned an error.
error_total: Arc<AtomicU64>,
// Sessions that exceeded `session_timeout`.
timeout_total: Arc<AtomicU64>,
// Jobs rejected by `SyncSessionSender::try_send`; shared with the sender.
dropped_total: Arc<AtomicU64>,
}
impl SyncSessionActor {
    /// Assembles an actor whose own counters start at zero.
    ///
    /// `max_concurrent` becomes the number of semaphore permits; `dropped_total` is supplied
    /// by the caller so the sender side can share it.
    fn new(
        sync_manager: SyncManager,
        session_timeout: Duration,
        max_concurrent: usize,
        result_tx: Option<mpsc::UnboundedSender<SyncSessionResult>>,
        dropped_total: Arc<AtomicU64>,
    ) -> Self {
        let zeroed = || Arc::new(AtomicU64::new(0));
        let concurrency = Arc::new(Semaphore::new(max_concurrent));

        Self {
            sync_manager,
            session_timeout,
            concurrency,
            result_tx,
            in_flight: zeroed(),
            processed_total: zeroed(),
            error_total: zeroed(),
            timeout_total: zeroed(),
            dropped_total,
        }
    }

    /// Emits one `info` event carrying the current value of every counter.
    fn log_summary(&self) {
        let snapshot = |counter: &Arc<AtomicU64>| counter.load(Ordering::Relaxed);

        let processed_now = snapshot(&self.processed_total);
        let errors_now = snapshot(&self.error_total);
        let timeouts_now = snapshot(&self.timeout_total);
        let dropped_now = snapshot(&self.dropped_total);
        let in_flight_now = snapshot(&self.in_flight);

        info!(
            processed_total = processed_now,
            error_total = errors_now,
            timeout_total = timeouts_now,
            dropped_total = dropped_now,
            in_flight = in_flight_now,
            "SyncSession actor summary"
        );
    }
}
impl Actor for SyncSessionActor {
    type Context = Context<Self>;

    /// Announces start-up and schedules a counter summary every `SUMMARY_INTERVAL`.
    fn started(&mut self, ctx: &mut Self::Context) {
        info!("SyncSession actor started on dedicated Arbiter");
        let _summary_timer = ctx.run_interval(SUMMARY_INTERVAL, |this, _| this.log_summary());
    }

    /// Logs one last counter summary, then announces shutdown.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        self.log_summary();
        info!("SyncSession actor stopped");
    }
}
impl Handler<SyncSessionJob> for SyncSessionActor {
type Result = ();
fn handle(&mut self, job: SyncSessionJob, ctx: &mut Self::Context) {
let in_flight_guard = InFlightGuard::new(Arc::clone(&self.in_flight));
let session_timeout = self.session_timeout;
let sync_manager = self.sync_manager.clone();
let result_tx = self.result_tx.clone();
let concurrency = Arc::clone(&self.concurrency);
match job {
SyncSessionJob::Responder { peer_id, stream } => {
let processed_total = Arc::clone(&self.processed_total);
let timeout_total = Arc::clone(&self.timeout_total);
let work = async move {
let _guard = in_flight_guard;
let _permit = concurrency.acquire_owned().await.ok();
let started = Instant::now();
let outcome = tokio::time::timeout(
session_timeout,
sync_manager.handle_opened_stream(peer_id, stream),
)
.await;
match &outcome {
Ok(()) => {
let _prev = processed_total.fetch_add(1, Ordering::Relaxed);
}
Err(_elapsed) => {
let _prev = timeout_total.fetch_add(1, Ordering::Relaxed);
}
}
(outcome, started)
};
let _spawn_handle = ctx.spawn(work.into_actor(self).map(
move |(outcome, started), _act, _ctx| match outcome {
Ok(()) => {
debug!(
%peer_id,
elapsed_ms = started.elapsed().as_millis(),
"SyncSession responder completed"
);
}
Err(_elapsed) => {
warn!(
%peer_id,
timeout_secs = session_timeout.as_secs(),
elapsed_ms = started.elapsed().as_millis(),
"SyncSession responder exceeded timeout — dropping; peer will retry"
);
}
},
));
}
SyncSessionJob::Initiator {
context_id,
peer_id,
} => {
let processed_total = Arc::clone(&self.processed_total);
let error_total = Arc::clone(&self.error_total);
let timeout_total = Arc::clone(&self.timeout_total);
let work = async move {
let _guard = in_flight_guard;
let _permit = concurrency.acquire_owned().await.ok();
let started = Instant::now();
let outcome = tokio::time::timeout(
session_timeout,
sync_manager.perform_interval_sync(context_id, peer_id),
)
.await;
let chosen_peer = outcome
.as_ref()
.ok()
.and_then(|r| r.as_ref().ok())
.map(|(p, _)| *p)
.or(peer_id)
.unwrap_or_else(PeerId::random);
match &outcome {
Ok(Ok(_)) => {
let _prev = processed_total.fetch_add(1, Ordering::Relaxed);
}
Ok(Err(_)) => {
let _prev = error_total.fetch_add(1, Ordering::Relaxed);
}
Err(_elapsed) => {
let _prev = timeout_total.fetch_add(1, Ordering::Relaxed);
}
}
let took = started.elapsed();
let result = outcome.map(|r| r.map(|(_, proto)| proto));
(result, took, chosen_peer)
};
let _spawn_handle = ctx.spawn(work.into_actor(self).map(
move |(result, took, chosen_peer), _act, _ctx| {
match &result {
Ok(Ok(_)) => debug!(
%context_id,
%chosen_peer,
took_ms = took.as_millis(),
"SyncSession initiator completed"
),
Ok(Err(err)) => debug!(
%context_id,
%chosen_peer,
took_ms = took.as_millis(),
error = %err,
"SyncSession initiator failed"
),
Err(_elapsed) => warn!(
%context_id,
%chosen_peer,
took_ms = took.as_millis(),
"SyncSession initiator exceeded timeout — dropping; periodic-sync will retry"
),
}
if let Some(tx) = result_tx {
let session_result = SyncSessionResult {
context_id,
peer_id: chosen_peer,
took,
result,
};
let _ignored = tx.send(session_result);
}
},
));
}
}
}
}
/// Starts a `SyncSessionActor` on `arbiter` and returns the handle used to feed it jobs.
///
/// * `capacity` — mailbox capacity set on the actor's context before it is constructed.
/// * `max_concurrent` — number of permits in the actor's session semaphore.
/// * `sync_manager` — performs the sync work for every session.
/// * `session_timeout` — upper bound applied to each session.
/// * `result_tx` — optional channel on which initiator results are reported.
///
/// The returned sender and the actor share one `dropped_total` counter.
pub fn start_sync_session_actor(
    arbiter: &ArbiterHandle,
    capacity: usize,
    max_concurrent: usize,
    sync_manager: SyncManager,
    session_timeout: Duration,
    result_tx: Option<mpsc::UnboundedSender<SyncSessionResult>>,
) -> SyncSessionSender {
    let dropped_total = Arc::new(AtomicU64::new(0));

    let addr = {
        let actor_side_counter = Arc::clone(&dropped_total);
        SyncSessionActor::start_in_arbiter(arbiter, move |ctx| {
            ctx.set_mailbox_capacity(capacity);
            SyncSessionActor::new(
                sync_manager,
                session_timeout,
                max_concurrent,
                result_tx,
                actor_side_counter,
            )
        })
    };

    SyncSessionSender {
        addr,
        dropped_total,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created drop counter reads zero.
    #[test]
    fn dropped_total_starts_at_zero() {
        let fresh_counter = Arc::new(AtomicU64::new(0));
        let observed = fresh_counter.load(Ordering::Relaxed);
        assert_eq!(observed, 0);
    }
}