use calimero_context_client::client::ContextClient;
use calimero_node_primitives::client::NodeClient;
use calimero_primitives::context::ContextId;
use calimero_primitives::events::{
ContextEvent, ContextEventPayload, ExecutionEvent, NodeEvent, StateMutationPayload,
};
use calimero_primitives::hash::Hash;
use calimero_primitives::identity::PublicKey;
use eyre::Result;
use tracing::{debug, info, warn};
use crate::delta_store::DeltaStore;
use super::ensure_application_available;
#[derive(Default)]
pub(super) struct CascadeOutcome {
pub(super) applied_current: bool,
pub(super) handlers_executed_for_current: bool,
}
pub(super) async fn execute_cascaded_events(
cascaded_events: &[([u8; 32], Vec<u8>)],
node_client: &NodeClient,
context_client: &ContextClient,
context_id: &ContextId,
our_identity: &PublicKey,
sync_timeout: std::time::Duration,
phase: &str,
current_delta: Option<&[u8; 32]>,
delta_store: &DeltaStore,
) -> Result<CascadeOutcome> {
if cascaded_events.is_empty() {
return Ok(CascadeOutcome::default());
}
info!(
%context_id,
cascaded_count = cascaded_events.len(),
phase = phase,
"Executing event handlers for cascaded deltas"
);
let mut outcome = CascadeOutcome::default();
if let Some(current) = current_delta {
if cascaded_events.iter().any(|(id, _)| *id == *current) {
info!(
%context_id,
delta_id = ?current,
phase = phase,
"Current delta cascaded - marking as applied"
);
outcome.applied_current = true;
}
}
let app_available =
ensure_application_available(node_client, context_client, context_id, sync_timeout)
.await
.is_ok();
if !app_available {
warn!(
%context_id,
cascaded_count = cascaded_events.len(),
phase = phase,
"Application not available - skipping cascaded handler execution. Events are preserved in DB (applied: true, events: Some(..)) and will replay on next init once the application becomes available."
);
return Ok(outcome);
}
for (cascaded_id, events_data) in cascaded_events {
match serde_json::from_slice::<Vec<ExecutionEvent>>(events_data) {
Ok(cascaded_payload) => {
info!(
%context_id,
delta_id = ?cascaded_id,
events_count = cascaded_payload.len(),
phase = phase,
"Executing handlers for cascaded delta"
);
let all_succeeded = match execute_event_handlers_parsed(
context_client,
context_id,
our_identity,
&cascaded_payload,
)
.await
{
Ok(succeeded) => succeeded,
Err(err) => {
warn!(
%context_id,
delta_id = ?cascaded_id,
error = %err,
phase = phase,
"Handler execution errored for cascaded delta; keeping events for restart replay"
);
false
}
};
if all_succeeded {
delta_store.mark_events_executed(cascaded_id);
} else {
warn!(
%context_id,
delta_id = ?cascaded_id,
phase = phase,
"One or more handlers failed; keeping events in DB for restart replay"
);
}
if current_delta == Some(cascaded_id) {
outcome.handlers_executed_for_current = true;
}
}
Err(e) => {
warn!(
%context_id,
delta_id = ?cascaded_id,
error = %e,
phase = phase,
"Failed to deserialize cascaded events — clearing blob to prevent permanent replay loop"
);
delta_store.mark_events_executed(cascaded_id);
}
}
}
Ok(outcome)
}
pub(super) async fn execute_event_handlers_parsed(
context_client: &ContextClient,
context_id: &ContextId,
our_identity: &PublicKey,
events_payload: &[ExecutionEvent],
) -> Result<bool> {
let mut all_succeeded = true;
for event in events_payload {
if let Some(handler_name) = &event.handler {
debug!(
%context_id,
event_kind = %event.kind,
handler_name = %handler_name,
"Executing handler for event"
);
match context_client
.execute(
context_id,
our_identity,
handler_name.clone(),
event.data.clone(),
vec![],
None,
)
.await
{
Ok(_handler_response) => {
debug!(
handler_name = %handler_name,
"Handler executed successfully"
);
}
Err(err) => {
warn!(
handler_name = %handler_name,
error = %err,
"Handler execution failed"
);
all_succeeded = false;
}
}
}
}
Ok(all_succeeded)
}
pub(super) fn emit_state_mutation_event_parsed(
node_client: &NodeClient,
context_id: &ContextId,
root_hash: Hash,
events_payload: Vec<ExecutionEvent>,
) {
let state_mutation = ContextEvent {
context_id: *context_id,
payload: ContextEventPayload::StateMutation(StateMutationPayload::with_root_and_events(
root_hash,
events_payload,
)),
};
if let Err(e) = node_client.send_event(NodeEvent::Context(state_mutation)) {
warn!(
%context_id,
error = %e,
"Failed to emit state mutation event to WebSocket clients"
);
}
}
pub(super) fn parse_events_payload(
events: &Option<Vec<u8>>,
context_id: &ContextId,
) -> Option<Vec<ExecutionEvent>> {
let Some(events_data) = events else {
return None;
};
match serde_json::from_slice::<Vec<ExecutionEvent>>(events_data) {
Ok(payload) => Some(payload),
Err(e) => {
warn!(
%context_id,
error = %e,
"Failed to deserialize events, skipping handler execution and WebSocket emission"
);
None
}
}
}