use std::pin::pin;
use std::time::Duration;
use async_trait::async_trait;
use calimero_context_client::client::ContextClient;
use calimero_node_primitives::client::{NamespaceJoinParams, OpenSubgroupJoinParams};
use calimero_node_primitives::join_bundle::JoinBundle;
use calimero_primitives::context::ContextId;
use eyre::Result;
use futures_util::stream::{self, StreamExt};
use libp2p::PeerId;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, MissedTickBehavior};
use tracing::{debug, error, info, warn};
use super::session::{DispatchDecision, FullWarnHint, SessionTracker, SkipReason};
use crate::sync_session_bridge::{
SyncSessionJob, SyncSessionResult, SyncSessionSendError, SyncSessionSender,
};
/// Operations the sync driver loop hands off to its owner.
///
/// `SyncDriver::run` awaits each of these inline from within its `select!`
/// handlers, so while one of them is pending the driver loop is not polling
/// its timer or any of its channels. The trait is declared `?Send`, so the
/// returned futures are not required to be `Send`.
#[async_trait(?Send)]
pub(crate) trait SyncDriverDispatch {
/// Runs a namespace governance sync for `namespace_id`.
///
/// Returns nothing, so no outcome is reported back to the driver loop.
async fn sync_namespace_from_peer(&self, namespace_id: [u8; 32]);
/// Performs the initiator side of a namespace join.
///
/// The driver forwards the returned `Result` unchanged to the requester.
async fn initiate_namespace_join(&self, params: NamespaceJoinParams) -> Result<JoinBundle>;
/// Performs the initiator side of an open-subgroup join.
///
/// The driver forwards the returned `Result` unchanged to the requester.
/// NOTE(review): the meaning of the returned bytes is not visible from this
/// file — confirm against the implementor.
async fn initiate_open_subgroup_join(&self, params: OpenSubgroupJoinParams) -> Result<Vec<u8>>;
}
/// State owned by the sync driver loop: the session tracker, the inbound
/// request channels, and the handle used to submit sync-session jobs.
pub(super) struct SyncDriver {
/// Consulted for per-context dispatch decisions and updated with dispatch
/// outcomes and session results.
tracker: SessionTracker,
/// Source of the context ids to sync when no specific context was requested.
context_client: ContextClient,
/// Context sync requests as `(context, peer)`. A request without a context
/// results in a pass over every context id.
ctx_sync_rx: mpsc::Receiver<(Option<ContextId>, Option<PeerId>)>,
/// Namespace ids for which a governance sync was requested.
ns_sync_rx: mpsc::Receiver<[u8; 32]>,
/// Namespace join requests, each paired with the channel its result is sent back on.
ns_join_rx: mpsc::Receiver<(NamespaceJoinParams, oneshot::Sender<Result<JoinBundle>>)>,
/// Open-subgroup join requests, each paired with the channel its result is sent back on.
open_subgroup_join_rx:
mpsc::Receiver<(OpenSubgroupJoinParams, oneshot::Sender<Result<Vec<u8>>>)>,
/// Handle used to submit `SyncSessionJob::Initiator` jobs without waiting (`try_send`).
session_tx: SyncSessionSender,
/// Results of previously dispatched sessions; each one is handed to the tracker.
session_result_rx: mpsc::UnboundedReceiver<SyncSessionResult>,
/// Period of the interval timer that triggers a pass over all contexts.
frequency: Duration,
/// Only read for log output in this file (reported as the recency minimum and
/// as the back-off duration). NOTE(review): presumably the same value the
/// tracker enforces — confirm where the tracker is constructed.
interval: Duration,
}
impl SyncDriver {
/// Assembles a driver from its already-constructed parts.
///
/// Each argument is stored in the field of the same name; nothing is
/// validated and no work is started here.
// One argument per struct field, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
tracker: SessionTracker,
context_client: ContextClient,
ctx_sync_rx: mpsc::Receiver<(Option<ContextId>, Option<PeerId>)>,
ns_sync_rx: mpsc::Receiver<[u8; 32]>,
ns_join_rx: mpsc::Receiver<(NamespaceJoinParams, oneshot::Sender<Result<JoinBundle>>)>,
open_subgroup_join_rx: mpsc::Receiver<(
OpenSubgroupJoinParams,
oneshot::Sender<Result<Vec<u8>>>,
)>,
session_tx: SyncSessionSender,
session_result_rx: mpsc::UnboundedReceiver<SyncSessionResult>,
frequency: Duration,
interval: Duration,
) -> Self {
Self {
tracker,
context_client,
ctx_sync_rx,
ns_sync_rx,
ns_join_rx,
open_subgroup_join_rx,
session_tx,
session_result_rx,
frequency,
interval,
}
}
/// Runs the driver loop. The loop has no exit path, so this future only ends
/// when it is dropped.
///
/// Every iteration waits for exactly one event:
///
/// * a timer tick (period `self.frequency`): logs the tracker's drop rollup
///   and wedged-session reports, then dispatches a sync for all contexts;
/// * a session result: applied to the tracker, nothing is dispatched;
/// * a namespace sync or join request: delegated to `dispatch` and awaited
///   inline, nothing is dispatched;
/// * a context sync request: dispatches the requested context, or all
///   contexts when further requests were already queued behind it.
///
/// A channel whose `recv()` yields `None` simply stops matching its `Some(..)`
/// pattern; the timer branch keeps the loop alive.
///
/// NOTE: `tokio::time::interval` panics on a zero period, so `frequency`
/// must be non-zero.
pub(super) async fn run<D: SyncDriverDispatch>(mut self, dispatch: &D) {
    let mut ticker = time::interval(self.frequency);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        // The selected branch either yields the `(context, peer)` target of the
        // dispatch pass below, or `continue`s when no pass is needed.
        let (requested_ctx, requested_peer) = tokio::select! {
            _ = ticker.tick() => {
                debug!("Performing interval sync");

                if let Some(rollup) = self.tracker.tick_full_drops_summary() {
                    info!(
                        full_drops_in_window = rollup.drops,
                        contexts_affected = rollup.contexts_affected,
                        "SyncSession mailbox-full drop rollup (#2319)",
                    );
                }

                let grace = self.tracker.session_wedge_grace();
                for context_id in self.tracker.tick_wedge_watchdog() {
                    warn!(
                        %context_id,
                        grace = ?grace,
                        "SyncSession initiator produced no result within watchdog grace — assuming a wedged session/actor; failing it so periodic-sync retries (#2319)"
                    );
                }

                // Periodic pass: no specific context, no preferred peer.
                (None, None)
            }
            Some(result) = self.session_result_rx.recv() => {
                self.tracker.apply_result(result);
                continue;
            }
            Some(namespace_id) = self.ns_sync_rx.recv() => {
                info!(
                    namespace_id = %hex::encode(namespace_id),
                    "Performing namespace governance sync"
                );
                dispatch.sync_namespace_from_peer(namespace_id).await;
                continue;
            }
            Some((params, reply_tx)) = self.ns_join_rx.recv() => {
                info!(
                    namespace_id = %hex::encode(params.namespace_id),
                    "Processing namespace join request (initiator side)"
                );
                // A send error only means the requester dropped its receiver.
                let _ignored = reply_tx.send(dispatch.initiate_namespace_join(params).await);
                continue;
            }
            Some((params, reply_tx)) = self.open_subgroup_join_rx.recv() => {
                info!(
                    namespace_id = %hex::encode(params.namespace_id),
                    subgroup_id = %hex::encode(params.subgroup_id),
                    "Processing open-subgroup join request (initiator side)"
                );
                // A send error only means the requester dropped its receiver.
                let _ignored =
                    reply_tx.send(dispatch.initiate_open_subgroup_join(params).await);
                continue;
            }
            Some((ctx, peer)) = self.ctx_sync_rx.recv() => {
                info!(?ctx, ?peer, "Received sync request");

                // Collapse any backlog into a single pass: the queued requests
                // are discarded and, if there were any, everything is synced.
                let mut drained_count = 0;
                while self.ctx_sync_rx.try_recv().is_ok() {
                    drained_count += 1;
                }

                if drained_count > 0 {
                    info!(drained_count, "Drained additional sync requests from queue, will sync all contexts");
                    (None, None)
                } else {
                    (ctx, peer)
                }
            }
        };

        self.dispatch_pending_contexts(requested_ctx, requested_peer)
            .await;
    }
}
/// Submits an initiator sync job for every context that is currently eligible.
///
/// With `requested_ctx` set, only that context is considered and the tracker
/// is asked with `force = true`; otherwise every id yielded by
/// `context_client.get_context_ids(None)` is considered with `force = false`.
/// `requested_peer` is attached to each submitted job as its `peer_id`.
///
/// Jobs are submitted with `try_send`, so this never waits on the session
/// mailbox: a full or closed mailbox is recorded in the tracker and the
/// context is skipped. Ids that fail to load are logged and skipped.
async fn dispatch_pending_contexts(
    &mut self,
    requested_ctx: Option<ContextId>,
    requested_peer: Option<PeerId>,
) {
    // An explicitly requested context is passed to the tracker as forced.
    let force = requested_ctx.is_some();

    // Enumerate all contexts only when no specific one was asked for.
    let all_contexts = match requested_ctx {
        Some(_) => None,
        None => Some(self.context_client.get_context_ids(None)),
    };
    let candidates = stream::iter(requested_ctx)
        .map(Ok)
        .chain(stream::iter(all_contexts).flatten());
    let mut candidates = pin!(candidates);

    while let Some(next) = candidates.next().await {
        let context_id = match next {
            Ok(id) => id,
            Err(err) => {
                error!(%err, "Failed reading context id to sync");
                continue;
            }
        };

        let is_first_sync = match self.tracker.dispatch_decision(&context_id, force) {
            DispatchDecision::Eligible {
                is_first_sync,
                forced_despite_recency,
            } => {
                if let Some(time_since) = forced_despite_recency {
                    debug!(
                        %context_id,
                        ?time_since,
                        minimum = ?self.interval,
                        "Force syncing despite recency, due to explicit request"
                    );
                }
                is_first_sync
            }
            DispatchDecision::Skip(SkipReason::DispatchRecentlyAttempted) => {
                debug!(
                    %context_id,
                    "Skipping sync — dispatch recently attempted, mailbox was full (#2319)"
                );
                continue;
            }
            DispatchDecision::Skip(SkipReason::AlreadyInProgress) => {
                debug!(
                    %context_id,
                    "Sync already in progress"
                );
                continue;
            }
            DispatchDecision::Skip(SkipReason::LastSyncTooRecent {
                time_since,
                minimum,
            }) => {
                debug!(
                    %context_id,
                    ?time_since,
                    ?minimum,
                    "Skipping sync, last one was too recent"
                );
                continue;
            }
        };

        info!(%context_id, "Scheduled sync");

        let job = SyncSessionJob::Initiator {
            context_id,
            peer_id: requested_peer,
        };
        match self.session_tx.try_send(job) {
            Ok(()) => {}
            Err(SyncSessionSendError::Full) => {
                // The tracker decides whether this drop is worth a warning or
                // only a debug line.
                match self.tracker.record_dispatch_full(context_id) {
                    FullWarnHint::EmitWarn => warn!(
                        %context_id,
                        "SyncSession actor mailbox full — skipping initiator dispatch; backing off this context for {:?} (#2316/#2319)",
                        self.interval
                    ),
                    FullWarnHint::EmitDebug => debug!(
                        %context_id,
                        "SyncSession actor mailbox full — skipping (rate-limited; see periodic rollup) (#2319)"
                    ),
                }
                continue;
            }
            Err(SyncSessionSendError::Closed) => {
                self.tracker.record_dispatch_closed(context_id);
                warn!(
                    %context_id,
                    "SyncSession actor closed — skipping initiator dispatch"
                );
                continue;
            }
        }

        // Only reached once the job was accepted by the mailbox.
        if is_first_sync {
            info!(%context_id, "Syncing for the first time");
        }
        self.tracker
            .record_dispatch_succeeded(context_id, is_first_sync);
    }
}
}
// Test-only module (compiled under `cfg(test)`); it has no contents in this file.
#[cfg(test)]
mod tests {
}