1use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15 Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19 ownership::ChainOwnership,
20 time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25use tracing::{info_span, instrument, Instrument as _};
26
27use crate::{
28 execution::UserAction,
29 runtime::ContractSyncRuntime,
30 system::{CreateApplicationResult, OpenChainConfig},
31 util::{OracleResponseExt as _, RespondExt as _},
32 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
33 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
34 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
35 ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
36};
37
38pub struct ExecutionStateActor<'a, C> {
40 state: &'a mut ExecutionStateView<C>,
41 txn_tracker: &'a mut TransactionTracker,
42 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
43}
44
45#[cfg(with_metrics)]
46mod metrics {
47 use std::sync::LazyLock;
48
49 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
50 use prometheus::HistogramVec;
51
52 pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
54 register_histogram_vec(
55 "load_contract_latency",
56 "Load contract latency",
57 &[],
58 exponential_bucket_latencies(250.0),
59 )
60 });
61
62 pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
64 register_histogram_vec(
65 "load_service_latency",
66 "Load service latency",
67 &[],
68 exponential_bucket_latencies(250.0),
69 )
70 });
71}
72
73pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
74
75impl<'a, C> ExecutionStateActor<'a, C>
76where
77 C: Context + Clone + 'static,
78 C::Extra: ExecutionRuntimeContext,
79{
80 pub fn new(
82 state: &'a mut ExecutionStateView<C>,
83 txn_tracker: &'a mut TransactionTracker,
84 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
85 ) -> Self {
86 Self {
87 state,
88 txn_tracker,
89 resource_controller,
90 }
91 }
92
93 #[instrument(skip_all, fields(application_id = %id))]
94 pub(crate) async fn load_contract(
95 &mut self,
96 id: ApplicationId,
97 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
98 #[cfg(with_metrics)]
99 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
100 let blob_id = id.description_blob_id();
101 let description = match self.txn_tracker.get_blob_content(&blob_id) {
102 Some(blob) => bcs::from_bytes(blob.bytes())?,
103 None => {
104 self.state
105 .system
106 .describe_application(id, self.txn_tracker)
107 .await?
108 }
109 };
110 let code = self
111 .state
112 .context()
113 .extra()
114 .get_user_contract(&description, self.txn_tracker)
115 .await?;
116 Ok((code, description))
117 }
118
119 pub(crate) async fn load_service(
120 &mut self,
121 id: ApplicationId,
122 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
123 #[cfg(with_metrics)]
124 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
125 let blob_id = id.description_blob_id();
126 let description = match self.txn_tracker.get_blob_content(&blob_id) {
127 Some(blob) => bcs::from_bytes(blob.bytes())?,
128 None => {
129 self.state
130 .system
131 .describe_application(id, self.txn_tracker)
132 .await?
133 }
134 };
135 let code = self
136 .state
137 .context()
138 .extra()
139 .get_user_service(&description, self.txn_tracker)
140 .await?;
141 Ok((code, description))
142 }
143
144 #[instrument(
146 skip_all,
147 fields(request_type = %request.as_ref())
148 )]
149 pub(crate) async fn handle_request(
150 &mut self,
151 request: ExecutionRequest,
152 ) -> Result<(), ExecutionError> {
153 use ExecutionRequest::*;
154 match request {
155 #[cfg(not(web))]
156 LoadContract { id, callback } => {
157 let (code, description) = self.load_contract(id).await?;
158 callback.respond((code, description))
159 }
160 #[cfg(not(web))]
161 LoadService { id, callback } => {
162 let (code, description) = self.load_service(id).await?;
163 callback.respond((code, description))
164 }
165
166 ChainBalance { callback } => {
167 let balance = *self.state.system.balance.get();
168 callback.respond(balance);
169 }
170
171 OwnerBalance { owner, callback } => {
172 let balance = self
173 .state
174 .system
175 .balances
176 .get(&owner)
177 .await?
178 .unwrap_or_default();
179 callback.respond(balance);
180 }
181
182 OwnerBalances { callback } => {
183 let balances = self.state.system.balances.index_values().await?;
184 callback.respond(balances.into_iter().collect());
185 }
186
187 BalanceOwners { callback } => {
188 let owners = self.state.system.balances.indices().await?;
189 callback.respond(owners);
190 }
191
192 Transfer {
193 source,
194 destination,
195 amount,
196 signer,
197 application_id,
198 callback,
199 } => {
200 let maybe_message = self
201 .state
202 .system
203 .transfer(signer, Some(application_id), source, destination, amount)
204 .await?;
205 self.txn_tracker.add_outgoing_messages(maybe_message);
206 callback.respond(());
207 }
208
209 Claim {
210 source,
211 destination,
212 amount,
213 signer,
214 application_id,
215 callback,
216 } => {
217 let maybe_message = self
218 .state
219 .system
220 .claim(
221 signer,
222 Some(application_id),
223 source.owner,
224 source.chain_id,
225 destination,
226 amount,
227 )
228 .await?;
229 self.txn_tracker.add_outgoing_messages(maybe_message);
230 callback.respond(());
231 }
232
233 SystemTimestamp { callback } => {
234 let timestamp = *self.state.system.timestamp.get();
235 callback.respond(timestamp);
236 }
237
238 ChainOwnership { callback } => {
239 let ownership = self.state.system.ownership.get().clone();
240 callback.respond(ownership);
241 }
242
243 ApplicationPermissions { callback } => {
244 let permissions = self.state.system.application_permissions.get().clone();
245 callback.respond(permissions);
246 }
247
248 ReadApplicationDescription {
249 application_id,
250 callback,
251 } => {
252 let blob_id = application_id.description_blob_id();
253 let description = match self.txn_tracker.get_blob_content(&blob_id) {
254 Some(blob) => bcs::from_bytes(blob.bytes())?,
255 None => {
256 let blob_content = self.state.system.read_blob_content(blob_id).await?;
257 self.state
258 .system
259 .blob_used(self.txn_tracker, blob_id)
260 .await?;
261 bcs::from_bytes(blob_content.bytes())?
262 }
263 };
264 callback.respond(description);
265 }
266
267 ContainsKey { id, key, callback } => {
268 let view = self.state.users.try_load_entry(&id).await?;
269 let result = match view {
270 Some(view) => view.contains_key(&key).await?,
271 None => false,
272 };
273 callback.respond(result);
274 }
275
276 ContainsKeys { id, keys, callback } => {
277 let view = self.state.users.try_load_entry(&id).await?;
278 let result = match view {
279 Some(view) => view.contains_keys(&keys).await?,
280 None => vec![false; keys.len()],
281 };
282 callback.respond(result);
283 }
284
285 ReadMultiValuesBytes { id, keys, callback } => {
286 let view = self.state.users.try_load_entry(&id).await?;
287 let values = match view {
288 Some(view) => view.multi_get(&keys).await?,
289 None => vec![None; keys.len()],
290 };
291 callback.respond(values);
292 }
293
294 ReadValueBytes { id, key, callback } => {
295 let view = self.state.users.try_load_entry(&id).await?;
296 let result = match view {
297 Some(view) => view.get(&key).await?,
298 None => None,
299 };
300 callback.respond(result);
301 }
302
303 FindKeysByPrefix {
304 id,
305 key_prefix,
306 callback,
307 } => {
308 let view = self.state.users.try_load_entry(&id).await?;
309 let result = match view {
310 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
311 None => Vec::new(),
312 };
313 callback.respond(result);
314 }
315
316 FindKeyValuesByPrefix {
317 id,
318 key_prefix,
319 callback,
320 } => {
321 let view = self.state.users.try_load_entry(&id).await?;
322 let result = match view {
323 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
324 None => Vec::new(),
325 };
326 callback.respond(result);
327 }
328
329 WriteBatch {
330 id,
331 batch,
332 callback,
333 } => {
334 let mut view = self.state.users.try_load_entry_mut(&id).await?;
335 view.write_batch(batch).await?;
336 callback.respond(());
337 }
338
339 OpenChain {
340 ownership,
341 balance,
342 parent_id,
343 block_height,
344 application_permissions,
345 timestamp,
346 callback,
347 } => {
348 let config = OpenChainConfig {
349 ownership,
350 balance,
351 application_permissions,
352 };
353 let chain_id = self
354 .state
355 .system
356 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
357 .await?;
358 callback.respond(chain_id);
359 }
360
361 CloseChain {
362 application_id,
363 callback,
364 } => {
365 let app_permissions = self.state.system.application_permissions.get();
366 if !app_permissions.can_close_chain(&application_id) {
367 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
368 } else {
369 self.state.system.close_chain().await?;
370 callback.respond(Ok(()));
371 }
372 }
373
374 ChangeOwnership {
375 application_id,
376 ownership,
377 callback,
378 } => {
379 let app_permissions = self.state.system.application_permissions.get();
380 if !app_permissions.can_close_chain(&application_id) {
381 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
382 } else {
383 self.state.system.ownership.set(ownership);
384 callback.respond(Ok(()));
385 }
386 }
387
388 ChangeApplicationPermissions {
389 application_id,
390 application_permissions,
391 callback,
392 } => {
393 let app_permissions = self.state.system.application_permissions.get();
394 if !app_permissions.can_change_application_permissions(&application_id) {
395 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
396 } else {
397 self.state
398 .system
399 .application_permissions
400 .set(application_permissions);
401 callback.respond(Ok(()));
402 }
403 }
404
405 CreateApplication {
406 chain_id,
407 block_height,
408 module_id,
409 parameters,
410 required_application_ids,
411 callback,
412 } => {
413 let create_application_result = self
414 .state
415 .system
416 .create_application(
417 chain_id,
418 block_height,
419 module_id,
420 parameters,
421 required_application_ids,
422 self.txn_tracker,
423 )
424 .await?;
425 callback.respond(Ok(create_application_result));
426 }
427
428 PerformHttpRequest {
429 request,
430 http_responses_are_oracle_responses,
431 callback,
432 } => {
433 let system = &mut self.state.system;
434 let response = self
435 .txn_tracker
436 .oracle(|| async {
437 let headers = request
438 .headers
439 .into_iter()
440 .map(|http::Header { name, value }| {
441 Ok((name.parse()?, value.try_into()?))
442 })
443 .collect::<Result<HeaderMap, ExecutionError>>()?;
444
445 let url = Url::parse(&request.url)?;
446 let host = url
447 .host_str()
448 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
449
450 let (_epoch, committee) = system
451 .current_committee()
452 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
453 let allowed_hosts = &committee.policy().http_request_allow_list;
454
455 ensure!(
456 allowed_hosts.contains(host),
457 ExecutionError::UnauthorizedHttpRequest(url)
458 );
459
460 let request = Client::new()
461 .request(request.method.into(), url)
462 .body(request.body)
463 .headers(headers);
464 #[cfg(not(web))]
465 let request = request.timeout(linera_base::time::Duration::from_millis(
466 committee.policy().http_request_timeout_ms,
467 ));
468
469 let response = request.send().await?;
470
471 let mut response_size_limit =
472 committee.policy().maximum_http_response_bytes;
473
474 if http_responses_are_oracle_responses {
475 response_size_limit = response_size_limit
476 .min(committee.policy().maximum_oracle_response_bytes);
477 }
478 Ok(OracleResponse::Http(
479 Self::receive_http_response(response, response_size_limit).await?,
480 ))
481 })
482 .await?
483 .to_http_response()?;
484 callback.respond(response);
485 }
486
487 ReadBlobContent { blob_id, callback } => {
488 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
489 content.clone()
490 } else {
491 let content = self.state.system.read_blob_content(blob_id).await?;
492 if blob_id.blob_type == BlobType::Data {
493 self.resource_controller
494 .with_state(&mut self.state.system)
495 .await?
496 .track_blob_read(content.bytes().len() as u64)?;
497 }
498 self.state
499 .system
500 .blob_used(self.txn_tracker, blob_id)
501 .await?;
502 content
503 };
504 callback.respond(content)
505 }
506
507 AssertBlobExists { blob_id, callback } => {
508 self.state.system.assert_blob_exists(blob_id).await?;
509 if blob_id.blob_type == BlobType::Data {
511 self.resource_controller
512 .with_state(&mut self.state.system)
513 .await?
514 .track_blob_read(0)?;
515 }
516 let is_new = self
517 .state
518 .system
519 .blob_used(self.txn_tracker, blob_id)
520 .await?;
521 if is_new {
522 self.txn_tracker
523 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
524 }
525 callback.respond(());
526 }
527
528 Emit {
529 stream_id,
530 value,
531 callback,
532 } => {
533 let count = self
534 .state
535 .stream_event_counts
536 .get_mut_or_default(&stream_id)
537 .await?;
538 let index = *count;
539 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
540 self.txn_tracker.add_event(stream_id, index, value);
541 callback.respond(index)
542 }
543
544 ReadEvent { event_id, callback } => {
545 let extra = self.state.context().extra();
546 let event = self
547 .txn_tracker
548 .oracle(|| async {
549 let event = extra
550 .get_event(event_id.clone())
551 .await?
552 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
553 Ok(OracleResponse::Event(event_id.clone(), event))
554 })
555 .await?
556 .to_event(&event_id)?;
557 callback.respond(event);
558 }
559
560 SubscribeToEvents {
561 chain_id,
562 stream_id,
563 subscriber_app_id,
564 callback,
565 } => {
566 let subscriptions = self
567 .state
568 .system
569 .event_subscriptions
570 .get_mut_or_default(&(chain_id, stream_id.clone()))
571 .await?;
572 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
573 subscriptions.next_index
574 } else {
575 0
576 };
577 self.txn_tracker.add_stream_to_process(
578 subscriber_app_id,
579 chain_id,
580 stream_id,
581 0,
582 next_index,
583 );
584 callback.respond(());
585 }
586
587 UnsubscribeFromEvents {
588 chain_id,
589 stream_id,
590 subscriber_app_id,
591 callback,
592 } => {
593 let key = (chain_id, stream_id.clone());
594 let subscriptions = self
595 .state
596 .system
597 .event_subscriptions
598 .get_mut_or_default(&key)
599 .await?;
600 subscriptions.applications.remove(&subscriber_app_id);
601 if subscriptions.applications.is_empty() {
602 self.state.system.event_subscriptions.remove(&key)?;
603 }
604 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
605 self.txn_tracker
606 .remove_stream_to_process(app_id, chain_id, stream_id);
607 }
608 callback.respond(());
609 }
610
611 GetApplicationPermissions { callback } => {
612 let app_permissions = self.state.system.application_permissions.get();
613 callback.respond(app_permissions.clone());
614 }
615
616 QueryServiceOracle {
617 deadline,
618 application_id,
619 next_block_height,
620 query,
621 callback,
622 } => {
623 let state = &mut self.state;
624 let local_time = self.txn_tracker.local_time();
625 let created_blobs = self.txn_tracker.created_blobs().clone();
626 let bytes = self
627 .txn_tracker
628 .oracle(|| async {
629 let context = QueryContext {
630 chain_id: state.context().extra().chain_id(),
631 next_block_height,
632 local_time,
633 };
634 let QueryOutcome {
635 response,
636 operations,
637 } = Box::pin(state.query_user_application_with_deadline(
638 application_id,
639 context,
640 query,
641 deadline,
642 created_blobs,
643 ))
644 .await?;
645 ensure!(
646 operations.is_empty(),
647 ExecutionError::ServiceOracleQueryOperations(operations)
648 );
649 Ok(OracleResponse::Service(response))
650 })
651 .await?
652 .to_service_response()?;
653 callback.respond(bytes);
654 }
655
656 AddOutgoingMessage { message, callback } => {
657 self.txn_tracker.add_outgoing_message(message);
658 callback.respond(());
659 }
660
661 SetLocalTime {
662 local_time,
663 callback,
664 } => {
665 self.txn_tracker.set_local_time(local_time);
666 callback.respond(());
667 }
668
669 AssertBefore {
670 timestamp,
671 callback,
672 } => {
673 let result = if !self
674 .txn_tracker
675 .replay_oracle_response(OracleResponse::Assert)?
676 {
677 let local_time = self.txn_tracker.local_time();
679 if local_time >= timestamp {
680 Err(ExecutionError::AssertBefore {
681 timestamp,
682 local_time,
683 })
684 } else {
685 Ok(())
686 }
687 } else {
688 Ok(())
689 };
690 callback.respond(result);
691 }
692
693 AddCreatedBlob { blob, callback } => {
694 if self.resource_controller.is_free {
695 self.txn_tracker.mark_blob_free(blob.id());
696 }
697 self.txn_tracker.add_created_blob(blob);
698 callback.respond(());
699 }
700
701 ValidationRound { round, callback } => {
702 let validation_round = self
703 .txn_tracker
704 .oracle(|| async { Ok(OracleResponse::Round(round)) })
705 .await?
706 .to_round()?;
707 callback.respond(validation_round);
708 }
709
710 AllowApplicationLogs { callback } => {
711 let allow = self
712 .state
713 .context()
714 .extra()
715 .execution_runtime_config()
716 .allow_application_logs;
717 callback.respond(allow);
718 }
719
720 #[cfg(web)]
721 Log { message, level } => match level {
722 tracing::log::Level::Trace | tracing::log::Level::Debug => {
723 tracing::debug!(target: "user_application_log", message = %message);
724 }
725 tracing::log::Level::Info => {
726 tracing::info!(target: "user_application_log", message = %message);
727 }
728 tracing::log::Level::Warn => {
729 tracing::warn!(target: "user_application_log", message = %message);
730 }
731 tracing::log::Level::Error => {
732 tracing::error!(target: "user_application_log", message = %message);
733 }
734 },
735 }
736
737 Ok(())
738 }
739
740 #[instrument(skip_all)]
743 async fn process_subscriptions(
744 &mut self,
745 context: ProcessStreamsContext,
746 ) -> Result<(), ExecutionError> {
747 let mut processed = BTreeSet::new();
750 loop {
751 let to_process = self
752 .txn_tracker
753 .take_streams_to_process()
754 .into_iter()
755 .filter_map(|(app_id, updates)| {
756 let updates = updates
757 .into_iter()
758 .filter_map(|update| {
759 if !processed.insert((
760 app_id,
761 update.chain_id,
762 update.stream_id.clone(),
763 )) {
764 return None;
765 }
766 Some(update)
767 })
768 .collect::<Vec<_>>();
769 if updates.is_empty() {
770 return None;
771 }
772 Some((app_id, updates))
773 })
774 .collect::<BTreeMap<_, _>>();
775 if to_process.is_empty() {
776 return Ok(());
777 }
778 for (app_id, updates) in to_process {
779 self.run_user_action(
780 app_id,
781 UserAction::ProcessStreams(context, updates),
782 None,
783 None,
784 )
785 .await?;
786 }
787 }
788 }
789
790 pub(crate) async fn run_user_action(
791 &mut self,
792 application_id: ApplicationId,
793 action: UserAction,
794 refund_grant_to: Option<Account>,
795 grant: Option<&mut Amount>,
796 ) -> Result<(), ExecutionError> {
797 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
798 .await
799 }
800
801 pub(crate) async fn service_and_dependencies(
803 &mut self,
804 application: ApplicationId,
805 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
806 let mut stack = vec![application];
809 let mut codes = vec![];
810 let mut descriptions = vec![];
811
812 while let Some(id) = stack.pop() {
813 let (code, description) = self.load_service(id).await?;
814 stack.extend(description.required_application_ids.iter().rev().copied());
815 codes.push(code);
816 descriptions.push(description);
817 }
818
819 codes.reverse();
820 descriptions.reverse();
821
822 Ok((codes, descriptions))
823 }
824
825 #[instrument(skip_all, fields(application_id = %application))]
827 async fn contract_and_dependencies(
828 &mut self,
829 application: ApplicationId,
830 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
831 let mut stack = vec![application];
834 let mut codes = vec![];
835 let mut descriptions = vec![];
836
837 while let Some(id) = stack.pop() {
838 let (code, description) = self.load_contract(id).await?;
839 stack.extend(description.required_application_ids.iter().rev().copied());
840 codes.push(code);
841 descriptions.push(description);
842 }
843
844 codes.reverse();
845 descriptions.reverse();
846
847 Ok((codes, descriptions))
848 }
849
850 #[instrument(skip_all, fields(application_id = %application_id))]
851 async fn run_user_action_with_runtime(
852 &mut self,
853 application_id: ApplicationId,
854 action: UserAction,
855 refund_grant_to: Option<Account>,
856 grant: Option<&mut Amount>,
857 ) -> Result<(), ExecutionError> {
858 let chain_id = self.state.context().extra().chain_id();
859 let mut cloned_grant = grant.as_ref().map(|x| **x);
860 let initial_balance = self
861 .resource_controller
862 .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
863 .await?
864 .balance()?;
865 let mut controller = ResourceController::new(
866 self.resource_controller.policy().clone(),
867 self.resource_controller.tracker,
868 initial_balance,
869 );
870 let is_free = matches!(
871 &action,
872 UserAction::Message(..) | UserAction::ProcessStreams(..)
873 ) && self
874 .resource_controller
875 .policy()
876 .is_free_app(&application_id);
877 controller.is_free = is_free;
878 self.resource_controller.is_free = is_free;
879 let (execution_state_sender, mut execution_state_receiver) =
880 futures::channel::mpsc::unbounded();
881
882 let (codes, descriptions): (Vec<_>, Vec<_>) =
883 self.contract_and_dependencies(application_id).await?;
884
885 let allow_application_logs = self
886 .state
887 .context()
888 .extra()
889 .execution_runtime_config()
890 .allow_application_logs;
891
892 let contract_runtime_task = self
893 .state
894 .context()
895 .extra()
896 .thread_pool()
897 .run_send(JsVec(codes), move |codes| async move {
898 let runtime = ContractSyncRuntime::new(
899 execution_state_sender,
900 chain_id,
901 refund_grant_to,
902 controller,
903 &action,
904 allow_application_logs,
905 );
906
907 for (code, description) in codes.0.into_iter().zip(descriptions) {
908 runtime.preload_contract(
909 ApplicationId::from(&description),
910 code,
911 description,
912 )?;
913 }
914
915 runtime.run_action(application_id, chain_id, action)
916 })
917 .await;
918
919 async {
920 while let Some(request) = execution_state_receiver.next().await {
921 self.handle_request(request).await?;
922 }
923 Ok::<(), ExecutionError>(())
924 }
925 .instrument(info_span!("handle_runtime_requests"))
926 .await?;
927
928 let (result, controller) = contract_runtime_task.await??;
929
930 self.resource_controller.is_free = false;
931
932 self.txn_tracker.add_operation_result(result);
933
934 self.resource_controller
935 .with_state_and_grant(&mut self.state.system, grant)
936 .await?
937 .merge_balance(initial_balance, controller.balance()?)?;
938 self.resource_controller.tracker = controller.tracker;
939
940 Ok(())
941 }
942
943 #[instrument(skip_all, fields(
944 chain_id = %context.chain_id,
945 block_height = %context.height,
946 operation_type = %operation.as_ref(),
947 ))]
948 pub async fn execute_operation(
949 &mut self,
950 context: OperationContext,
951 operation: Operation,
952 ) -> Result<(), ExecutionError> {
953 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
954 match operation {
955 Operation::System(op) => {
956 let new_application = self
957 .state
958 .system
959 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
960 .await?;
961 if let Some((application_id, argument)) = new_application {
962 let user_action = UserAction::Instantiate(context, argument);
963 self.run_user_action(
964 application_id,
965 user_action,
966 context.refund_grant_to(),
967 None,
968 )
969 .await?;
970 }
971 }
972 Operation::User {
973 application_id,
974 bytes,
975 } => {
976 self.run_user_action(
977 application_id,
978 UserAction::Operation(context, bytes),
979 context.refund_grant_to(),
980 None,
981 )
982 .await?;
983 }
984 }
985 self.process_subscriptions(context.into()).await?;
986 Ok(())
987 }
988
989 #[instrument(skip_all, fields(
990 chain_id = %context.chain_id,
991 block_height = %context.height,
992 origin = %context.origin,
993 is_bouncing = %context.is_bouncing,
994 message_type = %message.as_ref(),
995 ))]
996 pub async fn execute_message(
997 &mut self,
998 context: MessageContext,
999 message: Message,
1000 grant: Option<&mut Amount>,
1001 ) -> Result<(), ExecutionError> {
1002 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1003 match message {
1004 Message::System(message) => {
1005 let outcome = self.state.system.execute_message(context, message).await?;
1006 self.txn_tracker.add_outgoing_messages(outcome);
1007 }
1008 Message::User {
1009 application_id,
1010 bytes,
1011 } => {
1012 self.run_user_action(
1013 application_id,
1014 UserAction::Message(context, bytes),
1015 context.refund_grant_to,
1016 grant,
1017 )
1018 .await?;
1019 }
1020 }
1021 self.process_subscriptions(context.into()).await?;
1022 Ok(())
1023 }
1024
1025 pub fn bounce_message(
1026 &mut self,
1027 context: MessageContext,
1028 grant: Amount,
1029 message: Message,
1030 ) -> Result<(), ExecutionError> {
1031 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1032 self.txn_tracker.add_outgoing_message(OutgoingMessage {
1033 destination: context.origin,
1034 authenticated_signer: context.authenticated_signer,
1035 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1036 grant,
1037 kind: MessageKind::Bouncing,
1038 message,
1039 });
1040 Ok(())
1041 }
1042
1043 pub fn send_refund(
1044 &mut self,
1045 context: MessageContext,
1046 amount: Amount,
1047 ) -> Result<(), ExecutionError> {
1048 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1049 if amount.is_zero() {
1050 return Ok(());
1051 }
1052 let Some(account) = context.refund_grant_to else {
1053 return Err(ExecutionError::InternalError(
1054 "Messages with grants should have a non-empty `refund_grant_to`",
1055 ));
1056 };
1057 let message = SystemMessage::Credit {
1058 amount,
1059 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
1060 target: account.owner,
1061 };
1062 self.txn_tracker.add_outgoing_message(
1063 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1064 );
1065 Ok(())
1066 }
1067
1068 async fn receive_http_response(
1072 response: reqwest::Response,
1073 size_limit: u64,
1074 ) -> Result<http::Response, ExecutionError> {
1075 let status = response.status().as_u16();
1076 let maybe_content_length = response.content_length();
1077
1078 let headers = response
1079 .headers()
1080 .iter()
1081 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1082 .collect::<Vec<_>>();
1083
1084 let total_header_size = headers
1085 .iter()
1086 .map(|header| (header.name.len() + header.value.len()) as u64)
1087 .sum();
1088
1089 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1090 ExecutionError::HttpResponseSizeLimitExceeded {
1091 limit: size_limit,
1092 size: total_header_size,
1093 },
1094 )?;
1095
1096 if let Some(content_length) = maybe_content_length {
1097 if content_length > remaining_bytes {
1098 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1099 limit: size_limit,
1100 size: content_length + total_header_size,
1101 });
1102 }
1103 }
1104
1105 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1106 let mut body_stream = response.bytes_stream();
1107
1108 while let Some(bytes) = body_stream.next().await.transpose()? {
1109 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1110 ExecutionError::HttpResponseSizeLimitExceeded {
1111 limit: size_limit,
1112 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1113 },
1114 )?;
1115
1116 body.extend(&bytes);
1117 }
1118
1119 Ok(http::Response {
1120 status,
1121 headers,
1122 body,
1123 })
1124 }
1125}
1126
1127#[derive(Debug, strum::AsRefStr)]
1129pub enum ExecutionRequest {
1130 #[cfg(not(web))]
1131 LoadContract {
1132 id: ApplicationId,
1133 #[debug(skip)]
1134 callback: Sender<(UserContractCode, ApplicationDescription)>,
1135 },
1136
1137 #[cfg(not(web))]
1138 LoadService {
1139 id: ApplicationId,
1140 #[debug(skip)]
1141 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1142 },
1143
1144 ChainBalance {
1145 #[debug(skip)]
1146 callback: Sender<Amount>,
1147 },
1148
1149 OwnerBalance {
1150 owner: AccountOwner,
1151 #[debug(skip)]
1152 callback: Sender<Amount>,
1153 },
1154
1155 OwnerBalances {
1156 #[debug(skip)]
1157 callback: Sender<Vec<(AccountOwner, Amount)>>,
1158 },
1159
1160 BalanceOwners {
1161 #[debug(skip)]
1162 callback: Sender<Vec<AccountOwner>>,
1163 },
1164
1165 Transfer {
1166 source: AccountOwner,
1167 destination: Account,
1168 amount: Amount,
1169 #[debug(skip_if = Option::is_none)]
1170 signer: Option<AccountOwner>,
1171 application_id: ApplicationId,
1172 #[debug(skip)]
1173 callback: Sender<()>,
1174 },
1175
1176 Claim {
1177 source: Account,
1178 destination: Account,
1179 amount: Amount,
1180 #[debug(skip_if = Option::is_none)]
1181 signer: Option<AccountOwner>,
1182 application_id: ApplicationId,
1183 #[debug(skip)]
1184 callback: Sender<()>,
1185 },
1186
1187 SystemTimestamp {
1188 #[debug(skip)]
1189 callback: Sender<Timestamp>,
1190 },
1191
1192 ChainOwnership {
1193 #[debug(skip)]
1194 callback: Sender<ChainOwnership>,
1195 },
1196
1197 ApplicationPermissions {
1198 #[debug(skip)]
1199 callback: Sender<ApplicationPermissions>,
1200 },
1201
1202 ReadApplicationDescription {
1203 application_id: ApplicationId,
1204 #[debug(skip)]
1205 callback: Sender<ApplicationDescription>,
1206 },
1207
1208 ReadValueBytes {
1209 id: ApplicationId,
1210 #[debug(with = hex_debug)]
1211 key: Vec<u8>,
1212 #[debug(skip)]
1213 callback: Sender<Option<Vec<u8>>>,
1214 },
1215
1216 ContainsKey {
1217 id: ApplicationId,
1218 key: Vec<u8>,
1219 #[debug(skip)]
1220 callback: Sender<bool>,
1221 },
1222
1223 ContainsKeys {
1224 id: ApplicationId,
1225 #[debug(with = hex_vec_debug)]
1226 keys: Vec<Vec<u8>>,
1227 callback: Sender<Vec<bool>>,
1228 },
1229
1230 ReadMultiValuesBytes {
1231 id: ApplicationId,
1232 #[debug(with = hex_vec_debug)]
1233 keys: Vec<Vec<u8>>,
1234 #[debug(skip)]
1235 callback: Sender<Vec<Option<Vec<u8>>>>,
1236 },
1237
1238 FindKeysByPrefix {
1239 id: ApplicationId,
1240 #[debug(with = hex_debug)]
1241 key_prefix: Vec<u8>,
1242 #[debug(skip)]
1243 callback: Sender<Vec<Vec<u8>>>,
1244 },
1245
1246 FindKeyValuesByPrefix {
1247 id: ApplicationId,
1248 #[debug(with = hex_debug)]
1249 key_prefix: Vec<u8>,
1250 #[debug(skip)]
1251 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1252 },
1253
1254 WriteBatch {
1255 id: ApplicationId,
1256 batch: Batch,
1257 #[debug(skip)]
1258 callback: Sender<()>,
1259 },
1260
1261 OpenChain {
1262 ownership: ChainOwnership,
1263 #[debug(skip_if = Amount::is_zero)]
1264 balance: Amount,
1265 parent_id: ChainId,
1266 block_height: BlockHeight,
1267 application_permissions: ApplicationPermissions,
1268 timestamp: Timestamp,
1269 #[debug(skip)]
1270 callback: Sender<ChainId>,
1271 },
1272
1273 CloseChain {
1274 application_id: ApplicationId,
1275 #[debug(skip)]
1276 callback: Sender<Result<(), ExecutionError>>,
1277 },
1278
1279 ChangeOwnership {
1280 application_id: ApplicationId,
1281 ownership: ChainOwnership,
1282 #[debug(skip)]
1283 callback: Sender<Result<(), ExecutionError>>,
1284 },
1285
1286 ChangeApplicationPermissions {
1287 application_id: ApplicationId,
1288 application_permissions: ApplicationPermissions,
1289 #[debug(skip)]
1290 callback: Sender<Result<(), ExecutionError>>,
1291 },
1292
1293 CreateApplication {
1294 chain_id: ChainId,
1295 block_height: BlockHeight,
1296 module_id: ModuleId,
1297 parameters: Vec<u8>,
1298 required_application_ids: Vec<ApplicationId>,
1299 #[debug(skip)]
1300 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1301 },
1302
1303 PerformHttpRequest {
1304 request: http::Request,
1305 http_responses_are_oracle_responses: bool,
1306 #[debug(skip)]
1307 callback: Sender<http::Response>,
1308 },
1309
1310 ReadBlobContent {
1311 blob_id: BlobId,
1312 #[debug(skip)]
1313 callback: Sender<BlobContent>,
1314 },
1315
1316 AssertBlobExists {
1317 blob_id: BlobId,
1318 #[debug(skip)]
1319 callback: Sender<()>,
1320 },
1321
1322 Emit {
1323 stream_id: StreamId,
1324 #[debug(with = hex_debug)]
1325 value: Vec<u8>,
1326 #[debug(skip)]
1327 callback: Sender<u32>,
1328 },
1329
1330 ReadEvent {
1331 event_id: EventId,
1332 callback: oneshot::Sender<Vec<u8>>,
1333 },
1334
1335 SubscribeToEvents {
1336 chain_id: ChainId,
1337 stream_id: StreamId,
1338 subscriber_app_id: ApplicationId,
1339 #[debug(skip)]
1340 callback: Sender<()>,
1341 },
1342
1343 UnsubscribeFromEvents {
1344 chain_id: ChainId,
1345 stream_id: StreamId,
1346 subscriber_app_id: ApplicationId,
1347 #[debug(skip)]
1348 callback: Sender<()>,
1349 },
1350
1351 GetApplicationPermissions {
1352 #[debug(skip)]
1353 callback: Sender<ApplicationPermissions>,
1354 },
1355
1356 QueryServiceOracle {
1357 deadline: Option<Instant>,
1358 application_id: ApplicationId,
1359 next_block_height: BlockHeight,
1360 query: Vec<u8>,
1361 #[debug(skip)]
1362 callback: Sender<Vec<u8>>,
1363 },
1364
1365 AddOutgoingMessage {
1366 message: crate::OutgoingMessage,
1367 #[debug(skip)]
1368 callback: Sender<()>,
1369 },
1370
1371 SetLocalTime {
1372 local_time: Timestamp,
1373 #[debug(skip)]
1374 callback: Sender<()>,
1375 },
1376
1377 AssertBefore {
1378 timestamp: Timestamp,
1379 #[debug(skip)]
1380 callback: Sender<Result<(), ExecutionError>>,
1381 },
1382
1383 AddCreatedBlob {
1384 blob: crate::Blob,
1385 #[debug(skip)]
1386 callback: Sender<()>,
1387 },
1388
1389 ValidationRound {
1390 round: Option<u32>,
1391 #[debug(skip)]
1392 callback: Sender<Option<u32>>,
1393 },
1394
1395 AllowApplicationLogs {
1396 #[debug(skip)]
1397 callback: Sender<bool>,
1398 },
1399
1400 #[cfg(web)]
1402 Log {
1403 message: String,
1404 level: tracing::log::Level,
1405 },
1406}