1use std::{
7 collections::{BTreeMap, BTreeSet},
8 sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16 data_types::{
17 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18 Timestamp,
19 },
20 ensure, hex_debug, hex_vec_debug, http,
21 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
22 ownership::ChainOwnership,
23 time::Instant,
24};
25use linera_views::{batch::Batch, context::Context, views::View};
26use oneshot::Sender;
27use reqwest::{header::HeaderMap, Client, Url};
28use tracing::{info_span, instrument, Instrument as _};
29
30use crate::{
31 execution::UserAction,
32 runtime::ContractSyncRuntime,
33 system::{CreateApplicationResult, OpenChainConfig},
34 util::{OracleResponseExt as _, RespondExt as _},
35 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
36 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
37 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
38 ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
39};
40
/// Actor that answers [`ExecutionRequest`]s and runs user actions on top of mutable
/// borrows of the execution state, the transaction tracker and the resource controller.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker for the current transaction (outgoing messages, events, created blobs,
    /// oracle responses, streams to process).
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to track resource usage and to settle balances.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
47
/// Latency histograms for loading application code; only compiled when the
/// `with_metrics` cfg is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram registered as `load_contract_latency`, without labels; measured in
    /// `load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_contract_latency",
            "Load contract latency",
            &[],
            exponential_bucket_latencies(250.0),
        )
    });

    /// Histogram registered as `load_service_latency`, without labels; measured in
    /// `load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_service_latency",
            "Load service latency",
            &[],
            exponential_bucket_latencies(250.0),
        )
    });
}
75
/// Sending half of an unbounded channel carrying [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
77
78impl<'a, C> ExecutionStateActor<'a, C>
79where
80 C: Context + Clone + 'static,
81 C::Extra: ExecutionRuntimeContext,
82{
83 pub fn new(
85 state: &'a mut ExecutionStateView<C>,
86 txn_tracker: &'a mut TransactionTracker,
87 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
88 ) -> Self {
89 Self {
90 state,
91 txn_tracker,
92 resource_controller,
93 }
94 }
95
96 #[instrument(skip_all, fields(application_id = %id))]
97 pub(crate) async fn load_contract(
98 &mut self,
99 id: ApplicationId,
100 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
101 #[cfg(with_metrics)]
102 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
103 let blob_id = id.description_blob_id();
104 let description = match self.txn_tracker.get_blob_content(&blob_id) {
105 Some(blob) => bcs::from_bytes(blob.bytes())?,
106 None => {
107 self.state
108 .system
109 .describe_application(id, self.txn_tracker)
110 .await?
111 }
112 };
113 let code = self
114 .state
115 .context()
116 .extra()
117 .get_user_contract(&description, self.txn_tracker)
118 .await?;
119 Ok((code, description))
120 }
121
122 pub(crate) async fn load_service(
123 &mut self,
124 id: ApplicationId,
125 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
126 #[cfg(with_metrics)]
127 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
128 let blob_id = id.description_blob_id();
129 let description = match self.txn_tracker.get_blob_content(&blob_id) {
130 Some(blob) => bcs::from_bytes(blob.bytes())?,
131 None => {
132 self.state
133 .system
134 .describe_application(id, self.txn_tracker)
135 .await?
136 }
137 };
138 let code = self
139 .state
140 .context()
141 .extra()
142 .get_user_service(&description, self.txn_tracker)
143 .await?;
144 Ok((code, description))
145 }
146
147 #[instrument(
149 skip_all,
150 fields(request_type = %request.as_ref())
151 )]
152 pub(crate) async fn handle_request(
153 &mut self,
154 request: ExecutionRequest,
155 ) -> Result<(), ExecutionError> {
156 use ExecutionRequest::*;
157 match request {
158 #[cfg(not(web))]
159 LoadContract { id, callback } => {
160 let (code, description) = self.load_contract(id).await?;
161 callback.respond((code, description))
162 }
163 #[cfg(not(web))]
164 LoadService { id, callback } => {
165 let (code, description) = self.load_service(id).await?;
166 callback.respond((code, description))
167 }
168
169 ChainBalance { callback } => {
170 let balance = *self.state.system.balance.get();
171 callback.respond(balance);
172 }
173
174 OwnerBalance { owner, callback } => {
175 let balance = self
176 .state
177 .system
178 .balances
179 .get(&owner)
180 .await?
181 .unwrap_or_default();
182 callback.respond(balance);
183 }
184
185 OwnerBalances { callback } => {
186 let balances = self.state.system.balances.index_values().await?;
187 callback.respond(balances.into_iter().collect());
188 }
189
190 BalanceOwners { callback } => {
191 let owners = self.state.system.balances.indices().await?;
192 callback.respond(owners);
193 }
194
195 Transfer {
196 source,
197 destination,
198 amount,
199 signer,
200 application_id,
201 callback,
202 } => {
203 let maybe_message = self
204 .state
205 .system
206 .transfer(signer, Some(application_id), source, destination, amount)
207 .await?;
208 self.txn_tracker.add_outgoing_messages(maybe_message);
209 callback.respond(());
210 }
211
212 Claim {
213 source,
214 destination,
215 amount,
216 signer,
217 application_id,
218 callback,
219 } => {
220 let maybe_message = self
221 .state
222 .system
223 .claim(
224 signer,
225 Some(application_id),
226 source.owner,
227 source.chain_id,
228 destination,
229 amount,
230 )
231 .await?;
232 self.txn_tracker.add_outgoing_messages(maybe_message);
233 callback.respond(());
234 }
235
236 SystemTimestamp { callback } => {
237 let timestamp = *self.state.system.timestamp.get();
238 callback.respond(timestamp);
239 }
240
241 ChainOwnership { callback } => {
242 let ownership = self.state.system.ownership.get().await?.clone();
243 callback.respond(ownership);
244 }
245
246 ApplicationPermissions { callback } => {
247 let permissions = self
248 .state
249 .system
250 .application_permissions
251 .get()
252 .await?
253 .clone();
254 callback.respond(permissions);
255 }
256
257 ReadApplicationDescription {
258 application_id,
259 callback,
260 } => {
261 let blob_id = application_id.description_blob_id();
262 let description = match self.txn_tracker.get_blob_content(&blob_id) {
263 Some(blob) => bcs::from_bytes(blob.bytes())?,
264 None => {
265 let blob_content = self.state.system.read_blob_content(blob_id).await?;
266 self.state
267 .system
268 .blob_used(self.txn_tracker, blob_id)
269 .await?;
270 bcs::from_bytes(blob_content.bytes())?
271 }
272 };
273 callback.respond(description);
274 }
275
276 ContainsKey { id, key, callback } => {
277 let view = self.state.users.try_load_entry(&id).await?;
278 let result = match view {
279 Some(view) => view.contains_key(&key).await?,
280 None => false,
281 };
282 callback.respond(result);
283 }
284
285 ContainsKeys { id, keys, callback } => {
286 let view = self.state.users.try_load_entry(&id).await?;
287 let result = match view {
288 Some(view) => view.contains_keys(&keys).await?,
289 None => vec![false; keys.len()],
290 };
291 callback.respond(result);
292 }
293
294 ReadMultiValuesBytes { id, keys, callback } => {
295 let view = self.state.users.try_load_entry(&id).await?;
296 let values = match view {
297 Some(view) => view.multi_get(&keys).await?,
298 None => vec![None; keys.len()],
299 };
300 callback.respond(values);
301 }
302
303 ReadValueBytes { id, key, callback } => {
304 let view = self.state.users.try_load_entry(&id).await?;
305 let result = match view {
306 Some(view) => view.get(&key).await?,
307 None => None,
308 };
309 callback.respond(result);
310 }
311
312 FindKeysByPrefix {
313 id,
314 key_prefix,
315 callback,
316 } => {
317 let view = self.state.users.try_load_entry(&id).await?;
318 let result = match view {
319 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
320 None => Vec::new(),
321 };
322 callback.respond(result);
323 }
324
325 FindKeyValuesByPrefix {
326 id,
327 key_prefix,
328 callback,
329 } => {
330 let view = self.state.users.try_load_entry(&id).await?;
331 let result = match view {
332 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
333 None => Vec::new(),
334 };
335 callback.respond(result);
336 }
337
338 WriteBatch {
339 id,
340 batch,
341 callback,
342 } => {
343 let mut view = self.state.users.try_load_entry_mut(&id).await?;
344 view.write_batch(batch)?;
345 callback.respond(());
346 }
347
348 OpenChain {
349 ownership,
350 balance,
351 parent_id,
352 block_height,
353 application_permissions,
354 timestamp,
355 callback,
356 } => {
357 let config = OpenChainConfig {
358 ownership,
359 balance,
360 application_permissions,
361 };
362 let chain_id = self
363 .state
364 .system
365 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
366 .await?;
367 callback.respond(chain_id);
368 }
369
370 CloseChain {
371 application_id,
372 callback,
373 } => {
374 let app_permissions = self.state.system.application_permissions.get().await?;
375 if !app_permissions.can_close_chain(&application_id) {
376 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
377 } else {
378 self.state.system.close_chain()?;
379 callback.respond(Ok(()));
380 }
381 }
382
383 ChangeOwnership {
384 application_id,
385 ownership,
386 callback,
387 } => {
388 let app_permissions = self.state.system.application_permissions.get().await?;
389 if !app_permissions.can_close_chain(&application_id) {
390 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
391 } else {
392 self.state.system.ownership.set(ownership);
393 callback.respond(Ok(()));
394 }
395 }
396
397 ChangeApplicationPermissions {
398 application_id,
399 application_permissions,
400 callback,
401 } => {
402 let app_permissions = self.state.system.application_permissions.get().await?;
403 if !app_permissions.can_change_application_permissions(&application_id) {
404 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
405 } else {
406 self.state
407 .system
408 .application_permissions
409 .set(application_permissions);
410 callback.respond(Ok(()));
411 }
412 }
413
414 CreateApplication {
415 chain_id,
416 block_height,
417 module_id,
418 parameters,
419 required_application_ids,
420 callback,
421 } => {
422 let create_application_result = self
423 .state
424 .system
425 .create_application(
426 chain_id,
427 block_height,
428 module_id,
429 parameters,
430 required_application_ids,
431 self.txn_tracker,
432 )
433 .await?;
434 callback.respond(Ok(create_application_result));
435 }
436
437 PerformHttpRequest {
438 request,
439 http_responses_are_oracle_responses,
440 callback,
441 } => {
442 let system = &mut self.state.system;
443 let response = self
444 .txn_tracker
445 .oracle(|| async {
446 let headers = request
447 .headers
448 .into_iter()
449 .map(|http::Header { name, value }| {
450 Ok((name.parse()?, value.try_into()?))
451 })
452 .collect::<Result<HeaderMap, ExecutionError>>()?;
453
454 let url = Url::parse(&request.url)?;
455 let host = url
456 .host_str()
457 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
458
459 let (_epoch, committee) = system
460 .current_committee()
461 .await?
462 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
463 let allowed_hosts = &committee.policy().http_request_allow_list;
464
465 ensure!(
466 allowed_hosts.contains(host),
467 ExecutionError::UnauthorizedHttpRequest(url)
468 );
469
470 let request = Client::new()
471 .request(request.method.into(), url)
472 .body(request.body)
473 .headers(headers);
474 #[cfg(not(web))]
475 let request = request.timeout(linera_base::time::Duration::from_millis(
476 committee.policy().http_request_timeout_ms,
477 ));
478
479 let response = request.send().await?;
480
481 let mut response_size_limit =
482 committee.policy().maximum_http_response_bytes;
483
484 if http_responses_are_oracle_responses {
485 response_size_limit = response_size_limit
486 .min(committee.policy().maximum_oracle_response_bytes);
487 }
488 Ok(OracleResponse::Http(
489 Self::receive_http_response(response, response_size_limit).await?,
490 ))
491 })
492 .await?
493 .to_http_response()?;
494 callback.respond(response);
495 }
496
497 ReadBlobContent { blob_id, callback } => {
498 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
499 content.clone()
500 } else {
501 let content = self.state.system.read_blob_content(blob_id).await?;
502 if blob_id.blob_type == BlobType::Data {
503 self.resource_controller
504 .with_state(&mut self.state.system)
505 .await?
506 .track_blob_read(content.bytes().len() as u64)?;
507 }
508 self.state
509 .system
510 .blob_used(self.txn_tracker, blob_id)
511 .await?;
512 content
513 };
514 callback.respond(content)
515 }
516
517 AssertBlobExists { blob_id, callback } => {
518 self.state.system.assert_blob_exists(blob_id).await?;
519 if blob_id.blob_type == BlobType::Data {
521 self.resource_controller
522 .with_state(&mut self.state.system)
523 .await?
524 .track_blob_read(0)?;
525 }
526 let is_new = self
527 .state
528 .system
529 .blob_used(self.txn_tracker, blob_id)
530 .await?;
531 if is_new {
532 self.txn_tracker
533 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
534 }
535 callback.respond(());
536 }
537
538 Emit {
539 stream_id,
540 value,
541 callback,
542 } => {
543 let count = self
544 .state
545 .stream_event_counts
546 .get_mut_or_default(&stream_id)
547 .await?;
548 let index = *count;
549 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
550 self.txn_tracker.add_event(stream_id, index, value);
551 callback.respond(index)
552 }
553
554 ReadEvent { event_id, callback } => {
555 let extra = self.state.context().extra();
556 let event = self
557 .txn_tracker
558 .oracle(|| async {
559 let event = extra
560 .get_event(event_id.clone())
561 .await?
562 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
563 Ok(OracleResponse::Event(
564 event_id.clone(),
565 Arc::unwrap_or_clone(event),
566 ))
567 })
568 .await?
569 .to_event(&event_id)?;
570 callback.respond(event);
571 }
572
573 SubscribeToEvents {
574 chain_id,
575 stream_id,
576 subscriber_app_id,
577 callback,
578 } => {
579 let subscriptions = self
580 .state
581 .system
582 .event_subscriptions
583 .get_mut_or_default(&(chain_id, stream_id.clone()))
584 .await?;
585 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
586 subscriptions.next_index
587 } else {
588 0
589 };
590 self.txn_tracker.add_stream_to_process(
591 subscriber_app_id,
592 chain_id,
593 stream_id,
594 0,
595 next_index,
596 );
597 callback.respond(());
598 }
599
600 UnsubscribeFromEvents {
601 chain_id,
602 stream_id,
603 subscriber_app_id,
604 callback,
605 } => {
606 let key = (chain_id, stream_id.clone());
607 let subscriptions = self
608 .state
609 .system
610 .event_subscriptions
611 .get_mut_or_default(&key)
612 .await?;
613 subscriptions.applications.remove(&subscriber_app_id);
614 if subscriptions.applications.is_empty() {
615 self.state.system.event_subscriptions.remove(&key)?;
616 }
617 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
618 self.txn_tracker
619 .remove_stream_to_process(app_id, chain_id, stream_id);
620 }
621 callback.respond(());
622 }
623
624 GetApplicationPermissions { callback } => {
625 let app_permissions = self.state.system.application_permissions.get().await?;
626 callback.respond(app_permissions.clone());
627 }
628
629 QueryServiceOracle {
630 deadline,
631 application_id,
632 next_block_height,
633 query,
634 callback,
635 } => {
636 let state = &mut self.state;
637 let local_time = self.txn_tracker.local_time();
638 let created_blobs = self.txn_tracker.created_blobs().clone();
639 let bytes = self
640 .txn_tracker
641 .oracle(|| async {
642 let context = QueryContext {
643 chain_id: state.context().extra().chain_id(),
644 next_block_height,
645 local_time,
646 };
647 let QueryOutcome {
648 response,
649 operations,
650 } = Box::pin(state.query_user_application_with_deadline(
651 application_id,
652 context,
653 query,
654 deadline,
655 created_blobs,
656 ))
657 .await?;
658 ensure!(
659 operations.is_empty(),
660 ExecutionError::ServiceOracleQueryOperations(operations)
661 );
662 Ok(OracleResponse::Service(response))
663 })
664 .await?
665 .to_service_response()?;
666 callback.respond(bytes);
667 }
668
669 AddOutgoingMessage { message, callback } => {
670 self.txn_tracker.add_outgoing_message(message);
671 callback.respond(());
672 }
673
674 SetLocalTime {
675 local_time,
676 callback,
677 } => {
678 self.txn_tracker.set_local_time(local_time);
679 callback.respond(());
680 }
681
682 AssertBefore {
683 timestamp,
684 callback,
685 } => {
686 let result = if !self
687 .txn_tracker
688 .replay_oracle_response(OracleResponse::Assert)?
689 {
690 let local_time = self.txn_tracker.local_time();
692 if local_time >= timestamp {
693 Err(ExecutionError::AssertBefore {
694 timestamp,
695 local_time,
696 })
697 } else {
698 Ok(())
699 }
700 } else {
701 Ok(())
702 };
703 callback.respond(result);
704 }
705
706 AddCreatedBlob { blob, callback } => {
707 if self.resource_controller.is_free {
708 self.txn_tracker.mark_blob_free(blob.id());
709 }
710 self.txn_tracker.add_created_blob(blob);
711 callback.respond(());
712 }
713
714 ValidationRound { round, callback } => {
715 let validation_round = self
716 .txn_tracker
717 .oracle(|| async { Ok(OracleResponse::Round(round)) })
718 .await?
719 .to_round()?;
720 callback.respond(validation_round);
721 }
722
723 AllowApplicationLogs { callback } => {
724 let allow = self
725 .state
726 .context()
727 .extra()
728 .execution_runtime_config()
729 .allow_application_logs;
730 callback.respond(allow);
731 }
732
733 #[cfg(web)]
734 Log { message, level } => match level {
735 tracing::log::Level::Trace | tracing::log::Level::Debug => {
736 tracing::debug!(target: "user_application_log", message = %message);
737 }
738 tracing::log::Level::Info => {
739 tracing::info!(target: "user_application_log", message = %message);
740 }
741 tracing::log::Level::Warn => {
742 tracing::warn!(target: "user_application_log", message = %message);
743 }
744 tracing::log::Level::Error => {
745 tracing::error!(target: "user_application_log", message = %message);
746 }
747 },
748 }
749
750 Ok(())
751 }
752
753 #[instrument(skip_all)]
756 async fn process_subscriptions(
757 &mut self,
758 context: ProcessStreamsContext,
759 ) -> Result<(), ExecutionError> {
760 let mut processed = BTreeSet::new();
763 loop {
764 let to_process = self
765 .txn_tracker
766 .take_streams_to_process()
767 .into_iter()
768 .filter_map(|(app_id, updates)| {
769 let updates = updates
770 .into_iter()
771 .filter_map(|update| {
772 if !processed.insert((
773 app_id,
774 update.chain_id,
775 update.stream_id.clone(),
776 )) {
777 return None;
778 }
779 Some(update)
780 })
781 .collect::<Vec<_>>();
782 if updates.is_empty() {
783 return None;
784 }
785 Some((app_id, updates))
786 })
787 .collect::<BTreeMap<_, _>>();
788 if to_process.is_empty() {
789 return Ok(());
790 }
791 for (app_id, updates) in to_process {
792 self.run_user_action(
793 app_id,
794 UserAction::ProcessStreams(context, updates),
795 None,
796 None,
797 )
798 .await?;
799 }
800 }
801 }
802
    /// Runs `action` for `application_id`; all arguments are forwarded unchanged to
    /// `run_user_action_with_runtime` and its result is returned as is.
    pub(crate) async fn run_user_action(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
    ) -> Result<(), ExecutionError> {
        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
            .await
    }
813
814 pub(crate) async fn service_and_dependencies(
816 &mut self,
817 application: ApplicationId,
818 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
819 let mut stack = vec![application];
822 let mut codes = vec![];
823 let mut descriptions = vec![];
824
825 while let Some(id) = stack.pop() {
826 let (code, description) = self.load_service(id).await?;
827 stack.extend(description.required_application_ids.iter().rev().copied());
828 codes.push(code);
829 descriptions.push(description);
830 }
831
832 codes.reverse();
833 descriptions.reverse();
834
835 Ok((codes, descriptions))
836 }
837
838 #[instrument(skip_all, fields(application_id = %application))]
840 async fn contract_and_dependencies(
841 &mut self,
842 application: ApplicationId,
843 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
844 let mut stack = vec![application];
847 let mut codes = vec![];
848 let mut descriptions = vec![];
849
850 while let Some(id) = stack.pop() {
851 let (code, description) = self.load_contract(id).await?;
852 stack.extend(description.required_application_ids.iter().rev().copied());
853 codes.push(code);
854 descriptions.push(description);
855 }
856
857 codes.reverse();
858 descriptions.reverse();
859
860 Ok((codes, descriptions))
861 }
862
    /// Runs `action` for `application_id` inside a `ContractSyncRuntime` started on the
    /// runtime context's thread pool, serving the runtime's [`ExecutionRequest`]s from
    /// this actor until the request channel is closed.
    ///
    /// The runtime works on its own `ResourceController`, seeded with the balance read
    /// from the shared controller (taking `grant` into account). When the action
    /// finishes, its result is recorded in the transaction tracker and the balance and
    /// tracker of the runtime's controller are merged back into the shared one.
    ///
    /// # Errors
    ///
    /// Returns any error from loading the contracts, from handling a request, or from
    /// the runtime task itself.
    #[instrument(skip_all, fields(application_id = %application_id))]
    async fn run_user_action_with_runtime(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
    ) -> Result<(), ExecutionError> {
        let chain_id = self.state.context().extra().chain_id();
        // Work on a copy of the grant here; the real `grant` is only handed over when
        // the balances are merged at the end.
        let mut cloned_grant = grant.as_ref().map(|x| **x);
        let initial_balance = self
            .resource_controller
            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
            .await?
            .balance()?;
        let mut controller = ResourceController::new(
            self.resource_controller.policy().clone(),
            self.resource_controller.tracker,
            initial_balance,
        );
        // Only message and stream-processing actions of applications that the policy
        // reports as free run in "free" mode.
        let is_free = matches!(
            &action,
            UserAction::Message(..) | UserAction::ProcessStreams(..)
        ) && self
            .resource_controller
            .policy()
            .is_free_app(&application_id);
        controller.is_free = is_free;
        // NOTE(review): the flag on the shared controller is only reset after the runtime
        // task succeeded; an early return through `?` below leaves it set — confirm that
        // callers discard the controller on error.
        self.resource_controller.is_free = is_free;
        let (execution_state_sender, mut execution_state_receiver) =
            futures::channel::mpsc::unbounded();

        let (codes, descriptions): (Vec<_>, Vec<_>) =
            self.contract_and_dependencies(application_id).await?;

        let allow_application_logs = self
            .state
            .context()
            .extra()
            .execution_runtime_config()
            .allow_application_logs;

        let contract_runtime_task = self
            .state
            .context()
            .extra()
            .thread_pool()
            .run_send(JsVec(codes), move |codes| async move {
                let runtime = ContractSyncRuntime::new(
                    execution_state_sender,
                    chain_id,
                    refund_grant_to,
                    controller,
                    &action,
                    allow_application_logs,
                );

                // Preload the application and its dependencies before running the action.
                for (code, description) in codes.0.into_iter().zip(descriptions) {
                    runtime.preload_contract(
                        ApplicationId::from(&description),
                        code,
                        description,
                    )?;
                }

                runtime.run_action(application_id, chain_id, action)
            })
            .await;

        // Serve the runtime's requests until the channel yields `None`.
        async {
            while let Some(request) = execution_state_receiver.next().await {
                self.handle_request(request).await?;
            }
            Ok::<(), ExecutionError>(())
        }
        .instrument(info_span!("handle_runtime_requests"))
        .await?;

        // The double `?` unwraps first the task's own result and then the result of
        // running the action.
        let (result, controller) = contract_runtime_task.await??;

        self.resource_controller.is_free = false;

        self.txn_tracker.add_operation_result(result);

        // Merge what the runtime's controller reports back into the shared controller,
        // this time against the caller's actual grant.
        self.resource_controller
            .with_state_and_grant(&mut self.state.system, grant)
            .await?
            .merge_balance(initial_balance, controller.balance()?)?;
        self.resource_controller.tracker = controller.tracker;

        Ok(())
    }
955
956 #[instrument(skip_all, fields(
957 chain_id = %context.chain_id,
958 block_height = %context.height,
959 operation_type = %operation.as_ref(),
960 ))]
961 pub async fn execute_operation(
962 &mut self,
963 context: OperationContext,
964 operation: Operation,
965 ) -> Result<(), ExecutionError> {
966 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
967 match operation {
968 Operation::System(op) => {
969 let new_application = self
970 .state
971 .system
972 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
973 .await?;
974 if let Some((application_id, argument)) = new_application {
975 let user_action = UserAction::Instantiate(context, argument);
976 self.run_user_action(
977 application_id,
978 user_action,
979 context.refund_grant_to(),
980 None,
981 )
982 .await?;
983 }
984 }
985 Operation::User {
986 application_id,
987 bytes,
988 } => {
989 self.run_user_action(
990 application_id,
991 UserAction::Operation(context, bytes),
992 context.refund_grant_to(),
993 None,
994 )
995 .await?;
996 }
997 }
998 self.process_subscriptions(context.into()).await?;
999 Ok(())
1000 }
1001
1002 #[instrument(skip_all, fields(
1003 chain_id = %context.chain_id,
1004 block_height = %context.height,
1005 origin = %context.origin,
1006 is_bouncing = %context.is_bouncing,
1007 message_type = %message.as_ref(),
1008 ))]
1009 pub async fn execute_message(
1010 &mut self,
1011 context: MessageContext,
1012 message: Message,
1013 grant: Option<&mut Amount>,
1014 ) -> Result<(), ExecutionError> {
1015 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1016 match message {
1017 Message::System(message) => {
1018 let outcome = self.state.system.execute_message(context, message).await?;
1019 self.txn_tracker.add_outgoing_messages(outcome);
1020 }
1021 Message::User {
1022 application_id,
1023 bytes,
1024 } => {
1025 self.run_user_action(
1026 application_id,
1027 UserAction::Message(context, bytes),
1028 context.refund_grant_to,
1029 grant,
1030 )
1031 .await?;
1032 }
1033 }
1034 self.process_subscriptions(context.into()).await?;
1035 Ok(())
1036 }
1037
1038 pub fn bounce_message(
1039 &mut self,
1040 context: MessageContext,
1041 grant: Amount,
1042 message: Message,
1043 ) -> Result<(), ExecutionError> {
1044 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1045 self.txn_tracker.add_outgoing_message(OutgoingMessage {
1046 destination: context.origin,
1047 authenticated_signer: context.authenticated_signer,
1048 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1049 grant,
1050 kind: MessageKind::Bouncing,
1051 message,
1052 });
1053 Ok(())
1054 }
1055
1056 pub fn send_refund(
1057 &mut self,
1058 context: MessageContext,
1059 amount: Amount,
1060 ) -> Result<(), ExecutionError> {
1061 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1062 if amount.is_zero() {
1063 return Ok(());
1064 }
1065 let Some(account) = context.refund_grant_to else {
1066 return Err(ExecutionError::InternalError(
1067 "Messages with grants should have a non-empty `refund_grant_to`",
1068 ));
1069 };
1070 let message = SystemMessage::Credit {
1071 amount,
1072 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
1073 target: account.owner,
1074 };
1075 self.txn_tracker.add_outgoing_message(
1076 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1077 );
1078 Ok(())
1079 }
1080
1081 async fn receive_http_response(
1085 response: reqwest::Response,
1086 size_limit: u64,
1087 ) -> Result<http::Response, ExecutionError> {
1088 let status = response.status().as_u16();
1089 let maybe_content_length = response.content_length();
1090
1091 let headers = response
1092 .headers()
1093 .iter()
1094 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1095 .collect::<Vec<_>>();
1096
1097 let total_header_size = headers
1098 .iter()
1099 .map(|header| (header.name.len() + header.value.len()) as u64)
1100 .sum();
1101
1102 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1103 ExecutionError::HttpResponseSizeLimitExceeded {
1104 limit: size_limit,
1105 size: total_header_size,
1106 },
1107 )?;
1108
1109 if let Some(content_length) = maybe_content_length {
1110 if content_length > remaining_bytes {
1111 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1112 limit: size_limit,
1113 size: content_length + total_header_size,
1114 });
1115 }
1116 }
1117
1118 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1119 let mut body_stream = response.bytes_stream();
1120
1121 while let Some(bytes) = body_stream.next().await.transpose()? {
1122 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1123 ExecutionError::HttpResponseSizeLimitExceeded {
1124 limit: size_limit,
1125 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1126 },
1127 )?;
1128
1129 body.extend(&bytes);
1130 }
1131
1132 Ok(http::Response {
1133 status,
1134 headers,
1135 body,
1136 })
1137 }
1138}
1139
1140#[derive(Debug, strum::AsRefStr)]
1142pub enum ExecutionRequest {
1143 #[cfg(not(web))]
1144 LoadContract {
1145 id: ApplicationId,
1146 #[debug(skip)]
1147 callback: Sender<(UserContractCode, ApplicationDescription)>,
1148 },
1149
1150 #[cfg(not(web))]
1151 LoadService {
1152 id: ApplicationId,
1153 #[debug(skip)]
1154 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1155 },
1156
1157 ChainBalance {
1158 #[debug(skip)]
1159 callback: Sender<Amount>,
1160 },
1161
1162 OwnerBalance {
1163 owner: AccountOwner,
1164 #[debug(skip)]
1165 callback: Sender<Amount>,
1166 },
1167
1168 OwnerBalances {
1169 #[debug(skip)]
1170 callback: Sender<Vec<(AccountOwner, Amount)>>,
1171 },
1172
1173 BalanceOwners {
1174 #[debug(skip)]
1175 callback: Sender<Vec<AccountOwner>>,
1176 },
1177
1178 Transfer {
1179 source: AccountOwner,
1180 destination: Account,
1181 amount: Amount,
1182 #[debug(skip_if = Option::is_none)]
1183 signer: Option<AccountOwner>,
1184 application_id: ApplicationId,
1185 #[debug(skip)]
1186 callback: Sender<()>,
1187 },
1188
1189 Claim {
1190 source: Account,
1191 destination: Account,
1192 amount: Amount,
1193 #[debug(skip_if = Option::is_none)]
1194 signer: Option<AccountOwner>,
1195 application_id: ApplicationId,
1196 #[debug(skip)]
1197 callback: Sender<()>,
1198 },
1199
1200 SystemTimestamp {
1201 #[debug(skip)]
1202 callback: Sender<Timestamp>,
1203 },
1204
1205 ChainOwnership {
1206 #[debug(skip)]
1207 callback: Sender<ChainOwnership>,
1208 },
1209
1210 ApplicationPermissions {
1211 #[debug(skip)]
1212 callback: Sender<ApplicationPermissions>,
1213 },
1214
1215 ReadApplicationDescription {
1216 application_id: ApplicationId,
1217 #[debug(skip)]
1218 callback: Sender<ApplicationDescription>,
1219 },
1220
1221 ReadValueBytes {
1222 id: ApplicationId,
1223 #[debug(with = hex_debug)]
1224 key: Vec<u8>,
1225 #[debug(skip)]
1226 callback: Sender<Option<Vec<u8>>>,
1227 },
1228
1229 ContainsKey {
1230 id: ApplicationId,
1231 key: Vec<u8>,
1232 #[debug(skip)]
1233 callback: Sender<bool>,
1234 },
1235
1236 ContainsKeys {
1237 id: ApplicationId,
1238 #[debug(with = hex_vec_debug)]
1239 keys: Vec<Vec<u8>>,
1240 callback: Sender<Vec<bool>>,
1241 },
1242
1243 ReadMultiValuesBytes {
1244 id: ApplicationId,
1245 #[debug(with = hex_vec_debug)]
1246 keys: Vec<Vec<u8>>,
1247 #[debug(skip)]
1248 callback: Sender<Vec<Option<Vec<u8>>>>,
1249 },
1250
1251 FindKeysByPrefix {
1252 id: ApplicationId,
1253 #[debug(with = hex_debug)]
1254 key_prefix: Vec<u8>,
1255 #[debug(skip)]
1256 callback: Sender<Vec<Vec<u8>>>,
1257 },
1258
1259 FindKeyValuesByPrefix {
1260 id: ApplicationId,
1261 #[debug(with = hex_debug)]
1262 key_prefix: Vec<u8>,
1263 #[debug(skip)]
1264 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1265 },
1266
1267 WriteBatch {
1268 id: ApplicationId,
1269 batch: Batch,
1270 #[debug(skip)]
1271 callback: Sender<()>,
1272 },
1273
1274 OpenChain {
1275 ownership: ChainOwnership,
1276 #[debug(skip_if = Amount::is_zero)]
1277 balance: Amount,
1278 parent_id: ChainId,
1279 block_height: BlockHeight,
1280 application_permissions: ApplicationPermissions,
1281 timestamp: Timestamp,
1282 #[debug(skip)]
1283 callback: Sender<ChainId>,
1284 },
1285
1286 CloseChain {
1287 application_id: ApplicationId,
1288 #[debug(skip)]
1289 callback: Sender<Result<(), ExecutionError>>,
1290 },
1291
1292 ChangeOwnership {
1293 application_id: ApplicationId,
1294 ownership: ChainOwnership,
1295 #[debug(skip)]
1296 callback: Sender<Result<(), ExecutionError>>,
1297 },
1298
1299 ChangeApplicationPermissions {
1300 application_id: ApplicationId,
1301 application_permissions: ApplicationPermissions,
1302 #[debug(skip)]
1303 callback: Sender<Result<(), ExecutionError>>,
1304 },
1305
1306 CreateApplication {
1307 chain_id: ChainId,
1308 block_height: BlockHeight,
1309 module_id: ModuleId,
1310 parameters: Vec<u8>,
1311 required_application_ids: Vec<ApplicationId>,
1312 #[debug(skip)]
1313 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1314 },
1315
1316 PerformHttpRequest {
1317 request: http::Request,
1318 http_responses_are_oracle_responses: bool,
1319 #[debug(skip)]
1320 callback: Sender<http::Response>,
1321 },
1322
1323 ReadBlobContent {
1324 blob_id: BlobId,
1325 #[debug(skip)]
1326 callback: Sender<BlobContent>,
1327 },
1328
1329 AssertBlobExists {
1330 blob_id: BlobId,
1331 #[debug(skip)]
1332 callback: Sender<()>,
1333 },
1334
1335 Emit {
1336 stream_id: StreamId,
1337 #[debug(with = hex_debug)]
1338 value: Vec<u8>,
1339 #[debug(skip)]
1340 callback: Sender<u32>,
1341 },
1342
1343 ReadEvent {
1344 event_id: EventId,
1345 callback: oneshot::Sender<Vec<u8>>,
1346 },
1347
1348 SubscribeToEvents {
1349 chain_id: ChainId,
1350 stream_id: StreamId,
1351 subscriber_app_id: ApplicationId,
1352 #[debug(skip)]
1353 callback: Sender<()>,
1354 },
1355
1356 UnsubscribeFromEvents {
1357 chain_id: ChainId,
1358 stream_id: StreamId,
1359 subscriber_app_id: ApplicationId,
1360 #[debug(skip)]
1361 callback: Sender<()>,
1362 },
1363
1364 GetApplicationPermissions {
1365 #[debug(skip)]
1366 callback: Sender<ApplicationPermissions>,
1367 },
1368
1369 QueryServiceOracle {
1370 deadline: Option<Instant>,
1371 application_id: ApplicationId,
1372 next_block_height: BlockHeight,
1373 query: Vec<u8>,
1374 #[debug(skip)]
1375 callback: Sender<Vec<u8>>,
1376 },
1377
1378 AddOutgoingMessage {
1379 message: crate::OutgoingMessage,
1380 #[debug(skip)]
1381 callback: Sender<()>,
1382 },
1383
1384 SetLocalTime {
1385 local_time: Timestamp,
1386 #[debug(skip)]
1387 callback: Sender<()>,
1388 },
1389
1390 AssertBefore {
1391 timestamp: Timestamp,
1392 #[debug(skip)]
1393 callback: Sender<Result<(), ExecutionError>>,
1394 },
1395
1396 AddCreatedBlob {
1397 blob: crate::Blob,
1398 #[debug(skip)]
1399 callback: Sender<()>,
1400 },
1401
1402 ValidationRound {
1403 round: Option<u32>,
1404 #[debug(skip)]
1405 callback: Sender<Option<u32>>,
1406 },
1407
1408 AllowApplicationLogs {
1409 #[debug(skip)]
1410 callback: Sender<bool>,
1411 },
1412
1413 #[cfg(web)]
1415 Log {
1416 message: String,
1417 level: tracing::log::Level,
1418 },
1419}