Skip to main content

linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27use tracing::instrument;
28
29use crate::{
30    execution::UserAction,
31    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
32    resources::ResourceController,
33    system::CreateApplicationResult,
34    util::{ReceiverExt, UnboundedSenderExt},
35    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
36    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
37    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
38    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
39};
40
41#[cfg(test)]
42#[path = "unit_tests/runtime_tests.rs"]
43mod tests;
44
45pub trait WithContext {
46    type UserContext;
47    type Code;
48}
49
50impl WithContext for UserContractInstance {
51    type UserContext = Timestamp;
52    type Code = UserContractCode;
53}
54
55impl WithContext for UserServiceInstance {
56    type UserContext = ();
57    type Code = UserServiceCode;
58}
59
60#[cfg(test)]
61impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
62    type UserContext = ();
63    type Code = ();
64}
65
66#[derive(Debug)]
67pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
68
69pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
70
71pub struct ServiceSyncRuntime {
72    runtime: SyncRuntime<UserServiceInstance>,
73    current_context: QueryContext,
74}
75
76#[derive(Debug)]
77pub struct SyncRuntimeHandle<UserInstance: WithContext>(
78    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
79);
80
81pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
82pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
83
84/// Runtime data tracked during the execution of a transaction on the synchronous thread.
85#[derive(Debug)]
86pub struct SyncRuntimeInternal<UserInstance: WithContext> {
87    /// The current chain ID.
88    chain_id: ChainId,
89    /// The height of the next block that will be added to this chain. During operations
90    /// and messages, this is the current block height.
91    height: BlockHeight,
92    /// The current consensus round. Only available during block validation in multi-leader rounds.
93    round: Option<u32>,
94    /// The current message being executed, if there is one.
95    #[debug(skip_if = Option::is_none)]
96    executing_message: Option<ExecutingMessage>,
97
98    /// How to interact with the storage view of the execution state.
99    execution_state_sender: ExecutionStateSender,
100
101    /// If applications are being finalized.
102    ///
103    /// If [`true`], disables cross-application calls.
104    is_finalizing: bool,
105    /// Applications that need to be finalized.
106    applications_to_finalize: Vec<ApplicationId>,
107
108    /// Application instances loaded in this transaction.
109    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
110    /// Application instances loaded in this transaction.
111    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
112    /// The current stack of application descriptions.
113    call_stack: Vec<ApplicationStatus>,
114    /// The set of the IDs of the applications that are in the `call_stack`.
115    active_applications: HashSet<ApplicationId>,
116    /// The operations scheduled during this query.
117    scheduled_operations: Vec<Operation>,
118
119    /// Track application states based on views.
120    view_user_states: BTreeMap<ApplicationId, ViewUserState>,
121
122    /// The deadline this runtime should finish executing.
123    ///
124    /// Used to limit the execution time of services running as oracles.
125    deadline: Option<Instant>,
126
127    /// Where to send a refund for the unused part of the grant after execution, if any.
128    #[debug(skip_if = Option::is_none)]
129    refund_grant_to: Option<Account>,
130    /// Controller to track fuel and storage consumption.
131    resource_controller: ResourceController,
132    /// Additional context for the runtime.
133    user_context: UserInstance::UserContext,
134    /// Whether contract log messages should be output.
135    allow_application_logs: bool,
136}
137
138/// The runtime status of an application.
139#[derive(Debug)]
140struct ApplicationStatus {
141    /// The caller application ID, if forwarded during the call.
142    caller_id: Option<ApplicationId>,
143    /// The application ID.
144    id: ApplicationId,
145    /// The application description.
146    description: ApplicationDescription,
147    /// The authenticated signer for the execution thread, if any.
148    signer: Option<AccountOwner>,
149}
150
151/// A loaded application instance.
152#[derive(Debug)]
153struct LoadedApplication<Instance> {
154    instance: Arc<Mutex<Instance>>,
155    description: ApplicationDescription,
156}
157
158impl<Instance> LoadedApplication<Instance> {
159    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
160    fn new(instance: Instance, description: ApplicationDescription) -> Self {
161        LoadedApplication {
162            instance: Arc::new(Mutex::new(instance)),
163            description,
164        }
165    }
166}
167
168impl<Instance> Clone for LoadedApplication<Instance> {
169    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
170    // bound
171    fn clone(&self) -> Self {
172        LoadedApplication {
173            instance: self.instance.clone(),
174            description: self.description.clone(),
175        }
176    }
177}
178
179#[derive(Debug)]
180enum Promise<T> {
181    Ready(T),
182    Pending(Receiver<T>),
183}
184
185impl<T> Promise<T> {
186    fn force(&mut self) -> Result<(), ExecutionError> {
187        if let Promise::Pending(receiver) = self {
188            let value = receiver
189                .recv_ref()
190                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
191            *self = Promise::Ready(value);
192        }
193        Ok(())
194    }
195
196    fn read(self) -> Result<T, ExecutionError> {
197        match self {
198            Promise::Pending(receiver) => {
199                let value = receiver.recv_response()?;
200                Ok(value)
201            }
202            Promise::Ready(value) => Ok(value),
203        }
204    }
205}
206
207/// Manages a set of pending queries returning values of type `T`.
208#[derive(Debug, Default)]
209struct QueryManager<T> {
210    /// The queries in progress.
211    pending_queries: BTreeMap<u32, Promise<T>>,
212    /// The number of queries ever registered so far. Used for the index of the next query.
213    query_count: u32,
214    /// The number of active queries.
215    active_query_count: u32,
216}
217
218impl<T> QueryManager<T> {
219    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
220        let id = self.query_count;
221        self.pending_queries.insert(id, Promise::Pending(receiver));
222        self.query_count = self
223            .query_count
224            .checked_add(1)
225            .ok_or(ArithmeticError::Overflow)?;
226        self.active_query_count = self
227            .active_query_count
228            .checked_add(1)
229            .ok_or(ArithmeticError::Overflow)?;
230        Ok(id)
231    }
232
233    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
234        let promise = self
235            .pending_queries
236            .remove(&id)
237            .ok_or(ExecutionError::InvalidPromise)?;
238        let value = promise.read()?;
239        self.active_query_count -= 1;
240        Ok(value)
241    }
242
243    fn force_all(&mut self) -> Result<(), ExecutionError> {
244        for promise in self.pending_queries.values_mut() {
245            promise.force()?;
246        }
247        Ok(())
248    }
249}
250
251type Keys = Vec<Vec<u8>>;
252type Value = Vec<u8>;
253type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
254
255#[derive(Debug, Default)]
256struct ViewUserState {
257    /// The contains-key queries in progress.
258    contains_key_queries: QueryManager<bool>,
259    /// The contains-keys queries in progress.
260    contains_keys_queries: QueryManager<Vec<bool>>,
261    /// The read-value queries in progress.
262    read_value_queries: QueryManager<Option<Value>>,
263    /// The read-multi-values queries in progress.
264    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
265    /// The find-keys queries in progress.
266    find_keys_queries: QueryManager<Keys>,
267    /// The find-key-values queries in progress.
268    find_key_values_queries: QueryManager<KeyValues>,
269}
270
271impl ViewUserState {
272    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
273        self.contains_key_queries.force_all()?;
274        self.contains_keys_queries.force_all()?;
275        self.read_value_queries.force_all()?;
276        self.read_multi_values_queries.force_all()?;
277        self.find_keys_queries.force_all()?;
278        self.find_key_values_queries.force_all()?;
279        Ok(())
280    }
281}
282
283impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
284    type Target = SyncRuntimeHandle<UserInstance>;
285
286    fn deref(&self) -> &Self::Target {
287        self.0.as_ref().expect(
288            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
289        )
290    }
291}
292
293impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
294    fn deref_mut(&mut self) -> &mut Self::Target {
295        self.0.as_mut().expect(
296            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
297        )
298    }
299}
300
301impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
302    fn drop(&mut self) {
303        // Ensure the `loaded_applications` are cleared to prevent circular references in
304        // the runtime
305        if let Some(handle) = self.0.take() {
306            handle.inner().loaded_applications.clear();
307        }
308    }
309}
310
311impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
312    #[expect(clippy::too_many_arguments)]
313    fn new(
314        chain_id: ChainId,
315        height: BlockHeight,
316        round: Option<u32>,
317        executing_message: Option<ExecutingMessage>,
318        execution_state_sender: ExecutionStateSender,
319        deadline: Option<Instant>,
320        refund_grant_to: Option<Account>,
321        resource_controller: ResourceController,
322        user_context: UserInstance::UserContext,
323        allow_application_logs: bool,
324    ) -> Self {
325        Self {
326            chain_id,
327            height,
328            round,
329            executing_message,
330            execution_state_sender,
331            is_finalizing: false,
332            applications_to_finalize: Vec::new(),
333            preloaded_applications: HashMap::new(),
334            loaded_applications: HashMap::new(),
335            call_stack: Vec::new(),
336            active_applications: HashSet::new(),
337            view_user_states: BTreeMap::new(),
338            deadline,
339            refund_grant_to,
340            resource_controller,
341            scheduled_operations: Vec::new(),
342            user_context,
343            allow_application_logs,
344        }
345    }
346
347    /// Returns the [`ApplicationStatus`] of the current application.
348    ///
349    /// The current application is the last to be pushed to the `call_stack`.
350    ///
351    /// # Panics
352    ///
353    /// If the call stack is empty.
354    fn current_application(&self) -> &ApplicationStatus {
355        self.call_stack
356            .last()
357            .expect("Call stack is unexpectedly empty")
358    }
359
360    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
361    ///
362    /// Ensures the application's ID is also tracked in the `active_applications` set.
363    fn push_application(&mut self, status: ApplicationStatus) {
364        self.active_applications.insert(status.id);
365        self.call_stack.push(status);
366    }
367
368    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
369    ///
370    /// Ensures the application's ID is also removed from the `active_applications` set.
371    ///
372    /// # Panics
373    ///
374    /// If the call stack is empty.
375    fn pop_application(&mut self) -> ApplicationStatus {
376        let status = self
377            .call_stack
378            .pop()
379            .expect("Can't remove application from empty call stack");
380        assert!(self.active_applications.remove(&status.id));
381        status
382    }
383
384    /// Ensures that a call to `application_id` is not-reentrant.
385    ///
386    /// Returns an error if there already is an entry for `application_id` in the call stack.
387    fn check_for_reentrancy(&self, application_id: ApplicationId) -> Result<(), ExecutionError> {
388        ensure!(
389            !self.active_applications.contains(&application_id),
390            ExecutionError::ReentrantCall(application_id)
391        );
392        Ok(())
393    }
394}
395
396impl SyncRuntimeInternal<UserContractInstance> {
397    /// Loads a contract instance, initializing it with this runtime if needed.
398    #[instrument(skip_all, fields(application_id = %id))]
399    fn load_contract_instance(
400        &mut self,
401        this: SyncRuntimeHandle<UserContractInstance>,
402        id: ApplicationId,
403    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
404        match self.loaded_applications.entry(id) {
405            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
406
407            hash_map::Entry::Vacant(entry) => {
408                // First time actually using the application. Let's see if the code was
409                // pre-loaded.
410                let (code, description) = match self.preloaded_applications.entry(id) {
411                    // TODO(#2927): support dynamic loading of modules on the Web
412                    #[cfg(web)]
413                    hash_map::Entry::Vacant(_) => {
414                        drop(this);
415                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
416                            id,
417                        )));
418                    }
419                    #[cfg(not(web))]
420                    hash_map::Entry::Vacant(entry) => {
421                        let (code, description) = self
422                            .execution_state_sender
423                            .send_request(move |callback| ExecutionRequest::LoadContract {
424                                id,
425                                callback,
426                            })?
427                            .recv_response()?;
428                        entry.insert((code, description)).clone()
429                    }
430                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
431                };
432                let instance = code.instantiate(this)?;
433
434                self.applications_to_finalize.push(id);
435                Ok(entry
436                    .insert(LoadedApplication::new(instance, description))
437                    .clone())
438            }
439        }
440    }
441
442    /// Configures the runtime for executing a call to a different contract.
443    fn prepare_for_call(
444        &mut self,
445        this: ContractSyncRuntimeHandle,
446        authenticated: bool,
447        callee_id: ApplicationId,
448    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
449        self.check_for_reentrancy(callee_id)?;
450
451        ensure!(
452            !self.is_finalizing,
453            ExecutionError::CrossApplicationCallInFinalize {
454                caller_id: Box::new(self.current_application().id),
455                callee_id: Box::new(callee_id),
456            }
457        );
458
459        // Load the application.
460        let application = self.load_contract_instance(this, callee_id)?;
461
462        let caller = self.current_application();
463        let caller_id = caller.id;
464        let caller_signer = caller.signer;
465        // Make the call to user code.
466        let authenticated_signer = match caller_signer {
467            Some(signer) if authenticated => Some(signer),
468            _ => None,
469        };
470        let authenticated_caller_id = authenticated.then_some(caller_id);
471        self.push_application(ApplicationStatus {
472            caller_id: authenticated_caller_id,
473            id: callee_id,
474            description: application.description,
475            // Allow further nested calls to be authenticated if this one is.
476            signer: authenticated_signer,
477        });
478        Ok(application.instance)
479    }
480
481    /// Cleans up the runtime after the execution of a call to a different contract.
482    fn finish_call(&mut self) {
483        self.pop_application();
484    }
485
486    /// Runs the service in a separate thread as an oracle.
487    fn run_service_oracle_query(
488        &mut self,
489        application_id: ApplicationId,
490        query: Vec<u8>,
491    ) -> Result<Vec<u8>, ExecutionError> {
492        let timeout = self
493            .resource_controller
494            .remaining_service_oracle_execution_time()?;
495        let execution_start = Instant::now();
496        let deadline = Some(execution_start + timeout);
497        let response = self
498            .execution_state_sender
499            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
500                deadline,
501                application_id,
502                next_block_height: self.height,
503                query,
504                callback,
505            })?
506            .recv_response()?;
507
508        self.resource_controller
509            .track_service_oracle_execution(execution_start.elapsed())?;
510        self.resource_controller
511            .track_service_oracle_response(response.len())?;
512
513        Ok(response)
514    }
515}
516
517impl SyncRuntimeInternal<UserServiceInstance> {
518    /// Initializes a service instance with this runtime.
519    fn load_service_instance(
520        &mut self,
521        this: SyncRuntimeHandle<UserServiceInstance>,
522        id: ApplicationId,
523    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
524        match self.loaded_applications.entry(id) {
525            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
526
527            hash_map::Entry::Vacant(entry) => {
528                // First time actually using the application. Let's see if the code was
529                // pre-loaded.
530                let (code, description) = match self.preloaded_applications.entry(id) {
531                    // TODO(#2927): support dynamic loading of modules on the Web
532                    #[cfg(web)]
533                    hash_map::Entry::Vacant(_) => {
534                        drop(this);
535                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
536                            id,
537                        )));
538                    }
539                    #[cfg(not(web))]
540                    hash_map::Entry::Vacant(entry) => {
541                        let (code, description) = self
542                            .execution_state_sender
543                            .send_request(move |callback| ExecutionRequest::LoadService {
544                                id,
545                                callback,
546                            })?
547                            .recv_response()?;
548                        entry.insert((code, description)).clone()
549                    }
550                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
551                };
552                let instance = code.instantiate(this)?;
553
554                self.applications_to_finalize.push(id);
555                Ok(entry
556                    .insert(LoadedApplication::new(instance, description))
557                    .clone())
558            }
559        }
560    }
561}
562
563impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
564    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
565        let handle = self.0.take().expect(
566            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
567        );
568        let runtime = Arc::into_inner(handle.0)?
569            .into_inner()
570            .expect("`SyncRuntime` should run in a single thread which should not panic");
571        Some(runtime)
572    }
573}
574
575impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
576    for SyncRuntimeHandle<UserInstance>
577{
578    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
579        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
580    }
581}
582
583impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
584    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
585        self.0
586            .try_lock()
587            .expect("Synchronous runtimes run on a single execution thread")
588    }
589}
590
591impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
592where
593    Self: ContractOrServiceRuntime,
594{
595    type Read = ();
596    type ReadValueBytes = u32;
597    type ContainsKey = u32;
598    type ContainsKeys = u32;
599    type ReadMultiValuesBytes = u32;
600    type FindKeysByPrefix = u32;
601    type FindKeyValuesByPrefix = u32;
602
603    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
604        let mut this = self.inner();
605        let chain_id = this.chain_id;
606        this.resource_controller.track_runtime_chain_id()?;
607        Ok(chain_id)
608    }
609
610    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
611        let mut this = self.inner();
612        let height = this.height;
613        this.resource_controller.track_runtime_block_height()?;
614        Ok(height)
615    }
616
617    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
618        let mut this = self.inner();
619        let application_id = this.current_application().id;
620        this.resource_controller.track_runtime_application_id()?;
621        Ok(application_id)
622    }
623
624    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
625        let mut this = self.inner();
626        let application_creator_chain_id = this.current_application().description.creator_chain_id;
627        this.resource_controller.track_runtime_application_id()?;
628        Ok(application_creator_chain_id)
629    }
630
631    fn read_application_description(
632        &mut self,
633        application_id: ApplicationId,
634    ) -> Result<ApplicationDescription, ExecutionError> {
635        let mut this = self.inner();
636        let description = this
637            .execution_state_sender
638            .send_request(|callback| ExecutionRequest::ReadApplicationDescription {
639                application_id,
640                callback,
641            })?
642            .recv_response()?;
643        this.resource_controller
644            .track_runtime_application_description(&description)?;
645        Ok(description)
646    }
647
648    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
649        let mut this = self.inner();
650        let parameters = this.current_application().description.parameters.clone();
651        this.resource_controller
652            .track_runtime_application_parameters(&parameters)?;
653        Ok(parameters)
654    }
655
656    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
657        let mut this = self.inner();
658        let timestamp = this
659            .execution_state_sender
660            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
661            .recv_response()?;
662        this.resource_controller.track_runtime_timestamp()?;
663        Ok(timestamp)
664    }
665
666    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
667        let mut this = self.inner();
668        let balance = this
669            .execution_state_sender
670            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
671            .recv_response()?;
672        this.resource_controller.track_runtime_balance()?;
673        Ok(balance)
674    }
675
676    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
677        let mut this = self.inner();
678        let balance = this
679            .execution_state_sender
680            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
681            .recv_response()?;
682        this.resource_controller.track_runtime_balance()?;
683        Ok(balance)
684    }
685
686    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
687        let mut this = self.inner();
688        let owner_balances = this
689            .execution_state_sender
690            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
691            .recv_response()?;
692        this.resource_controller
693            .track_runtime_owner_balances(&owner_balances)?;
694        Ok(owner_balances)
695    }
696
697    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
698        let mut this = self.inner();
699        let owners = this
700            .execution_state_sender
701            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
702            .recv_response()?;
703        this.resource_controller.track_runtime_owners(&owners)?;
704        Ok(owners)
705    }
706
707    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
708        let mut this = self.inner();
709        let chain_ownership = this
710            .execution_state_sender
711            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
712            .recv_response()?;
713        this.resource_controller
714            .track_runtime_chain_ownership(&chain_ownership)?;
715        Ok(chain_ownership)
716    }
717
718    fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
719        let this = self.inner();
720        let application_permissions = this
721            .execution_state_sender
722            .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
723            .recv_response()?;
724        Ok(application_permissions)
725    }
726
727    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
728        let mut this = self.inner();
729        let id = this.current_application().id;
730        this.resource_controller.track_read_operation()?;
731        let receiver = this
732            .execution_state_sender
733            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
734        let state = this.view_user_states.entry(id).or_default();
735        state.contains_key_queries.register(receiver)
736    }
737
738    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
739        let mut this = self.inner();
740        let id = this.current_application().id;
741        let state = this.view_user_states.entry(id).or_default();
742        let value = state.contains_key_queries.wait(*promise)?;
743        Ok(value)
744    }
745
746    fn contains_keys_new(
747        &mut self,
748        keys: Vec<Vec<u8>>,
749    ) -> Result<Self::ContainsKeys, ExecutionError> {
750        let mut this = self.inner();
751        let id = this.current_application().id;
752        this.resource_controller.track_read_operation()?;
753        let receiver = this
754            .execution_state_sender
755            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
756        let state = this.view_user_states.entry(id).or_default();
757        state.contains_keys_queries.register(receiver)
758    }
759
760    fn contains_keys_wait(
761        &mut self,
762        promise: &Self::ContainsKeys,
763    ) -> Result<Vec<bool>, ExecutionError> {
764        let mut this = self.inner();
765        let id = this.current_application().id;
766        let state = this.view_user_states.entry(id).or_default();
767        let value = state.contains_keys_queries.wait(*promise)?;
768        Ok(value)
769    }
770
771    fn read_multi_values_bytes_new(
772        &mut self,
773        keys: Vec<Vec<u8>>,
774    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
775        let mut this = self.inner();
776        let id = this.current_application().id;
777        this.resource_controller.track_read_operation()?;
778        let receiver = this.execution_state_sender.send_request(move |callback| {
779            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
780        })?;
781        let state = this.view_user_states.entry(id).or_default();
782        state.read_multi_values_queries.register(receiver)
783    }
784
785    fn read_multi_values_bytes_wait(
786        &mut self,
787        promise: &Self::ReadMultiValuesBytes,
788    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
789        let mut this = self.inner();
790        let id = this.current_application().id;
791        let state = this.view_user_states.entry(id).or_default();
792        let values = state.read_multi_values_queries.wait(*promise)?;
793        for value in &values {
794            if let Some(value) = &value {
795                this.resource_controller
796                    .track_bytes_read(value.len() as u64)?;
797            }
798        }
799        Ok(values)
800    }
801
802    fn read_value_bytes_new(
803        &mut self,
804        key: Vec<u8>,
805    ) -> Result<Self::ReadValueBytes, ExecutionError> {
806        let mut this = self.inner();
807        let id = this.current_application().id;
808        this.resource_controller.track_read_operation()?;
809        let receiver = this
810            .execution_state_sender
811            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
812        let state = this.view_user_states.entry(id).or_default();
813        state.read_value_queries.register(receiver)
814    }
815
816    fn read_value_bytes_wait(
817        &mut self,
818        promise: &Self::ReadValueBytes,
819    ) -> Result<Option<Vec<u8>>, ExecutionError> {
820        let mut this = self.inner();
821        let id = this.current_application().id;
822        let value = {
823            let state = this.view_user_states.entry(id).or_default();
824            state.read_value_queries.wait(*promise)?
825        };
826        if let Some(value) = &value {
827            this.resource_controller
828                .track_bytes_read(value.len() as u64)?;
829        }
830        Ok(value)
831    }
832
833    fn find_keys_by_prefix_new(
834        &mut self,
835        key_prefix: Vec<u8>,
836    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
837        let mut this = self.inner();
838        let id = this.current_application().id;
839        this.resource_controller.track_read_operation()?;
840        let receiver = this.execution_state_sender.send_request(move |callback| {
841            ExecutionRequest::FindKeysByPrefix {
842                id,
843                key_prefix,
844                callback,
845            }
846        })?;
847        let state = this.view_user_states.entry(id).or_default();
848        state.find_keys_queries.register(receiver)
849    }
850
851    fn find_keys_by_prefix_wait(
852        &mut self,
853        promise: &Self::FindKeysByPrefix,
854    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
855        let mut this = self.inner();
856        let id = this.current_application().id;
857        let keys = {
858            let state = this.view_user_states.entry(id).or_default();
859            state.find_keys_queries.wait(*promise)?
860        };
861        let mut read_size = 0;
862        for key in &keys {
863            read_size += key.len();
864        }
865        this.resource_controller
866            .track_bytes_read(read_size as u64)?;
867        Ok(keys)
868    }
869
870    fn find_key_values_by_prefix_new(
871        &mut self,
872        key_prefix: Vec<u8>,
873    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
874        let mut this = self.inner();
875        let id = this.current_application().id;
876        this.resource_controller.track_read_operation()?;
877        let receiver = this.execution_state_sender.send_request(move |callback| {
878            ExecutionRequest::FindKeyValuesByPrefix {
879                id,
880                key_prefix,
881                callback,
882            }
883        })?;
884        let state = this.view_user_states.entry(id).or_default();
885        state.find_key_values_queries.register(receiver)
886    }
887
888    fn find_key_values_by_prefix_wait(
889        &mut self,
890        promise: &Self::FindKeyValuesByPrefix,
891    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
892        let mut this = self.inner();
893        let id = this.current_application().id;
894        let state = this.view_user_states.entry(id).or_default();
895        let key_values = state.find_key_values_queries.wait(*promise)?;
896        let mut read_size = 0;
897        for (key, value) in &key_values {
898            read_size += key.len() + value.len();
899        }
900        this.resource_controller
901            .track_bytes_read(read_size as u64)?;
902        Ok(key_values)
903    }
904
905    fn perform_http_request(
906        &mut self,
907        request: http::Request,
908    ) -> Result<http::Response, ExecutionError> {
909        let mut this = self.inner();
910        let app_permissions = this
911            .execution_state_sender
912            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
913            .recv_response()?;
914
915        let app_id = this.current_application().id;
916        ensure!(
917            app_permissions.can_make_http_requests(&app_id),
918            ExecutionError::UnauthorizedApplication(app_id)
919        );
920
921        this.resource_controller.track_http_request()?;
922
923        let response = this
924            .execution_state_sender
925            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
926                request,
927                http_responses_are_oracle_responses:
928                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
929                callback,
930            })?
931            .recv_response()?;
932        Ok(response)
933    }
934
935    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
936        let this = self.inner();
937        this.execution_state_sender
938            .send_request(|callback| ExecutionRequest::AssertBefore {
939                timestamp,
940                callback,
941            })?
942            .recv_response()?
943    }
944
945    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
946        let this = self.inner();
947        let blob_id = hash.into();
948        let content = this
949            .execution_state_sender
950            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
951            .recv_response()?;
952        Ok(content.into_vec_or_clone())
953    }
954
955    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
956        let this = self.inner();
957        let blob_id = hash.into();
958        this.execution_state_sender
959            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
960            .recv_response()?;
961        Ok(())
962    }
963
964    fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
965        Ok(self.inner().allow_application_logs)
966    }
967
968    #[cfg(web)]
969    fn send_log(&mut self, message: String, level: tracing::log::Level) {
970        let this = self.inner();
971        // Fire-and-forget: ignore errors since logging shouldn't affect execution.
972        this.execution_state_sender
973            .unbounded_send(ExecutionRequest::Log { message, level })
974            .ok();
975    }
976}
977
978/// An extension trait to determine in compile time the different behaviors between contract and
979/// services in the implementation of [`BaseRuntime`].
980trait ContractOrServiceRuntime {
981    /// Configured to `true` if the HTTP response size should be limited to the oracle response
982    /// size.
983    ///
984    /// This is `false` for services, potentially allowing them to receive a larger HTTP response
985    /// and only storing in the block a shorter oracle response.
986    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
987}
988
989impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
990    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
991}
992
993impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
994    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
995}
996
997impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
998    fn clone(&self) -> Self {
999        SyncRuntimeHandle(self.0.clone())
1000    }
1001}
1002
1003impl ContractSyncRuntime {
1004    pub(crate) fn new(
1005        execution_state_sender: ExecutionStateSender,
1006        chain_id: ChainId,
1007        refund_grant_to: Option<Account>,
1008        resource_controller: ResourceController,
1009        action: &UserAction,
1010        allow_application_logs: bool,
1011    ) -> Self {
1012        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1013            SyncRuntimeInternal::new(
1014                chain_id,
1015                action.height(),
1016                action.round(),
1017                if let UserAction::Message(context, _) = action {
1018                    Some(context.into())
1019                } else {
1020                    None
1021                },
1022                execution_state_sender,
1023                None,
1024                refund_grant_to,
1025                resource_controller,
1026                action.timestamp(),
1027                allow_application_logs,
1028            ),
1029        )))
1030    }
1031
1032    /// Preloads the code of a contract into the runtime's memory.
1033    pub(crate) fn preload_contract(
1034        &self,
1035        id: ApplicationId,
1036        code: UserContractCode,
1037        description: ApplicationDescription,
1038    ) {
1039        let this = self
1040            .0
1041            .as_ref()
1042            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1043        let mut this_guard = this.inner();
1044
1045        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1046            entry.insert((code, description));
1047        }
1048    }
1049
1050    /// Main entry point to start executing a user action.
1051    pub(crate) fn run_action(
1052        mut self,
1053        application_id: ApplicationId,
1054        chain_id: ChainId,
1055        action: UserAction,
1056    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1057        let result = self
1058            .deref_mut()
1059            .run_action(application_id, chain_id, action)?;
1060        let runtime = self
1061            .into_inner()
1062            .expect("Runtime clones should have been freed by now");
1063
1064        Ok((result, runtime.resource_controller))
1065    }
1066}
1067
1068impl ContractSyncRuntimeHandle {
1069    #[instrument(skip_all, fields(application_id = %application_id))]
1070    fn run_action(
1071        &self,
1072        application_id: ApplicationId,
1073        chain_id: ChainId,
1074        action: UserAction,
1075    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1076        let finalize_context = FinalizeContext {
1077            authenticated_signer: action.signer(),
1078            chain_id,
1079            height: action.height(),
1080            round: action.round(),
1081        };
1082
1083        {
1084            let runtime = self.inner();
1085            assert_eq!(runtime.chain_id, chain_id);
1086            assert_eq!(runtime.height, action.height());
1087        }
1088
1089        let signer = action.signer();
1090        let closure = move |code: &mut UserContractInstance| match action {
1091            UserAction::Instantiate(_context, argument) => {
1092                code.instantiate(argument).map(|()| None)
1093            }
1094            UserAction::Operation(_context, operation) => {
1095                code.execute_operation(operation).map(Option::Some)
1096            }
1097            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1098            UserAction::ProcessStreams(_context, updates) => {
1099                code.process_streams(updates).map(|()| None)
1100            }
1101        };
1102
1103        let result = self.execute(application_id, signer, closure)?;
1104        self.finalize(finalize_context)?;
1105        Ok(result)
1106    }
1107
1108    /// Notifies all loaded applications that execution is finalizing.
1109    #[instrument(skip_all)]
1110    fn finalize(&self, context: FinalizeContext) -> Result<(), ExecutionError> {
1111        let applications = mem::take(&mut self.inner().applications_to_finalize)
1112            .into_iter()
1113            .rev();
1114
1115        self.inner().is_finalizing = true;
1116
1117        for application in applications {
1118            self.execute(application, context.authenticated_signer, |contract| {
1119                contract.finalize().map(|_| None)
1120            })?;
1121            self.inner().loaded_applications.remove(&application);
1122        }
1123
1124        Ok(())
1125    }
1126
1127    /// Executes a `closure` with the contract code for the `application_id`.
1128    #[instrument(skip_all, fields(application_id = %application_id))]
1129    fn execute(
1130        &self,
1131        application_id: ApplicationId,
1132        signer: Option<AccountOwner>,
1133        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1134    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1135        let contract = {
1136            let mut runtime = self.inner();
1137            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1138
1139            let status = ApplicationStatus {
1140                caller_id: None,
1141                id: application_id,
1142                description: application.description.clone(),
1143                signer,
1144            };
1145
1146            runtime.push_application(status);
1147
1148            application
1149        };
1150
1151        let result = closure(
1152            &mut contract
1153                .instance
1154                .try_lock()
1155                .expect("Application should not be already executing"),
1156        )?;
1157
1158        let mut runtime = self.inner();
1159        let application_status = runtime.pop_application();
1160        assert_eq!(application_status.caller_id, None);
1161        assert_eq!(application_status.id, application_id);
1162        assert_eq!(application_status.description, contract.description);
1163        assert_eq!(application_status.signer, signer);
1164        assert!(runtime.call_stack.is_empty());
1165
1166        Ok(result)
1167    }
1168}
1169
1170impl ContractRuntime for ContractSyncRuntimeHandle {
1171    fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1172        let this = self.inner();
1173        Ok(this.current_application().signer)
1174    }
1175
1176    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1177        Ok(self
1178            .inner()
1179            .executing_message
1180            .map(|metadata| metadata.is_bouncing))
1181    }
1182
1183    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1184        Ok(self
1185            .inner()
1186            .executing_message
1187            .map(|metadata| metadata.origin))
1188    }
1189
1190    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1191        let this = self.inner();
1192        if this.call_stack.len() <= 1 {
1193            return Ok(None);
1194        }
1195        Ok(this.current_application().caller_id)
1196    }
1197
1198    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1199        Ok(match vm_runtime {
1200            VmRuntime::Wasm => {
1201                self.inner()
1202                    .resource_controller
1203                    .policy()
1204                    .maximum_wasm_fuel_per_block
1205            }
1206            VmRuntime::Evm => {
1207                self.inner()
1208                    .resource_controller
1209                    .policy()
1210                    .maximum_evm_fuel_per_block
1211            }
1212        })
1213    }
1214
1215    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1216        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1217    }
1218
1219    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1220        let mut this = self.inner();
1221        this.resource_controller.track_fuel(fuel, vm_runtime)
1222    }
1223
1224    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1225        let mut this = self.inner();
1226        let application = this.current_application();
1227        let application_id = application.id;
1228        let authenticated_signer = application.signer;
1229        let mut refund_grant_to = this.refund_grant_to;
1230
1231        let grant = this
1232            .resource_controller
1233            .policy()
1234            .total_price(&message.grant)?;
1235        if grant.is_zero() {
1236            refund_grant_to = None;
1237        } else {
1238            this.resource_controller.track_grant(grant)?;
1239        }
1240        let kind = if message.is_tracked {
1241            MessageKind::Tracked
1242        } else {
1243            MessageKind::Simple
1244        };
1245
1246        this.execution_state_sender
1247            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1248                message: OutgoingMessage {
1249                    destination: message.destination,
1250                    authenticated_signer,
1251                    refund_grant_to,
1252                    grant,
1253                    kind,
1254                    message: Message::User {
1255                        application_id,
1256                        bytes: message.message,
1257                    },
1258                },
1259                callback,
1260            })?
1261            .recv_response()?;
1262
1263        Ok(())
1264    }
1265
1266    fn transfer(
1267        &mut self,
1268        source: AccountOwner,
1269        destination: Account,
1270        amount: Amount,
1271    ) -> Result<(), ExecutionError> {
1272        let this = self.inner();
1273        let current_application = this.current_application();
1274        let application_id = current_application.id;
1275        let signer = current_application.signer;
1276
1277        this.execution_state_sender
1278            .send_request(|callback| ExecutionRequest::Transfer {
1279                source,
1280                destination,
1281                amount,
1282                signer,
1283                application_id,
1284                callback,
1285            })?
1286            .recv_response()?;
1287        Ok(())
1288    }
1289
1290    fn claim(
1291        &mut self,
1292        source: Account,
1293        destination: Account,
1294        amount: Amount,
1295    ) -> Result<(), ExecutionError> {
1296        let this = self.inner();
1297        let current_application = this.current_application();
1298        let application_id = current_application.id;
1299        let signer = current_application.signer;
1300
1301        this.execution_state_sender
1302            .send_request(|callback| ExecutionRequest::Claim {
1303                source,
1304                destination,
1305                amount,
1306                signer,
1307                application_id,
1308                callback,
1309            })?
1310            .recv_response()?;
1311        Ok(())
1312    }
1313
1314    fn try_call_application(
1315        &mut self,
1316        authenticated: bool,
1317        callee_id: ApplicationId,
1318        argument: Vec<u8>,
1319    ) -> Result<Vec<u8>, ExecutionError> {
1320        let contract = self
1321            .inner()
1322            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1323
1324        let value = contract
1325            .try_lock()
1326            .expect("Applications should not have reentrant calls")
1327            .execute_operation(argument)?;
1328
1329        self.inner().finish_call();
1330
1331        Ok(value)
1332    }
1333
1334    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1335        let mut this = self.inner();
1336        ensure!(
1337            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1338            ExecutionError::StreamNameTooLong
1339        );
1340        let application_id = GenericApplicationId::User(this.current_application().id);
1341        let stream_id = StreamId {
1342            stream_name,
1343            application_id,
1344        };
1345        let value_len = value.len() as u64;
1346        let index = this
1347            .execution_state_sender
1348            .send_request(|callback| ExecutionRequest::Emit {
1349                stream_id,
1350                value,
1351                callback,
1352            })?
1353            .recv_response()?;
1354        // TODO(#365): Consider separate event fee categories.
1355        this.resource_controller.track_bytes_written(value_len)?;
1356        Ok(index)
1357    }
1358
1359    fn read_event(
1360        &mut self,
1361        chain_id: ChainId,
1362        stream_name: StreamName,
1363        index: u32,
1364    ) -> Result<Vec<u8>, ExecutionError> {
1365        let mut this = self.inner();
1366        ensure!(
1367            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1368            ExecutionError::StreamNameTooLong
1369        );
1370        let application_id = GenericApplicationId::User(this.current_application().id);
1371        let stream_id = StreamId {
1372            stream_name,
1373            application_id,
1374        };
1375        let event_id = EventId {
1376            stream_id,
1377            index,
1378            chain_id,
1379        };
1380        let event = this
1381            .execution_state_sender
1382            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1383            .recv_response()?;
1384        // TODO(#365): Consider separate event fee categories.
1385        this.resource_controller
1386            .track_bytes_read(event.len() as u64)?;
1387        Ok(event)
1388    }
1389
1390    fn subscribe_to_events(
1391        &mut self,
1392        chain_id: ChainId,
1393        application_id: ApplicationId,
1394        stream_name: StreamName,
1395    ) -> Result<(), ExecutionError> {
1396        let this = self.inner();
1397        ensure!(
1398            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1399            ExecutionError::StreamNameTooLong
1400        );
1401        let stream_id = StreamId {
1402            stream_name,
1403            application_id: application_id.into(),
1404        };
1405        let subscriber_app_id = this.current_application().id;
1406        this.execution_state_sender
1407            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1408                chain_id,
1409                stream_id,
1410                subscriber_app_id,
1411                callback,
1412            })?
1413            .recv_response()?;
1414        Ok(())
1415    }
1416
1417    fn unsubscribe_from_events(
1418        &mut self,
1419        chain_id: ChainId,
1420        application_id: ApplicationId,
1421        stream_name: StreamName,
1422    ) -> Result<(), ExecutionError> {
1423        let this = self.inner();
1424        ensure!(
1425            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1426            ExecutionError::StreamNameTooLong
1427        );
1428        let stream_id = StreamId {
1429            stream_name,
1430            application_id: application_id.into(),
1431        };
1432        let subscriber_app_id = this.current_application().id;
1433        this.execution_state_sender
1434            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1435                chain_id,
1436                stream_id,
1437                subscriber_app_id,
1438                callback,
1439            })?
1440            .recv_response()?;
1441        Ok(())
1442    }
1443
1444    fn query_service(
1445        &mut self,
1446        application_id: ApplicationId,
1447        query: Vec<u8>,
1448    ) -> Result<Vec<u8>, ExecutionError> {
1449        let mut this = self.inner();
1450
1451        let app_permissions = this
1452            .execution_state_sender
1453            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1454            .recv_response()?;
1455
1456        let app_id = this.current_application().id;
1457        ensure!(
1458            app_permissions.can_call_services(&app_id),
1459            ExecutionError::UnauthorizedApplication(app_id)
1460        );
1461
1462        this.resource_controller.track_service_oracle_call()?;
1463
1464        this.run_service_oracle_query(application_id, query)
1465    }
1466
1467    fn open_chain(
1468        &mut self,
1469        ownership: ChainOwnership,
1470        application_permissions: ApplicationPermissions,
1471        balance: Amount,
1472    ) -> Result<ChainId, ExecutionError> {
1473        let parent_id = self.inner().chain_id;
1474        let block_height = self.block_height()?;
1475
1476        let timestamp = self.inner().user_context;
1477
1478        let chain_id = self
1479            .inner()
1480            .execution_state_sender
1481            .send_request(|callback| ExecutionRequest::OpenChain {
1482                ownership,
1483                balance,
1484                parent_id,
1485                block_height,
1486                timestamp,
1487                application_permissions,
1488                callback,
1489            })?
1490            .recv_response()?;
1491
1492        Ok(chain_id)
1493    }
1494
1495    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1496        let this = self.inner();
1497        let application_id = this.current_application().id;
1498        this.execution_state_sender
1499            .send_request(|callback| ExecutionRequest::CloseChain {
1500                application_id,
1501                callback,
1502            })?
1503            .recv_response()?
1504    }
1505
1506    fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1507        let this = self.inner();
1508        let application_id = this.current_application().id;
1509        this.execution_state_sender
1510            .send_request(|callback| ExecutionRequest::ChangeOwnership {
1511                application_id,
1512                ownership,
1513                callback,
1514            })?
1515            .recv_response()?
1516    }
1517
1518    fn change_application_permissions(
1519        &mut self,
1520        application_permissions: ApplicationPermissions,
1521    ) -> Result<(), ExecutionError> {
1522        let this = self.inner();
1523        let application_id = this.current_application().id;
1524        this.execution_state_sender
1525            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1526                application_id,
1527                application_permissions,
1528                callback,
1529            })?
1530            .recv_response()?
1531    }
1532
1533    fn create_application(
1534        &mut self,
1535        module_id: ModuleId,
1536        parameters: Vec<u8>,
1537        argument: Vec<u8>,
1538        required_application_ids: Vec<ApplicationId>,
1539    ) -> Result<ApplicationId, ExecutionError> {
1540        let chain_id = self.inner().chain_id;
1541        let block_height = self.block_height()?;
1542
1543        let CreateApplicationResult { app_id } = self
1544            .inner()
1545            .execution_state_sender
1546            .send_request(move |callback| ExecutionRequest::CreateApplication {
1547                chain_id,
1548                block_height,
1549                module_id,
1550                parameters,
1551                required_application_ids,
1552                callback,
1553            })?
1554            .recv_response()??;
1555
1556        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1557
1558        contract
1559            .try_lock()
1560            .expect("Applications should not have reentrant calls")
1561            .instantiate(argument)?;
1562
1563        self.inner().finish_call();
1564
1565        Ok(app_id)
1566    }
1567
1568    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1569        let blob = Blob::new_data(bytes);
1570        let blob_id = blob.id();
1571        let this = self.inner();
1572        this.execution_state_sender
1573            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1574            .recv_response()?;
1575        Ok(DataBlobHash(blob_id.hash))
1576    }
1577
1578    fn publish_module(
1579        &mut self,
1580        contract: Bytecode,
1581        service: Bytecode,
1582        vm_runtime: VmRuntime,
1583    ) -> Result<ModuleId, ExecutionError> {
1584        let (blobs, module_id) =
1585            crate::runtime::create_bytecode_blobs_sync(&contract, &service, vm_runtime);
1586        let this = self.inner();
1587        for blob in blobs {
1588            this.execution_state_sender
1589                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1590                .recv_response()?;
1591        }
1592        Ok(module_id)
1593    }
1594
1595    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1596        let this = self.inner();
1597        let round = this.round;
1598        this.execution_state_sender
1599            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1600            .recv_response()
1601    }
1602
1603    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1604        let mut this = self.inner();
1605        let id = this.current_application().id;
1606        let state = this.view_user_states.entry(id).or_default();
1607        state.force_all_pending_queries()?;
1608        this.resource_controller.track_write_operations(
1609            batch
1610                .num_operations()
1611                .try_into()
1612                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1613        )?;
1614        this.resource_controller
1615            .track_bytes_written(batch.size() as u64)?;
1616        this.execution_state_sender
1617            .send_request(|callback| ExecutionRequest::WriteBatch {
1618                id,
1619                batch,
1620                callback,
1621            })?
1622            .recv_response()?;
1623        Ok(())
1624    }
1625}
1626
1627impl ServiceSyncRuntime {
1628    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1629    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1630        Self::new_with_deadline(execution_state_sender, context, None)
1631    }
1632
1633    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1634    pub fn new_with_deadline(
1635        execution_state_sender: ExecutionStateSender,
1636        context: QueryContext,
1637        deadline: Option<Instant>,
1638    ) -> Self {
1639        // Query the allow_application_logs setting from the execution state.
1640        let allow_application_logs = execution_state_sender
1641            .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1642            .ok()
1643            .and_then(|receiver| receiver.recv_response().ok())
1644            .unwrap_or(false);
1645
1646        let runtime = SyncRuntime(Some(
1647            SyncRuntimeInternal::new(
1648                context.chain_id,
1649                context.next_block_height,
1650                None,
1651                None,
1652                execution_state_sender,
1653                deadline,
1654                None,
1655                ResourceController::default(),
1656                (),
1657                allow_application_logs,
1658            )
1659            .into(),
1660        ));
1661
1662        ServiceSyncRuntime {
1663            runtime,
1664            current_context: context,
1665        }
1666    }
1667
1668    /// Preloads the code of a service into the runtime's memory.
1669    pub(crate) fn preload_service(
1670        &self,
1671        id: ApplicationId,
1672        code: UserServiceCode,
1673        description: ApplicationDescription,
1674    ) {
1675        let this = self
1676            .runtime
1677            .0
1678            .as_ref()
1679            .expect("services shouldn't be preloaded while the runtime is being dropped");
1680        let mut this_guard = this.inner();
1681
1682        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1683            entry.insert((code, description));
1684        }
1685    }
1686
1687    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1688    pub fn run(&mut self, incoming_requests: &std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1689        while let Ok(request) = incoming_requests.recv() {
1690            let ServiceRuntimeRequest::Query {
1691                application_id,
1692                context,
1693                query,
1694                callback,
1695            } = request;
1696
1697            let result = self
1698                .prepare_for_query(context)
1699                .and_then(|()| self.run_query(application_id, query));
1700
1701            if let Err(err) = callback.send(result) {
1702                tracing::debug!(%err, "Receiver for query result has been dropped");
1703            }
1704        }
1705    }
1706
1707    /// Prepares the runtime to query an application.
1708    pub(crate) fn prepare_for_query(
1709        &mut self,
1710        new_context: QueryContext,
1711    ) -> Result<(), ExecutionError> {
1712        let expected_context = QueryContext {
1713            local_time: new_context.local_time,
1714            ..self.current_context
1715        };
1716
1717        if new_context != expected_context {
1718            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1719            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1720        } else {
1721            self.handle_mut()
1722                .inner()
1723                .execution_state_sender
1724                .send_request(|callback| ExecutionRequest::SetLocalTime {
1725                    local_time: new_context.local_time,
1726                    callback,
1727                })?
1728                .recv_response()?;
1729        }
1730        Ok(())
1731    }
1732
1733    /// Queries an application specified by its [`ApplicationId`].
1734    pub(crate) fn run_query(
1735        &mut self,
1736        application_id: ApplicationId,
1737        query: Vec<u8>,
1738    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1739        let this = self.handle_mut();
1740        let response = this.try_query_application(application_id, query)?;
1741        let operations = mem::take(&mut this.inner().scheduled_operations);
1742
1743        Ok(QueryOutcome {
1744            response,
1745            operations,
1746        })
1747    }
1748
1749    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1750    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1751        self.runtime.0.as_mut().expect(
1752            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1753        )
1754    }
1755}
1756
1757impl ServiceRuntime for ServiceSyncRuntimeHandle {
1758    /// Note that queries are not available from writable contexts.
1759    fn try_query_application(
1760        &mut self,
1761        queried_id: ApplicationId,
1762        argument: Vec<u8>,
1763    ) -> Result<Vec<u8>, ExecutionError> {
1764        let service = {
1765            let mut this = self.inner();
1766
1767            // Load the application.
1768            let application = this.load_service_instance(self.clone(), queried_id)?;
1769            // Make the call to user code.
1770            this.push_application(ApplicationStatus {
1771                caller_id: None,
1772                id: queried_id,
1773                description: application.description,
1774                signer: None,
1775            });
1776            application.instance
1777        };
1778        let response = service
1779            .try_lock()
1780            .expect("Applications should not have reentrant calls")
1781            .handle_query(argument)?;
1782        self.inner().pop_application();
1783        Ok(response)
1784    }
1785
1786    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1787        let mut this = self.inner();
1788        let application_id = this.current_application().id;
1789
1790        this.scheduled_operations.push(Operation::User {
1791            application_id,
1792            bytes: operation,
1793        });
1794
1795        Ok(())
1796    }
1797
1798    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1799        if let Some(deadline) = self.inner().deadline {
1800            if Instant::now() >= deadline {
1801                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1802            }
1803        }
1804        Ok(())
1805    }
1806}
1807
1808/// A request to the service runtime actor.
1809pub enum ServiceRuntimeRequest {
1810    Query {
1811        application_id: ApplicationId,
1812        context: QueryContext,
1813        query: Vec<u8>,
1814        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1815    },
1816}
1817
1818/// The origin of the execution.
1819#[derive(Clone, Copy, Debug)]
1820struct ExecutingMessage {
1821    is_bouncing: bool,
1822    origin: ChainId,
1823}
1824
1825impl From<&MessageContext> for ExecutingMessage {
1826    fn from(context: &MessageContext) -> Self {
1827        ExecutingMessage {
1828            is_bouncing: context.is_bouncing,
1829            origin: context.origin,
1830        }
1831    }
1832}
1833
1834/// Creates a compressed contract and service bytecode synchronously.
1835pub fn create_bytecode_blobs_sync(
1836    contract: &Bytecode,
1837    service: &Bytecode,
1838    vm_runtime: VmRuntime,
1839) -> (Vec<Blob>, ModuleId) {
1840    match vm_runtime {
1841        VmRuntime::Wasm => {
1842            let compressed_contract = contract.compress();
1843            let compressed_service = service.compress();
1844            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1845            let service_blob = Blob::new_service_bytecode(compressed_service);
1846            let module_id =
1847                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1848            (vec![contract_blob, service_blob], module_id)
1849        }
1850        VmRuntime::Evm => {
1851            let compressed_contract = contract.compress();
1852            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1853            let module_id = ModuleId::new(
1854                evm_contract_blob.id().hash,
1855                evm_contract_blob.id().hash,
1856                vm_runtime,
1857            );
1858            (vec![evm_contract_blob], module_id)
1859        }
1860    }
1861}