use calimero_crypto::Nonce;
use calimero_network_primitives::stream::Stream;
use calimero_node_primitives::sync::{InitPayload, MessagePayload, StreamMessage};
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use calimero_storage::delta::CausalDelta;
use eyre::{bail, OptionExt, Result};
use tracing::{debug, error, info, warn};
use super::manager::SyncManager;
use super::tracking::Sequencer;
/// Upper bound on the number of delta requests a single
/// `request_missing_deltas` call will issue before it gives up.
const MAX_DELTA_FETCH_LIMIT: usize = 3000;
/// Request count above which `request_missing_deltas` logs a "large sync" warning.
const DELTA_WARN_LIMIT: usize = 1000;
/// All-zero delta id; a parent equal to this id is never requested from a peer.
const GENESIS_DELTA_ID: [u8; 32] = [0u8; 32];
/// All-zero key bytes served as the author of a genesis delta that was stored
/// without an author.
pub(crate) const GENESIS_AUTHOR_SENTINEL: [u8; 32] = [0u8; 32];
/// Builds the `PublicKey` form of [`GENESIS_AUTHOR_SENTINEL`].
pub(crate) fn genesis_author_sentinel() -> PublicKey {
    GENESIS_AUTHOR_SENTINEL.into()
}
/// Returns `true` when `author`'s raw bytes equal [`GENESIS_AUTHOR_SENTINEL`].
pub(crate) fn is_genesis_author_sentinel(author: &PublicKey) -> bool {
    let raw: &[u8; 32] = author.as_ref();
    *raw == GENESIS_AUTHOR_SENTINEL
}
/// A delta received from a peer in answer to a delta request, together with
/// the authorship metadata that came with it.
pub(crate) struct FetchedDelta {
    /// The decoded delta.
    pub delta: CausalDelta,
    /// Author reported by the responding peer; may be the genesis author sentinel.
    pub author_id: PublicKey,
    /// Borsh-encoded `GovernancePosition` attached to the delta, if any.
    pub governance_position_blob: Option<Vec<u8>>,
    /// Signature checked by `verify_delta_signature`, if the peer supplied one.
    pub delta_signature: Option<[u8; 64]>,
}
/// Outcome of `verify_fetched_parent` for one fetched delta.
enum VerifiedParent {
    /// The delta passed every check and should be added to the DAG.
    Apply {
        /// Decoded governance position carried by the delta; `None` when the
        /// delta had no position blob or was accepted via the genesis sentinel.
        position: Option<calimero_context_config::types::GovernancePosition>,
    },
    /// The delta failed a check (already logged) and must not be applied.
    Skip,
}
/// Runs the acceptance checks on a parent delta fetched during DAG catch-up.
///
/// The checks run in a fixed order; the first one that fails logs a warning
/// and yields [`VerifiedParent::Skip`]:
///
/// 1. a delta authored by the genesis sentinel is accepted immediately, with
///    no governance position;
/// 2. the optional governance position blob must borsh-decode;
/// 3. the envelope signature, when present, must verify;
/// 4. the position's group id must agree with the context's owning group;
/// 5. the author must not be a read-only member for this context;
/// 6. when a position is present, the author must be a member at that position.
///
/// On success returns [`VerifiedParent::Apply`] carrying the decoded position.
fn verify_fetched_parent(
    context_id: &ContextId,
    delta_id: [u8; 32],
    fetched: &FetchedDelta,
    datastore: &calimero_store::Store,
) -> VerifiedParent {
    use calimero_context::group_store::{
        is_read_only_for_context, membership_status_at, MembershipStatus,
    };
    use calimero_context_config::types::GovernancePosition;
    use calimero_node_primitives::sync::delta_auth::verify_delta_signature;

    use crate::handlers::state_delta::{verify_position_group_id_matches_context, GroupIdCheck};

    // 1. Genesis deltas carry no real author; the sentinel short-circuits
    //    every other check.
    if is_genesis_author_sentinel(&fetched.author_id) {
        debug!(
            %context_id,
            delta_id = ?delta_id,
            "DAG-catchup parent-pull: accepting genesis delta via author sentinel"
        );
        return VerifiedParent::Apply { position: None };
    }

    // 2. Decode the governance position, if one was attached.
    let pos: Option<GovernancePosition> = match fetched.governance_position_blob.as_deref() {
        None => None,
        Some(raw) => match borsh::from_slice::<GovernancePosition>(raw) {
            Ok(decoded) => Some(decoded),
            Err(e) => {
                warn!(
                    %context_id,
                    author = %fetched.author_id,
                    delta_id = ?delta_id,
                    %e,
                    "DAG-catchup parent-pull: failed to decode governance_position; \
                     skipping this delta and continuing"
                );
                return VerifiedParent::Skip;
            }
        },
    };

    // 3. Envelope signature.
    // NOTE(review): a delta that arrives without a signature skips this check
    // entirely — confirm that unsigned deltas are meant to be accepted here.
    if let Some(sig) = fetched.delta_signature.as_ref() {
        let outcome = verify_delta_signature(
            *context_id,
            delta_id,
            fetched.author_id,
            pos.as_ref(),
            sig,
        );
        if let Err(err) = outcome {
            warn!(
                %context_id,
                author = %fetched.author_id,
                delta_id = ?delta_id,
                %err,
                "DAG-catchup parent-pull: rejecting delta — envelope signature \
                 verification failed"
            );
            return VerifiedParent::Skip;
        }
    }

    // 4. The group cited by the position must be the group owning the context.
    let cited_group_id = pos.as_ref().map(|p| p.group_id);
    match verify_position_group_id_matches_context(datastore, context_id, cited_group_id) {
        GroupIdCheck::Match | GroupIdCheck::NonGroupOk => {}
        GroupIdCheck::GroupContextNoPosition { owning } => {
            warn!(
                %context_id,
                author = %fetched.author_id,
                delta_id = ?delta_id,
                owning_group = ?owning,
                "DAG-catchup parent-pull: rejecting delta — context is owned by a \
                 group but delta carries no governance_position"
            );
            return VerifiedParent::Skip;
        }
        GroupIdCheck::NonGroupContextWithPosition { claimed } => {
            warn!(
                %context_id,
                author = %fetched.author_id,
                delta_id = ?delta_id,
                claimed_group = ?claimed,
                "DAG-catchup parent-pull: rejecting delta — delta claims a \
                 governance position but context is not in any group"
            );
            return VerifiedParent::Skip;
        }
        GroupIdCheck::Mismatch { owning, claimed } => {
            warn!(
                %context_id,
                author = %fetched.author_id,
                delta_id = ?delta_id,
                owning_group = ?owning,
                claimed_group = ?claimed,
                "DAG-catchup parent-pull: rejecting delta — governance_position \
                 references a different group than the context's owning group"
            );
            return VerifiedParent::Skip;
        }
        GroupIdCheck::LookupError(err) => {
            warn!(
                %context_id,
                author = %fetched.author_id,
                delta_id = ?delta_id,
                %err,
                "DAG-catchup parent-pull: skipping delta — group lookup failed \
                 during anti-bypass check"
            );
            return VerifiedParent::Skip;
        }
    }

    // 5. Read-only members may not author deltas.
    // NOTE(review): a lookup error is treated as "not read-only" (fail-open),
    // unlike the other lookups in this function — confirm this is intended.
    let author_is_read_only =
        is_read_only_for_context(datastore, context_id, &fetched.author_id).unwrap_or(false);
    if author_is_read_only {
        warn!(
            %context_id,
            author = %fetched.author_id,
            delta_id = ?delta_id,
            "DAG-catchup parent-pull: rejecting delta from ReadOnly member"
        );
        return VerifiedParent::Skip;
    }

    // 6. Membership at the cited governance position (only when one is present).
    if let Some(cut) = pos.as_ref() {
        match membership_status_at(datastore, &fetched.author_id, cut) {
            Ok(MembershipStatus::Member(_)) => {}
            Ok(MembershipStatus::Removed { last_role }) => {
                warn!(
                    %context_id,
                    author = %fetched.author_id,
                    delta_id = ?delta_id,
                    last_role = ?last_role,
                    "DAG-catchup parent-pull: rejecting delta — author was removed \
                     at the cited governance cut"
                );
                return VerifiedParent::Skip;
            }
            Ok(MembershipStatus::NeverMember) => {
                warn!(
                    %context_id,
                    author = %fetched.author_id,
                    delta_id = ?delta_id,
                    "DAG-catchup parent-pull: rejecting delta — author was never \
                     a member at the cited governance cut"
                );
                return VerifiedParent::Skip;
            }
            Ok(MembershipStatus::Unknown { needed }) => {
                warn!(
                    %context_id,
                    author = %fetched.author_id,
                    delta_id = ?delta_id,
                    needed = ?needed,
                    "DAG-catchup parent-pull: skipping delta — governance cut not \
                     locally known; will re-attempt on next sync tick"
                );
                return VerifiedParent::Skip;
            }
            Err(e) => {
                warn!(
                    %context_id,
                    author = %fetched.author_id,
                    delta_id = ?delta_id,
                    error = %e,
                    "DAG-catchup parent-pull: skipping delta — membership_status_at \
                     failed"
                );
                return VerifiedParent::Skip;
            }
        }
    }

    VerifiedParent::Apply { position: pos }
}
impl SyncManager {
/// Pulls the listed missing deltas from `source` and walks their ancestry,
/// adding every delta that passes verification to `delta_store`.
///
/// One stream to `source` is opened and reused for all requests. The walk is
/// breadth-first: each fetched delta's parents that are neither the genesis
/// id, already visited, nor already in `delta_store` are queued for the next
/// round. Deltas rejected by `verify_fetched_parent` are skipped and their
/// parents are not followed.
///
/// The walk stops when nothing is left to fetch, when a request fails, or
/// when `MAX_DELTA_FETCH_LIMIT` requests have been issued.
///
/// # Errors
///
/// Returns an error only if the stream to `source` cannot be opened.
/// Per-delta failures are logged and do not fail the call.
pub async fn request_missing_deltas(
    &self,
    context_id: ContextId,
    missing_ids: Vec<[u8; 32]>,
    source: libp2p::PeerId,
    delta_store: crate::delta_store::DeltaStore,
    our_identity: PublicKey,
) -> Result<()> {
    info!(
        %context_id,
        ?source,
        initial_missing_count = missing_ids.len(),
        "Requesting missing parent deltas from peer"
    );

    let mut stream = self.sync_network.open_stream(source).await?;

    // Seed the visited set from the initial request and drop duplicate ids so
    // every delta is requested at most once.
    let mut visited_ids = std::collections::HashSet::with_capacity(missing_ids.len());
    let mut to_fetch: Vec<[u8; 32]> = missing_ids
        .into_iter()
        .filter(|id| visited_ids.insert(*id))
        .collect();
    let mut fetch_count = 0;

    'fetch: while !to_fetch.is_empty() {
        let current_batch = std::mem::take(&mut to_fetch);

        for missing_id in current_batch {
            if fetch_count >= MAX_DELTA_FETCH_LIMIT {
                warn!(
                    %context_id,
                    fetch_count,
                    limit = MAX_DELTA_FETCH_LIMIT,
                    "Exceeded maximum delta fetch limit. The sync gap is too large."
                );
                return Ok(());
            }
            // Counts requests issued, whether or not they yield a delta.
            fetch_count += 1;

            let fetched = match self
                .request_delta(&context_id, missing_id, &mut stream, our_identity)
                .await
            {
                Ok(Some(fetched)) => fetched,
                Ok(None) => {
                    warn!(%context_id, delta_id = ?missing_id, "Peer doesn't have requested delta");
                    continue;
                }
                Err(e) => {
                    error!(?e, %context_id, delta_id = ?missing_id, "Failed to request delta");
                    // After a failed exchange the stream state is unknown, so
                    // abandon the whole walk rather than just this batch;
                    // parents queued earlier must not be requested on it.
                    break 'fetch;
                }
            };

            info!(
                %context_id,
                delta_id = ?missing_id,
                action_count = fetched.delta.actions.len(),
                total_fetched = fetch_count,
                "Received missing parent delta"
            );

            let datastore = self.context_client.datastore_handle().into_inner();
            let position =
                match verify_fetched_parent(&context_id, missing_id, &fetched, &datastore) {
                    VerifiedParent::Apply { position } => position,
                    VerifiedParent::Skip => continue,
                };

            // Queue parents we have neither seen in this walk nor stored locally.
            for parent_id in &fetched.delta.parents {
                if *parent_id == GENESIS_DELTA_ID {
                    continue;
                }
                if visited_ids.insert(*parent_id) && !delta_store.has_delta(parent_id).await {
                    to_fetch.push(*parent_id);
                }
            }

            let dag_delta = calimero_dag::CausalDelta {
                id: fetched.delta.id,
                parents: fetched.delta.parents.clone(),
                payload: fetched.delta.actions.clone(),
                hlc: fetched.delta.hlc,
                expected_root_hash: fetched.delta.expected_root_hash,
                kind: calimero_dag::DeltaKind::Regular,
            };
            let governance_position_blob =
                position.as_ref().and_then(|gp| borsh::to_vec(gp).ok());

            if let Err(e) = delta_store
                .add_delta(
                    dag_delta,
                    Some(fetched.author_id),
                    governance_position_blob,
                    fetched.delta_signature,
                )
                .await
            {
                warn!(?e, %context_id, delta_id = ?missing_id, "Failed to persist fetched delta to DAG");
            }
        }
    }

    if fetch_count > 0 {
        info!(
            %context_id,
            total_fetched = fetch_count,
            "Completed fetching missing delta ancestors"
        );
        if fetch_count > DELTA_WARN_LIMIT {
            warn!(
                %context_id,
                total_fetched = fetch_count,
                "Large sync detected - fetched many deltas from peer (context has deep history)"
            );
        }
    }

    Ok(())
}
/// Sends a single `DeltaRequest` for `delta_id` on `stream` and waits for the
/// peer's answer.
///
/// Returns `Ok(Some(..))` with the decoded delta and its metadata, or
/// `Ok(None)` when the peer answers `DeltaNotFound`.
///
/// # Errors
///
/// Fails when sending or receiving fails, when the payload does not decode,
/// when the returned delta's id differs from `delta_id`, when the peer
/// reports an opaque error, or on any other unexpected response.
pub(crate) async fn request_delta(
    &self,
    context_id: &ContextId,
    delta_id: [u8; 32],
    stream: &mut Stream,
    our_identity: PublicKey,
) -> Result<Option<FetchedDelta>> {
    info!(
        %context_id,
        delta_id = ?delta_id,
        "Requesting missing delta from peer"
    );

    let request = StreamMessage::Init {
        context_id: *context_id,
        party_id: our_identity,
        payload: InitPayload::DeltaRequest {
            context_id: *context_id,
            delta_id,
        },
        next_nonce: super::helpers::generate_nonce(),
    };
    super::stream::send(stream, &request, None).await?;

    let reply = super::stream::recv(stream, None, self.sync_config.timeout).await?;
    match reply {
        Some(StreamMessage::Message {
            payload:
                MessagePayload::DeltaResponse {
                    delta,
                    author_id,
                    governance_position_blob,
                    delta_signature,
                },
            ..
        }) => {
            let decoded: CausalDelta = borsh::from_slice(&delta)?;
            // Refuse an answer for a delta other than the one asked for.
            if decoded.id != delta_id {
                bail!(
                    "Received delta ID mismatch: requested {:?}, got {:?}",
                    delta_id,
                    decoded.id
                );
            }

            info!(
                %context_id,
                delta_id = ?delta_id,
                action_count = decoded.actions.len(),
                "Received requested delta"
            );

            let governance_position_blob = governance_position_blob.map(|blob| blob.into_owned());
            Ok(Some(FetchedDelta {
                delta: decoded,
                author_id,
                governance_position_blob,
                delta_signature,
            }))
        }
        Some(StreamMessage::Message {
            payload: MessagePayload::DeltaNotFound,
            ..
        }) => {
            debug!(
                %context_id,
                delta_id = ?delta_id,
                "Peer doesn't have requested delta"
            );
            Ok(None)
        }
        Some(StreamMessage::OpaqueError) => {
            bail!("Peer encountered error processing delta request");
        }
        other => {
            bail!("Unexpected response to delta request: {:?}", other);
        }
    }
}
pub async fn handle_delta_request(
&self,
context_id: ContextId,
delta_id: [u8; 32],
stream: &mut Stream,
) -> Result<()> {
info!(
%context_id,
delta_id = ?delta_id,
"Handling delta request from peer"
);
use calimero_store::key;
let handle = self.context_client.datastore_handle();
let db_key = key::ContextDagDelta::new(context_id, delta_id);
let response = if let Some(stored_delta) = handle.get(&db_key)? {
let is_genesis_delta = stored_delta.parents == vec![[0u8; 32]];
let effective_author = match (stored_delta.author_id, is_genesis_delta) {
(Some(a), _) => Some(a),
(None, true) => Some(genesis_author_sentinel()),
(None, false) => None,
};
match effective_author {
None => {
debug!(
%context_id,
delta_id = ?delta_id,
"Delta found but stored without an author claim (likely a snapshot \
checkpoint or pre-author-tracking row) — returning DeltaNotFound \
so the initiator falls back to a verifiable path"
);
MessagePayload::DeltaNotFound
}
Some(author_id) => {
let actions: Vec<calimero_storage::interface::Action> =
borsh::from_slice(&stored_delta.actions)?;
let causal_delta = CausalDelta {
id: stored_delta.delta_id,
parents: stored_delta.parents,
actions,
hlc: stored_delta.hlc,
expected_root_hash: stored_delta.expected_root_hash,
};
let serialized = borsh::to_vec(&causal_delta)?;
debug!(
%context_id,
delta_id = ?delta_id,
size = serialized.len(),
source = "RocksDB",
governance_position_present =
stored_delta.governance_position_blob.is_some(),
"Sending requested delta to peer"
);
MessagePayload::DeltaResponse {
delta: serialized.into(),
author_id,
governance_position_blob: stored_delta
.governance_position_blob
.map(Into::into),
delta_signature: stored_delta.delta_signature,
}
}
}
} else if let Some(delta_store) = self.state_access.delta_store(&context_id) {
if delta_store.get_delta(&delta_id).await.is_some() {
debug!(
%context_id,
delta_id = ?delta_id,
"Delta in in-memory DeltaStore but not yet persisted with author info — \
returning DeltaNotFound; initiator will re-fetch after persist settles"
);
MessagePayload::DeltaNotFound
} else {
warn!(
%context_id,
delta_id = ?delta_id,
"Requested delta not found in RocksDB or DeltaStore"
);
MessagePayload::DeltaNotFound
}
} else {
warn!(
%context_id,
delta_id = ?delta_id,
"Requested delta not found (no DeltaStore for context)"
);
MessagePayload::DeltaNotFound
};
let mut sqx = Sequencer::default();
let msg = StreamMessage::Message {
sequence_id: sqx.next(),
payload: response,
next_nonce: super::helpers::generate_nonce(),
};
super::stream::send(stream, &msg, None).await?;
Ok(())
}
/// Replies to a peer's DAG-heads request with the context's current
/// `dag_heads` and `root_hash`.
///
/// # Errors
///
/// Fails when the context lookup errors, when the context does not exist
/// ("Context not found"), or when sending the reply fails.
pub async fn handle_dag_heads_request(
    &self,
    context_id: ContextId,
    stream: &mut Stream,
    _nonce: Nonce,
) -> Result<()> {
    info!(
        %context_id,
        "Handling DAG heads request from peer"
    );

    let ctx = self
        .context_client
        .get_context(&context_id)?
        .ok_or_eyre("Context not found")?;

    info!(
        %context_id,
        heads_count = ctx.dag_heads.len(),
        root_hash = %ctx.root_hash,
        "Sending DAG heads to peer"
    );

    let mut sequencer = Sequencer::default();
    let sequence_id = sequencer.next();
    let payload = MessagePayload::DagHeadsResponse {
        dag_heads: ctx.dag_heads,
        root_hash: ctx.root_hash,
    };
    let reply = StreamMessage::Message {
        sequence_id,
        payload,
        next_nonce: super::helpers::generate_nonce(),
    };

    super::stream::send(stream, &reply, None).await?;
    Ok(())
}
}