linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15        Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19    ownership::ChainOwnership,
20    time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27    execution::UserAction,
28    runtime::ContractSyncRuntime,
29    system::{CreateApplicationResult, OpenChainConfig},
30    util::{OracleResponseExt as _, RespondExt as _},
31    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeConfig,
32    ExecutionRuntimeContext, ExecutionStateView, JsVec, Message, MessageContext, MessageKind,
33    ModuleId, Operation, OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext,
34    QueryOutcome, ResourceController, SystemMessage, TransactionTracker, UserContractCode,
35    UserServiceCode,
36};
37
/// Actor for handling requests to the execution state.
///
/// The actor does not own any of its components: it holds mutable borrows of them for
/// the lifetime `'a` and applies each incoming `ExecutionRequest` to them.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view (system state, user application states, stream event
    /// counts) that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker of the current transaction: receives outgoing messages, events, created
    /// blobs, oracle responses and the streams that still have to be processed.
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to account for resources (e.g. blob reads) and to settle the
    /// balance after a user action has run.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
44
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics recorded while loading application bytecode.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a latency histogram without labels, using the buckets produced by
    /// `exponential_bucket_latencies(250.0)`.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        let buckets = exponential_bucket_latencies(250.0);
        register_histogram_vec(name, description, &[], buckets)
    }

    /// Histogram of the latency to load a contract bytecode.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram of the latency to load a service bytecode.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
72
/// Sending half of the unbounded channel that carries `ExecutionRequest`s to the code
/// that handles them.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
74
75impl<'a, C> ExecutionStateActor<'a, C>
76where
77    C: Context + Clone + Send + Sync + 'static,
78    C::Extra: ExecutionRuntimeContext,
79{
80    /// Creates a new execution state actor.
81    pub fn new(
82        state: &'a mut ExecutionStateView<C>,
83        txn_tracker: &'a mut TransactionTracker,
84        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
85    ) -> Self {
86        Self {
87            state,
88            txn_tracker,
89            resource_controller,
90        }
91    }
92
93    pub(crate) async fn load_contract(
94        &mut self,
95        id: ApplicationId,
96    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
97        #[cfg(with_metrics)]
98        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
99        let blob_id = id.description_blob_id();
100        let description = match self.txn_tracker.get_blob_content(&blob_id) {
101            Some(blob) => bcs::from_bytes(blob.bytes())?,
102            None => {
103                self.state
104                    .system
105                    .describe_application(id, self.txn_tracker)
106                    .await?
107            }
108        };
109        let code = self
110            .state
111            .context()
112            .extra()
113            .get_user_contract(&description, self.txn_tracker)
114            .await?;
115        Ok((code, description))
116    }
117
118    pub(crate) async fn load_service(
119        &mut self,
120        id: ApplicationId,
121    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
122        #[cfg(with_metrics)]
123        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
124        let blob_id = id.description_blob_id();
125        let description = match self.txn_tracker.get_blob_content(&blob_id) {
126            Some(blob) => bcs::from_bytes(blob.bytes())?,
127            None => {
128                self.state
129                    .system
130                    .describe_application(id, self.txn_tracker)
131                    .await?
132            }
133        };
134        let code = self
135            .state
136            .context()
137            .extra()
138            .get_user_service(&description, self.txn_tracker)
139            .await?;
140        Ok((code, description))
141    }
142
143    // TODO(#1416): Support concurrent I/O.
144    pub(crate) async fn handle_request(
145        &mut self,
146        request: ExecutionRequest,
147    ) -> Result<(), ExecutionError> {
148        use ExecutionRequest::*;
149        match request {
150            #[cfg(not(web))]
151            LoadContract { id, callback } => {
152                let (code, description) = self.load_contract(id).await?;
153                callback.respond((code, description))
154            }
155            #[cfg(not(web))]
156            LoadService { id, callback } => {
157                let (code, description) = self.load_service(id).await?;
158                callback.respond((code, description))
159            }
160
161            ChainBalance { callback } => {
162                let balance = *self.state.system.balance.get();
163                callback.respond(balance);
164            }
165
166            OwnerBalance { owner, callback } => {
167                let balance = self
168                    .state
169                    .system
170                    .balances
171                    .get(&owner)
172                    .await?
173                    .unwrap_or_default();
174                callback.respond(balance);
175            }
176
177            OwnerBalances { callback } => {
178                let balances = self.state.system.balances.index_values().await?;
179                callback.respond(balances.into_iter().collect());
180            }
181
182            BalanceOwners { callback } => {
183                let owners = self.state.system.balances.indices().await?;
184                callback.respond(owners);
185            }
186
187            Transfer {
188                source,
189                destination,
190                amount,
191                signer,
192                application_id,
193                callback,
194            } => {
195                let maybe_message = self
196                    .state
197                    .system
198                    .transfer(signer, Some(application_id), source, destination, amount)
199                    .await?;
200                self.txn_tracker.add_outgoing_messages(maybe_message);
201                callback.respond(());
202            }
203
204            Claim {
205                source,
206                destination,
207                amount,
208                signer,
209                application_id,
210                callback,
211            } => {
212                let maybe_message = self
213                    .state
214                    .system
215                    .claim(
216                        signer,
217                        Some(application_id),
218                        source.owner,
219                        source.chain_id,
220                        destination,
221                        amount,
222                    )
223                    .await?;
224                self.txn_tracker.add_outgoing_messages(maybe_message);
225                callback.respond(());
226            }
227
228            SystemTimestamp { callback } => {
229                let timestamp = *self.state.system.timestamp.get();
230                callback.respond(timestamp);
231            }
232
233            ChainOwnership { callback } => {
234                let ownership = self.state.system.ownership.get().clone();
235                callback.respond(ownership);
236            }
237
238            ContainsKey { id, key, callback } => {
239                let view = self.state.users.try_load_entry(&id).await?;
240                let result = match view {
241                    Some(view) => view.contains_key(&key).await?,
242                    None => false,
243                };
244                callback.respond(result);
245            }
246
247            ContainsKeys { id, keys, callback } => {
248                let view = self.state.users.try_load_entry(&id).await?;
249                let result = match view {
250                    Some(view) => view.contains_keys(&keys).await?,
251                    None => vec![false; keys.len()],
252                };
253                callback.respond(result);
254            }
255
256            ReadMultiValuesBytes { id, keys, callback } => {
257                let view = self.state.users.try_load_entry(&id).await?;
258                let values = match view {
259                    Some(view) => view.multi_get(&keys).await?,
260                    None => vec![None; keys.len()],
261                };
262                callback.respond(values);
263            }
264
265            ReadValueBytes { id, key, callback } => {
266                let view = self.state.users.try_load_entry(&id).await?;
267                let result = match view {
268                    Some(view) => view.get(&key).await?,
269                    None => None,
270                };
271                callback.respond(result);
272            }
273
274            FindKeysByPrefix {
275                id,
276                key_prefix,
277                callback,
278            } => {
279                let view = self.state.users.try_load_entry(&id).await?;
280                let result = match view {
281                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
282                    None => Vec::new(),
283                };
284                callback.respond(result);
285            }
286
287            FindKeyValuesByPrefix {
288                id,
289                key_prefix,
290                callback,
291            } => {
292                let view = self.state.users.try_load_entry(&id).await?;
293                let result = match view {
294                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
295                    None => Vec::new(),
296                };
297                callback.respond(result);
298            }
299
300            WriteBatch {
301                id,
302                batch,
303                callback,
304            } => {
305                let mut view = self.state.users.try_load_entry_mut(&id).await?;
306                view.write_batch(batch).await?;
307                callback.respond(());
308            }
309
310            OpenChain {
311                ownership,
312                balance,
313                parent_id,
314                block_height,
315                application_permissions,
316                timestamp,
317                callback,
318            } => {
319                let config = OpenChainConfig {
320                    ownership,
321                    balance,
322                    application_permissions,
323                };
324                let chain_id = self
325                    .state
326                    .system
327                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
328                    .await?;
329                callback.respond(chain_id);
330            }
331
332            CloseChain {
333                application_id,
334                callback,
335            } => {
336                let app_permissions = self.state.system.application_permissions.get();
337                if !app_permissions.can_close_chain(&application_id) {
338                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
339                } else {
340                    self.state.system.close_chain().await?;
341                    callback.respond(Ok(()));
342                }
343            }
344
345            ChangeApplicationPermissions {
346                application_id,
347                application_permissions,
348                callback,
349            } => {
350                let app_permissions = self.state.system.application_permissions.get();
351                if !app_permissions.can_change_application_permissions(&application_id) {
352                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
353                } else {
354                    self.state
355                        .system
356                        .application_permissions
357                        .set(application_permissions);
358                    callback.respond(Ok(()));
359                }
360            }
361
362            CreateApplication {
363                chain_id,
364                block_height,
365                module_id,
366                parameters,
367                required_application_ids,
368                callback,
369            } => {
370                let create_application_result = self
371                    .state
372                    .system
373                    .create_application(
374                        chain_id,
375                        block_height,
376                        module_id,
377                        parameters,
378                        required_application_ids,
379                        self.txn_tracker,
380                    )
381                    .await?;
382                callback.respond(Ok(create_application_result));
383            }
384
385            PerformHttpRequest {
386                request,
387                http_responses_are_oracle_responses,
388                callback,
389            } => {
390                let system = &mut self.state.system;
391                let response = self
392                    .txn_tracker
393                    .oracle(|| async {
394                        let headers = request
395                            .headers
396                            .into_iter()
397                            .map(|http::Header { name, value }| {
398                                Ok((name.parse()?, value.try_into()?))
399                            })
400                            .collect::<Result<HeaderMap, ExecutionError>>()?;
401
402                        let url = Url::parse(&request.url)?;
403                        let host = url
404                            .host_str()
405                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
406
407                        let (_epoch, committee) = system
408                            .current_committee()
409                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
410                        let allowed_hosts = &committee.policy().http_request_allow_list;
411
412                        ensure!(
413                            allowed_hosts.contains(host),
414                            ExecutionError::UnauthorizedHttpRequest(url)
415                        );
416
417                        let request = Client::new()
418                            .request(request.method.into(), url)
419                            .body(request.body)
420                            .headers(headers);
421                        #[cfg(not(web))]
422                        let request = request.timeout(linera_base::time::Duration::from_millis(
423                            committee.policy().http_request_timeout_ms,
424                        ));
425
426                        let response = request.send().await?;
427
428                        let mut response_size_limit =
429                            committee.policy().maximum_http_response_bytes;
430
431                        if http_responses_are_oracle_responses {
432                            response_size_limit = response_size_limit
433                                .min(committee.policy().maximum_oracle_response_bytes);
434                        }
435                        Ok(OracleResponse::Http(
436                            Self::receive_http_response(response, response_size_limit).await?,
437                        ))
438                    })
439                    .await?
440                    .to_http_response()?;
441                callback.respond(response);
442            }
443
444            ReadBlobContent { blob_id, callback } => {
445                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
446                    content.clone()
447                } else {
448                    let content = self.state.system.read_blob_content(blob_id).await?;
449                    if blob_id.blob_type == BlobType::Data {
450                        self.resource_controller
451                            .with_state(&mut self.state.system)
452                            .await?
453                            .track_blob_read(content.bytes().len() as u64)?;
454                    }
455                    self.state
456                        .system
457                        .blob_used(self.txn_tracker, blob_id)
458                        .await?;
459                    content
460                };
461                callback.respond(content)
462            }
463
464            AssertBlobExists { blob_id, callback } => {
465                self.state.system.assert_blob_exists(blob_id).await?;
466                // Treating this as reading a size-0 blob for fee purposes.
467                if blob_id.blob_type == BlobType::Data {
468                    self.resource_controller
469                        .with_state(&mut self.state.system)
470                        .await?
471                        .track_blob_read(0)?;
472                }
473                let is_new = self
474                    .state
475                    .system
476                    .blob_used(self.txn_tracker, blob_id)
477                    .await?;
478                if is_new {
479                    self.txn_tracker
480                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
481                }
482                callback.respond(());
483            }
484
485            Emit {
486                stream_id,
487                value,
488                callback,
489            } => {
490                let count = self
491                    .state
492                    .stream_event_counts
493                    .get_mut_or_default(&stream_id)
494                    .await?;
495                let index = *count;
496                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
497                self.txn_tracker.add_event(stream_id, index, value);
498                callback.respond(index)
499            }
500
501            ReadEvent { event_id, callback } => {
502                let extra = self.state.context().extra();
503                let event = self
504                    .txn_tracker
505                    .oracle(|| async {
506                        let event = extra
507                            .get_event(event_id.clone())
508                            .await?
509                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
510                        Ok(OracleResponse::Event(event_id.clone(), event))
511                    })
512                    .await?
513                    .to_event(&event_id)?;
514                callback.respond(event);
515            }
516
517            SubscribeToEvents {
518                chain_id,
519                stream_id,
520                subscriber_app_id,
521                callback,
522            } => {
523                let subscriptions = self
524                    .state
525                    .system
526                    .event_subscriptions
527                    .get_mut_or_default(&(chain_id, stream_id.clone()))
528                    .await?;
529                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
530                    subscriptions.next_index
531                } else {
532                    0
533                };
534                self.txn_tracker.add_stream_to_process(
535                    subscriber_app_id,
536                    chain_id,
537                    stream_id,
538                    0,
539                    next_index,
540                );
541                callback.respond(());
542            }
543
544            UnsubscribeFromEvents {
545                chain_id,
546                stream_id,
547                subscriber_app_id,
548                callback,
549            } => {
550                let key = (chain_id, stream_id.clone());
551                let subscriptions = self
552                    .state
553                    .system
554                    .event_subscriptions
555                    .get_mut_or_default(&key)
556                    .await?;
557                subscriptions.applications.remove(&subscriber_app_id);
558                if subscriptions.applications.is_empty() {
559                    self.state.system.event_subscriptions.remove(&key)?;
560                }
561                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
562                    self.txn_tracker
563                        .remove_stream_to_process(app_id, chain_id, stream_id);
564                }
565                callback.respond(());
566            }
567
568            GetApplicationPermissions { callback } => {
569                let app_permissions = self.state.system.application_permissions.get();
570                callback.respond(app_permissions.clone());
571            }
572
573            QueryServiceOracle {
574                deadline,
575                application_id,
576                next_block_height,
577                query,
578                callback,
579            } => {
580                let state = &mut self.state;
581                let local_time = self.txn_tracker.local_time();
582                let created_blobs = self.txn_tracker.created_blobs().clone();
583                let bytes = self
584                    .txn_tracker
585                    .oracle(|| async {
586                        let context = QueryContext {
587                            chain_id: state.context().extra().chain_id(),
588                            next_block_height,
589                            local_time,
590                        };
591                        let QueryOutcome {
592                            response,
593                            operations,
594                        } = Box::pin(state.query_user_application_with_deadline(
595                            application_id,
596                            context,
597                            query,
598                            deadline,
599                            created_blobs,
600                        ))
601                        .await?;
602                        ensure!(
603                            operations.is_empty(),
604                            ExecutionError::ServiceOracleQueryOperations(operations)
605                        );
606                        Ok(OracleResponse::Service(response))
607                    })
608                    .await?
609                    .to_service_response()?;
610                callback.respond(bytes);
611            }
612
613            AddOutgoingMessage { message, callback } => {
614                self.txn_tracker.add_outgoing_message(message);
615                callback.respond(());
616            }
617
618            SetLocalTime {
619                local_time,
620                callback,
621            } => {
622                self.txn_tracker.set_local_time(local_time);
623                callback.respond(());
624            }
625
626            AssertBefore {
627                timestamp,
628                callback,
629            } => {
630                let result = if !self
631                    .txn_tracker
632                    .replay_oracle_response(OracleResponse::Assert)?
633                {
634                    // There are no recorded oracle responses, so we check the local time.
635                    let local_time = self.txn_tracker.local_time();
636                    if local_time >= timestamp {
637                        Err(ExecutionError::AssertBefore {
638                            timestamp,
639                            local_time,
640                        })
641                    } else {
642                        Ok(())
643                    }
644                } else {
645                    Ok(())
646                };
647                callback.respond(result);
648            }
649
650            AddCreatedBlob { blob, callback } => {
651                self.txn_tracker.add_created_blob(blob);
652                callback.respond(());
653            }
654
655            ValidationRound { round, callback } => {
656                let validation_round = self
657                    .txn_tracker
658                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
659                    .await?
660                    .to_round()?;
661                callback.respond(validation_round);
662            }
663        }
664
665        Ok(())
666    }
667
668    /// Calls `process_streams` for all applications that are subscribed to streams with new
669    /// events or that have new subscriptions.
670    async fn process_subscriptions(
671        &mut self,
672        context: ProcessStreamsContext,
673    ) -> Result<(), ExecutionError> {
674        // Keep track of which streams we have already processed. This is to guard against
675        // applications unsubscribing and subscribing in the process_streams call itself.
676        let mut processed = BTreeSet::new();
677        loop {
678            let to_process = self
679                .txn_tracker
680                .take_streams_to_process()
681                .into_iter()
682                .filter_map(|(app_id, updates)| {
683                    let updates = updates
684                        .into_iter()
685                        .filter_map(|update| {
686                            if !processed.insert((
687                                app_id,
688                                update.chain_id,
689                                update.stream_id.clone(),
690                            )) {
691                                return None;
692                            }
693                            Some(update)
694                        })
695                        .collect::<Vec<_>>();
696                    if updates.is_empty() {
697                        return None;
698                    }
699                    Some((app_id, updates))
700                })
701                .collect::<BTreeMap<_, _>>();
702            if to_process.is_empty() {
703                return Ok(());
704            }
705            for (app_id, updates) in to_process {
706                self.run_user_action(
707                    app_id,
708                    UserAction::ProcessStreams(context, updates),
709                    None,
710                    None,
711                )
712                .await?;
713            }
714        }
715    }
716
717    pub(crate) async fn run_user_action(
718        &mut self,
719        application_id: ApplicationId,
720        action: UserAction,
721        refund_grant_to: Option<Account>,
722        grant: Option<&mut Amount>,
723    ) -> Result<(), ExecutionError> {
724        let ExecutionRuntimeConfig {} = self.state.context().extra().execution_runtime_config();
725        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
726            .await
727    }
728
729    // TODO(#5034): unify with `contract_and_dependencies`
730    pub(crate) async fn service_and_dependencies(
731        &mut self,
732        application: ApplicationId,
733    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
734        // cyclic futures are illegal so we need to either box the frames or keep our own
735        // stack
736        let mut stack = vec![application];
737        let mut codes = vec![];
738        let mut descriptions = vec![];
739
740        while let Some(id) = stack.pop() {
741            let (code, description) = self.load_service(id).await?;
742            stack.extend(description.required_application_ids.iter().rev().copied());
743            codes.push(code);
744            descriptions.push(description);
745        }
746
747        codes.reverse();
748        descriptions.reverse();
749
750        Ok((codes, descriptions))
751    }
752
753    // TODO(#5034): unify with `service_and_dependencies`
754    async fn contract_and_dependencies(
755        &mut self,
756        application: ApplicationId,
757    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
758        // cyclic futures are illegal so we need to either box the frames or keep our own
759        // stack
760        let mut stack = vec![application];
761        let mut codes = vec![];
762        let mut descriptions = vec![];
763
764        while let Some(id) = stack.pop() {
765            let (code, description) = self.load_contract(id).await?;
766            stack.extend(description.required_application_ids.iter().rev().copied());
767            codes.push(code);
768            descriptions.push(description);
769        }
770
771        codes.reverse();
772        descriptions.reverse();
773
774        Ok((codes, descriptions))
775    }
776
777    async fn run_user_action_with_runtime(
778        &mut self,
779        application_id: ApplicationId,
780        action: UserAction,
781        refund_grant_to: Option<Account>,
782        grant: Option<&mut Amount>,
783    ) -> Result<(), ExecutionError> {
784        let chain_id = self.state.context().extra().chain_id();
785        let mut cloned_grant = grant.as_ref().map(|x| **x);
786        let initial_balance = self
787            .resource_controller
788            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
789            .await?
790            .balance()?;
791        let controller = ResourceController::new(
792            self.resource_controller.policy().clone(),
793            self.resource_controller.tracker,
794            initial_balance,
795        );
796        let (execution_state_sender, mut execution_state_receiver) =
797            futures::channel::mpsc::unbounded();
798
799        let (codes, descriptions): (Vec<_>, Vec<_>) =
800            self.contract_and_dependencies(application_id).await?;
801
802        let thread = web_thread::Thread::new();
803        let contract_runtime_task = thread.run_send(JsVec(codes), move |codes| async move {
804            let runtime = ContractSyncRuntime::new(
805                execution_state_sender,
806                chain_id,
807                refund_grant_to,
808                controller,
809                &action,
810            );
811
812            for (code, description) in codes.0.into_iter().zip(descriptions) {
813                runtime.preload_contract(ApplicationId::from(&description), code, description)?;
814            }
815
816            runtime.run_action(application_id, chain_id, action)
817        });
818
819        while let Some(request) = execution_state_receiver.next().await {
820            self.handle_request(request).await?;
821        }
822
823        let (result, controller) = contract_runtime_task.await??;
824
825        self.txn_tracker.add_operation_result(result);
826
827        self.resource_controller
828            .with_state_and_grant(&mut self.state.system, grant)
829            .await?
830            .merge_balance(initial_balance, controller.balance()?)?;
831        self.resource_controller.tracker = controller.tracker;
832
833        Ok(())
834    }
835
836    pub async fn execute_operation(
837        &mut self,
838        context: OperationContext,
839        operation: Operation,
840    ) -> Result<(), ExecutionError> {
841        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
842        match operation {
843            Operation::System(op) => {
844                let new_application = self
845                    .state
846                    .system
847                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
848                    .await?;
849                if let Some((application_id, argument)) = new_application {
850                    let user_action = UserAction::Instantiate(context, argument);
851                    self.run_user_action(
852                        application_id,
853                        user_action,
854                        context.refund_grant_to(),
855                        None,
856                    )
857                    .await?;
858                }
859            }
860            Operation::User {
861                application_id,
862                bytes,
863            } => {
864                self.run_user_action(
865                    application_id,
866                    UserAction::Operation(context, bytes),
867                    context.refund_grant_to(),
868                    None,
869                )
870                .await?;
871            }
872        }
873        self.process_subscriptions(context.into()).await?;
874        Ok(())
875    }
876
877    pub async fn execute_message(
878        &mut self,
879        context: MessageContext,
880        message: Message,
881        grant: Option<&mut Amount>,
882    ) -> Result<(), ExecutionError> {
883        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
884        match message {
885            Message::System(message) => {
886                let outcome = self.state.system.execute_message(context, message).await?;
887                self.txn_tracker.add_outgoing_messages(outcome);
888            }
889            Message::User {
890                application_id,
891                bytes,
892            } => {
893                self.run_user_action(
894                    application_id,
895                    UserAction::Message(context, bytes),
896                    context.refund_grant_to,
897                    grant,
898                )
899                .await?;
900            }
901        }
902        self.process_subscriptions(context.into()).await?;
903        Ok(())
904    }
905
906    pub fn bounce_message(
907        &mut self,
908        context: MessageContext,
909        grant: Amount,
910        message: Message,
911    ) -> Result<(), ExecutionError> {
912        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
913        self.txn_tracker.add_outgoing_message(OutgoingMessage {
914            destination: context.origin,
915            authenticated_signer: context.authenticated_signer,
916            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
917            grant,
918            kind: MessageKind::Bouncing,
919            message,
920        });
921        Ok(())
922    }
923
924    pub fn send_refund(
925        &mut self,
926        context: MessageContext,
927        amount: Amount,
928    ) -> Result<(), ExecutionError> {
929        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
930        if amount.is_zero() {
931            return Ok(());
932        }
933        let Some(account) = context.refund_grant_to else {
934            return Err(ExecutionError::InternalError(
935                "Messages with grants should have a non-empty `refund_grant_to`",
936            ));
937        };
938        let message = SystemMessage::Credit {
939            amount,
940            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
941            target: account.owner,
942        };
943        self.txn_tracker.add_outgoing_message(
944            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
945        );
946        Ok(())
947    }
948
949    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
950    ///
951    /// Ensures that the response does not exceed the provided `size_limit`.
952    async fn receive_http_response(
953        response: reqwest::Response,
954        size_limit: u64,
955    ) -> Result<http::Response, ExecutionError> {
956        let status = response.status().as_u16();
957        let maybe_content_length = response.content_length();
958
959        let headers = response
960            .headers()
961            .iter()
962            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
963            .collect::<Vec<_>>();
964
965        let total_header_size = headers
966            .iter()
967            .map(|header| (header.name.len() + header.value.len()) as u64)
968            .sum();
969
970        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
971            ExecutionError::HttpResponseSizeLimitExceeded {
972                limit: size_limit,
973                size: total_header_size,
974            },
975        )?;
976
977        if let Some(content_length) = maybe_content_length {
978            if content_length > remaining_bytes {
979                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
980                    limit: size_limit,
981                    size: content_length + total_header_size,
982                });
983            }
984        }
985
986        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
987        let mut body_stream = response.bytes_stream();
988
989        while let Some(bytes) = body_stream.next().await.transpose()? {
990            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
991                ExecutionError::HttpResponseSizeLimitExceeded {
992                    limit: size_limit,
993                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
994                },
995            )?;
996
997            body.extend(&bytes);
998        }
999
1000        Ok(http::Response {
1001            status,
1002            headers,
1003            body,
1004        })
1005    }
1006}
1007
1008/// Requests to the execution state.
1009#[derive(Debug)]
1010pub enum ExecutionRequest {
1011    #[cfg(not(web))]
1012    LoadContract {
1013        id: ApplicationId,
1014        #[debug(skip)]
1015        callback: Sender<(UserContractCode, ApplicationDescription)>,
1016    },
1017
1018    #[cfg(not(web))]
1019    LoadService {
1020        id: ApplicationId,
1021        #[debug(skip)]
1022        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1023    },
1024
1025    ChainBalance {
1026        #[debug(skip)]
1027        callback: Sender<Amount>,
1028    },
1029
1030    OwnerBalance {
1031        owner: AccountOwner,
1032        #[debug(skip)]
1033        callback: Sender<Amount>,
1034    },
1035
1036    OwnerBalances {
1037        #[debug(skip)]
1038        callback: Sender<Vec<(AccountOwner, Amount)>>,
1039    },
1040
1041    BalanceOwners {
1042        #[debug(skip)]
1043        callback: Sender<Vec<AccountOwner>>,
1044    },
1045
1046    Transfer {
1047        source: AccountOwner,
1048        destination: Account,
1049        amount: Amount,
1050        #[debug(skip_if = Option::is_none)]
1051        signer: Option<AccountOwner>,
1052        application_id: ApplicationId,
1053        #[debug(skip)]
1054        callback: Sender<()>,
1055    },
1056
1057    Claim {
1058        source: Account,
1059        destination: Account,
1060        amount: Amount,
1061        #[debug(skip_if = Option::is_none)]
1062        signer: Option<AccountOwner>,
1063        application_id: ApplicationId,
1064        #[debug(skip)]
1065        callback: Sender<()>,
1066    },
1067
1068    SystemTimestamp {
1069        #[debug(skip)]
1070        callback: Sender<Timestamp>,
1071    },
1072
1073    ChainOwnership {
1074        #[debug(skip)]
1075        callback: Sender<ChainOwnership>,
1076    },
1077
1078    ReadValueBytes {
1079        id: ApplicationId,
1080        #[debug(with = hex_debug)]
1081        key: Vec<u8>,
1082        #[debug(skip)]
1083        callback: Sender<Option<Vec<u8>>>,
1084    },
1085
1086    ContainsKey {
1087        id: ApplicationId,
1088        key: Vec<u8>,
1089        #[debug(skip)]
1090        callback: Sender<bool>,
1091    },
1092
1093    ContainsKeys {
1094        id: ApplicationId,
1095        #[debug(with = hex_vec_debug)]
1096        keys: Vec<Vec<u8>>,
1097        callback: Sender<Vec<bool>>,
1098    },
1099
1100    ReadMultiValuesBytes {
1101        id: ApplicationId,
1102        #[debug(with = hex_vec_debug)]
1103        keys: Vec<Vec<u8>>,
1104        #[debug(skip)]
1105        callback: Sender<Vec<Option<Vec<u8>>>>,
1106    },
1107
1108    FindKeysByPrefix {
1109        id: ApplicationId,
1110        #[debug(with = hex_debug)]
1111        key_prefix: Vec<u8>,
1112        #[debug(skip)]
1113        callback: Sender<Vec<Vec<u8>>>,
1114    },
1115
1116    FindKeyValuesByPrefix {
1117        id: ApplicationId,
1118        #[debug(with = hex_debug)]
1119        key_prefix: Vec<u8>,
1120        #[debug(skip)]
1121        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1122    },
1123
1124    WriteBatch {
1125        id: ApplicationId,
1126        batch: Batch,
1127        #[debug(skip)]
1128        callback: Sender<()>,
1129    },
1130
1131    OpenChain {
1132        ownership: ChainOwnership,
1133        #[debug(skip_if = Amount::is_zero)]
1134        balance: Amount,
1135        parent_id: ChainId,
1136        block_height: BlockHeight,
1137        application_permissions: ApplicationPermissions,
1138        timestamp: Timestamp,
1139        #[debug(skip)]
1140        callback: Sender<ChainId>,
1141    },
1142
1143    CloseChain {
1144        application_id: ApplicationId,
1145        #[debug(skip)]
1146        callback: Sender<Result<(), ExecutionError>>,
1147    },
1148
1149    ChangeApplicationPermissions {
1150        application_id: ApplicationId,
1151        application_permissions: ApplicationPermissions,
1152        #[debug(skip)]
1153        callback: Sender<Result<(), ExecutionError>>,
1154    },
1155
1156    CreateApplication {
1157        chain_id: ChainId,
1158        block_height: BlockHeight,
1159        module_id: ModuleId,
1160        parameters: Vec<u8>,
1161        required_application_ids: Vec<ApplicationId>,
1162        #[debug(skip)]
1163        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1164    },
1165
1166    PerformHttpRequest {
1167        request: http::Request,
1168        http_responses_are_oracle_responses: bool,
1169        #[debug(skip)]
1170        callback: Sender<http::Response>,
1171    },
1172
1173    ReadBlobContent {
1174        blob_id: BlobId,
1175        #[debug(skip)]
1176        callback: Sender<BlobContent>,
1177    },
1178
1179    AssertBlobExists {
1180        blob_id: BlobId,
1181        #[debug(skip)]
1182        callback: Sender<()>,
1183    },
1184
1185    Emit {
1186        stream_id: StreamId,
1187        #[debug(with = hex_debug)]
1188        value: Vec<u8>,
1189        #[debug(skip)]
1190        callback: Sender<u32>,
1191    },
1192
1193    ReadEvent {
1194        event_id: EventId,
1195        callback: oneshot::Sender<Vec<u8>>,
1196    },
1197
1198    SubscribeToEvents {
1199        chain_id: ChainId,
1200        stream_id: StreamId,
1201        subscriber_app_id: ApplicationId,
1202        #[debug(skip)]
1203        callback: Sender<()>,
1204    },
1205
1206    UnsubscribeFromEvents {
1207        chain_id: ChainId,
1208        stream_id: StreamId,
1209        subscriber_app_id: ApplicationId,
1210        #[debug(skip)]
1211        callback: Sender<()>,
1212    },
1213
1214    GetApplicationPermissions {
1215        #[debug(skip)]
1216        callback: Sender<ApplicationPermissions>,
1217    },
1218
1219    QueryServiceOracle {
1220        deadline: Option<Instant>,
1221        application_id: ApplicationId,
1222        next_block_height: BlockHeight,
1223        query: Vec<u8>,
1224        #[debug(skip)]
1225        callback: Sender<Vec<u8>>,
1226    },
1227
1228    AddOutgoingMessage {
1229        message: crate::OutgoingMessage,
1230        #[debug(skip)]
1231        callback: Sender<()>,
1232    },
1233
1234    SetLocalTime {
1235        local_time: Timestamp,
1236        #[debug(skip)]
1237        callback: Sender<()>,
1238    },
1239
1240    AssertBefore {
1241        timestamp: Timestamp,
1242        #[debug(skip)]
1243        callback: Sender<Result<(), ExecutionError>>,
1244    },
1245
1246    AddCreatedBlob {
1247        blob: crate::Blob,
1248        #[debug(skip)]
1249        callback: Sender<()>,
1250    },
1251
1252    ValidationRound {
1253        round: Option<u32>,
1254        #[debug(skip)]
1255        callback: Sender<Option<u32>>,
1256    },
1257}