1use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15 Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19 ownership::ChainOwnership,
20 time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27 execution::UserAction,
28 runtime::ContractSyncRuntime,
29 system::{CreateApplicationResult, OpenChainConfig},
30 util::{OracleResponseExt as _, RespondExt as _},
31 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeConfig,
32 ExecutionRuntimeContext, ExecutionStateView, JsVec, Message, MessageContext, MessageKind,
33 ModuleId, Operation, OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext,
34 QueryOutcome, ResourceController, SystemMessage, TransactionTracker, UserContractCode,
35 UserServiceCode,
36};
37
38pub struct ExecutionStateActor<'a, C> {
40 state: &'a mut ExecutionStateView<C>,
41 txn_tracker: &'a mut TransactionTracker,
42 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
43}
44
#[cfg(with_metrics)]
mod metrics {
    //! Latency histograms for loading application bytecode.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers an unlabelled latency histogram using the bucket layout shared by all the
    /// metrics of this module.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Histogram observed by `load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram observed by `load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
72
73pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
74
75impl<'a, C> ExecutionStateActor<'a, C>
76where
77 C: Context + Clone + Send + Sync + 'static,
78 C::Extra: ExecutionRuntimeContext,
79{
80 pub fn new(
82 state: &'a mut ExecutionStateView<C>,
83 txn_tracker: &'a mut TransactionTracker,
84 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
85 ) -> Self {
86 Self {
87 state,
88 txn_tracker,
89 resource_controller,
90 }
91 }
92
93 pub(crate) async fn load_contract(
94 &mut self,
95 id: ApplicationId,
96 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
97 #[cfg(with_metrics)]
98 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
99 let blob_id = id.description_blob_id();
100 let description = match self.txn_tracker.get_blob_content(&blob_id) {
101 Some(blob) => bcs::from_bytes(blob.bytes())?,
102 None => {
103 self.state
104 .system
105 .describe_application(id, self.txn_tracker)
106 .await?
107 }
108 };
109 let code = self
110 .state
111 .context()
112 .extra()
113 .get_user_contract(&description, self.txn_tracker)
114 .await?;
115 Ok((code, description))
116 }
117
118 pub(crate) async fn load_service(
119 &mut self,
120 id: ApplicationId,
121 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
122 #[cfg(with_metrics)]
123 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
124 let blob_id = id.description_blob_id();
125 let description = match self.txn_tracker.get_blob_content(&blob_id) {
126 Some(blob) => bcs::from_bytes(blob.bytes())?,
127 None => {
128 self.state
129 .system
130 .describe_application(id, self.txn_tracker)
131 .await?
132 }
133 };
134 let code = self
135 .state
136 .context()
137 .extra()
138 .get_user_service(&description, self.txn_tracker)
139 .await?;
140 Ok((code, description))
141 }
142
143 pub(crate) async fn handle_request(
145 &mut self,
146 request: ExecutionRequest,
147 ) -> Result<(), ExecutionError> {
148 use ExecutionRequest::*;
149 match request {
150 #[cfg(not(web))]
151 LoadContract { id, callback } => {
152 let (code, description) = self.load_contract(id).await?;
153 callback.respond((code, description))
154 }
155 #[cfg(not(web))]
156 LoadService { id, callback } => {
157 let (code, description) = self.load_service(id).await?;
158 callback.respond((code, description))
159 }
160
161 ChainBalance { callback } => {
162 let balance = *self.state.system.balance.get();
163 callback.respond(balance);
164 }
165
166 OwnerBalance { owner, callback } => {
167 let balance = self
168 .state
169 .system
170 .balances
171 .get(&owner)
172 .await?
173 .unwrap_or_default();
174 callback.respond(balance);
175 }
176
177 OwnerBalances { callback } => {
178 let balances = self.state.system.balances.index_values().await?;
179 callback.respond(balances.into_iter().collect());
180 }
181
182 BalanceOwners { callback } => {
183 let owners = self.state.system.balances.indices().await?;
184 callback.respond(owners);
185 }
186
187 Transfer {
188 source,
189 destination,
190 amount,
191 signer,
192 application_id,
193 callback,
194 } => {
195 let maybe_message = self
196 .state
197 .system
198 .transfer(signer, Some(application_id), source, destination, amount)
199 .await?;
200 self.txn_tracker.add_outgoing_messages(maybe_message);
201 callback.respond(());
202 }
203
204 Claim {
205 source,
206 destination,
207 amount,
208 signer,
209 application_id,
210 callback,
211 } => {
212 let maybe_message = self
213 .state
214 .system
215 .claim(
216 signer,
217 Some(application_id),
218 source.owner,
219 source.chain_id,
220 destination,
221 amount,
222 )
223 .await?;
224 self.txn_tracker.add_outgoing_messages(maybe_message);
225 callback.respond(());
226 }
227
228 SystemTimestamp { callback } => {
229 let timestamp = *self.state.system.timestamp.get();
230 callback.respond(timestamp);
231 }
232
233 ChainOwnership { callback } => {
234 let ownership = self.state.system.ownership.get().clone();
235 callback.respond(ownership);
236 }
237
238 ContainsKey { id, key, callback } => {
239 let view = self.state.users.try_load_entry(&id).await?;
240 let result = match view {
241 Some(view) => view.contains_key(&key).await?,
242 None => false,
243 };
244 callback.respond(result);
245 }
246
247 ContainsKeys { id, keys, callback } => {
248 let view = self.state.users.try_load_entry(&id).await?;
249 let result = match view {
250 Some(view) => view.contains_keys(&keys).await?,
251 None => vec![false; keys.len()],
252 };
253 callback.respond(result);
254 }
255
256 ReadMultiValuesBytes { id, keys, callback } => {
257 let view = self.state.users.try_load_entry(&id).await?;
258 let values = match view {
259 Some(view) => view.multi_get(&keys).await?,
260 None => vec![None; keys.len()],
261 };
262 callback.respond(values);
263 }
264
265 ReadValueBytes { id, key, callback } => {
266 let view = self.state.users.try_load_entry(&id).await?;
267 let result = match view {
268 Some(view) => view.get(&key).await?,
269 None => None,
270 };
271 callback.respond(result);
272 }
273
274 FindKeysByPrefix {
275 id,
276 key_prefix,
277 callback,
278 } => {
279 let view = self.state.users.try_load_entry(&id).await?;
280 let result = match view {
281 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
282 None => Vec::new(),
283 };
284 callback.respond(result);
285 }
286
287 FindKeyValuesByPrefix {
288 id,
289 key_prefix,
290 callback,
291 } => {
292 let view = self.state.users.try_load_entry(&id).await?;
293 let result = match view {
294 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
295 None => Vec::new(),
296 };
297 callback.respond(result);
298 }
299
300 WriteBatch {
301 id,
302 batch,
303 callback,
304 } => {
305 let mut view = self.state.users.try_load_entry_mut(&id).await?;
306 view.write_batch(batch).await?;
307 callback.respond(());
308 }
309
310 OpenChain {
311 ownership,
312 balance,
313 parent_id,
314 block_height,
315 application_permissions,
316 timestamp,
317 callback,
318 } => {
319 let config = OpenChainConfig {
320 ownership,
321 balance,
322 application_permissions,
323 };
324 let chain_id = self
325 .state
326 .system
327 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
328 .await?;
329 callback.respond(chain_id);
330 }
331
332 CloseChain {
333 application_id,
334 callback,
335 } => {
336 let app_permissions = self.state.system.application_permissions.get();
337 if !app_permissions.can_close_chain(&application_id) {
338 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
339 } else {
340 self.state.system.close_chain().await?;
341 callback.respond(Ok(()));
342 }
343 }
344
345 ChangeApplicationPermissions {
346 application_id,
347 application_permissions,
348 callback,
349 } => {
350 let app_permissions = self.state.system.application_permissions.get();
351 if !app_permissions.can_change_application_permissions(&application_id) {
352 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
353 } else {
354 self.state
355 .system
356 .application_permissions
357 .set(application_permissions);
358 callback.respond(Ok(()));
359 }
360 }
361
362 CreateApplication {
363 chain_id,
364 block_height,
365 module_id,
366 parameters,
367 required_application_ids,
368 callback,
369 } => {
370 let create_application_result = self
371 .state
372 .system
373 .create_application(
374 chain_id,
375 block_height,
376 module_id,
377 parameters,
378 required_application_ids,
379 self.txn_tracker,
380 )
381 .await?;
382 callback.respond(Ok(create_application_result));
383 }
384
385 PerformHttpRequest {
386 request,
387 http_responses_are_oracle_responses,
388 callback,
389 } => {
390 let system = &mut self.state.system;
391 let response = self
392 .txn_tracker
393 .oracle(|| async {
394 let headers = request
395 .headers
396 .into_iter()
397 .map(|http::Header { name, value }| {
398 Ok((name.parse()?, value.try_into()?))
399 })
400 .collect::<Result<HeaderMap, ExecutionError>>()?;
401
402 let url = Url::parse(&request.url)?;
403 let host = url
404 .host_str()
405 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
406
407 let (_epoch, committee) = system
408 .current_committee()
409 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
410 let allowed_hosts = &committee.policy().http_request_allow_list;
411
412 ensure!(
413 allowed_hosts.contains(host),
414 ExecutionError::UnauthorizedHttpRequest(url)
415 );
416
417 let request = Client::new()
418 .request(request.method.into(), url)
419 .body(request.body)
420 .headers(headers);
421 #[cfg(not(web))]
422 let request = request.timeout(linera_base::time::Duration::from_millis(
423 committee.policy().http_request_timeout_ms,
424 ));
425
426 let response = request.send().await?;
427
428 let mut response_size_limit =
429 committee.policy().maximum_http_response_bytes;
430
431 if http_responses_are_oracle_responses {
432 response_size_limit = response_size_limit
433 .min(committee.policy().maximum_oracle_response_bytes);
434 }
435 Ok(OracleResponse::Http(
436 Self::receive_http_response(response, response_size_limit).await?,
437 ))
438 })
439 .await?
440 .to_http_response()?;
441 callback.respond(response);
442 }
443
444 ReadBlobContent { blob_id, callback } => {
445 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
446 content.clone()
447 } else {
448 let content = self.state.system.read_blob_content(blob_id).await?;
449 if blob_id.blob_type == BlobType::Data {
450 self.resource_controller
451 .with_state(&mut self.state.system)
452 .await?
453 .track_blob_read(content.bytes().len() as u64)?;
454 }
455 self.state
456 .system
457 .blob_used(self.txn_tracker, blob_id)
458 .await?;
459 content
460 };
461 callback.respond(content)
462 }
463
464 AssertBlobExists { blob_id, callback } => {
465 self.state.system.assert_blob_exists(blob_id).await?;
466 if blob_id.blob_type == BlobType::Data {
468 self.resource_controller
469 .with_state(&mut self.state.system)
470 .await?
471 .track_blob_read(0)?;
472 }
473 let is_new = self
474 .state
475 .system
476 .blob_used(self.txn_tracker, blob_id)
477 .await?;
478 if is_new {
479 self.txn_tracker
480 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
481 }
482 callback.respond(());
483 }
484
485 Emit {
486 stream_id,
487 value,
488 callback,
489 } => {
490 let count = self
491 .state
492 .stream_event_counts
493 .get_mut_or_default(&stream_id)
494 .await?;
495 let index = *count;
496 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
497 self.txn_tracker.add_event(stream_id, index, value);
498 callback.respond(index)
499 }
500
501 ReadEvent { event_id, callback } => {
502 let extra = self.state.context().extra();
503 let event = self
504 .txn_tracker
505 .oracle(|| async {
506 let event = extra
507 .get_event(event_id.clone())
508 .await?
509 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
510 Ok(OracleResponse::Event(event_id.clone(), event))
511 })
512 .await?
513 .to_event(&event_id)?;
514 callback.respond(event);
515 }
516
517 SubscribeToEvents {
518 chain_id,
519 stream_id,
520 subscriber_app_id,
521 callback,
522 } => {
523 let subscriptions = self
524 .state
525 .system
526 .event_subscriptions
527 .get_mut_or_default(&(chain_id, stream_id.clone()))
528 .await?;
529 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
530 subscriptions.next_index
531 } else {
532 0
533 };
534 self.txn_tracker.add_stream_to_process(
535 subscriber_app_id,
536 chain_id,
537 stream_id,
538 0,
539 next_index,
540 );
541 callback.respond(());
542 }
543
544 UnsubscribeFromEvents {
545 chain_id,
546 stream_id,
547 subscriber_app_id,
548 callback,
549 } => {
550 let key = (chain_id, stream_id.clone());
551 let subscriptions = self
552 .state
553 .system
554 .event_subscriptions
555 .get_mut_or_default(&key)
556 .await?;
557 subscriptions.applications.remove(&subscriber_app_id);
558 if subscriptions.applications.is_empty() {
559 self.state.system.event_subscriptions.remove(&key)?;
560 }
561 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
562 self.txn_tracker
563 .remove_stream_to_process(app_id, chain_id, stream_id);
564 }
565 callback.respond(());
566 }
567
568 GetApplicationPermissions { callback } => {
569 let app_permissions = self.state.system.application_permissions.get();
570 callback.respond(app_permissions.clone());
571 }
572
573 QueryServiceOracle {
574 deadline,
575 application_id,
576 next_block_height,
577 query,
578 callback,
579 } => {
580 let state = &mut self.state;
581 let local_time = self.txn_tracker.local_time();
582 let created_blobs = self.txn_tracker.created_blobs().clone();
583 let bytes = self
584 .txn_tracker
585 .oracle(|| async {
586 let context = QueryContext {
587 chain_id: state.context().extra().chain_id(),
588 next_block_height,
589 local_time,
590 };
591 let QueryOutcome {
592 response,
593 operations,
594 } = Box::pin(state.query_user_application_with_deadline(
595 application_id,
596 context,
597 query,
598 deadline,
599 created_blobs,
600 ))
601 .await?;
602 ensure!(
603 operations.is_empty(),
604 ExecutionError::ServiceOracleQueryOperations(operations)
605 );
606 Ok(OracleResponse::Service(response))
607 })
608 .await?
609 .to_service_response()?;
610 callback.respond(bytes);
611 }
612
613 AddOutgoingMessage { message, callback } => {
614 self.txn_tracker.add_outgoing_message(message);
615 callback.respond(());
616 }
617
618 SetLocalTime {
619 local_time,
620 callback,
621 } => {
622 self.txn_tracker.set_local_time(local_time);
623 callback.respond(());
624 }
625
626 AssertBefore {
627 timestamp,
628 callback,
629 } => {
630 let result = if !self
631 .txn_tracker
632 .replay_oracle_response(OracleResponse::Assert)?
633 {
634 let local_time = self.txn_tracker.local_time();
636 if local_time >= timestamp {
637 Err(ExecutionError::AssertBefore {
638 timestamp,
639 local_time,
640 })
641 } else {
642 Ok(())
643 }
644 } else {
645 Ok(())
646 };
647 callback.respond(result);
648 }
649
650 AddCreatedBlob { blob, callback } => {
651 self.txn_tracker.add_created_blob(blob);
652 callback.respond(());
653 }
654
655 ValidationRound { round, callback } => {
656 let validation_round = self
657 .txn_tracker
658 .oracle(|| async { Ok(OracleResponse::Round(round)) })
659 .await?
660 .to_round()?;
661 callback.respond(validation_round);
662 }
663 }
664
665 Ok(())
666 }
667
668 async fn process_subscriptions(
671 &mut self,
672 context: ProcessStreamsContext,
673 ) -> Result<(), ExecutionError> {
674 let mut processed = BTreeSet::new();
677 loop {
678 let to_process = self
679 .txn_tracker
680 .take_streams_to_process()
681 .into_iter()
682 .filter_map(|(app_id, updates)| {
683 let updates = updates
684 .into_iter()
685 .filter_map(|update| {
686 if !processed.insert((
687 app_id,
688 update.chain_id,
689 update.stream_id.clone(),
690 )) {
691 return None;
692 }
693 Some(update)
694 })
695 .collect::<Vec<_>>();
696 if updates.is_empty() {
697 return None;
698 }
699 Some((app_id, updates))
700 })
701 .collect::<BTreeMap<_, _>>();
702 if to_process.is_empty() {
703 return Ok(());
704 }
705 for (app_id, updates) in to_process {
706 self.run_user_action(
707 app_id,
708 UserAction::ProcessStreams(context, updates),
709 None,
710 None,
711 )
712 .await?;
713 }
714 }
715 }
716
717 pub(crate) async fn run_user_action(
718 &mut self,
719 application_id: ApplicationId,
720 action: UserAction,
721 refund_grant_to: Option<Account>,
722 grant: Option<&mut Amount>,
723 ) -> Result<(), ExecutionError> {
724 let ExecutionRuntimeConfig {} = self.state.context().extra().execution_runtime_config();
725 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
726 .await
727 }
728
729 pub(crate) async fn service_and_dependencies(
731 &mut self,
732 application: ApplicationId,
733 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
734 let mut stack = vec![application];
737 let mut codes = vec![];
738 let mut descriptions = vec![];
739
740 while let Some(id) = stack.pop() {
741 let (code, description) = self.load_service(id).await?;
742 stack.extend(description.required_application_ids.iter().rev().copied());
743 codes.push(code);
744 descriptions.push(description);
745 }
746
747 codes.reverse();
748 descriptions.reverse();
749
750 Ok((codes, descriptions))
751 }
752
753 async fn contract_and_dependencies(
755 &mut self,
756 application: ApplicationId,
757 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
758 let mut stack = vec![application];
761 let mut codes = vec![];
762 let mut descriptions = vec![];
763
764 while let Some(id) = stack.pop() {
765 let (code, description) = self.load_contract(id).await?;
766 stack.extend(description.required_application_ids.iter().rev().copied());
767 codes.push(code);
768 descriptions.push(description);
769 }
770
771 codes.reverse();
772 descriptions.reverse();
773
774 Ok((codes, descriptions))
775 }
776
777 async fn run_user_action_with_runtime(
778 &mut self,
779 application_id: ApplicationId,
780 action: UserAction,
781 refund_grant_to: Option<Account>,
782 grant: Option<&mut Amount>,
783 ) -> Result<(), ExecutionError> {
784 let chain_id = self.state.context().extra().chain_id();
785 let mut cloned_grant = grant.as_ref().map(|x| **x);
786 let initial_balance = self
787 .resource_controller
788 .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
789 .await?
790 .balance()?;
791 let controller = ResourceController::new(
792 self.resource_controller.policy().clone(),
793 self.resource_controller.tracker,
794 initial_balance,
795 );
796 let (execution_state_sender, mut execution_state_receiver) =
797 futures::channel::mpsc::unbounded();
798
799 let (codes, descriptions): (Vec<_>, Vec<_>) =
800 self.contract_and_dependencies(application_id).await?;
801
802 let thread = web_thread::Thread::new();
803 let contract_runtime_task = thread.run_send(JsVec(codes), move |codes| async move {
804 let runtime = ContractSyncRuntime::new(
805 execution_state_sender,
806 chain_id,
807 refund_grant_to,
808 controller,
809 &action,
810 );
811
812 for (code, description) in codes.0.into_iter().zip(descriptions) {
813 runtime.preload_contract(ApplicationId::from(&description), code, description)?;
814 }
815
816 runtime.run_action(application_id, chain_id, action)
817 });
818
819 while let Some(request) = execution_state_receiver.next().await {
820 self.handle_request(request).await?;
821 }
822
823 let (result, controller) = contract_runtime_task.await??;
824
825 self.txn_tracker.add_operation_result(result);
826
827 self.resource_controller
828 .with_state_and_grant(&mut self.state.system, grant)
829 .await?
830 .merge_balance(initial_balance, controller.balance()?)?;
831 self.resource_controller.tracker = controller.tracker;
832
833 Ok(())
834 }
835
836 pub async fn execute_operation(
837 &mut self,
838 context: OperationContext,
839 operation: Operation,
840 ) -> Result<(), ExecutionError> {
841 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
842 match operation {
843 Operation::System(op) => {
844 let new_application = self
845 .state
846 .system
847 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
848 .await?;
849 if let Some((application_id, argument)) = new_application {
850 let user_action = UserAction::Instantiate(context, argument);
851 self.run_user_action(
852 application_id,
853 user_action,
854 context.refund_grant_to(),
855 None,
856 )
857 .await?;
858 }
859 }
860 Operation::User {
861 application_id,
862 bytes,
863 } => {
864 self.run_user_action(
865 application_id,
866 UserAction::Operation(context, bytes),
867 context.refund_grant_to(),
868 None,
869 )
870 .await?;
871 }
872 }
873 self.process_subscriptions(context.into()).await?;
874 Ok(())
875 }
876
877 pub async fn execute_message(
878 &mut self,
879 context: MessageContext,
880 message: Message,
881 grant: Option<&mut Amount>,
882 ) -> Result<(), ExecutionError> {
883 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
884 match message {
885 Message::System(message) => {
886 let outcome = self.state.system.execute_message(context, message).await?;
887 self.txn_tracker.add_outgoing_messages(outcome);
888 }
889 Message::User {
890 application_id,
891 bytes,
892 } => {
893 self.run_user_action(
894 application_id,
895 UserAction::Message(context, bytes),
896 context.refund_grant_to,
897 grant,
898 )
899 .await?;
900 }
901 }
902 self.process_subscriptions(context.into()).await?;
903 Ok(())
904 }
905
906 pub fn bounce_message(
907 &mut self,
908 context: MessageContext,
909 grant: Amount,
910 message: Message,
911 ) -> Result<(), ExecutionError> {
912 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
913 self.txn_tracker.add_outgoing_message(OutgoingMessage {
914 destination: context.origin,
915 authenticated_signer: context.authenticated_signer,
916 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
917 grant,
918 kind: MessageKind::Bouncing,
919 message,
920 });
921 Ok(())
922 }
923
924 pub fn send_refund(
925 &mut self,
926 context: MessageContext,
927 amount: Amount,
928 ) -> Result<(), ExecutionError> {
929 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
930 if amount.is_zero() {
931 return Ok(());
932 }
933 let Some(account) = context.refund_grant_to else {
934 return Err(ExecutionError::InternalError(
935 "Messages with grants should have a non-empty `refund_grant_to`",
936 ));
937 };
938 let message = SystemMessage::Credit {
939 amount,
940 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
941 target: account.owner,
942 };
943 self.txn_tracker.add_outgoing_message(
944 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
945 );
946 Ok(())
947 }
948
949 async fn receive_http_response(
953 response: reqwest::Response,
954 size_limit: u64,
955 ) -> Result<http::Response, ExecutionError> {
956 let status = response.status().as_u16();
957 let maybe_content_length = response.content_length();
958
959 let headers = response
960 .headers()
961 .iter()
962 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
963 .collect::<Vec<_>>();
964
965 let total_header_size = headers
966 .iter()
967 .map(|header| (header.name.len() + header.value.len()) as u64)
968 .sum();
969
970 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
971 ExecutionError::HttpResponseSizeLimitExceeded {
972 limit: size_limit,
973 size: total_header_size,
974 },
975 )?;
976
977 if let Some(content_length) = maybe_content_length {
978 if content_length > remaining_bytes {
979 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
980 limit: size_limit,
981 size: content_length + total_header_size,
982 });
983 }
984 }
985
986 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
987 let mut body_stream = response.bytes_stream();
988
989 while let Some(bytes) = body_stream.next().await.transpose()? {
990 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
991 ExecutionError::HttpResponseSizeLimitExceeded {
992 limit: size_limit,
993 size: bytes.len() as u64 + (size_limit - remaining_bytes),
994 },
995 )?;
996
997 body.extend(&bytes);
998 }
999
1000 Ok(http::Response {
1001 status,
1002 headers,
1003 body,
1004 })
1005 }
1006}
1007
1008#[derive(Debug)]
1010pub enum ExecutionRequest {
1011 #[cfg(not(web))]
1012 LoadContract {
1013 id: ApplicationId,
1014 #[debug(skip)]
1015 callback: Sender<(UserContractCode, ApplicationDescription)>,
1016 },
1017
1018 #[cfg(not(web))]
1019 LoadService {
1020 id: ApplicationId,
1021 #[debug(skip)]
1022 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1023 },
1024
1025 ChainBalance {
1026 #[debug(skip)]
1027 callback: Sender<Amount>,
1028 },
1029
1030 OwnerBalance {
1031 owner: AccountOwner,
1032 #[debug(skip)]
1033 callback: Sender<Amount>,
1034 },
1035
1036 OwnerBalances {
1037 #[debug(skip)]
1038 callback: Sender<Vec<(AccountOwner, Amount)>>,
1039 },
1040
1041 BalanceOwners {
1042 #[debug(skip)]
1043 callback: Sender<Vec<AccountOwner>>,
1044 },
1045
1046 Transfer {
1047 source: AccountOwner,
1048 destination: Account,
1049 amount: Amount,
1050 #[debug(skip_if = Option::is_none)]
1051 signer: Option<AccountOwner>,
1052 application_id: ApplicationId,
1053 #[debug(skip)]
1054 callback: Sender<()>,
1055 },
1056
1057 Claim {
1058 source: Account,
1059 destination: Account,
1060 amount: Amount,
1061 #[debug(skip_if = Option::is_none)]
1062 signer: Option<AccountOwner>,
1063 application_id: ApplicationId,
1064 #[debug(skip)]
1065 callback: Sender<()>,
1066 },
1067
1068 SystemTimestamp {
1069 #[debug(skip)]
1070 callback: Sender<Timestamp>,
1071 },
1072
1073 ChainOwnership {
1074 #[debug(skip)]
1075 callback: Sender<ChainOwnership>,
1076 },
1077
1078 ReadValueBytes {
1079 id: ApplicationId,
1080 #[debug(with = hex_debug)]
1081 key: Vec<u8>,
1082 #[debug(skip)]
1083 callback: Sender<Option<Vec<u8>>>,
1084 },
1085
1086 ContainsKey {
1087 id: ApplicationId,
1088 key: Vec<u8>,
1089 #[debug(skip)]
1090 callback: Sender<bool>,
1091 },
1092
1093 ContainsKeys {
1094 id: ApplicationId,
1095 #[debug(with = hex_vec_debug)]
1096 keys: Vec<Vec<u8>>,
1097 callback: Sender<Vec<bool>>,
1098 },
1099
1100 ReadMultiValuesBytes {
1101 id: ApplicationId,
1102 #[debug(with = hex_vec_debug)]
1103 keys: Vec<Vec<u8>>,
1104 #[debug(skip)]
1105 callback: Sender<Vec<Option<Vec<u8>>>>,
1106 },
1107
1108 FindKeysByPrefix {
1109 id: ApplicationId,
1110 #[debug(with = hex_debug)]
1111 key_prefix: Vec<u8>,
1112 #[debug(skip)]
1113 callback: Sender<Vec<Vec<u8>>>,
1114 },
1115
1116 FindKeyValuesByPrefix {
1117 id: ApplicationId,
1118 #[debug(with = hex_debug)]
1119 key_prefix: Vec<u8>,
1120 #[debug(skip)]
1121 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1122 },
1123
1124 WriteBatch {
1125 id: ApplicationId,
1126 batch: Batch,
1127 #[debug(skip)]
1128 callback: Sender<()>,
1129 },
1130
1131 OpenChain {
1132 ownership: ChainOwnership,
1133 #[debug(skip_if = Amount::is_zero)]
1134 balance: Amount,
1135 parent_id: ChainId,
1136 block_height: BlockHeight,
1137 application_permissions: ApplicationPermissions,
1138 timestamp: Timestamp,
1139 #[debug(skip)]
1140 callback: Sender<ChainId>,
1141 },
1142
1143 CloseChain {
1144 application_id: ApplicationId,
1145 #[debug(skip)]
1146 callback: Sender<Result<(), ExecutionError>>,
1147 },
1148
1149 ChangeApplicationPermissions {
1150 application_id: ApplicationId,
1151 application_permissions: ApplicationPermissions,
1152 #[debug(skip)]
1153 callback: Sender<Result<(), ExecutionError>>,
1154 },
1155
1156 CreateApplication {
1157 chain_id: ChainId,
1158 block_height: BlockHeight,
1159 module_id: ModuleId,
1160 parameters: Vec<u8>,
1161 required_application_ids: Vec<ApplicationId>,
1162 #[debug(skip)]
1163 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1164 },
1165
1166 PerformHttpRequest {
1167 request: http::Request,
1168 http_responses_are_oracle_responses: bool,
1169 #[debug(skip)]
1170 callback: Sender<http::Response>,
1171 },
1172
1173 ReadBlobContent {
1174 blob_id: BlobId,
1175 #[debug(skip)]
1176 callback: Sender<BlobContent>,
1177 },
1178
1179 AssertBlobExists {
1180 blob_id: BlobId,
1181 #[debug(skip)]
1182 callback: Sender<()>,
1183 },
1184
1185 Emit {
1186 stream_id: StreamId,
1187 #[debug(with = hex_debug)]
1188 value: Vec<u8>,
1189 #[debug(skip)]
1190 callback: Sender<u32>,
1191 },
1192
1193 ReadEvent {
1194 event_id: EventId,
1195 callback: oneshot::Sender<Vec<u8>>,
1196 },
1197
1198 SubscribeToEvents {
1199 chain_id: ChainId,
1200 stream_id: StreamId,
1201 subscriber_app_id: ApplicationId,
1202 #[debug(skip)]
1203 callback: Sender<()>,
1204 },
1205
1206 UnsubscribeFromEvents {
1207 chain_id: ChainId,
1208 stream_id: StreamId,
1209 subscriber_app_id: ApplicationId,
1210 #[debug(skip)]
1211 callback: Sender<()>,
1212 },
1213
1214 GetApplicationPermissions {
1215 #[debug(skip)]
1216 callback: Sender<ApplicationPermissions>,
1217 },
1218
1219 QueryServiceOracle {
1220 deadline: Option<Instant>,
1221 application_id: ApplicationId,
1222 next_block_height: BlockHeight,
1223 query: Vec<u8>,
1224 #[debug(skip)]
1225 callback: Sender<Vec<u8>>,
1226 },
1227
1228 AddOutgoingMessage {
1229 message: crate::OutgoingMessage,
1230 #[debug(skip)]
1231 callback: Sender<()>,
1232 },
1233
1234 SetLocalTime {
1235 local_time: Timestamp,
1236 #[debug(skip)]
1237 callback: Sender<()>,
1238 },
1239
1240 AssertBefore {
1241 timestamp: Timestamp,
1242 #[debug(skip)]
1243 callback: Sender<Result<(), ExecutionError>>,
1244 },
1245
1246 AddCreatedBlob {
1247 blob: crate::Blob,
1248 #[debug(skip)]
1249 callback: Sender<()>,
1250 },
1251
1252 ValidationRound {
1253 round: Option<u32>,
1254 #[debug(skip)]
1255 callback: Sender<Option<u32>>,
1256 },
1257}