linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15        Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19    ownership::ChainOwnership,
20    time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27    execution::UserAction,
28    runtime::ContractSyncRuntime,
29    system::{CreateApplicationResult, OpenChainConfig},
30    util::{OracleResponseExt as _, RespondExt as _},
31    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
32    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
33    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
34    ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
35};
36
/// Actor for handling requests to the execution state.
///
/// The actor only borrows its three collaborators, so every change made while serving
/// requests lands directly in the caller's state view, transaction tracker and resource
/// controller.
pub struct ExecutionStateActor<'a, C> {
    /// Execution state that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker that collects outgoing messages, events, created blobs and oracle responses
    /// produced while serving requests.
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to account for resource usage (for example blob reads).
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
43
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a label-free latency histogram with the exponential bucket layout shared by
    /// all the metrics of this module.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(
            name,
            description,
            &[],
            exponential_bucket_latencies(250.0),
        )
    }

    /// Histogram of the latency to load a contract bytecode.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram of the latency to load a service bytecode.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
71
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s to the actor.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
73
74impl<'a, C> ExecutionStateActor<'a, C>
75where
76    C: Context + Clone + 'static,
77    C::Extra: ExecutionRuntimeContext,
78{
79    /// Creates a new execution state actor.
80    pub fn new(
81        state: &'a mut ExecutionStateView<C>,
82        txn_tracker: &'a mut TransactionTracker,
83        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
84    ) -> Self {
85        Self {
86            state,
87            txn_tracker,
88            resource_controller,
89        }
90    }
91
92    pub(crate) async fn load_contract(
93        &mut self,
94        id: ApplicationId,
95    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
96        #[cfg(with_metrics)]
97        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
98        let blob_id = id.description_blob_id();
99        let description = match self.txn_tracker.get_blob_content(&blob_id) {
100            Some(blob) => bcs::from_bytes(blob.bytes())?,
101            None => {
102                self.state
103                    .system
104                    .describe_application(id, self.txn_tracker)
105                    .await?
106            }
107        };
108        let code = self
109            .state
110            .context()
111            .extra()
112            .get_user_contract(&description, self.txn_tracker)
113            .await?;
114        Ok((code, description))
115    }
116
117    pub(crate) async fn load_service(
118        &mut self,
119        id: ApplicationId,
120    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
121        #[cfg(with_metrics)]
122        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
123        let blob_id = id.description_blob_id();
124        let description = match self.txn_tracker.get_blob_content(&blob_id) {
125            Some(blob) => bcs::from_bytes(blob.bytes())?,
126            None => {
127                self.state
128                    .system
129                    .describe_application(id, self.txn_tracker)
130                    .await?
131            }
132        };
133        let code = self
134            .state
135            .context()
136            .extra()
137            .get_user_service(&description, self.txn_tracker)
138            .await?;
139        Ok((code, description))
140    }
141
    /// Serves a single [`ExecutionRequest`] against the execution state.
    ///
    /// Each arm performs the requested read or update and then sends the outcome through the
    /// request's `callback`. Some arms report failures through the callback itself (as an
    /// `Err` value); every error propagated with `?` instead makes this method return early.
    ///
    /// # Errors
    ///
    /// Returns any [`ExecutionError`] raised while accessing the state, the transaction
    /// tracker, the resource controller or an oracle.
    // TODO(#1416): Support concurrent I/O.
    pub(crate) async fn handle_request(
        &mut self,
        request: ExecutionRequest,
    ) -> Result<(), ExecutionError> {
        use ExecutionRequest::*;
        match request {
            // Loading code through a request is only compiled for non-web builds.
            #[cfg(not(web))]
            LoadContract { id, callback } => {
                let (code, description) = self.load_contract(id).await?;
                callback.respond((code, description))
            }
            #[cfg(not(web))]
            LoadService { id, callback } => {
                let (code, description) = self.load_service(id).await?;
                callback.respond((code, description))
            }

            // Balance held by the chain itself.
            ChainBalance { callback } => {
                let balance = *self.state.system.balance.get();
                callback.respond(balance);
            }

            // Balance of a single owner; an owner without an entry gets the default amount.
            OwnerBalance { owner, callback } => {
                let balance = self
                    .state
                    .system
                    .balances
                    .get(&owner)
                    .await?
                    .unwrap_or_default();
                callback.respond(balance);
            }

            // All `(owner, balance)` entries.
            OwnerBalances { callback } => {
                let balances = self.state.system.balances.index_values().await?;
                callback.respond(balances.into_iter().collect());
            }

            // All owners that have a balance entry.
            BalanceOwners { callback } => {
                let owners = self.state.system.balances.indices().await?;
                callback.respond(owners);
            }

            // Transfer on behalf of an application; the message returned by the system
            // state, if any, is queued in the transaction tracker.
            Transfer {
                source,
                destination,
                amount,
                signer,
                application_id,
                callback,
            } => {
                let maybe_message = self
                    .state
                    .system
                    .transfer(signer, Some(application_id), source, destination, amount)
                    .await?;
                self.txn_tracker.add_outgoing_messages(maybe_message);
                callback.respond(());
            }

            // Claim on behalf of an application; the message returned by the system state,
            // if any, is queued in the transaction tracker.
            Claim {
                source,
                destination,
                amount,
                signer,
                application_id,
                callback,
            } => {
                let maybe_message = self
                    .state
                    .system
                    .claim(
                        signer,
                        Some(application_id),
                        source.owner,
                        source.chain_id,
                        destination,
                        amount,
                    )
                    .await?;
                self.txn_tracker.add_outgoing_messages(maybe_message);
                callback.respond(());
            }

            SystemTimestamp { callback } => {
                let timestamp = *self.state.system.timestamp.get();
                callback.respond(timestamp);
            }

            ChainOwnership { callback } => {
                let ownership = self.state.system.ownership.get().clone();
                callback.respond(ownership);
            }

            // Key-value reads on the state of application `id`. When the application has no
            // entry in `users`, each read below answers as if its store were empty.
            ContainsKey { id, key, callback } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.contains_key(&key).await?,
                    None => false,
                };
                callback.respond(result);
            }

            ContainsKeys { id, keys, callback } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.contains_keys(&keys).await?,
                    None => vec![false; keys.len()],
                };
                callback.respond(result);
            }

            ReadMultiValuesBytes { id, keys, callback } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let values = match view {
                    Some(view) => view.multi_get(&keys).await?,
                    None => vec![None; keys.len()],
                };
                callback.respond(values);
            }

            ReadValueBytes { id, key, callback } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.get(&key).await?,
                    None => None,
                };
                callback.respond(result);
            }

            FindKeysByPrefix {
                id,
                key_prefix,
                callback,
            } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
                    None => Vec::new(),
                };
                callback.respond(result);
            }

            FindKeyValuesByPrefix {
                id,
                key_prefix,
                callback,
            } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
                    None => Vec::new(),
                };
                callback.respond(result);
            }

            // Applies a batch of writes to the state of application `id`.
            WriteBatch {
                id,
                batch,
                callback,
            } => {
                let mut view = self.state.users.try_load_entry_mut(&id).await?;
                view.write_batch(batch).await?;
                callback.respond(());
            }

            // Opens a new chain and answers with its chain ID.
            OpenChain {
                ownership,
                balance,
                parent_id,
                block_height,
                application_permissions,
                timestamp,
                callback,
            } => {
                let config = OpenChainConfig {
                    ownership,
                    balance,
                    application_permissions,
                };
                let chain_id = self
                    .state
                    .system
                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
                    .await?;
                callback.respond(chain_id);
            }

            // A missing permission is reported to the requester through the callback; it
            // does not make `handle_request` itself fail.
            CloseChain {
                application_id,
                callback,
            } => {
                let app_permissions = self.state.system.application_permissions.get();
                if !app_permissions.can_close_chain(&application_id) {
                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
                } else {
                    self.state.system.close_chain().await?;
                    callback.respond(Ok(()));
                }
            }

            // Same reporting scheme as `CloseChain`: a missing permission goes through the
            // callback.
            ChangeApplicationPermissions {
                application_id,
                application_permissions,
                callback,
            } => {
                let app_permissions = self.state.system.application_permissions.get();
                if !app_permissions.can_change_application_permissions(&application_id) {
                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
                } else {
                    self.state
                        .system
                        .application_permissions
                        .set(application_permissions);
                    callback.respond(Ok(()));
                }
            }

            // A failure to create the application is propagated with `?`, so the callback
            // only ever receives `Ok`.
            CreateApplication {
                chain_id,
                block_height,
                module_id,
                parameters,
                required_application_ids,
                callback,
            } => {
                let create_application_result = self
                    .state
                    .system
                    .create_application(
                        chain_id,
                        block_height,
                        module_id,
                        parameters,
                        required_application_ids,
                        self.txn_tracker,
                    )
                    .await?;
                callback.respond(Ok(create_application_result));
            }

            // The HTTP request is performed inside `txn_tracker.oracle`.
            // NOTE(review): presumably the tracker replays a recorded response instead of
            // running the closure when one is available — confirm in `TransactionTracker`.
            PerformHttpRequest {
                request,
                http_responses_are_oracle_responses,
                callback,
            } => {
                let system = &mut self.state.system;
                let response = self
                    .txn_tracker
                    .oracle(|| async {
                        let headers = request
                            .headers
                            .into_iter()
                            .map(|http::Header { name, value }| {
                                Ok((name.parse()?, value.try_into()?))
                            })
                            .collect::<Result<HeaderMap, ExecutionError>>()?;

                        let url = Url::parse(&request.url)?;
                        let host = url
                            .host_str()
                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;

                        // Without a current committee there is no allow list to check
                        // against, so the request is rejected as unauthorized.
                        let (_epoch, committee) = system
                            .current_committee()
                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
                        let allowed_hosts = &committee.policy().http_request_allow_list;

                        ensure!(
                            allowed_hosts.contains(host),
                            ExecutionError::UnauthorizedHttpRequest(url)
                        );

                        let request = Client::new()
                            .request(request.method.into(), url)
                            .body(request.body)
                            .headers(headers);
                        // The timeout from the policy is only applied on non-web builds.
                        #[cfg(not(web))]
                        let request = request.timeout(linera_base::time::Duration::from_millis(
                            committee.policy().http_request_timeout_ms,
                        ));

                        let response = request.send().await?;

                        let mut response_size_limit =
                            committee.policy().maximum_http_response_bytes;

                        // When the response counts as an oracle response, the oracle
                        // response limit applies as well: use the smaller of the two.
                        if http_responses_are_oracle_responses {
                            response_size_limit = response_size_limit
                                .min(committee.policy().maximum_oracle_response_bytes);
                        }
                        Ok(OracleResponse::Http(
                            Self::receive_http_response(response, response_size_limit).await?,
                        ))
                    })
                    .await?
                    .to_http_response()?;
                callback.respond(response);
            }

            // Content found in the transaction tracker is returned as is. Otherwise the
            // blob is read from the system state, the read is charged for `Data` blobs,
            // and the blob is marked as used.
            ReadBlobContent { blob_id, callback } => {
                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
                    content.clone()
                } else {
                    let content = self.state.system.read_blob_content(blob_id).await?;
                    if blob_id.blob_type == BlobType::Data {
                        self.resource_controller
                            .with_state(&mut self.state.system)
                            .await?
                            .track_blob_read(content.bytes().len() as u64)?;
                    }
                    self.state
                        .system
                        .blob_used(self.txn_tracker, blob_id)
                        .await?;
                    content
                };
                callback.respond(content)
            }

            AssertBlobExists { blob_id, callback } => {
                self.state.system.assert_blob_exists(blob_id).await?;
                // Treating this as reading a size-0 blob for fee purposes.
                if blob_id.blob_type == BlobType::Data {
                    self.resource_controller
                        .with_state(&mut self.state.system)
                        .await?
                        .track_blob_read(0)?;
                }
                let is_new = self
                    .state
                    .system
                    .blob_used(self.txn_tracker, blob_id)
                    .await?;
                // Only when `blob_used` reports the blob as new is the `Blob` oracle
                // response passed to the tracker.
                if is_new {
                    self.txn_tracker
                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
                }
                callback.respond(());
            }

            // The event gets the stream's current count as its index; the count is then
            // incremented, failing on overflow instead of wrapping.
            Emit {
                stream_id,
                value,
                callback,
            } => {
                let count = self
                    .state
                    .stream_event_counts
                    .get_mut_or_default(&stream_id)
                    .await?;
                let index = *count;
                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
                self.txn_tracker.add_event(stream_id, index, value);
                callback.respond(index)
            }

            // Reads an event through `txn_tracker.oracle`; an event that cannot be found is
            // an `EventsNotFound` error.
            ReadEvent { event_id, callback } => {
                let extra = self.state.context().extra();
                let event = self
                    .txn_tracker
                    .oracle(|| async {
                        let event = extra
                            .get_event(event_id.clone())
                            .await?
                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
                        Ok(OracleResponse::Event(event_id.clone(), event))
                    })
                    .await?
                    .to_event(&event_id)?;
                callback.respond(event);
            }

            SubscribeToEvents {
                chain_id,
                stream_id,
                subscriber_app_id,
                callback,
            } => {
                let subscriptions = self
                    .state
                    .system
                    .event_subscriptions
                    .get_mut_or_default(&(chain_id, stream_id.clone()))
                    .await?;
                // `insert` returns `true` only for a new subscriber: in that case the
                // stream is scheduled from index 0 up to the subscription's `next_index`.
                // For an existing subscriber the range `0..0` is passed instead.
                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
                    subscriptions.next_index
                } else {
                    0
                };
                self.txn_tracker.add_stream_to_process(
                    subscriber_app_id,
                    chain_id,
                    stream_id,
                    0,
                    next_index,
                );
                callback.respond(());
            }

            UnsubscribeFromEvents {
                chain_id,
                stream_id,
                subscriber_app_id,
                callback,
            } => {
                let key = (chain_id, stream_id.clone());
                let subscriptions = self
                    .state
                    .system
                    .event_subscriptions
                    .get_mut_or_default(&key)
                    .await?;
                subscriptions.applications.remove(&subscriber_app_id);
                // Drop the whole entry once no application is subscribed anymore.
                if subscriptions.applications.is_empty() {
                    self.state.system.event_subscriptions.remove(&key)?;
                }
                // NOTE(review): the pending stream is removed for the application that owns
                // the stream (`stream_id.application_id`), whereas `SubscribeToEvents`
                // registers it under `subscriber_app_id` — confirm this is intended.
                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
                    self.txn_tracker
                        .remove_stream_to_process(app_id, chain_id, stream_id);
                }
                callback.respond(());
            }

            GetApplicationPermissions { callback } => {
                let app_permissions = self.state.system.application_permissions.get();
                callback.respond(app_permissions.clone());
            }

            // Queries a service through `txn_tracker.oracle`. A query outcome that
            // schedules operations is rejected.
            QueryServiceOracle {
                deadline,
                application_id,
                next_block_height,
                query,
                callback,
            } => {
                let state = &mut self.state;
                let local_time = self.txn_tracker.local_time();
                let created_blobs = self.txn_tracker.created_blobs().clone();
                let bytes = self
                    .txn_tracker
                    .oracle(|| async {
                        let context = QueryContext {
                            chain_id: state.context().extra().chain_id(),
                            next_block_height,
                            local_time,
                        };
                        let QueryOutcome {
                            response,
                            operations,
                        } = Box::pin(state.query_user_application_with_deadline(
                            application_id,
                            context,
                            query,
                            deadline,
                            created_blobs,
                        ))
                        .await?;
                        ensure!(
                            operations.is_empty(),
                            ExecutionError::ServiceOracleQueryOperations(operations)
                        );
                        Ok(OracleResponse::Service(response))
                    })
                    .await?
                    .to_service_response()?;
                callback.respond(bytes);
            }

            AddOutgoingMessage { message, callback } => {
                self.txn_tracker.add_outgoing_message(message);
                callback.respond(());
            }

            SetLocalTime {
                local_time,
                callback,
            } => {
                self.txn_tracker.set_local_time(local_time);
                callback.respond(());
            }

            // The assertion outcome is reported through the callback.
            AssertBefore {
                timestamp,
                callback,
            } => {
                let result = if !self
                    .txn_tracker
                    .replay_oracle_response(OracleResponse::Assert)?
                {
                    // There are no recorded oracle responses, so we check the local time.
                    let local_time = self.txn_tracker.local_time();
                    if local_time >= timestamp {
                        Err(ExecutionError::AssertBefore {
                            timestamp,
                            local_time,
                        })
                    } else {
                        Ok(())
                    }
                } else {
                    Ok(())
                };
                callback.respond(result);
            }

            AddCreatedBlob { blob, callback } => {
                self.txn_tracker.add_created_blob(blob);
                callback.respond(());
            }

            // The round is passed through `txn_tracker.oracle` before being returned.
            ValidationRound { round, callback } => {
                let validation_round = self
                    .txn_tracker
                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
                    .await?
                    .to_round()?;
                callback.respond(validation_round);
            }

            // Reports the `allow_application_logs` flag of the runtime configuration.
            AllowApplicationLogs { callback } => {
                let allow = self
                    .state
                    .context()
                    .extra()
                    .execution_runtime_config()
                    .allow_application_logs;
                callback.respond(allow);
            }

            // Web builds only; this request has no callback.
            #[cfg(web)]
            Log { message, level } => {
                // Output directly to browser console with clean formatting
                let formatted: js_sys::JsString = format!("[CONTRACT {level}] {message}").into();
                match level {
                    tracing::log::Level::Trace | tracing::log::Level::Debug => {
                        web_sys::console::debug_1(&formatted)
                    }
                    tracing::log::Level::Info => web_sys::console::log_1(&formatted),
                    tracing::log::Level::Warn => web_sys::console::warn_1(&formatted),
                    tracing::log::Level::Error => web_sys::console::error_1(&formatted),
                }
            }
        }

        Ok(())
    }
690
691    /// Calls `process_streams` for all applications that are subscribed to streams with new
692    /// events or that have new subscriptions.
693    async fn process_subscriptions(
694        &mut self,
695        context: ProcessStreamsContext,
696    ) -> Result<(), ExecutionError> {
697        // Keep track of which streams we have already processed. This is to guard against
698        // applications unsubscribing and subscribing in the process_streams call itself.
699        let mut processed = BTreeSet::new();
700        loop {
701            let to_process = self
702                .txn_tracker
703                .take_streams_to_process()
704                .into_iter()
705                .filter_map(|(app_id, updates)| {
706                    let updates = updates
707                        .into_iter()
708                        .filter_map(|update| {
709                            if !processed.insert((
710                                app_id,
711                                update.chain_id,
712                                update.stream_id.clone(),
713                            )) {
714                                return None;
715                            }
716                            Some(update)
717                        })
718                        .collect::<Vec<_>>();
719                    if updates.is_empty() {
720                        return None;
721                    }
722                    Some((app_id, updates))
723                })
724                .collect::<BTreeMap<_, _>>();
725            if to_process.is_empty() {
726                return Ok(());
727            }
728            for (app_id, updates) in to_process {
729                self.run_user_action(
730                    app_id,
731                    UserAction::ProcessStreams(context, updates),
732                    None,
733                    None,
734                )
735                .await?;
736            }
737        }
738    }
739
740    pub(crate) async fn run_user_action(
741        &mut self,
742        application_id: ApplicationId,
743        action: UserAction,
744        refund_grant_to: Option<Account>,
745        grant: Option<&mut Amount>,
746    ) -> Result<(), ExecutionError> {
747        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
748            .await
749    }
750
751    // TODO(#5034): unify with `contract_and_dependencies`
752    pub(crate) async fn service_and_dependencies(
753        &mut self,
754        application: ApplicationId,
755    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
756        // cyclic futures are illegal so we need to either box the frames or keep our own
757        // stack
758        let mut stack = vec![application];
759        let mut codes = vec![];
760        let mut descriptions = vec![];
761
762        while let Some(id) = stack.pop() {
763            let (code, description) = self.load_service(id).await?;
764            stack.extend(description.required_application_ids.iter().rev().copied());
765            codes.push(code);
766            descriptions.push(description);
767        }
768
769        codes.reverse();
770        descriptions.reverse();
771
772        Ok((codes, descriptions))
773    }
774
775    // TODO(#5034): unify with `service_and_dependencies`
776    async fn contract_and_dependencies(
777        &mut self,
778        application: ApplicationId,
779    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
780        // cyclic futures are illegal so we need to either box the frames or keep our own
781        // stack
782        let mut stack = vec![application];
783        let mut codes = vec![];
784        let mut descriptions = vec![];
785
786        while let Some(id) = stack.pop() {
787            let (code, description) = self.load_contract(id).await?;
788            stack.extend(description.required_application_ids.iter().rev().copied());
789            codes.push(code);
790            descriptions.push(description);
791        }
792
793        codes.reverse();
794        descriptions.reverse();
795
796        Ok((codes, descriptions))
797    }
798
    /// Runs `action` for `application_id` inside a `ContractSyncRuntime` on the thread
    /// pool, serving the runtime's execution-state requests from this actor while it runs.
    ///
    /// Steps, in order:
    /// 1. Read the current balance (taking `grant` into account) and build a fresh
    ///    `ResourceController` for the runtime from it.
    /// 2. Load the contract code of the application and its dependencies.
    /// 3. Start the runtime task, preload every contract and run the action.
    /// 4. Handle every request received on the channel until the channel is exhausted.
    /// 5. Record the operation result and merge the runtime's balance and tracker back
    ///    into `self.resource_controller`.
    ///
    /// # Errors
    ///
    /// Propagates errors from balance lookup, contract loading, request handling, the
    /// runtime task itself, and the final balance merge.
    async fn run_user_action_with_runtime(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
    ) -> Result<(), ExecutionError> {
        let chain_id = self.state.context().extra().chain_id();
        // Work on a copy of the grant amount here: the `grant` reference itself is
        // still needed for the final `merge_balance` call below.
        let mut cloned_grant = grant.as_ref().map(|x| **x);
        let initial_balance = self
            .resource_controller
            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
            .await?
            .balance()?;
        // The runtime gets its own controller: same policy, current tracker, and the
        // balance just read as its account.
        let controller = ResourceController::new(
            self.resource_controller.policy().clone(),
            self.resource_controller.tracker,
            initial_balance,
        );
        // The sender is moved into the runtime; the receiver is drained below.
        let (execution_state_sender, mut execution_state_receiver) =
            futures::channel::mpsc::unbounded();

        let (codes, descriptions): (Vec<_>, Vec<_>) =
            self.contract_and_dependencies(application_id).await?;

        let allow_application_logs = self
            .state
            .context()
            .extra()
            .execution_runtime_config()
            .allow_application_logs;

        let contract_runtime_task = self
            .state
            .context()
            .extra()
            .thread_pool()
            .run_send(JsVec(codes), move |codes| async move {
                let runtime = ContractSyncRuntime::new(
                    execution_state_sender,
                    chain_id,
                    refund_grant_to,
                    controller,
                    &action,
                    allow_application_logs,
                );

                // `codes` and `descriptions` are index-aligned, so zipping pairs each
                // contract with its own description.
                for (code, description) in codes.0.into_iter().zip(descriptions) {
                    runtime.preload_contract(
                        ApplicationId::from(&description),
                        code,
                        description,
                    )?;
                }

                runtime.run_action(application_id, chain_id, action)
            })
            .await;

        // Serve the runtime's requests until the stream ends.
        // NOTE(review): presumably the stream ends once the runtime has dropped every
        // sender, i.e. when the action has finished — confirm against the runtime.
        while let Some(request) = execution_state_receiver.next().await {
            self.handle_request(request).await?;
        }

        // Two layers of `Result`: the outer one from awaiting the task, the inner one
        // from the closure that ran the action.
        let (result, controller) = contract_runtime_task.await??;

        self.txn_tracker.add_operation_result(result);

        // Fold the balance change observed by the runtime back into the real state
        // (and into `grant`, if one was provided), then adopt the runtime's tracker.
        self.resource_controller
            .with_state_and_grant(&mut self.state.system, grant)
            .await?
            .merge_balance(initial_balance, controller.balance()?)?;
        self.resource_controller.tracker = controller.tracker;

        Ok(())
    }
874
875    pub async fn execute_operation(
876        &mut self,
877        context: OperationContext,
878        operation: Operation,
879    ) -> Result<(), ExecutionError> {
880        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
881        match operation {
882            Operation::System(op) => {
883                let new_application = self
884                    .state
885                    .system
886                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
887                    .await?;
888                if let Some((application_id, argument)) = new_application {
889                    let user_action = UserAction::Instantiate(context, argument);
890                    self.run_user_action(
891                        application_id,
892                        user_action,
893                        context.refund_grant_to(),
894                        None,
895                    )
896                    .await?;
897                }
898            }
899            Operation::User {
900                application_id,
901                bytes,
902            } => {
903                self.run_user_action(
904                    application_id,
905                    UserAction::Operation(context, bytes),
906                    context.refund_grant_to(),
907                    None,
908                )
909                .await?;
910            }
911        }
912        self.process_subscriptions(context.into()).await?;
913        Ok(())
914    }
915
916    pub async fn execute_message(
917        &mut self,
918        context: MessageContext,
919        message: Message,
920        grant: Option<&mut Amount>,
921    ) -> Result<(), ExecutionError> {
922        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
923        match message {
924            Message::System(message) => {
925                let outcome = self.state.system.execute_message(context, message).await?;
926                self.txn_tracker.add_outgoing_messages(outcome);
927            }
928            Message::User {
929                application_id,
930                bytes,
931            } => {
932                self.run_user_action(
933                    application_id,
934                    UserAction::Message(context, bytes),
935                    context.refund_grant_to,
936                    grant,
937                )
938                .await?;
939            }
940        }
941        self.process_subscriptions(context.into()).await?;
942        Ok(())
943    }
944
945    pub fn bounce_message(
946        &mut self,
947        context: MessageContext,
948        grant: Amount,
949        message: Message,
950    ) -> Result<(), ExecutionError> {
951        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
952        self.txn_tracker.add_outgoing_message(OutgoingMessage {
953            destination: context.origin,
954            authenticated_signer: context.authenticated_signer,
955            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
956            grant,
957            kind: MessageKind::Bouncing,
958            message,
959        });
960        Ok(())
961    }
962
963    pub fn send_refund(
964        &mut self,
965        context: MessageContext,
966        amount: Amount,
967    ) -> Result<(), ExecutionError> {
968        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
969        if amount.is_zero() {
970            return Ok(());
971        }
972        let Some(account) = context.refund_grant_to else {
973            return Err(ExecutionError::InternalError(
974                "Messages with grants should have a non-empty `refund_grant_to`",
975            ));
976        };
977        let message = SystemMessage::Credit {
978            amount,
979            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
980            target: account.owner,
981        };
982        self.txn_tracker.add_outgoing_message(
983            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
984        );
985        Ok(())
986    }
987
988    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
989    ///
990    /// Ensures that the response does not exceed the provided `size_limit`.
991    async fn receive_http_response(
992        response: reqwest::Response,
993        size_limit: u64,
994    ) -> Result<http::Response, ExecutionError> {
995        let status = response.status().as_u16();
996        let maybe_content_length = response.content_length();
997
998        let headers = response
999            .headers()
1000            .iter()
1001            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1002            .collect::<Vec<_>>();
1003
1004        let total_header_size = headers
1005            .iter()
1006            .map(|header| (header.name.len() + header.value.len()) as u64)
1007            .sum();
1008
1009        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1010            ExecutionError::HttpResponseSizeLimitExceeded {
1011                limit: size_limit,
1012                size: total_header_size,
1013            },
1014        )?;
1015
1016        if let Some(content_length) = maybe_content_length {
1017            if content_length > remaining_bytes {
1018                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1019                    limit: size_limit,
1020                    size: content_length + total_header_size,
1021                });
1022            }
1023        }
1024
1025        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1026        let mut body_stream = response.bytes_stream();
1027
1028        while let Some(bytes) = body_stream.next().await.transpose()? {
1029            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1030                ExecutionError::HttpResponseSizeLimitExceeded {
1031                    limit: size_limit,
1032                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1033                },
1034            )?;
1035
1036            body.extend(&bytes);
1037        }
1038
1039        Ok(http::Response {
1040            status,
1041            headers,
1042            body,
1043        })
1044    }
1045}
1046
1047/// Requests to the execution state.
1048#[derive(Debug)]
1049pub enum ExecutionRequest {
1050    #[cfg(not(web))]
1051    LoadContract {
1052        id: ApplicationId,
1053        #[debug(skip)]
1054        callback: Sender<(UserContractCode, ApplicationDescription)>,
1055    },
1056
1057    #[cfg(not(web))]
1058    LoadService {
1059        id: ApplicationId,
1060        #[debug(skip)]
1061        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1062    },
1063
1064    ChainBalance {
1065        #[debug(skip)]
1066        callback: Sender<Amount>,
1067    },
1068
1069    OwnerBalance {
1070        owner: AccountOwner,
1071        #[debug(skip)]
1072        callback: Sender<Amount>,
1073    },
1074
1075    OwnerBalances {
1076        #[debug(skip)]
1077        callback: Sender<Vec<(AccountOwner, Amount)>>,
1078    },
1079
1080    BalanceOwners {
1081        #[debug(skip)]
1082        callback: Sender<Vec<AccountOwner>>,
1083    },
1084
1085    Transfer {
1086        source: AccountOwner,
1087        destination: Account,
1088        amount: Amount,
1089        #[debug(skip_if = Option::is_none)]
1090        signer: Option<AccountOwner>,
1091        application_id: ApplicationId,
1092        #[debug(skip)]
1093        callback: Sender<()>,
1094    },
1095
1096    Claim {
1097        source: Account,
1098        destination: Account,
1099        amount: Amount,
1100        #[debug(skip_if = Option::is_none)]
1101        signer: Option<AccountOwner>,
1102        application_id: ApplicationId,
1103        #[debug(skip)]
1104        callback: Sender<()>,
1105    },
1106
1107    SystemTimestamp {
1108        #[debug(skip)]
1109        callback: Sender<Timestamp>,
1110    },
1111
1112    ChainOwnership {
1113        #[debug(skip)]
1114        callback: Sender<ChainOwnership>,
1115    },
1116
1117    ReadValueBytes {
1118        id: ApplicationId,
1119        #[debug(with = hex_debug)]
1120        key: Vec<u8>,
1121        #[debug(skip)]
1122        callback: Sender<Option<Vec<u8>>>,
1123    },
1124
1125    ContainsKey {
1126        id: ApplicationId,
1127        key: Vec<u8>,
1128        #[debug(skip)]
1129        callback: Sender<bool>,
1130    },
1131
1132    ContainsKeys {
1133        id: ApplicationId,
1134        #[debug(with = hex_vec_debug)]
1135        keys: Vec<Vec<u8>>,
1136        callback: Sender<Vec<bool>>,
1137    },
1138
1139    ReadMultiValuesBytes {
1140        id: ApplicationId,
1141        #[debug(with = hex_vec_debug)]
1142        keys: Vec<Vec<u8>>,
1143        #[debug(skip)]
1144        callback: Sender<Vec<Option<Vec<u8>>>>,
1145    },
1146
1147    FindKeysByPrefix {
1148        id: ApplicationId,
1149        #[debug(with = hex_debug)]
1150        key_prefix: Vec<u8>,
1151        #[debug(skip)]
1152        callback: Sender<Vec<Vec<u8>>>,
1153    },
1154
1155    FindKeyValuesByPrefix {
1156        id: ApplicationId,
1157        #[debug(with = hex_debug)]
1158        key_prefix: Vec<u8>,
1159        #[debug(skip)]
1160        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1161    },
1162
1163    WriteBatch {
1164        id: ApplicationId,
1165        batch: Batch,
1166        #[debug(skip)]
1167        callback: Sender<()>,
1168    },
1169
1170    OpenChain {
1171        ownership: ChainOwnership,
1172        #[debug(skip_if = Amount::is_zero)]
1173        balance: Amount,
1174        parent_id: ChainId,
1175        block_height: BlockHeight,
1176        application_permissions: ApplicationPermissions,
1177        timestamp: Timestamp,
1178        #[debug(skip)]
1179        callback: Sender<ChainId>,
1180    },
1181
1182    CloseChain {
1183        application_id: ApplicationId,
1184        #[debug(skip)]
1185        callback: Sender<Result<(), ExecutionError>>,
1186    },
1187
1188    ChangeApplicationPermissions {
1189        application_id: ApplicationId,
1190        application_permissions: ApplicationPermissions,
1191        #[debug(skip)]
1192        callback: Sender<Result<(), ExecutionError>>,
1193    },
1194
1195    CreateApplication {
1196        chain_id: ChainId,
1197        block_height: BlockHeight,
1198        module_id: ModuleId,
1199        parameters: Vec<u8>,
1200        required_application_ids: Vec<ApplicationId>,
1201        #[debug(skip)]
1202        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1203    },
1204
1205    PerformHttpRequest {
1206        request: http::Request,
1207        http_responses_are_oracle_responses: bool,
1208        #[debug(skip)]
1209        callback: Sender<http::Response>,
1210    },
1211
1212    ReadBlobContent {
1213        blob_id: BlobId,
1214        #[debug(skip)]
1215        callback: Sender<BlobContent>,
1216    },
1217
1218    AssertBlobExists {
1219        blob_id: BlobId,
1220        #[debug(skip)]
1221        callback: Sender<()>,
1222    },
1223
1224    Emit {
1225        stream_id: StreamId,
1226        #[debug(with = hex_debug)]
1227        value: Vec<u8>,
1228        #[debug(skip)]
1229        callback: Sender<u32>,
1230    },
1231
1232    ReadEvent {
1233        event_id: EventId,
1234        callback: oneshot::Sender<Vec<u8>>,
1235    },
1236
1237    SubscribeToEvents {
1238        chain_id: ChainId,
1239        stream_id: StreamId,
1240        subscriber_app_id: ApplicationId,
1241        #[debug(skip)]
1242        callback: Sender<()>,
1243    },
1244
1245    UnsubscribeFromEvents {
1246        chain_id: ChainId,
1247        stream_id: StreamId,
1248        subscriber_app_id: ApplicationId,
1249        #[debug(skip)]
1250        callback: Sender<()>,
1251    },
1252
1253    GetApplicationPermissions {
1254        #[debug(skip)]
1255        callback: Sender<ApplicationPermissions>,
1256    },
1257
1258    QueryServiceOracle {
1259        deadline: Option<Instant>,
1260        application_id: ApplicationId,
1261        next_block_height: BlockHeight,
1262        query: Vec<u8>,
1263        #[debug(skip)]
1264        callback: Sender<Vec<u8>>,
1265    },
1266
1267    AddOutgoingMessage {
1268        message: crate::OutgoingMessage,
1269        #[debug(skip)]
1270        callback: Sender<()>,
1271    },
1272
1273    SetLocalTime {
1274        local_time: Timestamp,
1275        #[debug(skip)]
1276        callback: Sender<()>,
1277    },
1278
1279    AssertBefore {
1280        timestamp: Timestamp,
1281        #[debug(skip)]
1282        callback: Sender<Result<(), ExecutionError>>,
1283    },
1284
1285    AddCreatedBlob {
1286        blob: crate::Blob,
1287        #[debug(skip)]
1288        callback: Sender<()>,
1289    },
1290
1291    ValidationRound {
1292        round: Option<u32>,
1293        #[debug(skip)]
1294        callback: Sender<Option<u32>>,
1295    },
1296
1297    AllowApplicationLogs {
1298        #[debug(skip)]
1299        callback: Sender<bool>,
1300    },
1301
1302    /// Log message from contract execution (fire-and-forget, no callback needed).
1303    #[cfg(web)]
1304    Log {
1305        message: String,
1306        level: tracing::log::Level,
1307    },
1308}