use std::collections::HashSet;
use actix::{AsyncContext, WrapFuture};
use calimero_context::governance_broadcast::sign_ack;
use calimero_context::group_store::get_namespace_identity;
use calimero_context_client::local_governance::{
hash_scoped_namespace, NamespaceTopicMsg, SignedNamespaceOp,
};
use calimero_context_client::messages::NamespaceApplyOutcome;
use calimero_context_config::types::ContextGroupId;
use calimero_network_primitives::client::NetworkClient;
use calimero_node_primitives::sync::{BroadcastMessage, MAX_SIGNED_GROUP_OP_PAYLOAD_BYTES};
use calimero_primitives::identity::PrivateKey;
use libp2p::gossipsub::TopicHash;
use tracing::{debug, info, warn};
use zeroize::Zeroize;
use crate::sync::parent_pull::{NextPeer, ParentPullBudget};
use crate::NodeManager;
pub(super) fn handle_namespace_governance_delta(
this: &mut NodeManager,
ctx: &mut actix::Context<NodeManager>,
source: libp2p::PeerId,
namespace_id: [u8; 32],
payload: Vec<u8>,
) {
if payload.len() > MAX_SIGNED_GROUP_OP_PAYLOAD_BYTES {
warn!(
len = payload.len(),
"oversized NamespaceGovernanceDelta payload"
);
return;
}
let msg: NamespaceTopicMsg = match borsh::from_slice(&payload) {
Ok(msg) => msg,
Err(err) => {
warn!(%err, "failed to decode NamespaceTopicMsg payload");
return;
}
};
let op = match msg {
NamespaceTopicMsg::Op(op) => op,
NamespaceTopicMsg::Ack(ack) => {
let _ = this.clients.context.ack_router().route(ack);
return;
}
NamespaceTopicMsg::ReadinessBeacon(beacon) => {
if beacon.namespace_id != namespace_id {
warn!("ReadinessBeacon namespace_id mismatch with topic; dropping");
return;
}
super::readiness::handle_readiness_beacon(this, ctx, source, beacon);
return;
}
NamespaceTopicMsg::ReadinessProbe(probe) => {
if probe.namespace_id != namespace_id {
warn!("ReadinessProbe namespace_id mismatch with topic; dropping");
return;
}
super::readiness::handle_readiness_probe(this, ctx, source, probe);
return;
}
};
if op.namespace_id != namespace_id {
warn!("NamespaceGovernanceDelta namespace_id mismatch with topic");
return;
}
if let Err(err) = op.verify_signature() {
warn!(%err, "NamespaceGovernanceDelta signature verification failed");
return;
}
let context_client = this.clients.context.clone();
let node_client = this.clients.node.clone();
let network_client = this.managers.sync.network_client.clone();
let sync_timeout = this.managers.sync.sync_config.timeout;
let pull_budget_max_peers = this.managers.sync.sync_config.parent_pull_additional_peers;
let pull_budget_duration = this.managers.sync.sync_config.parent_pull_budget;
let op_for_delivery = op.clone();
let readiness_addr = this.readiness_addr.clone();
let op_for_ack = op.clone();
let _ignored = ctx.spawn(
async move {
let outcome = match context_client.apply_signed_namespace_op(op).await {
Ok(outcome) => outcome,
Err(err) => {
warn!(?err, %source, "failed to apply namespace governance delta");
return;
}
};
if matches!(outcome, NamespaceApplyOutcome::Applied) {
if let Some(addr) = &readiness_addr {
addr.do_send(crate::readiness::NamespaceOpApplied { namespace_id });
}
}
if matches!(outcome, NamespaceApplyOutcome::Applied) {
emit_namespace_ack(&context_client, &network_client, namespace_id, &op_for_ack)
.await;
}
if matches!(outcome, NamespaceApplyOutcome::Pending) {
debug!(
%source,
namespace_id = %hex::encode(namespace_id),
"gossip governance op is pending; triggering proactive backfill"
);
fetch_and_apply_namespace_backfill(
&context_client,
&network_client,
source,
namespace_id,
Vec::new(),
sync_timeout,
)
.await;
resolve_namespace_pending(
&context_client,
&network_client,
source,
namespace_id,
sync_timeout,
pull_budget_max_peers,
pull_budget_duration,
)
.await;
}
crate::key_delivery::maybe_publish_key_delivery(
&context_client,
&node_client,
&op_for_delivery,
)
.await;
}
.into_actor(this),
);
}
pub(super) fn handle_namespace_state_heartbeat(
this: &mut NodeManager,
ctx: &mut actix::Context<NodeManager>,
source: libp2p::PeerId,
namespace_id: [u8; 32],
peer_heads: Vec<[u8; 32]>,
) {
const MAX_PEER_HEADS: usize = 256;
if peer_heads.len() > MAX_PEER_HEADS {
warn!(
%source,
heads = peer_heads.len(),
"Namespace heartbeat exceeds max peer heads, ignoring"
);
return;
}
let context_client = this.clients.context.clone();
let _ignored = ctx.spawn(
async move {
let store = context_client.datastore_handle().into_inner();
let ns_head_key = calimero_store::key::NamespaceGovHead::new(namespace_id);
let handle = store.handle();
let local_heads: HashSet<[u8; 32]> = match handle.get(&ns_head_key) {
Ok(Some(h)) => h.dag_heads.into_iter().collect(),
_ => HashSet::new(),
};
drop(handle);
let we_missing = peer_heads
.iter()
.filter(|h| !local_heads.contains(*h))
.count();
let peer_head_set: HashSet<[u8; 32]> = peer_heads.iter().copied().collect();
let peer_missing = local_heads
.iter()
.filter(|h| !peer_head_set.contains(*h))
.count();
if we_missing == 0 && peer_missing == 0 {
return;
}
tracing::debug!(
namespace_id = %hex::encode(namespace_id),
%source,
we_missing,
peer_missing,
"namespace heartbeat: divergence detected (liveness-only — recovery via \
publish_and_await_ack / parent_pull / readiness beacon)"
);
}
.into_actor(this),
);
}
async fn resolve_namespace_pending(
context_client: &calimero_context_client::client::ContextClient,
network_client: &NetworkClient,
initial_peer: libp2p::PeerId,
namespace_id: [u8; 32],
sync_timeout: tokio::time::Duration,
max_additional_peers: usize,
budget: tokio::time::Duration,
) {
let topic = libp2p::gossipsub::TopicHash::from_raw(format!("ns/{}", hex::encode(namespace_id)));
let mut mesh_peers = network_client.mesh_peers(topic.clone()).await;
let mut scheduler = ParentPullBudget::new(initial_peer, max_additional_peers, budget);
loop {
match namespace_has_pending(context_client, namespace_id).await {
Ok(false) => break,
Ok(true) => {}
Err(err) => {
warn!(
?err,
namespace_id = %hex::encode(namespace_id),
"namespace_pending_op_count failed; aborting cross-peer retry"
);
break;
}
}
let next_peer = match scheduler.next(&mesh_peers) {
NextPeer::Peer(p) => p,
NextPeer::RefetchMesh => {
mesh_peers = network_client.mesh_peers(topic.clone()).await;
scheduler.record_refetch();
match scheduler.next(&mesh_peers) {
NextPeer::Peer(p) => p,
other => {
debug!(
namespace_id = %hex::encode(namespace_id),
?other,
"no additional ns mesh peers for parent pull"
);
break;
}
}
}
NextPeer::BudgetExhausted => {
warn!(
namespace_id = %hex::encode(namespace_id),
"namespace parent-pull budget exhausted"
);
break;
}
NextPeer::MaxPeersReached | NextPeer::NoMorePeers => break,
};
scheduler.record_attempt(next_peer);
info!(
namespace_id = %hex::encode(namespace_id),
?next_peer,
attempt = scheduler.attempts(),
"retrying namespace backfill against additional mesh peer"
);
fetch_and_apply_namespace_backfill(
context_client,
network_client,
next_peer,
namespace_id,
Vec::new(),
sync_timeout,
)
.await;
}
}
async fn fetch_and_apply_namespace_backfill(
context_client: &calimero_context_client::client::ContextClient,
network_client: &NetworkClient,
peer: libp2p::PeerId,
namespace_id: [u8; 32],
delta_ids: Vec<[u8; 32]>,
sync_timeout: tokio::time::Duration,
) {
let Ok(mut stream) = network_client.open_stream(peer).await else {
debug!(
%peer,
"failed to open stream for namespace backfill"
);
return;
};
let msg = calimero_node_primitives::sync::StreamMessage::Init {
context_id: calimero_primitives::context::ContextId::from([0u8; 32]),
party_id: calimero_primitives::identity::PublicKey::from([0u8; 32]),
payload: calimero_node_primitives::sync::InitPayload::NamespaceBackfillRequest {
namespace_id,
delta_ids,
},
next_nonce: {
use rand::Rng;
rand::thread_rng().gen()
},
};
if let Err(err) = crate::sync::stream::send(&mut stream, &msg, None).await {
debug!(%err, "failed to send NamespaceBackfillRequest");
return;
}
match crate::sync::stream::recv(&mut stream, None, sync_timeout).await {
Ok(Some(calimero_node_primitives::sync::StreamMessage::Message {
payload:
calimero_node_primitives::sync::MessagePayload::NamespaceBackfillResponse { deltas },
..
})) => {
for (delta_id, op_bytes) in deltas {
if let Ok(op) = borsh::from_slice::<SignedNamespaceOp>(&op_bytes) {
if let Err(err) = context_client.apply_signed_namespace_op(op).await {
warn!(
%peer,
namespace_id = %hex::encode(namespace_id),
delta_id = %hex::encode(delta_id),
?err,
"failed to apply namespace backfill op"
);
}
}
}
}
_ => {
debug!("unexpected response to NamespaceBackfillRequest");
}
}
}
async fn namespace_has_pending(
context_client: &calimero_context_client::client::ContextClient,
namespace_id: [u8; 32],
) -> eyre::Result<bool> {
Ok(context_client
.namespace_pending_op_count(namespace_id)
.await?
> 0)
}
async fn emit_namespace_ack(
context_client: &calimero_context_client::client::ContextClient,
network_client: &NetworkClient,
namespace_id: [u8; 32],
op: &SignedNamespaceOp,
) {
let topic_str = format!("ns/{}", hex::encode(namespace_id));
let topic = TopicHash::from_raw(topic_str.clone());
let op_hash = match hash_scoped_namespace(topic_str.as_bytes(), op) {
Ok(h) => h,
Err(err) => {
warn!(%err, "ack: failed to hash op for ack signing; skipping");
return;
}
};
let store = context_client.datastore();
let ns_group = ContextGroupId::from(namespace_id);
let mut identity = match get_namespace_identity(store, &ns_group) {
Ok(Some(t)) => t,
Ok(None) => {
debug!(
namespace_id = %hex::encode(namespace_id),
"ack: no namespace identity yet; skipping"
);
return;
}
Err(err) => {
warn!(%err, "ack: namespace identity lookup failed; skipping");
return;
}
};
let signer_sk = PrivateKey::from(identity.1);
identity.1.zeroize();
identity.2.zeroize();
let ack = match sign_ack(&signer_sk, op_hash) {
Ok(ack) => ack,
Err(err) => {
warn!(%err, "ack: signing failed; skipping");
return;
}
};
let inner = match borsh::to_vec(&NamespaceTopicMsg::Ack(ack)) {
Ok(p) => p,
Err(err) => {
warn!(%err, "ack: borsh encode failed; skipping");
return;
}
};
let envelope = BroadcastMessage::NamespaceGovernanceDelta {
namespace_id,
delta_id: [0u8; 32],
parent_ids: vec![],
payload: inner,
};
let bytes = match borsh::to_vec(&envelope) {
Ok(b) => b,
Err(err) => {
warn!(%err, "ack: envelope encode failed; skipping");
return;
}
};
if let Err(err) = network_client.publish(topic, bytes).await {
debug!(%err, "ack: publish failed; sender will retry on timeout");
}
}