Skip to main content

calimero_context/handlers/
execute.rs

1use std::borrow::Cow;
2use std::collections::btree_map;
3// Removed: NonZeroUsize (replaced with CausalDelta)
4use std::time::Instant;
5
6use actix::{
7    ActorFuture, ActorFutureExt, ActorResponse, ActorTryFutureExt, Handler, Message, WrapFuture,
8};
9use calimero_context_config::repr::ReprTransmute;
10use calimero_context_primitives::client::crypto::ContextIdentity;
11use calimero_context_primitives::client::ContextClient;
12use calimero_context_primitives::messages::{
13    ExecuteError, ExecuteEvent, ExecuteRequest, ExecuteResponse,
14};
15use calimero_context_primitives::{ContextAtomic, ContextAtomicKey};
16use calimero_node_primitives::client::NodeClient;
17use calimero_primitives::alias::Alias;
18use calimero_primitives::application::ApplicationId;
19use calimero_primitives::context::{Context, ContextId};
20use calimero_primitives::events::{
21    ContextEvent, ContextEventPayload, ExecutionEvent, NodeEvent, StateMutationPayload,
22};
23use calimero_primitives::hash::Hash;
24use calimero_primitives::identity::{PrivateKey, PublicKey};
25use calimero_runtime::logic::{ContextHost, Outcome};
26use calimero_storage::{
27    action::Action,
28    delta::{CausalDelta, StorageDelta},
29    entities::StorageType,
30};
31use calimero_store::{key, types, Store};
32use calimero_utils_actix::global_runtime;
33use either::Either;
34use eyre::{bail, WrapErr};
35use futures_util::future::TryFutureExt;
36use futures_util::io::Cursor;
37use memchr::memmem;
38use tokio::sync::OwnedMutexGuard;
39use tracing::{debug, error, info, warn};
40
41use crate::error::ContextError;
42use crate::handlers::utils::{process_context_mutations, StoreContextHost};
43use crate::metrics::ExecutionLabels;
44use crate::ContextManager;
45
46pub mod storage;
47
48use storage::{ContextPrivateStorage, ContextStorage};
49
50impl Handler<ExecuteRequest> for ContextManager {
51    type Result = ActorResponse<Self, <ExecuteRequest as Message>::Result>;
52
53    fn handle(
54        &mut self,
55        ExecuteRequest {
56            context: context_id,
57            executor,
58            method,
59            payload,
60            aliases,
61            atomic,
62        }: ExecuteRequest,
63        _ctx: &mut Self::Context,
64    ) -> Self::Result {
65        info!(
66            %context_id,
67            method,
68            "Executing method in context"
69        );
70        debug!(
71            %context_id,
72            %executor,
73            method,
74            aliases = ?aliases,
75            payload_len = payload.len(),
76            atomic = %match atomic {
77                None => "no",
78                Some(ContextAtomic::Lock) => "acquire",
79                Some(ContextAtomic::Held(_)) => "yes",
80            },
81            "Execution request details"
82        );
83
84        let context = match self.get_or_fetch_context(&context_id) {
85            Ok(Some(context)) => context,
86            Ok(None) => return ActorResponse::reply(Err(ExecuteError::ContextNotFound)),
87            Err(err) => {
88                error!(%err, "failed to execute request");
89
90                return ActorResponse::reply(Err(ExecuteError::InternalError));
91            }
92        };
93
94        let is_state_op = "__calimero_sync_next" == method;
95
96        if !is_state_op && *context.meta.root_hash == [0; 32] {
97            return ActorResponse::reply(Err(ExecuteError::Uninitialized));
98        }
99
100        let (guard, is_atomic) = match atomic {
101            None => (context.lock(), false),
102            Some(ContextAtomic::Lock) => (context.lock(), true),
103            Some(ContextAtomic::Held(ContextAtomicKey(guard))) => (Either::Left(guard), true),
104        };
105
106        let external_config = match self.context_client.context_config(&context_id) {
107            Ok(Some(external_config)) => external_config,
108            Ok(None) => {
109                error!(%context_id, "missing context config for context");
110
111                return ActorResponse::reply(Err(ExecuteError::InternalError));
112            }
113            Err(err) => {
114                error!(%err, "failed to execute request");
115
116                return ActorResponse::reply(Err(ExecuteError::InternalError));
117            }
118        };
119
120        // Fetch the full identity, not just the sender_key
121        let identity = match self.context_client.get_identity(&context_id, &executor) {
122            Ok(Some(
123                identity @ ContextIdentity {
124                    private_key: Some(_),
125                    sender_key: Some(_),
126                    ..
127                },
128            )) => identity, // Keep the whole identity
129            Ok(_) => {
130                return ActorResponse::reply(Err(ExecuteError::Unauthorized {
131                    context_id,
132                    public_key: executor,
133                }))
134            }
135            Err(err) => {
136                error!(%err, "failed to execute request");
137
138                return ActorResponse::reply(Err(ExecuteError::InternalError));
139            }
140        };
141
142        // Extract the private key so it could be moved later into the closure
143        // without having ownership errors.
144        let private_key = identity.private_key.expect(
145            "infallible (verified before): missing private key in ContextIdentity for signing",
146        );
147
148        // Get the private key we know we have
149        let sender_key = identity.sender_key.expect(
150            "infallible (verified before): missing sender key in ContextIdentity for signing",
151        );
152
153        debug!(
154            public_key = ?identity.public_key,
155            public_key = %identity.public_key,
156            sender_key = ?sender_key.public_key(),
157            sender_key = %sender_key.public_key(),
158            "ContextManager: keys",
159        );
160
161        let payload =
162            match substitute_aliases_in_payload(&self.node_client, context_id, payload, &aliases) {
163                Ok(payload) => payload,
164                Err(err) => {
165                    error!(%err, "failed to execute request");
166
167                    return ActorResponse::reply(Err(err));
168                }
169            };
170
171        let guard_task = async move {
172            match guard {
173                Either::Left(guard) => guard,
174                Either::Right(task) => task.await,
175            }
176        }
177        .into_actor(self);
178
179        let context_task = guard_task.map(move |guard, act, _ctx| {
180            let Some(context) = act.get_or_fetch_context(&context_id)? else {
181                bail!(ContextError::ContextDeleted { context_id });
182            };
183
184            Ok((guard, context.meta.clone()))
185        });
186
187        let module_task = context_task.and_then(move |(guard, context), act, _ctx| {
188            act.get_module(context.application_id)
189                .map_ok(move |module, _act, _ctx| (guard, context, module))
190        });
191
192        let execution_count = self.metrics.as_ref().map(|m| m.execution_count.clone());
193        let execution_duration = self.metrics.as_ref().map(|m| m.execution_duration.clone());
194
195        let execute_task = module_task.and_then(move |(guard, mut context, module), act, _ctx| {
196            let datastore = act.datastore.clone();
197            let node_client = act.node_client.clone();
198            let context_client = act.context_client.clone();
199
200            async move {
201                let old_root_hash = context.root_hash;
202
203                let start = Instant::now();
204
205                let (outcome, causal_delta) = internal_execute(
206                    datastore,
207                    &node_client,
208                    &context_client,
209                    module,
210                    &guard,
211                    &mut context,
212                    executor,
213                    method.clone().into(),
214                    payload.into(),
215                    is_state_op,
216                    &private_key,
217                )
218                .await?;
219
220                let duration = start.elapsed().as_secs_f64();
221                let status = outcome
222                    .returns
223                    .is_ok()
224                    .then_some("success")
225                    .unwrap_or("failure");
226
227                // Update execution count metrics
228                if let Some(execution_count) = execution_count {
229                    let _ignored = execution_count
230                        .clone()
231                        .get_or_create(&ExecutionLabels {
232                            context_id: context_id.to_string(),
233                            method: method.clone(),
234                            status: status.to_owned(),
235                        })
236                        .inc();
237                }
238
239                // Update execution duration metrics
240                if let Some(execution_duration) = execution_duration {
241                    let _ignored = execution_duration
242                        .clone()
243                        .get_or_create(&ExecutionLabels {
244                            context_id: context_id.to_string(),
245                            method: method.clone(),
246                            status: status.to_owned(),
247                        })
248                        .observe(duration);
249                }
250
251                info!(
252                    %context_id,
253                    method,
254                    status,
255                    "Method execution completed"
256                );
257                debug!(
258                    %context_id,
259                    %executor,
260                    method,
261                    status,
262                    %old_root_hash,
263                    new_root_hash=%context.root_hash,
264                    artifact_len = outcome.artifact.len(),
265                    logs_count = outcome.logs.len(),
266                    events_count = outcome.events.len(),
267                    xcalls_count = outcome.xcalls.len(),
268                    "Execution outcome details"
269                );
270
271                Ok((guard, context, outcome, causal_delta))
272            }
273            .into_actor(act)
274        });
275
276        let external_task =
277            execute_task.and_then(move |(guard, context, outcome, causal_delta), act, _ctx| {
278                if let Some(cached_context) = act.contexts.get_mut(&context_id) {
279                    debug!(
280                        %context_id,
281                        old_root = ?cached_context.meta.root_hash,
282                        new_root = ?context.root_hash,
283                        is_state_op,
284                        "Updating cached context root_hash"
285                    );
286                    cached_context.meta.root_hash = context.root_hash;
287                } else {
288                    debug!(%context_id, is_state_op, "Context not in cache, will be fetched from DB next time");
289                }
290
291                let node_client = act.node_client.clone();
292                let context_client = act.context_client.clone();
293
294                async move {
295                    if outcome.returns.is_err() {
296                        return Ok((guard, context.root_hash, outcome));
297                    }
298
299                    info!(
300                        %context_id,
301                        %executor,
302                        is_state_op,
303                        artifact_empty = outcome.artifact.is_empty(),
304                        events_count = outcome.events.len(),
305                        xcalls_count = outcome.xcalls.len(),
306                        "Execution outcome details"
307                    );
308
309                    // Collect events that have handlers for deferred execution.
310                    // NOTE: We cannot execute handlers synchronously here because it would cause
311                    // an actor deadlock - the ContextManager actor is busy processing this request
312                    // and can't process another ExecuteRequest until this one completes.
313                    //
314                    // Instead, we spawn handler executions as separate async tasks that will run
315                    // after this response is sent. This means handlers execute asynchronously
316                    // and their results are broadcast separately.
317                    let events_with_handlers: Vec<_> = outcome
318                        .events
319                        .iter()
320                        .enumerate()
321                        .filter_map(|(idx, event)| {
322                            event.handler.as_ref().map(|handler_name| {
323                                info!(
324                                    %context_id,
325                                    event_kind = %event.kind,
326                                    handler_name = %handler_name,
327                                    "Event with handler will be executed after response"
328                                );
329                                (idx, handler_name.clone(), event.data.clone())
330                            })
331                        })
332                        .collect();
333
334                    // Track indices of events whose handlers will be executed locally
335                    let executed_handler_indices: std::collections::HashSet<usize> =
336                        events_with_handlers.iter().map(|(idx, _, _)| *idx).collect();
337
338                    // Process cross-context calls
339                    // NOTE: XCalls are executed locally on the current node after the main execution completes.
340                    // This allows contexts to communicate by calling functions on other contexts.
341                    for xcall in &outcome.xcalls {
342                        let target_context_id = ContextId::from(xcall.context_id);
343
344                        info!(
345                            %context_id,
346                            target_context = ?target_context_id,
347                            function = %xcall.function,
348                            params_len = xcall.params.len(),
349                            "Processing cross-context call"
350                        );
351
352                        // Find an owned member of the target context to execute as
353                        // We need to use a member that has permissions on the target context
354                        use futures_util::TryStreamExt;
355                        let members: Vec<_> = context_client
356                            .get_context_members(&target_context_id, Some(true))
357                            .try_collect()
358                            .await
359                            .unwrap_or_default();
360
361                        let Some((target_executor, _is_owned)) = members.first() else {
362                            error!(
363                                %context_id,
364                                target_context = ?target_context_id,
365                                function = %xcall.function,
366                                "No owned members found for target context"
367                            );
368                            continue;
369                        };
370
371                        let target_executor = *target_executor;
372
373                        info!(
374                            %context_id,
375                            target_context = ?target_context_id,
376                            target_executor = ?target_executor,
377                            "Found owned member for target context"
378                        );
379
380                        // Execute the cross-context call with the target context's member
381                        let xcall_result = context_client
382                            .execute(
383                                &target_context_id,
384                                &target_executor,
385                                xcall.function.clone(),
386                                xcall.params.clone(),
387                                vec![],
388                                None,
389                            )
390                            .await;
391
392                        match xcall_result {
393                            Ok(_) => {
394                                info!(
395                                    %context_id,
396                                    target_context = ?target_context_id,
397                                    function = %xcall.function,
398                                    "Cross-context call executed successfully"
399                                );
400                            }
401                            Err(err) => {
402                                error!(
403                                    %context_id,
404                                    target_context = ?target_context_id,
405                                    function = %xcall.function,
406                                    ?err,
407                                    "Cross-context call failed"
408                                );
409                            }
410                        }
411                    }
412
413                    // Broadcast state deltas to other nodes when:
414                    // 1. It's not a state synchronization operation (is_state_op = false)
415                    // 2. AND there's a state change artifact (non-empty artifact)
416                    //
417                    // This ensures that:
418                    // - State changes are broadcast when there are actual state changes
419                    // - State synchronization operations don't trigger broadcasts (prevents loops)
420                    // - Events are still broadcast via WebSocket regardless of state changes
421                    if !(is_state_op || outcome.artifact.is_empty()) {
422                        info!(
423                            %context_id,
424                            %executor,
425                            is_state_op,
426                            artifact_empty = outcome.artifact.is_empty(),
427                            events_count = outcome.events.len(),
428                            has_delta = causal_delta.is_some(),
429                            "Broadcasting state delta and events to other nodes"
430                        );
431
432                        if let Some(ref the_delta) = causal_delta {
433                            // Serialize events if any were emitted
434                            let events_data = if outcome.events.is_empty() {
435                                info!(
436                                    %context_id,
437                                    %executor,
438                                    "No events to serialize"
439                                );
440                                None
441                            } else {
442                                // Clear handler field for events whose handlers were already executed locally.
443                                // This prevents receivers from re-executing handlers that the sender
444                                // already executed, avoiding duplicate state changes.
445                                let events_vec: Vec<ExecutionEvent> = outcome
446                                    .events
447                                    .iter()
448                                    .enumerate()
449                                    .map(|(idx, e)| ExecutionEvent {
450                                        kind: e.kind.clone(),
451                                        data: e.data.clone(),
452                                        // If handler was executed locally, clear it from broadcast
453                                        handler: if executed_handler_indices.contains(&idx) {
454                                            None
455                                        } else {
456                                            e.handler.clone()
457                                        },
458                                    })
459                                    .collect();
460                                let serialized = serde_json::to_vec(&events_vec)?;
461                                info!(
462                                    %context_id,
463                                    %executor,
464                                    events_count = events_vec.len(),
465                                    handlers_executed_locally = executed_handler_indices.len(),
466                                    serialized_len = serialized.len(),
467                                    "Serializing events for broadcast (handlers cleared for executed events)"
468                                );
469                                Some(serialized)
470                            };
471
472                            node_client
473                                .broadcast(
474                                    &context,
475                                    &executor,
476                                    &sender_key,
477                                    outcome.artifact.clone(),
478                                    the_delta.id,
479                                    the_delta.parents.clone(),
480                                    the_delta.hlc,
481                                    events_data,
482                                )
483                                .await?;
484                        }
485                    }
486
487                    let external_client =
488                        context_client.external_client(&context_id, &external_config)?;
489
490                    let proxy_client = external_client.proxy();
491
492                    for (proposal_id, actions) in &outcome.proposals {
493                        let actions = borsh::from_slice(actions)?;
494
495                        let proposal_id = proposal_id.rt().expect("infallible conversion");
496
497                        proxy_client
498                            .propose(&executor, &proposal_id, actions)
499                            .await?;
500                    }
501
502                    for proposal_id in &outcome.approvals {
503                        let proposal_id = proposal_id.rt().expect("infallible conversion");
504
505                        proxy_client.approve(&executor, &proposal_id).await?;
506                    }
507
508                    process_context_mutations(&context_client, &node_client, context_id, executor, &outcome.context_mutations).await;
509
510                    // Spawn handler executions as separate tasks.
511                    // This runs after the current response is sent, avoiding actor deadlock.
512                    // Each handler execution creates its own delta that gets broadcast separately.
513                    if !events_with_handlers.is_empty() {
514                        let handler_context_client = context_client.clone();
515                        let handler_context_id = context_id;
516                        let handler_executor = executor;
517
518                        // Spawn handlers in the global runtime so they execute independently
519                        global_runtime().spawn(async move {
520                            for (_idx, handler_name, event_data) in events_with_handlers {
521                                info!(
522                                    %handler_context_id,
523                                    handler_name = %handler_name,
524                                    "Executing handler on sender (deferred)"
525                                );
526
527                                match handler_context_client
528                                    .execute(
529                                        &handler_context_id,
530                                        &handler_executor,
531                                        handler_name.clone(),
532                                        event_data,
533                                        vec![],
534                                        None,
535                                    )
536                                    .await
537                                {
538                                    Ok(_) => {
539                                        info!(
540                                            %handler_context_id,
541                                            handler_name = %handler_name,
542                                            "Handler executed successfully on sender"
543                                        );
544                                    }
545                                    Err(err) => {
546                                        warn!(
547                                            %handler_context_id,
548                                            handler_name = %handler_name,
549                                            error = %err,
550                                            "Handler execution failed on sender"
551                                        );
552                                    }
553                                }
554                            }
555                        });
556                    }
557
558                    Ok((guard, context.root_hash, outcome))
559                }
560                .map_err(|err| {
561                    error!(
562                    ?err,
563                    "execution succeeded, but an error occurred while performing external actions"
564                );
565
566                    err
567                })
568                .into_actor(act)
569            });
570
571        let task = external_task
572            .map_err(|err, _act, _ctx| {
573                err.downcast::<ExecuteError>().unwrap_or_else(|err| {
574                    debug!(?err, "an error occurred while executing request");
575                    ExecuteError::InternalError
576                })
577            })
578            .map_ok(
579                move |(guard, root_hash, outcome), _act, _ctx| ExecuteResponse {
580                    returns: outcome.returns.map_err(Into::into),
581                    logs: outcome.logs,
582                    events: outcome
583                        .events
584                        .into_iter()
585                        .map(|e| ExecuteEvent {
586                            kind: e.kind,
587                            data: e.data,
588                            handler: e.handler,
589                        })
590                        .collect(),
591                    root_hash,
592                    artifact: outcome.artifact,
593                    atomic: is_atomic.then_some(ContextAtomicKey(guard)),
594                },
595            );
596
597        ActorResponse::r#async(task)
598    }
599}
600
601impl ContextManager {
602    pub fn get_module(
603        &self,
604        application_id: ApplicationId,
605    ) -> impl ActorFuture<Self, Output = eyre::Result<calimero_runtime::Module>> + 'static {
606        let blob_task = async {}.into_actor(self).map(move |_, act, _ctx| {
607            let blob = match act.applications.entry(application_id) {
608                btree_map::Entry::Vacant(vacant) => {
609                    let Some(app) = act.node_client.get_application(&application_id)? else {
610                        bail!(ExecuteError::ApplicationNotInstalled { application_id });
611                    };
612
613                    vacant.insert(app).blob
614                }
615                btree_map::Entry::Occupied(occupied) => occupied.into_mut().blob,
616            };
617
618            Ok(blob)
619        });
620
621        let module_task = blob_task.and_then(move |mut blob, act, _ctx| {
622            let node_client = act.node_client.clone();
623
624            async move {
625                if let Some(compiled) = node_client.get_blob_bytes(&blob.compiled, None).await? {
626                    let module =
627                        unsafe { calimero_runtime::Engine::headless().from_precompiled(&compiled) };
628
629                    match module {
630                        Ok(module) => return Ok((module, None)),
631                        Err(err) => {
632                            debug!(
633                                ?err,
634                                %application_id,
635                                blob_id=%blob.compiled,
636                                "failed to load precompiled module, recompiling.."
637                            );
638                        }
639                    }
640                }
641
642                debug!(
643                    %application_id,
644                    blob_id=%blob.compiled,
645                    "no usable precompiled module found, compiling.."
646                );
647
648                // Use get_application_bytes instead of get_blob_bytes for bytecode
649                // because get_application_bytes knows how to extract WASM from bundles
650                let Some(bytecode) = node_client.get_application_bytes(&application_id).await?
651                else {
652                    bail!(ExecuteError::ApplicationNotInstalled { application_id });
653                };
654
655                // Compile WASM in a blocking task to avoid blocking the async executor.
656                // Note: panics during compilation will surface as JoinError.
657                let module = global_runtime()
658                    .spawn_blocking(move || calimero_runtime::Engine::default().compile(&bytecode))
659                    .await
660                    .wrap_err("WASM compilation task failed")? // JoinError (task panicked/cancelled)
661                    ?; // Compilation error
662
663                let compiled = Cursor::new(module.to_bytes()?);
664
665                let (blob_id, _ignored) = node_client.add_blob(compiled, None, None).await?;
666
667                blob.compiled = blob_id;
668
669                node_client.update_compiled_app(&application_id, &blob_id)?;
670
671                Ok((module, Some(blob)))
672            }
673            .into_actor(act)
674        });
675
676        module_task
677            .map_ok(move |(module, blob), act, _ctx| {
678                if let Some(blob) = blob {
679                    if let Some(app) = act.applications.get_mut(&application_id) {
680                        app.blob = blob;
681                    }
682                }
683
684                module
685            })
686            .map_err(|err, _act, _ctx| {
687                error!(?err, "failed to initialize module for execution");
688
689                err
690            })
691    }
692}
693
694async fn internal_execute(
695    datastore: Store,
696    node_client: &NodeClient,
697    _context_client: &ContextClient,
698    module: calimero_runtime::Module,
699    guard: &OwnedMutexGuard<ContextId>,
700    context: &mut Context,
701    executor: PublicKey,
702    method: Cow<'static, str>,
703    input: Cow<'static, [u8]>,
704    is_state_op: bool,
705    identity_private_key: &PrivateKey,
706) -> eyre::Result<(Outcome, Option<CausalDelta>)> {
707    // Create the host store context implementation
708    let context_host = StoreContextHost {
709        store: datastore.clone(),
710        context_id: context.id,
711    };
712
713    let storage = ContextStorage::from(datastore.clone(), context.id);
714    // Create private storage (node-local, NOT synchronized)
715    let private_storage = ContextPrivateStorage::from(datastore, context.id);
716    let (mut outcome, storage, private_storage) = execute(
717        guard,
718        module,
719        executor,
720        method.clone(),
721        input,
722        storage,
723        private_storage,
724        node_client.clone(),
725        Some(Box::new(context_host)),
726    )
727    .await?;
728
729    debug!(
730        context_id = %context.id,
731        method = %method,
732        is_state_op,
733        has_root_hash = outcome.root_hash.is_some(),
734        artifact_len = outcome.artifact.len(),
735        events_count = outcome.events.len(),
736        returns_ok = outcome.returns.is_ok(),
737        "WASM execution completed"
738    );
739
740    if outcome.returns.is_err() {
741        warn!(
742            context_id = %context.id,
743            method = %method,
744            error = ?outcome.returns,
745            "WASM execution returned error"
746        );
747        return Ok((outcome, None));
748    }
749
750    'fine: {
751        if outcome.root_hash.is_some() && outcome.artifact.is_empty() {
752            debug!(
753                context_id = %context.id,
754                has_root_hash = true,
755                artifact_empty = true,
756                is_state_op,
757                "Outcome has root hash but empty artifact - checking mitigation"
758            );
759
760            if is_state_op {
761                // fixme! temp mitigation for a potential state inconsistency
762                break 'fine;
763            }
764
765            bail!(ContextError::StateInconsistency);
766        }
767    }
768
769    let mut causal_delta = None;
770
771    // Always update root_hash if present (even if storage is empty)
772    // This is critical for state_ops like __calimero_sync_next where actions
773    // are applied inside WASM but storage appears empty
774    if let Some(root_hash) = outcome.root_hash {
775        debug!(
776            context_id = %context.id,
777            old_root = ?context.root_hash,
778            new_root = ?Hash::from(root_hash),
779            is_state_op,
780            storage_empty = storage.is_empty(),
781            "Updating context root_hash after execution"
782        );
783        context.root_hash = root_hash.into();
784
785        // Commit storage and persist metadata
786        let store = storage.commit()?;
787        // Commit private storage (node-local, NOT synchronized)
788        // Private storage changes are not included in sync deltas
789        let _private_store = private_storage.commit()?;
790
791        // Create causal delta for non-state ops with non-empty artifacts
792        if !is_state_op && !outcome.artifact.is_empty() {
793            // Extract actions from artifact for DAG persistence
794            let mut actions = match borsh::from_slice::<StorageDelta>(&outcome.artifact) {
795                Ok(StorageDelta::Actions(actions)) => actions,
796                Ok(_) => {
797                    warn!("Unexpected StorageDelta variant, using empty actions");
798                    vec![]
799                }
800                Err(e) => {
801                    warn!(
802                        ?e,
803                        "Failed to deserialize artifact for DAG, using empty actions"
804                    );
805                    vec![]
806                }
807            };
808
809            // The artifact was `StorageDelta::Actions`.
810            if actions.len() != 0 {
811                info!(
812                    context_id = %context.id,
813                    actions_count = actions.len(),
814                    "Received several actions. Verify if there any user actions..."
815                );
816                sign_user_actions(&mut actions, &identity_private_key)
817                    .wrap_err("Failed to sign user actions")?;
818
819                // Re-serialize the *signed* actions into a new artifact
820                let new_artifact = borsh::to_vec(&StorageDelta::Actions(actions.clone()))?;
821                outcome.artifact = new_artifact;
822            }
823
824            // Use current DAG heads as parents, verifying they exist in RocksDB
825            let parents = if context.dag_heads.is_empty() {
826                // Genesis case: parent is the zero hash
827                vec![[0u8; 32]]
828            } else {
829                // Filter out parents that aren't persisted yet (cascaded deltas)
830                let mut verified_parents = Vec::new();
831                for head in &context.dag_heads {
832                    if *head == [0u8; 32] {
833                        verified_parents.push(*head);
834                        continue;
835                    }
836
837                    // Check if this parent is actually in RocksDB
838                    let db_key = key::ContextDagDelta::new(context.id, *head);
839                    if store.handle().get(&db_key).is_ok_and(|v| v.is_some()) {
840                        verified_parents.push(*head);
841                    } else {
842                        warn!(
843                            context_id = %context.id,
844                            parent_id = ?head,
845                            "DAG head not in RocksDB - skipping as parent (likely cascaded delta not yet persisted)"
846                        );
847                    }
848                }
849
850                // If NO parents verified, use genesis
851                if verified_parents.is_empty() {
852                    warn!(
853                        context_id = %context.id,
854                        "No DAG heads in RocksDB - using genesis as parent"
855                    );
856                    vec![[0u8; 32]]
857                } else {
858                    verified_parents
859                }
860            };
861
862            let hlc = calimero_storage::env::hlc_timestamp();
863            let delta_id = CausalDelta::compute_id(&parents, &actions, &hlc);
864
865            let delta = CausalDelta {
866                id: delta_id,
867                parents,
868                actions,
869                hlc,
870                expected_root_hash: root_hash,
871            };
872
873            // Update context's DAG heads to this new delta
874            context.dag_heads = vec![delta.id];
875
876            causal_delta = Some(delta);
877        } else if !is_state_op {
878            // No delta created (empty artifact), but state changed
879            // Use root_hash as dag_head fallback to enable sync
880            // This happens when init() creates state but doesn't generate actions
881            if context.dag_heads.is_empty() {
882                warn!(
883                    context_id = %context.id,
884                    root_hash = ?root_hash,
885                    artifact_empty = outcome.artifact.is_empty(),
886                    "State changed but no delta created - using root_hash as dag_head fallback"
887                );
888                context.dag_heads = vec![root_hash];
889            }
890        }
891
892        // Persist context metadata when root_hash changes
893        let mut handle = store.handle();
894
895        debug!(
896            context_id = %context.id,
897            root_hash = ?context.root_hash,
898            dag_heads_count = context.dag_heads.len(),
899            is_state_op,
900            "Persisting context metadata to database"
901        );
902
903        handle.put(
904            &key::ContextMeta::new(context.id),
905            &types::ContextMeta::new(
906                key::ApplicationMeta::new(context.application_id),
907                *context.root_hash,
908                context.dag_heads.clone(),
909            ),
910        )?;
911
912        // Also persist the delta itself for serving to peers who request it
913        if let Some(ref delta) = causal_delta {
914            let serialized_actions = borsh::to_vec(&delta.actions)?;
915
916            handle.put(
917                &key::ContextDagDelta::new(context.id, delta.id),
918                &types::ContextDagDelta {
919                    delta_id: delta.id,
920                    parents: delta.parents.clone(),
921                    actions: serialized_actions,
922                    hlc: delta.hlc,
923                    applied: true,
924                    expected_root_hash: delta.expected_root_hash,
925                    events: None, // No events stored for locally created deltas
926                },
927            )?;
928
929            debug!(
930                context_id = %context.id,
931                delta_id = ?delta.id,
932                "Persisted delta to database for future requests"
933            );
934        }
935
936        debug!(
937            context_id = %context.id,
938            root_hash = ?context.root_hash,
939            dag_heads_count = context.dag_heads.len(),
940            is_state_op,
941            "Context metadata persisted successfully"
942        );
943    }
944
945    // Emit state mutation to WebSocket clients (frontends) if there are events or state changes
946    // Note: This is separate from node-to-node DAG broadcast (lines 408-419)
947    if !outcome.events.is_empty() || outcome.root_hash.is_some() {
948        let new_root = outcome
949            .root_hash
950            .map(|h| h.into())
951            .unwrap_or((*context.root_hash).into());
952
953        let events_vec = outcome
954            .events
955            .iter()
956            .map(|e| ExecutionEvent {
957                kind: e.kind.clone(),
958                data: e.data.clone(),
959                handler: e.handler.clone(),
960            })
961            .collect();
962
963        node_client.send_event(NodeEvent::Context(ContextEvent {
964            context_id: context.id,
965            payload: ContextEventPayload::StateMutation(
966                StateMutationPayload::with_root_and_events(new_root, events_vec),
967            ),
968        }))?;
969    }
970
971    Ok((outcome, causal_delta))
972}
973
974pub async fn execute(
975    context: &OwnedMutexGuard<ContextId>,
976    module: calimero_runtime::Module,
977    executor: PublicKey,
978    method: Cow<'static, str>,
979    input: Cow<'static, [u8]>,
980    mut storage: ContextStorage,
981    mut private_storage: ContextPrivateStorage,
982    node_client: NodeClient,
983    context_host: Option<Box<dyn ContextHost>>,
984) -> eyre::Result<(Outcome, ContextStorage, ContextPrivateStorage)> {
985    let context_id = **context;
986
987    // Run WASM execution in blocking context
988    // TODO(ctx)
989    global_runtime()
990        .spawn_blocking(move || {
991            let outcome = module.run(
992                context_id,
993                executor,
994                &method,
995                &input,
996                &mut storage,
997                Some(&mut private_storage),
998                Some(node_client),
999                context_host,
1000            )?;
1001            Ok((outcome, storage, private_storage))
1002        })
1003        .await
1004        .wrap_err("failed to receive execution response")?
1005}
1006
1007fn substitute_aliases_in_payload(
1008    node_client: &NodeClient,
1009    context_id: ContextId,
1010    payload: Vec<u8>,
1011    aliases: &[Alias<PublicKey>],
1012) -> Result<Vec<u8>, ExecuteError> {
1013    if aliases.is_empty() {
1014        return Ok(payload);
1015    }
1016
1017    // todo! evaluate a byte-version of calimero_server[build]::replace
1018    // todo! ref: https://github.com/calimero-network/core/blob/6deb2db81a65e0b5c86af9fe2950cf9019ab61af/crates/server/build.rs#L139-L175
1019
1020    let mut result = Vec::with_capacity(payload.len());
1021    let mut remaining = &payload[..];
1022
1023    for alias in aliases {
1024        let needle_str = format!("{{{alias}}}");
1025        let needle = needle_str.into_bytes();
1026
1027        while let Some(pos) = memmem::find(remaining, &needle) {
1028            result.extend_from_slice(&remaining[..pos]);
1029
1030            let public_key = node_client
1031                .resolve_alias(*alias, Some(context_id))
1032                .map_err(|_| ExecuteError::InternalError)?
1033                .ok_or_else(|| ExecuteError::AliasResolutionFailed { alias: *alias })?;
1034
1035            result.extend_from_slice(public_key.as_str().as_bytes());
1036
1037            remaining = &remaining[pos + needle.len()..];
1038        }
1039    }
1040
1041    result.extend_from_slice(remaining);
1042
1043    Ok(result)
1044}
1045
1046/// Helper function to sign user actions
1047/// Iterates over actions and signs any that are local, user-owned, and unsigned.
1048fn sign_user_actions(
1049    actions: &mut [Action],
1050    identity_private_key: &PrivateKey,
1051) -> eyre::Result<()> {
1052    // Sign the actions
1053    info!(actions_count = actions.len(), "Signing user actions...");
1054    for action in actions.iter_mut() {
1055        let action_id = action.id();
1056        let payload_for_signing = action.payload_for_signing();
1057
1058        // The nonce was already set by `calimero-storage`:
1059        // * For Add/Update, it's `metadata.updated_at`.
1060        // * For DeleteRef, it's `deleted_at`.
1061        // We just need to ensure the action's nonce field matches
1062        let (metadata, nonce) = match action {
1063            Action::Add { metadata, .. } => {
1064                let nonce = *metadata.updated_at;
1065                (metadata, nonce)
1066            }
1067            Action::Update { metadata, .. } => {
1068                let nonce = *metadata.updated_at;
1069                (metadata, nonce)
1070            }
1071            Action::DeleteRef {
1072                metadata,
1073                deleted_at,
1074                ..
1075            } => {
1076                let nonce = *deleted_at;
1077                (metadata, nonce)
1078            }
1079            Action::Compare { .. } => continue,
1080        };
1081
1082        if let StorageType::User {
1083            owner,
1084            signature_data: Some(sig_data),
1085        } = &mut metadata.storage_type
1086        {
1087            debug!(
1088                action_id = ?action_id,
1089                owner = %owner,
1090                nonce = %nonce,
1091                "Received user action from the outcome"
1092            );
1093
1094            // Check if it's ours and is currently unsigned (placeholder signature)
1095            if *owner == identity_private_key.public_key() && sig_data.signature == [0; 64] {
1096                // Re-set the nonce in sig_data just in case
1097                sig_data.nonce = nonce;
1098
1099                // TODO: Add `.map_err`.
1100                let signature = identity_private_key.sign(&payload_for_signing)?;
1101                sig_data.signature = signature.to_bytes();
1102
1103                debug!(
1104                    action_id = ?action_id,
1105                    action_id = %action_id,
1106                    owner = %owner,
1107                    owner = ?owner.digest(),
1108                    nonce = %nonce,
1109                    payload_for_signing = ?payload_for_signing,
1110                    ed25519_signature = ?signature,
1111                    signature = ?sig_data.signature,
1112                    signature_len = sig_data.signature.len(),
1113                    "Signed user action"
1114                );
1115            }
1116        }
1117
1118        if let StorageType::User {
1119            owner: _,
1120            signature_data: Some(_),
1121        } = &metadata.storage_type
1122        {
1123            debug!(
1124                action_serialized = ?borsh::to_vec(action)?,
1125                "After signing user action"
1126            );
1127        }
1128    }
1129    Ok(())
1130}