Skip to main content

linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::{
7    collections::{BTreeMap, BTreeSet},
8    sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16    data_types::{
17        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18        Timestamp,
19    },
20    ensure, hex_debug, hex_vec_debug, http,
21    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
22    ownership::ChainOwnership,
23    time::Instant,
24};
25use linera_views::{batch::Batch, context::Context, views::View};
26use oneshot::Sender;
27use reqwest::{header::HeaderMap, Client, Url};
28use tracing::{info_span, instrument, Instrument as _};
29
30use crate::{
31    execution::UserAction,
32    runtime::ContractSyncRuntime,
33    system::{CreateApplicationResult, OpenChainConfig},
34    util::{OracleResponseExt as _, RespondExt as _},
35    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
36    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
37    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
38    ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
39};
40
41/// Actor for handling requests to the execution state.
42pub struct ExecutionStateActor<'a, C> {
43    state: &'a mut ExecutionStateView<C>,
44    txn_tracker: &'a mut TransactionTracker,
45    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
46}
47
48#[cfg(with_metrics)]
49mod metrics {
50    use std::sync::LazyLock;
51
52    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
53    use prometheus::HistogramVec;
54
55    /// Histogram of the latency to load a contract bytecode.
56    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
57        register_histogram_vec(
58            "load_contract_latency",
59            "Load contract latency",
60            &[],
61            exponential_bucket_latencies(250.0),
62        )
63    });
64
65    /// Histogram of the latency to load a service bytecode.
66    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
67        register_histogram_vec(
68            "load_service_latency",
69            "Load service latency",
70            &[],
71            exponential_bucket_latencies(250.0),
72        )
73    });
74}
75
76pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
77
78impl<'a, C> ExecutionStateActor<'a, C>
79where
80    C: Context + Clone + 'static,
81    C::Extra: ExecutionRuntimeContext,
82{
83    /// Creates a new execution state actor.
84    pub fn new(
85        state: &'a mut ExecutionStateView<C>,
86        txn_tracker: &'a mut TransactionTracker,
87        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
88    ) -> Self {
89        Self {
90            state,
91            txn_tracker,
92            resource_controller,
93        }
94    }
95
96    #[instrument(skip_all, fields(application_id = %id))]
97    pub(crate) async fn load_contract(
98        &mut self,
99        id: ApplicationId,
100    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
101        #[cfg(with_metrics)]
102        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
103        let blob_id = id.description_blob_id();
104        let description = match self.txn_tracker.get_blob_content(&blob_id) {
105            Some(blob) => bcs::from_bytes(blob.bytes())?,
106            None => {
107                self.state
108                    .system
109                    .describe_application(id, self.txn_tracker)
110                    .await?
111            }
112        };
113        let code = self
114            .state
115            .context()
116            .extra()
117            .get_user_contract(&description, self.txn_tracker)
118            .await?;
119        Ok((code, description))
120    }
121
122    pub(crate) async fn load_service(
123        &mut self,
124        id: ApplicationId,
125    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
126        #[cfg(with_metrics)]
127        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
128        let blob_id = id.description_blob_id();
129        let description = match self.txn_tracker.get_blob_content(&blob_id) {
130            Some(blob) => bcs::from_bytes(blob.bytes())?,
131            None => {
132                self.state
133                    .system
134                    .describe_application(id, self.txn_tracker)
135                    .await?
136            }
137        };
138        let code = self
139            .state
140            .context()
141            .extra()
142            .get_user_service(&description, self.txn_tracker)
143            .await?;
144        Ok((code, description))
145    }
146
147    // TODO(#1416): Support concurrent I/O.
148    #[instrument(
149        skip_all,
150        fields(request_type = %request.as_ref())
151    )]
152    pub(crate) async fn handle_request(
153        &mut self,
154        request: ExecutionRequest,
155    ) -> Result<(), ExecutionError> {
156        use ExecutionRequest::*;
157        match request {
158            #[cfg(not(web))]
159            LoadContract { id, callback } => {
160                let (code, description) = self.load_contract(id).await?;
161                callback.respond((code, description))
162            }
163            #[cfg(not(web))]
164            LoadService { id, callback } => {
165                let (code, description) = self.load_service(id).await?;
166                callback.respond((code, description))
167            }
168
169            ChainBalance { callback } => {
170                let balance = *self.state.system.balance.get();
171                callback.respond(balance);
172            }
173
174            OwnerBalance { owner, callback } => {
175                let balance = self
176                    .state
177                    .system
178                    .balances
179                    .get(&owner)
180                    .await?
181                    .unwrap_or_default();
182                callback.respond(balance);
183            }
184
185            OwnerBalances { callback } => {
186                let balances = self.state.system.balances.index_values().await?;
187                callback.respond(balances.into_iter().collect());
188            }
189
190            BalanceOwners { callback } => {
191                let owners = self.state.system.balances.indices().await?;
192                callback.respond(owners);
193            }
194
195            Transfer {
196                source,
197                destination,
198                amount,
199                signer,
200                application_id,
201                callback,
202            } => {
203                let maybe_message = self
204                    .state
205                    .system
206                    .transfer(signer, Some(application_id), source, destination, amount)
207                    .await?;
208                self.txn_tracker.add_outgoing_messages(maybe_message);
209                callback.respond(());
210            }
211
212            Claim {
213                source,
214                destination,
215                amount,
216                signer,
217                application_id,
218                callback,
219            } => {
220                let maybe_message = self
221                    .state
222                    .system
223                    .claim(
224                        signer,
225                        Some(application_id),
226                        source.owner,
227                        source.chain_id,
228                        destination,
229                        amount,
230                    )
231                    .await?;
232                self.txn_tracker.add_outgoing_messages(maybe_message);
233                callback.respond(());
234            }
235
236            SystemTimestamp { callback } => {
237                let timestamp = *self.state.system.timestamp.get();
238                callback.respond(timestamp);
239            }
240
241            ChainOwnership { callback } => {
242                let ownership = self.state.system.ownership.get().await?.clone();
243                callback.respond(ownership);
244            }
245
246            ApplicationPermissions { callback } => {
247                let permissions = self
248                    .state
249                    .system
250                    .application_permissions
251                    .get()
252                    .await?
253                    .clone();
254                callback.respond(permissions);
255            }
256
257            ReadApplicationDescription {
258                application_id,
259                callback,
260            } => {
261                let blob_id = application_id.description_blob_id();
262                let description = match self.txn_tracker.get_blob_content(&blob_id) {
263                    Some(blob) => bcs::from_bytes(blob.bytes())?,
264                    None => {
265                        let blob_content = self.state.system.read_blob_content(blob_id).await?;
266                        self.state
267                            .system
268                            .blob_used(self.txn_tracker, blob_id)
269                            .await?;
270                        bcs::from_bytes(blob_content.bytes())?
271                    }
272                };
273                callback.respond(description);
274            }
275
276            ContainsKey { id, key, callback } => {
277                let view = self.state.users.try_load_entry(&id).await?;
278                let result = match view {
279                    Some(view) => view.contains_key(&key).await?,
280                    None => false,
281                };
282                callback.respond(result);
283            }
284
285            ContainsKeys { id, keys, callback } => {
286                let view = self.state.users.try_load_entry(&id).await?;
287                let result = match view {
288                    Some(view) => view.contains_keys(&keys).await?,
289                    None => vec![false; keys.len()],
290                };
291                callback.respond(result);
292            }
293
294            ReadMultiValuesBytes { id, keys, callback } => {
295                let view = self.state.users.try_load_entry(&id).await?;
296                let values = match view {
297                    Some(view) => view.multi_get(&keys).await?,
298                    None => vec![None; keys.len()],
299                };
300                callback.respond(values);
301            }
302
303            ReadValueBytes { id, key, callback } => {
304                let view = self.state.users.try_load_entry(&id).await?;
305                let result = match view {
306                    Some(view) => view.get(&key).await?,
307                    None => None,
308                };
309                callback.respond(result);
310            }
311
312            FindKeysByPrefix {
313                id,
314                key_prefix,
315                callback,
316            } => {
317                let view = self.state.users.try_load_entry(&id).await?;
318                let result = match view {
319                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
320                    None => Vec::new(),
321                };
322                callback.respond(result);
323            }
324
325            FindKeyValuesByPrefix {
326                id,
327                key_prefix,
328                callback,
329            } => {
330                let view = self.state.users.try_load_entry(&id).await?;
331                let result = match view {
332                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
333                    None => Vec::new(),
334                };
335                callback.respond(result);
336            }
337
338            WriteBatch {
339                id,
340                batch,
341                callback,
342            } => {
343                let mut view = self.state.users.try_load_entry_mut(&id).await?;
344                view.write_batch(batch)?;
345                callback.respond(());
346            }
347
348            OpenChain {
349                ownership,
350                balance,
351                parent_id,
352                block_height,
353                application_permissions,
354                timestamp,
355                callback,
356            } => {
357                let config = OpenChainConfig {
358                    ownership,
359                    balance,
360                    application_permissions,
361                };
362                let chain_id = self
363                    .state
364                    .system
365                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
366                    .await?;
367                callback.respond(chain_id);
368            }
369
370            CloseChain {
371                application_id,
372                callback,
373            } => {
374                let app_permissions = self.state.system.application_permissions.get().await?;
375                if !app_permissions.can_close_chain(&application_id) {
376                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
377                } else {
378                    self.state.system.close_chain()?;
379                    callback.respond(Ok(()));
380                }
381            }
382
383            ChangeOwnership {
384                application_id,
385                ownership,
386                callback,
387            } => {
388                let app_permissions = self.state.system.application_permissions.get().await?;
389                if !app_permissions.can_close_chain(&application_id) {
390                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
391                } else {
392                    self.state.system.ownership.set(ownership);
393                    callback.respond(Ok(()));
394                }
395            }
396
397            ChangeApplicationPermissions {
398                application_id,
399                application_permissions,
400                callback,
401            } => {
402                let app_permissions = self.state.system.application_permissions.get().await?;
403                if !app_permissions.can_change_application_permissions(&application_id) {
404                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
405                } else {
406                    self.state
407                        .system
408                        .application_permissions
409                        .set(application_permissions);
410                    callback.respond(Ok(()));
411                }
412            }
413
414            CreateApplication {
415                chain_id,
416                block_height,
417                module_id,
418                parameters,
419                required_application_ids,
420                callback,
421            } => {
422                let create_application_result = self
423                    .state
424                    .system
425                    .create_application(
426                        chain_id,
427                        block_height,
428                        module_id,
429                        parameters,
430                        required_application_ids,
431                        self.txn_tracker,
432                    )
433                    .await?;
434                callback.respond(Ok(create_application_result));
435            }
436
437            PerformHttpRequest {
438                request,
439                http_responses_are_oracle_responses,
440                callback,
441            } => {
442                let system = &mut self.state.system;
443                let response = self
444                    .txn_tracker
445                    .oracle(|| async {
446                        let headers = request
447                            .headers
448                            .into_iter()
449                            .map(|http::Header { name, value }| {
450                                Ok((name.parse()?, value.try_into()?))
451                            })
452                            .collect::<Result<HeaderMap, ExecutionError>>()?;
453
454                        let url = Url::parse(&request.url)?;
455                        let host = url
456                            .host_str()
457                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
458
459                        let (_epoch, committee) = system
460                            .current_committee()
461                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
462                        let allowed_hosts = &committee.policy().http_request_allow_list;
463
464                        ensure!(
465                            allowed_hosts.contains(host),
466                            ExecutionError::UnauthorizedHttpRequest(url)
467                        );
468
469                        let request = Client::new()
470                            .request(request.method.into(), url)
471                            .body(request.body)
472                            .headers(headers);
473                        #[cfg(not(web))]
474                        let request = request.timeout(linera_base::time::Duration::from_millis(
475                            committee.policy().http_request_timeout_ms,
476                        ));
477
478                        let response = request.send().await?;
479
480                        let mut response_size_limit =
481                            committee.policy().maximum_http_response_bytes;
482
483                        if http_responses_are_oracle_responses {
484                            response_size_limit = response_size_limit
485                                .min(committee.policy().maximum_oracle_response_bytes);
486                        }
487                        Ok(OracleResponse::Http(
488                            Self::receive_http_response(response, response_size_limit).await?,
489                        ))
490                    })
491                    .await?
492                    .to_http_response()?;
493                callback.respond(response);
494            }
495
496            ReadBlobContent { blob_id, callback } => {
497                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
498                    content.clone()
499                } else {
500                    let content = self.state.system.read_blob_content(blob_id).await?;
501                    if blob_id.blob_type == BlobType::Data {
502                        self.resource_controller
503                            .with_state(&mut self.state.system)
504                            .await?
505                            .track_blob_read(content.bytes().len() as u64)?;
506                    }
507                    self.state
508                        .system
509                        .blob_used(self.txn_tracker, blob_id)
510                        .await?;
511                    content
512                };
513                callback.respond(content)
514            }
515
516            AssertBlobExists { blob_id, callback } => {
517                self.state.system.assert_blob_exists(blob_id).await?;
518                // Treating this as reading a size-0 blob for fee purposes.
519                if blob_id.blob_type == BlobType::Data {
520                    self.resource_controller
521                        .with_state(&mut self.state.system)
522                        .await?
523                        .track_blob_read(0)?;
524                }
525                let is_new = self
526                    .state
527                    .system
528                    .blob_used(self.txn_tracker, blob_id)
529                    .await?;
530                if is_new {
531                    self.txn_tracker
532                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
533                }
534                callback.respond(());
535            }
536
537            Emit {
538                stream_id,
539                value,
540                callback,
541            } => {
542                let count = self
543                    .state
544                    .stream_event_counts
545                    .get_mut_or_default(&stream_id)
546                    .await?;
547                let index = *count;
548                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
549                self.txn_tracker.add_event(stream_id, index, value);
550                callback.respond(index)
551            }
552
553            ReadEvent { event_id, callback } => {
554                let extra = self.state.context().extra();
555                let event = self
556                    .txn_tracker
557                    .oracle(|| async {
558                        let event = extra
559                            .get_event(event_id.clone())
560                            .await?
561                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
562                        Ok(OracleResponse::Event(
563                            event_id.clone(),
564                            Arc::unwrap_or_clone(event),
565                        ))
566                    })
567                    .await?
568                    .to_event(&event_id)?;
569                callback.respond(event);
570            }
571
572            SubscribeToEvents {
573                chain_id,
574                stream_id,
575                subscriber_app_id,
576                callback,
577            } => {
578                let subscriptions = self
579                    .state
580                    .system
581                    .event_subscriptions
582                    .get_mut_or_default(&(chain_id, stream_id.clone()))
583                    .await?;
584                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
585                    subscriptions.next_index
586                } else {
587                    0
588                };
589                self.txn_tracker.add_stream_to_process(
590                    subscriber_app_id,
591                    chain_id,
592                    stream_id,
593                    0,
594                    next_index,
595                );
596                callback.respond(());
597            }
598
599            UnsubscribeFromEvents {
600                chain_id,
601                stream_id,
602                subscriber_app_id,
603                callback,
604            } => {
605                let key = (chain_id, stream_id.clone());
606                let subscriptions = self
607                    .state
608                    .system
609                    .event_subscriptions
610                    .get_mut_or_default(&key)
611                    .await?;
612                subscriptions.applications.remove(&subscriber_app_id);
613                if subscriptions.applications.is_empty() {
614                    self.state.system.event_subscriptions.remove(&key)?;
615                }
616                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
617                    self.txn_tracker
618                        .remove_stream_to_process(app_id, chain_id, stream_id);
619                }
620                callback.respond(());
621            }
622
623            GetApplicationPermissions { callback } => {
624                let app_permissions = self.state.system.application_permissions.get().await?;
625                callback.respond(app_permissions.clone());
626            }
627
628            QueryServiceOracle {
629                deadline,
630                application_id,
631                next_block_height,
632                query,
633                callback,
634            } => {
635                let state = &mut self.state;
636                let local_time = self.txn_tracker.local_time();
637                let created_blobs = self.txn_tracker.created_blobs().clone();
638                let bytes = self
639                    .txn_tracker
640                    .oracle(|| async {
641                        let context = QueryContext {
642                            chain_id: state.context().extra().chain_id(),
643                            next_block_height,
644                            local_time,
645                        };
646                        let QueryOutcome {
647                            response,
648                            operations,
649                        } = Box::pin(state.query_user_application_with_deadline(
650                            application_id,
651                            context,
652                            query,
653                            deadline,
654                            created_blobs,
655                        ))
656                        .await?;
657                        ensure!(
658                            operations.is_empty(),
659                            ExecutionError::ServiceOracleQueryOperations(operations)
660                        );
661                        Ok(OracleResponse::Service(response))
662                    })
663                    .await?
664                    .to_service_response()?;
665                callback.respond(bytes);
666            }
667
668            AddOutgoingMessage { message, callback } => {
669                self.txn_tracker.add_outgoing_message(message);
670                callback.respond(());
671            }
672
673            SetLocalTime {
674                local_time,
675                callback,
676            } => {
677                self.txn_tracker.set_local_time(local_time);
678                callback.respond(());
679            }
680
681            AssertBefore {
682                timestamp,
683                callback,
684            } => {
685                let result = if !self
686                    .txn_tracker
687                    .replay_oracle_response(OracleResponse::Assert)?
688                {
689                    // There are no recorded oracle responses, so we check the local time.
690                    let local_time = self.txn_tracker.local_time();
691                    if local_time >= timestamp {
692                        Err(ExecutionError::AssertBefore {
693                            timestamp,
694                            local_time,
695                        })
696                    } else {
697                        Ok(())
698                    }
699                } else {
700                    Ok(())
701                };
702                callback.respond(result);
703            }
704
705            AddCreatedBlob { blob, callback } => {
706                if self.resource_controller.is_free {
707                    self.txn_tracker.mark_blob_free(blob.id());
708                }
709                self.txn_tracker.add_created_blob(blob);
710                callback.respond(());
711            }
712
713            ValidationRound { round, callback } => {
714                let validation_round = self
715                    .txn_tracker
716                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
717                    .await?
718                    .to_round()?;
719                callback.respond(validation_round);
720            }
721
722            AllowApplicationLogs { callback } => {
723                let allow = self
724                    .state
725                    .context()
726                    .extra()
727                    .execution_runtime_config()
728                    .allow_application_logs;
729                callback.respond(allow);
730            }
731
732            #[cfg(web)]
733            Log { message, level } => match level {
734                tracing::log::Level::Trace | tracing::log::Level::Debug => {
735                    tracing::debug!(target: "user_application_log", message = %message);
736                }
737                tracing::log::Level::Info => {
738                    tracing::info!(target: "user_application_log", message = %message);
739                }
740                tracing::log::Level::Warn => {
741                    tracing::warn!(target: "user_application_log", message = %message);
742                }
743                tracing::log::Level::Error => {
744                    tracing::error!(target: "user_application_log", message = %message);
745                }
746            },
747        }
748
749        Ok(())
750    }
751
752    /// Calls `process_streams` for all applications that are subscribed to streams with new
753    /// events or that have new subscriptions.
754    #[instrument(skip_all)]
755    async fn process_subscriptions(
756        &mut self,
757        context: ProcessStreamsContext,
758    ) -> Result<(), ExecutionError> {
759        // Keep track of which streams we have already processed. This is to guard against
760        // applications unsubscribing and subscribing in the process_streams call itself.
761        let mut processed = BTreeSet::new();
762        loop {
763            let to_process = self
764                .txn_tracker
765                .take_streams_to_process()
766                .into_iter()
767                .filter_map(|(app_id, updates)| {
768                    let updates = updates
769                        .into_iter()
770                        .filter_map(|update| {
771                            if !processed.insert((
772                                app_id,
773                                update.chain_id,
774                                update.stream_id.clone(),
775                            )) {
776                                return None;
777                            }
778                            Some(update)
779                        })
780                        .collect::<Vec<_>>();
781                    if updates.is_empty() {
782                        return None;
783                    }
784                    Some((app_id, updates))
785                })
786                .collect::<BTreeMap<_, _>>();
787            if to_process.is_empty() {
788                return Ok(());
789            }
790            for (app_id, updates) in to_process {
791                self.run_user_action(
792                    app_id,
793                    UserAction::ProcessStreams(context, updates),
794                    None,
795                    None,
796                )
797                .await?;
798            }
799        }
800    }
801
802    pub(crate) async fn run_user_action(
803        &mut self,
804        application_id: ApplicationId,
805        action: UserAction,
806        refund_grant_to: Option<Account>,
807        grant: Option<&mut Amount>,
808    ) -> Result<(), ExecutionError> {
809        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
810            .await
811    }
812
813    // TODO(#5034): unify with `contract_and_dependencies`
814    pub(crate) async fn service_and_dependencies(
815        &mut self,
816        application: ApplicationId,
817    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
818        // cyclic futures are illegal so we need to either box the frames or keep our own
819        // stack
820        let mut stack = vec![application];
821        let mut codes = vec![];
822        let mut descriptions = vec![];
823
824        while let Some(id) = stack.pop() {
825            let (code, description) = self.load_service(id).await?;
826            stack.extend(description.required_application_ids.iter().rev().copied());
827            codes.push(code);
828            descriptions.push(description);
829        }
830
831        codes.reverse();
832        descriptions.reverse();
833
834        Ok((codes, descriptions))
835    }
836
837    // TODO(#5034): unify with `service_and_dependencies`
838    #[instrument(skip_all, fields(application_id = %application))]
839    async fn contract_and_dependencies(
840        &mut self,
841        application: ApplicationId,
842    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
843        // cyclic futures are illegal so we need to either box the frames or keep our own
844        // stack
845        let mut stack = vec![application];
846        let mut codes = vec![];
847        let mut descriptions = vec![];
848
849        while let Some(id) = stack.pop() {
850            let (code, description) = self.load_contract(id).await?;
851            stack.extend(description.required_application_ids.iter().rev().copied());
852            codes.push(code);
853            descriptions.push(description);
854        }
855
856        codes.reverse();
857        descriptions.reverse();
858
859        Ok((codes, descriptions))
860    }
861
862    #[instrument(skip_all, fields(application_id = %application_id))]
863    async fn run_user_action_with_runtime(
864        &mut self,
865        application_id: ApplicationId,
866        action: UserAction,
867        refund_grant_to: Option<Account>,
868        grant: Option<&mut Amount>,
869    ) -> Result<(), ExecutionError> {
870        let chain_id = self.state.context().extra().chain_id();
871        let mut cloned_grant = grant.as_ref().map(|x| **x);
872        let initial_balance = self
873            .resource_controller
874            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
875            .await?
876            .balance()?;
877        let mut controller = ResourceController::new(
878            self.resource_controller.policy().clone(),
879            self.resource_controller.tracker,
880            initial_balance,
881        );
882        let is_free = matches!(
883            &action,
884            UserAction::Message(..) | UserAction::ProcessStreams(..)
885        ) && self
886            .resource_controller
887            .policy()
888            .is_free_app(&application_id);
889        controller.is_free = is_free;
890        self.resource_controller.is_free = is_free;
891        let (execution_state_sender, mut execution_state_receiver) =
892            futures::channel::mpsc::unbounded();
893
894        let (codes, descriptions): (Vec<_>, Vec<_>) =
895            self.contract_and_dependencies(application_id).await?;
896
897        let allow_application_logs = self
898            .state
899            .context()
900            .extra()
901            .execution_runtime_config()
902            .allow_application_logs;
903
904        let contract_runtime_task = self
905            .state
906            .context()
907            .extra()
908            .thread_pool()
909            .run_send(JsVec(codes), move |codes| async move {
910                let runtime = ContractSyncRuntime::new(
911                    execution_state_sender,
912                    chain_id,
913                    refund_grant_to,
914                    controller,
915                    &action,
916                    allow_application_logs,
917                );
918
919                for (code, description) in codes.0.into_iter().zip(descriptions) {
920                    runtime.preload_contract(
921                        ApplicationId::from(&description),
922                        code,
923                        description,
924                    )?;
925                }
926
927                runtime.run_action(application_id, chain_id, action)
928            })
929            .await;
930
931        async {
932            while let Some(request) = execution_state_receiver.next().await {
933                self.handle_request(request).await?;
934            }
935            Ok::<(), ExecutionError>(())
936        }
937        .instrument(info_span!("handle_runtime_requests"))
938        .await?;
939
940        let (result, controller) = contract_runtime_task.await??;
941
942        self.resource_controller.is_free = false;
943
944        self.txn_tracker.add_operation_result(result);
945
946        self.resource_controller
947            .with_state_and_grant(&mut self.state.system, grant)
948            .await?
949            .merge_balance(initial_balance, controller.balance()?)?;
950        self.resource_controller.tracker = controller.tracker;
951
952        Ok(())
953    }
954
955    #[instrument(skip_all, fields(
956        chain_id = %context.chain_id,
957        block_height = %context.height,
958        operation_type = %operation.as_ref(),
959    ))]
960    pub async fn execute_operation(
961        &mut self,
962        context: OperationContext,
963        operation: Operation,
964    ) -> Result<(), ExecutionError> {
965        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
966        match operation {
967            Operation::System(op) => {
968                let new_application = self
969                    .state
970                    .system
971                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
972                    .await?;
973                if let Some((application_id, argument)) = new_application {
974                    let user_action = UserAction::Instantiate(context, argument);
975                    self.run_user_action(
976                        application_id,
977                        user_action,
978                        context.refund_grant_to(),
979                        None,
980                    )
981                    .await?;
982                }
983            }
984            Operation::User {
985                application_id,
986                bytes,
987            } => {
988                self.run_user_action(
989                    application_id,
990                    UserAction::Operation(context, bytes),
991                    context.refund_grant_to(),
992                    None,
993                )
994                .await?;
995            }
996        }
997        self.process_subscriptions(context.into()).await?;
998        Ok(())
999    }
1000
1001    #[instrument(skip_all, fields(
1002        chain_id = %context.chain_id,
1003        block_height = %context.height,
1004        origin = %context.origin,
1005        is_bouncing = %context.is_bouncing,
1006        message_type = %message.as_ref(),
1007    ))]
1008    pub async fn execute_message(
1009        &mut self,
1010        context: MessageContext,
1011        message: Message,
1012        grant: Option<&mut Amount>,
1013    ) -> Result<(), ExecutionError> {
1014        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1015        match message {
1016            Message::System(message) => {
1017                let outcome = self.state.system.execute_message(context, message).await?;
1018                self.txn_tracker.add_outgoing_messages(outcome);
1019            }
1020            Message::User {
1021                application_id,
1022                bytes,
1023            } => {
1024                self.run_user_action(
1025                    application_id,
1026                    UserAction::Message(context, bytes),
1027                    context.refund_grant_to,
1028                    grant,
1029                )
1030                .await?;
1031            }
1032        }
1033        self.process_subscriptions(context.into()).await?;
1034        Ok(())
1035    }
1036
1037    pub fn bounce_message(
1038        &mut self,
1039        context: MessageContext,
1040        grant: Amount,
1041        message: Message,
1042    ) -> Result<(), ExecutionError> {
1043        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1044        self.txn_tracker.add_outgoing_message(OutgoingMessage {
1045            destination: context.origin,
1046            authenticated_signer: context.authenticated_signer,
1047            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1048            grant,
1049            kind: MessageKind::Bouncing,
1050            message,
1051        });
1052        Ok(())
1053    }
1054
1055    pub fn send_refund(
1056        &mut self,
1057        context: MessageContext,
1058        amount: Amount,
1059    ) -> Result<(), ExecutionError> {
1060        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1061        if amount.is_zero() {
1062            return Ok(());
1063        }
1064        let Some(account) = context.refund_grant_to else {
1065            return Err(ExecutionError::InternalError(
1066                "Messages with grants should have a non-empty `refund_grant_to`",
1067            ));
1068        };
1069        let message = SystemMessage::Credit {
1070            amount,
1071            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
1072            target: account.owner,
1073        };
1074        self.txn_tracker.add_outgoing_message(
1075            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1076        );
1077        Ok(())
1078    }
1079
1080    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
1081    ///
1082    /// Ensures that the response does not exceed the provided `size_limit`.
1083    async fn receive_http_response(
1084        response: reqwest::Response,
1085        size_limit: u64,
1086    ) -> Result<http::Response, ExecutionError> {
1087        let status = response.status().as_u16();
1088        let maybe_content_length = response.content_length();
1089
1090        let headers = response
1091            .headers()
1092            .iter()
1093            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1094            .collect::<Vec<_>>();
1095
1096        let total_header_size = headers
1097            .iter()
1098            .map(|header| (header.name.len() + header.value.len()) as u64)
1099            .sum();
1100
1101        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1102            ExecutionError::HttpResponseSizeLimitExceeded {
1103                limit: size_limit,
1104                size: total_header_size,
1105            },
1106        )?;
1107
1108        if let Some(content_length) = maybe_content_length {
1109            if content_length > remaining_bytes {
1110                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1111                    limit: size_limit,
1112                    size: content_length + total_header_size,
1113                });
1114            }
1115        }
1116
1117        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1118        let mut body_stream = response.bytes_stream();
1119
1120        while let Some(bytes) = body_stream.next().await.transpose()? {
1121            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1122                ExecutionError::HttpResponseSizeLimitExceeded {
1123                    limit: size_limit,
1124                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1125                },
1126            )?;
1127
1128            body.extend(&bytes);
1129        }
1130
1131        Ok(http::Response {
1132            status,
1133            headers,
1134            body,
1135        })
1136    }
1137}
1138
1139/// Requests to the execution state.
1140#[derive(Debug, strum::AsRefStr)]
1141pub enum ExecutionRequest {
1142    #[cfg(not(web))]
1143    LoadContract {
1144        id: ApplicationId,
1145        #[debug(skip)]
1146        callback: Sender<(UserContractCode, ApplicationDescription)>,
1147    },
1148
1149    #[cfg(not(web))]
1150    LoadService {
1151        id: ApplicationId,
1152        #[debug(skip)]
1153        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1154    },
1155
1156    ChainBalance {
1157        #[debug(skip)]
1158        callback: Sender<Amount>,
1159    },
1160
1161    OwnerBalance {
1162        owner: AccountOwner,
1163        #[debug(skip)]
1164        callback: Sender<Amount>,
1165    },
1166
1167    OwnerBalances {
1168        #[debug(skip)]
1169        callback: Sender<Vec<(AccountOwner, Amount)>>,
1170    },
1171
1172    BalanceOwners {
1173        #[debug(skip)]
1174        callback: Sender<Vec<AccountOwner>>,
1175    },
1176
1177    Transfer {
1178        source: AccountOwner,
1179        destination: Account,
1180        amount: Amount,
1181        #[debug(skip_if = Option::is_none)]
1182        signer: Option<AccountOwner>,
1183        application_id: ApplicationId,
1184        #[debug(skip)]
1185        callback: Sender<()>,
1186    },
1187
1188    Claim {
1189        source: Account,
1190        destination: Account,
1191        amount: Amount,
1192        #[debug(skip_if = Option::is_none)]
1193        signer: Option<AccountOwner>,
1194        application_id: ApplicationId,
1195        #[debug(skip)]
1196        callback: Sender<()>,
1197    },
1198
1199    SystemTimestamp {
1200        #[debug(skip)]
1201        callback: Sender<Timestamp>,
1202    },
1203
1204    ChainOwnership {
1205        #[debug(skip)]
1206        callback: Sender<ChainOwnership>,
1207    },
1208
1209    ApplicationPermissions {
1210        #[debug(skip)]
1211        callback: Sender<ApplicationPermissions>,
1212    },
1213
1214    ReadApplicationDescription {
1215        application_id: ApplicationId,
1216        #[debug(skip)]
1217        callback: Sender<ApplicationDescription>,
1218    },
1219
1220    ReadValueBytes {
1221        id: ApplicationId,
1222        #[debug(with = hex_debug)]
1223        key: Vec<u8>,
1224        #[debug(skip)]
1225        callback: Sender<Option<Vec<u8>>>,
1226    },
1227
1228    ContainsKey {
1229        id: ApplicationId,
1230        key: Vec<u8>,
1231        #[debug(skip)]
1232        callback: Sender<bool>,
1233    },
1234
1235    ContainsKeys {
1236        id: ApplicationId,
1237        #[debug(with = hex_vec_debug)]
1238        keys: Vec<Vec<u8>>,
1239        callback: Sender<Vec<bool>>,
1240    },
1241
1242    ReadMultiValuesBytes {
1243        id: ApplicationId,
1244        #[debug(with = hex_vec_debug)]
1245        keys: Vec<Vec<u8>>,
1246        #[debug(skip)]
1247        callback: Sender<Vec<Option<Vec<u8>>>>,
1248    },
1249
1250    FindKeysByPrefix {
1251        id: ApplicationId,
1252        #[debug(with = hex_debug)]
1253        key_prefix: Vec<u8>,
1254        #[debug(skip)]
1255        callback: Sender<Vec<Vec<u8>>>,
1256    },
1257
1258    FindKeyValuesByPrefix {
1259        id: ApplicationId,
1260        #[debug(with = hex_debug)]
1261        key_prefix: Vec<u8>,
1262        #[debug(skip)]
1263        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1264    },
1265
1266    WriteBatch {
1267        id: ApplicationId,
1268        batch: Batch,
1269        #[debug(skip)]
1270        callback: Sender<()>,
1271    },
1272
1273    OpenChain {
1274        ownership: ChainOwnership,
1275        #[debug(skip_if = Amount::is_zero)]
1276        balance: Amount,
1277        parent_id: ChainId,
1278        block_height: BlockHeight,
1279        application_permissions: ApplicationPermissions,
1280        timestamp: Timestamp,
1281        #[debug(skip)]
1282        callback: Sender<ChainId>,
1283    },
1284
1285    CloseChain {
1286        application_id: ApplicationId,
1287        #[debug(skip)]
1288        callback: Sender<Result<(), ExecutionError>>,
1289    },
1290
1291    ChangeOwnership {
1292        application_id: ApplicationId,
1293        ownership: ChainOwnership,
1294        #[debug(skip)]
1295        callback: Sender<Result<(), ExecutionError>>,
1296    },
1297
1298    ChangeApplicationPermissions {
1299        application_id: ApplicationId,
1300        application_permissions: ApplicationPermissions,
1301        #[debug(skip)]
1302        callback: Sender<Result<(), ExecutionError>>,
1303    },
1304
1305    CreateApplication {
1306        chain_id: ChainId,
1307        block_height: BlockHeight,
1308        module_id: ModuleId,
1309        parameters: Vec<u8>,
1310        required_application_ids: Vec<ApplicationId>,
1311        #[debug(skip)]
1312        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1313    },
1314
1315    PerformHttpRequest {
1316        request: http::Request,
1317        http_responses_are_oracle_responses: bool,
1318        #[debug(skip)]
1319        callback: Sender<http::Response>,
1320    },
1321
1322    ReadBlobContent {
1323        blob_id: BlobId,
1324        #[debug(skip)]
1325        callback: Sender<BlobContent>,
1326    },
1327
1328    AssertBlobExists {
1329        blob_id: BlobId,
1330        #[debug(skip)]
1331        callback: Sender<()>,
1332    },
1333
1334    Emit {
1335        stream_id: StreamId,
1336        #[debug(with = hex_debug)]
1337        value: Vec<u8>,
1338        #[debug(skip)]
1339        callback: Sender<u32>,
1340    },
1341
1342    ReadEvent {
1343        event_id: EventId,
1344        callback: oneshot::Sender<Vec<u8>>,
1345    },
1346
1347    SubscribeToEvents {
1348        chain_id: ChainId,
1349        stream_id: StreamId,
1350        subscriber_app_id: ApplicationId,
1351        #[debug(skip)]
1352        callback: Sender<()>,
1353    },
1354
1355    UnsubscribeFromEvents {
1356        chain_id: ChainId,
1357        stream_id: StreamId,
1358        subscriber_app_id: ApplicationId,
1359        #[debug(skip)]
1360        callback: Sender<()>,
1361    },
1362
1363    GetApplicationPermissions {
1364        #[debug(skip)]
1365        callback: Sender<ApplicationPermissions>,
1366    },
1367
1368    QueryServiceOracle {
1369        deadline: Option<Instant>,
1370        application_id: ApplicationId,
1371        next_block_height: BlockHeight,
1372        query: Vec<u8>,
1373        #[debug(skip)]
1374        callback: Sender<Vec<u8>>,
1375    },
1376
1377    AddOutgoingMessage {
1378        message: crate::OutgoingMessage,
1379        #[debug(skip)]
1380        callback: Sender<()>,
1381    },
1382
1383    SetLocalTime {
1384        local_time: Timestamp,
1385        #[debug(skip)]
1386        callback: Sender<()>,
1387    },
1388
1389    AssertBefore {
1390        timestamp: Timestamp,
1391        #[debug(skip)]
1392        callback: Sender<Result<(), ExecutionError>>,
1393    },
1394
1395    AddCreatedBlob {
1396        blob: crate::Blob,
1397        #[debug(skip)]
1398        callback: Sender<()>,
1399    },
1400
1401    ValidationRound {
1402        round: Option<u32>,
1403        #[debug(skip)]
1404        callback: Sender<Option<u32>>,
1405    },
1406
1407    AllowApplicationLogs {
1408        #[debug(skip)]
1409        callback: Sender<bool>,
1410    },
1411
1412    /// Log message from contract execution (fire-and-forget, no callback needed).
1413    #[cfg(web)]
1414    Log {
1415        message: String,
1416        level: tracing::log::Level,
1417    },
1418}