Skip to main content

linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::{
7    collections::{BTreeMap, BTreeSet},
8    sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16    data_types::{
17        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18        Timestamp,
19    },
20    ensure, hex_debug, hex_vec_debug, http,
21    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
22    ownership::ChainOwnership,
23    time::Instant,
24};
25use linera_views::{batch::Batch, context::Context, views::View};
26use oneshot::Sender;
27use reqwest::{header::HeaderMap, Client, Url};
28use tracing::{info_span, instrument, Instrument as _};
29
30use crate::{
31    execution::UserAction,
32    runtime::ContractSyncRuntime,
33    system::{CreateApplicationResult, OpenChainConfig},
34    util::{OracleResponseExt as _, RespondExt as _},
35    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
36    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
37    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
38    ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
39};
40
41/// Actor for handling requests to the execution state.
42pub struct ExecutionStateActor<'a, C> {
43    state: &'a mut ExecutionStateView<C>,
44    txn_tracker: &'a mut TransactionTracker,
45    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
46}
47
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels whose buckets come from
    /// `exponential_bucket_latencies(250.0)`.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Histogram of the latency to load a contract bytecode.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram of the latency to load a service bytecode.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
75
76pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
77
78impl<'a, C> ExecutionStateActor<'a, C>
79where
80    C: Context + Clone + 'static,
81    C::Extra: ExecutionRuntimeContext,
82{
83    /// Creates a new execution state actor.
84    pub fn new(
85        state: &'a mut ExecutionStateView<C>,
86        txn_tracker: &'a mut TransactionTracker,
87        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
88    ) -> Self {
89        Self {
90            state,
91            txn_tracker,
92            resource_controller,
93        }
94    }
95
96    #[instrument(skip_all, fields(application_id = %id))]
97    pub(crate) async fn load_contract(
98        &mut self,
99        id: ApplicationId,
100    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
101        #[cfg(with_metrics)]
102        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
103        let blob_id = id.description_blob_id();
104        let description = match self.txn_tracker.get_blob_content(&blob_id) {
105            Some(blob) => bcs::from_bytes(blob.bytes())?,
106            None => {
107                self.state
108                    .system
109                    .describe_application(id, self.txn_tracker)
110                    .await?
111            }
112        };
113        let code = self
114            .state
115            .context()
116            .extra()
117            .get_user_contract(&description, self.txn_tracker)
118            .await?;
119        Ok((code, description))
120    }
121
122    pub(crate) async fn load_service(
123        &mut self,
124        id: ApplicationId,
125    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
126        #[cfg(with_metrics)]
127        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
128        let blob_id = id.description_blob_id();
129        let description = match self.txn_tracker.get_blob_content(&blob_id) {
130            Some(blob) => bcs::from_bytes(blob.bytes())?,
131            None => {
132                self.state
133                    .system
134                    .describe_application(id, self.txn_tracker)
135                    .await?
136            }
137        };
138        let code = self
139            .state
140            .context()
141            .extra()
142            .get_user_service(&description, self.txn_tracker)
143            .await?;
144        Ok((code, description))
145    }
146
147    // TODO(#1416): Support concurrent I/O.
148    #[instrument(
149        skip_all,
150        fields(request_type = %request.as_ref())
151    )]
152    pub(crate) async fn handle_request(
153        &mut self,
154        request: ExecutionRequest,
155    ) -> Result<(), ExecutionError> {
156        use ExecutionRequest::*;
157        match request {
158            #[cfg(not(web))]
159            LoadContract { id, callback } => {
160                let (code, description) = self.load_contract(id).await?;
161                callback.respond((code, description))
162            }
163            #[cfg(not(web))]
164            LoadService { id, callback } => {
165                let (code, description) = self.load_service(id).await?;
166                callback.respond((code, description))
167            }
168
169            ChainBalance { callback } => {
170                let balance = *self.state.system.balance.get();
171                callback.respond(balance);
172            }
173
174            OwnerBalance { owner, callback } => {
175                let balance = self
176                    .state
177                    .system
178                    .balances
179                    .get(&owner)
180                    .await?
181                    .unwrap_or_default();
182                callback.respond(balance);
183            }
184
185            OwnerBalances { callback } => {
186                let balances = self.state.system.balances.index_values().await?;
187                callback.respond(balances.into_iter().collect());
188            }
189
190            BalanceOwners { callback } => {
191                let owners = self.state.system.balances.indices().await?;
192                callback.respond(owners);
193            }
194
195            Transfer {
196                source,
197                destination,
198                amount,
199                signer,
200                application_id,
201                callback,
202            } => {
203                let maybe_message = self
204                    .state
205                    .system
206                    .transfer(signer, Some(application_id), source, destination, amount)
207                    .await?;
208                self.txn_tracker.add_outgoing_messages(maybe_message);
209                callback.respond(());
210            }
211
212            Claim {
213                source,
214                destination,
215                amount,
216                signer,
217                application_id,
218                callback,
219            } => {
220                let maybe_message = self
221                    .state
222                    .system
223                    .claim(
224                        signer,
225                        Some(application_id),
226                        source.owner,
227                        source.chain_id,
228                        destination,
229                        amount,
230                    )
231                    .await?;
232                self.txn_tracker.add_outgoing_messages(maybe_message);
233                callback.respond(());
234            }
235
236            SystemTimestamp { callback } => {
237                let timestamp = *self.state.system.timestamp.get();
238                callback.respond(timestamp);
239            }
240
241            ChainOwnership { callback } => {
242                let ownership = self.state.system.ownership.get().await?.clone();
243                callback.respond(ownership);
244            }
245
246            ApplicationPermissions { callback } => {
247                let permissions = self
248                    .state
249                    .system
250                    .application_permissions
251                    .get()
252                    .await?
253                    .clone();
254                callback.respond(permissions);
255            }
256
257            ReadApplicationDescription {
258                application_id,
259                callback,
260            } => {
261                let blob_id = application_id.description_blob_id();
262                let description = match self.txn_tracker.get_blob_content(&blob_id) {
263                    Some(blob) => bcs::from_bytes(blob.bytes())?,
264                    None => {
265                        let blob_content = self.state.system.read_blob_content(blob_id).await?;
266                        self.state
267                            .system
268                            .blob_used(self.txn_tracker, blob_id)
269                            .await?;
270                        bcs::from_bytes(blob_content.bytes())?
271                    }
272                };
273                callback.respond(description);
274            }
275
276            ContainsKey { id, key, callback } => {
277                let view = self.state.users.try_load_entry(&id).await?;
278                let result = match view {
279                    Some(view) => view.contains_key(&key).await?,
280                    None => false,
281                };
282                callback.respond(result);
283            }
284
285            ContainsKeys { id, keys, callback } => {
286                let view = self.state.users.try_load_entry(&id).await?;
287                let result = match view {
288                    Some(view) => view.contains_keys(&keys).await?,
289                    None => vec![false; keys.len()],
290                };
291                callback.respond(result);
292            }
293
294            ReadMultiValuesBytes { id, keys, callback } => {
295                let view = self.state.users.try_load_entry(&id).await?;
296                let values = match view {
297                    Some(view) => view.multi_get(&keys).await?,
298                    None => vec![None; keys.len()],
299                };
300                callback.respond(values);
301            }
302
303            ReadValueBytes { id, key, callback } => {
304                let view = self.state.users.try_load_entry(&id).await?;
305                let result = match view {
306                    Some(view) => view.get(&key).await?,
307                    None => None,
308                };
309                callback.respond(result);
310            }
311
312            FindKeysByPrefix {
313                id,
314                key_prefix,
315                callback,
316            } => {
317                let view = self.state.users.try_load_entry(&id).await?;
318                let result = match view {
319                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
320                    None => Vec::new(),
321                };
322                callback.respond(result);
323            }
324
325            FindKeyValuesByPrefix {
326                id,
327                key_prefix,
328                callback,
329            } => {
330                let view = self.state.users.try_load_entry(&id).await?;
331                let result = match view {
332                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
333                    None => Vec::new(),
334                };
335                callback.respond(result);
336            }
337
338            WriteBatch {
339                id,
340                batch,
341                callback,
342            } => {
343                let mut view = self.state.users.try_load_entry_mut(&id).await?;
344                view.write_batch(batch)?;
345                callback.respond(());
346            }
347
348            OpenChain {
349                ownership,
350                balance,
351                parent_id,
352                block_height,
353                application_permissions,
354                timestamp,
355                callback,
356            } => {
357                let config = OpenChainConfig {
358                    ownership,
359                    balance,
360                    application_permissions,
361                };
362                let chain_id = self
363                    .state
364                    .system
365                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
366                    .await?;
367                callback.respond(chain_id);
368            }
369
370            CloseChain {
371                application_id,
372                callback,
373            } => {
374                let app_permissions = self.state.system.application_permissions.get().await?;
375                if !app_permissions.can_close_chain(&application_id) {
376                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
377                } else {
378                    self.state.system.close_chain()?;
379                    callback.respond(Ok(()));
380                }
381            }
382
383            ChangeOwnership {
384                application_id,
385                ownership,
386                callback,
387            } => {
388                let app_permissions = self.state.system.application_permissions.get().await?;
389                if !app_permissions.can_close_chain(&application_id) {
390                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
391                } else {
392                    self.state.system.ownership.set(ownership);
393                    callback.respond(Ok(()));
394                }
395            }
396
397            ChangeApplicationPermissions {
398                application_id,
399                application_permissions,
400                callback,
401            } => {
402                let app_permissions = self.state.system.application_permissions.get().await?;
403                if !app_permissions.can_change_application_permissions(&application_id) {
404                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
405                } else {
406                    self.state
407                        .system
408                        .application_permissions
409                        .set(application_permissions);
410                    callback.respond(Ok(()));
411                }
412            }
413
414            CreateApplication {
415                chain_id,
416                block_height,
417                module_id,
418                parameters,
419                required_application_ids,
420                callback,
421            } => {
422                let create_application_result = self
423                    .state
424                    .system
425                    .create_application(
426                        chain_id,
427                        block_height,
428                        module_id,
429                        parameters,
430                        required_application_ids,
431                        self.txn_tracker,
432                    )
433                    .await?;
434                callback.respond(Ok(create_application_result));
435            }
436
437            PerformHttpRequest {
438                request,
439                http_responses_are_oracle_responses,
440                callback,
441            } => {
442                let system = &mut self.state.system;
443                let response = self
444                    .txn_tracker
445                    .oracle(|| async {
446                        let headers = request
447                            .headers
448                            .into_iter()
449                            .map(|http::Header { name, value }| {
450                                Ok((name.parse()?, value.try_into()?))
451                            })
452                            .collect::<Result<HeaderMap, ExecutionError>>()?;
453
454                        let url = Url::parse(&request.url)?;
455                        let host = url
456                            .host_str()
457                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
458
459                        let (_epoch, committee) = system
460                            .current_committee()
461                            .await?
462                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
463                        let allowed_hosts = &committee.policy().http_request_allow_list;
464
465                        ensure!(
466                            allowed_hosts.contains(host),
467                            ExecutionError::UnauthorizedHttpRequest(url)
468                        );
469
470                        let request = Client::new()
471                            .request(request.method.into(), url)
472                            .body(request.body)
473                            .headers(headers);
474                        #[cfg(not(web))]
475                        let request = request.timeout(linera_base::time::Duration::from_millis(
476                            committee.policy().http_request_timeout_ms,
477                        ));
478
479                        let response = request.send().await?;
480
481                        let mut response_size_limit =
482                            committee.policy().maximum_http_response_bytes;
483
484                        if http_responses_are_oracle_responses {
485                            response_size_limit = response_size_limit
486                                .min(committee.policy().maximum_oracle_response_bytes);
487                        }
488                        Ok(OracleResponse::Http(
489                            Self::receive_http_response(response, response_size_limit).await?,
490                        ))
491                    })
492                    .await?
493                    .to_http_response()?;
494                callback.respond(response);
495            }
496
497            ReadBlobContent { blob_id, callback } => {
498                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
499                    content.clone()
500                } else {
501                    let content = self.state.system.read_blob_content(blob_id).await?;
502                    if blob_id.blob_type == BlobType::Data {
503                        self.resource_controller
504                            .with_state(&mut self.state.system)
505                            .await?
506                            .track_blob_read(content.bytes().len() as u64)?;
507                    }
508                    self.state
509                        .system
510                        .blob_used(self.txn_tracker, blob_id)
511                        .await?;
512                    content
513                };
514                callback.respond(content)
515            }
516
517            AssertBlobExists { blob_id, callback } => {
518                self.state.system.assert_blob_exists(blob_id).await?;
519                // Treating this as reading a size-0 blob for fee purposes.
520                if blob_id.blob_type == BlobType::Data {
521                    self.resource_controller
522                        .with_state(&mut self.state.system)
523                        .await?
524                        .track_blob_read(0)?;
525                }
526                let is_new = self
527                    .state
528                    .system
529                    .blob_used(self.txn_tracker, blob_id)
530                    .await?;
531                if is_new {
532                    self.txn_tracker
533                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
534                }
535                callback.respond(());
536            }
537
538            Emit {
539                stream_id,
540                value,
541                callback,
542            } => {
543                let count = self
544                    .state
545                    .stream_event_counts
546                    .get_mut_or_default(&stream_id)
547                    .await?;
548                let index = *count;
549                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
550                self.txn_tracker.add_event(stream_id, index, value);
551                callback.respond(index)
552            }
553
554            ReadEvent { event_id, callback } => {
555                let extra = self.state.context().extra();
556                let event = self
557                    .txn_tracker
558                    .oracle(|| async {
559                        let event = extra
560                            .get_event(event_id.clone())
561                            .await?
562                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
563                        Ok(OracleResponse::Event(
564                            event_id.clone(),
565                            Arc::unwrap_or_clone(event),
566                        ))
567                    })
568                    .await?
569                    .to_event(&event_id)?;
570                callback.respond(event);
571            }
572
573            SubscribeToEvents {
574                chain_id,
575                stream_id,
576                subscriber_app_id,
577                callback,
578            } => {
579                let subscriptions = self
580                    .state
581                    .system
582                    .event_subscriptions
583                    .get_mut_or_default(&(chain_id, stream_id.clone()))
584                    .await?;
585                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
586                    subscriptions.next_index
587                } else {
588                    0
589                };
590                self.txn_tracker.add_stream_to_process(
591                    subscriber_app_id,
592                    chain_id,
593                    stream_id,
594                    0,
595                    next_index,
596                );
597                callback.respond(());
598            }
599
600            UnsubscribeFromEvents {
601                chain_id,
602                stream_id,
603                subscriber_app_id,
604                callback,
605            } => {
606                let key = (chain_id, stream_id.clone());
607                let subscriptions = self
608                    .state
609                    .system
610                    .event_subscriptions
611                    .get_mut_or_default(&key)
612                    .await?;
613                subscriptions.applications.remove(&subscriber_app_id);
614                if subscriptions.applications.is_empty() {
615                    self.state.system.event_subscriptions.remove(&key)?;
616                }
617                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
618                    self.txn_tracker
619                        .remove_stream_to_process(app_id, chain_id, stream_id);
620                }
621                callback.respond(());
622            }
623
624            GetApplicationPermissions { callback } => {
625                let app_permissions = self.state.system.application_permissions.get().await?;
626                callback.respond(app_permissions.clone());
627            }
628
629            QueryServiceOracle {
630                deadline,
631                application_id,
632                next_block_height,
633                query,
634                callback,
635            } => {
636                let state = &mut self.state;
637                let local_time = self.txn_tracker.local_time();
638                let created_blobs = self.txn_tracker.created_blobs().clone();
639                let bytes = self
640                    .txn_tracker
641                    .oracle(|| async {
642                        let context = QueryContext {
643                            chain_id: state.context().extra().chain_id(),
644                            next_block_height,
645                            local_time,
646                        };
647                        let QueryOutcome {
648                            response,
649                            operations,
650                        } = Box::pin(state.query_user_application_with_deadline(
651                            application_id,
652                            context,
653                            query,
654                            deadline,
655                            created_blobs,
656                        ))
657                        .await?;
658                        ensure!(
659                            operations.is_empty(),
660                            ExecutionError::ServiceOracleQueryOperations(operations)
661                        );
662                        Ok(OracleResponse::Service(response))
663                    })
664                    .await?
665                    .to_service_response()?;
666                callback.respond(bytes);
667            }
668
669            AddOutgoingMessage { message, callback } => {
670                self.txn_tracker.add_outgoing_message(message);
671                callback.respond(());
672            }
673
674            SetLocalTime {
675                local_time,
676                callback,
677            } => {
678                self.txn_tracker.set_local_time(local_time);
679                callback.respond(());
680            }
681
682            AssertBefore {
683                timestamp,
684                callback,
685            } => {
686                let result = if !self
687                    .txn_tracker
688                    .replay_oracle_response(OracleResponse::Assert)?
689                {
690                    // There are no recorded oracle responses, so we check the local time.
691                    let local_time = self.txn_tracker.local_time();
692                    if local_time >= timestamp {
693                        Err(ExecutionError::AssertBefore {
694                            timestamp,
695                            local_time,
696                        })
697                    } else {
698                        Ok(())
699                    }
700                } else {
701                    Ok(())
702                };
703                callback.respond(result);
704            }
705
706            AddCreatedBlob { blob, callback } => {
707                if self.resource_controller.is_free {
708                    self.txn_tracker.mark_blob_free(blob.id());
709                }
710                self.txn_tracker.add_created_blob(blob);
711                callback.respond(());
712            }
713
714            ValidationRound { round, callback } => {
715                let validation_round = self
716                    .txn_tracker
717                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
718                    .await?
719                    .to_round()?;
720                callback.respond(validation_round);
721            }
722
723            AllowApplicationLogs { callback } => {
724                let allow = self
725                    .state
726                    .context()
727                    .extra()
728                    .execution_runtime_config()
729                    .allow_application_logs;
730                callback.respond(allow);
731            }
732
733            #[cfg(web)]
734            Log { message, level } => match level {
735                tracing::log::Level::Trace | tracing::log::Level::Debug => {
736                    tracing::debug!(target: "user_application_log", message = %message);
737                }
738                tracing::log::Level::Info => {
739                    tracing::info!(target: "user_application_log", message = %message);
740                }
741                tracing::log::Level::Warn => {
742                    tracing::warn!(target: "user_application_log", message = %message);
743                }
744                tracing::log::Level::Error => {
745                    tracing::error!(target: "user_application_log", message = %message);
746                }
747            },
748        }
749
750        Ok(())
751    }
752
753    /// Calls `process_streams` for all applications that are subscribed to streams with new
754    /// events or that have new subscriptions.
755    #[instrument(skip_all)]
756    async fn process_subscriptions(
757        &mut self,
758        context: ProcessStreamsContext,
759    ) -> Result<(), ExecutionError> {
760        // Keep track of which streams we have already processed. This is to guard against
761        // applications unsubscribing and subscribing in the process_streams call itself.
762        let mut processed = BTreeSet::new();
763        loop {
764            let to_process = self
765                .txn_tracker
766                .take_streams_to_process()
767                .into_iter()
768                .filter_map(|(app_id, updates)| {
769                    let updates = updates
770                        .into_iter()
771                        .filter_map(|update| {
772                            if !processed.insert((
773                                app_id,
774                                update.chain_id,
775                                update.stream_id.clone(),
776                            )) {
777                                return None;
778                            }
779                            Some(update)
780                        })
781                        .collect::<Vec<_>>();
782                    if updates.is_empty() {
783                        return None;
784                    }
785                    Some((app_id, updates))
786                })
787                .collect::<BTreeMap<_, _>>();
788            if to_process.is_empty() {
789                return Ok(());
790            }
791            for (app_id, updates) in to_process {
792                self.run_user_action(
793                    app_id,
794                    UserAction::ProcessStreams(context, updates),
795                    None,
796                    None,
797                )
798                .await?;
799            }
800        }
801    }
802
803    pub(crate) async fn run_user_action(
804        &mut self,
805        application_id: ApplicationId,
806        action: UserAction,
807        refund_grant_to: Option<Account>,
808        grant: Option<&mut Amount>,
809    ) -> Result<(), ExecutionError> {
810        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
811            .await
812    }
813
814    // TODO(#5034): unify with `contract_and_dependencies`
815    pub(crate) async fn service_and_dependencies(
816        &mut self,
817        application: ApplicationId,
818    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
819        // cyclic futures are illegal so we need to either box the frames or keep our own
820        // stack
821        let mut stack = vec![application];
822        let mut codes = vec![];
823        let mut descriptions = vec![];
824
825        while let Some(id) = stack.pop() {
826            let (code, description) = self.load_service(id).await?;
827            stack.extend(description.required_application_ids.iter().rev().copied());
828            codes.push(code);
829            descriptions.push(description);
830        }
831
832        codes.reverse();
833        descriptions.reverse();
834
835        Ok((codes, descriptions))
836    }
837
838    // TODO(#5034): unify with `service_and_dependencies`
839    #[instrument(skip_all, fields(application_id = %application))]
840    async fn contract_and_dependencies(
841        &mut self,
842        application: ApplicationId,
843    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
844        // cyclic futures are illegal so we need to either box the frames or keep our own
845        // stack
846        let mut stack = vec![application];
847        let mut codes = vec![];
848        let mut descriptions = vec![];
849
850        while let Some(id) = stack.pop() {
851            let (code, description) = self.load_contract(id).await?;
852            stack.extend(description.required_application_ids.iter().rev().copied());
853            codes.push(code);
854            descriptions.push(description);
855        }
856
857        codes.reverse();
858        descriptions.reverse();
859
860        Ok((codes, descriptions))
861    }
862
863    #[instrument(skip_all, fields(application_id = %application_id))]
864    async fn run_user_action_with_runtime(
865        &mut self,
866        application_id: ApplicationId,
867        action: UserAction,
868        refund_grant_to: Option<Account>,
869        grant: Option<&mut Amount>,
870    ) -> Result<(), ExecutionError> {
871        let chain_id = self.state.context().extra().chain_id();
872        let mut cloned_grant = grant.as_ref().map(|x| **x);
873        let initial_balance = self
874            .resource_controller
875            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
876            .await?
877            .balance()?;
878        let mut controller = ResourceController::new(
879            self.resource_controller.policy().clone(),
880            self.resource_controller.tracker,
881            initial_balance,
882        );
883        let is_free = matches!(
884            &action,
885            UserAction::Message(..) | UserAction::ProcessStreams(..)
886        ) && self
887            .resource_controller
888            .policy()
889            .is_free_app(&application_id);
890        controller.is_free = is_free;
891        self.resource_controller.is_free = is_free;
892        let (execution_state_sender, mut execution_state_receiver) =
893            futures::channel::mpsc::unbounded();
894
895        let (codes, descriptions): (Vec<_>, Vec<_>) =
896            self.contract_and_dependencies(application_id).await?;
897
898        let allow_application_logs = self
899            .state
900            .context()
901            .extra()
902            .execution_runtime_config()
903            .allow_application_logs;
904
905        let contract_runtime_task = self
906            .state
907            .context()
908            .extra()
909            .thread_pool()
910            .run_send(JsVec(codes), move |codes| async move {
911                let runtime = ContractSyncRuntime::new(
912                    execution_state_sender,
913                    chain_id,
914                    refund_grant_to,
915                    controller,
916                    &action,
917                    allow_application_logs,
918                );
919
920                for (code, description) in codes.0.into_iter().zip(descriptions) {
921                    runtime.preload_contract(ApplicationId::from(&description), code, description);
922                }
923
924                runtime.run_action(application_id, chain_id, action)
925            })
926            .await;
927
928        async {
929            while let Some(request) = execution_state_receiver.next().await {
930                self.handle_request(request).await?;
931            }
932            Ok::<(), ExecutionError>(())
933        }
934        .instrument(info_span!("handle_runtime_requests"))
935        .await?;
936
937        let (result, controller) = contract_runtime_task.await??;
938
939        self.resource_controller.is_free = false;
940
941        self.txn_tracker.add_operation_result(result);
942
943        self.resource_controller
944            .with_state_and_grant(&mut self.state.system, grant)
945            .await?
946            .merge_balance(initial_balance, controller.balance()?)?;
947        self.resource_controller.tracker = controller.tracker;
948
949        Ok(())
950    }
951
952    #[instrument(skip_all, fields(
953        chain_id = %context.chain_id,
954        block_height = %context.height,
955        operation_type = %operation.as_ref(),
956    ))]
957    pub async fn execute_operation(
958        &mut self,
959        context: OperationContext,
960        operation: Operation,
961    ) -> Result<(), ExecutionError> {
962        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
963        match operation {
964            Operation::System(op) => {
965                let new_application = self
966                    .state
967                    .system
968                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
969                    .await?;
970                if let Some((application_id, argument)) = new_application {
971                    let user_action = UserAction::Instantiate(context, argument);
972                    self.run_user_action(
973                        application_id,
974                        user_action,
975                        context.refund_grant_to(),
976                        None,
977                    )
978                    .await?;
979                }
980            }
981            Operation::User {
982                application_id,
983                bytes,
984            } => {
985                self.run_user_action(
986                    application_id,
987                    UserAction::Operation(context, bytes),
988                    context.refund_grant_to(),
989                    None,
990                )
991                .await?;
992            }
993        }
994        self.process_subscriptions(context.into()).await?;
995        Ok(())
996    }
997
998    #[instrument(skip_all, fields(
999        chain_id = %context.chain_id,
1000        block_height = %context.height,
1001        origin = %context.origin,
1002        is_bouncing = %context.is_bouncing,
1003        message_type = %message.as_ref(),
1004    ))]
1005    pub async fn execute_message(
1006        &mut self,
1007        context: MessageContext,
1008        message: Message,
1009        grant: Option<&mut Amount>,
1010    ) -> Result<(), ExecutionError> {
1011        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1012        match message {
1013            Message::System(message) => {
1014                let outcome = self.state.system.execute_message(context, message).await?;
1015                self.txn_tracker.add_outgoing_messages(outcome);
1016            }
1017            Message::User {
1018                application_id,
1019                bytes,
1020            } => {
1021                self.run_user_action(
1022                    application_id,
1023                    UserAction::Message(context, bytes),
1024                    context.refund_grant_to,
1025                    grant,
1026                )
1027                .await?;
1028            }
1029        }
1030        self.process_subscriptions(context.into()).await?;
1031        Ok(())
1032    }
1033
1034    pub fn bounce_message(
1035        &mut self,
1036        context: MessageContext,
1037        grant: Amount,
1038        message: Message,
1039    ) -> Result<(), ExecutionError> {
1040        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1041        self.txn_tracker.add_outgoing_message(OutgoingMessage {
1042            destination: context.origin,
1043            authenticated_signer: context.authenticated_signer,
1044            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1045            grant,
1046            kind: MessageKind::Bouncing,
1047            message,
1048        });
1049        Ok(())
1050    }
1051
1052    pub fn send_refund(
1053        &mut self,
1054        context: MessageContext,
1055        amount: Amount,
1056    ) -> Result<(), ExecutionError> {
1057        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1058        if amount.is_zero() {
1059            return Ok(());
1060        }
1061        let Some(account) = context.refund_grant_to else {
1062            return Err(ExecutionError::InternalError(
1063                "Messages with grants should have a non-empty `refund_grant_to`",
1064            ));
1065        };
1066        let message = SystemMessage::Credit {
1067            amount,
1068            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
1069            target: account.owner,
1070        };
1071        self.txn_tracker.add_outgoing_message(
1072            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1073        );
1074        Ok(())
1075    }
1076
1077    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
1078    ///
1079    /// Ensures that the response does not exceed the provided `size_limit`.
1080    async fn receive_http_response(
1081        response: reqwest::Response,
1082        size_limit: u64,
1083    ) -> Result<http::Response, ExecutionError> {
1084        let status = response.status().as_u16();
1085        let maybe_content_length = response.content_length();
1086
1087        let headers = response
1088            .headers()
1089            .iter()
1090            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1091            .collect::<Vec<_>>();
1092
1093        let total_header_size = headers
1094            .iter()
1095            .map(|header| (header.name.len() + header.value.len()) as u64)
1096            .sum();
1097
1098        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1099            ExecutionError::HttpResponseSizeLimitExceeded {
1100                limit: size_limit,
1101                size: total_header_size,
1102            },
1103        )?;
1104
1105        if let Some(content_length) = maybe_content_length {
1106            if content_length > remaining_bytes {
1107                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1108                    limit: size_limit,
1109                    size: content_length + total_header_size,
1110                });
1111            }
1112        }
1113
1114        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1115        let mut body_stream = response.bytes_stream();
1116
1117        while let Some(bytes) = body_stream.next().await.transpose()? {
1118            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1119                ExecutionError::HttpResponseSizeLimitExceeded {
1120                    limit: size_limit,
1121                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1122                },
1123            )?;
1124
1125            body.extend(&bytes);
1126        }
1127
1128        Ok(http::Response {
1129            status,
1130            headers,
1131            body,
1132        })
1133    }
1134}
1135
1136/// Requests to the execution state.
1137#[derive(Debug, strum::AsRefStr)]
1138pub enum ExecutionRequest {
1139    #[cfg(not(web))]
1140    LoadContract {
1141        id: ApplicationId,
1142        #[debug(skip)]
1143        callback: Sender<(UserContractCode, ApplicationDescription)>,
1144    },
1145
1146    #[cfg(not(web))]
1147    LoadService {
1148        id: ApplicationId,
1149        #[debug(skip)]
1150        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1151    },
1152
1153    ChainBalance {
1154        #[debug(skip)]
1155        callback: Sender<Amount>,
1156    },
1157
1158    OwnerBalance {
1159        owner: AccountOwner,
1160        #[debug(skip)]
1161        callback: Sender<Amount>,
1162    },
1163
1164    OwnerBalances {
1165        #[debug(skip)]
1166        callback: Sender<Vec<(AccountOwner, Amount)>>,
1167    },
1168
1169    BalanceOwners {
1170        #[debug(skip)]
1171        callback: Sender<Vec<AccountOwner>>,
1172    },
1173
1174    Transfer {
1175        source: AccountOwner,
1176        destination: Account,
1177        amount: Amount,
1178        #[debug(skip_if = Option::is_none)]
1179        signer: Option<AccountOwner>,
1180        application_id: ApplicationId,
1181        #[debug(skip)]
1182        callback: Sender<()>,
1183    },
1184
1185    Claim {
1186        source: Account,
1187        destination: Account,
1188        amount: Amount,
1189        #[debug(skip_if = Option::is_none)]
1190        signer: Option<AccountOwner>,
1191        application_id: ApplicationId,
1192        #[debug(skip)]
1193        callback: Sender<()>,
1194    },
1195
1196    SystemTimestamp {
1197        #[debug(skip)]
1198        callback: Sender<Timestamp>,
1199    },
1200
1201    ChainOwnership {
1202        #[debug(skip)]
1203        callback: Sender<ChainOwnership>,
1204    },
1205
1206    ApplicationPermissions {
1207        #[debug(skip)]
1208        callback: Sender<ApplicationPermissions>,
1209    },
1210
1211    ReadApplicationDescription {
1212        application_id: ApplicationId,
1213        #[debug(skip)]
1214        callback: Sender<ApplicationDescription>,
1215    },
1216
1217    ReadValueBytes {
1218        id: ApplicationId,
1219        #[debug(with = hex_debug)]
1220        key: Vec<u8>,
1221        #[debug(skip)]
1222        callback: Sender<Option<Vec<u8>>>,
1223    },
1224
1225    ContainsKey {
1226        id: ApplicationId,
1227        key: Vec<u8>,
1228        #[debug(skip)]
1229        callback: Sender<bool>,
1230    },
1231
1232    ContainsKeys {
1233        id: ApplicationId,
1234        #[debug(with = hex_vec_debug)]
1235        keys: Vec<Vec<u8>>,
1236        callback: Sender<Vec<bool>>,
1237    },
1238
1239    ReadMultiValuesBytes {
1240        id: ApplicationId,
1241        #[debug(with = hex_vec_debug)]
1242        keys: Vec<Vec<u8>>,
1243        #[debug(skip)]
1244        callback: Sender<Vec<Option<Vec<u8>>>>,
1245    },
1246
1247    FindKeysByPrefix {
1248        id: ApplicationId,
1249        #[debug(with = hex_debug)]
1250        key_prefix: Vec<u8>,
1251        #[debug(skip)]
1252        callback: Sender<Vec<Vec<u8>>>,
1253    },
1254
1255    FindKeyValuesByPrefix {
1256        id: ApplicationId,
1257        #[debug(with = hex_debug)]
1258        key_prefix: Vec<u8>,
1259        #[debug(skip)]
1260        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1261    },
1262
1263    WriteBatch {
1264        id: ApplicationId,
1265        batch: Batch,
1266        #[debug(skip)]
1267        callback: Sender<()>,
1268    },
1269
1270    OpenChain {
1271        ownership: ChainOwnership,
1272        #[debug(skip_if = Amount::is_zero)]
1273        balance: Amount,
1274        parent_id: ChainId,
1275        block_height: BlockHeight,
1276        application_permissions: ApplicationPermissions,
1277        timestamp: Timestamp,
1278        #[debug(skip)]
1279        callback: Sender<ChainId>,
1280    },
1281
1282    CloseChain {
1283        application_id: ApplicationId,
1284        #[debug(skip)]
1285        callback: Sender<Result<(), ExecutionError>>,
1286    },
1287
1288    ChangeOwnership {
1289        application_id: ApplicationId,
1290        ownership: ChainOwnership,
1291        #[debug(skip)]
1292        callback: Sender<Result<(), ExecutionError>>,
1293    },
1294
1295    ChangeApplicationPermissions {
1296        application_id: ApplicationId,
1297        application_permissions: ApplicationPermissions,
1298        #[debug(skip)]
1299        callback: Sender<Result<(), ExecutionError>>,
1300    },
1301
1302    CreateApplication {
1303        chain_id: ChainId,
1304        block_height: BlockHeight,
1305        module_id: ModuleId,
1306        parameters: Vec<u8>,
1307        required_application_ids: Vec<ApplicationId>,
1308        #[debug(skip)]
1309        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1310    },
1311
1312    PerformHttpRequest {
1313        request: http::Request,
1314        http_responses_are_oracle_responses: bool,
1315        #[debug(skip)]
1316        callback: Sender<http::Response>,
1317    },
1318
1319    ReadBlobContent {
1320        blob_id: BlobId,
1321        #[debug(skip)]
1322        callback: Sender<BlobContent>,
1323    },
1324
1325    AssertBlobExists {
1326        blob_id: BlobId,
1327        #[debug(skip)]
1328        callback: Sender<()>,
1329    },
1330
1331    Emit {
1332        stream_id: StreamId,
1333        #[debug(with = hex_debug)]
1334        value: Vec<u8>,
1335        #[debug(skip)]
1336        callback: Sender<u32>,
1337    },
1338
1339    ReadEvent {
1340        event_id: EventId,
1341        callback: oneshot::Sender<Vec<u8>>,
1342    },
1343
1344    SubscribeToEvents {
1345        chain_id: ChainId,
1346        stream_id: StreamId,
1347        subscriber_app_id: ApplicationId,
1348        #[debug(skip)]
1349        callback: Sender<()>,
1350    },
1351
1352    UnsubscribeFromEvents {
1353        chain_id: ChainId,
1354        stream_id: StreamId,
1355        subscriber_app_id: ApplicationId,
1356        #[debug(skip)]
1357        callback: Sender<()>,
1358    },
1359
1360    GetApplicationPermissions {
1361        #[debug(skip)]
1362        callback: Sender<ApplicationPermissions>,
1363    },
1364
1365    QueryServiceOracle {
1366        deadline: Option<Instant>,
1367        application_id: ApplicationId,
1368        next_block_height: BlockHeight,
1369        query: Vec<u8>,
1370        #[debug(skip)]
1371        callback: Sender<Vec<u8>>,
1372    },
1373
1374    AddOutgoingMessage {
1375        message: crate::OutgoingMessage,
1376        #[debug(skip)]
1377        callback: Sender<()>,
1378    },
1379
1380    SetLocalTime {
1381        local_time: Timestamp,
1382        #[debug(skip)]
1383        callback: Sender<()>,
1384    },
1385
1386    AssertBefore {
1387        timestamp: Timestamp,
1388        #[debug(skip)]
1389        callback: Sender<Result<(), ExecutionError>>,
1390    },
1391
1392    AddCreatedBlob {
1393        blob: crate::Blob,
1394        #[debug(skip)]
1395        callback: Sender<()>,
1396    },
1397
1398    ValidationRound {
1399        round: Option<u32>,
1400        #[debug(skip)]
1401        callback: Sender<Option<u32>>,
1402    },
1403
1404    AllowApplicationLogs {
1405        #[debug(skip)]
1406        callback: Sender<bool>,
1407    },
1408
1409    /// Log message from contract execution (fire-and-forget, no callback needed).
1410    #[cfg(web)]
1411    Log {
1412        message: String,
1413        level: tracing::log::Level,
1414    },
1415}