linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15        Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19    ownership::ChainOwnership,
20    time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27    execution::UserAction,
28    runtime::ContractSyncRuntime,
29    system::{CreateApplicationResult, OpenChainConfig},
30    util::{OracleResponseExt as _, RespondExt as _},
31    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
32    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
33    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
34    ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
35};
36
/// Actor for handling requests to the execution state.
///
/// The actor owns nothing itself: it holds mutable borrows, valid for `'a`, of the three
/// pieces of state that incoming requests operate on.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker of the current transaction; requests hand it outgoing messages, events,
    /// created blobs and oracle responses.
    txn_tracker: &'a mut TransactionTracker,
    /// Resource controller used, among other things, to account for blob reads.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
43
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a label-free latency histogram under `name`, using the bucket layout
    /// shared by all the histograms of this module.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Histogram of the latency to load a contract bytecode.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram of the latency to load a service bytecode.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
71
/// Sending half of an unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
73
74impl<'a, C> ExecutionStateActor<'a, C>
75where
76    C: Context + Clone + 'static,
77    C::Extra: ExecutionRuntimeContext,
78{
79    /// Creates a new execution state actor.
80    pub fn new(
81        state: &'a mut ExecutionStateView<C>,
82        txn_tracker: &'a mut TransactionTracker,
83        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
84    ) -> Self {
85        Self {
86            state,
87            txn_tracker,
88            resource_controller,
89        }
90    }
91
92    pub(crate) async fn load_contract(
93        &mut self,
94        id: ApplicationId,
95    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
96        #[cfg(with_metrics)]
97        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
98        let blob_id = id.description_blob_id();
99        let description = match self.txn_tracker.get_blob_content(&blob_id) {
100            Some(blob) => bcs::from_bytes(blob.bytes())?,
101            None => {
102                self.state
103                    .system
104                    .describe_application(id, self.txn_tracker)
105                    .await?
106            }
107        };
108        let code = self
109            .state
110            .context()
111            .extra()
112            .get_user_contract(&description, self.txn_tracker)
113            .await?;
114        Ok((code, description))
115    }
116
117    pub(crate) async fn load_service(
118        &mut self,
119        id: ApplicationId,
120    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
121        #[cfg(with_metrics)]
122        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
123        let blob_id = id.description_blob_id();
124        let description = match self.txn_tracker.get_blob_content(&blob_id) {
125            Some(blob) => bcs::from_bytes(blob.bytes())?,
126            None => {
127                self.state
128                    .system
129                    .describe_application(id, self.txn_tracker)
130                    .await?
131            }
132        };
133        let code = self
134            .state
135            .context()
136            .extra()
137            .get_user_service(&description, self.txn_tracker)
138            .await?;
139        Ok((code, description))
140    }
141
142    // TODO(#1416): Support concurrent I/O.
143    pub(crate) async fn handle_request(
144        &mut self,
145        request: ExecutionRequest,
146    ) -> Result<(), ExecutionError> {
147        use ExecutionRequest::*;
148        match request {
149            #[cfg(not(web))]
150            LoadContract { id, callback } => {
151                let (code, description) = self.load_contract(id).await?;
152                callback.respond((code, description))
153            }
154            #[cfg(not(web))]
155            LoadService { id, callback } => {
156                let (code, description) = self.load_service(id).await?;
157                callback.respond((code, description))
158            }
159
160            ChainBalance { callback } => {
161                let balance = *self.state.system.balance.get();
162                callback.respond(balance);
163            }
164
165            OwnerBalance { owner, callback } => {
166                let balance = self
167                    .state
168                    .system
169                    .balances
170                    .get(&owner)
171                    .await?
172                    .unwrap_or_default();
173                callback.respond(balance);
174            }
175
176            OwnerBalances { callback } => {
177                let balances = self.state.system.balances.index_values().await?;
178                callback.respond(balances.into_iter().collect());
179            }
180
181            BalanceOwners { callback } => {
182                let owners = self.state.system.balances.indices().await?;
183                callback.respond(owners);
184            }
185
186            Transfer {
187                source,
188                destination,
189                amount,
190                signer,
191                application_id,
192                callback,
193            } => {
194                let maybe_message = self
195                    .state
196                    .system
197                    .transfer(signer, Some(application_id), source, destination, amount)
198                    .await?;
199                self.txn_tracker.add_outgoing_messages(maybe_message);
200                callback.respond(());
201            }
202
203            Claim {
204                source,
205                destination,
206                amount,
207                signer,
208                application_id,
209                callback,
210            } => {
211                let maybe_message = self
212                    .state
213                    .system
214                    .claim(
215                        signer,
216                        Some(application_id),
217                        source.owner,
218                        source.chain_id,
219                        destination,
220                        amount,
221                    )
222                    .await?;
223                self.txn_tracker.add_outgoing_messages(maybe_message);
224                callback.respond(());
225            }
226
227            SystemTimestamp { callback } => {
228                let timestamp = *self.state.system.timestamp.get();
229                callback.respond(timestamp);
230            }
231
232            ChainOwnership { callback } => {
233                let ownership = self.state.system.ownership.get().clone();
234                callback.respond(ownership);
235            }
236
237            ApplicationPermissions { callback } => {
238                let permissions = self.state.system.application_permissions.get().clone();
239                callback.respond(permissions);
240            }
241
242            ContainsKey { id, key, callback } => {
243                let view = self.state.users.try_load_entry(&id).await?;
244                let result = match view {
245                    Some(view) => view.contains_key(&key).await?,
246                    None => false,
247                };
248                callback.respond(result);
249            }
250
251            ContainsKeys { id, keys, callback } => {
252                let view = self.state.users.try_load_entry(&id).await?;
253                let result = match view {
254                    Some(view) => view.contains_keys(&keys).await?,
255                    None => vec![false; keys.len()],
256                };
257                callback.respond(result);
258            }
259
260            ReadMultiValuesBytes { id, keys, callback } => {
261                let view = self.state.users.try_load_entry(&id).await?;
262                let values = match view {
263                    Some(view) => view.multi_get(&keys).await?,
264                    None => vec![None; keys.len()],
265                };
266                callback.respond(values);
267            }
268
269            ReadValueBytes { id, key, callback } => {
270                let view = self.state.users.try_load_entry(&id).await?;
271                let result = match view {
272                    Some(view) => view.get(&key).await?,
273                    None => None,
274                };
275                callback.respond(result);
276            }
277
278            FindKeysByPrefix {
279                id,
280                key_prefix,
281                callback,
282            } => {
283                let view = self.state.users.try_load_entry(&id).await?;
284                let result = match view {
285                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
286                    None => Vec::new(),
287                };
288                callback.respond(result);
289            }
290
291            FindKeyValuesByPrefix {
292                id,
293                key_prefix,
294                callback,
295            } => {
296                let view = self.state.users.try_load_entry(&id).await?;
297                let result = match view {
298                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
299                    None => Vec::new(),
300                };
301                callback.respond(result);
302            }
303
304            WriteBatch {
305                id,
306                batch,
307                callback,
308            } => {
309                let mut view = self.state.users.try_load_entry_mut(&id).await?;
310                view.write_batch(batch).await?;
311                callback.respond(());
312            }
313
314            OpenChain {
315                ownership,
316                balance,
317                parent_id,
318                block_height,
319                application_permissions,
320                timestamp,
321                callback,
322            } => {
323                let config = OpenChainConfig {
324                    ownership,
325                    balance,
326                    application_permissions,
327                };
328                let chain_id = self
329                    .state
330                    .system
331                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
332                    .await?;
333                callback.respond(chain_id);
334            }
335
336            CloseChain {
337                application_id,
338                callback,
339            } => {
340                let app_permissions = self.state.system.application_permissions.get();
341                if !app_permissions.can_close_chain(&application_id) {
342                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
343                } else {
344                    self.state.system.close_chain().await?;
345                    callback.respond(Ok(()));
346                }
347            }
348
349            ChangeOwnership {
350                application_id,
351                ownership,
352                callback,
353            } => {
354                let app_permissions = self.state.system.application_permissions.get();
355                if !app_permissions.can_close_chain(&application_id) {
356                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
357                } else {
358                    self.state.system.ownership.set(ownership);
359                    callback.respond(Ok(()));
360                }
361            }
362
363            ChangeApplicationPermissions {
364                application_id,
365                application_permissions,
366                callback,
367            } => {
368                let app_permissions = self.state.system.application_permissions.get();
369                if !app_permissions.can_change_application_permissions(&application_id) {
370                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
371                } else {
372                    self.state
373                        .system
374                        .application_permissions
375                        .set(application_permissions);
376                    callback.respond(Ok(()));
377                }
378            }
379
380            CreateApplication {
381                chain_id,
382                block_height,
383                module_id,
384                parameters,
385                required_application_ids,
386                callback,
387            } => {
388                let create_application_result = self
389                    .state
390                    .system
391                    .create_application(
392                        chain_id,
393                        block_height,
394                        module_id,
395                        parameters,
396                        required_application_ids,
397                        self.txn_tracker,
398                    )
399                    .await?;
400                callback.respond(Ok(create_application_result));
401            }
402
403            PerformHttpRequest {
404                request,
405                http_responses_are_oracle_responses,
406                callback,
407            } => {
408                let system = &mut self.state.system;
409                let response = self
410                    .txn_tracker
411                    .oracle(|| async {
412                        let headers = request
413                            .headers
414                            .into_iter()
415                            .map(|http::Header { name, value }| {
416                                Ok((name.parse()?, value.try_into()?))
417                            })
418                            .collect::<Result<HeaderMap, ExecutionError>>()?;
419
420                        let url = Url::parse(&request.url)?;
421                        let host = url
422                            .host_str()
423                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
424
425                        let (_epoch, committee) = system
426                            .current_committee()
427                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
428                        let allowed_hosts = &committee.policy().http_request_allow_list;
429
430                        ensure!(
431                            allowed_hosts.contains(host),
432                            ExecutionError::UnauthorizedHttpRequest(url)
433                        );
434
435                        let request = Client::new()
436                            .request(request.method.into(), url)
437                            .body(request.body)
438                            .headers(headers);
439                        #[cfg(not(web))]
440                        let request = request.timeout(linera_base::time::Duration::from_millis(
441                            committee.policy().http_request_timeout_ms,
442                        ));
443
444                        let response = request.send().await?;
445
446                        let mut response_size_limit =
447                            committee.policy().maximum_http_response_bytes;
448
449                        if http_responses_are_oracle_responses {
450                            response_size_limit = response_size_limit
451                                .min(committee.policy().maximum_oracle_response_bytes);
452                        }
453                        Ok(OracleResponse::Http(
454                            Self::receive_http_response(response, response_size_limit).await?,
455                        ))
456                    })
457                    .await?
458                    .to_http_response()?;
459                callback.respond(response);
460            }
461
462            ReadBlobContent { blob_id, callback } => {
463                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
464                    content.clone()
465                } else {
466                    let content = self.state.system.read_blob_content(blob_id).await?;
467                    if blob_id.blob_type == BlobType::Data {
468                        self.resource_controller
469                            .with_state(&mut self.state.system)
470                            .await?
471                            .track_blob_read(content.bytes().len() as u64)?;
472                    }
473                    self.state
474                        .system
475                        .blob_used(self.txn_tracker, blob_id)
476                        .await?;
477                    content
478                };
479                callback.respond(content)
480            }
481
482            AssertBlobExists { blob_id, callback } => {
483                self.state.system.assert_blob_exists(blob_id).await?;
484                // Treating this as reading a size-0 blob for fee purposes.
485                if blob_id.blob_type == BlobType::Data {
486                    self.resource_controller
487                        .with_state(&mut self.state.system)
488                        .await?
489                        .track_blob_read(0)?;
490                }
491                let is_new = self
492                    .state
493                    .system
494                    .blob_used(self.txn_tracker, blob_id)
495                    .await?;
496                if is_new {
497                    self.txn_tracker
498                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
499                }
500                callback.respond(());
501            }
502
503            Emit {
504                stream_id,
505                value,
506                callback,
507            } => {
508                let count = self
509                    .state
510                    .stream_event_counts
511                    .get_mut_or_default(&stream_id)
512                    .await?;
513                let index = *count;
514                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
515                self.txn_tracker.add_event(stream_id, index, value);
516                callback.respond(index)
517            }
518
519            ReadEvent { event_id, callback } => {
520                let extra = self.state.context().extra();
521                let event = self
522                    .txn_tracker
523                    .oracle(|| async {
524                        let event = extra
525                            .get_event(event_id.clone())
526                            .await?
527                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
528                        Ok(OracleResponse::Event(event_id.clone(), event))
529                    })
530                    .await?
531                    .to_event(&event_id)?;
532                callback.respond(event);
533            }
534
535            SubscribeToEvents {
536                chain_id,
537                stream_id,
538                subscriber_app_id,
539                callback,
540            } => {
541                let subscriptions = self
542                    .state
543                    .system
544                    .event_subscriptions
545                    .get_mut_or_default(&(chain_id, stream_id.clone()))
546                    .await?;
547                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
548                    subscriptions.next_index
549                } else {
550                    0
551                };
552                self.txn_tracker.add_stream_to_process(
553                    subscriber_app_id,
554                    chain_id,
555                    stream_id,
556                    0,
557                    next_index,
558                );
559                callback.respond(());
560            }
561
562            UnsubscribeFromEvents {
563                chain_id,
564                stream_id,
565                subscriber_app_id,
566                callback,
567            } => {
568                let key = (chain_id, stream_id.clone());
569                let subscriptions = self
570                    .state
571                    .system
572                    .event_subscriptions
573                    .get_mut_or_default(&key)
574                    .await?;
575                subscriptions.applications.remove(&subscriber_app_id);
576                if subscriptions.applications.is_empty() {
577                    self.state.system.event_subscriptions.remove(&key)?;
578                }
579                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
580                    self.txn_tracker
581                        .remove_stream_to_process(app_id, chain_id, stream_id);
582                }
583                callback.respond(());
584            }
585
586            GetApplicationPermissions { callback } => {
587                let app_permissions = self.state.system.application_permissions.get();
588                callback.respond(app_permissions.clone());
589            }
590
591            QueryServiceOracle {
592                deadline,
593                application_id,
594                next_block_height,
595                query,
596                callback,
597            } => {
598                let state = &mut self.state;
599                let local_time = self.txn_tracker.local_time();
600                let created_blobs = self.txn_tracker.created_blobs().clone();
601                let bytes = self
602                    .txn_tracker
603                    .oracle(|| async {
604                        let context = QueryContext {
605                            chain_id: state.context().extra().chain_id(),
606                            next_block_height,
607                            local_time,
608                        };
609                        let QueryOutcome {
610                            response,
611                            operations,
612                        } = Box::pin(state.query_user_application_with_deadline(
613                            application_id,
614                            context,
615                            query,
616                            deadline,
617                            created_blobs,
618                        ))
619                        .await?;
620                        ensure!(
621                            operations.is_empty(),
622                            ExecutionError::ServiceOracleQueryOperations(operations)
623                        );
624                        Ok(OracleResponse::Service(response))
625                    })
626                    .await?
627                    .to_service_response()?;
628                callback.respond(bytes);
629            }
630
631            AddOutgoingMessage { message, callback } => {
632                self.txn_tracker.add_outgoing_message(message);
633                callback.respond(());
634            }
635
636            SetLocalTime {
637                local_time,
638                callback,
639            } => {
640                self.txn_tracker.set_local_time(local_time);
641                callback.respond(());
642            }
643
644            AssertBefore {
645                timestamp,
646                callback,
647            } => {
648                let result = if !self
649                    .txn_tracker
650                    .replay_oracle_response(OracleResponse::Assert)?
651                {
652                    // There are no recorded oracle responses, so we check the local time.
653                    let local_time = self.txn_tracker.local_time();
654                    if local_time >= timestamp {
655                        Err(ExecutionError::AssertBefore {
656                            timestamp,
657                            local_time,
658                        })
659                    } else {
660                        Ok(())
661                    }
662                } else {
663                    Ok(())
664                };
665                callback.respond(result);
666            }
667
668            AddCreatedBlob { blob, callback } => {
669                self.txn_tracker.add_created_blob(blob);
670                callback.respond(());
671            }
672
673            ValidationRound { round, callback } => {
674                let validation_round = self
675                    .txn_tracker
676                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
677                    .await?
678                    .to_round()?;
679                callback.respond(validation_round);
680            }
681
682            AllowApplicationLogs { callback } => {
683                let allow = self
684                    .state
685                    .context()
686                    .extra()
687                    .execution_runtime_config()
688                    .allow_application_logs;
689                callback.respond(allow);
690            }
691
692            #[cfg(web)]
693            Log { message, level } => {
694                // Output directly to browser console with clean formatting
695                let formatted: js_sys::JsString = format!("[CONTRACT {level}] {message}").into();
696                match level {
697                    tracing::log::Level::Trace | tracing::log::Level::Debug => {
698                        web_sys::console::debug_1(&formatted)
699                    }
700                    tracing::log::Level::Info => web_sys::console::log_1(&formatted),
701                    tracing::log::Level::Warn => web_sys::console::warn_1(&formatted),
702                    tracing::log::Level::Error => web_sys::console::error_1(&formatted),
703                }
704            }
705        }
706
707        Ok(())
708    }
709
710    /// Calls `process_streams` for all applications that are subscribed to streams with new
711    /// events or that have new subscriptions.
712    async fn process_subscriptions(
713        &mut self,
714        context: ProcessStreamsContext,
715    ) -> Result<(), ExecutionError> {
716        // Keep track of which streams we have already processed. This is to guard against
717        // applications unsubscribing and subscribing in the process_streams call itself.
718        let mut processed = BTreeSet::new();
719        loop {
720            let to_process = self
721                .txn_tracker
722                .take_streams_to_process()
723                .into_iter()
724                .filter_map(|(app_id, updates)| {
725                    let updates = updates
726                        .into_iter()
727                        .filter_map(|update| {
728                            if !processed.insert((
729                                app_id,
730                                update.chain_id,
731                                update.stream_id.clone(),
732                            )) {
733                                return None;
734                            }
735                            Some(update)
736                        })
737                        .collect::<Vec<_>>();
738                    if updates.is_empty() {
739                        return None;
740                    }
741                    Some((app_id, updates))
742                })
743                .collect::<BTreeMap<_, _>>();
744            if to_process.is_empty() {
745                return Ok(());
746            }
747            for (app_id, updates) in to_process {
748                self.run_user_action(
749                    app_id,
750                    UserAction::ProcessStreams(context, updates),
751                    None,
752                    None,
753                )
754                .await?;
755            }
756        }
757    }
758
759    pub(crate) async fn run_user_action(
760        &mut self,
761        application_id: ApplicationId,
762        action: UserAction,
763        refund_grant_to: Option<Account>,
764        grant: Option<&mut Amount>,
765    ) -> Result<(), ExecutionError> {
766        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
767            .await
768    }
769
770    // TODO(#5034): unify with `contract_and_dependencies`
771    pub(crate) async fn service_and_dependencies(
772        &mut self,
773        application: ApplicationId,
774    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
775        // cyclic futures are illegal so we need to either box the frames or keep our own
776        // stack
777        let mut stack = vec![application];
778        let mut codes = vec![];
779        let mut descriptions = vec![];
780
781        while let Some(id) = stack.pop() {
782            let (code, description) = self.load_service(id).await?;
783            stack.extend(description.required_application_ids.iter().rev().copied());
784            codes.push(code);
785            descriptions.push(description);
786        }
787
788        codes.reverse();
789        descriptions.reverse();
790
791        Ok((codes, descriptions))
792    }
793
794    // TODO(#5034): unify with `service_and_dependencies`
795    async fn contract_and_dependencies(
796        &mut self,
797        application: ApplicationId,
798    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
799        // cyclic futures are illegal so we need to either box the frames or keep our own
800        // stack
801        let mut stack = vec![application];
802        let mut codes = vec![];
803        let mut descriptions = vec![];
804
805        while let Some(id) = stack.pop() {
806            let (code, description) = self.load_contract(id).await?;
807            stack.extend(description.required_application_ids.iter().rev().copied());
808            codes.push(code);
809            descriptions.push(description);
810        }
811
812        codes.reverse();
813        descriptions.reverse();
814
815        Ok((codes, descriptions))
816    }
817
    /// Runs `action` for `application_id` inside a `ContractSyncRuntime` started on the
    /// thread pool, serving the requests the runtime sends back until it finishes.
    ///
    /// Steps, in order:
    /// 1. read the current balance and build a separate `ResourceController` from it;
    /// 2. load the contract code and description of the application and its dependencies;
    /// 3. start the runtime task, which preloads every contract and runs the action;
    /// 4. handle each incoming request with `handle_request` until the channel ends;
    /// 5. record the operation result, merge the balance change back and adopt the
    ///    runtime controller's tracker.
    ///
    /// # Errors
    ///
    /// Propagates errors from balance computation, contract loading, request handling,
    /// the runtime task itself and the final balance merge.
    async fn run_user_action_with_runtime(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
    ) -> Result<(), ExecutionError> {
        let chain_id = self.state.context().extra().chain_id();
        // Work on a copy of the grant amount here; `grant` itself is only handed over
        // for the final merge at the end of this function.
        let mut cloned_grant = grant.as_ref().map(|x| **x);
        let initial_balance = self
            .resource_controller
            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
            .await?
            .balance()?;
        // Controller handed to the runtime: same policy and tracker as ours, with the
        // balance read above as its account.
        let controller = ResourceController::new(
            self.resource_controller.policy().clone(),
            self.resource_controller.tracker,
            initial_balance,
        );
        // The sender is moved into the runtime; the receiver is drained below.
        let (execution_state_sender, mut execution_state_receiver) =
            futures::channel::mpsc::unbounded();

        let (codes, descriptions): (Vec<_>, Vec<_>) =
            self.contract_and_dependencies(application_id).await?;

        let allow_application_logs = self
            .state
            .context()
            .extra()
            .execution_runtime_config()
            .allow_application_logs;

        // NOTE(review): awaiting `run_send` yields a value that is awaited again further
        // down, so this first `.await` presumably only hands the work to the pool and
        // returns a handle to it — confirm against the thread pool API.
        let contract_runtime_task = self
            .state
            .context()
            .extra()
            .thread_pool()
            .run_send(JsVec(codes), move |codes| async move {
                let runtime = ContractSyncRuntime::new(
                    execution_state_sender,
                    chain_id,
                    refund_grant_to,
                    controller,
                    &action,
                    allow_application_logs,
                );

                // `codes` and `descriptions` are index-aligned, so they are paired
                // positionally.
                for (code, description) in codes.0.into_iter().zip(descriptions) {
                    runtime.preload_contract(
                        ApplicationId::from(&description),
                        code,
                        description,
                    )?;
                }

                runtime.run_action(application_id, chain_id, action)
            })
            .await;

        // Serve the runtime's requests until the receiver stream ends (presumably once
        // the runtime has dropped every sender — TODO confirm).
        while let Some(request) = execution_state_receiver.next().await {
            self.handle_request(request).await?;
        }

        // Two `?`: presumably the outer one covers a failure of the task itself and the
        // inner one the runtime's own `ExecutionError` — verify against `run_send`.
        let (result, controller) = contract_runtime_task.await??;

        self.txn_tracker.add_operation_result(result);

        // Apply the difference between the initial balance and the runtime controller's
        // final balance to the system state and the caller's real grant.
        self.resource_controller
            .with_state_and_grant(&mut self.state.system, grant)
            .await?
            .merge_balance(initial_balance, controller.balance()?)?;
        self.resource_controller.tracker = controller.tracker;

        Ok(())
    }
893
894    pub async fn execute_operation(
895        &mut self,
896        context: OperationContext,
897        operation: Operation,
898    ) -> Result<(), ExecutionError> {
899        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
900        match operation {
901            Operation::System(op) => {
902                let new_application = self
903                    .state
904                    .system
905                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
906                    .await?;
907                if let Some((application_id, argument)) = new_application {
908                    let user_action = UserAction::Instantiate(context, argument);
909                    self.run_user_action(
910                        application_id,
911                        user_action,
912                        context.refund_grant_to(),
913                        None,
914                    )
915                    .await?;
916                }
917            }
918            Operation::User {
919                application_id,
920                bytes,
921            } => {
922                self.run_user_action(
923                    application_id,
924                    UserAction::Operation(context, bytes),
925                    context.refund_grant_to(),
926                    None,
927                )
928                .await?;
929            }
930        }
931        self.process_subscriptions(context.into()).await?;
932        Ok(())
933    }
934
935    pub async fn execute_message(
936        &mut self,
937        context: MessageContext,
938        message: Message,
939        grant: Option<&mut Amount>,
940    ) -> Result<(), ExecutionError> {
941        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
942        match message {
943            Message::System(message) => {
944                let outcome = self.state.system.execute_message(context, message).await?;
945                self.txn_tracker.add_outgoing_messages(outcome);
946            }
947            Message::User {
948                application_id,
949                bytes,
950            } => {
951                self.run_user_action(
952                    application_id,
953                    UserAction::Message(context, bytes),
954                    context.refund_grant_to,
955                    grant,
956                )
957                .await?;
958            }
959        }
960        self.process_subscriptions(context.into()).await?;
961        Ok(())
962    }
963
964    pub fn bounce_message(
965        &mut self,
966        context: MessageContext,
967        grant: Amount,
968        message: Message,
969    ) -> Result<(), ExecutionError> {
970        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
971        self.txn_tracker.add_outgoing_message(OutgoingMessage {
972            destination: context.origin,
973            authenticated_signer: context.authenticated_signer,
974            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
975            grant,
976            kind: MessageKind::Bouncing,
977            message,
978        });
979        Ok(())
980    }
981
982    pub fn send_refund(
983        &mut self,
984        context: MessageContext,
985        amount: Amount,
986    ) -> Result<(), ExecutionError> {
987        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
988        if amount.is_zero() {
989            return Ok(());
990        }
991        let Some(account) = context.refund_grant_to else {
992            return Err(ExecutionError::InternalError(
993                "Messages with grants should have a non-empty `refund_grant_to`",
994            ));
995        };
996        let message = SystemMessage::Credit {
997            amount,
998            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
999            target: account.owner,
1000        };
1001        self.txn_tracker.add_outgoing_message(
1002            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1003        );
1004        Ok(())
1005    }
1006
1007    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
1008    ///
1009    /// Ensures that the response does not exceed the provided `size_limit`.
1010    async fn receive_http_response(
1011        response: reqwest::Response,
1012        size_limit: u64,
1013    ) -> Result<http::Response, ExecutionError> {
1014        let status = response.status().as_u16();
1015        let maybe_content_length = response.content_length();
1016
1017        let headers = response
1018            .headers()
1019            .iter()
1020            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1021            .collect::<Vec<_>>();
1022
1023        let total_header_size = headers
1024            .iter()
1025            .map(|header| (header.name.len() + header.value.len()) as u64)
1026            .sum();
1027
1028        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1029            ExecutionError::HttpResponseSizeLimitExceeded {
1030                limit: size_limit,
1031                size: total_header_size,
1032            },
1033        )?;
1034
1035        if let Some(content_length) = maybe_content_length {
1036            if content_length > remaining_bytes {
1037                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1038                    limit: size_limit,
1039                    size: content_length + total_header_size,
1040                });
1041            }
1042        }
1043
1044        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1045        let mut body_stream = response.bytes_stream();
1046
1047        while let Some(bytes) = body_stream.next().await.transpose()? {
1048            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1049                ExecutionError::HttpResponseSizeLimitExceeded {
1050                    limit: size_limit,
1051                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1052                },
1053            )?;
1054
1055            body.extend(&bytes);
1056        }
1057
1058        Ok(http::Response {
1059            status,
1060            headers,
1061            body,
1062        })
1063    }
1064}
1065
1066/// Requests to the execution state.
1067#[derive(Debug)]
1068pub enum ExecutionRequest {
1069    #[cfg(not(web))]
1070    LoadContract {
1071        id: ApplicationId,
1072        #[debug(skip)]
1073        callback: Sender<(UserContractCode, ApplicationDescription)>,
1074    },
1075
1076    #[cfg(not(web))]
1077    LoadService {
1078        id: ApplicationId,
1079        #[debug(skip)]
1080        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1081    },
1082
1083    ChainBalance {
1084        #[debug(skip)]
1085        callback: Sender<Amount>,
1086    },
1087
1088    OwnerBalance {
1089        owner: AccountOwner,
1090        #[debug(skip)]
1091        callback: Sender<Amount>,
1092    },
1093
1094    OwnerBalances {
1095        #[debug(skip)]
1096        callback: Sender<Vec<(AccountOwner, Amount)>>,
1097    },
1098
1099    BalanceOwners {
1100        #[debug(skip)]
1101        callback: Sender<Vec<AccountOwner>>,
1102    },
1103
1104    Transfer {
1105        source: AccountOwner,
1106        destination: Account,
1107        amount: Amount,
1108        #[debug(skip_if = Option::is_none)]
1109        signer: Option<AccountOwner>,
1110        application_id: ApplicationId,
1111        #[debug(skip)]
1112        callback: Sender<()>,
1113    },
1114
1115    Claim {
1116        source: Account,
1117        destination: Account,
1118        amount: Amount,
1119        #[debug(skip_if = Option::is_none)]
1120        signer: Option<AccountOwner>,
1121        application_id: ApplicationId,
1122        #[debug(skip)]
1123        callback: Sender<()>,
1124    },
1125
1126    SystemTimestamp {
1127        #[debug(skip)]
1128        callback: Sender<Timestamp>,
1129    },
1130
1131    ChainOwnership {
1132        #[debug(skip)]
1133        callback: Sender<ChainOwnership>,
1134    },
1135
1136    ApplicationPermissions {
1137        #[debug(skip)]
1138        callback: Sender<ApplicationPermissions>,
1139    },
1140
1141    ReadValueBytes {
1142        id: ApplicationId,
1143        #[debug(with = hex_debug)]
1144        key: Vec<u8>,
1145        #[debug(skip)]
1146        callback: Sender<Option<Vec<u8>>>,
1147    },
1148
1149    ContainsKey {
1150        id: ApplicationId,
1151        key: Vec<u8>,
1152        #[debug(skip)]
1153        callback: Sender<bool>,
1154    },
1155
1156    ContainsKeys {
1157        id: ApplicationId,
1158        #[debug(with = hex_vec_debug)]
1159        keys: Vec<Vec<u8>>,
1160        callback: Sender<Vec<bool>>,
1161    },
1162
1163    ReadMultiValuesBytes {
1164        id: ApplicationId,
1165        #[debug(with = hex_vec_debug)]
1166        keys: Vec<Vec<u8>>,
1167        #[debug(skip)]
1168        callback: Sender<Vec<Option<Vec<u8>>>>,
1169    },
1170
1171    FindKeysByPrefix {
1172        id: ApplicationId,
1173        #[debug(with = hex_debug)]
1174        key_prefix: Vec<u8>,
1175        #[debug(skip)]
1176        callback: Sender<Vec<Vec<u8>>>,
1177    },
1178
1179    FindKeyValuesByPrefix {
1180        id: ApplicationId,
1181        #[debug(with = hex_debug)]
1182        key_prefix: Vec<u8>,
1183        #[debug(skip)]
1184        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1185    },
1186
1187    WriteBatch {
1188        id: ApplicationId,
1189        batch: Batch,
1190        #[debug(skip)]
1191        callback: Sender<()>,
1192    },
1193
1194    OpenChain {
1195        ownership: ChainOwnership,
1196        #[debug(skip_if = Amount::is_zero)]
1197        balance: Amount,
1198        parent_id: ChainId,
1199        block_height: BlockHeight,
1200        application_permissions: ApplicationPermissions,
1201        timestamp: Timestamp,
1202        #[debug(skip)]
1203        callback: Sender<ChainId>,
1204    },
1205
1206    CloseChain {
1207        application_id: ApplicationId,
1208        #[debug(skip)]
1209        callback: Sender<Result<(), ExecutionError>>,
1210    },
1211
1212    ChangeOwnership {
1213        application_id: ApplicationId,
1214        ownership: ChainOwnership,
1215        #[debug(skip)]
1216        callback: Sender<Result<(), ExecutionError>>,
1217    },
1218
1219    ChangeApplicationPermissions {
1220        application_id: ApplicationId,
1221        application_permissions: ApplicationPermissions,
1222        #[debug(skip)]
1223        callback: Sender<Result<(), ExecutionError>>,
1224    },
1225
1226    CreateApplication {
1227        chain_id: ChainId,
1228        block_height: BlockHeight,
1229        module_id: ModuleId,
1230        parameters: Vec<u8>,
1231        required_application_ids: Vec<ApplicationId>,
1232        #[debug(skip)]
1233        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1234    },
1235
1236    PerformHttpRequest {
1237        request: http::Request,
1238        http_responses_are_oracle_responses: bool,
1239        #[debug(skip)]
1240        callback: Sender<http::Response>,
1241    },
1242
1243    ReadBlobContent {
1244        blob_id: BlobId,
1245        #[debug(skip)]
1246        callback: Sender<BlobContent>,
1247    },
1248
1249    AssertBlobExists {
1250        blob_id: BlobId,
1251        #[debug(skip)]
1252        callback: Sender<()>,
1253    },
1254
1255    Emit {
1256        stream_id: StreamId,
1257        #[debug(with = hex_debug)]
1258        value: Vec<u8>,
1259        #[debug(skip)]
1260        callback: Sender<u32>,
1261    },
1262
1263    ReadEvent {
1264        event_id: EventId,
1265        callback: oneshot::Sender<Vec<u8>>,
1266    },
1267
1268    SubscribeToEvents {
1269        chain_id: ChainId,
1270        stream_id: StreamId,
1271        subscriber_app_id: ApplicationId,
1272        #[debug(skip)]
1273        callback: Sender<()>,
1274    },
1275
1276    UnsubscribeFromEvents {
1277        chain_id: ChainId,
1278        stream_id: StreamId,
1279        subscriber_app_id: ApplicationId,
1280        #[debug(skip)]
1281        callback: Sender<()>,
1282    },
1283
1284    GetApplicationPermissions {
1285        #[debug(skip)]
1286        callback: Sender<ApplicationPermissions>,
1287    },
1288
1289    QueryServiceOracle {
1290        deadline: Option<Instant>,
1291        application_id: ApplicationId,
1292        next_block_height: BlockHeight,
1293        query: Vec<u8>,
1294        #[debug(skip)]
1295        callback: Sender<Vec<u8>>,
1296    },
1297
1298    AddOutgoingMessage {
1299        message: crate::OutgoingMessage,
1300        #[debug(skip)]
1301        callback: Sender<()>,
1302    },
1303
1304    SetLocalTime {
1305        local_time: Timestamp,
1306        #[debug(skip)]
1307        callback: Sender<()>,
1308    },
1309
1310    AssertBefore {
1311        timestamp: Timestamp,
1312        #[debug(skip)]
1313        callback: Sender<Result<(), ExecutionError>>,
1314    },
1315
1316    AddCreatedBlob {
1317        blob: crate::Blob,
1318        #[debug(skip)]
1319        callback: Sender<()>,
1320    },
1321
1322    ValidationRound {
1323        round: Option<u32>,
1324        #[debug(skip)]
1325        callback: Sender<Option<u32>>,
1326    },
1327
1328    AllowApplicationLogs {
1329        #[debug(skip)]
1330        callback: Sender<bool>,
1331    },
1332
1333    /// Log message from contract execution (fire-and-forget, no callback needed).
1334    #[cfg(web)]
1335    Log {
1336        message: String,
1337        level: tracing::log::Level,
1338    },
1339}