Skip to main content

linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15        Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19    ownership::ChainOwnership,
20    time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25use tracing::{info_span, instrument, Instrument as _};
26
27use crate::{
28    execution::UserAction,
29    runtime::ContractSyncRuntime,
30    system::{CreateApplicationResult, OpenChainConfig},
31    util::{OracleResponseExt as _, RespondExt as _},
32    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
33    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
34    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
35    ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
36};
37
/// Actor for handling requests to the execution state.
///
/// The actor owns nothing: it mutably borrows its three components for the lifetime
/// `'a` and operates on them while serving requests.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Transaction tracker; request handling records outgoing messages, events, oracle
    /// responses and created blobs in it.
    txn_tracker: &'a mut TransactionTracker,
    /// Resource controller; request handling uses it to track blob reads and to check
    /// its `is_free` flag.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
44
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a label-free latency histogram called `name`, using the buckets
    /// produced by `exponential_bucket_latencies(250.0)`.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Latency histogram for loading a contract bytecode.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Latency histogram for loading a service bytecode.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
72
/// Sending half of an unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
74
75impl<'a, C> ExecutionStateActor<'a, C>
76where
77    C: Context + Clone + 'static,
78    C::Extra: ExecutionRuntimeContext,
79{
80    /// Creates a new execution state actor.
81    pub fn new(
82        state: &'a mut ExecutionStateView<C>,
83        txn_tracker: &'a mut TransactionTracker,
84        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
85    ) -> Self {
86        Self {
87            state,
88            txn_tracker,
89            resource_controller,
90        }
91    }
92
93    #[instrument(skip_all, fields(application_id = %id))]
94    pub(crate) async fn load_contract(
95        &mut self,
96        id: ApplicationId,
97    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
98        #[cfg(with_metrics)]
99        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
100        let blob_id = id.description_blob_id();
101        let description = match self.txn_tracker.get_blob_content(&blob_id) {
102            Some(blob) => bcs::from_bytes(blob.bytes())?,
103            None => {
104                self.state
105                    .system
106                    .describe_application(id, self.txn_tracker)
107                    .await?
108            }
109        };
110        let code = self
111            .state
112            .context()
113            .extra()
114            .get_user_contract(&description, self.txn_tracker)
115            .await?;
116        Ok((code, description))
117    }
118
119    pub(crate) async fn load_service(
120        &mut self,
121        id: ApplicationId,
122    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
123        #[cfg(with_metrics)]
124        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
125        let blob_id = id.description_blob_id();
126        let description = match self.txn_tracker.get_blob_content(&blob_id) {
127            Some(blob) => bcs::from_bytes(blob.bytes())?,
128            None => {
129                self.state
130                    .system
131                    .describe_application(id, self.txn_tracker)
132                    .await?
133            }
134        };
135        let code = self
136            .state
137            .context()
138            .extra()
139            .get_user_service(&description, self.txn_tracker)
140            .await?;
141        Ok((code, description))
142    }
143
144    // TODO(#1416): Support concurrent I/O.
145    #[instrument(
146        skip_all,
147        fields(request_type = %request.as_ref())
148    )]
149    pub(crate) async fn handle_request(
150        &mut self,
151        request: ExecutionRequest,
152    ) -> Result<(), ExecutionError> {
153        use ExecutionRequest::*;
154        match request {
155            #[cfg(not(web))]
156            LoadContract { id, callback } => {
157                let (code, description) = self.load_contract(id).await?;
158                callback.respond((code, description))
159            }
160            #[cfg(not(web))]
161            LoadService { id, callback } => {
162                let (code, description) = self.load_service(id).await?;
163                callback.respond((code, description))
164            }
165
166            ChainBalance { callback } => {
167                let balance = *self.state.system.balance.get();
168                callback.respond(balance);
169            }
170
171            OwnerBalance { owner, callback } => {
172                let balance = self
173                    .state
174                    .system
175                    .balances
176                    .get(&owner)
177                    .await?
178                    .unwrap_or_default();
179                callback.respond(balance);
180            }
181
182            OwnerBalances { callback } => {
183                let balances = self.state.system.balances.index_values().await?;
184                callback.respond(balances.into_iter().collect());
185            }
186
187            BalanceOwners { callback } => {
188                let owners = self.state.system.balances.indices().await?;
189                callback.respond(owners);
190            }
191
192            Transfer {
193                source,
194                destination,
195                amount,
196                signer,
197                application_id,
198                callback,
199            } => {
200                let maybe_message = self
201                    .state
202                    .system
203                    .transfer(signer, Some(application_id), source, destination, amount)
204                    .await?;
205                self.txn_tracker.add_outgoing_messages(maybe_message);
206                callback.respond(());
207            }
208
209            Claim {
210                source,
211                destination,
212                amount,
213                signer,
214                application_id,
215                callback,
216            } => {
217                let maybe_message = self
218                    .state
219                    .system
220                    .claim(
221                        signer,
222                        Some(application_id),
223                        source.owner,
224                        source.chain_id,
225                        destination,
226                        amount,
227                    )
228                    .await?;
229                self.txn_tracker.add_outgoing_messages(maybe_message);
230                callback.respond(());
231            }
232
233            SystemTimestamp { callback } => {
234                let timestamp = *self.state.system.timestamp.get();
235                callback.respond(timestamp);
236            }
237
238            ChainOwnership { callback } => {
239                let ownership = self.state.system.ownership.get().clone();
240                callback.respond(ownership);
241            }
242
243            ApplicationPermissions { callback } => {
244                let permissions = self.state.system.application_permissions.get().clone();
245                callback.respond(permissions);
246            }
247
248            ReadApplicationDescription {
249                application_id,
250                callback,
251            } => {
252                let blob_id = application_id.description_blob_id();
253                let description = match self.txn_tracker.get_blob_content(&blob_id) {
254                    Some(blob) => bcs::from_bytes(blob.bytes())?,
255                    None => {
256                        let blob_content = self.state.system.read_blob_content(blob_id).await?;
257                        self.state
258                            .system
259                            .blob_used(self.txn_tracker, blob_id)
260                            .await?;
261                        bcs::from_bytes(blob_content.bytes())?
262                    }
263                };
264                callback.respond(description);
265            }
266
267            ContainsKey { id, key, callback } => {
268                let view = self.state.users.try_load_entry(&id).await?;
269                let result = match view {
270                    Some(view) => view.contains_key(&key).await?,
271                    None => false,
272                };
273                callback.respond(result);
274            }
275
276            ContainsKeys { id, keys, callback } => {
277                let view = self.state.users.try_load_entry(&id).await?;
278                let result = match view {
279                    Some(view) => view.contains_keys(&keys).await?,
280                    None => vec![false; keys.len()],
281                };
282                callback.respond(result);
283            }
284
285            ReadMultiValuesBytes { id, keys, callback } => {
286                let view = self.state.users.try_load_entry(&id).await?;
287                let values = match view {
288                    Some(view) => view.multi_get(&keys).await?,
289                    None => vec![None; keys.len()],
290                };
291                callback.respond(values);
292            }
293
294            ReadValueBytes { id, key, callback } => {
295                let view = self.state.users.try_load_entry(&id).await?;
296                let result = match view {
297                    Some(view) => view.get(&key).await?,
298                    None => None,
299                };
300                callback.respond(result);
301            }
302
303            FindKeysByPrefix {
304                id,
305                key_prefix,
306                callback,
307            } => {
308                let view = self.state.users.try_load_entry(&id).await?;
309                let result = match view {
310                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
311                    None => Vec::new(),
312                };
313                callback.respond(result);
314            }
315
316            FindKeyValuesByPrefix {
317                id,
318                key_prefix,
319                callback,
320            } => {
321                let view = self.state.users.try_load_entry(&id).await?;
322                let result = match view {
323                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
324                    None => Vec::new(),
325                };
326                callback.respond(result);
327            }
328
329            WriteBatch {
330                id,
331                batch,
332                callback,
333            } => {
334                let mut view = self.state.users.try_load_entry_mut(&id).await?;
335                view.write_batch(batch).await?;
336                callback.respond(());
337            }
338
339            OpenChain {
340                ownership,
341                balance,
342                parent_id,
343                block_height,
344                application_permissions,
345                timestamp,
346                callback,
347            } => {
348                let config = OpenChainConfig {
349                    ownership,
350                    balance,
351                    application_permissions,
352                };
353                let chain_id = self
354                    .state
355                    .system
356                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
357                    .await?;
358                callback.respond(chain_id);
359            }
360
361            CloseChain {
362                application_id,
363                callback,
364            } => {
365                let app_permissions = self.state.system.application_permissions.get();
366                if !app_permissions.can_close_chain(&application_id) {
367                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
368                } else {
369                    self.state.system.close_chain().await?;
370                    callback.respond(Ok(()));
371                }
372            }
373
374            ChangeOwnership {
375                application_id,
376                ownership,
377                callback,
378            } => {
379                let app_permissions = self.state.system.application_permissions.get();
380                if !app_permissions.can_close_chain(&application_id) {
381                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
382                } else {
383                    self.state.system.ownership.set(ownership);
384                    callback.respond(Ok(()));
385                }
386            }
387
388            ChangeApplicationPermissions {
389                application_id,
390                application_permissions,
391                callback,
392            } => {
393                let app_permissions = self.state.system.application_permissions.get();
394                if !app_permissions.can_change_application_permissions(&application_id) {
395                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
396                } else {
397                    self.state
398                        .system
399                        .application_permissions
400                        .set(application_permissions);
401                    callback.respond(Ok(()));
402                }
403            }
404
405            CreateApplication {
406                chain_id,
407                block_height,
408                module_id,
409                parameters,
410                required_application_ids,
411                callback,
412            } => {
413                let create_application_result = self
414                    .state
415                    .system
416                    .create_application(
417                        chain_id,
418                        block_height,
419                        module_id,
420                        parameters,
421                        required_application_ids,
422                        self.txn_tracker,
423                    )
424                    .await?;
425                callback.respond(Ok(create_application_result));
426            }
427
428            PerformHttpRequest {
429                request,
430                http_responses_are_oracle_responses,
431                callback,
432            } => {
433                let system = &mut self.state.system;
434                let response = self
435                    .txn_tracker
436                    .oracle(|| async {
437                        let headers = request
438                            .headers
439                            .into_iter()
440                            .map(|http::Header { name, value }| {
441                                Ok((name.parse()?, value.try_into()?))
442                            })
443                            .collect::<Result<HeaderMap, ExecutionError>>()?;
444
445                        let url = Url::parse(&request.url)?;
446                        let host = url
447                            .host_str()
448                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
449
450                        let (_epoch, committee) = system
451                            .current_committee()
452                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
453                        let allowed_hosts = &committee.policy().http_request_allow_list;
454
455                        ensure!(
456                            allowed_hosts.contains(host),
457                            ExecutionError::UnauthorizedHttpRequest(url)
458                        );
459
460                        let request = Client::new()
461                            .request(request.method.into(), url)
462                            .body(request.body)
463                            .headers(headers);
464                        #[cfg(not(web))]
465                        let request = request.timeout(linera_base::time::Duration::from_millis(
466                            committee.policy().http_request_timeout_ms,
467                        ));
468
469                        let response = request.send().await?;
470
471                        let mut response_size_limit =
472                            committee.policy().maximum_http_response_bytes;
473
474                        if http_responses_are_oracle_responses {
475                            response_size_limit = response_size_limit
476                                .min(committee.policy().maximum_oracle_response_bytes);
477                        }
478                        Ok(OracleResponse::Http(
479                            Self::receive_http_response(response, response_size_limit).await?,
480                        ))
481                    })
482                    .await?
483                    .to_http_response()?;
484                callback.respond(response);
485            }
486
487            ReadBlobContent { blob_id, callback } => {
488                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
489                    content.clone()
490                } else {
491                    let content = self.state.system.read_blob_content(blob_id).await?;
492                    if blob_id.blob_type == BlobType::Data {
493                        self.resource_controller
494                            .with_state(&mut self.state.system)
495                            .await?
496                            .track_blob_read(content.bytes().len() as u64)?;
497                    }
498                    self.state
499                        .system
500                        .blob_used(self.txn_tracker, blob_id)
501                        .await?;
502                    content
503                };
504                callback.respond(content)
505            }
506
507            AssertBlobExists { blob_id, callback } => {
508                self.state.system.assert_blob_exists(blob_id).await?;
509                // Treating this as reading a size-0 blob for fee purposes.
510                if blob_id.blob_type == BlobType::Data {
511                    self.resource_controller
512                        .with_state(&mut self.state.system)
513                        .await?
514                        .track_blob_read(0)?;
515                }
516                let is_new = self
517                    .state
518                    .system
519                    .blob_used(self.txn_tracker, blob_id)
520                    .await?;
521                if is_new {
522                    self.txn_tracker
523                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
524                }
525                callback.respond(());
526            }
527
528            Emit {
529                stream_id,
530                value,
531                callback,
532            } => {
533                let count = self
534                    .state
535                    .stream_event_counts
536                    .get_mut_or_default(&stream_id)
537                    .await?;
538                let index = *count;
539                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
540                self.txn_tracker.add_event(stream_id, index, value);
541                callback.respond(index)
542            }
543
544            ReadEvent { event_id, callback } => {
545                let extra = self.state.context().extra();
546                let event = self
547                    .txn_tracker
548                    .oracle(|| async {
549                        let event = extra
550                            .get_event(event_id.clone())
551                            .await?
552                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
553                        Ok(OracleResponse::Event(event_id.clone(), event))
554                    })
555                    .await?
556                    .to_event(&event_id)?;
557                callback.respond(event);
558            }
559
560            SubscribeToEvents {
561                chain_id,
562                stream_id,
563                subscriber_app_id,
564                callback,
565            } => {
566                let subscriptions = self
567                    .state
568                    .system
569                    .event_subscriptions
570                    .get_mut_or_default(&(chain_id, stream_id.clone()))
571                    .await?;
572                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
573                    subscriptions.next_index
574                } else {
575                    0
576                };
577                self.txn_tracker.add_stream_to_process(
578                    subscriber_app_id,
579                    chain_id,
580                    stream_id,
581                    0,
582                    next_index,
583                );
584                callback.respond(());
585            }
586
587            UnsubscribeFromEvents {
588                chain_id,
589                stream_id,
590                subscriber_app_id,
591                callback,
592            } => {
593                let key = (chain_id, stream_id.clone());
594                let subscriptions = self
595                    .state
596                    .system
597                    .event_subscriptions
598                    .get_mut_or_default(&key)
599                    .await?;
600                subscriptions.applications.remove(&subscriber_app_id);
601                if subscriptions.applications.is_empty() {
602                    self.state.system.event_subscriptions.remove(&key)?;
603                }
604                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
605                    self.txn_tracker
606                        .remove_stream_to_process(app_id, chain_id, stream_id);
607                }
608                callback.respond(());
609            }
610
611            GetApplicationPermissions { callback } => {
612                let app_permissions = self.state.system.application_permissions.get();
613                callback.respond(app_permissions.clone());
614            }
615
616            QueryServiceOracle {
617                deadline,
618                application_id,
619                next_block_height,
620                query,
621                callback,
622            } => {
623                let state = &mut self.state;
624                let local_time = self.txn_tracker.local_time();
625                let created_blobs = self.txn_tracker.created_blobs().clone();
626                let bytes = self
627                    .txn_tracker
628                    .oracle(|| async {
629                        let context = QueryContext {
630                            chain_id: state.context().extra().chain_id(),
631                            next_block_height,
632                            local_time,
633                        };
634                        let QueryOutcome {
635                            response,
636                            operations,
637                        } = Box::pin(state.query_user_application_with_deadline(
638                            application_id,
639                            context,
640                            query,
641                            deadline,
642                            created_blobs,
643                        ))
644                        .await?;
645                        ensure!(
646                            operations.is_empty(),
647                            ExecutionError::ServiceOracleQueryOperations(operations)
648                        );
649                        Ok(OracleResponse::Service(response))
650                    })
651                    .await?
652                    .to_service_response()?;
653                callback.respond(bytes);
654            }
655
656            AddOutgoingMessage { message, callback } => {
657                self.txn_tracker.add_outgoing_message(message);
658                callback.respond(());
659            }
660
661            SetLocalTime {
662                local_time,
663                callback,
664            } => {
665                self.txn_tracker.set_local_time(local_time);
666                callback.respond(());
667            }
668
669            AssertBefore {
670                timestamp,
671                callback,
672            } => {
673                let result = if !self
674                    .txn_tracker
675                    .replay_oracle_response(OracleResponse::Assert)?
676                {
677                    // There are no recorded oracle responses, so we check the local time.
678                    let local_time = self.txn_tracker.local_time();
679                    if local_time >= timestamp {
680                        Err(ExecutionError::AssertBefore {
681                            timestamp,
682                            local_time,
683                        })
684                    } else {
685                        Ok(())
686                    }
687                } else {
688                    Ok(())
689                };
690                callback.respond(result);
691            }
692
693            AddCreatedBlob { blob, callback } => {
694                if self.resource_controller.is_free {
695                    self.txn_tracker.mark_blob_free(blob.id());
696                }
697                self.txn_tracker.add_created_blob(blob);
698                callback.respond(());
699            }
700
701            ValidationRound { round, callback } => {
702                let validation_round = self
703                    .txn_tracker
704                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
705                    .await?
706                    .to_round()?;
707                callback.respond(validation_round);
708            }
709
710            AllowApplicationLogs { callback } => {
711                let allow = self
712                    .state
713                    .context()
714                    .extra()
715                    .execution_runtime_config()
716                    .allow_application_logs;
717                callback.respond(allow);
718            }
719
720            #[cfg(web)]
721            Log { message, level } => match level {
722                tracing::log::Level::Trace | tracing::log::Level::Debug => {
723                    tracing::debug!(target: "user_application_log", message = %message);
724                }
725                tracing::log::Level::Info => {
726                    tracing::info!(target: "user_application_log", message = %message);
727                }
728                tracing::log::Level::Warn => {
729                    tracing::warn!(target: "user_application_log", message = %message);
730                }
731                tracing::log::Level::Error => {
732                    tracing::error!(target: "user_application_log", message = %message);
733                }
734            },
735        }
736
737        Ok(())
738    }
739
740    /// Calls `process_streams` for all applications that are subscribed to streams with new
741    /// events or that have new subscriptions.
742    #[instrument(skip_all)]
743    async fn process_subscriptions(
744        &mut self,
745        context: ProcessStreamsContext,
746    ) -> Result<(), ExecutionError> {
747        // Keep track of which streams we have already processed. This is to guard against
748        // applications unsubscribing and subscribing in the process_streams call itself.
749        let mut processed = BTreeSet::new();
750        loop {
751            let to_process = self
752                .txn_tracker
753                .take_streams_to_process()
754                .into_iter()
755                .filter_map(|(app_id, updates)| {
756                    let updates = updates
757                        .into_iter()
758                        .filter_map(|update| {
759                            if !processed.insert((
760                                app_id,
761                                update.chain_id,
762                                update.stream_id.clone(),
763                            )) {
764                                return None;
765                            }
766                            Some(update)
767                        })
768                        .collect::<Vec<_>>();
769                    if updates.is_empty() {
770                        return None;
771                    }
772                    Some((app_id, updates))
773                })
774                .collect::<BTreeMap<_, _>>();
775            if to_process.is_empty() {
776                return Ok(());
777            }
778            for (app_id, updates) in to_process {
779                self.run_user_action(
780                    app_id,
781                    UserAction::ProcessStreams(context, updates),
782                    None,
783                    None,
784                )
785                .await?;
786            }
787        }
788    }
789
    /// Runs `action` for the application `application_id`.
    ///
    /// This is a thin wrapper: all four arguments are forwarded unchanged to
    /// `run_user_action_with_runtime`, and its result is returned as is.
    pub(crate) async fn run_user_action(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
    ) -> Result<(), ExecutionError> {
        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
            .await
    }
800
801    // TODO(#5034): unify with `contract_and_dependencies`
802    pub(crate) async fn service_and_dependencies(
803        &mut self,
804        application: ApplicationId,
805    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
806        // cyclic futures are illegal so we need to either box the frames or keep our own
807        // stack
808        let mut stack = vec![application];
809        let mut codes = vec![];
810        let mut descriptions = vec![];
811
812        while let Some(id) = stack.pop() {
813            let (code, description) = self.load_service(id).await?;
814            stack.extend(description.required_application_ids.iter().rev().copied());
815            codes.push(code);
816            descriptions.push(description);
817        }
818
819        codes.reverse();
820        descriptions.reverse();
821
822        Ok((codes, descriptions))
823    }
824
825    // TODO(#5034): unify with `service_and_dependencies`
826    #[instrument(skip_all, fields(application_id = %application))]
827    async fn contract_and_dependencies(
828        &mut self,
829        application: ApplicationId,
830    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
831        // cyclic futures are illegal so we need to either box the frames or keep our own
832        // stack
833        let mut stack = vec![application];
834        let mut codes = vec![];
835        let mut descriptions = vec![];
836
837        while let Some(id) = stack.pop() {
838            let (code, description) = self.load_contract(id).await?;
839            stack.extend(description.required_application_ids.iter().rev().copied());
840            codes.push(code);
841            descriptions.push(description);
842        }
843
844        codes.reverse();
845        descriptions.reverse();
846
847        Ok((codes, descriptions))
848    }
849
850    #[instrument(skip_all, fields(application_id = %application_id))]
851    async fn run_user_action_with_runtime(
852        &mut self,
853        application_id: ApplicationId,
854        action: UserAction,
855        refund_grant_to: Option<Account>,
856        grant: Option<&mut Amount>,
857    ) -> Result<(), ExecutionError> {
858        let chain_id = self.state.context().extra().chain_id();
859        let mut cloned_grant = grant.as_ref().map(|x| **x);
860        let initial_balance = self
861            .resource_controller
862            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
863            .await?
864            .balance()?;
865        let mut controller = ResourceController::new(
866            self.resource_controller.policy().clone(),
867            self.resource_controller.tracker,
868            initial_balance,
869        );
870        let is_free = matches!(
871            &action,
872            UserAction::Message(..) | UserAction::ProcessStreams(..)
873        ) && self
874            .resource_controller
875            .policy()
876            .is_free_app(&application_id);
877        controller.is_free = is_free;
878        self.resource_controller.is_free = is_free;
879        let (execution_state_sender, mut execution_state_receiver) =
880            futures::channel::mpsc::unbounded();
881
882        let (codes, descriptions): (Vec<_>, Vec<_>) =
883            self.contract_and_dependencies(application_id).await?;
884
885        let allow_application_logs = self
886            .state
887            .context()
888            .extra()
889            .execution_runtime_config()
890            .allow_application_logs;
891
892        let contract_runtime_task = self
893            .state
894            .context()
895            .extra()
896            .thread_pool()
897            .run_send(JsVec(codes), move |codes| async move {
898                let runtime = ContractSyncRuntime::new(
899                    execution_state_sender,
900                    chain_id,
901                    refund_grant_to,
902                    controller,
903                    &action,
904                    allow_application_logs,
905                );
906
907                for (code, description) in codes.0.into_iter().zip(descriptions) {
908                    runtime.preload_contract(
909                        ApplicationId::from(&description),
910                        code,
911                        description,
912                    )?;
913                }
914
915                runtime.run_action(application_id, chain_id, action)
916            })
917            .await;
918
919        async {
920            while let Some(request) = execution_state_receiver.next().await {
921                self.handle_request(request).await?;
922            }
923            Ok::<(), ExecutionError>(())
924        }
925        .instrument(info_span!("handle_runtime_requests"))
926        .await?;
927
928        let (result, controller) = contract_runtime_task.await??;
929
930        self.resource_controller.is_free = false;
931
932        self.txn_tracker.add_operation_result(result);
933
934        self.resource_controller
935            .with_state_and_grant(&mut self.state.system, grant)
936            .await?
937            .merge_balance(initial_balance, controller.balance()?)?;
938        self.resource_controller.tracker = controller.tracker;
939
940        Ok(())
941    }
942
943    #[instrument(skip_all, fields(
944        chain_id = %context.chain_id,
945        block_height = %context.height,
946        operation_type = %operation.as_ref(),
947    ))]
948    pub async fn execute_operation(
949        &mut self,
950        context: OperationContext,
951        operation: Operation,
952    ) -> Result<(), ExecutionError> {
953        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
954        match operation {
955            Operation::System(op) => {
956                let new_application = self
957                    .state
958                    .system
959                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
960                    .await?;
961                if let Some((application_id, argument)) = new_application {
962                    let user_action = UserAction::Instantiate(context, argument);
963                    self.run_user_action(
964                        application_id,
965                        user_action,
966                        context.refund_grant_to(),
967                        None,
968                    )
969                    .await?;
970                }
971            }
972            Operation::User {
973                application_id,
974                bytes,
975            } => {
976                self.run_user_action(
977                    application_id,
978                    UserAction::Operation(context, bytes),
979                    context.refund_grant_to(),
980                    None,
981                )
982                .await?;
983            }
984        }
985        self.process_subscriptions(context.into()).await?;
986        Ok(())
987    }
988
989    #[instrument(skip_all, fields(
990        chain_id = %context.chain_id,
991        block_height = %context.height,
992        origin = %context.origin,
993        is_bouncing = %context.is_bouncing,
994        message_type = %message.as_ref(),
995    ))]
996    pub async fn execute_message(
997        &mut self,
998        context: MessageContext,
999        message: Message,
1000        grant: Option<&mut Amount>,
1001    ) -> Result<(), ExecutionError> {
1002        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1003        match message {
1004            Message::System(message) => {
1005                let outcome = self.state.system.execute_message(context, message).await?;
1006                self.txn_tracker.add_outgoing_messages(outcome);
1007            }
1008            Message::User {
1009                application_id,
1010                bytes,
1011            } => {
1012                self.run_user_action(
1013                    application_id,
1014                    UserAction::Message(context, bytes),
1015                    context.refund_grant_to,
1016                    grant,
1017                )
1018                .await?;
1019            }
1020        }
1021        self.process_subscriptions(context.into()).await?;
1022        Ok(())
1023    }
1024
1025    pub fn bounce_message(
1026        &mut self,
1027        context: MessageContext,
1028        grant: Amount,
1029        message: Message,
1030    ) -> Result<(), ExecutionError> {
1031        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1032        self.txn_tracker.add_outgoing_message(OutgoingMessage {
1033            destination: context.origin,
1034            authenticated_signer: context.authenticated_signer,
1035            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1036            grant,
1037            kind: MessageKind::Bouncing,
1038            message,
1039        });
1040        Ok(())
1041    }
1042
1043    pub fn send_refund(
1044        &mut self,
1045        context: MessageContext,
1046        amount: Amount,
1047    ) -> Result<(), ExecutionError> {
1048        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1049        if amount.is_zero() {
1050            return Ok(());
1051        }
1052        let Some(account) = context.refund_grant_to else {
1053            return Err(ExecutionError::InternalError(
1054                "Messages with grants should have a non-empty `refund_grant_to`",
1055            ));
1056        };
1057        let message = SystemMessage::Credit {
1058            amount,
1059            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
1060            target: account.owner,
1061        };
1062        self.txn_tracker.add_outgoing_message(
1063            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1064        );
1065        Ok(())
1066    }
1067
1068    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
1069    ///
1070    /// Ensures that the response does not exceed the provided `size_limit`.
1071    async fn receive_http_response(
1072        response: reqwest::Response,
1073        size_limit: u64,
1074    ) -> Result<http::Response, ExecutionError> {
1075        let status = response.status().as_u16();
1076        let maybe_content_length = response.content_length();
1077
1078        let headers = response
1079            .headers()
1080            .iter()
1081            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1082            .collect::<Vec<_>>();
1083
1084        let total_header_size = headers
1085            .iter()
1086            .map(|header| (header.name.len() + header.value.len()) as u64)
1087            .sum();
1088
1089        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1090            ExecutionError::HttpResponseSizeLimitExceeded {
1091                limit: size_limit,
1092                size: total_header_size,
1093            },
1094        )?;
1095
1096        if let Some(content_length) = maybe_content_length {
1097            if content_length > remaining_bytes {
1098                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1099                    limit: size_limit,
1100                    size: content_length + total_header_size,
1101                });
1102            }
1103        }
1104
1105        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1106        let mut body_stream = response.bytes_stream();
1107
1108        while let Some(bytes) = body_stream.next().await.transpose()? {
1109            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1110                ExecutionError::HttpResponseSizeLimitExceeded {
1111                    limit: size_limit,
1112                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1113                },
1114            )?;
1115
1116            body.extend(&bytes);
1117        }
1118
1119        Ok(http::Response {
1120            status,
1121            headers,
1122            body,
1123        })
1124    }
1125}
1126
1127/// Requests to the execution state.
1128#[derive(Debug, strum::AsRefStr)]
1129pub enum ExecutionRequest {
1130    #[cfg(not(web))]
1131    LoadContract {
1132        id: ApplicationId,
1133        #[debug(skip)]
1134        callback: Sender<(UserContractCode, ApplicationDescription)>,
1135    },
1136
1137    #[cfg(not(web))]
1138    LoadService {
1139        id: ApplicationId,
1140        #[debug(skip)]
1141        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1142    },
1143
1144    ChainBalance {
1145        #[debug(skip)]
1146        callback: Sender<Amount>,
1147    },
1148
1149    OwnerBalance {
1150        owner: AccountOwner,
1151        #[debug(skip)]
1152        callback: Sender<Amount>,
1153    },
1154
1155    OwnerBalances {
1156        #[debug(skip)]
1157        callback: Sender<Vec<(AccountOwner, Amount)>>,
1158    },
1159
1160    BalanceOwners {
1161        #[debug(skip)]
1162        callback: Sender<Vec<AccountOwner>>,
1163    },
1164
1165    Transfer {
1166        source: AccountOwner,
1167        destination: Account,
1168        amount: Amount,
1169        #[debug(skip_if = Option::is_none)]
1170        signer: Option<AccountOwner>,
1171        application_id: ApplicationId,
1172        #[debug(skip)]
1173        callback: Sender<()>,
1174    },
1175
1176    Claim {
1177        source: Account,
1178        destination: Account,
1179        amount: Amount,
1180        #[debug(skip_if = Option::is_none)]
1181        signer: Option<AccountOwner>,
1182        application_id: ApplicationId,
1183        #[debug(skip)]
1184        callback: Sender<()>,
1185    },
1186
1187    SystemTimestamp {
1188        #[debug(skip)]
1189        callback: Sender<Timestamp>,
1190    },
1191
1192    ChainOwnership {
1193        #[debug(skip)]
1194        callback: Sender<ChainOwnership>,
1195    },
1196
1197    ApplicationPermissions {
1198        #[debug(skip)]
1199        callback: Sender<ApplicationPermissions>,
1200    },
1201
1202    ReadApplicationDescription {
1203        application_id: ApplicationId,
1204        #[debug(skip)]
1205        callback: Sender<ApplicationDescription>,
1206    },
1207
1208    ReadValueBytes {
1209        id: ApplicationId,
1210        #[debug(with = hex_debug)]
1211        key: Vec<u8>,
1212        #[debug(skip)]
1213        callback: Sender<Option<Vec<u8>>>,
1214    },
1215
1216    ContainsKey {
1217        id: ApplicationId,
1218        key: Vec<u8>,
1219        #[debug(skip)]
1220        callback: Sender<bool>,
1221    },
1222
1223    ContainsKeys {
1224        id: ApplicationId,
1225        #[debug(with = hex_vec_debug)]
1226        keys: Vec<Vec<u8>>,
1227        callback: Sender<Vec<bool>>,
1228    },
1229
1230    ReadMultiValuesBytes {
1231        id: ApplicationId,
1232        #[debug(with = hex_vec_debug)]
1233        keys: Vec<Vec<u8>>,
1234        #[debug(skip)]
1235        callback: Sender<Vec<Option<Vec<u8>>>>,
1236    },
1237
1238    FindKeysByPrefix {
1239        id: ApplicationId,
1240        #[debug(with = hex_debug)]
1241        key_prefix: Vec<u8>,
1242        #[debug(skip)]
1243        callback: Sender<Vec<Vec<u8>>>,
1244    },
1245
1246    FindKeyValuesByPrefix {
1247        id: ApplicationId,
1248        #[debug(with = hex_debug)]
1249        key_prefix: Vec<u8>,
1250        #[debug(skip)]
1251        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1252    },
1253
1254    WriteBatch {
1255        id: ApplicationId,
1256        batch: Batch,
1257        #[debug(skip)]
1258        callback: Sender<()>,
1259    },
1260
1261    OpenChain {
1262        ownership: ChainOwnership,
1263        #[debug(skip_if = Amount::is_zero)]
1264        balance: Amount,
1265        parent_id: ChainId,
1266        block_height: BlockHeight,
1267        application_permissions: ApplicationPermissions,
1268        timestamp: Timestamp,
1269        #[debug(skip)]
1270        callback: Sender<ChainId>,
1271    },
1272
1273    CloseChain {
1274        application_id: ApplicationId,
1275        #[debug(skip)]
1276        callback: Sender<Result<(), ExecutionError>>,
1277    },
1278
1279    ChangeOwnership {
1280        application_id: ApplicationId,
1281        ownership: ChainOwnership,
1282        #[debug(skip)]
1283        callback: Sender<Result<(), ExecutionError>>,
1284    },
1285
1286    ChangeApplicationPermissions {
1287        application_id: ApplicationId,
1288        application_permissions: ApplicationPermissions,
1289        #[debug(skip)]
1290        callback: Sender<Result<(), ExecutionError>>,
1291    },
1292
1293    CreateApplication {
1294        chain_id: ChainId,
1295        block_height: BlockHeight,
1296        module_id: ModuleId,
1297        parameters: Vec<u8>,
1298        required_application_ids: Vec<ApplicationId>,
1299        #[debug(skip)]
1300        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
1301    },
1302
1303    PerformHttpRequest {
1304        request: http::Request,
1305        http_responses_are_oracle_responses: bool,
1306        #[debug(skip)]
1307        callback: Sender<http::Response>,
1308    },
1309
1310    ReadBlobContent {
1311        blob_id: BlobId,
1312        #[debug(skip)]
1313        callback: Sender<BlobContent>,
1314    },
1315
1316    AssertBlobExists {
1317        blob_id: BlobId,
1318        #[debug(skip)]
1319        callback: Sender<()>,
1320    },
1321
1322    Emit {
1323        stream_id: StreamId,
1324        #[debug(with = hex_debug)]
1325        value: Vec<u8>,
1326        #[debug(skip)]
1327        callback: Sender<u32>,
1328    },
1329
1330    ReadEvent {
1331        event_id: EventId,
1332        callback: oneshot::Sender<Vec<u8>>,
1333    },
1334
1335    SubscribeToEvents {
1336        chain_id: ChainId,
1337        stream_id: StreamId,
1338        subscriber_app_id: ApplicationId,
1339        #[debug(skip)]
1340        callback: Sender<()>,
1341    },
1342
1343    UnsubscribeFromEvents {
1344        chain_id: ChainId,
1345        stream_id: StreamId,
1346        subscriber_app_id: ApplicationId,
1347        #[debug(skip)]
1348        callback: Sender<()>,
1349    },
1350
1351    GetApplicationPermissions {
1352        #[debug(skip)]
1353        callback: Sender<ApplicationPermissions>,
1354    },
1355
1356    QueryServiceOracle {
1357        deadline: Option<Instant>,
1358        application_id: ApplicationId,
1359        next_block_height: BlockHeight,
1360        query: Vec<u8>,
1361        #[debug(skip)]
1362        callback: Sender<Vec<u8>>,
1363    },
1364
1365    AddOutgoingMessage {
1366        message: crate::OutgoingMessage,
1367        #[debug(skip)]
1368        callback: Sender<()>,
1369    },
1370
1371    SetLocalTime {
1372        local_time: Timestamp,
1373        #[debug(skip)]
1374        callback: Sender<()>,
1375    },
1376
1377    AssertBefore {
1378        timestamp: Timestamp,
1379        #[debug(skip)]
1380        callback: Sender<Result<(), ExecutionError>>,
1381    },
1382
1383    AddCreatedBlob {
1384        blob: crate::Blob,
1385        #[debug(skip)]
1386        callback: Sender<()>,
1387    },
1388
1389    ValidationRound {
1390        round: Option<u32>,
1391        #[debug(skip)]
1392        callback: Sender<Option<u32>>,
1393    },
1394
1395    AllowApplicationLogs {
1396        #[debug(skip)]
1397        callback: Sender<bool>,
1398    },
1399
1400    /// Log message from contract execution (fire-and-forget, no callback needed).
1401    #[cfg(web)]
1402    Log {
1403        message: String,
1404        level: tracing::log::Level,
1405    },
1406}