use std::collections::HashMap;
use std::time::{Duration, Instant};
use actix::{AsyncContext, WrapFuture};
use calimero_context::governance_broadcast::verify_readiness_beacon;
use calimero_context_client::local_governance::{ReadinessProbe, SignedReadinessBeacon};
use libp2p::PeerId;
use tracing::{debug, info, warn};
use crate::readiness::{ApplyBeaconLocal, EmitOutOfCycleBeacon};
use crate::NodeManager;
/// Minimum interval between two beacon-triggered governance syncs for the
/// same namespace; `debounce_allows_sync` rejects attempts that arrive sooner.
const NS_BEACON_SYNC_DEBOUNCE: Duration = Duration::from_secs(5);
/// Reports whether a beacon's advertised DAG head shows the local node is behind.
///
/// Divergence requires all three conditions to hold:
/// * `local_has_state` — the caller found local governance state for the namespace;
/// * `dag_head` is not all zeroes (an all-zero head never counts as divergence);
/// * `head_op_present_locally` is `false` — the advertised head op is not stored locally.
fn beacon_indicates_divergence(
    local_has_state: bool,
    dag_head: [u8; 32],
    head_op_present_locally: bool,
) -> bool {
    if !local_has_state {
        return false;
    }

    let head_is_zero = dag_head.iter().all(|byte| *byte == 0);
    !(head_is_zero || head_op_present_locally)
}
/// Decides whether a beacon-triggered sync for `namespace_id` may start at `now`.
///
/// Returns `true` — and records `now` as the namespace's latest sync time —
/// when the namespace has no recorded sync yet, or when at least
/// `NS_BEACON_SYNC_DEBOUNCE` has elapsed since the recorded one. Otherwise
/// returns `false` and leaves the map untouched, so rejected attempts do not
/// extend the debounce window.
///
/// A `now` that is earlier than the recorded instant counts as zero elapsed
/// time and is therefore rejected.
fn debounce_allows_sync(
    debounce: &mut HashMap<[u8; 32], Instant>,
    namespace_id: [u8; 32],
    now: Instant,
) -> bool {
    // A single `entry` lookup replaces the separate `get` + `insert`.
    match debounce.entry(namespace_id) {
        std::collections::hash_map::Entry::Occupied(mut slot) => {
            // `saturating_duration_since` is the explicitly non-panicking form;
            // `duration_since` is documented as possibly panicking again in
            // future Rust versions when `now` precedes the stored instant.
            if now.saturating_duration_since(*slot.get()) < NS_BEACON_SYNC_DEBOUNCE {
                false
            } else {
                let _ = slot.insert(now);
                true
            }
        }
        std::collections::hash_map::Entry::Vacant(slot) => {
            let _ = slot.insert(now);
            true
        }
    }
}
/// Processes a signed readiness beacon received from a peer.
///
/// Steps, in order:
/// 1. Beacons that fail `verify_readiness_beacon` are dropped with a debug log.
/// 2. A verified beacon is inserted into `readiness_cache` and
///    `readiness_notify` is notified for the beacon's namespace.
/// 3. A task is spawned on the actor context that compares the advertised DAG
///    head with the local store. When `beacon_indicates_divergence` holds and
///    `debounce_allows_sync` permits it, a namespace governance sync is
///    requested through the node client. Store read errors and sync failures
///    are logged and otherwise ignored.
/// 4. `ApplyBeaconLocal` is sent to the readiness address, if one is set.
pub(super) fn handle_readiness_beacon(
    manager: &mut NodeManager,
    ctx: &mut actix::Context<NodeManager>,
    _peer_id: PeerId,
    beacon: SignedReadinessBeacon,
) {
    let verified = verify_readiness_beacon(&manager.datastore, &beacon);
    if !verified {
        debug!(
            namespace_id = %hex::encode(beacon.namespace_id),
            "ReadinessBeacon failed verification; dropping"
        );
        return;
    }

    let namespace_id = beacon.namespace_id;
    let dag_head = beacon.dag_head;

    manager.readiness_cache.insert(&beacon);
    manager.readiness_notify.notify(namespace_id);

    info!(
        namespace_id = %hex::encode(namespace_id),
        peer = %beacon.peer_pubkey,
        applied_through = beacon.applied_through,
        strong = beacon.strong,
        "readiness beacon received"
    );

    // Everything the background task needs is cloned up front so the future
    // owns its inputs and does not borrow from the actor.
    let store = manager.datastore.clone();
    let node_client = manager.clients.node.clone();
    let debounce = manager.ns_beacon_sync_debounce.clone();

    let divergence_check = async move {
        // Both store reads happen in this scope so the handle is released
        // before the sync call is awaited.
        let (local_has_state, head_op_present) = {
            let handle = store.handle();

            let head_key = calimero_store::key::NamespaceGovHead::new(namespace_id);
            let local_has_state = match handle.get(&head_key) {
                Err(err) => {
                    warn!(
                        ?err,
                        namespace_id = %hex::encode(namespace_id),
                        "beacon-divergence: namespace head read failed; skipping sync"
                    );
                    return;
                }
                // Local state counts only if at least one stored head is non-zero.
                Ok(stored) => stored.map_or(false, |head| {
                    head.dag_heads.iter().any(|h| *h != [0u8; 32])
                }),
            };

            let op_key = calimero_store::key::NamespaceGovOp::new(namespace_id, dag_head);
            let head_op_present = match handle.get(&op_key) {
                Err(err) => {
                    warn!(
                        ?err,
                        namespace_id = %hex::encode(namespace_id),
                        "beacon-divergence: local DAG read failed; skipping sync"
                    );
                    return;
                }
                Ok(found) => found.is_some(),
            };

            (local_has_state, head_op_present)
        };

        let diverged = beacon_indicates_divergence(local_has_state, dag_head, head_op_present);
        if !diverged {
            return;
        }

        // The mutex guard lives only inside this block, so it is never held
        // across the `.await` below. A poisoned lock is recovered, not fatal.
        let sync_allowed = {
            let mut last_sync_times = debounce
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            debounce_allows_sync(&mut last_sync_times, namespace_id, Instant::now())
        };
        if !sync_allowed {
            return;
        }

        info!(
            namespace_id = %hex::encode(namespace_id),
            dag_head = %hex::encode(dag_head),
            "beacon advertises an unknown namespace DAG head; \
             triggering governance sync"
        );

        let outcome = node_client.sync_namespace(namespace_id).await;
        if let Err(err) = outcome {
            warn!(
                ?err,
                namespace_id = %hex::encode(namespace_id),
                "beacon-triggered namespace governance sync failed"
            );
        }
    };

    // The spawn handle is intentionally discarded.
    let _ignored = ctx.spawn(divergence_check.into_actor(manager));

    if let Some(readiness) = manager.readiness_addr.as_ref() {
        readiness.do_send(ApplyBeaconLocal { namespace_id });
    }
}
/// Forwards a readiness probe to the readiness address as an
/// `EmitOutOfCycleBeacon` request naming the probing peer.
///
/// When no readiness address is set the probe is ignored without logging.
pub(super) fn handle_readiness_probe(
    manager: &mut NodeManager,
    _ctx: &mut actix::Context<NodeManager>,
    peer_id: PeerId,
    probe: ReadinessProbe,
) {
    if let Some(readiness) = manager.readiness_addr.as_ref() {
        let request = EmitOutOfCycleBeacon {
            namespace_id: probe.namespace_id,
            requesting_peer: peer_id,
        };
        readiness.do_send(request);
    }
}
/// Unit tests for the pure helpers `beacon_indicates_divergence` and
/// `debounce_allows_sync`; the actor handlers are not exercised here.
#[cfg(test)]
mod tests {
    use super::*;

    /// Arbitrary non-zero DAG head.
    const HEAD: [u8; 32] = [7u8; 32];
    /// All-zero DAG head, which must never count as divergence.
    const ZERO_HEAD: [u8; 32] = [0u8; 32];
    /// Two distinct namespace ids for the debounce tests.
    const NS_A: [u8; 32] = [1u8; 32];
    const NS_B: [u8; 32] = [2u8; 32];

    #[test]
    fn divergence_true_when_head_op_absent() {
        assert!(beacon_indicates_divergence(true, HEAD, false));
    }

    #[test]
    fn divergence_false_when_head_op_present() {
        assert!(!beacon_indicates_divergence(true, HEAD, true));
    }

    #[test]
    fn divergence_false_for_zero_head() {
        assert!(!beacon_indicates_divergence(true, ZERO_HEAD, false));
    }

    #[test]
    fn divergence_false_when_no_local_state() {
        assert!(!beacon_indicates_divergence(false, HEAD, false));
    }

    #[test]
    fn debounce_allows_first_then_blocks_within_window() {
        let mut d: HashMap<[u8; 32], Instant> = HashMap::new();
        let t0 = Instant::now();
        assert!(debounce_allows_sync(&mut d, NS_A, t0));
        assert!(!debounce_allows_sync(
            &mut d,
            NS_A,
            t0 + Duration::from_secs(1)
        ));
    }

    #[test]
    fn debounce_reallows_after_window() {
        let mut d: HashMap<[u8; 32], Instant> = HashMap::new();
        let t0 = Instant::now();
        assert!(debounce_allows_sync(&mut d, NS_A, t0));
        assert!(debounce_allows_sync(
            &mut d,
            NS_A,
            t0 + NS_BEACON_SYNC_DEBOUNCE + Duration::from_millis(1)
        ));
    }

    #[test]
    fn debounce_is_per_namespace() {
        let mut d: HashMap<[u8; 32], Instant> = HashMap::new();
        let t0 = Instant::now();
        assert!(debounce_allows_sync(&mut d, NS_A, t0));
        assert!(debounce_allows_sync(&mut d, NS_B, t0));
    }

    // The window check is a strict `<`, so an attempt made exactly one
    // debounce interval after the last sync is allowed.
    #[test]
    fn debounce_allows_exactly_at_window_boundary() {
        let mut d: HashMap<[u8; 32], Instant> = HashMap::new();
        let t0 = Instant::now();
        assert!(debounce_allows_sync(&mut d, NS_A, t0));
        assert!(debounce_allows_sync(&mut d, NS_A, t0 + NS_BEACON_SYNC_DEBOUNCE));
    }

    // A rejected attempt must not refresh the stored timestamp; otherwise a
    // steady stream of beacons could postpone the next sync indefinitely.
    #[test]
    fn debounce_rejected_attempt_does_not_extend_window() {
        let mut d: HashMap<[u8; 32], Instant> = HashMap::new();
        let t0 = Instant::now();
        assert!(debounce_allows_sync(&mut d, NS_A, t0));
        assert!(!debounce_allows_sync(
            &mut d,
            NS_A,
            t0 + Duration::from_secs(4)
        ));
        assert_eq!(d.get(&NS_A), Some(&t0));
        assert!(debounce_allows_sync(
            &mut d,
            NS_A,
            t0 + NS_BEACON_SYNC_DEBOUNCE + Duration::from_millis(1)
        ));
    }
}