use calimero_context_client::client::ContextClient;
use calimero_crypto::Nonce;
use calimero_node_primitives::client::NodeClient;
use calimero_primitives::context::ContextId;
use calimero_primitives::events::{
ContextEvent, ContextEventPayload, ExecutionEvent, NodeEvent, StateMutationPayload,
};
use calimero_primitives::hash::Hash;
use calimero_primitives::identity::{PrivateKey, PublicKey};
use calimero_storage::action::Action;
use eyre::{bail, OptionExt, Result};
use libp2p::PeerId;
use tracing::{debug, info, warn};
use crate::delta_store::DeltaStore;
use crate::utils::choose_stream;
/// A state delta received from a peer via gossip/broadcast.
///
/// The payload (`artifact`) is encrypted; `key_id` selects the group key
/// (or, for contexts without a group, the author's sender key is used).
pub(crate) struct StateDeltaMessage {
    /// Peer that delivered this delta; also the peer we ask for missing parents.
    pub(crate) source: PeerId,
    /// Context this delta belongs to.
    pub(crate) context_id: ContextId,
    /// Public key of the identity that authored the delta.
    pub(crate) author_id: PublicKey,
    /// Unique id of this delta in the causal DAG.
    pub(crate) delta_id: [u8; 32],
    /// Ids of the delta's DAG parents.
    pub(crate) parent_ids: Vec<[u8; 32]>,
    /// Hybrid logical clock timestamp of the delta.
    pub(crate) hlc: calimero_storage::logical_clock::HybridTimestamp,
    /// Root hash the author expects after applying this delta.
    pub(crate) root_hash: Hash,
    /// Encrypted, borsh-serialized `StorageDelta` payload.
    pub(crate) artifact: Vec<u8>,
    /// Nonce used to encrypt `artifact`.
    pub(crate) nonce: Nonce,
    /// Optional JSON-serialized `Vec<ExecutionEvent>` emitted by the author.
    pub(crate) events: Option<Vec<u8>>,
    /// Governance epoch heads carried alongside the delta (currently only logged).
    pub(crate) governance_epoch: Vec<[u8; 32]>,
    /// Identifies which group key encrypted `artifact`.
    pub(crate) key_id: [u8; 32],
}
/// Node-level dependencies needed by [`handle_state_delta`].
pub(crate) struct StateDeltaContext {
    /// Context + node client handles.
    pub(crate) node_clients: crate::NodeClients,
    /// Shared node state (delta buffers, delta stores, sync sessions).
    pub(crate) node_state: crate::NodeState,
    /// Network client used to open streams for missing-parent requests.
    pub(crate) network_client: calimero_network_primitives::client::NetworkClient,
    /// Overall budget for sync-related waits (also divided for per-request timeouts).
    pub(crate) sync_timeout: std::time::Duration,
}
/// Input bundle for [`replay_buffered_delta`].
pub(crate) struct ReplayBufferedDeltaInput {
    /// Client for context operations (identities, execution, datastore).
    pub(crate) context_client: ContextClient,
    /// Client for node-level operations (event emission).
    pub(crate) node_client: NodeClient,
    /// Shared node state holding the per-context delta stores.
    pub(crate) node_state: crate::NodeState,
    /// Context the buffered delta belongs to.
    pub(crate) context_id: ContextId,
    /// Our chosen owned identity in this context.
    pub(crate) our_identity: PublicKey,
    /// The delta that was buffered while a sync was in progress.
    pub(crate) buffered: calimero_node_primitives::delta_buffer::BufferedDelta,
    /// Timeout budget forwarded to cascaded-event handling.
    pub(crate) sync_timeout: std::time::Duration,
    /// True when the delta's state is already covered by a synced checkpoint,
    /// so only handlers/events (not DAG insertion) may be needed.
    pub(crate) is_covered_by_checkpoint: bool,
}
/// Handles a state delta broadcast received from a peer.
///
/// Flow, in order:
/// 1. Reject deltas authored by ReadOnly members (silently, returning `Ok`).
/// 2. Buffer the delta when a sync session is active or the context is still
///    uninitialized (root hash equals the default hash).
/// 3. Resolve the decryption key (group key by `key_id`, falling back to the
///    author's `sender_key` when the context has no group) and decrypt the
///    artifact into storage actions.
/// 4. Skip self-authored deltas (still emitting the WebSocket state-mutation
///    event so local subscribers observe the events payload).
/// 5. Add the delta to the DAG; if parents are missing, request them from the
///    source peer; execute event handlers for applied/cascaded deltas exactly
///    once (tracked via `handlers_already_executed`).
/// 6. Emit a state-mutation event and broadcast a heartbeat when applied.
///
/// # Errors
/// Fails when the context is unknown, no decryption key can be resolved,
/// decryption/deserialization fails, the application blob never becomes
/// available, or DAG/handler operations fail.
pub async fn handle_state_delta(
    input: StateDeltaContext,
    message: StateDeltaMessage,
) -> Result<()> {
    let StateDeltaContext {
        node_clients,
        node_state,
        network_client,
        sync_timeout,
    } = input;
    let StateDeltaMessage {
        source,
        context_id,
        author_id,
        delta_id,
        parent_ids,
        hlc,
        root_hash,
        artifact,
        nonce,
        events,
        governance_epoch,
        key_id,
    } = message;
    let Some(context) = node_clients.context.get_context(&context_id)? else {
        bail!("context '{}' not found", context_id);
    };
    // ReadOnly members must not mutate state: drop their deltas without error
    // so the sender is not treated as faulty.
    if calimero_context::group_store::is_read_only_for_context(
        node_clients.context.datastore(),
        &context_id,
        &author_id,
    )
    .unwrap_or(false)
    {
        warn!(
            %context_id,
            %author_id,
            "Rejecting state delta from ReadOnly member"
        );
        return Ok(());
    }
    info!(
        %context_id,
        %author_id,
        delta_id = ?delta_id,
        parent_count = parent_ids.len(),
        expected_root_hash = %root_hash,
        current_root_hash = %context.root_hash,
        governance_epoch_heads = governance_epoch.len(),
        "Received state delta"
    );
    if !governance_epoch.is_empty() {
        // Epoch heads are informational here; catch-up happens elsewhere.
        tracing::debug!(
            %context_id,
            heads = governance_epoch.len(),
            "state delta carries governance epoch (catch-up via namespace heartbeat)"
        );
    }
    // A default root hash means this context has never applied any state;
    // such contexts buffer incoming deltas until a sync initializes them.
    let is_uninitialized = context.root_hash == Hash::default();
    let should_buffer = node_state.should_buffer_delta(&context_id) || is_uninitialized;
    if should_buffer {
        info!(
            %context_id,
            delta_id = ?delta_id,
            is_uninitialized,
            has_events = events.is_some(),
            "Buffering delta (sync in progress or context uninitialized)"
        );
        let buffered = calimero_node_primitives::delta_buffer::BufferedDelta {
            id: delta_id,
            parents: parent_ids.clone(),
            hlc: hlc.get_time().as_u64(),
            payload: artifact.clone(),
            nonce,
            author_id,
            root_hash,
            events: events.clone(),
            source_peer: source,
            key_id,
        };
        if let Some(result) = node_state.buffer_delta(&context_id, buffered) {
            // Newly-added or duplicate deltas are fully handled by the buffer.
            if result.was_added()
                || matches!(
                    result,
                    calimero_node_primitives::delta_buffer::PushResult::Duplicate
                )
            {
                return Ok(());
            }
        }
        // No active buffer session for an uninitialized context: start one and
        // retry buffering this same delta.
        // NOTE(review): this BufferedDelta literal duplicates the one above;
        // consider extracting a small constructor closure.
        if is_uninitialized && !node_state.should_buffer_delta(&context_id) {
            node_state.start_sync_session(context_id, hlc.get_time().as_u64());
            let buffered = calimero_node_primitives::delta_buffer::BufferedDelta {
                id: delta_id,
                parents: parent_ids.clone(),
                hlc: hlc.get_time().as_u64(),
                payload: artifact.clone(),
                nonce,
                author_id,
                root_hash,
                events: events.clone(),
                source_peer: source,
                key_id,
            };
            if let Some(result) = node_state.buffer_delta(&context_id, buffered) {
                if result.was_added()
                    || matches!(
                        result,
                        calimero_node_primitives::delta_buffer::PushResult::Duplicate
                    )
                {
                    info!(
                        %context_id,
                        delta_id = ?delta_id,
                        "Started buffer session for uninitialized context"
                    );
                    return Ok(());
                }
            }
        }
        // Falls through to normal processing when buffering was not possible.
        warn!(
            %context_id,
            delta_id = ?delta_id,
            "Delta buffer full or zero capacity, proceeding with normal processing (may fail)"
        );
    }
    // Resolve the decryption key: prefer the per-group key matching `key_id`;
    // contexts without a group fall back to the author's sender key.
    let group_key = {
        let store = node_clients.context.datastore();
        let gid = calimero_context::group_store::get_group_for_context(store, &context_id)?;
        match gid {
            Some(g) => calimero_context::group_store::load_group_key_by_id(store, &g, &key_id)?
                .map(PrivateKey::from)
                .ok_or_else(|| {
                    eyre::eyre!("no group key found for key_id {}", hex::encode(key_id))
                })?,
            None => {
                let identity = node_clients
                    .context
                    .get_identity(&context_id, &author_id)?
                    .ok_or_else(|| eyre::eyre!("no identity for author {author_id}"))?;
                identity
                    .sender_key
                    .ok_or_else(|| eyre::eyre!("no sender_key or group_key for context"))?
            }
        }
    };
    let actions = decrypt_delta_actions(artifact, nonce, group_key)?;
    let delta = calimero_dag::CausalDelta {
        id: delta_id,
        parents: parent_ids,
        payload: actions,
        hlc,
        expected_root_hash: *root_hash,
        kind: calimero_dag::DeltaKind::Regular,
    };
    let our_identity = choose_owned_identity(&node_clients.context, &context_id).await?;
    let is_self_authored = author_id == our_identity;
    if is_self_authored {
        // Our own deltas were already applied locally at execution time; only
        // re-emit the events to WebSocket subscribers.
        debug!(
            %context_id,
            %author_id,
            delta_id = ?delta_id,
            "Skipping self-authored delta (already applied locally)"
        );
        let events_payload = parse_events_payload(&events, &context_id);
        if let Some(payload) = events_payload {
            emit_state_mutation_event_parsed(&node_clients.node, &context_id, root_hash, payload)?;
        }
        return Ok(());
    }
    // The delta cannot be applied without the application blob; bail so the
    // sender's rebroadcast retries us later.
    if let Err(e) = ensure_application_available(
        &node_clients.node,
        &node_clients.context,
        &context_id,
        sync_timeout,
    )
    .await
    {
        bail!(
            "Application not available for context {} - delta will be retried on rebroadcast: {}",
            context_id,
            e
        );
    }
    // NOTE: `is_uninitialized` here shadows the earlier binding (same value,
    // recomputed from `context.root_hash` inside `init_delta_store`).
    let DeltaStoreSetup {
        store: delta_store_ref,
        is_uninitialized,
    } = init_delta_store(
        &node_state,
        &node_clients,
        context_id,
        our_identity,
        context.root_hash,
        sync_timeout,
    )
    .await?;
    let add_result = delta_store_ref
        .add_delta_with_events(delta, events.clone())
        .await?;
    let mut applied = add_result.applied;
    // Guards against running this delta's event handlers twice (once via a
    // cascade, once via the main flow below).
    let mut handlers_already_executed = false;
    if !applied {
        // The delta is pending; check whether fetching/cascading resolves it.
        let missing_result = delta_store_ref.get_missing_parents().await;
        let cascade_outcome = execute_cascaded_events(
            &missing_result.cascaded_events,
            &node_clients,
            &context_id,
            &our_identity,
            sync_timeout,
            "missing parent check",
            Some(&delta_id),
        )
        .await?;
        applied |= cascade_outcome.applied_current;
        handlers_already_executed |= cascade_outcome.handlers_executed_for_current;
        if !missing_result.missing_ids.is_empty() {
            warn!(
                %context_id,
                missing_count = missing_result.missing_ids.len(),
                context_is_uninitialized = is_uninitialized,
                has_events = events.is_some(),
                "Delta pending due to missing parents - requesting them from peer"
            );
            // Best effort: a failed fetch just leaves the delta pending.
            if let Err(e) = request_missing_deltas(
                network_client,
                sync_timeout,
                context_id,
                missing_result.missing_ids,
                source,
                our_identity,
                delta_store_ref.clone(),
            )
            .await
            {
                warn!(?e, %context_id, ?source, "Failed to request missing deltas");
            }
        } else {
            warn!(
                %context_id,
                delta_id = ?delta_id,
                has_events = events.is_some(),
                "Delta pending - parents exist but not yet applied (will cascade when ready)"
            );
        }
        // The fetch above may have cascaded this delta into the applied set.
        let was_cascaded = delta_store_ref.dag_has_delta_applied(&delta_id).await;
        if was_cascaded {
            info!(
                %context_id,
                delta_id = ?delta_id,
                "Delta was applied via cascade - will execute handlers"
            );
            applied = true;
            if !handlers_already_executed && events.is_some() {
                info!(
                    %context_id,
                    delta_id = ?delta_id,
                    "Delta cascaded via alternate path - handlers will be executed in main flow"
                );
            }
        }
    }
    let events_payload = parse_events_payload(&events, &context_id);
    if applied && !handlers_already_executed {
        if let Some(ref payload) = events_payload {
            let is_author = author_id == our_identity;
            info!(
                %context_id,
                %author_id,
                %our_identity,
                is_author,
                "Evaluating event handler execution for applied delta"
            );
            // Only non-author nodes run handlers; the author already ran them.
            if !is_author {
                execute_event_handlers_parsed(
                    &node_clients.context,
                    &context_id,
                    &our_identity,
                    payload,
                )
                .await?;
            } else {
                info!(
                    %context_id,
                    %author_id,
                    "Skipping event handler execution (we are the author node)"
                );
            }
        }
    } else if !applied && events_payload.is_some() {
        warn!(
            %context_id,
            delta_id = ?delta_id,
            "Delta with events buffered as pending - handlers will NOT execute when delta is applied later!"
        );
    }
    if let Some(payload) = events_payload {
        emit_state_mutation_event_parsed(&node_clients.node, &context_id, root_hash, payload)?;
    }
    // Run handlers for any other deltas that cascaded when this one was added.
    execute_cascaded_events(
        &add_result.cascaded_events,
        &node_clients,
        &context_id,
        &our_identity,
        sync_timeout,
        "dag cascade",
        None,
    )
    .await?;
    if applied {
        // Advertise our new heads so peers can detect divergence quickly.
        if let Ok(Some(ctx)) = node_clients.context.get_context(&context_id) {
            if !ctx.root_hash.is_zero() {
                let _ = node_clients
                    .node
                    .broadcast_heartbeat(&context_id, ctx.root_hash, ctx.dag_heads.clone())
                    .await;
            }
        }
    }
    Ok(())
}
/// Result of [`execute_cascaded_events`] relative to a "current" delta of interest.
#[derive(Default)]
struct CascadeOutcome {
    /// True when the current delta was among the cascaded (now-applied) deltas.
    applied_current: bool,
    /// True when the current delta's event handlers were executed by the cascade.
    handlers_executed_for_current: bool,
}
/// Result of [`init_delta_store`]: the per-context store plus whether the
/// context's root hash was still the default (uninitialized) value.
struct DeltaStoreSetup {
    store: DeltaStore,
    is_uninitialized: bool,
}
fn decrypt_delta_actions(
artifact: Vec<u8>,
nonce: Nonce,
sender_key: PrivateKey,
) -> Result<Vec<Action>> {
let shared_key = calimero_crypto::SharedKey::from_sk(&sender_key);
let decrypted_artifact = shared_key
.decrypt(artifact, nonce)
.ok_or_eyre("failed to decrypt artifact")?;
let storage_delta: calimero_storage::delta::StorageDelta =
borsh::from_slice(&decrypted_artifact)?;
match storage_delta {
calimero_storage::delta::StorageDelta::Actions(actions) => Ok(actions),
_ => bail!("Expected Actions variant in state delta"),
}
}
/// Picks one of our owned member identities for `context_id`, chosen at
/// random from the stream of owned members.
///
/// # Errors
/// Fails when the member stream yields an error or no owned identity exists.
async fn choose_owned_identity(
    context_client: &ContextClient,
    context_id: &ContextId,
) -> Result<PublicKey> {
    let owned_members = context_client.get_context_members(context_id, Some(true));
    let picked = choose_stream(owned_members, &mut rand::thread_rng())
        .await
        .transpose()?;
    match picked {
        Some((identity, _)) => Ok(identity),
        None => bail!("no owned identities found for context: {}", context_id),
    }
}
/// Fetches (or lazily creates) the [`DeltaStore`] for `context_id`.
///
/// On first creation the store is hydrated from persisted deltas, missing
/// parents are logged, and event handlers for any deltas cascaded by that
/// load are executed. If that initial setup fails, the store is removed from
/// the map again so a later delta can retry from scratch.
///
/// # Errors
/// Propagates failures from cascaded-event handler execution during the
/// initial load (load failures themselves are only logged).
async fn init_delta_store(
    node_state: &crate::NodeState,
    node_clients: &crate::NodeClients,
    context_id: ContextId,
    our_identity: PublicKey,
    root_hash: Hash,
    sync_timeout: std::time::Duration,
) -> Result<DeltaStoreSetup> {
    let is_uninitialized = root_hash == Hash::default();
    let mut freshly_created = false;
    // Clone the store out of the map entry so the map guard is released
    // before any of the awaits below.
    let store = node_state
        .delta_stores
        .entry(context_id)
        .or_insert_with(|| {
            freshly_created = true;
            DeltaStore::new(
                [0u8; 32],
                node_clients.context.clone(),
                context_id,
                our_identity,
            )
        })
        .clone();
    if freshly_created {
        // Best-effort hydration: a load failure just means an empty DAG.
        if let Err(e) = store.load_persisted_deltas().await {
            warn!(
                ?e,
                %context_id,
                "Failed to load persisted deltas, starting with empty DAG"
            );
        }
        let missing = store.get_missing_parents().await;
        if !missing.missing_ids.is_empty() {
            warn!(
                %context_id,
                missing_count = missing.missing_ids.len(),
                "Missing parents after loading persisted deltas - will request from network"
            );
        }
        let cascade_result = execute_cascaded_events(
            &missing.cascaded_events,
            node_clients,
            &context_id,
            &our_identity,
            sync_timeout,
            "initial load",
            None,
        )
        .await;
        if let Err(err) = cascade_result {
            warn!(
                %context_id,
                ?err,
                "Initial delta store setup failed - removing store to retry on next delta"
            );
            // Drop the half-initialized store so the next delta retries setup.
            node_state.delta_stores.remove(&context_id);
            return Err(err);
        }
    }
    Ok(DeltaStoreSetup {
        store,
        is_uninitialized,
    })
}
/// Executes event handlers for deltas that were applied via a DAG cascade.
///
/// `current_delta` optionally identifies a delta of interest; the returned
/// [`CascadeOutcome`] reports whether it was among the cascaded deltas and
/// whether its handlers actually ran here. When the application blob is not
/// available, handler execution is skipped entirely (but `applied_current`
/// is still reported). Deltas whose event bytes fail to deserialize are
/// logged and skipped.
///
/// # Errors
/// Propagates failures from handler execution.
async fn execute_cascaded_events(
    cascaded_events: &[([u8; 32], Vec<u8>)],
    node_clients: &crate::NodeClients,
    context_id: &ContextId,
    our_identity: &PublicKey,
    sync_timeout: std::time::Duration,
    phase: &str,
    current_delta: Option<&[u8; 32]>,
) -> Result<CascadeOutcome> {
    let mut outcome = CascadeOutcome::default();
    if cascaded_events.is_empty() {
        return Ok(outcome);
    }
    info!(
        %context_id,
        cascaded_count = cascaded_events.len(),
        phase = phase,
        "Executing event handlers for cascaded deltas"
    );
    if let Some(current) = current_delta {
        let current_was_cascaded = cascaded_events.iter().any(|(id, _)| id == current);
        if current_was_cascaded {
            info!(
                %context_id,
                delta_id = ?current,
                phase = phase,
                "Current delta cascaded - marking as applied"
            );
            outcome.applied_current = true;
        }
    }
    // Without the application blob we cannot run handlers; report the
    // applied/cascaded status we know so far and bail out gracefully.
    let app_missing = ensure_application_available(
        &node_clients.node,
        &node_clients.context,
        context_id,
        sync_timeout,
    )
    .await
    .is_err();
    if app_missing {
        warn!(
            %context_id,
            cascaded_count = cascaded_events.len(),
            phase = phase,
            "Application not available - skipping cascaded handler execution"
        );
        return Ok(outcome);
    }
    for (cascaded_id, raw_events) in cascaded_events {
        let payload = match serde_json::from_slice::<Vec<ExecutionEvent>>(raw_events) {
            Ok(payload) => payload,
            Err(e) => {
                warn!(
                    %context_id,
                    delta_id = ?cascaded_id,
                    error = %e,
                    phase = phase,
                    "Failed to deserialize cascaded events"
                );
                continue;
            }
        };
        info!(
            %context_id,
            delta_id = ?cascaded_id,
            events_count = payload.len(),
            phase = phase,
            "Executing handlers for cascaded delta"
        );
        execute_event_handlers_parsed(
            &node_clients.context,
            context_id,
            our_identity,
            &payload,
        )
        .await?;
        if current_delta == Some(cascaded_id) {
            outcome.handlers_executed_for_current = true;
        }
    }
    Ok(outcome)
}
/// Parses an optional JSON-serialized events blob into execution events.
///
/// Returns `None` when there is no payload or when deserialization fails
/// (the failure is logged but not propagated).
fn parse_events_payload(
    events: &Option<Vec<u8>>,
    context_id: &ContextId,
) -> Option<Vec<ExecutionEvent>> {
    let raw = events.as_ref()?;
    serde_json::from_slice::<Vec<ExecutionEvent>>(raw)
        .map_err(|e| {
            warn!(
                %context_id,
                error = %e,
                "Failed to deserialize events, skipping handler execution and WebSocket emission"
            );
        })
        .ok()
}
#[cfg(test)]
mod tests {
    use super::*;
    use calimero_crypto::{SharedKey, NONCE_LEN};
    use calimero_storage::delta::StorageDelta;
    use rand::thread_rng;

    // Valid JSON events round-trip through the parser with all fields intact.
    #[test]
    fn parse_events_payload_success() {
        let events = vec![ExecutionEvent {
            kind: "test".to_string(),
            data: vec![1, 2, 3],
            handler: Some("handler_fn".to_string()),
        }];
        let serialized = serde_json::to_vec(&events).expect("serialization should succeed");
        let parsed = parse_events_payload(&Some(serialized), &ContextId::zero())
            .expect("events should parse");
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].kind, "test");
        assert_eq!(parsed[0].handler.as_deref(), Some("handler_fn"));
    }

    // Malformed JSON is swallowed (logged) and yields `None`.
    #[test]
    fn parse_events_payload_invalid() {
        let parsed = parse_events_payload(&Some(b"not-json".to_vec()), &ContextId::zero());
        assert!(parsed.is_none());
    }

    // Encrypt-then-decrypt recovers the original (empty) action list.
    #[test]
    fn decrypt_delta_actions_roundtrip() -> Result<()> {
        let mut rng = thread_rng();
        let sender_key = PrivateKey::random(&mut rng);
        let shared_key = SharedKey::from_sk(&sender_key);
        let nonce = [7u8; NONCE_LEN];
        let storage_delta = StorageDelta::Actions(Vec::new());
        let plaintext = borsh::to_vec(&storage_delta)?;
        let cipher = shared_key
            .encrypt(plaintext, nonce)
            .ok_or_eyre("encryption failed")?;
        let decrypted = decrypt_delta_actions(cipher, nonce, sender_key)?;
        assert!(decrypted.is_empty());
        Ok(())
    }

    // Garbage ciphertext must fail decryption rather than panic.
    #[test]
    fn decrypt_delta_actions_rejects_bad_cipher() {
        let mut rng = thread_rng();
        let sender_key = PrivateKey::random(&mut rng);
        let nonce = [9u8; NONCE_LEN];
        let result = decrypt_delta_actions(vec![1, 2, 3, 4], nonce, sender_key);
        assert!(result.is_err());
    }
}
/// Runs the registered handler (if any) for each execution event.
///
/// Events without a handler are skipped. Handler failures are logged but do
/// not abort processing of the remaining events.
///
/// # Errors
/// Currently always returns `Ok(())`; the `Result` return leaves room for
/// future fatal failures.
async fn execute_event_handlers_parsed(
    context_client: &ContextClient,
    context_id: &ContextId,
    our_identity: &PublicKey,
    events_payload: &[ExecutionEvent],
) -> Result<()> {
    for event in events_payload {
        let Some(handler_name) = event.handler.as_ref() else {
            // No handler registered for this event kind.
            continue;
        };
        debug!(
            %context_id,
            event_kind = %event.kind,
            handler_name = %handler_name,
            "Executing handler for event"
        );
        let execution = context_client
            .execute(
                context_id,
                our_identity,
                handler_name.clone(),
                event.data.clone(),
                vec![],
                None,
            )
            .await;
        if let Err(err) = execution {
            // Best effort: one failing handler must not block the others.
            warn!(
                handler_name = %handler_name,
                error = %err,
                "Handler execution failed"
            );
        } else {
            debug!(
                handler_name = %handler_name,
                "Handler executed successfully"
            );
        }
    }
    Ok(())
}
/// Emits a state-mutation event (root hash + execution events) to WebSocket
/// subscribers of this context.
///
/// Send failures are logged and swallowed; emission is best effort.
fn emit_state_mutation_event_parsed(
    node_client: &NodeClient,
    context_id: &ContextId,
    root_hash: Hash,
    events_payload: Vec<ExecutionEvent>,
) -> Result<()> {
    let payload = StateMutationPayload::with_root_and_events(root_hash, events_payload);
    let mutation_event = ContextEvent {
        context_id: *context_id,
        payload: ContextEventPayload::StateMutation(payload),
    };
    if let Err(e) = node_client.send_event(NodeEvent::Context(mutation_event)) {
        warn!(
            %context_id,
            error = %e,
            "Failed to emit state mutation event to WebSocket clients"
        );
    }
    Ok(())
}
/// Fetches missing ancestor deltas from `source` over a dedicated stream.
///
/// Performs an iterative breadth-first walk: each fetched delta's parents are
/// queued for fetching too, unless they are the zero sentinel, already in the
/// local store, already queued, or already fetched in this session. Fetched
/// deltas are then inserted into the DAG in reverse fetch order (ancestors
/// first), so dependent deltas can cascade.
///
/// # Errors
/// Fails on stream open/send/receive errors or on borsh deserialization of a
/// response. A `DeltaNotFound` or unexpected response is logged, not fatal.
async fn request_missing_deltas(
    network_client: calimero_network_primitives::client::NetworkClient,
    sync_timeout: std::time::Duration,
    context_id: ContextId,
    missing_ids: Vec<[u8; 32]>,
    source: PeerId,
    our_identity: PublicKey,
    delta_store: DeltaStore,
) -> Result<()> {
    use calimero_node_primitives::sync::{InitPayload, MessagePayload, StreamMessage};
    let mut stream = network_client.open_stream(source).await?;
    // Work queue of delta ids still to request; refilled with newly
    // discovered missing parents on each round.
    let mut to_fetch = missing_ids;
    let mut fetched_deltas: Vec<(calimero_dag::CausalDelta<Vec<Action>>, [u8; 32])> = Vec::new();
    let mut fetch_count = 0;
    while !to_fetch.is_empty() {
        let current_batch = to_fetch.clone();
        to_fetch.clear();
        for missing_id in current_batch {
            fetch_count += 1;
            info!(
                %context_id,
                delta_id = ?missing_id,
                total_fetched = fetch_count,
                "Requesting missing parent delta from peer"
            );
            let request_msg = StreamMessage::Init {
                context_id,
                party_id: our_identity,
                payload: InitPayload::DeltaRequest {
                    context_id,
                    delta_id: missing_id,
                },
                next_nonce: {
                    use rand::Rng;
                    rand::thread_rng().gen()
                },
            };
            crate::sync::stream::send(&mut stream, &request_msg, None).await?;
            // Per-request budget is a fraction of the overall sync timeout.
            let timeout_budget = sync_timeout / 3;
            match crate::sync::stream::recv(&mut stream, None, timeout_budget).await? {
                Some(StreamMessage::Message {
                    payload: MessagePayload::DeltaResponse { delta },
                    ..
                }) => {
                    let storage_delta: calimero_storage::delta::CausalDelta =
                        borsh::from_slice(&delta)?;
                    info!(
                        %context_id,
                        delta_id = ?missing_id,
                        action_count = storage_delta.actions.len(),
                        "Received missing parent delta"
                    );
                    let dag_delta = calimero_dag::CausalDelta {
                        id: storage_delta.id,
                        parents: storage_delta.parents.clone(),
                        payload: storage_delta.actions,
                        hlc: storage_delta.hlc,
                        expected_root_hash: storage_delta.expected_root_hash,
                        kind: calimero_dag::DeltaKind::Regular,
                    };
                    fetched_deltas.push((dag_delta, missing_id));
                    // Queue any still-unknown parents for the next round.
                    for parent_id in &storage_delta.parents {
                        if *parent_id == [0; 32] {
                            // Zero id marks the genesis/root sentinel.
                            continue;
                        }
                        if !delta_store.has_delta(parent_id).await
                            && !to_fetch.contains(parent_id)
                            && !fetched_deltas.iter().any(|(d, _)| d.id == *parent_id)
                        {
                            to_fetch.push(*parent_id);
                        }
                    }
                }
                Some(StreamMessage::Message {
                    payload: MessagePayload::DeltaNotFound,
                    ..
                }) => {
                    warn!(%context_id, delta_id = ?missing_id, "Peer doesn't have requested delta");
                }
                other => {
                    warn!(%context_id, delta_id = ?missing_id, ?other, "Unexpected response to delta request");
                }
            }
        }
    }
    if !fetched_deltas.is_empty() {
        info!(
            %context_id,
            total_fetched = fetched_deltas.len(),
            "Adding fetched deltas to DAG in topological order"
        );
        // Fetch order is child-before-parent; reverse so ancestors are
        // inserted first and children can apply immediately.
        fetched_deltas.reverse();
        for (dag_delta, delta_id) in fetched_deltas {
            if let Err(e) = delta_store.add_delta(dag_delta).await {
                warn!(?e, %context_id, delta_id = ?delta_id, "Failed to add fetched delta to DAG");
            }
        }
        if fetch_count > 1000 {
            warn!(
                %context_id,
                total_fetched = fetch_count,
                "Large sync detected - fetched many deltas from peer (context has deep history)"
            );
        }
    }
    Ok(())
}
/// Waits until the application blob backing `context_id` is available locally.
///
/// Checks immediately, then polls with exponential backoff (50ms doubling,
/// capped at 500ms) until `timeout` elapses. Transient lookup failures from
/// `get_application` are treated as "not yet available"; `has_blob` errors
/// are propagated.
///
/// # Errors
/// Fails when the context is unknown, `has_blob` fails, or the blob never
/// becomes available within `timeout`.
async fn ensure_application_available(
    node_client: &calimero_node_primitives::client::NodeClient,
    context_client: &calimero_context_client::client::ContextClient,
    context_id: &ContextId,
    timeout: std::time::Duration,
) -> Result<()> {
    use std::time::Duration;
    use tokio::time::{sleep, Instant};
    let context = context_client
        .get_context(context_id)?
        .ok_or_else(|| eyre::eyre!("context not found"))?;
    let application_id = context.application_id;
    // Single source of truth for "is the bytecode blob present locally?".
    let blob_present = || -> Result<bool> {
        if let Ok(Some(app)) = node_client.get_application(&application_id) {
            return Ok(node_client.has_blob(&app.blob.bytecode)?);
        }
        Ok(false)
    };
    if blob_present()? {
        debug!(
            %context_id,
            %application_id,
            "Application blob already available"
        );
        return Ok(());
    }
    let start = Instant::now();
    let mut backoff = Duration::from_millis(50);
    let max_backoff = Duration::from_millis(500);
    info!(
        %context_id,
        %application_id,
        timeout_ms = timeout.as_millis(),
        "Waiting for application blob to become available..."
    );
    while start.elapsed() < timeout {
        sleep(backoff).await;
        if blob_present()? {
            info!(
                %context_id,
                %application_id,
                elapsed_ms = start.elapsed().as_millis(),
                "Application blob now available"
            );
            return Ok(());
        }
        backoff = max_backoff.min(backoff * 2);
    }
    warn!(
        %context_id,
        %application_id,
        elapsed_ms = start.elapsed().as_millis(),
        "Timeout waiting for application blob"
    );
    Err(eyre::eyre!(
        "Application blob not available after {:?}",
        timeout
    ))
}
/// Replays a delta that was buffered while a sync session was in progress.
///
/// Self-authored deltas are skipped. Otherwise the buffered payload is
/// decrypted (group key by `key_id`, else the author's sender key), rebuilt
/// into a DAG delta with a reconstructed HLC, and added to the context's
/// delta store — unless its state is already covered by a synced checkpoint,
/// in which case DAG insertion is skipped. Event handlers run (and the
/// WebSocket event is emitted) when the delta was applied, already present
/// in the DAG, or covered by the checkpoint.
///
/// Returns `true` when the delta was applied via the DAG during this replay.
///
/// # Errors
/// Fails when the context is unknown, no decryption key can be resolved,
/// decryption fails, or DAG/handler operations fail.
pub async fn replay_buffered_delta(input: ReplayBufferedDeltaInput) -> Result<bool> {
    let ReplayBufferedDeltaInput {
        context_client,
        node_client,
        node_state,
        context_id,
        our_identity,
        buffered,
        sync_timeout,
        is_covered_by_checkpoint,
    } = input;
    let delta_id = buffered.id;
    info!(
        %context_id,
        delta_id = ?delta_id,
        author = %buffered.author_id,
        has_events = buffered.events.is_some(),
        "Replaying buffered delta"
    );
    // Our own deltas were applied locally when originally executed.
    if buffered.author_id == our_identity {
        debug!(
            %context_id,
            delta_id = ?delta_id,
            "Skipping replay of self-authored delta"
        );
        return Ok(false);
    }
    // Existence check only; the context value itself is unused.
    let _context = context_client
        .get_context(&context_id)?
        .ok_or_else(|| eyre::eyre!("context not found after snapshot sync"))?;
    // Resolve the decryption key: group key by the buffered key_id, falling
    // back to the author's sender key for contexts without a group.
    let group_key = {
        let store = context_client.datastore();
        let gid = calimero_context::group_store::get_group_for_context(store, &context_id)?;
        match gid {
            Some(g) => {
                calimero_context::group_store::load_group_key_by_id(store, &g, &buffered.key_id)?
                    .map(PrivateKey::from)
                    .ok_or_else(|| eyre::eyre!("no group key found for buffered delta"))?
            }
            None => {
                let identity = context_client
                    .get_identity(&context_id, &buffered.author_id)?
                    .ok_or_else(|| eyre::eyre!("no identity for buffered author"))?;
                identity
                    .sender_key
                    .ok_or_else(|| eyre::eyre!("no sender_key or group_key"))?
            }
        }
    };
    let actions = decrypt_delta_actions(buffered.payload, buffered.nonce, group_key)?;
    // The buffer only stores the HLC's u64 time; rebuild a HybridTimestamp
    // with a fixed placeholder node id.
    use calimero_storage::logical_clock::{HybridTimestamp, Timestamp, ID, NTP64};
    use std::num::NonZeroU128;
    let default_id = ID::from(NonZeroU128::new(1).expect("1 is non-zero"));
    let hlc = HybridTimestamp::new(Timestamp::new(NTP64(buffered.hlc), default_id));
    let delta = calimero_dag::CausalDelta {
        id: buffered.id,
        parents: buffered.parents,
        payload: actions,
        hlc,
        expected_root_hash: *buffered.root_hash,
        kind: calimero_dag::DeltaKind::Regular,
    };
    let delta_store = node_state
        .delta_stores
        .entry(context_id)
        .or_insert_with(|| {
            crate::delta_store::DeltaStore::new(
                [0u8; 32],
                context_client.clone(),
                context_id,
                our_identity,
            )
        })
        .clone();
    // Best-effort hydration before checking whether the delta already exists.
    let _ = delta_store.load_persisted_deltas().await;
    let is_checkpoint_match = delta_store.dag_has_delta_applied(&delta_id).await;
    let add_result = if is_covered_by_checkpoint && !is_checkpoint_match {
        // The checkpoint already contains this delta's state; skip DAG
        // insertion and synthesize an "unapplied, no cascades" result.
        debug!(
            %context_id,
            delta_id = ?delta_id,
            "Skipping DAG addition for ancestor delta (state covered by checkpoint)"
        );
        crate::delta_store::AddDeltaResult {
            applied: false,
            cascaded_events: vec![],
        }
    } else {
        delta_store
            .add_delta_with_events(delta.clone(), buffered.events.clone())
            .await?
    };
    // Shadows the earlier binding: re-check after the add attempt whether the
    // delta is now (or was already) applied in the DAG.
    let is_checkpoint_match =
        !add_result.applied && delta_store.dag_has_delta_applied(&delta_id).await;
    let should_execute_handlers =
        add_result.applied || is_checkpoint_match || is_covered_by_checkpoint;
    if should_execute_handlers {
        if let Some(events_data) = &buffered.events {
            let events_payload: Option<Vec<ExecutionEvent>> =
                match serde_json::from_slice(events_data) {
                    Ok(events) => Some(events),
                    Err(e) => {
                        warn!(
                            %context_id,
                            delta_id = ?delta_id,
                            error = %e,
                            "Failed to parse buffered events"
                        );
                        None
                    }
                };
            if let Some(events) = events_payload {
                // Handlers run only on non-author nodes; the event is emitted
                // to WebSocket subscribers either way.
                let is_author = buffered.author_id == our_identity;
                if !is_author {
                    info!(
                        %context_id,
                        delta_id = ?delta_id,
                        events_count = events.len(),
                        applied_via_dag = add_result.applied,
                        is_checkpoint_match,
                        is_covered_by_checkpoint,
                        "Executing handlers for replayed buffered delta"
                    );
                    execute_event_handlers_parsed(
                        &context_client,
                        &context_id,
                        &our_identity,
                        &events,
                    )
                    .await?;
                }
                emit_state_mutation_event_parsed(
                    &node_client,
                    &context_id,
                    buffered.root_hash,
                    events,
                )?;
            }
        }
    } else {
        debug!(
            %context_id,
            delta_id = ?delta_id,
            has_events = buffered.events.is_some(),
            "Skipping handler execution for pending delta (will execute when delta is applied)"
        );
    }
    let node_clients = crate::NodeClients {
        context: context_client.clone(),
        node: node_client.clone(),
    };
    // Run handlers for any deltas that cascaded when this one was added.
    execute_cascaded_events(
        &add_result.cascaded_events,
        &node_clients,
        &context_id,
        &our_identity,
        sync_timeout,
        "buffered delta replay",
        None,
    )
    .await?;
    Ok(add_result.applied)
}