1use std::{
7 collections::{BTreeMap, BTreeSet},
8 sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16 data_types::{
17 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18 Timestamp,
19 },
20 ensure, hex_debug, hex_vec_debug, http,
21 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
22 ownership::ChainOwnership,
23 time::Instant,
24};
25use linera_views::{batch::Batch, context::Context, views::View};
26use oneshot::Sender;
27use reqwest::{header::HeaderMap, Client, Url};
28use tracing::{info_span, instrument, Instrument as _};
29
30use crate::{
31 execution::UserAction,
32 runtime::ContractSyncRuntime,
33 system::{CreateApplicationResult, OpenChainConfig},
34 util::{OracleResponseExt as _, RespondExt as _},
35 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
36 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
37 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
38 ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
39};
40
41pub struct ExecutionStateActor<'a, C> {
43 state: &'a mut ExecutionStateView<C>,
44 txn_tracker: &'a mut TransactionTracker,
45 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
46}
47
48#[cfg(with_metrics)]
49mod metrics {
50 use std::sync::LazyLock;
51
52 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
53 use prometheus::HistogramVec;
54
55 pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
57 register_histogram_vec(
58 "load_contract_latency",
59 "Load contract latency",
60 &[],
61 exponential_bucket_latencies(250.0),
62 )
63 });
64
65 pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
67 register_histogram_vec(
68 "load_service_latency",
69 "Load service latency",
70 &[],
71 exponential_bucket_latencies(250.0),
72 )
73 });
74}
75
76pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
77
78impl<'a, C> ExecutionStateActor<'a, C>
79where
80 C: Context + Clone + 'static,
81 C::Extra: ExecutionRuntimeContext,
82{
83 pub fn new(
85 state: &'a mut ExecutionStateView<C>,
86 txn_tracker: &'a mut TransactionTracker,
87 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
88 ) -> Self {
89 Self {
90 state,
91 txn_tracker,
92 resource_controller,
93 }
94 }
95
96 #[instrument(skip_all, fields(application_id = %id))]
97 pub(crate) async fn load_contract(
98 &mut self,
99 id: ApplicationId,
100 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
101 #[cfg(with_metrics)]
102 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
103 let blob_id = id.description_blob_id();
104 let description = match self.txn_tracker.get_blob_content(&blob_id) {
105 Some(blob) => bcs::from_bytes(blob.bytes())?,
106 None => {
107 self.state
108 .system
109 .describe_application(id, self.txn_tracker)
110 .await?
111 }
112 };
113 let code = self
114 .state
115 .context()
116 .extra()
117 .get_user_contract(&description, self.txn_tracker)
118 .await?;
119 Ok((code, description))
120 }
121
122 pub(crate) async fn load_service(
123 &mut self,
124 id: ApplicationId,
125 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
126 #[cfg(with_metrics)]
127 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
128 let blob_id = id.description_blob_id();
129 let description = match self.txn_tracker.get_blob_content(&blob_id) {
130 Some(blob) => bcs::from_bytes(blob.bytes())?,
131 None => {
132 self.state
133 .system
134 .describe_application(id, self.txn_tracker)
135 .await?
136 }
137 };
138 let code = self
139 .state
140 .context()
141 .extra()
142 .get_user_service(&description, self.txn_tracker)
143 .await?;
144 Ok((code, description))
145 }
146
147 #[instrument(
149 skip_all,
150 fields(request_type = %request.as_ref())
151 )]
152 pub(crate) async fn handle_request(
153 &mut self,
154 request: ExecutionRequest,
155 ) -> Result<(), ExecutionError> {
156 use ExecutionRequest::*;
157 match request {
158 #[cfg(not(web))]
159 LoadContract { id, callback } => {
160 let (code, description) = self.load_contract(id).await?;
161 callback.respond((code, description))
162 }
163 #[cfg(not(web))]
164 LoadService { id, callback } => {
165 let (code, description) = self.load_service(id).await?;
166 callback.respond((code, description))
167 }
168
169 ChainBalance { callback } => {
170 let balance = *self.state.system.balance.get();
171 callback.respond(balance);
172 }
173
174 OwnerBalance { owner, callback } => {
175 let balance = self
176 .state
177 .system
178 .balances
179 .get(&owner)
180 .await?
181 .unwrap_or_default();
182 callback.respond(balance);
183 }
184
185 OwnerBalances { callback } => {
186 let balances = self.state.system.balances.index_values().await?;
187 callback.respond(balances.into_iter().collect());
188 }
189
190 BalanceOwners { callback } => {
191 let owners = self.state.system.balances.indices().await?;
192 callback.respond(owners);
193 }
194
195 Transfer {
196 source,
197 destination,
198 amount,
199 signer,
200 application_id,
201 callback,
202 } => {
203 let maybe_message = self
204 .state
205 .system
206 .transfer(signer, Some(application_id), source, destination, amount)
207 .await?;
208 self.txn_tracker.add_outgoing_messages(maybe_message);
209 callback.respond(());
210 }
211
212 Claim {
213 source,
214 destination,
215 amount,
216 signer,
217 application_id,
218 callback,
219 } => {
220 let maybe_message = self
221 .state
222 .system
223 .claim(
224 signer,
225 Some(application_id),
226 source.owner,
227 source.chain_id,
228 destination,
229 amount,
230 )
231 .await?;
232 self.txn_tracker.add_outgoing_messages(maybe_message);
233 callback.respond(());
234 }
235
236 SystemTimestamp { callback } => {
237 let timestamp = *self.state.system.timestamp.get();
238 callback.respond(timestamp);
239 }
240
241 ChainOwnership { callback } => {
242 let ownership = self.state.system.ownership.get().await?.clone();
243 callback.respond(ownership);
244 }
245
246 ApplicationPermissions { callback } => {
247 let permissions = self
248 .state
249 .system
250 .application_permissions
251 .get()
252 .await?
253 .clone();
254 callback.respond(permissions);
255 }
256
257 ReadApplicationDescription {
258 application_id,
259 callback,
260 } => {
261 let blob_id = application_id.description_blob_id();
262 let description = match self.txn_tracker.get_blob_content(&blob_id) {
263 Some(blob) => bcs::from_bytes(blob.bytes())?,
264 None => {
265 let blob_content = self.state.system.read_blob_content(blob_id).await?;
266 self.state
267 .system
268 .blob_used(self.txn_tracker, blob_id)
269 .await?;
270 bcs::from_bytes(blob_content.bytes())?
271 }
272 };
273 callback.respond(description);
274 }
275
276 ContainsKey { id, key, callback } => {
277 let view = self.state.users.try_load_entry(&id).await?;
278 let result = match view {
279 Some(view) => view.contains_key(&key).await?,
280 None => false,
281 };
282 callback.respond(result);
283 }
284
285 ContainsKeys { id, keys, callback } => {
286 let view = self.state.users.try_load_entry(&id).await?;
287 let result = match view {
288 Some(view) => view.contains_keys(&keys).await?,
289 None => vec![false; keys.len()],
290 };
291 callback.respond(result);
292 }
293
294 ReadMultiValuesBytes { id, keys, callback } => {
295 let view = self.state.users.try_load_entry(&id).await?;
296 let values = match view {
297 Some(view) => view.multi_get(&keys).await?,
298 None => vec![None; keys.len()],
299 };
300 callback.respond(values);
301 }
302
303 ReadValueBytes { id, key, callback } => {
304 let view = self.state.users.try_load_entry(&id).await?;
305 let result = match view {
306 Some(view) => view.get(&key).await?,
307 None => None,
308 };
309 callback.respond(result);
310 }
311
312 FindKeysByPrefix {
313 id,
314 key_prefix,
315 callback,
316 } => {
317 let view = self.state.users.try_load_entry(&id).await?;
318 let result = match view {
319 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
320 None => Vec::new(),
321 };
322 callback.respond(result);
323 }
324
325 FindKeyValuesByPrefix {
326 id,
327 key_prefix,
328 callback,
329 } => {
330 let view = self.state.users.try_load_entry(&id).await?;
331 let result = match view {
332 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
333 None => Vec::new(),
334 };
335 callback.respond(result);
336 }
337
338 WriteBatch {
339 id,
340 batch,
341 callback,
342 } => {
343 let mut view = self.state.users.try_load_entry_mut(&id).await?;
344 view.write_batch(batch)?;
345 callback.respond(());
346 }
347
348 OpenChain {
349 ownership,
350 balance,
351 parent_id,
352 block_height,
353 application_permissions,
354 timestamp,
355 callback,
356 } => {
357 let config = OpenChainConfig {
358 ownership,
359 balance,
360 application_permissions,
361 };
362 let chain_id = self
363 .state
364 .system
365 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
366 .await?;
367 callback.respond(chain_id);
368 }
369
370 CloseChain {
371 application_id,
372 callback,
373 } => {
374 let app_permissions = self.state.system.application_permissions.get().await?;
375 if !app_permissions.can_close_chain(&application_id) {
376 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
377 } else {
378 self.state.system.close_chain()?;
379 callback.respond(Ok(()));
380 }
381 }
382
383 ChangeOwnership {
384 application_id,
385 ownership,
386 callback,
387 } => {
388 let app_permissions = self.state.system.application_permissions.get().await?;
389 if !app_permissions.can_close_chain(&application_id) {
390 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
391 } else {
392 self.state.system.ownership.set(ownership);
393 callback.respond(Ok(()));
394 }
395 }
396
397 ChangeApplicationPermissions {
398 application_id,
399 application_permissions,
400 callback,
401 } => {
402 let app_permissions = self.state.system.application_permissions.get().await?;
403 if !app_permissions.can_change_application_permissions(&application_id) {
404 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
405 } else {
406 self.state
407 .system
408 .application_permissions
409 .set(application_permissions);
410 callback.respond(Ok(()));
411 }
412 }
413
414 CreateApplication {
415 chain_id,
416 block_height,
417 module_id,
418 parameters,
419 required_application_ids,
420 callback,
421 } => {
422 let create_application_result = self
423 .state
424 .system
425 .create_application(
426 chain_id,
427 block_height,
428 module_id,
429 parameters,
430 required_application_ids,
431 self.txn_tracker,
432 )
433 .await?;
434 callback.respond(Ok(create_application_result));
435 }
436
437 PerformHttpRequest {
438 request,
439 http_responses_are_oracle_responses,
440 callback,
441 } => {
442 let system = &mut self.state.system;
443 let response = self
444 .txn_tracker
445 .oracle(|| async {
446 let headers = request
447 .headers
448 .into_iter()
449 .map(|http::Header { name, value }| {
450 Ok((name.parse()?, value.try_into()?))
451 })
452 .collect::<Result<HeaderMap, ExecutionError>>()?;
453
454 let url = Url::parse(&request.url)?;
455 let host = url
456 .host_str()
457 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
458
459 let (_epoch, committee) = system
460 .current_committee()
461 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
462 let allowed_hosts = &committee.policy().http_request_allow_list;
463
464 ensure!(
465 allowed_hosts.contains(host),
466 ExecutionError::UnauthorizedHttpRequest(url)
467 );
468
469 let request = Client::new()
470 .request(request.method.into(), url)
471 .body(request.body)
472 .headers(headers);
473 #[cfg(not(web))]
474 let request = request.timeout(linera_base::time::Duration::from_millis(
475 committee.policy().http_request_timeout_ms,
476 ));
477
478 let response = request.send().await?;
479
480 let mut response_size_limit =
481 committee.policy().maximum_http_response_bytes;
482
483 if http_responses_are_oracle_responses {
484 response_size_limit = response_size_limit
485 .min(committee.policy().maximum_oracle_response_bytes);
486 }
487 Ok(OracleResponse::Http(
488 Self::receive_http_response(response, response_size_limit).await?,
489 ))
490 })
491 .await?
492 .to_http_response()?;
493 callback.respond(response);
494 }
495
496 ReadBlobContent { blob_id, callback } => {
497 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
498 content.clone()
499 } else {
500 let content = self.state.system.read_blob_content(blob_id).await?;
501 if blob_id.blob_type == BlobType::Data {
502 self.resource_controller
503 .with_state(&mut self.state.system)
504 .await?
505 .track_blob_read(content.bytes().len() as u64)?;
506 }
507 self.state
508 .system
509 .blob_used(self.txn_tracker, blob_id)
510 .await?;
511 content
512 };
513 callback.respond(content)
514 }
515
516 AssertBlobExists { blob_id, callback } => {
517 self.state.system.assert_blob_exists(blob_id).await?;
518 if blob_id.blob_type == BlobType::Data {
520 self.resource_controller
521 .with_state(&mut self.state.system)
522 .await?
523 .track_blob_read(0)?;
524 }
525 let is_new = self
526 .state
527 .system
528 .blob_used(self.txn_tracker, blob_id)
529 .await?;
530 if is_new {
531 self.txn_tracker
532 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
533 }
534 callback.respond(());
535 }
536
537 Emit {
538 stream_id,
539 value,
540 callback,
541 } => {
542 let count = self
543 .state
544 .stream_event_counts
545 .get_mut_or_default(&stream_id)
546 .await?;
547 let index = *count;
548 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
549 self.txn_tracker.add_event(stream_id, index, value);
550 callback.respond(index)
551 }
552
553 ReadEvent { event_id, callback } => {
554 let extra = self.state.context().extra();
555 let event = self
556 .txn_tracker
557 .oracle(|| async {
558 let event = extra
559 .get_event(event_id.clone())
560 .await?
561 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
562 Ok(OracleResponse::Event(
563 event_id.clone(),
564 Arc::unwrap_or_clone(event),
565 ))
566 })
567 .await?
568 .to_event(&event_id)?;
569 callback.respond(event);
570 }
571
572 SubscribeToEvents {
573 chain_id,
574 stream_id,
575 subscriber_app_id,
576 callback,
577 } => {
578 let subscriptions = self
579 .state
580 .system
581 .event_subscriptions
582 .get_mut_or_default(&(chain_id, stream_id.clone()))
583 .await?;
584 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
585 subscriptions.next_index
586 } else {
587 0
588 };
589 self.txn_tracker.add_stream_to_process(
590 subscriber_app_id,
591 chain_id,
592 stream_id,
593 0,
594 next_index,
595 );
596 callback.respond(());
597 }
598
599 UnsubscribeFromEvents {
600 chain_id,
601 stream_id,
602 subscriber_app_id,
603 callback,
604 } => {
605 let key = (chain_id, stream_id.clone());
606 let subscriptions = self
607 .state
608 .system
609 .event_subscriptions
610 .get_mut_or_default(&key)
611 .await?;
612 subscriptions.applications.remove(&subscriber_app_id);
613 if subscriptions.applications.is_empty() {
614 self.state.system.event_subscriptions.remove(&key)?;
615 }
616 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
617 self.txn_tracker
618 .remove_stream_to_process(app_id, chain_id, stream_id);
619 }
620 callback.respond(());
621 }
622
623 GetApplicationPermissions { callback } => {
624 let app_permissions = self.state.system.application_permissions.get().await?;
625 callback.respond(app_permissions.clone());
626 }
627
628 QueryServiceOracle {
629 deadline,
630 application_id,
631 next_block_height,
632 query,
633 callback,
634 } => {
635 let state = &mut self.state;
636 let local_time = self.txn_tracker.local_time();
637 let created_blobs = self.txn_tracker.created_blobs().clone();
638 let bytes = self
639 .txn_tracker
640 .oracle(|| async {
641 let context = QueryContext {
642 chain_id: state.context().extra().chain_id(),
643 next_block_height,
644 local_time,
645 };
646 let QueryOutcome {
647 response,
648 operations,
649 } = Box::pin(state.query_user_application_with_deadline(
650 application_id,
651 context,
652 query,
653 deadline,
654 created_blobs,
655 ))
656 .await?;
657 ensure!(
658 operations.is_empty(),
659 ExecutionError::ServiceOracleQueryOperations(operations)
660 );
661 Ok(OracleResponse::Service(response))
662 })
663 .await?
664 .to_service_response()?;
665 callback.respond(bytes);
666 }
667
668 AddOutgoingMessage { message, callback } => {
669 self.txn_tracker.add_outgoing_message(message);
670 callback.respond(());
671 }
672
673 SetLocalTime {
674 local_time,
675 callback,
676 } => {
677 self.txn_tracker.set_local_time(local_time);
678 callback.respond(());
679 }
680
681 AssertBefore {
682 timestamp,
683 callback,
684 } => {
685 let result = if !self
686 .txn_tracker
687 .replay_oracle_response(OracleResponse::Assert)?
688 {
689 let local_time = self.txn_tracker.local_time();
691 if local_time >= timestamp {
692 Err(ExecutionError::AssertBefore {
693 timestamp,
694 local_time,
695 })
696 } else {
697 Ok(())
698 }
699 } else {
700 Ok(())
701 };
702 callback.respond(result);
703 }
704
705 AddCreatedBlob { blob, callback } => {
706 if self.resource_controller.is_free {
707 self.txn_tracker.mark_blob_free(blob.id());
708 }
709 self.txn_tracker.add_created_blob(blob);
710 callback.respond(());
711 }
712
713 ValidationRound { round, callback } => {
714 let validation_round = self
715 .txn_tracker
716 .oracle(|| async { Ok(OracleResponse::Round(round)) })
717 .await?
718 .to_round()?;
719 callback.respond(validation_round);
720 }
721
722 AllowApplicationLogs { callback } => {
723 let allow = self
724 .state
725 .context()
726 .extra()
727 .execution_runtime_config()
728 .allow_application_logs;
729 callback.respond(allow);
730 }
731
732 #[cfg(web)]
733 Log { message, level } => match level {
734 tracing::log::Level::Trace | tracing::log::Level::Debug => {
735 tracing::debug!(target: "user_application_log", message = %message);
736 }
737 tracing::log::Level::Info => {
738 tracing::info!(target: "user_application_log", message = %message);
739 }
740 tracing::log::Level::Warn => {
741 tracing::warn!(target: "user_application_log", message = %message);
742 }
743 tracing::log::Level::Error => {
744 tracing::error!(target: "user_application_log", message = %message);
745 }
746 },
747 }
748
749 Ok(())
750 }
751
752 #[instrument(skip_all)]
755 async fn process_subscriptions(
756 &mut self,
757 context: ProcessStreamsContext,
758 ) -> Result<(), ExecutionError> {
759 let mut processed = BTreeSet::new();
762 loop {
763 let to_process = self
764 .txn_tracker
765 .take_streams_to_process()
766 .into_iter()
767 .filter_map(|(app_id, updates)| {
768 let updates = updates
769 .into_iter()
770 .filter_map(|update| {
771 if !processed.insert((
772 app_id,
773 update.chain_id,
774 update.stream_id.clone(),
775 )) {
776 return None;
777 }
778 Some(update)
779 })
780 .collect::<Vec<_>>();
781 if updates.is_empty() {
782 return None;
783 }
784 Some((app_id, updates))
785 })
786 .collect::<BTreeMap<_, _>>();
787 if to_process.is_empty() {
788 return Ok(());
789 }
790 for (app_id, updates) in to_process {
791 self.run_user_action(
792 app_id,
793 UserAction::ProcessStreams(context, updates),
794 None,
795 None,
796 )
797 .await?;
798 }
799 }
800 }
801
802 pub(crate) async fn run_user_action(
803 &mut self,
804 application_id: ApplicationId,
805 action: UserAction,
806 refund_grant_to: Option<Account>,
807 grant: Option<&mut Amount>,
808 ) -> Result<(), ExecutionError> {
809 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
810 .await
811 }
812
813 pub(crate) async fn service_and_dependencies(
815 &mut self,
816 application: ApplicationId,
817 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
818 let mut stack = vec![application];
821 let mut codes = vec![];
822 let mut descriptions = vec![];
823
824 while let Some(id) = stack.pop() {
825 let (code, description) = self.load_service(id).await?;
826 stack.extend(description.required_application_ids.iter().rev().copied());
827 codes.push(code);
828 descriptions.push(description);
829 }
830
831 codes.reverse();
832 descriptions.reverse();
833
834 Ok((codes, descriptions))
835 }
836
837 #[instrument(skip_all, fields(application_id = %application))]
839 async fn contract_and_dependencies(
840 &mut self,
841 application: ApplicationId,
842 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
843 let mut stack = vec![application];
846 let mut codes = vec![];
847 let mut descriptions = vec![];
848
849 while let Some(id) = stack.pop() {
850 let (code, description) = self.load_contract(id).await?;
851 stack.extend(description.required_application_ids.iter().rev().copied());
852 codes.push(code);
853 descriptions.push(description);
854 }
855
856 codes.reverse();
857 descriptions.reverse();
858
859 Ok((codes, descriptions))
860 }
861
862 #[instrument(skip_all, fields(application_id = %application_id))]
863 async fn run_user_action_with_runtime(
864 &mut self,
865 application_id: ApplicationId,
866 action: UserAction,
867 refund_grant_to: Option<Account>,
868 grant: Option<&mut Amount>,
869 ) -> Result<(), ExecutionError> {
870 let chain_id = self.state.context().extra().chain_id();
871 let mut cloned_grant = grant.as_ref().map(|x| **x);
872 let initial_balance = self
873 .resource_controller
874 .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
875 .await?
876 .balance()?;
877 let mut controller = ResourceController::new(
878 self.resource_controller.policy().clone(),
879 self.resource_controller.tracker,
880 initial_balance,
881 );
882 let is_free = matches!(
883 &action,
884 UserAction::Message(..) | UserAction::ProcessStreams(..)
885 ) && self
886 .resource_controller
887 .policy()
888 .is_free_app(&application_id);
889 controller.is_free = is_free;
890 self.resource_controller.is_free = is_free;
891 let (execution_state_sender, mut execution_state_receiver) =
892 futures::channel::mpsc::unbounded();
893
894 let (codes, descriptions): (Vec<_>, Vec<_>) =
895 self.contract_and_dependencies(application_id).await?;
896
897 let allow_application_logs = self
898 .state
899 .context()
900 .extra()
901 .execution_runtime_config()
902 .allow_application_logs;
903
904 let contract_runtime_task = self
905 .state
906 .context()
907 .extra()
908 .thread_pool()
909 .run_send(JsVec(codes), move |codes| async move {
910 let runtime = ContractSyncRuntime::new(
911 execution_state_sender,
912 chain_id,
913 refund_grant_to,
914 controller,
915 &action,
916 allow_application_logs,
917 );
918
919 for (code, description) in codes.0.into_iter().zip(descriptions) {
920 runtime.preload_contract(
921 ApplicationId::from(&description),
922 code,
923 description,
924 )?;
925 }
926
927 runtime.run_action(application_id, chain_id, action)
928 })
929 .await;
930
931 async {
932 while let Some(request) = execution_state_receiver.next().await {
933 self.handle_request(request).await?;
934 }
935 Ok::<(), ExecutionError>(())
936 }
937 .instrument(info_span!("handle_runtime_requests"))
938 .await?;
939
940 let (result, controller) = contract_runtime_task.await??;
941
942 self.resource_controller.is_free = false;
943
944 self.txn_tracker.add_operation_result(result);
945
946 self.resource_controller
947 .with_state_and_grant(&mut self.state.system, grant)
948 .await?
949 .merge_balance(initial_balance, controller.balance()?)?;
950 self.resource_controller.tracker = controller.tracker;
951
952 Ok(())
953 }
954
955 #[instrument(skip_all, fields(
956 chain_id = %context.chain_id,
957 block_height = %context.height,
958 operation_type = %operation.as_ref(),
959 ))]
960 pub async fn execute_operation(
961 &mut self,
962 context: OperationContext,
963 operation: Operation,
964 ) -> Result<(), ExecutionError> {
965 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
966 match operation {
967 Operation::System(op) => {
968 let new_application = self
969 .state
970 .system
971 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
972 .await?;
973 if let Some((application_id, argument)) = new_application {
974 let user_action = UserAction::Instantiate(context, argument);
975 self.run_user_action(
976 application_id,
977 user_action,
978 context.refund_grant_to(),
979 None,
980 )
981 .await?;
982 }
983 }
984 Operation::User {
985 application_id,
986 bytes,
987 } => {
988 self.run_user_action(
989 application_id,
990 UserAction::Operation(context, bytes),
991 context.refund_grant_to(),
992 None,
993 )
994 .await?;
995 }
996 }
997 self.process_subscriptions(context.into()).await?;
998 Ok(())
999 }
1000
1001 #[instrument(skip_all, fields(
1002 chain_id = %context.chain_id,
1003 block_height = %context.height,
1004 origin = %context.origin,
1005 is_bouncing = %context.is_bouncing,
1006 message_type = %message.as_ref(),
1007 ))]
1008 pub async fn execute_message(
1009 &mut self,
1010 context: MessageContext,
1011 message: Message,
1012 grant: Option<&mut Amount>,
1013 ) -> Result<(), ExecutionError> {
1014 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1015 match message {
1016 Message::System(message) => {
1017 let outcome = self.state.system.execute_message(context, message).await?;
1018 self.txn_tracker.add_outgoing_messages(outcome);
1019 }
1020 Message::User {
1021 application_id,
1022 bytes,
1023 } => {
1024 self.run_user_action(
1025 application_id,
1026 UserAction::Message(context, bytes),
1027 context.refund_grant_to,
1028 grant,
1029 )
1030 .await?;
1031 }
1032 }
1033 self.process_subscriptions(context.into()).await?;
1034 Ok(())
1035 }
1036
1037 pub fn bounce_message(
1038 &mut self,
1039 context: MessageContext,
1040 grant: Amount,
1041 message: Message,
1042 ) -> Result<(), ExecutionError> {
1043 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1044 self.txn_tracker.add_outgoing_message(OutgoingMessage {
1045 destination: context.origin,
1046 authenticated_signer: context.authenticated_signer,
1047 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1048 grant,
1049 kind: MessageKind::Bouncing,
1050 message,
1051 });
1052 Ok(())
1053 }
1054
1055 pub fn send_refund(
1056 &mut self,
1057 context: MessageContext,
1058 amount: Amount,
1059 ) -> Result<(), ExecutionError> {
1060 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1061 if amount.is_zero() {
1062 return Ok(());
1063 }
1064 let Some(account) = context.refund_grant_to else {
1065 return Err(ExecutionError::InternalError(
1066 "Messages with grants should have a non-empty `refund_grant_to`",
1067 ));
1068 };
1069 let message = SystemMessage::Credit {
1070 amount,
1071 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
1072 target: account.owner,
1073 };
1074 self.txn_tracker.add_outgoing_message(
1075 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1076 );
1077 Ok(())
1078 }
1079
1080 async fn receive_http_response(
1084 response: reqwest::Response,
1085 size_limit: u64,
1086 ) -> Result<http::Response, ExecutionError> {
1087 let status = response.status().as_u16();
1088 let maybe_content_length = response.content_length();
1089
1090 let headers = response
1091 .headers()
1092 .iter()
1093 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1094 .collect::<Vec<_>>();
1095
1096 let total_header_size = headers
1097 .iter()
1098 .map(|header| (header.name.len() + header.value.len()) as u64)
1099 .sum();
1100
1101 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1102 ExecutionError::HttpResponseSizeLimitExceeded {
1103 limit: size_limit,
1104 size: total_header_size,
1105 },
1106 )?;
1107
1108 if let Some(content_length) = maybe_content_length {
1109 if content_length > remaining_bytes {
1110 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1111 limit: size_limit,
1112 size: content_length + total_header_size,
1113 });
1114 }
1115 }
1116
1117 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1118 let mut body_stream = response.bytes_stream();
1119
1120 while let Some(bytes) = body_stream.next().await.transpose()? {
1121 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1122 ExecutionError::HttpResponseSizeLimitExceeded {
1123 limit: size_limit,
1124 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1125 },
1126 )?;
1127
1128 body.extend(&bytes);
1129 }
1130
1131 Ok(http::Response {
1132 status,
1133 headers,
1134 body,
1135 })
1136 }
1137}
1138
1139#[derive(Debug, strum::AsRefStr)]
1141pub enum ExecutionRequest {
1142 #[cfg(not(web))]
1143 LoadContract {
1144 id: ApplicationId,
1145 #[debug(skip)]
1146 callback: Sender<(UserContractCode, ApplicationDescription)>,
1147 },
1148
1149 #[cfg(not(web))]
1150 LoadService {
1151 id: ApplicationId,
1152 #[debug(skip)]
1153 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1154 },
1155
1156 ChainBalance {
1157 #[debug(skip)]
1158 callback: Sender<Amount>,
1159 },
1160
1161 OwnerBalance {
1162 owner: AccountOwner,
1163 #[debug(skip)]
1164 callback: Sender<Amount>,
1165 },
1166
1167 OwnerBalances {
1168 #[debug(skip)]
1169 callback: Sender<Vec<(AccountOwner, Amount)>>,
1170 },
1171
1172 BalanceOwners {
1173 #[debug(skip)]
1174 callback: Sender<Vec<AccountOwner>>,
1175 },
1176
1177 Transfer {
1178 source: AccountOwner,
1179 destination: Account,
1180 amount: Amount,
1181 #[debug(skip_if = Option::is_none)]
1182 signer: Option<AccountOwner>,
1183 application_id: ApplicationId,
1184 #[debug(skip)]
1185 callback: Sender<()>,
1186 },
1187
1188 Claim {
1189 source: Account,
1190 destination: Account,
1191 amount: Amount,
1192 #[debug(skip_if = Option::is_none)]
1193 signer: Option<AccountOwner>,
1194 application_id: ApplicationId,
1195 #[debug(skip)]
1196 callback: Sender<()>,
1197 },
1198
1199 SystemTimestamp {
1200 #[debug(skip)]
1201 callback: Sender<Timestamp>,
1202 },
1203
1204 ChainOwnership {
1205 #[debug(skip)]
1206 callback: Sender<ChainOwnership>,
1207 },
1208
1209 ApplicationPermissions {
1210 #[debug(skip)]
1211 callback: Sender<ApplicationPermissions>,
1212 },
1213
1214 ReadApplicationDescription {
1215 application_id: ApplicationId,
1216 #[debug(skip)]
1217 callback: Sender<ApplicationDescription>,
1218 },
1219
1220 ReadValueBytes {
1221 id: ApplicationId,
1222 #[debug(with = hex_debug)]
1223 key: Vec<u8>,
1224 #[debug(skip)]
1225 callback: Sender<Option<Vec<u8>>>,
1226 },
1227
1228 ContainsKey {
1229 id: ApplicationId,
1230 key: Vec<u8>,
1231 #[debug(skip)]
1232 callback: Sender<bool>,
1233 },
1234
1235 ContainsKeys {
1236 id: ApplicationId,
1237 #[debug(with = hex_vec_debug)]
1238 keys: Vec<Vec<u8>>,
1239 callback: Sender<Vec<bool>>,
1240 },
1241
1242 ReadMultiValuesBytes {
1243 id: ApplicationId,
1244 #[debug(with = hex_vec_debug)]
1245 keys: Vec<Vec<u8>>,
1246 #[debug(skip)]
1247 callback: Sender<Vec<Option<Vec<u8>>>>,
1248 },
1249
1250 FindKeysByPrefix {
1251 id: ApplicationId,
1252 #[debug(with = hex_debug)]
1253 key_prefix: Vec<u8>,
1254 #[debug(skip)]
1255 callback: Sender<Vec<Vec<u8>>>,
1256 },
1257
1258 FindKeyValuesByPrefix {
1259 id: ApplicationId,
1260 #[debug(with = hex_debug)]
1261 key_prefix: Vec<u8>,
1262 #[debug(skip)]
1263 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1264 },
1265
1266 WriteBatch {
1267 id: ApplicationId,
1268 batch: Batch,
1269 #[debug(skip)]
1270 callback: Sender<()>,
1271 },
1272
1273 OpenChain {
1274 ownership: ChainOwnership,
1275 #[debug(skip_if = Amount::is_zero)]
1276 balance: Amount,
1277 parent_id: ChainId,
1278 block_height: BlockHeight,
1279 application_permissions: ApplicationPermissions,
1280 timestamp: Timestamp,
1281 #[debug(skip)]
1282 callback: Sender<ChainId>,
1283 },
1284
1285 CloseChain {
1286 application_id: ApplicationId,
1287 #[debug(skip)]
1288 callback: Sender<Result<(), ExecutionError>>,
1289 },
1290
1291 ChangeOwnership {
1292 application_id: ApplicationId,
1293 ownership: ChainOwnership,
1294 #[debug(skip)]
1295 callback: Sender<Result<(), ExecutionError>>,
1296 },
1297
1298 ChangeApplicationPermissions {
1299 application_id: ApplicationId,
1300 application_permissions: ApplicationPermissions,
1301 #[debug(skip)]
1302 callback: Sender<Result<(), ExecutionError>>,
1303 },
1304
1305 CreateApplication {
1306 chain_id: ChainId,
1307 block_height: BlockHeight,
1308 module_id: ModuleId,
1309 parameters: Vec<u8>,
1310 required_application_ids: Vec<ApplicationId>,
1311 #[debug(skip)]
1312 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1313 },
1314
1315 PerformHttpRequest {
1316 request: http::Request,
1317 http_responses_are_oracle_responses: bool,
1318 #[debug(skip)]
1319 callback: Sender<http::Response>,
1320 },
1321
1322 ReadBlobContent {
1323 blob_id: BlobId,
1324 #[debug(skip)]
1325 callback: Sender<BlobContent>,
1326 },
1327
1328 AssertBlobExists {
1329 blob_id: BlobId,
1330 #[debug(skip)]
1331 callback: Sender<()>,
1332 },
1333
1334 Emit {
1335 stream_id: StreamId,
1336 #[debug(with = hex_debug)]
1337 value: Vec<u8>,
1338 #[debug(skip)]
1339 callback: Sender<u32>,
1340 },
1341
1342 ReadEvent {
1343 event_id: EventId,
1344 callback: oneshot::Sender<Vec<u8>>,
1345 },
1346
1347 SubscribeToEvents {
1348 chain_id: ChainId,
1349 stream_id: StreamId,
1350 subscriber_app_id: ApplicationId,
1351 #[debug(skip)]
1352 callback: Sender<()>,
1353 },
1354
1355 UnsubscribeFromEvents {
1356 chain_id: ChainId,
1357 stream_id: StreamId,
1358 subscriber_app_id: ApplicationId,
1359 #[debug(skip)]
1360 callback: Sender<()>,
1361 },
1362
1363 GetApplicationPermissions {
1364 #[debug(skip)]
1365 callback: Sender<ApplicationPermissions>,
1366 },
1367
1368 QueryServiceOracle {
1369 deadline: Option<Instant>,
1370 application_id: ApplicationId,
1371 next_block_height: BlockHeight,
1372 query: Vec<u8>,
1373 #[debug(skip)]
1374 callback: Sender<Vec<u8>>,
1375 },
1376
1377 AddOutgoingMessage {
1378 message: crate::OutgoingMessage,
1379 #[debug(skip)]
1380 callback: Sender<()>,
1381 },
1382
1383 SetLocalTime {
1384 local_time: Timestamp,
1385 #[debug(skip)]
1386 callback: Sender<()>,
1387 },
1388
1389 AssertBefore {
1390 timestamp: Timestamp,
1391 #[debug(skip)]
1392 callback: Sender<Result<(), ExecutionError>>,
1393 },
1394
1395 AddCreatedBlob {
1396 blob: crate::Blob,
1397 #[debug(skip)]
1398 callback: Sender<()>,
1399 },
1400
1401 ValidationRound {
1402 round: Option<u32>,
1403 #[debug(skip)]
1404 callback: Sender<Option<u32>>,
1405 },
1406
1407 AllowApplicationLogs {
1408 #[debug(skip)]
1409 callback: Sender<bool>,
1410 },
1411
1412 #[cfg(web)]
1414 Log {
1415 message: String,
1416 level: tracing::log::Level,
1417 },
1418}