1use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15 Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19 ownership::ChainOwnership,
20 time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27 execution::UserAction,
28 runtime::ContractSyncRuntime,
29 system::{CreateApplicationResult, OpenChainConfig},
30 util::{OracleResponseExt as _, RespondExt as _},
31 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
32 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
33 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
34 ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
35};
36
/// Actor that serves [`ExecutionRequest`]s against the execution state.
///
/// It holds exclusive borrows of the execution state view, the transaction tracker and the
/// resource controller for the lifetime `'a`.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker that collects the transaction's outgoing messages, events, created blobs and
    /// oracle responses.
    txn_tracker: &'a mut TransactionTracker,
    /// Resource accounting shared with the runtimes started by this actor.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
43
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers an unlabelled latency histogram with the bucket layout shared by all the
    /// metrics of this module.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Histogram of the time spent in `load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram of the time spent in `load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
71
/// Sending half of the unbounded channel over which a runtime submits [`ExecutionRequest`]s
/// to the actor.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
73
74impl<'a, C> ExecutionStateActor<'a, C>
75where
76 C: Context + Clone + 'static,
77 C::Extra: ExecutionRuntimeContext,
78{
79 pub fn new(
81 state: &'a mut ExecutionStateView<C>,
82 txn_tracker: &'a mut TransactionTracker,
83 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
84 ) -> Self {
85 Self {
86 state,
87 txn_tracker,
88 resource_controller,
89 }
90 }
91
92 pub(crate) async fn load_contract(
93 &mut self,
94 id: ApplicationId,
95 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
96 #[cfg(with_metrics)]
97 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
98 let blob_id = id.description_blob_id();
99 let description = match self.txn_tracker.get_blob_content(&blob_id) {
100 Some(blob) => bcs::from_bytes(blob.bytes())?,
101 None => {
102 self.state
103 .system
104 .describe_application(id, self.txn_tracker)
105 .await?
106 }
107 };
108 let code = self
109 .state
110 .context()
111 .extra()
112 .get_user_contract(&description, self.txn_tracker)
113 .await?;
114 Ok((code, description))
115 }
116
117 pub(crate) async fn load_service(
118 &mut self,
119 id: ApplicationId,
120 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
121 #[cfg(with_metrics)]
122 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
123 let blob_id = id.description_blob_id();
124 let description = match self.txn_tracker.get_blob_content(&blob_id) {
125 Some(blob) => bcs::from_bytes(blob.bytes())?,
126 None => {
127 self.state
128 .system
129 .describe_application(id, self.txn_tracker)
130 .await?
131 }
132 };
133 let code = self
134 .state
135 .context()
136 .extra()
137 .get_user_service(&description, self.txn_tracker)
138 .await?;
139 Ok((code, description))
140 }
141
142 pub(crate) async fn handle_request(
144 &mut self,
145 request: ExecutionRequest,
146 ) -> Result<(), ExecutionError> {
147 use ExecutionRequest::*;
148 match request {
149 #[cfg(not(web))]
150 LoadContract { id, callback } => {
151 let (code, description) = self.load_contract(id).await?;
152 callback.respond((code, description))
153 }
154 #[cfg(not(web))]
155 LoadService { id, callback } => {
156 let (code, description) = self.load_service(id).await?;
157 callback.respond((code, description))
158 }
159
160 ChainBalance { callback } => {
161 let balance = *self.state.system.balance.get();
162 callback.respond(balance);
163 }
164
165 OwnerBalance { owner, callback } => {
166 let balance = self
167 .state
168 .system
169 .balances
170 .get(&owner)
171 .await?
172 .unwrap_or_default();
173 callback.respond(balance);
174 }
175
176 OwnerBalances { callback } => {
177 let balances = self.state.system.balances.index_values().await?;
178 callback.respond(balances.into_iter().collect());
179 }
180
181 BalanceOwners { callback } => {
182 let owners = self.state.system.balances.indices().await?;
183 callback.respond(owners);
184 }
185
186 Transfer {
187 source,
188 destination,
189 amount,
190 signer,
191 application_id,
192 callback,
193 } => {
194 let maybe_message = self
195 .state
196 .system
197 .transfer(signer, Some(application_id), source, destination, amount)
198 .await?;
199 self.txn_tracker.add_outgoing_messages(maybe_message);
200 callback.respond(());
201 }
202
203 Claim {
204 source,
205 destination,
206 amount,
207 signer,
208 application_id,
209 callback,
210 } => {
211 let maybe_message = self
212 .state
213 .system
214 .claim(
215 signer,
216 Some(application_id),
217 source.owner,
218 source.chain_id,
219 destination,
220 amount,
221 )
222 .await?;
223 self.txn_tracker.add_outgoing_messages(maybe_message);
224 callback.respond(());
225 }
226
227 SystemTimestamp { callback } => {
228 let timestamp = *self.state.system.timestamp.get();
229 callback.respond(timestamp);
230 }
231
232 ChainOwnership { callback } => {
233 let ownership = self.state.system.ownership.get().clone();
234 callback.respond(ownership);
235 }
236
237 ContainsKey { id, key, callback } => {
238 let view = self.state.users.try_load_entry(&id).await?;
239 let result = match view {
240 Some(view) => view.contains_key(&key).await?,
241 None => false,
242 };
243 callback.respond(result);
244 }
245
246 ContainsKeys { id, keys, callback } => {
247 let view = self.state.users.try_load_entry(&id).await?;
248 let result = match view {
249 Some(view) => view.contains_keys(&keys).await?,
250 None => vec![false; keys.len()],
251 };
252 callback.respond(result);
253 }
254
255 ReadMultiValuesBytes { id, keys, callback } => {
256 let view = self.state.users.try_load_entry(&id).await?;
257 let values = match view {
258 Some(view) => view.multi_get(&keys).await?,
259 None => vec![None; keys.len()],
260 };
261 callback.respond(values);
262 }
263
264 ReadValueBytes { id, key, callback } => {
265 let view = self.state.users.try_load_entry(&id).await?;
266 let result = match view {
267 Some(view) => view.get(&key).await?,
268 None => None,
269 };
270 callback.respond(result);
271 }
272
273 FindKeysByPrefix {
274 id,
275 key_prefix,
276 callback,
277 } => {
278 let view = self.state.users.try_load_entry(&id).await?;
279 let result = match view {
280 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
281 None => Vec::new(),
282 };
283 callback.respond(result);
284 }
285
286 FindKeyValuesByPrefix {
287 id,
288 key_prefix,
289 callback,
290 } => {
291 let view = self.state.users.try_load_entry(&id).await?;
292 let result = match view {
293 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
294 None => Vec::new(),
295 };
296 callback.respond(result);
297 }
298
299 WriteBatch {
300 id,
301 batch,
302 callback,
303 } => {
304 let mut view = self.state.users.try_load_entry_mut(&id).await?;
305 view.write_batch(batch).await?;
306 callback.respond(());
307 }
308
309 OpenChain {
310 ownership,
311 balance,
312 parent_id,
313 block_height,
314 application_permissions,
315 timestamp,
316 callback,
317 } => {
318 let config = OpenChainConfig {
319 ownership,
320 balance,
321 application_permissions,
322 };
323 let chain_id = self
324 .state
325 .system
326 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
327 .await?;
328 callback.respond(chain_id);
329 }
330
331 CloseChain {
332 application_id,
333 callback,
334 } => {
335 let app_permissions = self.state.system.application_permissions.get();
336 if !app_permissions.can_close_chain(&application_id) {
337 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
338 } else {
339 self.state.system.close_chain().await?;
340 callback.respond(Ok(()));
341 }
342 }
343
344 ChangeApplicationPermissions {
345 application_id,
346 application_permissions,
347 callback,
348 } => {
349 let app_permissions = self.state.system.application_permissions.get();
350 if !app_permissions.can_change_application_permissions(&application_id) {
351 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
352 } else {
353 self.state
354 .system
355 .application_permissions
356 .set(application_permissions);
357 callback.respond(Ok(()));
358 }
359 }
360
361 CreateApplication {
362 chain_id,
363 block_height,
364 module_id,
365 parameters,
366 required_application_ids,
367 callback,
368 } => {
369 let create_application_result = self
370 .state
371 .system
372 .create_application(
373 chain_id,
374 block_height,
375 module_id,
376 parameters,
377 required_application_ids,
378 self.txn_tracker,
379 )
380 .await?;
381 callback.respond(Ok(create_application_result));
382 }
383
384 PerformHttpRequest {
385 request,
386 http_responses_are_oracle_responses,
387 callback,
388 } => {
389 let system = &mut self.state.system;
390 let response = self
391 .txn_tracker
392 .oracle(|| async {
393 let headers = request
394 .headers
395 .into_iter()
396 .map(|http::Header { name, value }| {
397 Ok((name.parse()?, value.try_into()?))
398 })
399 .collect::<Result<HeaderMap, ExecutionError>>()?;
400
401 let url = Url::parse(&request.url)?;
402 let host = url
403 .host_str()
404 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
405
406 let (_epoch, committee) = system
407 .current_committee()
408 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
409 let allowed_hosts = &committee.policy().http_request_allow_list;
410
411 ensure!(
412 allowed_hosts.contains(host),
413 ExecutionError::UnauthorizedHttpRequest(url)
414 );
415
416 let request = Client::new()
417 .request(request.method.into(), url)
418 .body(request.body)
419 .headers(headers);
420 #[cfg(not(web))]
421 let request = request.timeout(linera_base::time::Duration::from_millis(
422 committee.policy().http_request_timeout_ms,
423 ));
424
425 let response = request.send().await?;
426
427 let mut response_size_limit =
428 committee.policy().maximum_http_response_bytes;
429
430 if http_responses_are_oracle_responses {
431 response_size_limit = response_size_limit
432 .min(committee.policy().maximum_oracle_response_bytes);
433 }
434 Ok(OracleResponse::Http(
435 Self::receive_http_response(response, response_size_limit).await?,
436 ))
437 })
438 .await?
439 .to_http_response()?;
440 callback.respond(response);
441 }
442
443 ReadBlobContent { blob_id, callback } => {
444 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
445 content.clone()
446 } else {
447 let content = self.state.system.read_blob_content(blob_id).await?;
448 if blob_id.blob_type == BlobType::Data {
449 self.resource_controller
450 .with_state(&mut self.state.system)
451 .await?
452 .track_blob_read(content.bytes().len() as u64)?;
453 }
454 self.state
455 .system
456 .blob_used(self.txn_tracker, blob_id)
457 .await?;
458 content
459 };
460 callback.respond(content)
461 }
462
463 AssertBlobExists { blob_id, callback } => {
464 self.state.system.assert_blob_exists(blob_id).await?;
465 if blob_id.blob_type == BlobType::Data {
467 self.resource_controller
468 .with_state(&mut self.state.system)
469 .await?
470 .track_blob_read(0)?;
471 }
472 let is_new = self
473 .state
474 .system
475 .blob_used(self.txn_tracker, blob_id)
476 .await?;
477 if is_new {
478 self.txn_tracker
479 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
480 }
481 callback.respond(());
482 }
483
484 Emit {
485 stream_id,
486 value,
487 callback,
488 } => {
489 let count = self
490 .state
491 .stream_event_counts
492 .get_mut_or_default(&stream_id)
493 .await?;
494 let index = *count;
495 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
496 self.txn_tracker.add_event(stream_id, index, value);
497 callback.respond(index)
498 }
499
500 ReadEvent { event_id, callback } => {
501 let extra = self.state.context().extra();
502 let event = self
503 .txn_tracker
504 .oracle(|| async {
505 let event = extra
506 .get_event(event_id.clone())
507 .await?
508 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
509 Ok(OracleResponse::Event(event_id.clone(), event))
510 })
511 .await?
512 .to_event(&event_id)?;
513 callback.respond(event);
514 }
515
516 SubscribeToEvents {
517 chain_id,
518 stream_id,
519 subscriber_app_id,
520 callback,
521 } => {
522 let subscriptions = self
523 .state
524 .system
525 .event_subscriptions
526 .get_mut_or_default(&(chain_id, stream_id.clone()))
527 .await?;
528 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
529 subscriptions.next_index
530 } else {
531 0
532 };
533 self.txn_tracker.add_stream_to_process(
534 subscriber_app_id,
535 chain_id,
536 stream_id,
537 0,
538 next_index,
539 );
540 callback.respond(());
541 }
542
543 UnsubscribeFromEvents {
544 chain_id,
545 stream_id,
546 subscriber_app_id,
547 callback,
548 } => {
549 let key = (chain_id, stream_id.clone());
550 let subscriptions = self
551 .state
552 .system
553 .event_subscriptions
554 .get_mut_or_default(&key)
555 .await?;
556 subscriptions.applications.remove(&subscriber_app_id);
557 if subscriptions.applications.is_empty() {
558 self.state.system.event_subscriptions.remove(&key)?;
559 }
560 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
561 self.txn_tracker
562 .remove_stream_to_process(app_id, chain_id, stream_id);
563 }
564 callback.respond(());
565 }
566
567 GetApplicationPermissions { callback } => {
568 let app_permissions = self.state.system.application_permissions.get();
569 callback.respond(app_permissions.clone());
570 }
571
572 QueryServiceOracle {
573 deadline,
574 application_id,
575 next_block_height,
576 query,
577 callback,
578 } => {
579 let state = &mut self.state;
580 let local_time = self.txn_tracker.local_time();
581 let created_blobs = self.txn_tracker.created_blobs().clone();
582 let bytes = self
583 .txn_tracker
584 .oracle(|| async {
585 let context = QueryContext {
586 chain_id: state.context().extra().chain_id(),
587 next_block_height,
588 local_time,
589 };
590 let QueryOutcome {
591 response,
592 operations,
593 } = Box::pin(state.query_user_application_with_deadline(
594 application_id,
595 context,
596 query,
597 deadline,
598 created_blobs,
599 ))
600 .await?;
601 ensure!(
602 operations.is_empty(),
603 ExecutionError::ServiceOracleQueryOperations(operations)
604 );
605 Ok(OracleResponse::Service(response))
606 })
607 .await?
608 .to_service_response()?;
609 callback.respond(bytes);
610 }
611
612 AddOutgoingMessage { message, callback } => {
613 self.txn_tracker.add_outgoing_message(message);
614 callback.respond(());
615 }
616
617 SetLocalTime {
618 local_time,
619 callback,
620 } => {
621 self.txn_tracker.set_local_time(local_time);
622 callback.respond(());
623 }
624
625 AssertBefore {
626 timestamp,
627 callback,
628 } => {
629 let result = if !self
630 .txn_tracker
631 .replay_oracle_response(OracleResponse::Assert)?
632 {
633 let local_time = self.txn_tracker.local_time();
635 if local_time >= timestamp {
636 Err(ExecutionError::AssertBefore {
637 timestamp,
638 local_time,
639 })
640 } else {
641 Ok(())
642 }
643 } else {
644 Ok(())
645 };
646 callback.respond(result);
647 }
648
649 AddCreatedBlob { blob, callback } => {
650 self.txn_tracker.add_created_blob(blob);
651 callback.respond(());
652 }
653
654 ValidationRound { round, callback } => {
655 let validation_round = self
656 .txn_tracker
657 .oracle(|| async { Ok(OracleResponse::Round(round)) })
658 .await?
659 .to_round()?;
660 callback.respond(validation_round);
661 }
662
663 AllowApplicationLogs { callback } => {
664 let allow = self
665 .state
666 .context()
667 .extra()
668 .execution_runtime_config()
669 .allow_application_logs;
670 callback.respond(allow);
671 }
672
673 #[cfg(web)]
674 Log { message, level } => {
675 let formatted: js_sys::JsString = format!("[CONTRACT {level}] {message}").into();
677 match level {
678 tracing::log::Level::Trace | tracing::log::Level::Debug => {
679 web_sys::console::debug_1(&formatted)
680 }
681 tracing::log::Level::Info => web_sys::console::log_1(&formatted),
682 tracing::log::Level::Warn => web_sys::console::warn_1(&formatted),
683 tracing::log::Level::Error => web_sys::console::error_1(&formatted),
684 }
685 }
686 }
687
688 Ok(())
689 }
690
691 async fn process_subscriptions(
694 &mut self,
695 context: ProcessStreamsContext,
696 ) -> Result<(), ExecutionError> {
697 let mut processed = BTreeSet::new();
700 loop {
701 let to_process = self
702 .txn_tracker
703 .take_streams_to_process()
704 .into_iter()
705 .filter_map(|(app_id, updates)| {
706 let updates = updates
707 .into_iter()
708 .filter_map(|update| {
709 if !processed.insert((
710 app_id,
711 update.chain_id,
712 update.stream_id.clone(),
713 )) {
714 return None;
715 }
716 Some(update)
717 })
718 .collect::<Vec<_>>();
719 if updates.is_empty() {
720 return None;
721 }
722 Some((app_id, updates))
723 })
724 .collect::<BTreeMap<_, _>>();
725 if to_process.is_empty() {
726 return Ok(());
727 }
728 for (app_id, updates) in to_process {
729 self.run_user_action(
730 app_id,
731 UserAction::ProcessStreams(context, updates),
732 None,
733 None,
734 )
735 .await?;
736 }
737 }
738 }
739
/// Runs `action` for the application `application_id`.
///
/// This simply forwards all its arguments to `run_user_action_with_runtime`.
pub(crate) async fn run_user_action(
    &mut self,
    application_id: ApplicationId,
    action: UserAction,
    refund_grant_to: Option<Account>,
    grant: Option<&mut Amount>,
) -> Result<(), ExecutionError> {
    self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
        .await
}
750
751 pub(crate) async fn service_and_dependencies(
753 &mut self,
754 application: ApplicationId,
755 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
756 let mut stack = vec![application];
759 let mut codes = vec![];
760 let mut descriptions = vec![];
761
762 while let Some(id) = stack.pop() {
763 let (code, description) = self.load_service(id).await?;
764 stack.extend(description.required_application_ids.iter().rev().copied());
765 codes.push(code);
766 descriptions.push(description);
767 }
768
769 codes.reverse();
770 descriptions.reverse();
771
772 Ok((codes, descriptions))
773 }
774
775 async fn contract_and_dependencies(
777 &mut self,
778 application: ApplicationId,
779 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
780 let mut stack = vec![application];
783 let mut codes = vec![];
784 let mut descriptions = vec![];
785
786 while let Some(id) = stack.pop() {
787 let (code, description) = self.load_contract(id).await?;
788 stack.extend(description.required_application_ids.iter().rev().copied());
789 codes.push(code);
790 descriptions.push(description);
791 }
792
793 codes.reverse();
794 descriptions.reverse();
795
796 Ok((codes, descriptions))
797 }
798
799 async fn run_user_action_with_runtime(
800 &mut self,
801 application_id: ApplicationId,
802 action: UserAction,
803 refund_grant_to: Option<Account>,
804 grant: Option<&mut Amount>,
805 ) -> Result<(), ExecutionError> {
806 let chain_id = self.state.context().extra().chain_id();
807 let mut cloned_grant = grant.as_ref().map(|x| **x);
808 let initial_balance = self
809 .resource_controller
810 .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
811 .await?
812 .balance()?;
813 let controller = ResourceController::new(
814 self.resource_controller.policy().clone(),
815 self.resource_controller.tracker,
816 initial_balance,
817 );
818 let (execution_state_sender, mut execution_state_receiver) =
819 futures::channel::mpsc::unbounded();
820
821 let (codes, descriptions): (Vec<_>, Vec<_>) =
822 self.contract_and_dependencies(application_id).await?;
823
824 let allow_application_logs = self
825 .state
826 .context()
827 .extra()
828 .execution_runtime_config()
829 .allow_application_logs;
830
831 let contract_runtime_task = self
832 .state
833 .context()
834 .extra()
835 .thread_pool()
836 .run_send(JsVec(codes), move |codes| async move {
837 let runtime = ContractSyncRuntime::new(
838 execution_state_sender,
839 chain_id,
840 refund_grant_to,
841 controller,
842 &action,
843 allow_application_logs,
844 );
845
846 for (code, description) in codes.0.into_iter().zip(descriptions) {
847 runtime.preload_contract(
848 ApplicationId::from(&description),
849 code,
850 description,
851 )?;
852 }
853
854 runtime.run_action(application_id, chain_id, action)
855 })
856 .await;
857
858 while let Some(request) = execution_state_receiver.next().await {
859 self.handle_request(request).await?;
860 }
861
862 let (result, controller) = contract_runtime_task.await??;
863
864 self.txn_tracker.add_operation_result(result);
865
866 self.resource_controller
867 .with_state_and_grant(&mut self.state.system, grant)
868 .await?
869 .merge_balance(initial_balance, controller.balance()?)?;
870 self.resource_controller.tracker = controller.tracker;
871
872 Ok(())
873 }
874
875 pub async fn execute_operation(
876 &mut self,
877 context: OperationContext,
878 operation: Operation,
879 ) -> Result<(), ExecutionError> {
880 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
881 match operation {
882 Operation::System(op) => {
883 let new_application = self
884 .state
885 .system
886 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
887 .await?;
888 if let Some((application_id, argument)) = new_application {
889 let user_action = UserAction::Instantiate(context, argument);
890 self.run_user_action(
891 application_id,
892 user_action,
893 context.refund_grant_to(),
894 None,
895 )
896 .await?;
897 }
898 }
899 Operation::User {
900 application_id,
901 bytes,
902 } => {
903 self.run_user_action(
904 application_id,
905 UserAction::Operation(context, bytes),
906 context.refund_grant_to(),
907 None,
908 )
909 .await?;
910 }
911 }
912 self.process_subscriptions(context.into()).await?;
913 Ok(())
914 }
915
916 pub async fn execute_message(
917 &mut self,
918 context: MessageContext,
919 message: Message,
920 grant: Option<&mut Amount>,
921 ) -> Result<(), ExecutionError> {
922 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
923 match message {
924 Message::System(message) => {
925 let outcome = self.state.system.execute_message(context, message).await?;
926 self.txn_tracker.add_outgoing_messages(outcome);
927 }
928 Message::User {
929 application_id,
930 bytes,
931 } => {
932 self.run_user_action(
933 application_id,
934 UserAction::Message(context, bytes),
935 context.refund_grant_to,
936 grant,
937 )
938 .await?;
939 }
940 }
941 self.process_subscriptions(context.into()).await?;
942 Ok(())
943 }
944
945 pub fn bounce_message(
946 &mut self,
947 context: MessageContext,
948 grant: Amount,
949 message: Message,
950 ) -> Result<(), ExecutionError> {
951 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
952 self.txn_tracker.add_outgoing_message(OutgoingMessage {
953 destination: context.origin,
954 authenticated_signer: context.authenticated_signer,
955 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
956 grant,
957 kind: MessageKind::Bouncing,
958 message,
959 });
960 Ok(())
961 }
962
963 pub fn send_refund(
964 &mut self,
965 context: MessageContext,
966 amount: Amount,
967 ) -> Result<(), ExecutionError> {
968 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
969 if amount.is_zero() {
970 return Ok(());
971 }
972 let Some(account) = context.refund_grant_to else {
973 return Err(ExecutionError::InternalError(
974 "Messages with grants should have a non-empty `refund_grant_to`",
975 ));
976 };
977 let message = SystemMessage::Credit {
978 amount,
979 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
980 target: account.owner,
981 };
982 self.txn_tracker.add_outgoing_message(
983 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
984 );
985 Ok(())
986 }
987
988 async fn receive_http_response(
992 response: reqwest::Response,
993 size_limit: u64,
994 ) -> Result<http::Response, ExecutionError> {
995 let status = response.status().as_u16();
996 let maybe_content_length = response.content_length();
997
998 let headers = response
999 .headers()
1000 .iter()
1001 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1002 .collect::<Vec<_>>();
1003
1004 let total_header_size = headers
1005 .iter()
1006 .map(|header| (header.name.len() + header.value.len()) as u64)
1007 .sum();
1008
1009 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1010 ExecutionError::HttpResponseSizeLimitExceeded {
1011 limit: size_limit,
1012 size: total_header_size,
1013 },
1014 )?;
1015
1016 if let Some(content_length) = maybe_content_length {
1017 if content_length > remaining_bytes {
1018 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1019 limit: size_limit,
1020 size: content_length + total_header_size,
1021 });
1022 }
1023 }
1024
1025 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1026 let mut body_stream = response.bytes_stream();
1027
1028 while let Some(bytes) = body_stream.next().await.transpose()? {
1029 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1030 ExecutionError::HttpResponseSizeLimitExceeded {
1031 limit: size_limit,
1032 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1033 },
1034 )?;
1035
1036 body.extend(&bytes);
1037 }
1038
1039 Ok(http::Response {
1040 status,
1041 headers,
1042 body,
1043 })
1044 }
1045}
1046
1047#[derive(Debug)]
1049pub enum ExecutionRequest {
1050 #[cfg(not(web))]
1051 LoadContract {
1052 id: ApplicationId,
1053 #[debug(skip)]
1054 callback: Sender<(UserContractCode, ApplicationDescription)>,
1055 },
1056
1057 #[cfg(not(web))]
1058 LoadService {
1059 id: ApplicationId,
1060 #[debug(skip)]
1061 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1062 },
1063
1064 ChainBalance {
1065 #[debug(skip)]
1066 callback: Sender<Amount>,
1067 },
1068
1069 OwnerBalance {
1070 owner: AccountOwner,
1071 #[debug(skip)]
1072 callback: Sender<Amount>,
1073 },
1074
1075 OwnerBalances {
1076 #[debug(skip)]
1077 callback: Sender<Vec<(AccountOwner, Amount)>>,
1078 },
1079
1080 BalanceOwners {
1081 #[debug(skip)]
1082 callback: Sender<Vec<AccountOwner>>,
1083 },
1084
1085 Transfer {
1086 source: AccountOwner,
1087 destination: Account,
1088 amount: Amount,
1089 #[debug(skip_if = Option::is_none)]
1090 signer: Option<AccountOwner>,
1091 application_id: ApplicationId,
1092 #[debug(skip)]
1093 callback: Sender<()>,
1094 },
1095
1096 Claim {
1097 source: Account,
1098 destination: Account,
1099 amount: Amount,
1100 #[debug(skip_if = Option::is_none)]
1101 signer: Option<AccountOwner>,
1102 application_id: ApplicationId,
1103 #[debug(skip)]
1104 callback: Sender<()>,
1105 },
1106
1107 SystemTimestamp {
1108 #[debug(skip)]
1109 callback: Sender<Timestamp>,
1110 },
1111
1112 ChainOwnership {
1113 #[debug(skip)]
1114 callback: Sender<ChainOwnership>,
1115 },
1116
1117 ReadValueBytes {
1118 id: ApplicationId,
1119 #[debug(with = hex_debug)]
1120 key: Vec<u8>,
1121 #[debug(skip)]
1122 callback: Sender<Option<Vec<u8>>>,
1123 },
1124
1125 ContainsKey {
1126 id: ApplicationId,
1127 key: Vec<u8>,
1128 #[debug(skip)]
1129 callback: Sender<bool>,
1130 },
1131
1132 ContainsKeys {
1133 id: ApplicationId,
1134 #[debug(with = hex_vec_debug)]
1135 keys: Vec<Vec<u8>>,
1136 callback: Sender<Vec<bool>>,
1137 },
1138
1139 ReadMultiValuesBytes {
1140 id: ApplicationId,
1141 #[debug(with = hex_vec_debug)]
1142 keys: Vec<Vec<u8>>,
1143 #[debug(skip)]
1144 callback: Sender<Vec<Option<Vec<u8>>>>,
1145 },
1146
1147 FindKeysByPrefix {
1148 id: ApplicationId,
1149 #[debug(with = hex_debug)]
1150 key_prefix: Vec<u8>,
1151 #[debug(skip)]
1152 callback: Sender<Vec<Vec<u8>>>,
1153 },
1154
1155 FindKeyValuesByPrefix {
1156 id: ApplicationId,
1157 #[debug(with = hex_debug)]
1158 key_prefix: Vec<u8>,
1159 #[debug(skip)]
1160 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1161 },
1162
1163 WriteBatch {
1164 id: ApplicationId,
1165 batch: Batch,
1166 #[debug(skip)]
1167 callback: Sender<()>,
1168 },
1169
1170 OpenChain {
1171 ownership: ChainOwnership,
1172 #[debug(skip_if = Amount::is_zero)]
1173 balance: Amount,
1174 parent_id: ChainId,
1175 block_height: BlockHeight,
1176 application_permissions: ApplicationPermissions,
1177 timestamp: Timestamp,
1178 #[debug(skip)]
1179 callback: Sender<ChainId>,
1180 },
1181
1182 CloseChain {
1183 application_id: ApplicationId,
1184 #[debug(skip)]
1185 callback: Sender<Result<(), ExecutionError>>,
1186 },
1187
1188 ChangeApplicationPermissions {
1189 application_id: ApplicationId,
1190 application_permissions: ApplicationPermissions,
1191 #[debug(skip)]
1192 callback: Sender<Result<(), ExecutionError>>,
1193 },
1194
1195 CreateApplication {
1196 chain_id: ChainId,
1197 block_height: BlockHeight,
1198 module_id: ModuleId,
1199 parameters: Vec<u8>,
1200 required_application_ids: Vec<ApplicationId>,
1201 #[debug(skip)]
1202 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1203 },
1204
1205 PerformHttpRequest {
1206 request: http::Request,
1207 http_responses_are_oracle_responses: bool,
1208 #[debug(skip)]
1209 callback: Sender<http::Response>,
1210 },
1211
1212 ReadBlobContent {
1213 blob_id: BlobId,
1214 #[debug(skip)]
1215 callback: Sender<BlobContent>,
1216 },
1217
1218 AssertBlobExists {
1219 blob_id: BlobId,
1220 #[debug(skip)]
1221 callback: Sender<()>,
1222 },
1223
1224 Emit {
1225 stream_id: StreamId,
1226 #[debug(with = hex_debug)]
1227 value: Vec<u8>,
1228 #[debug(skip)]
1229 callback: Sender<u32>,
1230 },
1231
1232 ReadEvent {
1233 event_id: EventId,
1234 callback: oneshot::Sender<Vec<u8>>,
1235 },
1236
1237 SubscribeToEvents {
1238 chain_id: ChainId,
1239 stream_id: StreamId,
1240 subscriber_app_id: ApplicationId,
1241 #[debug(skip)]
1242 callback: Sender<()>,
1243 },
1244
1245 UnsubscribeFromEvents {
1246 chain_id: ChainId,
1247 stream_id: StreamId,
1248 subscriber_app_id: ApplicationId,
1249 #[debug(skip)]
1250 callback: Sender<()>,
1251 },
1252
1253 GetApplicationPermissions {
1254 #[debug(skip)]
1255 callback: Sender<ApplicationPermissions>,
1256 },
1257
1258 QueryServiceOracle {
1259 deadline: Option<Instant>,
1260 application_id: ApplicationId,
1261 next_block_height: BlockHeight,
1262 query: Vec<u8>,
1263 #[debug(skip)]
1264 callback: Sender<Vec<u8>>,
1265 },
1266
1267 AddOutgoingMessage {
1268 message: crate::OutgoingMessage,
1269 #[debug(skip)]
1270 callback: Sender<()>,
1271 },
1272
1273 SetLocalTime {
1274 local_time: Timestamp,
1275 #[debug(skip)]
1276 callback: Sender<()>,
1277 },
1278
1279 AssertBefore {
1280 timestamp: Timestamp,
1281 #[debug(skip)]
1282 callback: Sender<Result<(), ExecutionError>>,
1283 },
1284
1285 AddCreatedBlob {
1286 blob: crate::Blob,
1287 #[debug(skip)]
1288 callback: Sender<()>,
1289 },
1290
1291 ValidationRound {
1292 round: Option<u32>,
1293 #[debug(skip)]
1294 callback: Sender<Option<u32>>,
1295 },
1296
1297 AllowApplicationLogs {
1298 #[debug(skip)]
1299 callback: Sender<bool>,
1300 },
1301
1302 #[cfg(web)]
1304 Log {
1305 message: String,
1306 level: tracing::log::Level,
1307 },
1308}