1use std::{
7 collections::{BTreeMap, BTreeSet},
8 sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16 data_types::{
17 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18 Timestamp,
19 },
20 ensure, hex_debug, hex_vec_debug, http,
21 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
22 ownership::ChainOwnership,
23 time::Instant,
24};
25use linera_views::{batch::Batch, context::Context, views::View};
26use oneshot::Sender;
27use reqwest::{header::HeaderMap, Client, Url};
28use tracing::{info_span, instrument, Instrument as _};
29
30use crate::{
31 execution::UserAction,
32 runtime::ContractSyncRuntime,
33 system::{CreateApplicationResult, OpenChainConfig},
34 util::{OracleResponseExt as _, RespondExt as _},
35 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
36 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
37 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
38 ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
39};
40
/// Actor that serves [`ExecutionRequest`]s and runs user actions against an execution state.
///
/// All three components are mutably borrowed for the lifetime `'a`.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker for the current transaction; receives outgoing messages, events, created
    /// blobs and oracle responses.
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to track resource usage and to read and merge balances.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
47
#[cfg(with_metrics)]
mod metrics {
    //! Latency histograms for loading user application code.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Argument handed to `exponential_bucket_latencies` for both histograms.
    // NOTE(review): presumably the largest latency bucket — confirm against `linera_base`.
    const BUCKET_LATENCY_ARGUMENT: f64 = 250.0;

    /// Registers a histogram without labels under `name`, using the shared bucket layout.
    fn register_latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(
            name,
            description,
            &[],
            exponential_bucket_latencies(BUCKET_LATENCY_ARGUMENT),
        )
    }

    /// Histogram of the time spent in `load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram("load_contract_latency", "Load contract latency")
    });

    /// Histogram of the time spent in `load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram("load_service_latency", "Load service latency")
    });
}
75
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
77
78impl<'a, C> ExecutionStateActor<'a, C>
79where
80 C: Context + Clone + 'static,
81 C::Extra: ExecutionRuntimeContext,
82{
83 pub fn new(
85 state: &'a mut ExecutionStateView<C>,
86 txn_tracker: &'a mut TransactionTracker,
87 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
88 ) -> Self {
89 Self {
90 state,
91 txn_tracker,
92 resource_controller,
93 }
94 }
95
96 #[instrument(skip_all, fields(application_id = %id))]
97 pub(crate) async fn load_contract(
98 &mut self,
99 id: ApplicationId,
100 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
101 #[cfg(with_metrics)]
102 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
103 let blob_id = id.description_blob_id();
104 let description = match self.txn_tracker.get_blob_content(&blob_id) {
105 Some(blob) => bcs::from_bytes(blob.bytes())?,
106 None => {
107 self.state
108 .system
109 .describe_application(id, self.txn_tracker)
110 .await?
111 }
112 };
113 let code = self
114 .state
115 .context()
116 .extra()
117 .get_user_contract(&description, self.txn_tracker)
118 .await?;
119 Ok((code, description))
120 }
121
122 pub(crate) async fn load_service(
123 &mut self,
124 id: ApplicationId,
125 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
126 #[cfg(with_metrics)]
127 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
128 let blob_id = id.description_blob_id();
129 let description = match self.txn_tracker.get_blob_content(&blob_id) {
130 Some(blob) => bcs::from_bytes(blob.bytes())?,
131 None => {
132 self.state
133 .system
134 .describe_application(id, self.txn_tracker)
135 .await?
136 }
137 };
138 let code = self
139 .state
140 .context()
141 .extra()
142 .get_user_service(&description, self.txn_tracker)
143 .await?;
144 Ok((code, description))
145 }
146
147 #[instrument(
149 skip_all,
150 fields(request_type = %request.as_ref())
151 )]
152 pub(crate) async fn handle_request(
153 &mut self,
154 request: ExecutionRequest,
155 ) -> Result<(), ExecutionError> {
156 use ExecutionRequest::*;
157 match request {
158 #[cfg(not(web))]
159 LoadContract { id, callback } => {
160 let (code, description) = self.load_contract(id).await?;
161 callback.respond((code, description))
162 }
163 #[cfg(not(web))]
164 LoadService { id, callback } => {
165 let (code, description) = self.load_service(id).await?;
166 callback.respond((code, description))
167 }
168
169 ChainBalance { callback } => {
170 let balance = *self.state.system.balance.get();
171 callback.respond(balance);
172 }
173
174 OwnerBalance { owner, callback } => {
175 let balance = self
176 .state
177 .system
178 .balances
179 .get(&owner)
180 .await?
181 .unwrap_or_default();
182 callback.respond(balance);
183 }
184
185 OwnerBalances { callback } => {
186 let balances = self.state.system.balances.index_values().await?;
187 callback.respond(balances.into_iter().collect());
188 }
189
190 BalanceOwners { callback } => {
191 let owners = self.state.system.balances.indices().await?;
192 callback.respond(owners);
193 }
194
195 Transfer {
196 source,
197 destination,
198 amount,
199 signer,
200 application_id,
201 callback,
202 } => {
203 let maybe_message = self
204 .state
205 .system
206 .transfer(signer, Some(application_id), source, destination, amount)
207 .await?;
208 self.txn_tracker.add_outgoing_messages(maybe_message);
209 callback.respond(());
210 }
211
212 Claim {
213 source,
214 destination,
215 amount,
216 signer,
217 application_id,
218 callback,
219 } => {
220 let maybe_message = self
221 .state
222 .system
223 .claim(
224 signer,
225 Some(application_id),
226 source.owner,
227 source.chain_id,
228 destination,
229 amount,
230 )
231 .await?;
232 self.txn_tracker.add_outgoing_messages(maybe_message);
233 callback.respond(());
234 }
235
236 SystemTimestamp { callback } => {
237 let timestamp = *self.state.system.timestamp.get();
238 callback.respond(timestamp);
239 }
240
241 ChainOwnership { callback } => {
242 let ownership = self.state.system.ownership.get().await?.clone();
243 callback.respond(ownership);
244 }
245
246 ApplicationPermissions { callback } => {
247 let permissions = self
248 .state
249 .system
250 .application_permissions
251 .get()
252 .await?
253 .clone();
254 callback.respond(permissions);
255 }
256
257 ReadApplicationDescription {
258 application_id,
259 callback,
260 } => {
261 let blob_id = application_id.description_blob_id();
262 let description = match self.txn_tracker.get_blob_content(&blob_id) {
263 Some(blob) => bcs::from_bytes(blob.bytes())?,
264 None => {
265 let blob_content = self.state.system.read_blob_content(blob_id).await?;
266 self.state
267 .system
268 .blob_used(self.txn_tracker, blob_id)
269 .await?;
270 bcs::from_bytes(blob_content.bytes())?
271 }
272 };
273 callback.respond(description);
274 }
275
276 ContainsKey { id, key, callback } => {
277 let view = self.state.users.try_load_entry(&id).await?;
278 let result = match view {
279 Some(view) => view.contains_key(&key).await?,
280 None => false,
281 };
282 callback.respond(result);
283 }
284
285 ContainsKeys { id, keys, callback } => {
286 let view = self.state.users.try_load_entry(&id).await?;
287 let result = match view {
288 Some(view) => view.contains_keys(&keys).await?,
289 None => vec![false; keys.len()],
290 };
291 callback.respond(result);
292 }
293
294 ReadMultiValuesBytes { id, keys, callback } => {
295 let view = self.state.users.try_load_entry(&id).await?;
296 let values = match view {
297 Some(view) => view.multi_get(&keys).await?,
298 None => vec![None; keys.len()],
299 };
300 callback.respond(values);
301 }
302
303 ReadValueBytes { id, key, callback } => {
304 let view = self.state.users.try_load_entry(&id).await?;
305 let result = match view {
306 Some(view) => view.get(&key).await?,
307 None => None,
308 };
309 callback.respond(result);
310 }
311
312 FindKeysByPrefix {
313 id,
314 key_prefix,
315 callback,
316 } => {
317 let view = self.state.users.try_load_entry(&id).await?;
318 let result = match view {
319 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
320 None => Vec::new(),
321 };
322 callback.respond(result);
323 }
324
325 FindKeyValuesByPrefix {
326 id,
327 key_prefix,
328 callback,
329 } => {
330 let view = self.state.users.try_load_entry(&id).await?;
331 let result = match view {
332 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
333 None => Vec::new(),
334 };
335 callback.respond(result);
336 }
337
338 WriteBatch {
339 id,
340 batch,
341 callback,
342 } => {
343 let mut view = self.state.users.try_load_entry_mut(&id).await?;
344 view.write_batch(batch)?;
345 callback.respond(());
346 }
347
348 OpenChain {
349 ownership,
350 balance,
351 parent_id,
352 block_height,
353 application_permissions,
354 timestamp,
355 callback,
356 } => {
357 let config = OpenChainConfig {
358 ownership,
359 balance,
360 application_permissions,
361 };
362 let chain_id = self
363 .state
364 .system
365 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
366 .await?;
367 callback.respond(chain_id);
368 }
369
370 CloseChain {
371 application_id,
372 callback,
373 } => {
374 let app_permissions = self.state.system.application_permissions.get().await?;
375 if !app_permissions.can_close_chain(&application_id) {
376 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
377 } else {
378 self.state.system.close_chain()?;
379 callback.respond(Ok(()));
380 }
381 }
382
383 ChangeOwnership {
384 application_id,
385 ownership,
386 callback,
387 } => {
388 let app_permissions = self.state.system.application_permissions.get().await?;
389 if !app_permissions.can_close_chain(&application_id) {
390 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
391 } else {
392 self.state.system.ownership.set(ownership);
393 callback.respond(Ok(()));
394 }
395 }
396
397 ChangeApplicationPermissions {
398 application_id,
399 application_permissions,
400 callback,
401 } => {
402 let app_permissions = self.state.system.application_permissions.get().await?;
403 if !app_permissions.can_change_application_permissions(&application_id) {
404 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
405 } else {
406 self.state
407 .system
408 .application_permissions
409 .set(application_permissions);
410 callback.respond(Ok(()));
411 }
412 }
413
414 CreateApplication {
415 chain_id,
416 block_height,
417 module_id,
418 parameters,
419 required_application_ids,
420 callback,
421 } => {
422 let create_application_result = self
423 .state
424 .system
425 .create_application(
426 chain_id,
427 block_height,
428 module_id,
429 parameters,
430 required_application_ids,
431 self.txn_tracker,
432 )
433 .await?;
434 callback.respond(Ok(create_application_result));
435 }
436
437 PerformHttpRequest {
438 request,
439 http_responses_are_oracle_responses,
440 callback,
441 } => {
442 let system = &mut self.state.system;
443 let response = self
444 .txn_tracker
445 .oracle(|| async {
446 let headers = request
447 .headers
448 .into_iter()
449 .map(|http::Header { name, value }| {
450 Ok((name.parse()?, value.try_into()?))
451 })
452 .collect::<Result<HeaderMap, ExecutionError>>()?;
453
454 let url = Url::parse(&request.url)?;
455 let host = url
456 .host_str()
457 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
458
459 let (_epoch, committee) = system
460 .current_committee()
461 .await?
462 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
463 let allowed_hosts = &committee.policy().http_request_allow_list;
464
465 ensure!(
466 allowed_hosts.contains(host),
467 ExecutionError::UnauthorizedHttpRequest(url)
468 );
469
470 let request = Client::new()
471 .request(request.method.into(), url)
472 .body(request.body)
473 .headers(headers);
474 #[cfg(not(web))]
475 let request = request.timeout(linera_base::time::Duration::from_millis(
476 committee.policy().http_request_timeout_ms,
477 ));
478
479 let response = request.send().await?;
480
481 let mut response_size_limit =
482 committee.policy().maximum_http_response_bytes;
483
484 if http_responses_are_oracle_responses {
485 response_size_limit = response_size_limit
486 .min(committee.policy().maximum_oracle_response_bytes);
487 }
488 Ok(OracleResponse::Http(
489 Self::receive_http_response(response, response_size_limit).await?,
490 ))
491 })
492 .await?
493 .to_http_response()?;
494 callback.respond(response);
495 }
496
497 ReadBlobContent { blob_id, callback } => {
498 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
499 content.clone()
500 } else {
501 let content = self.state.system.read_blob_content(blob_id).await?;
502 if blob_id.blob_type == BlobType::Data {
503 self.resource_controller
504 .with_state(&mut self.state.system)
505 .await?
506 .track_blob_read(content.bytes().len() as u64)?;
507 }
508 self.state
509 .system
510 .blob_used(self.txn_tracker, blob_id)
511 .await?;
512 content
513 };
514 callback.respond(content)
515 }
516
517 AssertBlobExists { blob_id, callback } => {
518 self.state.system.assert_blob_exists(blob_id).await?;
519 if blob_id.blob_type == BlobType::Data {
521 self.resource_controller
522 .with_state(&mut self.state.system)
523 .await?
524 .track_blob_read(0)?;
525 }
526 let is_new = self
527 .state
528 .system
529 .blob_used(self.txn_tracker, blob_id)
530 .await?;
531 if is_new {
532 self.txn_tracker
533 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
534 }
535 callback.respond(());
536 }
537
538 Emit {
539 stream_id,
540 value,
541 callback,
542 } => {
543 let count = self
544 .state
545 .stream_event_counts
546 .get_mut_or_default(&stream_id)
547 .await?;
548 let index = *count;
549 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
550 self.txn_tracker.add_event(stream_id, index, value);
551 callback.respond(index)
552 }
553
554 ReadEvent { event_id, callback } => {
555 let extra = self.state.context().extra();
556 let event = self
557 .txn_tracker
558 .oracle(|| async {
559 let event = extra
560 .get_event(event_id.clone())
561 .await?
562 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
563 Ok(OracleResponse::Event(
564 event_id.clone(),
565 Arc::unwrap_or_clone(event),
566 ))
567 })
568 .await?
569 .to_event(&event_id)?;
570 callback.respond(event);
571 }
572
573 SubscribeToEvents {
574 chain_id,
575 stream_id,
576 subscriber_app_id,
577 callback,
578 } => {
579 let subscriptions = self
580 .state
581 .system
582 .event_subscriptions
583 .get_mut_or_default(&(chain_id, stream_id.clone()))
584 .await?;
585 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
586 subscriptions.next_index
587 } else {
588 0
589 };
590 self.txn_tracker.add_stream_to_process(
591 subscriber_app_id,
592 chain_id,
593 stream_id,
594 0,
595 next_index,
596 );
597 callback.respond(());
598 }
599
600 UnsubscribeFromEvents {
601 chain_id,
602 stream_id,
603 subscriber_app_id,
604 callback,
605 } => {
606 let key = (chain_id, stream_id.clone());
607 let subscriptions = self
608 .state
609 .system
610 .event_subscriptions
611 .get_mut_or_default(&key)
612 .await?;
613 subscriptions.applications.remove(&subscriber_app_id);
614 if subscriptions.applications.is_empty() {
615 self.state.system.event_subscriptions.remove(&key)?;
616 }
617 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
618 self.txn_tracker
619 .remove_stream_to_process(app_id, chain_id, stream_id);
620 }
621 callback.respond(());
622 }
623
624 GetApplicationPermissions { callback } => {
625 let app_permissions = self.state.system.application_permissions.get().await?;
626 callback.respond(app_permissions.clone());
627 }
628
629 QueryServiceOracle {
630 deadline,
631 application_id,
632 next_block_height,
633 query,
634 callback,
635 } => {
636 let state = &mut self.state;
637 let local_time = self.txn_tracker.local_time();
638 let created_blobs = self.txn_tracker.created_blobs().clone();
639 let bytes = self
640 .txn_tracker
641 .oracle(|| async {
642 let context = QueryContext {
643 chain_id: state.context().extra().chain_id(),
644 next_block_height,
645 local_time,
646 };
647 let QueryOutcome {
648 response,
649 operations,
650 } = Box::pin(state.query_user_application_with_deadline(
651 application_id,
652 context,
653 query,
654 deadline,
655 created_blobs,
656 ))
657 .await?;
658 ensure!(
659 operations.is_empty(),
660 ExecutionError::ServiceOracleQueryOperations(operations)
661 );
662 Ok(OracleResponse::Service(response))
663 })
664 .await?
665 .to_service_response()?;
666 callback.respond(bytes);
667 }
668
669 AddOutgoingMessage { message, callback } => {
670 self.txn_tracker.add_outgoing_message(message);
671 callback.respond(());
672 }
673
674 SetLocalTime {
675 local_time,
676 callback,
677 } => {
678 self.txn_tracker.set_local_time(local_time);
679 callback.respond(());
680 }
681
682 AssertBefore {
683 timestamp,
684 callback,
685 } => {
686 let result = if !self
687 .txn_tracker
688 .replay_oracle_response(OracleResponse::Assert)?
689 {
690 let local_time = self.txn_tracker.local_time();
692 if local_time >= timestamp {
693 Err(ExecutionError::AssertBefore {
694 timestamp,
695 local_time,
696 })
697 } else {
698 Ok(())
699 }
700 } else {
701 Ok(())
702 };
703 callback.respond(result);
704 }
705
706 AddCreatedBlob { blob, callback } => {
707 if self.resource_controller.is_free {
708 self.txn_tracker.mark_blob_free(blob.id());
709 }
710 self.txn_tracker.add_created_blob(blob);
711 callback.respond(());
712 }
713
714 ValidationRound { round, callback } => {
715 let validation_round = self
716 .txn_tracker
717 .oracle(|| async { Ok(OracleResponse::Round(round)) })
718 .await?
719 .to_round()?;
720 callback.respond(validation_round);
721 }
722
723 AllowApplicationLogs { callback } => {
724 let allow = self
725 .state
726 .context()
727 .extra()
728 .execution_runtime_config()
729 .allow_application_logs;
730 callback.respond(allow);
731 }
732
733 #[cfg(web)]
734 Log { message, level } => match level {
735 tracing::log::Level::Trace | tracing::log::Level::Debug => {
736 tracing::debug!(target: "user_application_log", message = %message);
737 }
738 tracing::log::Level::Info => {
739 tracing::info!(target: "user_application_log", message = %message);
740 }
741 tracing::log::Level::Warn => {
742 tracing::warn!(target: "user_application_log", message = %message);
743 }
744 tracing::log::Level::Error => {
745 tracing::error!(target: "user_application_log", message = %message);
746 }
747 },
748 }
749
750 Ok(())
751 }
752
753 #[instrument(skip_all)]
756 async fn process_subscriptions(
757 &mut self,
758 context: ProcessStreamsContext,
759 ) -> Result<(), ExecutionError> {
760 let mut processed = BTreeSet::new();
763 loop {
764 let to_process = self
765 .txn_tracker
766 .take_streams_to_process()
767 .into_iter()
768 .filter_map(|(app_id, updates)| {
769 let updates = updates
770 .into_iter()
771 .filter_map(|update| {
772 if !processed.insert((
773 app_id,
774 update.chain_id,
775 update.stream_id.clone(),
776 )) {
777 return None;
778 }
779 Some(update)
780 })
781 .collect::<Vec<_>>();
782 if updates.is_empty() {
783 return None;
784 }
785 Some((app_id, updates))
786 })
787 .collect::<BTreeMap<_, _>>();
788 if to_process.is_empty() {
789 return Ok(());
790 }
791 for (app_id, updates) in to_process {
792 self.run_user_action(
793 app_id,
794 UserAction::ProcessStreams(context, updates),
795 None,
796 None,
797 )
798 .await?;
799 }
800 }
801 }
802
803 pub(crate) async fn run_user_action(
804 &mut self,
805 application_id: ApplicationId,
806 action: UserAction,
807 refund_grant_to: Option<Account>,
808 grant: Option<&mut Amount>,
809 ) -> Result<(), ExecutionError> {
810 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
811 .await
812 }
813
814 pub(crate) async fn service_and_dependencies(
816 &mut self,
817 application: ApplicationId,
818 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
819 let mut stack = vec![application];
822 let mut codes = vec![];
823 let mut descriptions = vec![];
824
825 while let Some(id) = stack.pop() {
826 let (code, description) = self.load_service(id).await?;
827 stack.extend(description.required_application_ids.iter().rev().copied());
828 codes.push(code);
829 descriptions.push(description);
830 }
831
832 codes.reverse();
833 descriptions.reverse();
834
835 Ok((codes, descriptions))
836 }
837
838 #[instrument(skip_all, fields(application_id = %application))]
840 async fn contract_and_dependencies(
841 &mut self,
842 application: ApplicationId,
843 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
844 let mut stack = vec![application];
847 let mut codes = vec![];
848 let mut descriptions = vec![];
849
850 while let Some(id) = stack.pop() {
851 let (code, description) = self.load_contract(id).await?;
852 stack.extend(description.required_application_ids.iter().rev().copied());
853 codes.push(code);
854 descriptions.push(description);
855 }
856
857 codes.reverse();
858 descriptions.reverse();
859
860 Ok((codes, descriptions))
861 }
862
/// Runs `action` on `application_id` inside a contract runtime and serves the requests
/// the runtime sends back while it executes.
///
/// Steps, in order:
/// 1. Compute the initial balance and build a separate `ResourceController` for the
///    runtime from the current policy, tracker and that balance.
/// 2. Load the contract and its dependencies and start the runtime on the thread pool.
/// 3. Handle incoming [`ExecutionRequest`]s until the request channel is exhausted.
/// 4. Record the operation result, merge the runtime's balance back and adopt its tracker.
///
/// # Errors
///
/// Returns the first error from loading code, handling a request, the runtime task or the
/// final balance merge.
#[instrument(skip_all, fields(application_id = %application_id))]
async fn run_user_action_with_runtime(
    &mut self,
    application_id: ApplicationId,
    action: UserAction,
    refund_grant_to: Option<Account>,
    grant: Option<&mut Amount>,
) -> Result<(), ExecutionError> {
    let chain_id = self.state.context().extra().chain_id();
    // Work on a copy of the grant here: `grant` itself is consumed by the balance merge
    // at the end of this function.
    let mut cloned_grant = grant.as_ref().map(|x| **x);
    let initial_balance = self
        .resource_controller
        .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
        .await?
        .balance()?;
    // Controller that is moved into the runtime task; it is returned when the task ends.
    let mut controller = ResourceController::new(
        self.resource_controller.policy().clone(),
        self.resource_controller.tracker,
        initial_balance,
    );
    // Only message and stream-processing actions of applications that the policy lists
    // as free are marked free.
    let is_free = matches!(
        &action,
        UserAction::Message(..) | UserAction::ProcessStreams(..)
    ) && self
        .resource_controller
        .policy()
        .is_free_app(&application_id);
    controller.is_free = is_free;
    self.resource_controller.is_free = is_free;
    // Channel over which the runtime sends its requests to this actor.
    let (execution_state_sender, mut execution_state_receiver) =
        futures::channel::mpsc::unbounded();

    let (codes, descriptions): (Vec<_>, Vec<_>) =
        self.contract_and_dependencies(application_id).await?;

    let allow_application_logs = self
        .state
        .context()
        .extra()
        .execution_runtime_config()
        .allow_application_logs;

    // `run_send(..).await` yields a task handle; the task's outcome is awaited further
    // below, after the request loop.
    let contract_runtime_task = self
        .state
        .context()
        .extra()
        .thread_pool()
        .run_send(JsVec(codes), move |codes| async move {
            let runtime = ContractSyncRuntime::new(
                execution_state_sender,
                chain_id,
                refund_grant_to,
                controller,
                &action,
                allow_application_logs,
            );

            // Make the contract and all of its dependencies available before running.
            for (code, description) in codes.0.into_iter().zip(descriptions) {
                runtime.preload_contract(ApplicationId::from(&description), code, description);
            }

            runtime.run_action(application_id, chain_id, action)
        })
        .await;

    // Serve requests until the receiver yields `None`.
    // NOTE(review): presumably that happens once the runtime drops its sender — confirm.
    async {
        while let Some(request) = execution_state_receiver.next().await {
            self.handle_request(request).await?;
        }
        Ok::<(), ExecutionError>(())
    }
    .instrument(info_span!("handle_runtime_requests"))
    .await?;

    // The first `?` is for the task handle, the second for the action's own result.
    let (result, controller) = contract_runtime_task.await??;

    // Note that `is_free` is only reset on the success path; every `?` above returns
    // early without resetting it.
    self.resource_controller.is_free = false;

    self.txn_tracker.add_operation_result(result);

    // Fold the balance change made by the runtime back into the state and the grant.
    self.resource_controller
        .with_state_and_grant(&mut self.state.system, grant)
        .await?
        .merge_balance(initial_balance, controller.balance()?)?;
    self.resource_controller.tracker = controller.tracker;

    Ok(())
}
951
952 #[instrument(skip_all, fields(
953 chain_id = %context.chain_id,
954 block_height = %context.height,
955 operation_type = %operation.as_ref(),
956 ))]
957 pub async fn execute_operation(
958 &mut self,
959 context: OperationContext,
960 operation: Operation,
961 ) -> Result<(), ExecutionError> {
962 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
963 match operation {
964 Operation::System(op) => {
965 let new_application = self
966 .state
967 .system
968 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
969 .await?;
970 if let Some((application_id, argument)) = new_application {
971 let user_action = UserAction::Instantiate(context, argument);
972 self.run_user_action(
973 application_id,
974 user_action,
975 context.refund_grant_to(),
976 None,
977 )
978 .await?;
979 }
980 }
981 Operation::User {
982 application_id,
983 bytes,
984 } => {
985 self.run_user_action(
986 application_id,
987 UserAction::Operation(context, bytes),
988 context.refund_grant_to(),
989 None,
990 )
991 .await?;
992 }
993 }
994 self.process_subscriptions(context.into()).await?;
995 Ok(())
996 }
997
998 #[instrument(skip_all, fields(
999 chain_id = %context.chain_id,
1000 block_height = %context.height,
1001 origin = %context.origin,
1002 is_bouncing = %context.is_bouncing,
1003 message_type = %message.as_ref(),
1004 ))]
1005 pub async fn execute_message(
1006 &mut self,
1007 context: MessageContext,
1008 message: Message,
1009 grant: Option<&mut Amount>,
1010 ) -> Result<(), ExecutionError> {
1011 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1012 match message {
1013 Message::System(message) => {
1014 let outcome = self.state.system.execute_message(context, message).await?;
1015 self.txn_tracker.add_outgoing_messages(outcome);
1016 }
1017 Message::User {
1018 application_id,
1019 bytes,
1020 } => {
1021 self.run_user_action(
1022 application_id,
1023 UserAction::Message(context, bytes),
1024 context.refund_grant_to,
1025 grant,
1026 )
1027 .await?;
1028 }
1029 }
1030 self.process_subscriptions(context.into()).await?;
1031 Ok(())
1032 }
1033
1034 pub fn bounce_message(
1035 &mut self,
1036 context: MessageContext,
1037 grant: Amount,
1038 message: Message,
1039 ) -> Result<(), ExecutionError> {
1040 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1041 self.txn_tracker.add_outgoing_message(OutgoingMessage {
1042 destination: context.origin,
1043 authenticated_signer: context.authenticated_signer,
1044 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1045 grant,
1046 kind: MessageKind::Bouncing,
1047 message,
1048 });
1049 Ok(())
1050 }
1051
1052 pub fn send_refund(
1053 &mut self,
1054 context: MessageContext,
1055 amount: Amount,
1056 ) -> Result<(), ExecutionError> {
1057 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1058 if amount.is_zero() {
1059 return Ok(());
1060 }
1061 let Some(account) = context.refund_grant_to else {
1062 return Err(ExecutionError::InternalError(
1063 "Messages with grants should have a non-empty `refund_grant_to`",
1064 ));
1065 };
1066 let message = SystemMessage::Credit {
1067 amount,
1068 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
1069 target: account.owner,
1070 };
1071 self.txn_tracker.add_outgoing_message(
1072 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1073 );
1074 Ok(())
1075 }
1076
1077 async fn receive_http_response(
1081 response: reqwest::Response,
1082 size_limit: u64,
1083 ) -> Result<http::Response, ExecutionError> {
1084 let status = response.status().as_u16();
1085 let maybe_content_length = response.content_length();
1086
1087 let headers = response
1088 .headers()
1089 .iter()
1090 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1091 .collect::<Vec<_>>();
1092
1093 let total_header_size = headers
1094 .iter()
1095 .map(|header| (header.name.len() + header.value.len()) as u64)
1096 .sum();
1097
1098 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1099 ExecutionError::HttpResponseSizeLimitExceeded {
1100 limit: size_limit,
1101 size: total_header_size,
1102 },
1103 )?;
1104
1105 if let Some(content_length) = maybe_content_length {
1106 if content_length > remaining_bytes {
1107 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1108 limit: size_limit,
1109 size: content_length + total_header_size,
1110 });
1111 }
1112 }
1113
1114 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1115 let mut body_stream = response.bytes_stream();
1116
1117 while let Some(bytes) = body_stream.next().await.transpose()? {
1118 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1119 ExecutionError::HttpResponseSizeLimitExceeded {
1120 limit: size_limit,
1121 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1122 },
1123 )?;
1124
1125 body.extend(&bytes);
1126 }
1127
1128 Ok(http::Response {
1129 status,
1130 headers,
1131 body,
1132 })
1133 }
1134}
1135
1136#[derive(Debug, strum::AsRefStr)]
1138pub enum ExecutionRequest {
1139 #[cfg(not(web))]
1140 LoadContract {
1141 id: ApplicationId,
1142 #[debug(skip)]
1143 callback: Sender<(UserContractCode, ApplicationDescription)>,
1144 },
1145
1146 #[cfg(not(web))]
1147 LoadService {
1148 id: ApplicationId,
1149 #[debug(skip)]
1150 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1151 },
1152
1153 ChainBalance {
1154 #[debug(skip)]
1155 callback: Sender<Amount>,
1156 },
1157
1158 OwnerBalance {
1159 owner: AccountOwner,
1160 #[debug(skip)]
1161 callback: Sender<Amount>,
1162 },
1163
1164 OwnerBalances {
1165 #[debug(skip)]
1166 callback: Sender<Vec<(AccountOwner, Amount)>>,
1167 },
1168
1169 BalanceOwners {
1170 #[debug(skip)]
1171 callback: Sender<Vec<AccountOwner>>,
1172 },
1173
1174 Transfer {
1175 source: AccountOwner,
1176 destination: Account,
1177 amount: Amount,
1178 #[debug(skip_if = Option::is_none)]
1179 signer: Option<AccountOwner>,
1180 application_id: ApplicationId,
1181 #[debug(skip)]
1182 callback: Sender<()>,
1183 },
1184
1185 Claim {
1186 source: Account,
1187 destination: Account,
1188 amount: Amount,
1189 #[debug(skip_if = Option::is_none)]
1190 signer: Option<AccountOwner>,
1191 application_id: ApplicationId,
1192 #[debug(skip)]
1193 callback: Sender<()>,
1194 },
1195
1196 SystemTimestamp {
1197 #[debug(skip)]
1198 callback: Sender<Timestamp>,
1199 },
1200
1201 ChainOwnership {
1202 #[debug(skip)]
1203 callback: Sender<ChainOwnership>,
1204 },
1205
1206 ApplicationPermissions {
1207 #[debug(skip)]
1208 callback: Sender<ApplicationPermissions>,
1209 },
1210
1211 ReadApplicationDescription {
1212 application_id: ApplicationId,
1213 #[debug(skip)]
1214 callback: Sender<ApplicationDescription>,
1215 },
1216
1217 ReadValueBytes {
1218 id: ApplicationId,
1219 #[debug(with = hex_debug)]
1220 key: Vec<u8>,
1221 #[debug(skip)]
1222 callback: Sender<Option<Vec<u8>>>,
1223 },
1224
1225 ContainsKey {
1226 id: ApplicationId,
1227 key: Vec<u8>,
1228 #[debug(skip)]
1229 callback: Sender<bool>,
1230 },
1231
1232 ContainsKeys {
1233 id: ApplicationId,
1234 #[debug(with = hex_vec_debug)]
1235 keys: Vec<Vec<u8>>,
1236 callback: Sender<Vec<bool>>,
1237 },
1238
1239 ReadMultiValuesBytes {
1240 id: ApplicationId,
1241 #[debug(with = hex_vec_debug)]
1242 keys: Vec<Vec<u8>>,
1243 #[debug(skip)]
1244 callback: Sender<Vec<Option<Vec<u8>>>>,
1245 },
1246
1247 FindKeysByPrefix {
1248 id: ApplicationId,
1249 #[debug(with = hex_debug)]
1250 key_prefix: Vec<u8>,
1251 #[debug(skip)]
1252 callback: Sender<Vec<Vec<u8>>>,
1253 },
1254
1255 FindKeyValuesByPrefix {
1256 id: ApplicationId,
1257 #[debug(with = hex_debug)]
1258 key_prefix: Vec<u8>,
1259 #[debug(skip)]
1260 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1261 },
1262
1263 WriteBatch {
1264 id: ApplicationId,
1265 batch: Batch,
1266 #[debug(skip)]
1267 callback: Sender<()>,
1268 },
1269
1270 OpenChain {
1271 ownership: ChainOwnership,
1272 #[debug(skip_if = Amount::is_zero)]
1273 balance: Amount,
1274 parent_id: ChainId,
1275 block_height: BlockHeight,
1276 application_permissions: ApplicationPermissions,
1277 timestamp: Timestamp,
1278 #[debug(skip)]
1279 callback: Sender<ChainId>,
1280 },
1281
1282 CloseChain {
1283 application_id: ApplicationId,
1284 #[debug(skip)]
1285 callback: Sender<Result<(), ExecutionError>>,
1286 },
1287
1288 ChangeOwnership {
1289 application_id: ApplicationId,
1290 ownership: ChainOwnership,
1291 #[debug(skip)]
1292 callback: Sender<Result<(), ExecutionError>>,
1293 },
1294
1295 ChangeApplicationPermissions {
1296 application_id: ApplicationId,
1297 application_permissions: ApplicationPermissions,
1298 #[debug(skip)]
1299 callback: Sender<Result<(), ExecutionError>>,
1300 },
1301
1302 CreateApplication {
1303 chain_id: ChainId,
1304 block_height: BlockHeight,
1305 module_id: ModuleId,
1306 parameters: Vec<u8>,
1307 required_application_ids: Vec<ApplicationId>,
1308 #[debug(skip)]
1309 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1310 },
1311
1312 PerformHttpRequest {
1313 request: http::Request,
1314 http_responses_are_oracle_responses: bool,
1315 #[debug(skip)]
1316 callback: Sender<http::Response>,
1317 },
1318
1319 ReadBlobContent {
1320 blob_id: BlobId,
1321 #[debug(skip)]
1322 callback: Sender<BlobContent>,
1323 },
1324
1325 AssertBlobExists {
1326 blob_id: BlobId,
1327 #[debug(skip)]
1328 callback: Sender<()>,
1329 },
1330
1331 Emit {
1332 stream_id: StreamId,
1333 #[debug(with = hex_debug)]
1334 value: Vec<u8>,
1335 #[debug(skip)]
1336 callback: Sender<u32>,
1337 },
1338
1339 ReadEvent {
1340 event_id: EventId,
1341 callback: oneshot::Sender<Vec<u8>>,
1342 },
1343
1344 SubscribeToEvents {
1345 chain_id: ChainId,
1346 stream_id: StreamId,
1347 subscriber_app_id: ApplicationId,
1348 #[debug(skip)]
1349 callback: Sender<()>,
1350 },
1351
1352 UnsubscribeFromEvents {
1353 chain_id: ChainId,
1354 stream_id: StreamId,
1355 subscriber_app_id: ApplicationId,
1356 #[debug(skip)]
1357 callback: Sender<()>,
1358 },
1359
1360 GetApplicationPermissions {
1361 #[debug(skip)]
1362 callback: Sender<ApplicationPermissions>,
1363 },
1364
1365 QueryServiceOracle {
1366 deadline: Option<Instant>,
1367 application_id: ApplicationId,
1368 next_block_height: BlockHeight,
1369 query: Vec<u8>,
1370 #[debug(skip)]
1371 callback: Sender<Vec<u8>>,
1372 },
1373
1374 AddOutgoingMessage {
1375 message: crate::OutgoingMessage,
1376 #[debug(skip)]
1377 callback: Sender<()>,
1378 },
1379
1380 SetLocalTime {
1381 local_time: Timestamp,
1382 #[debug(skip)]
1383 callback: Sender<()>,
1384 },
1385
1386 AssertBefore {
1387 timestamp: Timestamp,
1388 #[debug(skip)]
1389 callback: Sender<Result<(), ExecutionError>>,
1390 },
1391
1392 AddCreatedBlob {
1393 blob: crate::Blob,
1394 #[debug(skip)]
1395 callback: Sender<()>,
1396 },
1397
1398 ValidationRound {
1399 round: Option<u32>,
1400 #[debug(skip)]
1401 callback: Sender<Option<u32>>,
1402 },
1403
1404 AllowApplicationLogs {
1405 #[debug(skip)]
1406 callback: Sender<bool>,
1407 },
1408
1409 #[cfg(web)]
1411 Log {
1412 message: String,
1413 level: tracing::log::Level,
1414 },
1415}