1use std::borrow::Cow;
2use std::collections::btree_map;
3use std::time::Instant;
5
6use actix::{
7 ActorFuture, ActorFutureExt, ActorResponse, ActorTryFutureExt, Handler, Message, WrapFuture,
8};
9use calimero_context_config::repr::ReprTransmute;
10use calimero_context_primitives::client::crypto::ContextIdentity;
11use calimero_context_primitives::client::ContextClient;
12use calimero_context_primitives::messages::{
13 ExecuteError, ExecuteEvent, ExecuteRequest, ExecuteResponse,
14};
15use calimero_context_primitives::{ContextAtomic, ContextAtomicKey};
16use calimero_node_primitives::client::NodeClient;
17use calimero_primitives::alias::Alias;
18use calimero_primitives::application::ApplicationId;
19use calimero_primitives::context::{Context, ContextId};
20use calimero_primitives::events::{
21 ContextEvent, ContextEventPayload, ExecutionEvent, NodeEvent, StateMutationPayload,
22};
23use calimero_primitives::hash::Hash;
24use calimero_primitives::identity::{PrivateKey, PublicKey};
25use calimero_runtime::logic::{ContextHost, Outcome};
26use calimero_storage::{
27 action::Action,
28 delta::{CausalDelta, StorageDelta},
29 entities::StorageType,
30};
31use calimero_store::{key, types, Store};
32use calimero_utils_actix::global_runtime;
33use either::Either;
34use eyre::{bail, WrapErr};
35use futures_util::future::TryFutureExt;
36use futures_util::io::Cursor;
37use memchr::memmem;
38use tokio::sync::OwnedMutexGuard;
39use tracing::{debug, error, info, warn};
40
41use crate::error::ContextError;
42use crate::handlers::utils::{process_context_mutations, StoreContextHost};
43use crate::metrics::ExecutionLabels;
44use crate::ContextManager;
45
46pub mod storage;
47
48use storage::{ContextPrivateStorage, ContextStorage};
49
50impl Handler<ExecuteRequest> for ContextManager {
51 type Result = ActorResponse<Self, <ExecuteRequest as Message>::Result>;
52
53 fn handle(
54 &mut self,
55 ExecuteRequest {
56 context: context_id,
57 executor,
58 method,
59 payload,
60 aliases,
61 atomic,
62 }: ExecuteRequest,
63 _ctx: &mut Self::Context,
64 ) -> Self::Result {
65 info!(
66 %context_id,
67 method,
68 "Executing method in context"
69 );
70 debug!(
71 %context_id,
72 %executor,
73 method,
74 aliases = ?aliases,
75 payload_len = payload.len(),
76 atomic = %match atomic {
77 None => "no",
78 Some(ContextAtomic::Lock) => "acquire",
79 Some(ContextAtomic::Held(_)) => "yes",
80 },
81 "Execution request details"
82 );
83
84 let context = match self.get_or_fetch_context(&context_id) {
85 Ok(Some(context)) => context,
86 Ok(None) => return ActorResponse::reply(Err(ExecuteError::ContextNotFound)),
87 Err(err) => {
88 error!(%err, "failed to execute request");
89
90 return ActorResponse::reply(Err(ExecuteError::InternalError));
91 }
92 };
93
94 let is_state_op = "__calimero_sync_next" == method;
95
96 if !is_state_op && *context.meta.root_hash == [0; 32] {
97 return ActorResponse::reply(Err(ExecuteError::Uninitialized));
98 }
99
100 let (guard, is_atomic) = match atomic {
101 None => (context.lock(), false),
102 Some(ContextAtomic::Lock) => (context.lock(), true),
103 Some(ContextAtomic::Held(ContextAtomicKey(guard))) => (Either::Left(guard), true),
104 };
105
106 let external_config = match self.context_client.context_config(&context_id) {
107 Ok(Some(external_config)) => external_config,
108 Ok(None) => {
109 error!(%context_id, "missing context config for context");
110
111 return ActorResponse::reply(Err(ExecuteError::InternalError));
112 }
113 Err(err) => {
114 error!(%err, "failed to execute request");
115
116 return ActorResponse::reply(Err(ExecuteError::InternalError));
117 }
118 };
119
120 let identity = match self.context_client.get_identity(&context_id, &executor) {
122 Ok(Some(
123 identity @ ContextIdentity {
124 private_key: Some(_),
125 sender_key: Some(_),
126 ..
127 },
128 )) => identity, Ok(_) => {
130 return ActorResponse::reply(Err(ExecuteError::Unauthorized {
131 context_id,
132 public_key: executor,
133 }))
134 }
135 Err(err) => {
136 error!(%err, "failed to execute request");
137
138 return ActorResponse::reply(Err(ExecuteError::InternalError));
139 }
140 };
141
142 let private_key = identity.private_key.expect(
145 "infallible (verified before): missing private key in ContextIdentity for signing",
146 );
147
148 let sender_key = identity.sender_key.expect(
150 "infallible (verified before): missing sender key in ContextIdentity for signing",
151 );
152
153 debug!(
154 public_key = ?identity.public_key,
155 public_key = %identity.public_key,
156 sender_key = ?sender_key.public_key(),
157 sender_key = %sender_key.public_key(),
158 "ContextManager: keys",
159 );
160
161 let payload =
162 match substitute_aliases_in_payload(&self.node_client, context_id, payload, &aliases) {
163 Ok(payload) => payload,
164 Err(err) => {
165 error!(%err, "failed to execute request");
166
167 return ActorResponse::reply(Err(err));
168 }
169 };
170
171 let guard_task = async move {
172 match guard {
173 Either::Left(guard) => guard,
174 Either::Right(task) => task.await,
175 }
176 }
177 .into_actor(self);
178
179 let context_task = guard_task.map(move |guard, act, _ctx| {
180 let Some(context) = act.get_or_fetch_context(&context_id)? else {
181 bail!(ContextError::ContextDeleted { context_id });
182 };
183
184 Ok((guard, context.meta.clone()))
185 });
186
187 let module_task = context_task.and_then(move |(guard, context), act, _ctx| {
188 act.get_module(context.application_id)
189 .map_ok(move |module, _act, _ctx| (guard, context, module))
190 });
191
192 let execution_count = self.metrics.as_ref().map(|m| m.execution_count.clone());
193 let execution_duration = self.metrics.as_ref().map(|m| m.execution_duration.clone());
194
195 let execute_task = module_task.and_then(move |(guard, mut context, module), act, _ctx| {
196 let datastore = act.datastore.clone();
197 let node_client = act.node_client.clone();
198 let context_client = act.context_client.clone();
199
200 async move {
201 let old_root_hash = context.root_hash;
202
203 let start = Instant::now();
204
205 let (outcome, causal_delta) = internal_execute(
206 datastore,
207 &node_client,
208 &context_client,
209 module,
210 &guard,
211 &mut context,
212 executor,
213 method.clone().into(),
214 payload.into(),
215 is_state_op,
216 &private_key,
217 )
218 .await?;
219
220 let duration = start.elapsed().as_secs_f64();
221 let status = outcome
222 .returns
223 .is_ok()
224 .then_some("success")
225 .unwrap_or("failure");
226
227 if let Some(execution_count) = execution_count {
229 let _ignored = execution_count
230 .clone()
231 .get_or_create(&ExecutionLabels {
232 context_id: context_id.to_string(),
233 method: method.clone(),
234 status: status.to_owned(),
235 })
236 .inc();
237 }
238
239 if let Some(execution_duration) = execution_duration {
241 let _ignored = execution_duration
242 .clone()
243 .get_or_create(&ExecutionLabels {
244 context_id: context_id.to_string(),
245 method: method.clone(),
246 status: status.to_owned(),
247 })
248 .observe(duration);
249 }
250
251 info!(
252 %context_id,
253 method,
254 status,
255 "Method execution completed"
256 );
257 debug!(
258 %context_id,
259 %executor,
260 method,
261 status,
262 %old_root_hash,
263 new_root_hash=%context.root_hash,
264 artifact_len = outcome.artifact.len(),
265 logs_count = outcome.logs.len(),
266 events_count = outcome.events.len(),
267 xcalls_count = outcome.xcalls.len(),
268 "Execution outcome details"
269 );
270
271 Ok((guard, context, outcome, causal_delta))
272 }
273 .into_actor(act)
274 });
275
276 let external_task =
277 execute_task.and_then(move |(guard, context, outcome, causal_delta), act, _ctx| {
278 if let Some(cached_context) = act.contexts.get_mut(&context_id) {
279 debug!(
280 %context_id,
281 old_root = ?cached_context.meta.root_hash,
282 new_root = ?context.root_hash,
283 is_state_op,
284 "Updating cached context root_hash"
285 );
286 cached_context.meta.root_hash = context.root_hash;
287 } else {
288 debug!(%context_id, is_state_op, "Context not in cache, will be fetched from DB next time");
289 }
290
291 let node_client = act.node_client.clone();
292 let context_client = act.context_client.clone();
293
294 async move {
295 if outcome.returns.is_err() {
296 return Ok((guard, context.root_hash, outcome));
297 }
298
299 info!(
300 %context_id,
301 %executor,
302 is_state_op,
303 artifact_empty = outcome.artifact.is_empty(),
304 events_count = outcome.events.len(),
305 xcalls_count = outcome.xcalls.len(),
306 "Execution outcome details"
307 );
308
309 let events_with_handlers: Vec<_> = outcome
318 .events
319 .iter()
320 .enumerate()
321 .filter_map(|(idx, event)| {
322 event.handler.as_ref().map(|handler_name| {
323 info!(
324 %context_id,
325 event_kind = %event.kind,
326 handler_name = %handler_name,
327 "Event with handler will be executed after response"
328 );
329 (idx, handler_name.clone(), event.data.clone())
330 })
331 })
332 .collect();
333
334 let executed_handler_indices: std::collections::HashSet<usize> =
336 events_with_handlers.iter().map(|(idx, _, _)| *idx).collect();
337
338 for xcall in &outcome.xcalls {
342 let target_context_id = ContextId::from(xcall.context_id);
343
344 info!(
345 %context_id,
346 target_context = ?target_context_id,
347 function = %xcall.function,
348 params_len = xcall.params.len(),
349 "Processing cross-context call"
350 );
351
352 use futures_util::TryStreamExt;
355 let members: Vec<_> = context_client
356 .get_context_members(&target_context_id, Some(true))
357 .try_collect()
358 .await
359 .unwrap_or_default();
360
361 let Some((target_executor, _is_owned)) = members.first() else {
362 error!(
363 %context_id,
364 target_context = ?target_context_id,
365 function = %xcall.function,
366 "No owned members found for target context"
367 );
368 continue;
369 };
370
371 let target_executor = *target_executor;
372
373 info!(
374 %context_id,
375 target_context = ?target_context_id,
376 target_executor = ?target_executor,
377 "Found owned member for target context"
378 );
379
380 let xcall_result = context_client
382 .execute(
383 &target_context_id,
384 &target_executor,
385 xcall.function.clone(),
386 xcall.params.clone(),
387 vec![],
388 None,
389 )
390 .await;
391
392 match xcall_result {
393 Ok(_) => {
394 info!(
395 %context_id,
396 target_context = ?target_context_id,
397 function = %xcall.function,
398 "Cross-context call executed successfully"
399 );
400 }
401 Err(err) => {
402 error!(
403 %context_id,
404 target_context = ?target_context_id,
405 function = %xcall.function,
406 ?err,
407 "Cross-context call failed"
408 );
409 }
410 }
411 }
412
413 if !(is_state_op || outcome.artifact.is_empty()) {
422 info!(
423 %context_id,
424 %executor,
425 is_state_op,
426 artifact_empty = outcome.artifact.is_empty(),
427 events_count = outcome.events.len(),
428 has_delta = causal_delta.is_some(),
429 "Broadcasting state delta and events to other nodes"
430 );
431
432 if let Some(ref the_delta) = causal_delta {
433 let events_data = if outcome.events.is_empty() {
435 info!(
436 %context_id,
437 %executor,
438 "No events to serialize"
439 );
440 None
441 } else {
442 let events_vec: Vec<ExecutionEvent> = outcome
446 .events
447 .iter()
448 .enumerate()
449 .map(|(idx, e)| ExecutionEvent {
450 kind: e.kind.clone(),
451 data: e.data.clone(),
452 handler: if executed_handler_indices.contains(&idx) {
454 None
455 } else {
456 e.handler.clone()
457 },
458 })
459 .collect();
460 let serialized = serde_json::to_vec(&events_vec)?;
461 info!(
462 %context_id,
463 %executor,
464 events_count = events_vec.len(),
465 handlers_executed_locally = executed_handler_indices.len(),
466 serialized_len = serialized.len(),
467 "Serializing events for broadcast (handlers cleared for executed events)"
468 );
469 Some(serialized)
470 };
471
472 node_client
473 .broadcast(
474 &context,
475 &executor,
476 &sender_key,
477 outcome.artifact.clone(),
478 the_delta.id,
479 the_delta.parents.clone(),
480 the_delta.hlc,
481 events_data,
482 )
483 .await?;
484 }
485 }
486
487 let external_client =
488 context_client.external_client(&context_id, &external_config)?;
489
490 let proxy_client = external_client.proxy();
491
492 for (proposal_id, actions) in &outcome.proposals {
493 let actions = borsh::from_slice(actions)?;
494
495 let proposal_id = proposal_id.rt().expect("infallible conversion");
496
497 proxy_client
498 .propose(&executor, &proposal_id, actions)
499 .await?;
500 }
501
502 for proposal_id in &outcome.approvals {
503 let proposal_id = proposal_id.rt().expect("infallible conversion");
504
505 proxy_client.approve(&executor, &proposal_id).await?;
506 }
507
508 process_context_mutations(&context_client, &node_client, context_id, executor, &outcome.context_mutations).await;
509
510 if !events_with_handlers.is_empty() {
514 let handler_context_client = context_client.clone();
515 let handler_context_id = context_id;
516 let handler_executor = executor;
517
518 global_runtime().spawn(async move {
520 for (_idx, handler_name, event_data) in events_with_handlers {
521 info!(
522 %handler_context_id,
523 handler_name = %handler_name,
524 "Executing handler on sender (deferred)"
525 );
526
527 match handler_context_client
528 .execute(
529 &handler_context_id,
530 &handler_executor,
531 handler_name.clone(),
532 event_data,
533 vec![],
534 None,
535 )
536 .await
537 {
538 Ok(_) => {
539 info!(
540 %handler_context_id,
541 handler_name = %handler_name,
542 "Handler executed successfully on sender"
543 );
544 }
545 Err(err) => {
546 warn!(
547 %handler_context_id,
548 handler_name = %handler_name,
549 error = %err,
550 "Handler execution failed on sender"
551 );
552 }
553 }
554 }
555 });
556 }
557
558 Ok((guard, context.root_hash, outcome))
559 }
560 .map_err(|err| {
561 error!(
562 ?err,
563 "execution succeeded, but an error occurred while performing external actions"
564 );
565
566 err
567 })
568 .into_actor(act)
569 });
570
571 let task = external_task
572 .map_err(|err, _act, _ctx| {
573 err.downcast::<ExecuteError>().unwrap_or_else(|err| {
574 debug!(?err, "an error occurred while executing request");
575 ExecuteError::InternalError
576 })
577 })
578 .map_ok(
579 move |(guard, root_hash, outcome), _act, _ctx| ExecuteResponse {
580 returns: outcome.returns.map_err(Into::into),
581 logs: outcome.logs,
582 events: outcome
583 .events
584 .into_iter()
585 .map(|e| ExecuteEvent {
586 kind: e.kind,
587 data: e.data,
588 handler: e.handler,
589 })
590 .collect(),
591 root_hash,
592 artifact: outcome.artifact,
593 atomic: is_atomic.then_some(ContextAtomicKey(guard)),
594 },
595 );
596
597 ActorResponse::r#async(task)
598 }
599}
600
601impl ContextManager {
602 pub fn get_module(
603 &self,
604 application_id: ApplicationId,
605 ) -> impl ActorFuture<Self, Output = eyre::Result<calimero_runtime::Module>> + 'static {
606 let blob_task = async {}.into_actor(self).map(move |_, act, _ctx| {
607 let blob = match act.applications.entry(application_id) {
608 btree_map::Entry::Vacant(vacant) => {
609 let Some(app) = act.node_client.get_application(&application_id)? else {
610 bail!(ExecuteError::ApplicationNotInstalled { application_id });
611 };
612
613 vacant.insert(app).blob
614 }
615 btree_map::Entry::Occupied(occupied) => occupied.into_mut().blob,
616 };
617
618 Ok(blob)
619 });
620
621 let module_task = blob_task.and_then(move |mut blob, act, _ctx| {
622 let node_client = act.node_client.clone();
623
624 async move {
625 if let Some(compiled) = node_client.get_blob_bytes(&blob.compiled, None).await? {
626 let module =
627 unsafe { calimero_runtime::Engine::headless().from_precompiled(&compiled) };
628
629 match module {
630 Ok(module) => return Ok((module, None)),
631 Err(err) => {
632 debug!(
633 ?err,
634 %application_id,
635 blob_id=%blob.compiled,
636 "failed to load precompiled module, recompiling.."
637 );
638 }
639 }
640 }
641
642 debug!(
643 %application_id,
644 blob_id=%blob.compiled,
645 "no usable precompiled module found, compiling.."
646 );
647
648 let Some(bytecode) = node_client.get_application_bytes(&application_id).await?
651 else {
652 bail!(ExecuteError::ApplicationNotInstalled { application_id });
653 };
654
655 let module = global_runtime()
658 .spawn_blocking(move || calimero_runtime::Engine::default().compile(&bytecode))
659 .await
660 .wrap_err("WASM compilation task failed")? ?; let compiled = Cursor::new(module.to_bytes()?);
664
665 let (blob_id, _ignored) = node_client.add_blob(compiled, None, None).await?;
666
667 blob.compiled = blob_id;
668
669 node_client.update_compiled_app(&application_id, &blob_id)?;
670
671 Ok((module, Some(blob)))
672 }
673 .into_actor(act)
674 });
675
676 module_task
677 .map_ok(move |(module, blob), act, _ctx| {
678 if let Some(blob) = blob {
679 if let Some(app) = act.applications.get_mut(&application_id) {
680 app.blob = blob;
681 }
682 }
683
684 module
685 })
686 .map_err(|err, _act, _ctx| {
687 error!(?err, "failed to initialize module for execution");
688
689 err
690 })
691 }
692}
693
694async fn internal_execute(
695 datastore: Store,
696 node_client: &NodeClient,
697 _context_client: &ContextClient,
698 module: calimero_runtime::Module,
699 guard: &OwnedMutexGuard<ContextId>,
700 context: &mut Context,
701 executor: PublicKey,
702 method: Cow<'static, str>,
703 input: Cow<'static, [u8]>,
704 is_state_op: bool,
705 identity_private_key: &PrivateKey,
706) -> eyre::Result<(Outcome, Option<CausalDelta>)> {
707 let context_host = StoreContextHost {
709 store: datastore.clone(),
710 context_id: context.id,
711 };
712
713 let storage = ContextStorage::from(datastore.clone(), context.id);
714 let private_storage = ContextPrivateStorage::from(datastore, context.id);
716 let (mut outcome, storage, private_storage) = execute(
717 guard,
718 module,
719 executor,
720 method.clone(),
721 input,
722 storage,
723 private_storage,
724 node_client.clone(),
725 Some(Box::new(context_host)),
726 )
727 .await?;
728
729 debug!(
730 context_id = %context.id,
731 method = %method,
732 is_state_op,
733 has_root_hash = outcome.root_hash.is_some(),
734 artifact_len = outcome.artifact.len(),
735 events_count = outcome.events.len(),
736 returns_ok = outcome.returns.is_ok(),
737 "WASM execution completed"
738 );
739
740 if outcome.returns.is_err() {
741 warn!(
742 context_id = %context.id,
743 method = %method,
744 error = ?outcome.returns,
745 "WASM execution returned error"
746 );
747 return Ok((outcome, None));
748 }
749
750 'fine: {
751 if outcome.root_hash.is_some() && outcome.artifact.is_empty() {
752 debug!(
753 context_id = %context.id,
754 has_root_hash = true,
755 artifact_empty = true,
756 is_state_op,
757 "Outcome has root hash but empty artifact - checking mitigation"
758 );
759
760 if is_state_op {
761 break 'fine;
763 }
764
765 bail!(ContextError::StateInconsistency);
766 }
767 }
768
769 let mut causal_delta = None;
770
771 if let Some(root_hash) = outcome.root_hash {
775 debug!(
776 context_id = %context.id,
777 old_root = ?context.root_hash,
778 new_root = ?Hash::from(root_hash),
779 is_state_op,
780 storage_empty = storage.is_empty(),
781 "Updating context root_hash after execution"
782 );
783 context.root_hash = root_hash.into();
784
785 let store = storage.commit()?;
787 let _private_store = private_storage.commit()?;
790
791 if !is_state_op && !outcome.artifact.is_empty() {
793 let mut actions = match borsh::from_slice::<StorageDelta>(&outcome.artifact) {
795 Ok(StorageDelta::Actions(actions)) => actions,
796 Ok(_) => {
797 warn!("Unexpected StorageDelta variant, using empty actions");
798 vec![]
799 }
800 Err(e) => {
801 warn!(
802 ?e,
803 "Failed to deserialize artifact for DAG, using empty actions"
804 );
805 vec![]
806 }
807 };
808
809 if actions.len() != 0 {
811 info!(
812 context_id = %context.id,
813 actions_count = actions.len(),
814 "Received several actions. Verify if there any user actions..."
815 );
816 sign_user_actions(&mut actions, &identity_private_key)
817 .wrap_err("Failed to sign user actions")?;
818
819 let new_artifact = borsh::to_vec(&StorageDelta::Actions(actions.clone()))?;
821 outcome.artifact = new_artifact;
822 }
823
824 let parents = if context.dag_heads.is_empty() {
826 vec![[0u8; 32]]
828 } else {
829 let mut verified_parents = Vec::new();
831 for head in &context.dag_heads {
832 if *head == [0u8; 32] {
833 verified_parents.push(*head);
834 continue;
835 }
836
837 let db_key = key::ContextDagDelta::new(context.id, *head);
839 if store.handle().get(&db_key).is_ok_and(|v| v.is_some()) {
840 verified_parents.push(*head);
841 } else {
842 warn!(
843 context_id = %context.id,
844 parent_id = ?head,
845 "DAG head not in RocksDB - skipping as parent (likely cascaded delta not yet persisted)"
846 );
847 }
848 }
849
850 if verified_parents.is_empty() {
852 warn!(
853 context_id = %context.id,
854 "No DAG heads in RocksDB - using genesis as parent"
855 );
856 vec![[0u8; 32]]
857 } else {
858 verified_parents
859 }
860 };
861
862 let hlc = calimero_storage::env::hlc_timestamp();
863 let delta_id = CausalDelta::compute_id(&parents, &actions, &hlc);
864
865 let delta = CausalDelta {
866 id: delta_id,
867 parents,
868 actions,
869 hlc,
870 expected_root_hash: root_hash,
871 };
872
873 context.dag_heads = vec![delta.id];
875
876 causal_delta = Some(delta);
877 } else if !is_state_op {
878 if context.dag_heads.is_empty() {
882 warn!(
883 context_id = %context.id,
884 root_hash = ?root_hash,
885 artifact_empty = outcome.artifact.is_empty(),
886 "State changed but no delta created - using root_hash as dag_head fallback"
887 );
888 context.dag_heads = vec![root_hash];
889 }
890 }
891
892 let mut handle = store.handle();
894
895 debug!(
896 context_id = %context.id,
897 root_hash = ?context.root_hash,
898 dag_heads_count = context.dag_heads.len(),
899 is_state_op,
900 "Persisting context metadata to database"
901 );
902
903 handle.put(
904 &key::ContextMeta::new(context.id),
905 &types::ContextMeta::new(
906 key::ApplicationMeta::new(context.application_id),
907 *context.root_hash,
908 context.dag_heads.clone(),
909 ),
910 )?;
911
912 if let Some(ref delta) = causal_delta {
914 let serialized_actions = borsh::to_vec(&delta.actions)?;
915
916 handle.put(
917 &key::ContextDagDelta::new(context.id, delta.id),
918 &types::ContextDagDelta {
919 delta_id: delta.id,
920 parents: delta.parents.clone(),
921 actions: serialized_actions,
922 hlc: delta.hlc,
923 applied: true,
924 expected_root_hash: delta.expected_root_hash,
925 events: None, },
927 )?;
928
929 debug!(
930 context_id = %context.id,
931 delta_id = ?delta.id,
932 "Persisted delta to database for future requests"
933 );
934 }
935
936 debug!(
937 context_id = %context.id,
938 root_hash = ?context.root_hash,
939 dag_heads_count = context.dag_heads.len(),
940 is_state_op,
941 "Context metadata persisted successfully"
942 );
943 }
944
945 if !outcome.events.is_empty() || outcome.root_hash.is_some() {
948 let new_root = outcome
949 .root_hash
950 .map(|h| h.into())
951 .unwrap_or((*context.root_hash).into());
952
953 let events_vec = outcome
954 .events
955 .iter()
956 .map(|e| ExecutionEvent {
957 kind: e.kind.clone(),
958 data: e.data.clone(),
959 handler: e.handler.clone(),
960 })
961 .collect();
962
963 node_client.send_event(NodeEvent::Context(ContextEvent {
964 context_id: context.id,
965 payload: ContextEventPayload::StateMutation(
966 StateMutationPayload::with_root_and_events(new_root, events_vec),
967 ),
968 }))?;
969 }
970
971 Ok((outcome, causal_delta))
972}
973
974pub async fn execute(
975 context: &OwnedMutexGuard<ContextId>,
976 module: calimero_runtime::Module,
977 executor: PublicKey,
978 method: Cow<'static, str>,
979 input: Cow<'static, [u8]>,
980 mut storage: ContextStorage,
981 mut private_storage: ContextPrivateStorage,
982 node_client: NodeClient,
983 context_host: Option<Box<dyn ContextHost>>,
984) -> eyre::Result<(Outcome, ContextStorage, ContextPrivateStorage)> {
985 let context_id = **context;
986
987 global_runtime()
990 .spawn_blocking(move || {
991 let outcome = module.run(
992 context_id,
993 executor,
994 &method,
995 &input,
996 &mut storage,
997 Some(&mut private_storage),
998 Some(node_client),
999 context_host,
1000 )?;
1001 Ok((outcome, storage, private_storage))
1002 })
1003 .await
1004 .wrap_err("failed to receive execution response")?
1005}
1006
1007fn substitute_aliases_in_payload(
1008 node_client: &NodeClient,
1009 context_id: ContextId,
1010 payload: Vec<u8>,
1011 aliases: &[Alias<PublicKey>],
1012) -> Result<Vec<u8>, ExecuteError> {
1013 if aliases.is_empty() {
1014 return Ok(payload);
1015 }
1016
1017 let mut result = Vec::with_capacity(payload.len());
1021 let mut remaining = &payload[..];
1022
1023 for alias in aliases {
1024 let needle_str = format!("{{{alias}}}");
1025 let needle = needle_str.into_bytes();
1026
1027 while let Some(pos) = memmem::find(remaining, &needle) {
1028 result.extend_from_slice(&remaining[..pos]);
1029
1030 let public_key = node_client
1031 .resolve_alias(*alias, Some(context_id))
1032 .map_err(|_| ExecuteError::InternalError)?
1033 .ok_or_else(|| ExecuteError::AliasResolutionFailed { alias: *alias })?;
1034
1035 result.extend_from_slice(public_key.as_str().as_bytes());
1036
1037 remaining = &remaining[pos + needle.len()..];
1038 }
1039 }
1040
1041 result.extend_from_slice(remaining);
1042
1043 Ok(result)
1044}
1045
1046fn sign_user_actions(
1049 actions: &mut [Action],
1050 identity_private_key: &PrivateKey,
1051) -> eyre::Result<()> {
1052 info!(actions_count = actions.len(), "Signing user actions...");
1054 for action in actions.iter_mut() {
1055 let action_id = action.id();
1056 let payload_for_signing = action.payload_for_signing();
1057
1058 let (metadata, nonce) = match action {
1063 Action::Add { metadata, .. } => {
1064 let nonce = *metadata.updated_at;
1065 (metadata, nonce)
1066 }
1067 Action::Update { metadata, .. } => {
1068 let nonce = *metadata.updated_at;
1069 (metadata, nonce)
1070 }
1071 Action::DeleteRef {
1072 metadata,
1073 deleted_at,
1074 ..
1075 } => {
1076 let nonce = *deleted_at;
1077 (metadata, nonce)
1078 }
1079 Action::Compare { .. } => continue,
1080 };
1081
1082 if let StorageType::User {
1083 owner,
1084 signature_data: Some(sig_data),
1085 } = &mut metadata.storage_type
1086 {
1087 debug!(
1088 action_id = ?action_id,
1089 owner = %owner,
1090 nonce = %nonce,
1091 "Received user action from the outcome"
1092 );
1093
1094 if *owner == identity_private_key.public_key() && sig_data.signature == [0; 64] {
1096 sig_data.nonce = nonce;
1098
1099 let signature = identity_private_key.sign(&payload_for_signing)?;
1101 sig_data.signature = signature.to_bytes();
1102
1103 debug!(
1104 action_id = ?action_id,
1105 action_id = %action_id,
1106 owner = %owner,
1107 owner = ?owner.digest(),
1108 nonce = %nonce,
1109 payload_for_signing = ?payload_for_signing,
1110 ed25519_signature = ?signature,
1111 signature = ?sig_data.signature,
1112 signature_len = sig_data.signature.len(),
1113 "Signed user action"
1114 );
1115 }
1116 }
1117
1118 if let StorageType::User {
1119 owner: _,
1120 signature_data: Some(_),
1121 } = &metadata.storage_type
1122 {
1123 debug!(
1124 action_serialized = ?borsh::to_vec(action)?,
1125 "After signing user action"
1126 );
1127 }
1128 }
1129 Ok(())
1130}