1use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15 Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19 ownership::ChainOwnership,
20 time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27 execution::UserAction,
28 runtime::ContractSyncRuntime,
29 system::{CreateApplicationResult, OpenChainConfig},
30 util::{OracleResponseExt as _, RespondExt as _},
31 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
32 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
33 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
34 ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
35};
36
/// Actor that executes operations and messages against an execution state and answers the
/// [`ExecutionRequest`]s issued while user applications run.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker for the current transaction: it receives outgoing messages, events, created
    /// blobs and oracle responses produced while handling requests.
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to account for resource usage (e.g. blob reads) and to merge the
    /// balance back after a user action has run.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
43
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a label-less latency histogram under `name`, using the bucket layout shared
    /// by all the histograms of this module.
    fn register_load_latency(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Histogram of the time spent in `ExecutionStateActor::load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| register_load_latency("load_contract_latency", "Load contract latency"));

    /// Histogram of the time spent in `ExecutionStateActor::load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| register_load_latency("load_service_latency", "Load service latency"));
}
71
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
73
74impl<'a, C> ExecutionStateActor<'a, C>
75where
76 C: Context + Clone + 'static,
77 C::Extra: ExecutionRuntimeContext,
78{
79 pub fn new(
81 state: &'a mut ExecutionStateView<C>,
82 txn_tracker: &'a mut TransactionTracker,
83 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
84 ) -> Self {
85 Self {
86 state,
87 txn_tracker,
88 resource_controller,
89 }
90 }
91
92 pub(crate) async fn load_contract(
93 &mut self,
94 id: ApplicationId,
95 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
96 #[cfg(with_metrics)]
97 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
98 let blob_id = id.description_blob_id();
99 let description = match self.txn_tracker.get_blob_content(&blob_id) {
100 Some(blob) => bcs::from_bytes(blob.bytes())?,
101 None => {
102 self.state
103 .system
104 .describe_application(id, self.txn_tracker)
105 .await?
106 }
107 };
108 let code = self
109 .state
110 .context()
111 .extra()
112 .get_user_contract(&description, self.txn_tracker)
113 .await?;
114 Ok((code, description))
115 }
116
117 pub(crate) async fn load_service(
118 &mut self,
119 id: ApplicationId,
120 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
121 #[cfg(with_metrics)]
122 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
123 let blob_id = id.description_blob_id();
124 let description = match self.txn_tracker.get_blob_content(&blob_id) {
125 Some(blob) => bcs::from_bytes(blob.bytes())?,
126 None => {
127 self.state
128 .system
129 .describe_application(id, self.txn_tracker)
130 .await?
131 }
132 };
133 let code = self
134 .state
135 .context()
136 .extra()
137 .get_user_service(&description, self.txn_tracker)
138 .await?;
139 Ok((code, description))
140 }
141
142 pub(crate) async fn handle_request(
144 &mut self,
145 request: ExecutionRequest,
146 ) -> Result<(), ExecutionError> {
147 use ExecutionRequest::*;
148 match request {
149 #[cfg(not(web))]
150 LoadContract { id, callback } => {
151 let (code, description) = self.load_contract(id).await?;
152 callback.respond((code, description))
153 }
154 #[cfg(not(web))]
155 LoadService { id, callback } => {
156 let (code, description) = self.load_service(id).await?;
157 callback.respond((code, description))
158 }
159
160 ChainBalance { callback } => {
161 let balance = *self.state.system.balance.get();
162 callback.respond(balance);
163 }
164
165 OwnerBalance { owner, callback } => {
166 let balance = self
167 .state
168 .system
169 .balances
170 .get(&owner)
171 .await?
172 .unwrap_or_default();
173 callback.respond(balance);
174 }
175
176 OwnerBalances { callback } => {
177 let balances = self.state.system.balances.index_values().await?;
178 callback.respond(balances.into_iter().collect());
179 }
180
181 BalanceOwners { callback } => {
182 let owners = self.state.system.balances.indices().await?;
183 callback.respond(owners);
184 }
185
186 Transfer {
187 source,
188 destination,
189 amount,
190 signer,
191 application_id,
192 callback,
193 } => {
194 let maybe_message = self
195 .state
196 .system
197 .transfer(signer, Some(application_id), source, destination, amount)
198 .await?;
199 self.txn_tracker.add_outgoing_messages(maybe_message);
200 callback.respond(());
201 }
202
203 Claim {
204 source,
205 destination,
206 amount,
207 signer,
208 application_id,
209 callback,
210 } => {
211 let maybe_message = self
212 .state
213 .system
214 .claim(
215 signer,
216 Some(application_id),
217 source.owner,
218 source.chain_id,
219 destination,
220 amount,
221 )
222 .await?;
223 self.txn_tracker.add_outgoing_messages(maybe_message);
224 callback.respond(());
225 }
226
227 SystemTimestamp { callback } => {
228 let timestamp = *self.state.system.timestamp.get();
229 callback.respond(timestamp);
230 }
231
232 ChainOwnership { callback } => {
233 let ownership = self.state.system.ownership.get().clone();
234 callback.respond(ownership);
235 }
236
237 ApplicationPermissions { callback } => {
238 let permissions = self.state.system.application_permissions.get().clone();
239 callback.respond(permissions);
240 }
241
242 ContainsKey { id, key, callback } => {
243 let view = self.state.users.try_load_entry(&id).await?;
244 let result = match view {
245 Some(view) => view.contains_key(&key).await?,
246 None => false,
247 };
248 callback.respond(result);
249 }
250
251 ContainsKeys { id, keys, callback } => {
252 let view = self.state.users.try_load_entry(&id).await?;
253 let result = match view {
254 Some(view) => view.contains_keys(&keys).await?,
255 None => vec![false; keys.len()],
256 };
257 callback.respond(result);
258 }
259
260 ReadMultiValuesBytes { id, keys, callback } => {
261 let view = self.state.users.try_load_entry(&id).await?;
262 let values = match view {
263 Some(view) => view.multi_get(&keys).await?,
264 None => vec![None; keys.len()],
265 };
266 callback.respond(values);
267 }
268
269 ReadValueBytes { id, key, callback } => {
270 let view = self.state.users.try_load_entry(&id).await?;
271 let result = match view {
272 Some(view) => view.get(&key).await?,
273 None => None,
274 };
275 callback.respond(result);
276 }
277
278 FindKeysByPrefix {
279 id,
280 key_prefix,
281 callback,
282 } => {
283 let view = self.state.users.try_load_entry(&id).await?;
284 let result = match view {
285 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
286 None => Vec::new(),
287 };
288 callback.respond(result);
289 }
290
291 FindKeyValuesByPrefix {
292 id,
293 key_prefix,
294 callback,
295 } => {
296 let view = self.state.users.try_load_entry(&id).await?;
297 let result = match view {
298 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
299 None => Vec::new(),
300 };
301 callback.respond(result);
302 }
303
304 WriteBatch {
305 id,
306 batch,
307 callback,
308 } => {
309 let mut view = self.state.users.try_load_entry_mut(&id).await?;
310 view.write_batch(batch).await?;
311 callback.respond(());
312 }
313
314 OpenChain {
315 ownership,
316 balance,
317 parent_id,
318 block_height,
319 application_permissions,
320 timestamp,
321 callback,
322 } => {
323 let config = OpenChainConfig {
324 ownership,
325 balance,
326 application_permissions,
327 };
328 let chain_id = self
329 .state
330 .system
331 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
332 .await?;
333 callback.respond(chain_id);
334 }
335
336 CloseChain {
337 application_id,
338 callback,
339 } => {
340 let app_permissions = self.state.system.application_permissions.get();
341 if !app_permissions.can_close_chain(&application_id) {
342 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
343 } else {
344 self.state.system.close_chain().await?;
345 callback.respond(Ok(()));
346 }
347 }
348
349 ChangeOwnership {
350 application_id,
351 ownership,
352 callback,
353 } => {
354 let app_permissions = self.state.system.application_permissions.get();
355 if !app_permissions.can_close_chain(&application_id) {
356 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
357 } else {
358 self.state.system.ownership.set(ownership);
359 callback.respond(Ok(()));
360 }
361 }
362
363 ChangeApplicationPermissions {
364 application_id,
365 application_permissions,
366 callback,
367 } => {
368 let app_permissions = self.state.system.application_permissions.get();
369 if !app_permissions.can_change_application_permissions(&application_id) {
370 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
371 } else {
372 self.state
373 .system
374 .application_permissions
375 .set(application_permissions);
376 callback.respond(Ok(()));
377 }
378 }
379
380 CreateApplication {
381 chain_id,
382 block_height,
383 module_id,
384 parameters,
385 required_application_ids,
386 callback,
387 } => {
388 let create_application_result = self
389 .state
390 .system
391 .create_application(
392 chain_id,
393 block_height,
394 module_id,
395 parameters,
396 required_application_ids,
397 self.txn_tracker,
398 )
399 .await?;
400 callback.respond(Ok(create_application_result));
401 }
402
403 PerformHttpRequest {
404 request,
405 http_responses_are_oracle_responses,
406 callback,
407 } => {
408 let system = &mut self.state.system;
409 let response = self
410 .txn_tracker
411 .oracle(|| async {
412 let headers = request
413 .headers
414 .into_iter()
415 .map(|http::Header { name, value }| {
416 Ok((name.parse()?, value.try_into()?))
417 })
418 .collect::<Result<HeaderMap, ExecutionError>>()?;
419
420 let url = Url::parse(&request.url)?;
421 let host = url
422 .host_str()
423 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
424
425 let (_epoch, committee) = system
426 .current_committee()
427 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
428 let allowed_hosts = &committee.policy().http_request_allow_list;
429
430 ensure!(
431 allowed_hosts.contains(host),
432 ExecutionError::UnauthorizedHttpRequest(url)
433 );
434
435 let request = Client::new()
436 .request(request.method.into(), url)
437 .body(request.body)
438 .headers(headers);
439 #[cfg(not(web))]
440 let request = request.timeout(linera_base::time::Duration::from_millis(
441 committee.policy().http_request_timeout_ms,
442 ));
443
444 let response = request.send().await?;
445
446 let mut response_size_limit =
447 committee.policy().maximum_http_response_bytes;
448
449 if http_responses_are_oracle_responses {
450 response_size_limit = response_size_limit
451 .min(committee.policy().maximum_oracle_response_bytes);
452 }
453 Ok(OracleResponse::Http(
454 Self::receive_http_response(response, response_size_limit).await?,
455 ))
456 })
457 .await?
458 .to_http_response()?;
459 callback.respond(response);
460 }
461
462 ReadBlobContent { blob_id, callback } => {
463 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
464 content.clone()
465 } else {
466 let content = self.state.system.read_blob_content(blob_id).await?;
467 if blob_id.blob_type == BlobType::Data {
468 self.resource_controller
469 .with_state(&mut self.state.system)
470 .await?
471 .track_blob_read(content.bytes().len() as u64)?;
472 }
473 self.state
474 .system
475 .blob_used(self.txn_tracker, blob_id)
476 .await?;
477 content
478 };
479 callback.respond(content)
480 }
481
482 AssertBlobExists { blob_id, callback } => {
483 self.state.system.assert_blob_exists(blob_id).await?;
484 if blob_id.blob_type == BlobType::Data {
486 self.resource_controller
487 .with_state(&mut self.state.system)
488 .await?
489 .track_blob_read(0)?;
490 }
491 let is_new = self
492 .state
493 .system
494 .blob_used(self.txn_tracker, blob_id)
495 .await?;
496 if is_new {
497 self.txn_tracker
498 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
499 }
500 callback.respond(());
501 }
502
503 Emit {
504 stream_id,
505 value,
506 callback,
507 } => {
508 let count = self
509 .state
510 .stream_event_counts
511 .get_mut_or_default(&stream_id)
512 .await?;
513 let index = *count;
514 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
515 self.txn_tracker.add_event(stream_id, index, value);
516 callback.respond(index)
517 }
518
519 ReadEvent { event_id, callback } => {
520 let extra = self.state.context().extra();
521 let event = self
522 .txn_tracker
523 .oracle(|| async {
524 let event = extra
525 .get_event(event_id.clone())
526 .await?
527 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
528 Ok(OracleResponse::Event(event_id.clone(), event))
529 })
530 .await?
531 .to_event(&event_id)?;
532 callback.respond(event);
533 }
534
535 SubscribeToEvents {
536 chain_id,
537 stream_id,
538 subscriber_app_id,
539 callback,
540 } => {
541 let subscriptions = self
542 .state
543 .system
544 .event_subscriptions
545 .get_mut_or_default(&(chain_id, stream_id.clone()))
546 .await?;
547 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
548 subscriptions.next_index
549 } else {
550 0
551 };
552 self.txn_tracker.add_stream_to_process(
553 subscriber_app_id,
554 chain_id,
555 stream_id,
556 0,
557 next_index,
558 );
559 callback.respond(());
560 }
561
562 UnsubscribeFromEvents {
563 chain_id,
564 stream_id,
565 subscriber_app_id,
566 callback,
567 } => {
568 let key = (chain_id, stream_id.clone());
569 let subscriptions = self
570 .state
571 .system
572 .event_subscriptions
573 .get_mut_or_default(&key)
574 .await?;
575 subscriptions.applications.remove(&subscriber_app_id);
576 if subscriptions.applications.is_empty() {
577 self.state.system.event_subscriptions.remove(&key)?;
578 }
579 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
580 self.txn_tracker
581 .remove_stream_to_process(app_id, chain_id, stream_id);
582 }
583 callback.respond(());
584 }
585
586 GetApplicationPermissions { callback } => {
587 let app_permissions = self.state.system.application_permissions.get();
588 callback.respond(app_permissions.clone());
589 }
590
591 QueryServiceOracle {
592 deadline,
593 application_id,
594 next_block_height,
595 query,
596 callback,
597 } => {
598 let state = &mut self.state;
599 let local_time = self.txn_tracker.local_time();
600 let created_blobs = self.txn_tracker.created_blobs().clone();
601 let bytes = self
602 .txn_tracker
603 .oracle(|| async {
604 let context = QueryContext {
605 chain_id: state.context().extra().chain_id(),
606 next_block_height,
607 local_time,
608 };
609 let QueryOutcome {
610 response,
611 operations,
612 } = Box::pin(state.query_user_application_with_deadline(
613 application_id,
614 context,
615 query,
616 deadline,
617 created_blobs,
618 ))
619 .await?;
620 ensure!(
621 operations.is_empty(),
622 ExecutionError::ServiceOracleQueryOperations(operations)
623 );
624 Ok(OracleResponse::Service(response))
625 })
626 .await?
627 .to_service_response()?;
628 callback.respond(bytes);
629 }
630
631 AddOutgoingMessage { message, callback } => {
632 self.txn_tracker.add_outgoing_message(message);
633 callback.respond(());
634 }
635
636 SetLocalTime {
637 local_time,
638 callback,
639 } => {
640 self.txn_tracker.set_local_time(local_time);
641 callback.respond(());
642 }
643
644 AssertBefore {
645 timestamp,
646 callback,
647 } => {
648 let result = if !self
649 .txn_tracker
650 .replay_oracle_response(OracleResponse::Assert)?
651 {
652 let local_time = self.txn_tracker.local_time();
654 if local_time >= timestamp {
655 Err(ExecutionError::AssertBefore {
656 timestamp,
657 local_time,
658 })
659 } else {
660 Ok(())
661 }
662 } else {
663 Ok(())
664 };
665 callback.respond(result);
666 }
667
668 AddCreatedBlob { blob, callback } => {
669 self.txn_tracker.add_created_blob(blob);
670 callback.respond(());
671 }
672
673 ValidationRound { round, callback } => {
674 let validation_round = self
675 .txn_tracker
676 .oracle(|| async { Ok(OracleResponse::Round(round)) })
677 .await?
678 .to_round()?;
679 callback.respond(validation_round);
680 }
681
682 AllowApplicationLogs { callback } => {
683 let allow = self
684 .state
685 .context()
686 .extra()
687 .execution_runtime_config()
688 .allow_application_logs;
689 callback.respond(allow);
690 }
691
692 #[cfg(web)]
693 Log { message, level } => {
694 let formatted: js_sys::JsString = format!("[CONTRACT {level}] {message}").into();
696 match level {
697 tracing::log::Level::Trace | tracing::log::Level::Debug => {
698 web_sys::console::debug_1(&formatted)
699 }
700 tracing::log::Level::Info => web_sys::console::log_1(&formatted),
701 tracing::log::Level::Warn => web_sys::console::warn_1(&formatted),
702 tracing::log::Level::Error => web_sys::console::error_1(&formatted),
703 }
704 }
705 }
706
707 Ok(())
708 }
709
710 async fn process_subscriptions(
713 &mut self,
714 context: ProcessStreamsContext,
715 ) -> Result<(), ExecutionError> {
716 let mut processed = BTreeSet::new();
719 loop {
720 let to_process = self
721 .txn_tracker
722 .take_streams_to_process()
723 .into_iter()
724 .filter_map(|(app_id, updates)| {
725 let updates = updates
726 .into_iter()
727 .filter_map(|update| {
728 if !processed.insert((
729 app_id,
730 update.chain_id,
731 update.stream_id.clone(),
732 )) {
733 return None;
734 }
735 Some(update)
736 })
737 .collect::<Vec<_>>();
738 if updates.is_empty() {
739 return None;
740 }
741 Some((app_id, updates))
742 })
743 .collect::<BTreeMap<_, _>>();
744 if to_process.is_empty() {
745 return Ok(());
746 }
747 for (app_id, updates) in to_process {
748 self.run_user_action(
749 app_id,
750 UserAction::ProcessStreams(context, updates),
751 None,
752 None,
753 )
754 .await?;
755 }
756 }
757 }
758
    /// Runs `action` on the user application `application_id`.
    ///
    /// This simply delegates to `run_user_action_with_runtime` with the same arguments:
    /// `refund_grant_to` is handed to the contract runtime, and `grant`, if any, is used
    /// together with the system state when computing and merging the balance.
    pub(crate) async fn run_user_action(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
    ) -> Result<(), ExecutionError> {
        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
            .await
    }
769
770 pub(crate) async fn service_and_dependencies(
772 &mut self,
773 application: ApplicationId,
774 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
775 let mut stack = vec![application];
778 let mut codes = vec![];
779 let mut descriptions = vec![];
780
781 while let Some(id) = stack.pop() {
782 let (code, description) = self.load_service(id).await?;
783 stack.extend(description.required_application_ids.iter().rev().copied());
784 codes.push(code);
785 descriptions.push(description);
786 }
787
788 codes.reverse();
789 descriptions.reverse();
790
791 Ok((codes, descriptions))
792 }
793
794 async fn contract_and_dependencies(
796 &mut self,
797 application: ApplicationId,
798 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
799 let mut stack = vec![application];
802 let mut codes = vec![];
803 let mut descriptions = vec![];
804
805 while let Some(id) = stack.pop() {
806 let (code, description) = self.load_contract(id).await?;
807 stack.extend(description.required_application_ids.iter().rev().copied());
808 codes.push(code);
809 descriptions.push(description);
810 }
811
812 codes.reverse();
813 descriptions.reverse();
814
815 Ok((codes, descriptions))
816 }
817
    /// Runs `action` on `application_id` inside a contract runtime and serves the
    /// execution requests that the runtime sends back while it runs.
    ///
    /// On success, the operation result is recorded in the transaction tracker and the
    /// runtime's balance and resource tracker are merged back into this actor's resource
    /// controller.
    ///
    /// # Errors
    ///
    /// Fails if the balance cannot be computed, if a contract cannot be loaded, if handling
    /// one of the runtime's requests fails, or if the runtime task itself fails.
    async fn run_user_action_with_runtime(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
    ) -> Result<(), ExecutionError> {
        let chain_id = self.state.context().extra().chain_id();
        // Work on a copy of the grant here; the real `grant` is only used for the final
        // merge at the end of this method.
        let mut cloned_grant = grant.as_ref().map(|x| **x);
        let initial_balance = self
            .resource_controller
            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
            .await?
            .balance()?;
        // The runtime gets its own controller, seeded with the current policy, tracker
        // and balance.
        let controller = ResourceController::new(
            self.resource_controller.policy().clone(),
            self.resource_controller.tracker,
            initial_balance,
        );
        let (execution_state_sender, mut execution_state_receiver) =
            futures::channel::mpsc::unbounded();

        let (codes, descriptions): (Vec<_>, Vec<_>) =
            self.contract_and_dependencies(application_id).await?;

        let allow_application_logs = self
            .state
            .context()
            .extra()
            .execution_runtime_config()
            .allow_application_logs;

        let contract_runtime_task = self
            .state
            .context()
            .extra()
            .thread_pool()
            .run_send(JsVec(codes), move |codes| async move {
                let runtime = ContractSyncRuntime::new(
                    execution_state_sender,
                    chain_id,
                    refund_grant_to,
                    controller,
                    &action,
                    allow_application_logs,
                );

                // Preload the application and its dependencies before running the action.
                for (code, description) in codes.0.into_iter().zip(descriptions) {
                    runtime.preload_contract(
                        ApplicationId::from(&description),
                        code,
                        description,
                    )?;
                }

                runtime.run_action(application_id, chain_id, action)
            })
            .await;

        // Serve the runtime's requests until the receiving stream ends.
        while let Some(request) = execution_state_receiver.next().await {
            self.handle_request(request).await?;
        }

        let (result, controller) = contract_runtime_task.await??;

        self.txn_tracker.add_operation_result(result);

        // Merge the runtime's final balance and tracker back into the actor's controller.
        self.resource_controller
            .with_state_and_grant(&mut self.state.system, grant)
            .await?
            .merge_balance(initial_balance, controller.balance()?)?;
        self.resource_controller.tracker = controller.tracker;

        Ok(())
    }
893
894 pub async fn execute_operation(
895 &mut self,
896 context: OperationContext,
897 operation: Operation,
898 ) -> Result<(), ExecutionError> {
899 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
900 match operation {
901 Operation::System(op) => {
902 let new_application = self
903 .state
904 .system
905 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
906 .await?;
907 if let Some((application_id, argument)) = new_application {
908 let user_action = UserAction::Instantiate(context, argument);
909 self.run_user_action(
910 application_id,
911 user_action,
912 context.refund_grant_to(),
913 None,
914 )
915 .await?;
916 }
917 }
918 Operation::User {
919 application_id,
920 bytes,
921 } => {
922 self.run_user_action(
923 application_id,
924 UserAction::Operation(context, bytes),
925 context.refund_grant_to(),
926 None,
927 )
928 .await?;
929 }
930 }
931 self.process_subscriptions(context.into()).await?;
932 Ok(())
933 }
934
935 pub async fn execute_message(
936 &mut self,
937 context: MessageContext,
938 message: Message,
939 grant: Option<&mut Amount>,
940 ) -> Result<(), ExecutionError> {
941 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
942 match message {
943 Message::System(message) => {
944 let outcome = self.state.system.execute_message(context, message).await?;
945 self.txn_tracker.add_outgoing_messages(outcome);
946 }
947 Message::User {
948 application_id,
949 bytes,
950 } => {
951 self.run_user_action(
952 application_id,
953 UserAction::Message(context, bytes),
954 context.refund_grant_to,
955 grant,
956 )
957 .await?;
958 }
959 }
960 self.process_subscriptions(context.into()).await?;
961 Ok(())
962 }
963
964 pub fn bounce_message(
965 &mut self,
966 context: MessageContext,
967 grant: Amount,
968 message: Message,
969 ) -> Result<(), ExecutionError> {
970 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
971 self.txn_tracker.add_outgoing_message(OutgoingMessage {
972 destination: context.origin,
973 authenticated_signer: context.authenticated_signer,
974 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
975 grant,
976 kind: MessageKind::Bouncing,
977 message,
978 });
979 Ok(())
980 }
981
982 pub fn send_refund(
983 &mut self,
984 context: MessageContext,
985 amount: Amount,
986 ) -> Result<(), ExecutionError> {
987 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
988 if amount.is_zero() {
989 return Ok(());
990 }
991 let Some(account) = context.refund_grant_to else {
992 return Err(ExecutionError::InternalError(
993 "Messages with grants should have a non-empty `refund_grant_to`",
994 ));
995 };
996 let message = SystemMessage::Credit {
997 amount,
998 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
999 target: account.owner,
1000 };
1001 self.txn_tracker.add_outgoing_message(
1002 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1003 );
1004 Ok(())
1005 }
1006
1007 async fn receive_http_response(
1011 response: reqwest::Response,
1012 size_limit: u64,
1013 ) -> Result<http::Response, ExecutionError> {
1014 let status = response.status().as_u16();
1015 let maybe_content_length = response.content_length();
1016
1017 let headers = response
1018 .headers()
1019 .iter()
1020 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1021 .collect::<Vec<_>>();
1022
1023 let total_header_size = headers
1024 .iter()
1025 .map(|header| (header.name.len() + header.value.len()) as u64)
1026 .sum();
1027
1028 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1029 ExecutionError::HttpResponseSizeLimitExceeded {
1030 limit: size_limit,
1031 size: total_header_size,
1032 },
1033 )?;
1034
1035 if let Some(content_length) = maybe_content_length {
1036 if content_length > remaining_bytes {
1037 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1038 limit: size_limit,
1039 size: content_length + total_header_size,
1040 });
1041 }
1042 }
1043
1044 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1045 let mut body_stream = response.bytes_stream();
1046
1047 while let Some(bytes) = body_stream.next().await.transpose()? {
1048 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1049 ExecutionError::HttpResponseSizeLimitExceeded {
1050 limit: size_limit,
1051 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1052 },
1053 )?;
1054
1055 body.extend(&bytes);
1056 }
1057
1058 Ok(http::Response {
1059 status,
1060 headers,
1061 body,
1062 })
1063 }
1064}
1065
1066#[derive(Debug)]
1068pub enum ExecutionRequest {
1069 #[cfg(not(web))]
1070 LoadContract {
1071 id: ApplicationId,
1072 #[debug(skip)]
1073 callback: Sender<(UserContractCode, ApplicationDescription)>,
1074 },
1075
1076 #[cfg(not(web))]
1077 LoadService {
1078 id: ApplicationId,
1079 #[debug(skip)]
1080 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1081 },
1082
1083 ChainBalance {
1084 #[debug(skip)]
1085 callback: Sender<Amount>,
1086 },
1087
1088 OwnerBalance {
1089 owner: AccountOwner,
1090 #[debug(skip)]
1091 callback: Sender<Amount>,
1092 },
1093
1094 OwnerBalances {
1095 #[debug(skip)]
1096 callback: Sender<Vec<(AccountOwner, Amount)>>,
1097 },
1098
1099 BalanceOwners {
1100 #[debug(skip)]
1101 callback: Sender<Vec<AccountOwner>>,
1102 },
1103
1104 Transfer {
1105 source: AccountOwner,
1106 destination: Account,
1107 amount: Amount,
1108 #[debug(skip_if = Option::is_none)]
1109 signer: Option<AccountOwner>,
1110 application_id: ApplicationId,
1111 #[debug(skip)]
1112 callback: Sender<()>,
1113 },
1114
1115 Claim {
1116 source: Account,
1117 destination: Account,
1118 amount: Amount,
1119 #[debug(skip_if = Option::is_none)]
1120 signer: Option<AccountOwner>,
1121 application_id: ApplicationId,
1122 #[debug(skip)]
1123 callback: Sender<()>,
1124 },
1125
1126 SystemTimestamp {
1127 #[debug(skip)]
1128 callback: Sender<Timestamp>,
1129 },
1130
1131 ChainOwnership {
1132 #[debug(skip)]
1133 callback: Sender<ChainOwnership>,
1134 },
1135
1136 ApplicationPermissions {
1137 #[debug(skip)]
1138 callback: Sender<ApplicationPermissions>,
1139 },
1140
1141 ReadValueBytes {
1142 id: ApplicationId,
1143 #[debug(with = hex_debug)]
1144 key: Vec<u8>,
1145 #[debug(skip)]
1146 callback: Sender<Option<Vec<u8>>>,
1147 },
1148
1149 ContainsKey {
1150 id: ApplicationId,
1151 key: Vec<u8>,
1152 #[debug(skip)]
1153 callback: Sender<bool>,
1154 },
1155
1156 ContainsKeys {
1157 id: ApplicationId,
1158 #[debug(with = hex_vec_debug)]
1159 keys: Vec<Vec<u8>>,
1160 callback: Sender<Vec<bool>>,
1161 },
1162
1163 ReadMultiValuesBytes {
1164 id: ApplicationId,
1165 #[debug(with = hex_vec_debug)]
1166 keys: Vec<Vec<u8>>,
1167 #[debug(skip)]
1168 callback: Sender<Vec<Option<Vec<u8>>>>,
1169 },
1170
1171 FindKeysByPrefix {
1172 id: ApplicationId,
1173 #[debug(with = hex_debug)]
1174 key_prefix: Vec<u8>,
1175 #[debug(skip)]
1176 callback: Sender<Vec<Vec<u8>>>,
1177 },
1178
1179 FindKeyValuesByPrefix {
1180 id: ApplicationId,
1181 #[debug(with = hex_debug)]
1182 key_prefix: Vec<u8>,
1183 #[debug(skip)]
1184 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1185 },
1186
1187 WriteBatch {
1188 id: ApplicationId,
1189 batch: Batch,
1190 #[debug(skip)]
1191 callback: Sender<()>,
1192 },
1193
1194 OpenChain {
1195 ownership: ChainOwnership,
1196 #[debug(skip_if = Amount::is_zero)]
1197 balance: Amount,
1198 parent_id: ChainId,
1199 block_height: BlockHeight,
1200 application_permissions: ApplicationPermissions,
1201 timestamp: Timestamp,
1202 #[debug(skip)]
1203 callback: Sender<ChainId>,
1204 },
1205
1206 CloseChain {
1207 application_id: ApplicationId,
1208 #[debug(skip)]
1209 callback: Sender<Result<(), ExecutionError>>,
1210 },
1211
1212 ChangeOwnership {
1213 application_id: ApplicationId,
1214 ownership: ChainOwnership,
1215 #[debug(skip)]
1216 callback: Sender<Result<(), ExecutionError>>,
1217 },
1218
1219 ChangeApplicationPermissions {
1220 application_id: ApplicationId,
1221 application_permissions: ApplicationPermissions,
1222 #[debug(skip)]
1223 callback: Sender<Result<(), ExecutionError>>,
1224 },
1225
1226 CreateApplication {
1227 chain_id: ChainId,
1228 block_height: BlockHeight,
1229 module_id: ModuleId,
1230 parameters: Vec<u8>,
1231 required_application_ids: Vec<ApplicationId>,
1232 #[debug(skip)]
1233 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1234 },
1235
1236 PerformHttpRequest {
1237 request: http::Request,
1238 http_responses_are_oracle_responses: bool,
1239 #[debug(skip)]
1240 callback: Sender<http::Response>,
1241 },
1242
1243 ReadBlobContent {
1244 blob_id: BlobId,
1245 #[debug(skip)]
1246 callback: Sender<BlobContent>,
1247 },
1248
1249 AssertBlobExists {
1250 blob_id: BlobId,
1251 #[debug(skip)]
1252 callback: Sender<()>,
1253 },
1254
1255 Emit {
1256 stream_id: StreamId,
1257 #[debug(with = hex_debug)]
1258 value: Vec<u8>,
1259 #[debug(skip)]
1260 callback: Sender<u32>,
1261 },
1262
1263 ReadEvent {
1264 event_id: EventId,
1265 callback: oneshot::Sender<Vec<u8>>,
1266 },
1267
1268 SubscribeToEvents {
1269 chain_id: ChainId,
1270 stream_id: StreamId,
1271 subscriber_app_id: ApplicationId,
1272 #[debug(skip)]
1273 callback: Sender<()>,
1274 },
1275
1276 UnsubscribeFromEvents {
1277 chain_id: ChainId,
1278 stream_id: StreamId,
1279 subscriber_app_id: ApplicationId,
1280 #[debug(skip)]
1281 callback: Sender<()>,
1282 },
1283
1284 GetApplicationPermissions {
1285 #[debug(skip)]
1286 callback: Sender<ApplicationPermissions>,
1287 },
1288
1289 QueryServiceOracle {
1290 deadline: Option<Instant>,
1291 application_id: ApplicationId,
1292 next_block_height: BlockHeight,
1293 query: Vec<u8>,
1294 #[debug(skip)]
1295 callback: Sender<Vec<u8>>,
1296 },
1297
1298 AddOutgoingMessage {
1299 message: crate::OutgoingMessage,
1300 #[debug(skip)]
1301 callback: Sender<()>,
1302 },
1303
1304 SetLocalTime {
1305 local_time: Timestamp,
1306 #[debug(skip)]
1307 callback: Sender<()>,
1308 },
1309
1310 AssertBefore {
1311 timestamp: Timestamp,
1312 #[debug(skip)]
1313 callback: Sender<Result<(), ExecutionError>>,
1314 },
1315
1316 AddCreatedBlob {
1317 blob: crate::Blob,
1318 #[debug(skip)]
1319 callback: Sender<()>,
1320 },
1321
1322 ValidationRound {
1323 round: Option<u32>,
1324 #[debug(skip)]
1325 callback: Sender<Option<u32>>,
1326 },
1327
1328 AllowApplicationLogs {
1329 #[debug(skip)]
1330 callback: Sender<bool>,
1331 },
1332
1333 #[cfg(web)]
1335 Log {
1336 message: String,
1337 level: tracing::log::Level,
1338 },
1339}