use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use actix::{Actor, AsyncContext, Context, Handler, Message};
use calimero_context_client::local_governance::SignedReadinessBeacon;
use calimero_node_primitives::client::NodeClient;
use calimero_primitives::identity::PublicKey;
use calimero_store::Store;
use libp2p::PeerId;
use zeroize::Zeroize;
#[cfg(test)]
mod tests;
/// Readiness tier of the local node for one namespace, as computed by
/// [`evaluate_readiness`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadinessTier {
/// Nothing has been applied locally and no fresh peer beacon was heard, or
/// no peer beacon was heard and the boot grace period has not elapsed yet.
Bootstrapping,
/// Local state is non-empty, no fresh peer beacon was heard, and the boot
/// grace period has elapsed.
LocallyReady,
/// A fresh peer beacon was heard and the local `applied_through` is within
/// `applied_through_grace` of the highest value advertised by peers.
PeerValidatedReady,
/// A fresh peer beacon was heard and the local node is behind it;
/// `target_applied_through` is the highest value advertised by fresh peers
/// (0 when the summary carries none).
CatchingUp { target_applied_through: u64 },
/// The node is not ready; `reason` says why.
Degraded { reason: DemotionReason },
}
/// Why [`evaluate_readiness`] reported [`ReadinessTier::Degraded`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DemotionReason {
/// `local_pending_ops` was non-zero; carries that count.
PendingOps(usize),
/// The peer summary carried a `max_applied_through` but reported no recent
/// beacon (the defensive `(Some, false)` arm of `evaluate_readiness`).
NoRecentPeers,
}
/// Per-namespace local readiness state tracked by `ReadinessManager`.
#[derive(Debug, Clone)]
pub struct ReadinessState {
/// Tier stored by the most recent evaluation that changed it.
pub tier: ReadinessTier,
/// Local applied-through value; the manager refreshes it from the store
/// (`read_local_applied_through`) before each evaluation.
pub local_applied_through: u64,
/// Number of locally pending ops; any non-zero value makes
/// `evaluate_readiness` return `Degraded`.
pub local_pending_ops: usize,
/// Start of the boot grace window: `evaluate_readiness` compares the time
/// elapsed since this instant against `ReadinessConfig::boot_grace`.
pub subscribed_at: Instant,
}
/// Tunables for readiness evaluation and beacon emission.
#[derive(Debug, Clone, Copy)]
pub struct ReadinessConfig {
/// Time after `ReadinessState::subscribed_at` before a node that heard no
/// peer beacon may report `LocallyReady`.
pub boot_grace: Duration,
/// Freshness TTL the manager passes to `ReadinessCache::peer_summary`.
pub ttl_heartbeat: Duration,
/// Period of the periodic beacon tick; half of it is the minimum spacing
/// between handled out-of-cycle probes per (peer, namespace).
pub beacon_interval: Duration,
/// Slack added to the local `applied_through` before comparing it with the
/// peers' maximum.
pub applied_through_grace: u64,
}
impl Default for ReadinessConfig {
    /// Default tuning: 10 s boot grace, 60 s heartbeat TTL, 5 s beacon
    /// interval, and an applied-through grace of 2.
    fn default() -> Self {
        let boot_grace = Duration::from_secs(10);
        let ttl_heartbeat = Duration::from_secs(60);
        let beacon_interval = Duration::from_secs(5);
        let applied_through_grace = 2;
        Self {
            boot_grace,
            ttl_heartbeat,
            beacon_interval,
            applied_through_grace,
        }
    }
}
/// Aggregate view of the fresh peer beacons for one namespace, as produced by
/// `ReadinessCache::peer_summary`.
#[derive(Debug, Clone, Copy, Default)]
pub struct PeerSummary {
/// Highest `applied_through` among fresh beacons; `None` when there are none.
pub max_applied_through: Option<u64>,
/// Whether at least one fresh beacon was found.
pub heard_recent_beacon: bool,
}
/// Computes the readiness tier for one namespace from local state and a peer
/// summary. Pure function: it reads its arguments and nothing else.
///
/// Decision order:
/// 1. Any locally pending ops → `Degraded { PendingOps(n) }`.
/// 2. Nothing applied locally (`local_applied_through == 0`) → `CatchingUp`
///    towards the peers' maximum if a fresh beacon was heard, otherwise
///    `Bootstrapping`.
/// 3. Fresh beacon with a known maximum → `PeerValidatedReady` when
///    `local_applied_through + applied_through_grace >= peer max`
///    (saturating add), otherwise `CatchingUp` towards that maximum.
/// 4. No beacon at all → `LocallyReady` once `boot_grace` has elapsed since
///    `subscribed_at`, `Bootstrapping` before that.
///
/// The two remaining combinations (`(None, true)` and `(Some, false)`) cannot
/// come out of `ReadinessCache::peer_summary`; they trip a `debug_assert!` and
/// fall back to `Bootstrapping` / `Degraded { NoRecentPeers }` in release
/// builds.
///
/// `now` earlier than `state.subscribed_at` is treated as zero elapsed time.
pub fn evaluate_readiness(
    state: &ReadinessState,
    peers: &PeerSummary,
    cfg: &ReadinessConfig,
    now: Instant,
) -> ReadinessTier {
    if state.local_pending_ops > 0 {
        return ReadinessTier::Degraded {
            reason: DemotionReason::PendingOps(state.local_pending_ops),
        };
    }
    if state.local_applied_through == 0 {
        return if peers.heard_recent_beacon {
            ReadinessTier::CatchingUp {
                target_applied_through: peers.max_applied_through.unwrap_or(0),
            }
        } else {
            ReadinessTier::Bootstrapping
        };
    }
    // Saturating form: `Instant::duration_since` is documented as possibly
    // panicking again in future Rust versions when its argument is later than
    // `self`; a caller-supplied `now` must never be able to trigger that.
    let boot_grace_elapsed =
        now.saturating_duration_since(state.subscribed_at) >= cfg.boot_grace;
    match (
        peers.max_applied_through,
        peers.heard_recent_beacon,
        boot_grace_elapsed,
    ) {
        (Some(peer_at), true, _) => {
            if state
                .local_applied_through
                .saturating_add(cfg.applied_through_grace)
                >= peer_at
            {
                ReadinessTier::PeerValidatedReady
            } else {
                ReadinessTier::CatchingUp {
                    target_applied_through: peer_at,
                }
            }
        }
        (None, false, true) => ReadinessTier::LocallyReady,
        (None, false, false) => ReadinessTier::Bootstrapping,
        (None, true, _) => {
            debug_assert!(
                false,
                "PeerSummary built from non-atomic reads (None, true) — use ReadinessCache::peer_summary"
            );
            ReadinessTier::Bootstrapping
        }
        (Some(_), false, _) => {
            debug_assert!(
                false,
                "PeerSummary built from non-atomic reads (Some, false) — use ReadinessCache::peer_summary"
            );
            ReadinessTier::Degraded {
                reason: DemotionReason::NoRecentPeers,
            }
        }
    }
}
/// Latest accepted beacon from one peer in one namespace, as stored by
/// `ReadinessCache::insert`.
#[derive(Debug, Clone)]
pub struct CacheEntry {
/// Copied from the beacon's `dag_head`.
pub head: [u8; 32],
/// Copied from the beacon's `applied_through`.
pub applied_through: u64,
/// Sender-side timestamp copied from the beacon's `ts_millis`.
pub ts_millis: u64,
/// Local monotonic time at which the beacon was stored; freshness checks
/// compare against this, not against `ts_millis`.
pub received_at: Instant,
/// Copied from the beacon's `strong` flag.
pub strong: bool,
}
/// Maximum number of milliseconds a beacon's `ts_millis` may lie ahead of the
/// local wall clock before `ReadinessCache::insert` drops it. Twice this value
/// is also the age after which `insert` evicts entries of the same namespace.
pub const MAX_BEACON_CLOCK_DRIFT_MS: u64 = 60_000;
/// Mutex-protected cache of the latest accepted readiness beacon per
/// `(namespace_id, peer public key)`.
#[derive(Debug, Default)]
pub struct ReadinessCache {
// Keyed by (namespace id, peer public key).
entries: Mutex<BTreeMap<([u8; 32], PublicKey), CacheEntry>>,
}
impl ReadinessCache {
    /// Locks the entry map. A poisoned mutex is not treated as fatal: the
    /// guard is recovered from the poison error and returned as usual.
    fn entries_lock(
        &self,
    ) -> std::sync::MutexGuard<'_, BTreeMap<([u8; 32], PublicKey), CacheEntry>> {
        match self.entries.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }
}
impl ReadinessCache {
    /// Returns whether `entry` was received no longer than `ttl` before `now`.
    ///
    /// Uses the saturating form so an entry stamped after `now` was captured
    /// (possible when another thread stored it in between) counts as age zero
    /// instead of depending on `Instant::duration_since`, which is documented
    /// as possibly panicking again in future Rust versions.
    fn is_fresh(entry: &CacheEntry, now: Instant, ttl: Duration) -> bool {
        now.saturating_duration_since(entry.received_at) <= ttl
    }

    /// Stores `beacon` as the latest entry for `(namespace_id, peer_pubkey)`.
    ///
    /// The beacon is silently dropped when:
    /// - its `ts_millis` is more than `MAX_BEACON_CLOCK_DRIFT_MS` ahead of the
    ///   local wall clock, or
    /// - an entry for the same key exists and the beacon is older, or has the
    ///   same `ts_millis` without a strictly higher `applied_through`.
    ///
    /// On acceptance, entries of the same namespace received more than
    /// `2 * MAX_BEACON_CLOCK_DRIFT_MS` ago are evicted first. Entries of other
    /// namespaces are left alone.
    ///
    /// This method does not check the beacon's signature.
    pub fn insert(&self, beacon: &SignedReadinessBeacon) {
        // A wall clock before the Unix epoch is treated as 0.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        if beacon.ts_millis > now_ms.saturating_add(MAX_BEACON_CLOCK_DRIFT_MS) {
            return;
        }

        let mut g = self.entries_lock();
        // Stamp the receive time while holding the lock so that no entry
        // already in the map can carry a later `received_at` than this one.
        let now = Instant::now();
        let key = (beacon.namespace_id, beacon.peer_pubkey);
        if let Some(existing) = g.get(&key) {
            let older = beacon.ts_millis < existing.ts_millis;
            let same_ts_no_progress = beacon.ts_millis == existing.ts_millis
                && beacon.applied_through <= existing.applied_through;
            if older || same_ts_no_progress {
                return;
            }
        }

        let evict_window = Duration::from_millis(MAX_BEACON_CLOCK_DRIFT_MS.saturating_mul(2));
        g.retain(|(ns, _), entry| {
            *ns != beacon.namespace_id || Self::is_fresh(entry, now, evict_window)
        });
        let _ = g.insert(
            key,
            CacheEntry {
                head: beacon.dag_head,
                applied_through: beacon.applied_through,
                ts_millis: beacon.ts_millis,
                received_at: now,
                strong: beacon.strong,
            },
        );
    }

    /// Returns a copy of every entry of namespace `ns` received within `ttl`,
    /// paired with the peer's public key, in map (key) order.
    pub fn fresh_peers(&self, ns: [u8; 32], ttl: Duration) -> Vec<(PublicKey, CacheEntry)> {
        let g = self.entries_lock();
        let now = Instant::now();
        g.iter()
            .filter(|((nns, _), e)| *nns == ns && Self::is_fresh(e, now, ttl))
            .map(|((_, pk), e)| (*pk, e.clone()))
            .collect()
    }

    /// Picks the preferred fresh peer of namespace `ns`, or `None` when there
    /// is no fresh entry.
    ///
    /// Preference order: `strong` beacons first, then highest
    /// `applied_through`, then most recently received. Remaining ties go to
    /// the entry that comes last in map order.
    pub fn pick_sync_partner(
        &self,
        ns: [u8; 32],
        ttl: Duration,
    ) -> Option<(PublicKey, CacheEntry)> {
        // Select under the lock and clone only the winner, rather than
        // cloning every fresh entry into a temporary Vec first.
        let g = self.entries_lock();
        let now = Instant::now();
        g.iter()
            .filter(|((nns, _), e)| *nns == ns && Self::is_fresh(e, now, ttl))
            .max_by(|(_, a), (_, b)| {
                a.strong
                    .cmp(&b.strong)
                    .then(a.applied_through.cmp(&b.applied_through))
                    .then(a.received_at.cmp(&b.received_at))
            })
            .map(|((_, pk), e)| (*pk, e.clone()))
    }

    /// Summarises the fresh entries of namespace `ns` in one pass under the
    /// lock, so `max_applied_through` is `Some` exactly when
    /// `heard_recent_beacon` is `true`.
    pub fn peer_summary(&self, ns: [u8; 32], ttl: Duration) -> PeerSummary {
        let g = self.entries_lock();
        let now = Instant::now();
        let mut max_applied: Option<u64> = None;
        let mut any_fresh = false;
        for ((nns, _), e) in g.iter() {
            if *nns != ns || !Self::is_fresh(e, now, ttl) {
                continue;
            }
            any_fresh = true;
            max_applied = Some(max_applied.map_or(e.applied_through, |m| m.max(e.applied_through)));
        }
        PeerSummary {
            max_applied_through: max_applied,
            heard_recent_beacon: any_fresh,
        }
    }
}
/// Actor that tracks per-namespace readiness and publishes readiness beacons.
pub struct ReadinessManager {
/// Shared beacon cache; queried through `peer_summary` on every evaluation.
pub cache: Arc<ReadinessCache>,
/// Tunables for evaluation and beacon timing.
pub config: ReadinessConfig,
/// Readiness state per namespace id.
pub state_per_namespace: HashMap<[u8; 32], ReadinessState>,
/// Used to obtain the network client that publishes beacons.
pub node_client: NodeClient,
/// Store from which the governance head and the namespace identity are read.
pub datastore: Store,
/// When an out-of-cycle probe from a given (peer, namespace) was last
/// handled; used to rate-limit `EmitOutOfCycleBeacon`.
pub last_probe_response_at: HashMap<(PeerId, [u8; 32]), Instant>,
}
impl Actor for ReadinessManager {
    type Context = Context<Self>;

    /// Schedules the periodic beacon tick: every `config.beacon_interval` the
    /// actor runs `emit_periodic_beacons`.
    fn started(&mut self, ctx: &mut Self::Context) {
        let period = self.config.beacon_interval;
        ctx.run_interval(period, |manager, _| manager.emit_periodic_beacons());
    }
}
/// Asks the manager to re-evaluate readiness for `namespace_id`; the handler
/// logs resulting transitions with cause `peer_beacon_received` and ignores
/// namespaces it does not track yet.
#[derive(Message)]
#[rtype(result = "()")]
pub struct ApplyBeaconLocal {
pub namespace_id: [u8; 32],
}
/// Tells the manager to re-evaluate readiness for `namespace_id`; the handler
/// starts tracking the namespace if it is not tracked yet and logs resulting
/// transitions with cause `namespace_op_applied`.
#[derive(Message)]
#[rtype(result = "()")]
pub struct NamespaceOpApplied {
pub namespace_id: [u8; 32],
}
/// Reads the `sequence` of the namespace's stored governance head.
///
/// Returns 0 when no head is stored. A store read error is logged at `warn`
/// level and also reported as 0 (best effort).
pub(crate) fn read_local_applied_through(store: &Store, ns_id: [u8; 32]) -> u64 {
    let handle = store.handle();
    let head_key = calimero_store::key::NamespaceGovHead::new(ns_id);
    match handle.get(&head_key) {
        Ok(stored) => stored.map_or(0, |head| head.sequence),
        Err(err) => {
            tracing::warn!(
                ?err,
                namespace_id = %hex::encode(ns_id),
                "read_local_applied_through: store read failed; treating as 0 — \
                 FSM may regress until the next successful read"
            );
            0
        }
    }
}
impl ReadinessManager {
    /// Periodic tick: refreshes `local_applied_through` for every tracked
    /// namespace, re-evaluates its tier (logging transitions with cause
    /// `periodic_tick`), and publishes a beacon for each namespace whose tier
    /// is `PeerValidatedReady` or `LocallyReady` after the evaluation.
    fn emit_periodic_beacons(&mut self) {
        let now = Instant::now();
        let ttl = self.config.ttl_heartbeat;
        let cfg = self.config;
        // Snapshots are collected first because `publish_beacon` borrows all
        // of `self` while the loop holds a mutable borrow of the state map.
        let mut to_emit: Vec<([u8; 32], ReadinessState)> = Vec::new();
        for (ns_id, entry) in self.state_per_namespace.iter_mut() {
            let ns_id = *ns_id;
            let peers = self.cache.peer_summary(ns_id, ttl);
            entry.local_applied_through = read_local_applied_through(&self.datastore, ns_id);
            let new_tier = evaluate_readiness(entry, &peers, &cfg, now);
            if new_tier != entry.tier {
                tracing::info!(
                    namespace_id = %hex::encode(ns_id),
                    old = ?entry.tier,
                    new = ?new_tier,
                    cause = "periodic_tick",
                    "readiness tier transition"
                );
                entry.tier = new_tier;
            }
            if matches!(
                entry.tier,
                ReadinessTier::PeerValidatedReady | ReadinessTier::LocallyReady
            ) {
                to_emit.push((ns_id, entry.clone()));
            }
        }
        for (ns_id, state) in to_emit {
            self.publish_beacon(ns_id, &state);
        }
    }

    /// Builds, signs and publishes a readiness beacon for `ns_id` from `state`.
    ///
    /// Best effort: if the namespace identity is missing this returns
    /// silently; any other failure (identity load, signing, encoding) is
    /// logged at debug level and the beacon is skipped. The network publish
    /// itself runs in a spawned task and its failure is non-fatal.
    ///
    /// The beacon is marked `strong` only when `state.tier` is
    /// `PeerValidatedReady`. Its `dag_head` is the smallest of the stored
    /// governance head's `dag_heads`, or all zeroes when none is available.
    fn publish_beacon(&self, ns_id: [u8; 32], state: &ReadinessState) {
        use calimero_context_client::local_governance::{NamespaceTopicMsg, SignedReadinessBeacon};
        use calimero_node_primitives::sync::BroadcastMessage;

        let group_id = calimero_context_config::types::ContextGroupId::from(ns_id);
        let identity =
            match calimero_context::group_store::get_namespace_identity(&self.datastore, &group_id)
            {
                Ok(Some(id)) => id,
                Ok(None) => return,
                Err(err) => {
                    tracing::debug!(?err, ?ns_id, "ReadinessBeacon: identity load failed");
                    return;
                }
            };
        let (peer_pubkey, mut sk_bytes, mut sender_key) = identity;
        sender_key.zeroize();
        // Wrap and wipe the raw secret bytes immediately, before any fallible
        // step, so no early return below can leave them un-zeroized.
        let signing_key = calimero_primitives::identity::PrivateKey::from(sk_bytes);
        sk_bytes.zeroize();

        let strong = matches!(state.tier, ReadinessTier::PeerValidatedReady);
        // A wall clock before the Unix epoch is reported as 0.
        let ts_millis = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        let dag_head = {
            let handle = self.datastore.handle();
            let key = calimero_store::key::NamespaceGovHead::new(ns_id);
            match handle.get(&key) {
                Ok(Some(head)) => head.dag_heads.iter().min().copied().unwrap_or([0u8; 32]),
                Ok(None) => [0u8; 32],
                Err(err) => {
                    // Still best effort, but no longer silent.
                    tracing::debug!(
                        ?err,
                        ?ns_id,
                        "ReadinessBeacon: governance head read failed; using zero dag_head"
                    );
                    [0u8; 32]
                }
            }
        };

        // The signature field is a placeholder until the signable bytes have
        // been signed below.
        let mut beacon = SignedReadinessBeacon {
            namespace_id: ns_id,
            peer_pubkey,
            dag_head,
            applied_through: state.local_applied_through,
            ts_millis,
            strong,
            signature: [0u8; 64],
        };
        let signable = match beacon.signable_bytes() {
            Ok(s) => s,
            Err(err) => {
                tracing::debug!(?err, "ReadinessBeacon: signable_bytes failed");
                return;
            }
        };
        let signature = match signing_key.sign(&signable) {
            Ok(sig) => sig.to_bytes(),
            Err(err) => {
                tracing::debug!(?err, "ReadinessBeacon: sign failed");
                return;
            }
        };
        beacon.signature = signature;

        let topic = calimero_context::governance_broadcast::ns_topic(ns_id);
        let inner = match borsh::to_vec(&NamespaceTopicMsg::ReadinessBeacon(beacon)) {
            Ok(b) => b,
            Err(err) => {
                tracing::debug!(?err, "ReadinessBeacon: borsh encode (inner) failed");
                return;
            }
        };
        // The beacon is carried in a governance-delta envelope with a zero
        // delta id and no parents.
        let envelope = BroadcastMessage::NamespaceGovernanceDelta {
            namespace_id: ns_id,
            delta_id: [0u8; 32],
            parent_ids: Vec::new(),
            payload: inner,
        };
        let bytes = match borsh::to_vec(&envelope) {
            Ok(b) => b,
            Err(err) => {
                tracing::debug!(?err, "ReadinessBeacon: borsh encode (envelope) failed");
                return;
            }
        };

        let net = self.node_client.network_client().clone();
        let log_ns = ns_id;
        let log_applied = state.local_applied_through;
        let log_strong = strong;
        actix::spawn(async move {
            match net.publish(topic, bytes).await {
                Ok(_) => tracing::info!(
                    namespace_id = %hex::encode(log_ns),
                    applied_through = log_applied,
                    strong = log_strong,
                    "readiness beacon emitted"
                ),
                Err(err) => {
                    tracing::debug!(?err, "ReadinessBeacon publish failed (non-fatal)");
                }
            }
        });
    }
}
/// Asks the manager to publish a beacon for `namespace_id` outside the
/// periodic tick. The handler rate-limits per `(requesting_peer,
/// namespace_id)` and only publishes when the namespace's tier is
/// `PeerValidatedReady` or `LocallyReady`.
#[derive(Message)]
#[rtype(result = "()")]
pub struct EmitOutOfCycleBeacon {
pub namespace_id: [u8; 32],
pub requesting_peer: PeerId,
}
impl Handler<NamespaceOpApplied> for ReadinessManager {
    type Result = ();

    /// Re-evaluates readiness for the namespace, creating its state (tier
    /// `Bootstrapping`, subscribed now) if it is not tracked yet.
    ///
    /// A beacon is published only when the tier *changes* to
    /// `PeerValidatedReady` or `LocallyReady`; in that case the probe
    /// rate-limit window for the namespace is cleared first.
    fn handle(&mut self, msg: NamespaceOpApplied, _ctx: &mut Self::Context) {
        let ns_id = msg.namespace_id;
        let peers = self.cache.peer_summary(ns_id, self.config.ttl_heartbeat);
        let applied_through = read_local_applied_through(&self.datastore, ns_id);

        let state = self
            .state_per_namespace
            .entry(ns_id)
            .or_insert_with(|| ReadinessState {
                tier: ReadinessTier::Bootstrapping,
                local_applied_through: 0,
                local_pending_ops: 0,
                subscribed_at: Instant::now(),
            });
        state.local_applied_through = applied_through;

        let new_tier = evaluate_readiness(state, &peers, &self.config, Instant::now());
        if new_tier == state.tier {
            return;
        }
        tracing::info!(
            namespace_id = %hex::encode(ns_id),
            old = ?state.tier,
            new = ?new_tier,
            cause = "namespace_op_applied",
            "readiness tier transition"
        );
        state.tier = new_tier;

        let became_ready = matches!(
            new_tier,
            ReadinessTier::PeerValidatedReady | ReadinessTier::LocallyReady
        );
        if !became_ready {
            return;
        }
        let snapshot = state.clone();
        self.clear_probe_window_for(ns_id);
        self.publish_beacon(ns_id, &snapshot);
    }
}
impl Handler<EmitOutOfCycleBeacon> for ReadinessManager {
    type Result = ();

    /// Handles a request for an out-of-cycle beacon.
    ///
    /// Requests from the same `(requesting_peer, namespace_id)` arriving less
    /// than `beacon_interval / 2` after the previously handled one are
    /// ignored. Otherwise the request time is recorded (whether or not a
    /// beacon ends up being sent) and a beacon is published if the
    /// namespace's tier is `PeerValidatedReady` or `LocallyReady`.
    fn handle(&mut self, msg: EmitOutOfCycleBeacon, _ctx: &mut Self::Context) {
        let now = Instant::now();
        let min_spacing = self.config.beacon_interval / 2;
        let key = (msg.requesting_peer, msg.namespace_id);
        if let Some(last) = self.last_probe_response_at.get(&key) {
            if now.saturating_duration_since(*last) < min_spacing {
                return;
            }
        }

        // Entries at least `min_spacing` old can no longer rate-limit
        // anything. Dropping them here keeps the map bounded by the probes
        // seen in the current window instead of growing with every peer that
        // ever probed.
        self.last_probe_response_at
            .retain(|_, last| now.saturating_duration_since(*last) < min_spacing);

        let snapshot = self
            .state_per_namespace
            .get(&msg.namespace_id)
            .filter(|s| {
                matches!(
                    s.tier,
                    ReadinessTier::PeerValidatedReady | ReadinessTier::LocallyReady
                )
            })
            .cloned();
        let _ = self.last_probe_response_at.insert(key, now);
        if let Some(snapshot) = snapshot {
            self.publish_beacon(msg.namespace_id, &snapshot);
        }
    }
}
impl ReadinessManager {
    /// Forgets every recorded probe time for namespace `ns_id`, for all
    /// peers, so the next probe for that namespace is not rate-limited.
    fn clear_probe_window_for(&mut self, ns_id: [u8; 32]) {
        let probes = &mut self.last_probe_response_at;
        probes.retain(|probe_key, _| probe_key.1 != ns_id);
    }
}
impl Handler<ApplyBeaconLocal> for ReadinessManager {
    type Result = ();

    /// Re-evaluates readiness for an already tracked namespace; untracked
    /// namespaces are ignored without touching the store.
    ///
    /// `local_applied_through` is always refreshed. A beacon is published
    /// only when the tier *changes* to `PeerValidatedReady` or
    /// `LocallyReady`; in that case the probe rate-limit window for the
    /// namespace is cleared first.
    fn handle(&mut self, msg: ApplyBeaconLocal, _ctx: &mut Self::Context) {
        let ns_id = msg.namespace_id;
        if !self.state_per_namespace.contains_key(&ns_id) {
            return;
        }
        let applied_through = read_local_applied_through(&self.datastore, ns_id);
        let peers = self.cache.peer_summary(ns_id, self.config.ttl_heartbeat);

        let Some(state) = self.state_per_namespace.get_mut(&ns_id) else {
            return;
        };
        state.local_applied_through = applied_through;

        let new_tier = evaluate_readiness(state, &peers, &self.config, Instant::now());
        if new_tier == state.tier {
            return;
        }
        tracing::info!(
            namespace_id = %hex::encode(ns_id),
            old = ?state.tier,
            new = ?new_tier,
            cause = "peer_beacon_received",
            "readiness tier transition"
        );
        state.tier = new_tier;

        let became_ready = matches!(
            new_tier,
            ReadinessTier::PeerValidatedReady | ReadinessTier::LocallyReady
        );
        if !became_ready {
            return;
        }
        let snapshot = state.clone();
        self.clear_probe_window_for(ns_id);
        self.publish_beacon(ns_id, &snapshot);
    }
}
/// Registry of one `tokio::sync::Notify` per namespace, used to wake tasks
/// waiting in `ReadinessCache::await_first_fresh_beacon`.
#[derive(Debug, Default)]
pub struct ReadinessCacheNotify {
// Keyed by namespace id; entries are created on first `waiter_for` call.
waiters: Mutex<HashMap<[u8; 32], Arc<tokio::sync::Notify>>>,
}
impl ReadinessCacheNotify {
    /// Locks the waiter registry, recovering the guard if the mutex was
    /// poisoned.
    fn waiters_lock(
        &self,
    ) -> std::sync::MutexGuard<'_, HashMap<[u8; 32], Arc<tokio::sync::Notify>>> {
        match self.waiters.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Returns the `Notify` handle for namespace `ns`, creating it on first
    /// use. All callers for the same namespace share one handle.
    pub fn waiter_for(&self, ns: [u8; 32]) -> Arc<tokio::sync::Notify> {
        let mut registry = self.waiters_lock();
        let slot = registry
            .entry(ns)
            .or_insert_with(|| Arc::new(tokio::sync::Notify::new()));
        Arc::clone(&*slot)
    }

    /// Wakes every task currently waiting on namespace `ns`. Does nothing if
    /// no handle was ever created for that namespace.
    pub fn notify(&self, ns: [u8; 32]) {
        let registry = self.waiters_lock();
        match registry.get(&ns) {
            Some(handle) => handle.notify_waiters(),
            None => {}
        }
    }
}
impl ReadinessCache {
/// Waits until the cache holds a fresh sync partner for namespace `ns`, or
/// until `deadline` has elapsed.
///
/// Returns the result of `pick_sync_partner(ns, ttl)` as soon as it is
/// `Some`, and `None` on timeout. `deadline` is a total budget: the timer is
/// created once, outside the loop, and is not reset by wakeups.
///
/// NOTE(review): this relies on whoever inserts beacons calling
/// `notify.notify(ns)` afterwards; that call site is not in this file — confirm.
pub async fn await_first_fresh_beacon(
&self,
notify: &ReadinessCacheNotify,
ns: [u8; 32],
ttl: Duration,
deadline: Duration,
) -> Option<(PublicKey, CacheEntry)> {
let waiter = notify.waiter_for(ns);
let timeout_fut = tokio::time::sleep(deadline);
tokio::pin!(timeout_fut);
loop {
// Order matters: the `Notified` future is created and enabled BEFORE the
// cache is checked. Per tokio's `Notified::enable` documentation this
// registers it for `notify_waiters` wakeups, so a notification sent between
// the cache check and the `select!` below is not lost.
let notified = waiter.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if let Some(entry) = self.pick_sync_partner(ns, ttl) {
tracing::info!(
namespace_id = %hex::encode(ns),
partner = %entry.0,
partner_applied = entry.1.applied_through,
partner_strong = entry.1.strong,
"first fresh beacon resolved"
);
return Some(entry);
}
// Woken by a notification: loop and re-check the cache. Timer fired: give up.
tokio::select! {
_ = notified => { }
_ = &mut timeout_fut => return None,
}
}
}
}