Skip to main content

linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::{
7    collections::{BTreeMap, BTreeSet},
8    sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16    data_types::{
17        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18        Timestamp,
19    },
20    ensure, hex_debug, hex_vec_debug, http,
21    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
22    ownership::ChainOwnership,
23    time::Instant,
24};
25use linera_views::{batch::Batch, context::Context, views::View};
26use oneshot::Sender;
27use reqwest::{header::HeaderMap, Client, Url};
28use tracing::{info_span, instrument, Instrument as _};
29
30use crate::{
31    execution::UserAction,
32    runtime::ContractSyncRuntime,
33    system::{CreateApplicationResult, OpenChainConfig},
34    util::{OracleResponseExt as _, RespondExt as _},
35    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
36    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
37    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
38    ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
39};
40
41/// Actor for handling requests to the execution state.
42pub struct ExecutionStateActor<'a, C> {
43    state: &'a mut ExecutionStateView<C>,
44    txn_tracker: &'a mut TransactionTracker,
45    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
46}
47
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels, using the bucket layout shared by the
    /// bytecode-loading latency metrics of this module.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Latency histogram for loading the bytecode of a contract.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Latency histogram for loading the bytecode of a service.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
75
76pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
77
78impl<'a, C> ExecutionStateActor<'a, C>
79where
80    C: Context + Clone + 'static,
81    C::Extra: ExecutionRuntimeContext,
82{
83    /// Creates a new execution state actor.
84    pub fn new(
85        state: &'a mut ExecutionStateView<C>,
86        txn_tracker: &'a mut TransactionTracker,
87        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
88    ) -> Self {
89        Self {
90            state,
91            txn_tracker,
92            resource_controller,
93        }
94    }
95
96    #[instrument(skip_all, fields(application_id = %id))]
97    pub(crate) async fn load_contract(
98        &mut self,
99        id: ApplicationId,
100    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
101        #[cfg(with_metrics)]
102        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
103        let blob_id = id.description_blob_id();
104        let description = match self.txn_tracker.get_blob_content(&blob_id) {
105            Some(blob) => bcs::from_bytes(blob.bytes())?,
106            None => {
107                self.state
108                    .system
109                    .describe_application(id, self.txn_tracker)
110                    .await?
111            }
112        };
113        let code = self
114            .state
115            .context()
116            .extra()
117            .get_user_contract(&description, self.txn_tracker)
118            .await?;
119        Ok((code, description))
120    }
121
122    pub(crate) async fn load_service(
123        &mut self,
124        id: ApplicationId,
125    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
126        #[cfg(with_metrics)]
127        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
128        let blob_id = id.description_blob_id();
129        let description = match self.txn_tracker.get_blob_content(&blob_id) {
130            Some(blob) => bcs::from_bytes(blob.bytes())?,
131            None => {
132                self.state
133                    .system
134                    .describe_application(id, self.txn_tracker)
135                    .await?
136            }
137        };
138        let code = self
139            .state
140            .context()
141            .extra()
142            .get_user_service(&description, self.txn_tracker)
143            .await?;
144        Ok((code, description))
145    }
146
147    // TODO(#1416): Support concurrent I/O.
148    #[instrument(
149        skip_all,
150        fields(request_type = %request.as_ref())
151    )]
152    pub(crate) async fn handle_request(
153        &mut self,
154        request: ExecutionRequest,
155    ) -> Result<(), ExecutionError> {
156        use ExecutionRequest::*;
157        match request {
158            #[cfg(not(web))]
159            LoadContract { id, callback } => {
160                let (code, description) = self.load_contract(id).await?;
161                callback.respond((code, description))
162            }
163            #[cfg(not(web))]
164            LoadService { id, callback } => {
165                let (code, description) = self.load_service(id).await?;
166                callback.respond((code, description))
167            }
168
169            ChainBalance { callback } => {
170                let balance = *self.state.system.balance.get();
171                callback.respond(balance);
172            }
173
174            OwnerBalance { owner, callback } => {
175                let balance = self
176                    .state
177                    .system
178                    .balances
179                    .get(&owner)
180                    .await?
181                    .unwrap_or_default();
182                callback.respond(balance);
183            }
184
185            OwnerBalances { callback } => {
186                let balances = self.state.system.balances.index_values().await?;
187                callback.respond(balances.into_iter().collect());
188            }
189
190            BalanceOwners { callback } => {
191                let owners = self.state.system.balances.indices().await?;
192                callback.respond(owners);
193            }
194
195            Transfer {
196                source,
197                destination,
198                amount,
199                signer,
200                application_id,
201                callback,
202            } => {
203                let maybe_message = self
204                    .state
205                    .system
206                    .transfer(signer, Some(application_id), source, destination, amount)
207                    .await?;
208                self.txn_tracker.add_outgoing_messages(maybe_message);
209                callback.respond(());
210            }
211
212            Claim {
213                source,
214                destination,
215                amount,
216                signer,
217                application_id,
218                callback,
219            } => {
220                let maybe_message = self
221                    .state
222                    .system
223                    .claim(
224                        signer,
225                        Some(application_id),
226                        source.owner,
227                        source.chain_id,
228                        destination,
229                        amount,
230                    )
231                    .await?;
232                self.txn_tracker.add_outgoing_messages(maybe_message);
233                callback.respond(());
234            }
235
236            SystemTimestamp { callback } => {
237                let timestamp = *self.state.system.timestamp.get();
238                callback.respond(timestamp);
239            }
240
241            ChainOwnership { callback } => {
242                let ownership = self.state.system.ownership.get().await?.clone();
243                callback.respond(ownership);
244            }
245
246            ApplicationPermissions { callback } => {
247                let permissions = self
248                    .state
249                    .system
250                    .application_permissions
251                    .get()
252                    .await?
253                    .clone();
254                callback.respond(permissions);
255            }
256
257            ReadApplicationDescription {
258                application_id,
259                callback,
260            } => {
261                let blob_id = application_id.description_blob_id();
262                let description = match self.txn_tracker.get_blob_content(&blob_id) {
263                    Some(blob) => bcs::from_bytes(blob.bytes())?,
264                    None => {
265                        let blob_content = self.state.system.read_blob_content(blob_id).await?;
266                        self.state
267                            .system
268                            .blob_used(self.txn_tracker, blob_id)
269                            .await?;
270                        bcs::from_bytes(blob_content.bytes())?
271                    }
272                };
273                callback.respond(description);
274            }
275
276            ContainsKey { id, key, callback } => {
277                let view = self.state.users.try_load_entry(&id).await?;
278                let result = match view {
279                    Some(view) => view.contains_key(&key).await?,
280                    None => false,
281                };
282                callback.respond(result);
283            }
284
285            ContainsKeys { id, keys, callback } => {
286                let view = self.state.users.try_load_entry(&id).await?;
287                let result = match view {
288                    Some(view) => view.contains_keys(&keys).await?,
289                    None => vec![false; keys.len()],
290                };
291                callback.respond(result);
292            }
293
294            ReadMultiValuesBytes { id, keys, callback } => {
295                let view = self.state.users.try_load_entry(&id).await?;
296                let values = match view {
297                    Some(view) => view.multi_get(&keys).await?,
298                    None => vec![None; keys.len()],
299                };
300                callback.respond(values);
301            }
302
303            ReadValueBytes { id, key, callback } => {
304                let view = self.state.users.try_load_entry(&id).await?;
305                let result = match view {
306                    Some(view) => view.get(&key).await?,
307                    None => None,
308                };
309                callback.respond(result);
310            }
311
312            FindKeysByPrefix {
313                id,
314                key_prefix,
315                callback,
316            } => {
317                let view = self.state.users.try_load_entry(&id).await?;
318                let result = match view {
319                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
320                    None => Vec::new(),
321                };
322                callback.respond(result);
323            }
324
325            FindKeyValuesByPrefix {
326                id,
327                key_prefix,
328                callback,
329            } => {
330                let view = self.state.users.try_load_entry(&id).await?;
331                let result = match view {
332                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
333                    None => Vec::new(),
334                };
335                callback.respond(result);
336            }
337
338            WriteBatch {
339                id,
340                batch,
341                callback,
342            } => {
343                let mut view = self.state.users.try_load_entry_mut(&id).await?;
344                view.write_batch(batch)?;
345                callback.respond(());
346            }
347
348            OpenChain {
349                ownership,
350                balance,
351                parent_id,
352                block_height,
353                application_permissions,
354                timestamp,
355                callback,
356            } => {
357                let config = OpenChainConfig {
358                    ownership,
359                    balance,
360                    application_permissions,
361                };
362                let chain_id = self
363                    .state
364                    .system
365                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
366                    .await?;
367                callback.respond(chain_id);
368            }
369
370            CloseChain {
371                application_id,
372                callback,
373            } => {
374                let app_permissions = self.state.system.application_permissions.get().await?;
375                if !app_permissions.can_close_chain(&application_id) {
376                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
377                } else {
378                    self.state.system.close_chain()?;
379                    callback.respond(Ok(()));
380                }
381            }
382
383            ChangeOwnership {
384                application_id,
385                ownership,
386                callback,
387            } => {
388                let app_permissions = self.state.system.application_permissions.get().await?;
389                if !app_permissions.can_close_chain(&application_id) {
390                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
391                } else {
392                    self.state.system.ownership.set(ownership);
393                    callback.respond(Ok(()));
394                }
395            }
396
397            ChangeApplicationPermissions {
398                application_id,
399                application_permissions,
400                callback,
401            } => {
402                let app_permissions = self.state.system.application_permissions.get().await?;
403                if !app_permissions.can_change_application_permissions(&application_id) {
404                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
405                } else {
406                    self.state
407                        .system
408                        .application_permissions
409                        .set(application_permissions);
410                    callback.respond(Ok(()));
411                }
412            }
413
414            CreateApplication {
415                chain_id,
416                block_height,
417                module_id,
418                parameters,
419                required_application_ids,
420                callback,
421            } => {
422                let create_application_result = self
423                    .state
424                    .system
425                    .create_application(
426                        chain_id,
427                        block_height,
428                        module_id,
429                        parameters,
430                        required_application_ids,
431                        self.txn_tracker,
432                    )
433                    .await?;
434                callback.respond(Ok(create_application_result));
435            }
436
437            PerformHttpRequest {
438                request,
439                http_responses_are_oracle_responses,
440                callback,
441            } => {
442                let system = &mut self.state.system;
443                let response = self
444                    .txn_tracker
445                    .oracle(|| async {
446                        let headers = request
447                            .headers
448                            .into_iter()
449                            .map(|http::Header { name, value }| {
450                                Ok((name.parse()?, value.try_into()?))
451                            })
452                            .collect::<Result<HeaderMap, ExecutionError>>()?;
453
454                        let url = Url::parse(&request.url)?;
455                        let host = url
456                            .host_str()
457                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
458
459                        let (_epoch, committee) = system
460                            .current_committee()
461                            .await?
462                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
463                        let allowed_hosts = &committee.policy().http_request_allow_list;
464
465                        ensure!(
466                            allowed_hosts.contains(host),
467                            ExecutionError::UnauthorizedHttpRequest(url)
468                        );
469
470                        let request = Client::new()
471                            .request(request.method.into(), url)
472                            .body(request.body)
473                            .headers(headers);
474                        #[cfg(not(web))]
475                        let request = request.timeout(linera_base::time::Duration::from_millis(
476                            committee.policy().http_request_timeout_ms,
477                        ));
478
479                        let response = request.send().await?;
480
481                        let mut response_size_limit =
482                            committee.policy().maximum_http_response_bytes;
483
484                        if http_responses_are_oracle_responses {
485                            response_size_limit = response_size_limit
486                                .min(committee.policy().maximum_oracle_response_bytes);
487                        }
488                        Ok(OracleResponse::Http(
489                            Self::receive_http_response(response, response_size_limit).await?,
490                        ))
491                    })
492                    .await?
493                    .to_http_response()?;
494                callback.respond(response);
495            }
496
497            ReadBlobContent { blob_id, callback } => {
498                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
499                    content.clone()
500                } else {
501                    let content = self.state.system.read_blob_content(blob_id).await?;
502                    if blob_id.blob_type == BlobType::Data {
503                        self.resource_controller
504                            .with_state(&mut self.state.system)
505                            .await?
506                            .track_blob_read(content.bytes().len() as u64)?;
507                    }
508                    self.state
509                        .system
510                        .blob_used(self.txn_tracker, blob_id)
511                        .await?;
512                    content
513                };
514                callback.respond(content)
515            }
516
517            AssertBlobExists { blob_id, callback } => {
518                self.state.system.assert_blob_exists(blob_id).await?;
519                // Treating this as reading a size-0 blob for fee purposes.
520                if blob_id.blob_type == BlobType::Data {
521                    self.resource_controller
522                        .with_state(&mut self.state.system)
523                        .await?
524                        .track_blob_read(0)?;
525                }
526                let is_new = self
527                    .state
528                    .system
529                    .blob_used(self.txn_tracker, blob_id)
530                    .await?;
531                if is_new {
532                    self.txn_tracker
533                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
534                }
535                callback.respond(());
536            }
537
538            Emit {
539                stream_id,
540                value,
541                callback,
542            } => {
543                let count = self
544                    .state
545                    .stream_event_counts
546                    .get_mut_or_default(&stream_id)
547                    .await?;
548                let index = *count;
549                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
550                self.txn_tracker.add_event(stream_id, index, value);
551                callback.respond(index)
552            }
553
554            ReadEvent { event_id, callback } => {
555                let extra = self.state.context().extra();
556                let event = self
557                    .txn_tracker
558                    .oracle(|| async {
559                        let event = extra
560                            .get_event(event_id.clone())
561                            .await?
562                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
563                        Ok(OracleResponse::Event(
564                            event_id.clone(),
565                            Arc::unwrap_or_clone(event),
566                        ))
567                    })
568                    .await?
569                    .to_event(&event_id)?;
570                callback.respond(event);
571            }
572
573            SubscribeToEvents {
574                chain_id,
575                stream_id,
576                subscriber_app_id,
577                callback,
578            } => {
579                let subscriptions = self
580                    .state
581                    .system
582                    .event_subscriptions
583                    .get_mut_or_default(&(chain_id, stream_id.clone()))
584                    .await?;
585                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
586                    subscriptions.next_index
587                } else {
588                    0
589                };
590                self.txn_tracker.add_stream_to_process(
591                    subscriber_app_id,
592                    chain_id,
593                    stream_id,
594                    0,
595                    next_index,
596                );
597                callback.respond(());
598            }
599
600            UnsubscribeFromEvents {
601                chain_id,
602                stream_id,
603                subscriber_app_id,
604                callback,
605            } => {
606                let key = (chain_id, stream_id.clone());
607                let subscriptions = self
608                    .state
609                    .system
610                    .event_subscriptions
611                    .get_mut_or_default(&key)
612                    .await?;
613                subscriptions.applications.remove(&subscriber_app_id);
614                if subscriptions.applications.is_empty() {
615                    self.state.system.event_subscriptions.remove(&key)?;
616                }
617                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
618                    self.txn_tracker
619                        .remove_stream_to_process(app_id, chain_id, stream_id);
620                }
621                callback.respond(());
622            }
623
624            GetApplicationPermissions { callback } => {
625                let app_permissions = self.state.system.application_permissions.get().await?;
626                callback.respond(app_permissions.clone());
627            }
628
629            QueryServiceOracle {
630                deadline,
631                application_id,
632                next_block_height,
633                query,
634                callback,
635            } => {
636                let state = &mut self.state;
637                let local_time = self.txn_tracker.local_time();
638                let created_blobs = self.txn_tracker.created_blobs().clone();
639                let bytes = self
640                    .txn_tracker
641                    .oracle(|| async {
642                        let context = QueryContext {
643                            chain_id: state.context().extra().chain_id(),
644                            next_block_height,
645                            local_time,
646                        };
647                        let QueryOutcome {
648                            response,
649                            operations,
650                        } = Box::pin(state.query_user_application_with_deadline(
651                            application_id,
652                            context,
653                            query,
654                            deadline,
655                            created_blobs,
656                        ))
657                        .await?;
658                        ensure!(
659                            operations.is_empty(),
660                            ExecutionError::ServiceOracleQueryOperations(operations)
661                        );
662                        Ok(OracleResponse::Service(response))
663                    })
664                    .await?
665                    .to_service_response()?;
666                callback.respond(bytes);
667            }
668
669            AddOutgoingMessage { message, callback } => {
670                self.txn_tracker.add_outgoing_message(message);
671                callback.respond(());
672            }
673
674            SetLocalTime {
675                local_time,
676                callback,
677            } => {
678                self.txn_tracker.set_local_time(local_time);
679                callback.respond(());
680            }
681
682            AssertBefore {
683                timestamp,
684                callback,
685            } => {
686                let result = if !self
687                    .txn_tracker
688                    .replay_oracle_response(OracleResponse::Assert)?
689                {
690                    // There are no recorded oracle responses, so we check the local time.
691                    let local_time = self.txn_tracker.local_time();
692                    if local_time >= timestamp {
693                        Err(ExecutionError::AssertBefore {
694                            timestamp,
695                            local_time,
696                        })
697                    } else {
698                        Ok(())
699                    }
700                } else {
701                    Ok(())
702                };
703                callback.respond(result);
704            }
705
706            AddCreatedBlob { blob, callback } => {
707                if self.resource_controller.is_free {
708                    self.txn_tracker.mark_blob_free(blob.id());
709                }
710                self.txn_tracker.add_created_blob(blob);
711                callback.respond(());
712            }
713
714            ValidationRound { round, callback } => {
715                let validation_round = self
716                    .txn_tracker
717                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
718                    .await?
719                    .to_round()?;
720                callback.respond(validation_round);
721            }
722
723            AllowApplicationLogs { callback } => {
724                let allow = self
725                    .state
726                    .context()
727                    .extra()
728                    .execution_runtime_config()
729                    .allow_application_logs;
730                callback.respond(allow);
731            }
732
733            #[cfg(web)]
734            Log { message, level } => match level {
735                tracing::log::Level::Trace | tracing::log::Level::Debug => {
736                    tracing::debug!(target: "user_application_log", message = %message);
737                }
738                tracing::log::Level::Info => {
739                    tracing::info!(target: "user_application_log", message = %message);
740                }
741                tracing::log::Level::Warn => {
742                    tracing::warn!(target: "user_application_log", message = %message);
743                }
744                tracing::log::Level::Error => {
745                    tracing::error!(target: "user_application_log", message = %message);
746                }
747            },
748        }
749
750        Ok(())
751    }
752
753    /// Calls `process_streams` for all applications that are subscribed to streams with new
754    /// events or that have new subscriptions.
755    #[instrument(skip_all)]
756    async fn process_subscriptions(
757        &mut self,
758        context: ProcessStreamsContext,
759    ) -> Result<(), ExecutionError> {
760        // Keep track of which streams we have already processed. This is to guard against
761        // applications unsubscribing and subscribing in the process_streams call itself.
762        let mut processed = BTreeSet::new();
763        loop {
764            let to_process = self
765                .txn_tracker
766                .take_streams_to_process()
767                .into_iter()
768                .filter_map(|(app_id, updates)| {
769                    let updates = updates
770                        .into_iter()
771                        .filter_map(|update| {
772                            if !processed.insert((
773                                app_id,
774                                update.chain_id,
775                                update.stream_id.clone(),
776                            )) {
777                                return None;
778                            }
779                            Some(update)
780                        })
781                        .collect::<Vec<_>>();
782                    if updates.is_empty() {
783                        return None;
784                    }
785                    Some((app_id, updates))
786                })
787                .collect::<BTreeMap<_, _>>();
788            if to_process.is_empty() {
789                return Ok(());
790            }
791            for (app_id, updates) in to_process {
792                self.run_user_action(
793                    app_id,
794                    UserAction::ProcessStreams(context, updates),
795                    None,
796                    None,
797                )
798                .await?;
799            }
800        }
801    }
802
803    pub(crate) async fn run_user_action(
804        &mut self,
805        application_id: ApplicationId,
806        action: UserAction,
807        refund_grant_to: Option<Account>,
808        grant: Option<&mut Amount>,
809    ) -> Result<(), ExecutionError> {
810        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
811            .await
812    }
813
814    // TODO(#5034): unify with `contract_and_dependencies`
815    pub(crate) async fn service_and_dependencies(
816        &mut self,
817        application: ApplicationId,
818    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
819        // cyclic futures are illegal so we need to either box the frames or keep our own
820        // stack
821        let mut stack = vec![application];
822        let mut codes = vec![];
823        let mut descriptions = vec![];
824
825        while let Some(id) = stack.pop() {
826            let (code, description) = self.load_service(id).await?;
827            stack.extend(description.required_application_ids.iter().rev().copied());
828            codes.push(code);
829            descriptions.push(description);
830        }
831
832        codes.reverse();
833        descriptions.reverse();
834
835        Ok((codes, descriptions))
836    }
837
838    // TODO(#5034): unify with `service_and_dependencies`
839    #[instrument(skip_all, fields(application_id = %application))]
840    async fn contract_and_dependencies(
841        &mut self,
842        application: ApplicationId,
843    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
844        // cyclic futures are illegal so we need to either box the frames or keep our own
845        // stack
846        let mut stack = vec![application];
847        let mut codes = vec![];
848        let mut descriptions = vec![];
849
850        while let Some(id) = stack.pop() {
851            let (code, description) = self.load_contract(id).await?;
852            stack.extend(description.required_application_ids.iter().rev().copied());
853            codes.push(code);
854            descriptions.push(description);
855        }
856
857        codes.reverse();
858        descriptions.reverse();
859
860        Ok((codes, descriptions))
861    }
862
863    #[instrument(skip_all, fields(application_id = %application_id))]
864    async fn run_user_action_with_runtime(
865        &mut self,
866        application_id: ApplicationId,
867        action: UserAction,
868        refund_grant_to: Option<Account>,
869        grant: Option<&mut Amount>,
870    ) -> Result<(), ExecutionError> {
871        let chain_id = self.state.context().extra().chain_id();
872        let mut cloned_grant = grant.as_ref().map(|x| **x);
873        let initial_balance = self
874            .resource_controller
875            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
876            .await?
877            .balance()?;
878        let mut controller = ResourceController::new(
879            self.resource_controller.policy().clone(),
880            self.resource_controller.tracker,
881            initial_balance,
882        );
883        let is_free = matches!(
884            &action,
885            UserAction::Message(..) | UserAction::ProcessStreams(..)
886        ) && self
887            .resource_controller
888            .policy()
889            .is_free_app(&application_id);
890        controller.is_free = is_free;
891        self.resource_controller.is_free = is_free;
892        let (execution_state_sender, mut execution_state_receiver) =
893            futures::channel::mpsc::unbounded();
894
895        let (codes, descriptions): (Vec<_>, Vec<_>) =
896            self.contract_and_dependencies(application_id).await?;
897
898        let allow_application_logs = self
899            .state
900            .context()
901            .extra()
902            .execution_runtime_config()
903            .allow_application_logs;
904
905        let contract_runtime_task = self
906            .state
907            .context()
908            .extra()
909            .thread_pool()
910            .run_send(JsVec(codes), move |codes| async move {
911                let runtime = ContractSyncRuntime::new(
912                    execution_state_sender,
913                    chain_id,
914                    refund_grant_to,
915                    controller,
916                    &action,
917                    allow_application_logs,
918                );
919
920                for (code, description) in codes.0.into_iter().zip(descriptions) {
921                    runtime.preload_contract(
922                        ApplicationId::from(&description),
923                        code,
924                        description,
925                    )?;
926                }
927
928                runtime.run_action(application_id, chain_id, action)
929            })
930            .await;
931
932        async {
933            while let Some(request) = execution_state_receiver.next().await {
934                self.handle_request(request).await?;
935            }
936            Ok::<(), ExecutionError>(())
937        }
938        .instrument(info_span!("handle_runtime_requests"))
939        .await?;
940
941        let (result, controller) = contract_runtime_task.await??;
942
943        self.resource_controller.is_free = false;
944
945        self.txn_tracker.add_operation_result(result);
946
947        self.resource_controller
948            .with_state_and_grant(&mut self.state.system, grant)
949            .await?
950            .merge_balance(initial_balance, controller.balance()?)?;
951        self.resource_controller.tracker = controller.tracker;
952
953        Ok(())
954    }
955
956    #[instrument(skip_all, fields(
957        chain_id = %context.chain_id,
958        block_height = %context.height,
959        operation_type = %operation.as_ref(),
960    ))]
961    pub async fn execute_operation(
962        &mut self,
963        context: OperationContext,
964        operation: Operation,
965    ) -> Result<(), ExecutionError> {
966        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
967        match operation {
968            Operation::System(op) => {
969                let new_application = self
970                    .state
971                    .system
972                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
973                    .await?;
974                if let Some((application_id, argument)) = new_application {
975                    let user_action = UserAction::Instantiate(context, argument);
976                    self.run_user_action(
977                        application_id,
978                        user_action,
979                        context.refund_grant_to(),
980                        None,
981                    )
982                    .await?;
983                }
984            }
985            Operation::User {
986                application_id,
987                bytes,
988            } => {
989                self.run_user_action(
990                    application_id,
991                    UserAction::Operation(context, bytes),
992                    context.refund_grant_to(),
993                    None,
994                )
995                .await?;
996            }
997        }
998        self.process_subscriptions(context.into()).await?;
999        Ok(())
1000    }
1001
1002    #[instrument(skip_all, fields(
1003        chain_id = %context.chain_id,
1004        block_height = %context.height,
1005        origin = %context.origin,
1006        is_bouncing = %context.is_bouncing,
1007        message_type = %message.as_ref(),
1008    ))]
1009    pub async fn execute_message(
1010        &mut self,
1011        context: MessageContext,
1012        message: Message,
1013        grant: Option<&mut Amount>,
1014    ) -> Result<(), ExecutionError> {
1015        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1016        match message {
1017            Message::System(message) => {
1018                let outcome = self.state.system.execute_message(context, message).await?;
1019                self.txn_tracker.add_outgoing_messages(outcome);
1020            }
1021            Message::User {
1022                application_id,
1023                bytes,
1024            } => {
1025                self.run_user_action(
1026                    application_id,
1027                    UserAction::Message(context, bytes),
1028                    context.refund_grant_to,
1029                    grant,
1030                )
1031                .await?;
1032            }
1033        }
1034        self.process_subscriptions(context.into()).await?;
1035        Ok(())
1036    }
1037
1038    pub fn bounce_message(
1039        &mut self,
1040        context: MessageContext,
1041        grant: Amount,
1042        message: Message,
1043    ) -> Result<(), ExecutionError> {
1044        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1045        self.txn_tracker.add_outgoing_message(OutgoingMessage {
1046            destination: context.origin,
1047            authenticated_signer: context.authenticated_signer,
1048            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1049            grant,
1050            kind: MessageKind::Bouncing,
1051            message,
1052        });
1053        Ok(())
1054    }
1055
1056    pub fn send_refund(
1057        &mut self,
1058        context: MessageContext,
1059        amount: Amount,
1060    ) -> Result<(), ExecutionError> {
1061        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1062        if amount.is_zero() {
1063            return Ok(());
1064        }
1065        let Some(account) = context.refund_grant_to else {
1066            return Err(ExecutionError::InternalError(
1067                "Messages with grants should have a non-empty `refund_grant_to`",
1068            ));
1069        };
1070        let message = SystemMessage::Credit {
1071            amount,
1072            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
1073            target: account.owner,
1074        };
1075        self.txn_tracker.add_outgoing_message(
1076            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1077        );
1078        Ok(())
1079    }
1080
1081    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
1082    ///
1083    /// Ensures that the response does not exceed the provided `size_limit`.
1084    async fn receive_http_response(
1085        response: reqwest::Response,
1086        size_limit: u64,
1087    ) -> Result<http::Response, ExecutionError> {
1088        let status = response.status().as_u16();
1089        let maybe_content_length = response.content_length();
1090
1091        let headers = response
1092            .headers()
1093            .iter()
1094            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1095            .collect::<Vec<_>>();
1096
1097        let total_header_size = headers
1098            .iter()
1099            .map(|header| (header.name.len() + header.value.len()) as u64)
1100            .sum();
1101
1102        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1103            ExecutionError::HttpResponseSizeLimitExceeded {
1104                limit: size_limit,
1105                size: total_header_size,
1106            },
1107        )?;
1108
1109        if let Some(content_length) = maybe_content_length {
1110            if content_length > remaining_bytes {
1111                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1112                    limit: size_limit,
1113                    size: content_length + total_header_size,
1114                });
1115            }
1116        }
1117
1118        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1119        let mut body_stream = response.bytes_stream();
1120
1121        while let Some(bytes) = body_stream.next().await.transpose()? {
1122            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1123                ExecutionError::HttpResponseSizeLimitExceeded {
1124                    limit: size_limit,
1125                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1126                },
1127            )?;
1128
1129            body.extend(&bytes);
1130        }
1131
1132        Ok(http::Response {
1133            status,
1134            headers,
1135            body,
1136        })
1137    }
1138}
1139
1140/// Requests to the execution state.
1141#[derive(Debug, strum::AsRefStr)]
1142pub enum ExecutionRequest {
1143    #[cfg(not(web))]
1144    LoadContract {
1145        id: ApplicationId,
1146        #[debug(skip)]
1147        callback: Sender<(UserContractCode, ApplicationDescription)>,
1148    },
1149
1150    #[cfg(not(web))]
1151    LoadService {
1152        id: ApplicationId,
1153        #[debug(skip)]
1154        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1155    },
1156
1157    ChainBalance {
1158        #[debug(skip)]
1159        callback: Sender<Amount>,
1160    },
1161
1162    OwnerBalance {
1163        owner: AccountOwner,
1164        #[debug(skip)]
1165        callback: Sender<Amount>,
1166    },
1167
1168    OwnerBalances {
1169        #[debug(skip)]
1170        callback: Sender<Vec<(AccountOwner, Amount)>>,
1171    },
1172
1173    BalanceOwners {
1174        #[debug(skip)]
1175        callback: Sender<Vec<AccountOwner>>,
1176    },
1177
1178    Transfer {
1179        source: AccountOwner,
1180        destination: Account,
1181        amount: Amount,
1182        #[debug(skip_if = Option::is_none)]
1183        signer: Option<AccountOwner>,
1184        application_id: ApplicationId,
1185        #[debug(skip)]
1186        callback: Sender<()>,
1187    },
1188
1189    Claim {
1190        source: Account,
1191        destination: Account,
1192        amount: Amount,
1193        #[debug(skip_if = Option::is_none)]
1194        signer: Option<AccountOwner>,
1195        application_id: ApplicationId,
1196        #[debug(skip)]
1197        callback: Sender<()>,
1198    },
1199
1200    SystemTimestamp {
1201        #[debug(skip)]
1202        callback: Sender<Timestamp>,
1203    },
1204
1205    ChainOwnership {
1206        #[debug(skip)]
1207        callback: Sender<ChainOwnership>,
1208    },
1209
1210    ApplicationPermissions {
1211        #[debug(skip)]
1212        callback: Sender<ApplicationPermissions>,
1213    },
1214
1215    ReadApplicationDescription {
1216        application_id: ApplicationId,
1217        #[debug(skip)]
1218        callback: Sender<ApplicationDescription>,
1219    },
1220
1221    ReadValueBytes {
1222        id: ApplicationId,
1223        #[debug(with = hex_debug)]
1224        key: Vec<u8>,
1225        #[debug(skip)]
1226        callback: Sender<Option<Vec<u8>>>,
1227    },
1228
1229    ContainsKey {
1230        id: ApplicationId,
1231        key: Vec<u8>,
1232        #[debug(skip)]
1233        callback: Sender<bool>,
1234    },
1235
1236    ContainsKeys {
1237        id: ApplicationId,
1238        #[debug(with = hex_vec_debug)]
1239        keys: Vec<Vec<u8>>,
1240        callback: Sender<Vec<bool>>,
1241    },
1242
1243    ReadMultiValuesBytes {
1244        id: ApplicationId,
1245        #[debug(with = hex_vec_debug)]
1246        keys: Vec<Vec<u8>>,
1247        #[debug(skip)]
1248        callback: Sender<Vec<Option<Vec<u8>>>>,
1249    },
1250
1251    FindKeysByPrefix {
1252        id: ApplicationId,
1253        #[debug(with = hex_debug)]
1254        key_prefix: Vec<u8>,
1255        #[debug(skip)]
1256        callback: Sender<Vec<Vec<u8>>>,
1257    },
1258
1259    FindKeyValuesByPrefix {
1260        id: ApplicationId,
1261        #[debug(with = hex_debug)]
1262        key_prefix: Vec<u8>,
1263        #[debug(skip)]
1264        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1265    },
1266
1267    WriteBatch {
1268        id: ApplicationId,
1269        batch: Batch,
1270        #[debug(skip)]
1271        callback: Sender<()>,
1272    },
1273
1274    OpenChain {
1275        ownership: ChainOwnership,
1276        #[debug(skip_if = Amount::is_zero)]
1277        balance: Amount,
1278        parent_id: ChainId,
1279        block_height: BlockHeight,
1280        application_permissions: ApplicationPermissions,
1281        timestamp: Timestamp,
1282        #[debug(skip)]
1283        callback: Sender<ChainId>,
1284    },
1285
1286    CloseChain {
1287        application_id: ApplicationId,
1288        #[debug(skip)]
1289        callback: Sender<Result<(), ExecutionError>>,
1290    },
1291
1292    ChangeOwnership {
1293        application_id: ApplicationId,
1294        ownership: ChainOwnership,
1295        #[debug(skip)]
1296        callback: Sender<Result<(), ExecutionError>>,
1297    },
1298
1299    ChangeApplicationPermissions {
1300        application_id: ApplicationId,
1301        application_permissions: ApplicationPermissions,
1302        #[debug(skip)]
1303        callback: Sender<Result<(), ExecutionError>>,
1304    },
1305
1306    CreateApplication {
1307        chain_id: ChainId,
1308        block_height: BlockHeight,
1309        module_id: ModuleId,
1310        parameters: Vec<u8>,
1311        required_application_ids: Vec<ApplicationId>,
1312        #[debug(skip)]
1313        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1314    },
1315
1316    PerformHttpRequest {
1317        request: http::Request,
1318        http_responses_are_oracle_responses: bool,
1319        #[debug(skip)]
1320        callback: Sender<http::Response>,
1321    },
1322
1323    ReadBlobContent {
1324        blob_id: BlobId,
1325        #[debug(skip)]
1326        callback: Sender<BlobContent>,
1327    },
1328
1329    AssertBlobExists {
1330        blob_id: BlobId,
1331        #[debug(skip)]
1332        callback: Sender<()>,
1333    },
1334
1335    Emit {
1336        stream_id: StreamId,
1337        #[debug(with = hex_debug)]
1338        value: Vec<u8>,
1339        #[debug(skip)]
1340        callback: Sender<u32>,
1341    },
1342
1343    ReadEvent {
1344        event_id: EventId,
1345        callback: oneshot::Sender<Vec<u8>>,
1346    },
1347
1348    SubscribeToEvents {
1349        chain_id: ChainId,
1350        stream_id: StreamId,
1351        subscriber_app_id: ApplicationId,
1352        #[debug(skip)]
1353        callback: Sender<()>,
1354    },
1355
1356    UnsubscribeFromEvents {
1357        chain_id: ChainId,
1358        stream_id: StreamId,
1359        subscriber_app_id: ApplicationId,
1360        #[debug(skip)]
1361        callback: Sender<()>,
1362    },
1363
1364    GetApplicationPermissions {
1365        #[debug(skip)]
1366        callback: Sender<ApplicationPermissions>,
1367    },
1368
1369    QueryServiceOracle {
1370        deadline: Option<Instant>,
1371        application_id: ApplicationId,
1372        next_block_height: BlockHeight,
1373        query: Vec<u8>,
1374        #[debug(skip)]
1375        callback: Sender<Vec<u8>>,
1376    },
1377
1378    AddOutgoingMessage {
1379        message: crate::OutgoingMessage,
1380        #[debug(skip)]
1381        callback: Sender<()>,
1382    },
1383
1384    SetLocalTime {
1385        local_time: Timestamp,
1386        #[debug(skip)]
1387        callback: Sender<()>,
1388    },
1389
1390    AssertBefore {
1391        timestamp: Timestamp,
1392        #[debug(skip)]
1393        callback: Sender<Result<(), ExecutionError>>,
1394    },
1395
1396    AddCreatedBlob {
1397        blob: crate::Blob,
1398        #[debug(skip)]
1399        callback: Sender<()>,
1400    },
1401
1402    ValidationRound {
1403        round: Option<u32>,
1404        #[debug(skip)]
1405        callback: Sender<Option<u32>>,
1406    },
1407
1408    AllowApplicationLogs {
1409        #[debug(skip)]
1410        callback: Sender<bool>,
1411    },
1412
1413    /// Log message from contract execution (fire-and-forget, no callback needed).
1414    #[cfg(web)]
1415    Log {
1416        message: String,
1417        level: tracing::log::Level,
1418    },
1419}