use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use calimero_context_client::client::ContextClient;
use calimero_context_client::messages::DivergenceReport;
use calimero_context_config::types::ContextGroupId;
use calimero_node_primitives::sync::SyncProtocol;
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use dashmap::DashMap;
use libp2p::gossipsub::TopicHash;
use libp2p::PeerId;
use rand::seq::SliceRandom;
use super::network::SyncNetwork;
use super::state_access::SyncStateAccess;
use crate::state::ReconcileAttempt;
/// Hook through which the reconciler starts a sync of one context against a
/// specific peer.
///
/// Declared with `async_trait(?Send)`, so the futures returned by
/// implementations are not required to be `Send`.
#[async_trait(?Send)]
pub(crate) trait ReconcileSyncDispatch {
/// Starts a sync of `context_id` with `peer`.
///
/// # Errors
///
/// Returns the implementation's `eyre` error when the sync does not succeed.
///
/// On success the result carries a peer and the [`SyncProtocol`]; the
/// reconciler logs them as the peer used and the protocol.
// NOTE(review): presumably the returned peer is the one that actually served
// the sync and may differ from `peer` — confirm against implementors.
async fn initiate_sync(
&self,
context_id: ContextId,
peer: PeerId,
) -> eyre::Result<(PeerId, SyncProtocol)>;
}
/// Reacts to divergence reports by pulling state for each mismatching context
/// from a mesh peer whose identity is in the group's trusted-anchor set.
///
/// Cloning copies the two `Arc` handles and clones the context client.
#[derive(Clone)]
pub(crate) struct Reconciler {
/// Per-context reconcile backoff bookkeeping and the peer → identities lookup.
state_access: Arc<dyn SyncStateAccess>,
/// Used to list the mesh peers subscribed to a context's topic.
sync_network: Arc<dyn SyncNetwork>,
/// Used to read the local context (root hash) and to reach the datastore handle.
context_client: ContextClient,
}
impl Reconciler {
pub(crate) fn new(
state_access: Arc<dyn SyncStateAccess>,
sync_network: Arc<dyn SyncNetwork>,
context_client: ContextClient,
) -> Self {
Self {
state_access,
sync_network,
context_client,
}
}
/// Entry point: tries to repair every context listed in `report.hash_differs`.
///
/// Contexts are handled one after another, each via
/// `reconcile_one_divergent_context`. When the report lists no per-context
/// hash mismatch there is nothing to pull, so the method only logs — at
/// `warn` level if the group-state hash itself diverges, at `debug` level
/// otherwise — and returns.
pub(crate) async fn reconcile_after_divergence<D: ReconcileSyncDispatch>(
    &self,
    dispatch: &D,
    report: DivergenceReport,
) {
    // Common case first: there are concrete contexts to reconcile.
    if !report.hash_differs.is_empty() {
        for (context_id, expected_root_hash) in &report.hash_differs {
            self.reconcile_one_divergent_context(
                dispatch,
                report.group_id,
                *context_id,
                *expected_root_hash,
                report.op_kind,
            )
            .await;
        }
        return;
    }

    // Nothing reconcilable at context level; leave a trace whose severity
    // depends on whether the group-level hash is off.
    match report.group_hash_diverges {
        true => tracing::warn!(
            group_id = %hex::encode(report.group_id.to_bytes()),
            op_kind = report.op_kind,
            only_in_expected_count = report.only_in_expected.len(),
            only_in_actual_count = report.only_in_actual.len(),
            "reconcile-after-divergence: group-state hash diverges from signed expected, \
            but no per-context hash mismatch is reconcilable here — convergence relies \
            on the cross-DAG check against subsequent signed ops"
        ),
        false => tracing::debug!(
            group_id = %hex::encode(report.group_id.to_bytes()),
            op_kind = report.op_kind,
            only_in_expected_count = report.only_in_expected.len(),
            only_in_actual_count = report.only_in_actual.len(),
            "reconcile-after-divergence: no per-context hash mismatches to reconcile; \
            namespace-DAG drift (if any) is handled by the cross-DAG check on \
            subsequent state deltas"
        ),
    }
}
/// Runs one reconcile attempt for a single divergent context.
///
/// Steps, each of which may end the attempt early:
/// 1. Skip if the per-context backoff window is still open.
/// 2. Load the group's trusted-anchor identities; give up if there are none.
/// 3. Shuffle the context topic's mesh peers and take the first one whose
///    cached identities intersect the anchor set; give up if none qualifies.
/// 4. Dispatch a sync against that peer.
/// 5. Compare the local root hash with `expected_root_hash` and record the
///    attempt as a success (clears backoff) or a failure (extends backoff).
///
/// Only a dispatched sync (steps 4–5) touches the backoff state; the early
/// exits in steps 1–3 leave it unchanged.
async fn reconcile_one_divergent_context<D: ReconcileSyncDispatch>(
    &self,
    dispatch: &D,
    group_id: ContextGroupId,
    context_id: ContextId,
    expected_root_hash: [u8; 32],
    op_kind: &'static str,
) {
    // Step 1: honour the backoff window left by earlier failed attempts.
    if let Some((cooldown_left, prior_failures)) =
        self.state_access.reconcile_remaining_cooldown(&context_id)
    {
        tracing::debug!(
            %context_id,
            op_kind,
            consecutive_failures = prior_failures,
            cooldown_remaining_secs = cooldown_left.as_secs(),
            "reconcile-after-divergence: skipping — prior attempts failed and the \
            per-context cooldown is still active; will re-attempt after backoff lapses"
        );
        return;
    }

    // Step 2: without anchors there is nobody we may adopt state from.
    let trusted_anchors = self.anchor_identities_for_group(&group_id);
    if trusted_anchors.is_empty() {
        tracing::warn!(
            %context_id,
            group_id = %hex::encode(group_id.to_bytes()),
            op_kind,
            "reconcile-after-divergence: no trusted anchors defined for this group — \
            falling back to operator path (no automatic recovery)"
        );
        return;
    }

    // Step 3: randomise the candidate order, then scan for an anchor.
    let mut candidates = self
        .sync_network
        .mesh_peers(TopicHash::from_raw(context_id))
        .await;
    let mesh_peer_count = candidates.len();
    candidates.shuffle(&mut rand::thread_rng());

    // Diagnostics for the "no anchor reachable" warning below.
    let mut peers_missing_cache_entry: usize = 0;
    let mut peers_known_not_anchor: usize = 0;
    let mut selected = None;
    for peer in candidates.iter().copied() {
        match self.state_access.peer_identities(&peer) {
            Some(ids) => {
                if ids.iter().any(|id| trusted_anchors.contains(id)) {
                    selected = Some(peer);
                    break;
                }
                peers_known_not_anchor += 1;
            }
            None => {
                peers_missing_cache_entry += 1;
                tracing::debug!(
                    %context_id,
                    %peer,
                    op_kind,
                    "reconcile-after-divergence: mesh peer skipped — no peer_identities \
                    cache entry yet (peer has not been observed signing a verified \
                    message); cache warms as the peer's signed traffic is processed"
                );
            }
        }
    }

    let Some(anchor_peer) = selected else {
        tracing::warn!(
            %context_id,
            op_kind,
            anchor_count = trusted_anchors.len(),
            connected_mesh_peers = mesh_peer_count,
            peers_missing_cache_entry,
            peers_known_not_anchor,
            "reconcile-after-divergence: no connected mesh peer matches the anchor set — \
            falling back to operator path; reconcile will re-attempt on the next signed \
            op or sync tick"
        );
        return;
    };

    // Step 4: pull state from the chosen anchor.
    tracing::info!(
        %context_id,
        %anchor_peer,
        op_kind,
        expected_root_hash = %hex::encode(expected_root_hash),
        "reconcile-after-divergence: pulling canonical state from trusted anchor"
    );
    let (peer_used, protocol) = match dispatch.initiate_sync(context_id, anchor_peer).await {
        Ok(outcome) => outcome,
        Err(err) => {
            let failures = self.state_access.record_reconcile_failure(context_id);
            tracing::warn!(
                %context_id,
                %anchor_peer,
                op_kind,
                %err,
                consecutive_failures = failures,
                next_cooldown_secs = reconcile_cooldown(failures).as_secs(),
                "reconcile-after-divergence: anchor sync failed; reconcile will re-attempt \
                after the backoff window lapses"
            );
            return;
        }
    };

    // Step 5: the sync finished; check that it actually produced the expected hash.
    tracing::info!(
        %context_id,
        %peer_used,
        ?protocol,
        "reconcile-after-divergence: anchor sync completed; verifying post-adoption hash"
    );
    if self.verify_post_reconcile_root_hash(context_id, expected_root_hash, peer_used, op_kind) {
        self.state_access.record_reconcile_success(&context_id);
        return;
    }

    let failures = self.state_access.record_reconcile_failure(context_id);
    tracing::warn!(
        %context_id,
        op_kind,
        consecutive_failures = failures,
        next_cooldown_secs = reconcile_cooldown(failures).as_secs(),
        "reconcile-after-divergence: recorded failure; subsequent reconcile \
        attempts for this context are gated by the backoff window"
    );
}
/// Checks whether the locally stored root hash of `context_id` now equals
/// `expected_root_hash`.
///
/// `anchor_peer` and `op_kind` are used for log context only.
///
/// Returns `true` only on an exact 32-byte match. Returns `false` when the
/// hashes differ, when the context is absent locally, or when the context
/// lookup itself fails. A lookup failure is logged separately, with the
/// underlying error, so that it is not mistaken for a missing context.
fn verify_post_reconcile_root_hash(
    &self,
    context_id: ContextId,
    expected_root_hash: [u8; 32],
    anchor_peer: PeerId,
    op_kind: &'static str,
) -> bool {
    let context = match self.context_client.get_context(&context_id) {
        Ok(Some(context)) => context,
        Ok(None) => {
            tracing::warn!(
                %context_id,
                %anchor_peer,
                op_kind,
                "reconcile-after-divergence: context not found locally after anchor sync — \
                cannot verify root hash"
            );
            return false;
        }
        Err(err) => {
            // The lookup failed; this says nothing about whether the context
            // exists, so report the error instead of "not found".
            tracing::warn!(
                %context_id,
                %anchor_peer,
                op_kind,
                %err,
                "reconcile-after-divergence: failed to load context from local store after \
                anchor sync — cannot verify root hash"
            );
            return false;
        }
    };

    let actual_root_hash: [u8; 32] = *AsRef::<[u8; 32]>::as_ref(&context.root_hash);
    if actual_root_hash == expected_root_hash {
        tracing::info!(
            %context_id,
            %anchor_peer,
            op_kind,
            root_hash = %hex::encode(actual_root_hash),
            "reconcile-after-divergence: post-adoption hash matches signed expected — converged"
        );
        true
    } else {
        tracing::warn!(
            %context_id,
            %anchor_peer,
            op_kind,
            expected_root_hash = %hex::encode(expected_root_hash),
            actual_root_hash = %hex::encode(actual_root_hash),
            "reconcile-after-divergence: post-adoption hash does NOT match signed expected — \
            either the anchor served non-canonical state or local apply diverged again; \
            operator-investigation territory until pre-adoption rejection lands"
        );
        false
    }
}
fn anchor_identities_for_group(&self, group_id: &ContextGroupId) -> BTreeSet<PublicKey> {
let store = self.context_client.datastore_handle().into_inner();
calimero_context::group_store::trusted_anchors_for_group(&store, group_id)
.unwrap_or_default()
}
}
/// Backoff window that follows `consecutive_failures` failed reconcile attempts.
///
/// The window is 30 seconds after the first failure and doubles with each
/// further failure, capped at 30 minutes. A count of zero is treated the same
/// as one.
pub(crate) fn reconcile_cooldown(consecutive_failures: u32) -> Duration {
    const BASE_SECS: u64 = 30;
    const CAP_SECS: u64 = 30 * 60;
    // Bounding the number of doublings keeps the power well inside u64.
    const MAX_DOUBLINGS: u32 = 8;

    let doublings = consecutive_failures.saturating_sub(1).min(MAX_DOUBLINGS);
    let uncapped_secs = BASE_SECS.saturating_mul(2u64.pow(doublings));
    Duration::from_secs(uncapped_secs.min(CAP_SECS))
}
/// Time left in the backoff window for `context_id`, paired with its current
/// failure count.
///
/// Returns `None` when no attempt is recorded for the context, or when the
/// window derived from its failure count has fully elapsed since the last
/// attempt.
pub(crate) fn reconcile_remaining_cooldown(
    attempts: &DashMap<ContextId, ReconcileAttempt>,
    context_id: &ContextId,
) -> Option<(Duration, u32)> {
    let attempt = attempts.get(context_id)?;
    let failures = attempt.consecutive_failures;
    let window = reconcile_cooldown(failures);
    let since_last = attempt.last_attempt_at.elapsed();

    // `checked_sub` is `None` once more time has passed than the window
    // allows; an exactly-zero remainder counts as lapsed too.
    window
        .checked_sub(since_last)
        .filter(|left| !left.is_zero())
        .map(|left| (left, failures))
}
/// Registers one more failed reconcile attempt for `context_id`.
///
/// Bumps the context's failure counter (saturating), stamps the attempt time
/// with the current instant, and returns the updated counter. A context with
/// no prior entry starts at one.
pub(crate) fn record_reconcile_failure(
    attempts: &DashMap<ContextId, ReconcileAttempt>,
    context_id: ContextId,
) -> u32 {
    let slot = attempts
        .entry(context_id)
        .and_modify(|prior| {
            prior.consecutive_failures = prior.consecutive_failures.saturating_add(1);
            prior.last_attempt_at = Instant::now();
        })
        .or_insert_with(|| ReconcileAttempt {
            last_attempt_at: Instant::now(),
            consecutive_failures: 1,
        });
    slot.consecutive_failures
}
/// Forgets all backoff state for `context_id` after a successful reconcile.
///
/// Has no effect when nothing is recorded for the context.
pub(crate) fn record_reconcile_success(
    attempts: &DashMap<ContextId, ReconcileAttempt>,
    context_id: &ContextId,
) {
    // The removed entry, if any, is of no further use.
    drop(attempts.remove(context_id));
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a context id whose 32 bytes all equal `byte`.
    fn ctx(byte: u8) -> ContextId {
        ContextId::from([byte; 32])
    }

    /// The window doubles from 30 s per failure and is capped at 30 minutes.
    #[test]
    fn reconcile_cooldown_schedule_doubles_then_caps() {
        assert_eq!(reconcile_cooldown(1), Duration::from_secs(30));
        assert_eq!(reconcile_cooldown(2), Duration::from_secs(60));
        assert_eq!(reconcile_cooldown(3), Duration::from_secs(120));
        assert_eq!(reconcile_cooldown(4), Duration::from_secs(240));
        assert_eq!(reconcile_cooldown(5), Duration::from_secs(480));
        assert_eq!(reconcile_cooldown(6), Duration::from_secs(960));
        assert_eq!(reconcile_cooldown(7), Duration::from_secs(30 * 60));
        assert_eq!(reconcile_cooldown(50), Duration::from_secs(30 * 60));
        assert_eq!(reconcile_cooldown(u32::MAX), Duration::from_secs(30 * 60));
    }

    /// A zero count yields the same window as a single failure.
    #[test]
    fn reconcile_cooldown_zero_failures_treated_as_one() {
        assert_eq!(reconcile_cooldown(0), Duration::from_secs(30));
    }

    #[test]
    fn record_reconcile_failure_increments_counter_and_stamps_time() {
        let attempts: DashMap<ContextId, ReconcileAttempt> = DashMap::new();
        let context = ctx(1);
        assert_eq!(record_reconcile_failure(&attempts, context), 1);
        assert_eq!(record_reconcile_failure(&attempts, context), 2);
        assert_eq!(record_reconcile_failure(&attempts, context), 3);
        let entry = attempts.get(&context).expect("entry was inserted");
        assert_eq!(entry.consecutive_failures, 3);
        assert!(entry.last_attempt_at.elapsed() < Duration::from_secs(5));
    }

    #[test]
    fn record_reconcile_success_clears_entry() {
        let attempts: DashMap<ContextId, ReconcileAttempt> = DashMap::new();
        let context = ctx(1);
        let _ = record_reconcile_failure(&attempts, context);
        let _ = record_reconcile_failure(&attempts, context);
        assert!(attempts.contains_key(&context));
        record_reconcile_success(&attempts, &context);
        assert!(
            !attempts.contains_key(&context),
            "success should clear all backoff state for the context"
        );
    }

    #[test]
    fn reconcile_remaining_cooldown_none_when_no_entry() {
        let attempts: DashMap<ContextId, ReconcileAttempt> = DashMap::new();
        assert!(reconcile_remaining_cooldown(&attempts, &ctx(1)).is_none());
    }

    #[test]
    fn reconcile_remaining_cooldown_some_after_recent_failure() {
        let attempts: DashMap<ContextId, ReconcileAttempt> = DashMap::new();
        let context = ctx(1);
        let _ = record_reconcile_failure(&attempts, context);
        let (remaining, failures) =
            reconcile_remaining_cooldown(&attempts, &context).expect("within cooldown");
        assert_eq!(failures, 1);
        assert!(remaining > Duration::from_secs(25));
        assert!(remaining <= Duration::from_secs(30));
    }

    #[test]
    fn reconcile_remaining_cooldown_none_after_cooldown_lapsed() {
        let attempts: DashMap<ContextId, ReconcileAttempt> = DashMap::new();
        let context = ctx(1);
        // `Instant - Duration` panics when the platform's monotonic clock
        // cannot represent the result (for example shortly after boot on
        // targets whose `Instant` counts from system start). In that case
        // there is no way to fabricate a stale attempt, so skip the check
        // instead of failing with an unrelated panic.
        let Some(an_hour_ago) = Instant::now().checked_sub(Duration::from_secs(60 * 60)) else {
            return;
        };
        let _replaced = attempts.insert(
            context,
            ReconcileAttempt {
                last_attempt_at: an_hour_ago,
                consecutive_failures: 7,
            },
        );
        assert!(reconcile_remaining_cooldown(&attempts, &context).is_none());
    }
}