linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29    execution::UserAction,
30    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31    resources::ResourceController,
32    system::CreateApplicationResult,
33    util::{ReceiverExt, UnboundedSenderExt},
34    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
40#[cfg(test)]
41#[path = "unit_tests/runtime_tests.rs"]
42mod tests;
43
/// Ties a user application instance type to the extra types its runtime needs.
pub trait WithContext {
    /// Additional context stored in the runtime for this kind of instance.
    type UserContext;
    /// The code type from which instances of this kind are instantiated.
    type Code;
}
48
49impl WithContext for UserContractInstance {
50    type UserContext = Timestamp;
51    type Code = UserContractCode;
52}
53
54impl WithContext for UserServiceInstance {
55    type UserContext = ();
56    type Code = UserServiceCode;
57}
58
/// Test-only implementation for an opaque instance type, with neither code nor context.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type Code = ();
    type UserContext = ();
}
64
65#[derive(Debug)]
66pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
67
68pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
69
70pub struct ServiceSyncRuntime {
71    runtime: SyncRuntime<UserServiceInstance>,
72    current_context: QueryContext,
73}
74
75#[derive(Debug)]
76pub struct SyncRuntimeHandle<UserInstance: WithContext>(
77    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
78);
79
80pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
81pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
82
83/// Runtime data tracked during the execution of a transaction on the synchronous thread.
84#[derive(Debug)]
85pub struct SyncRuntimeInternal<UserInstance: WithContext> {
86    /// The current chain ID.
87    chain_id: ChainId,
88    /// The height of the next block that will be added to this chain. During operations
89    /// and messages, this is the current block height.
90    height: BlockHeight,
91    /// The current consensus round. Only available during block validation in multi-leader rounds.
92    round: Option<u32>,
93    /// The current message being executed, if there is one.
94    #[debug(skip_if = Option::is_none)]
95    executing_message: Option<ExecutingMessage>,
96
97    /// How to interact with the storage view of the execution state.
98    execution_state_sender: ExecutionStateSender,
99
100    /// If applications are being finalized.
101    ///
102    /// If [`true`], disables cross-application calls.
103    is_finalizing: bool,
104    /// Applications that need to be finalized.
105    applications_to_finalize: Vec<ApplicationId>,
106
107    /// Application instances loaded in this transaction.
108    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
109    /// Application instances loaded in this transaction.
110    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
111    /// The current stack of application descriptions.
112    call_stack: Vec<ApplicationStatus>,
113    /// The set of the IDs of the applications that are in the `call_stack`.
114    active_applications: HashSet<ApplicationId>,
115    /// The operations scheduled during this query.
116    scheduled_operations: Vec<Operation>,
117
118    /// Track application states based on views.
119    view_user_states: BTreeMap<ApplicationId, ViewUserState>,
120
121    /// The deadline this runtime should finish executing.
122    ///
123    /// Used to limit the execution time of services running as oracles.
124    deadline: Option<Instant>,
125
126    /// Where to send a refund for the unused part of the grant after execution, if any.
127    #[debug(skip_if = Option::is_none)]
128    refund_grant_to: Option<Account>,
129    /// Controller to track fuel and storage consumption.
130    resource_controller: ResourceController,
131    /// Additional context for the runtime.
132    user_context: UserInstance::UserContext,
133    /// Whether contract log messages should be output.
134    allow_application_logs: bool,
135}
136
137/// The runtime status of an application.
138#[derive(Debug)]
139struct ApplicationStatus {
140    /// The caller application ID, if forwarded during the call.
141    caller_id: Option<ApplicationId>,
142    /// The application ID.
143    id: ApplicationId,
144    /// The application description.
145    description: ApplicationDescription,
146    /// The authenticated signer for the execution thread, if any.
147    signer: Option<AccountOwner>,
148}
149
150/// A loaded application instance.
151#[derive(Debug)]
152struct LoadedApplication<Instance> {
153    instance: Arc<Mutex<Instance>>,
154    description: ApplicationDescription,
155}
156
157impl<Instance> LoadedApplication<Instance> {
158    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
159    fn new(instance: Instance, description: ApplicationDescription) -> Self {
160        LoadedApplication {
161            instance: Arc::new(Mutex::new(instance)),
162            description,
163        }
164    }
165}
166
167impl<Instance> Clone for LoadedApplication<Instance> {
168    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
169    // bound
170    fn clone(&self) -> Self {
171        LoadedApplication {
172            instance: self.instance.clone(),
173            description: self.description.clone(),
174        }
175    }
176}
177
178#[derive(Debug)]
179enum Promise<T> {
180    Ready(T),
181    Pending(Receiver<T>),
182}
183
184impl<T> Promise<T> {
185    fn force(&mut self) -> Result<(), ExecutionError> {
186        if let Promise::Pending(receiver) = self {
187            let value = receiver
188                .recv_ref()
189                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
190            *self = Promise::Ready(value);
191        }
192        Ok(())
193    }
194
195    fn read(self) -> Result<T, ExecutionError> {
196        match self {
197            Promise::Pending(receiver) => {
198                let value = receiver.recv_response()?;
199                Ok(value)
200            }
201            Promise::Ready(value) => Ok(value),
202        }
203    }
204}
205
206/// Manages a set of pending queries returning values of type `T`.
207#[derive(Debug, Default)]
208struct QueryManager<T> {
209    /// The queries in progress.
210    pending_queries: BTreeMap<u32, Promise<T>>,
211    /// The number of queries ever registered so far. Used for the index of the next query.
212    query_count: u32,
213    /// The number of active queries.
214    active_query_count: u32,
215}
216
217impl<T> QueryManager<T> {
218    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
219        let id = self.query_count;
220        self.pending_queries.insert(id, Promise::Pending(receiver));
221        self.query_count = self
222            .query_count
223            .checked_add(1)
224            .ok_or(ArithmeticError::Overflow)?;
225        self.active_query_count = self
226            .active_query_count
227            .checked_add(1)
228            .ok_or(ArithmeticError::Overflow)?;
229        Ok(id)
230    }
231
232    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
233        let promise = self
234            .pending_queries
235            .remove(&id)
236            .ok_or(ExecutionError::InvalidPromise)?;
237        let value = promise.read()?;
238        self.active_query_count -= 1;
239        Ok(value)
240    }
241
242    fn force_all(&mut self) -> Result<(), ExecutionError> {
243        for promise in self.pending_queries.values_mut() {
244            promise.force()?;
245        }
246        Ok(())
247    }
248}
249
/// A list of raw keys.
type Keys = Vec<Vec<u8>>;
/// A raw value.
type Value = Vec<u8>;
/// A list of raw key-value pairs.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
253
254#[derive(Debug, Default)]
255struct ViewUserState {
256    /// The contains-key queries in progress.
257    contains_key_queries: QueryManager<bool>,
258    /// The contains-keys queries in progress.
259    contains_keys_queries: QueryManager<Vec<bool>>,
260    /// The read-value queries in progress.
261    read_value_queries: QueryManager<Option<Value>>,
262    /// The read-multi-values queries in progress.
263    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
264    /// The find-keys queries in progress.
265    find_keys_queries: QueryManager<Keys>,
266    /// The find-key-values queries in progress.
267    find_key_values_queries: QueryManager<KeyValues>,
268}
269
270impl ViewUserState {
271    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
272        self.contains_key_queries.force_all()?;
273        self.contains_keys_queries.force_all()?;
274        self.read_value_queries.force_all()?;
275        self.read_multi_values_queries.force_all()?;
276        self.find_keys_queries.force_all()?;
277        self.find_key_values_queries.force_all()?;
278        Ok(())
279    }
280}
281
282impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
283    type Target = SyncRuntimeHandle<UserInstance>;
284
285    fn deref(&self) -> &Self::Target {
286        self.0.as_ref().expect(
287            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288        )
289    }
290}
291
292impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
293    fn deref_mut(&mut self) -> &mut Self::Target {
294        self.0.as_mut().expect(
295            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
296        )
297    }
298}
299
300impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
301    fn drop(&mut self) {
302        // Ensure the `loaded_applications` are cleared to prevent circular references in
303        // the runtime
304        if let Some(handle) = self.0.take() {
305            handle.inner().loaded_applications.clear();
306        }
307    }
308}
309
310impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
311    #[expect(clippy::too_many_arguments)]
312    fn new(
313        chain_id: ChainId,
314        height: BlockHeight,
315        round: Option<u32>,
316        executing_message: Option<ExecutingMessage>,
317        execution_state_sender: ExecutionStateSender,
318        deadline: Option<Instant>,
319        refund_grant_to: Option<Account>,
320        resource_controller: ResourceController,
321        user_context: UserInstance::UserContext,
322        allow_application_logs: bool,
323    ) -> Self {
324        Self {
325            chain_id,
326            height,
327            round,
328            executing_message,
329            execution_state_sender,
330            is_finalizing: false,
331            applications_to_finalize: Vec::new(),
332            preloaded_applications: HashMap::new(),
333            loaded_applications: HashMap::new(),
334            call_stack: Vec::new(),
335            active_applications: HashSet::new(),
336            view_user_states: BTreeMap::new(),
337            deadline,
338            refund_grant_to,
339            resource_controller,
340            scheduled_operations: Vec::new(),
341            user_context,
342            allow_application_logs,
343        }
344    }
345
346    /// Returns the [`ApplicationStatus`] of the current application.
347    ///
348    /// The current application is the last to be pushed to the `call_stack`.
349    ///
350    /// # Panics
351    ///
352    /// If the call stack is empty.
353    fn current_application(&self) -> &ApplicationStatus {
354        self.call_stack
355            .last()
356            .expect("Call stack is unexpectedly empty")
357    }
358
359    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
360    ///
361    /// Ensures the application's ID is also tracked in the `active_applications` set.
362    fn push_application(&mut self, status: ApplicationStatus) {
363        self.active_applications.insert(status.id);
364        self.call_stack.push(status);
365    }
366
367    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
368    ///
369    /// Ensures the application's ID is also removed from the `active_applications` set.
370    ///
371    /// # Panics
372    ///
373    /// If the call stack is empty.
374    fn pop_application(&mut self) -> ApplicationStatus {
375        let status = self
376            .call_stack
377            .pop()
378            .expect("Can't remove application from empty call stack");
379        assert!(self.active_applications.remove(&status.id));
380        status
381    }
382
383    /// Ensures that a call to `application_id` is not-reentrant.
384    ///
385    /// Returns an error if there already is an entry for `application_id` in the call stack.
386    fn check_for_reentrancy(
387        &mut self,
388        application_id: ApplicationId,
389    ) -> Result<(), ExecutionError> {
390        ensure!(
391            !self.active_applications.contains(&application_id),
392            ExecutionError::ReentrantCall(application_id)
393        );
394        Ok(())
395    }
396}
397
398impl SyncRuntimeInternal<UserContractInstance> {
399    /// Loads a contract instance, initializing it with this runtime if needed.
400    fn load_contract_instance(
401        &mut self,
402        this: SyncRuntimeHandle<UserContractInstance>,
403        id: ApplicationId,
404    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
405        match self.loaded_applications.entry(id) {
406            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
407
408            hash_map::Entry::Vacant(entry) => {
409                // First time actually using the application. Let's see if the code was
410                // pre-loaded.
411                let (code, description) = match self.preloaded_applications.entry(id) {
412                    // TODO(#2927): support dynamic loading of modules on the Web
413                    #[cfg(web)]
414                    hash_map::Entry::Vacant(_) => {
415                        drop(this);
416                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
417                            id,
418                        )));
419                    }
420                    #[cfg(not(web))]
421                    hash_map::Entry::Vacant(entry) => {
422                        let (code, description) = self
423                            .execution_state_sender
424                            .send_request(move |callback| ExecutionRequest::LoadContract {
425                                id,
426                                callback,
427                            })?
428                            .recv_response()?;
429                        entry.insert((code, description)).clone()
430                    }
431                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
432                };
433                let instance = code.instantiate(this)?;
434
435                self.applications_to_finalize.push(id);
436                Ok(entry
437                    .insert(LoadedApplication::new(instance, description))
438                    .clone())
439            }
440        }
441    }
442
443    /// Configures the runtime for executing a call to a different contract.
444    fn prepare_for_call(
445        &mut self,
446        this: ContractSyncRuntimeHandle,
447        authenticated: bool,
448        callee_id: ApplicationId,
449    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
450        self.check_for_reentrancy(callee_id)?;
451
452        ensure!(
453            !self.is_finalizing,
454            ExecutionError::CrossApplicationCallInFinalize {
455                caller_id: Box::new(self.current_application().id),
456                callee_id: Box::new(callee_id),
457            }
458        );
459
460        // Load the application.
461        let application = self.load_contract_instance(this, callee_id)?;
462
463        let caller = self.current_application();
464        let caller_id = caller.id;
465        let caller_signer = caller.signer;
466        // Make the call to user code.
467        let authenticated_signer = match caller_signer {
468            Some(signer) if authenticated => Some(signer),
469            _ => None,
470        };
471        let authenticated_caller_id = authenticated.then_some(caller_id);
472        self.push_application(ApplicationStatus {
473            caller_id: authenticated_caller_id,
474            id: callee_id,
475            description: application.description,
476            // Allow further nested calls to be authenticated if this one is.
477            signer: authenticated_signer,
478        });
479        Ok(application.instance)
480    }
481
482    /// Cleans up the runtime after the execution of a call to a different contract.
483    fn finish_call(&mut self) -> Result<(), ExecutionError> {
484        self.pop_application();
485        Ok(())
486    }
487
488    /// Runs the service in a separate thread as an oracle.
489    fn run_service_oracle_query(
490        &mut self,
491        application_id: ApplicationId,
492        query: Vec<u8>,
493    ) -> Result<Vec<u8>, ExecutionError> {
494        let timeout = self
495            .resource_controller
496            .remaining_service_oracle_execution_time()?;
497        let execution_start = Instant::now();
498        let deadline = Some(execution_start + timeout);
499        let response = self
500            .execution_state_sender
501            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
502                deadline,
503                application_id,
504                next_block_height: self.height,
505                query,
506                callback,
507            })?
508            .recv_response()?;
509
510        self.resource_controller
511            .track_service_oracle_execution(execution_start.elapsed())?;
512        self.resource_controller
513            .track_service_oracle_response(response.len())?;
514
515        Ok(response)
516    }
517}
518
519impl SyncRuntimeInternal<UserServiceInstance> {
520    /// Initializes a service instance with this runtime.
521    fn load_service_instance(
522        &mut self,
523        this: SyncRuntimeHandle<UserServiceInstance>,
524        id: ApplicationId,
525    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
526        match self.loaded_applications.entry(id) {
527            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
528
529            hash_map::Entry::Vacant(entry) => {
530                // First time actually using the application. Let's see if the code was
531                // pre-loaded.
532                let (code, description) = match self.preloaded_applications.entry(id) {
533                    // TODO(#2927): support dynamic loading of modules on the Web
534                    #[cfg(web)]
535                    hash_map::Entry::Vacant(_) => {
536                        drop(this);
537                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
538                            id,
539                        )));
540                    }
541                    #[cfg(not(web))]
542                    hash_map::Entry::Vacant(entry) => {
543                        let (code, description) = self
544                            .execution_state_sender
545                            .send_request(move |callback| ExecutionRequest::LoadService {
546                                id,
547                                callback,
548                            })?
549                            .recv_response()?;
550                        entry.insert((code, description)).clone()
551                    }
552                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
553                };
554                let instance = code.instantiate(this)?;
555
556                self.applications_to_finalize.push(id);
557                Ok(entry
558                    .insert(LoadedApplication::new(instance, description))
559                    .clone())
560            }
561        }
562    }
563}
564
565impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
566    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
567        let handle = self.0.take().expect(
568            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
569        );
570        let runtime = Arc::into_inner(handle.0)?
571            .into_inner()
572            .expect("`SyncRuntime` should run in a single thread which should not panic");
573        Some(runtime)
574    }
575}
576
577impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
578    for SyncRuntimeHandle<UserInstance>
579{
580    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
581        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
582    }
583}
584
585impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
586    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
587        self.0
588            .try_lock()
589            .expect("Synchronous runtimes run on a single execution thread")
590    }
591}
592
593impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
594where
595    Self: ContractOrServiceRuntime,
596{
597    type Read = ();
598    type ReadValueBytes = u32;
599    type ContainsKey = u32;
600    type ContainsKeys = u32;
601    type ReadMultiValuesBytes = u32;
602    type FindKeysByPrefix = u32;
603    type FindKeyValuesByPrefix = u32;
604
605    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
606        let mut this = self.inner();
607        let chain_id = this.chain_id;
608        this.resource_controller.track_runtime_chain_id()?;
609        Ok(chain_id)
610    }
611
612    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
613        let mut this = self.inner();
614        let height = this.height;
615        this.resource_controller.track_runtime_block_height()?;
616        Ok(height)
617    }
618
619    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
620        let mut this = self.inner();
621        let application_id = this.current_application().id;
622        this.resource_controller.track_runtime_application_id()?;
623        Ok(application_id)
624    }
625
626    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
627        let mut this = self.inner();
628        let application_creator_chain_id = this.current_application().description.creator_chain_id;
629        this.resource_controller.track_runtime_application_id()?;
630        Ok(application_creator_chain_id)
631    }
632
633    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
634        let mut this = self.inner();
635        let parameters = this.current_application().description.parameters.clone();
636        this.resource_controller
637            .track_runtime_application_parameters(&parameters)?;
638        Ok(parameters)
639    }
640
641    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
642        let mut this = self.inner();
643        let timestamp = this
644            .execution_state_sender
645            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
646            .recv_response()?;
647        this.resource_controller.track_runtime_timestamp()?;
648        Ok(timestamp)
649    }
650
651    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
652        let mut this = self.inner();
653        let balance = this
654            .execution_state_sender
655            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
656            .recv_response()?;
657        this.resource_controller.track_runtime_balance()?;
658        Ok(balance)
659    }
660
661    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
662        let mut this = self.inner();
663        let balance = this
664            .execution_state_sender
665            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
666            .recv_response()?;
667        this.resource_controller.track_runtime_balance()?;
668        Ok(balance)
669    }
670
671    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
672        let mut this = self.inner();
673        let owner_balances = this
674            .execution_state_sender
675            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
676            .recv_response()?;
677        this.resource_controller
678            .track_runtime_owner_balances(&owner_balances)?;
679        Ok(owner_balances)
680    }
681
682    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
683        let mut this = self.inner();
684        let owners = this
685            .execution_state_sender
686            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
687            .recv_response()?;
688        this.resource_controller.track_runtime_owners(&owners)?;
689        Ok(owners)
690    }
691
692    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
693        let mut this = self.inner();
694        let chain_ownership = this
695            .execution_state_sender
696            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
697            .recv_response()?;
698        this.resource_controller
699            .track_runtime_chain_ownership(&chain_ownership)?;
700        Ok(chain_ownership)
701    }
702
703    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
704        let mut this = self.inner();
705        let id = this.current_application().id;
706        this.resource_controller.track_read_operation()?;
707        let receiver = this
708            .execution_state_sender
709            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
710        let state = this.view_user_states.entry(id).or_default();
711        state.contains_key_queries.register(receiver)
712    }
713
714    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
715        let mut this = self.inner();
716        let id = this.current_application().id;
717        let state = this.view_user_states.entry(id).or_default();
718        let value = state.contains_key_queries.wait(*promise)?;
719        Ok(value)
720    }
721
722    fn contains_keys_new(
723        &mut self,
724        keys: Vec<Vec<u8>>,
725    ) -> Result<Self::ContainsKeys, ExecutionError> {
726        let mut this = self.inner();
727        let id = this.current_application().id;
728        this.resource_controller.track_read_operation()?;
729        let receiver = this
730            .execution_state_sender
731            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
732        let state = this.view_user_states.entry(id).or_default();
733        state.contains_keys_queries.register(receiver)
734    }
735
736    fn contains_keys_wait(
737        &mut self,
738        promise: &Self::ContainsKeys,
739    ) -> Result<Vec<bool>, ExecutionError> {
740        let mut this = self.inner();
741        let id = this.current_application().id;
742        let state = this.view_user_states.entry(id).or_default();
743        let value = state.contains_keys_queries.wait(*promise)?;
744        Ok(value)
745    }
746
747    fn read_multi_values_bytes_new(
748        &mut self,
749        keys: Vec<Vec<u8>>,
750    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
751        let mut this = self.inner();
752        let id = this.current_application().id;
753        this.resource_controller.track_read_operation()?;
754        let receiver = this.execution_state_sender.send_request(move |callback| {
755            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
756        })?;
757        let state = this.view_user_states.entry(id).or_default();
758        state.read_multi_values_queries.register(receiver)
759    }
760
761    fn read_multi_values_bytes_wait(
762        &mut self,
763        promise: &Self::ReadMultiValuesBytes,
764    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
765        let mut this = self.inner();
766        let id = this.current_application().id;
767        let state = this.view_user_states.entry(id).or_default();
768        let values = state.read_multi_values_queries.wait(*promise)?;
769        for value in &values {
770            if let Some(value) = &value {
771                this.resource_controller
772                    .track_bytes_read(value.len() as u64)?;
773            }
774        }
775        Ok(values)
776    }
777
778    fn read_value_bytes_new(
779        &mut self,
780        key: Vec<u8>,
781    ) -> Result<Self::ReadValueBytes, ExecutionError> {
782        let mut this = self.inner();
783        let id = this.current_application().id;
784        this.resource_controller.track_read_operation()?;
785        let receiver = this
786            .execution_state_sender
787            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
788        let state = this.view_user_states.entry(id).or_default();
789        state.read_value_queries.register(receiver)
790    }
791
792    fn read_value_bytes_wait(
793        &mut self,
794        promise: &Self::ReadValueBytes,
795    ) -> Result<Option<Vec<u8>>, ExecutionError> {
796        let mut this = self.inner();
797        let id = this.current_application().id;
798        let value = {
799            let state = this.view_user_states.entry(id).or_default();
800            state.read_value_queries.wait(*promise)?
801        };
802        if let Some(value) = &value {
803            this.resource_controller
804                .track_bytes_read(value.len() as u64)?;
805        }
806        Ok(value)
807    }
808
809    fn find_keys_by_prefix_new(
810        &mut self,
811        key_prefix: Vec<u8>,
812    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
813        let mut this = self.inner();
814        let id = this.current_application().id;
815        this.resource_controller.track_read_operation()?;
816        let receiver = this.execution_state_sender.send_request(move |callback| {
817            ExecutionRequest::FindKeysByPrefix {
818                id,
819                key_prefix,
820                callback,
821            }
822        })?;
823        let state = this.view_user_states.entry(id).or_default();
824        state.find_keys_queries.register(receiver)
825    }
826
827    fn find_keys_by_prefix_wait(
828        &mut self,
829        promise: &Self::FindKeysByPrefix,
830    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
831        let mut this = self.inner();
832        let id = this.current_application().id;
833        let keys = {
834            let state = this.view_user_states.entry(id).or_default();
835            state.find_keys_queries.wait(*promise)?
836        };
837        let mut read_size = 0;
838        for key in &keys {
839            read_size += key.len();
840        }
841        this.resource_controller
842            .track_bytes_read(read_size as u64)?;
843        Ok(keys)
844    }
845
846    fn find_key_values_by_prefix_new(
847        &mut self,
848        key_prefix: Vec<u8>,
849    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
850        let mut this = self.inner();
851        let id = this.current_application().id;
852        this.resource_controller.track_read_operation()?;
853        let receiver = this.execution_state_sender.send_request(move |callback| {
854            ExecutionRequest::FindKeyValuesByPrefix {
855                id,
856                key_prefix,
857                callback,
858            }
859        })?;
860        let state = this.view_user_states.entry(id).or_default();
861        state.find_key_values_queries.register(receiver)
862    }
863
864    fn find_key_values_by_prefix_wait(
865        &mut self,
866        promise: &Self::FindKeyValuesByPrefix,
867    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
868        let mut this = self.inner();
869        let id = this.current_application().id;
870        let state = this.view_user_states.entry(id).or_default();
871        let key_values = state.find_key_values_queries.wait(*promise)?;
872        let mut read_size = 0;
873        for (key, value) in &key_values {
874            read_size += key.len() + value.len();
875        }
876        this.resource_controller
877            .track_bytes_read(read_size as u64)?;
878        Ok(key_values)
879    }
880
881    fn perform_http_request(
882        &mut self,
883        request: http::Request,
884    ) -> Result<http::Response, ExecutionError> {
885        let mut this = self.inner();
886        let app_permissions = this
887            .execution_state_sender
888            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
889            .recv_response()?;
890
891        let app_id = this.current_application().id;
892        ensure!(
893            app_permissions.can_make_http_requests(&app_id),
894            ExecutionError::UnauthorizedApplication(app_id)
895        );
896
897        this.resource_controller.track_http_request()?;
898
899        let response = this
900            .execution_state_sender
901            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
902                request,
903                http_responses_are_oracle_responses:
904                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
905                callback,
906            })?
907            .recv_response()?;
908        Ok(response)
909    }
910
911    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
912        let this = self.inner();
913        this.execution_state_sender
914            .send_request(|callback| ExecutionRequest::AssertBefore {
915                timestamp,
916                callback,
917            })?
918            .recv_response()?
919    }
920
921    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
922        let this = self.inner();
923        let blob_id = hash.into();
924        let content = this
925            .execution_state_sender
926            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
927            .recv_response()?;
928        Ok(content.into_vec_or_clone())
929    }
930
931    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
932        let this = self.inner();
933        let blob_id = hash.into();
934        this.execution_state_sender
935            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
936            .recv_response()?;
937        Ok(())
938    }
939
940    fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
941        Ok(self.inner().allow_application_logs)
942    }
943
944    #[cfg(web)]
945    fn send_log(&mut self, message: String, level: tracing::log::Level) {
946        let this = self.inner();
947        // Fire-and-forget: ignore errors since logging shouldn't affect execution
948        let _ = this
949            .execution_state_sender
950            .unbounded_send(ExecutionRequest::Log { message, level });
951    }
952}
953
/// Compile-time switch that lets the shared [`BaseRuntime`] implementation behave differently
/// for contract runtimes and service runtimes.
trait ContractOrServiceRuntime {
    /// Whether an HTTP response must fit within the oracle response size limit.
    ///
    /// Services set this to `false`, which potentially lets them receive a larger HTTP
    /// response while only a shorter oracle response is stored in the block.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
964
965impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
966    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
967}
968
969impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
970    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
971}
972
973impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
974    fn clone(&self) -> Self {
975        SyncRuntimeHandle(self.0.clone())
976    }
977}
978
979impl ContractSyncRuntime {
980    pub(crate) fn new(
981        execution_state_sender: ExecutionStateSender,
982        chain_id: ChainId,
983        refund_grant_to: Option<Account>,
984        resource_controller: ResourceController,
985        action: &UserAction,
986        allow_application_logs: bool,
987    ) -> Self {
988        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
989            SyncRuntimeInternal::new(
990                chain_id,
991                action.height(),
992                action.round(),
993                if let UserAction::Message(context, _) = action {
994                    Some(context.into())
995                } else {
996                    None
997                },
998                execution_state_sender,
999                None,
1000                refund_grant_to,
1001                resource_controller,
1002                action.timestamp(),
1003                allow_application_logs,
1004            ),
1005        )))
1006    }
1007
1008    /// Preloads the code of a contract into the runtime's memory.
1009    pub(crate) fn preload_contract(
1010        &self,
1011        id: ApplicationId,
1012        code: UserContractCode,
1013        description: ApplicationDescription,
1014    ) -> Result<(), ExecutionError> {
1015        let this = self
1016            .0
1017            .as_ref()
1018            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1019        let mut this_guard = this.inner();
1020
1021        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1022            entry.insert((code, description));
1023        }
1024
1025        Ok(())
1026    }
1027
1028    /// Main entry point to start executing a user action.
1029    pub(crate) fn run_action(
1030        mut self,
1031        application_id: ApplicationId,
1032        chain_id: ChainId,
1033        action: UserAction,
1034    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1035        let result = self
1036            .deref_mut()
1037            .run_action(application_id, chain_id, action)?;
1038        let runtime = self
1039            .into_inner()
1040            .expect("Runtime clones should have been freed by now");
1041
1042        Ok((result, runtime.resource_controller))
1043    }
1044}
1045
1046impl ContractSyncRuntimeHandle {
1047    fn run_action(
1048        &mut self,
1049        application_id: ApplicationId,
1050        chain_id: ChainId,
1051        action: UserAction,
1052    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1053        let finalize_context = FinalizeContext {
1054            authenticated_signer: action.signer(),
1055            chain_id,
1056            height: action.height(),
1057            round: action.round(),
1058        };
1059
1060        {
1061            let runtime = self.inner();
1062            assert_eq!(runtime.chain_id, chain_id);
1063            assert_eq!(runtime.height, action.height());
1064        }
1065
1066        let signer = action.signer();
1067        let closure = move |code: &mut UserContractInstance| match action {
1068            UserAction::Instantiate(_context, argument) => {
1069                code.instantiate(argument).map(|()| None)
1070            }
1071            UserAction::Operation(_context, operation) => {
1072                code.execute_operation(operation).map(Option::Some)
1073            }
1074            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1075            UserAction::ProcessStreams(_context, updates) => {
1076                code.process_streams(updates).map(|()| None)
1077            }
1078        };
1079
1080        let result = self.execute(application_id, signer, closure)?;
1081        self.finalize(finalize_context)?;
1082        Ok(result)
1083    }
1084
1085    /// Notifies all loaded applications that execution is finalizing.
1086    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1087        let applications = mem::take(&mut self.inner().applications_to_finalize)
1088            .into_iter()
1089            .rev();
1090
1091        self.inner().is_finalizing = true;
1092
1093        for application in applications {
1094            self.execute(application, context.authenticated_signer, |contract| {
1095                contract.finalize().map(|_| None)
1096            })?;
1097            self.inner().loaded_applications.remove(&application);
1098        }
1099
1100        Ok(())
1101    }
1102
1103    /// Executes a `closure` with the contract code for the `application_id`.
1104    fn execute(
1105        &mut self,
1106        application_id: ApplicationId,
1107        signer: Option<AccountOwner>,
1108        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1109    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1110        let contract = {
1111            let mut runtime = self.inner();
1112            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1113
1114            let status = ApplicationStatus {
1115                caller_id: None,
1116                id: application_id,
1117                description: application.description.clone(),
1118                signer,
1119            };
1120
1121            runtime.push_application(status);
1122
1123            application
1124        };
1125
1126        let result = closure(
1127            &mut contract
1128                .instance
1129                .try_lock()
1130                .expect("Application should not be already executing"),
1131        )?;
1132
1133        let mut runtime = self.inner();
1134        let application_status = runtime.pop_application();
1135        assert_eq!(application_status.caller_id, None);
1136        assert_eq!(application_status.id, application_id);
1137        assert_eq!(application_status.description, contract.description);
1138        assert_eq!(application_status.signer, signer);
1139        assert!(runtime.call_stack.is_empty());
1140
1141        Ok(result)
1142    }
1143}
1144
1145impl ContractRuntime for ContractSyncRuntimeHandle {
1146    fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1147        let this = self.inner();
1148        Ok(this.current_application().signer)
1149    }
1150
1151    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1152        Ok(self
1153            .inner()
1154            .executing_message
1155            .map(|metadata| metadata.is_bouncing))
1156    }
1157
1158    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1159        Ok(self
1160            .inner()
1161            .executing_message
1162            .map(|metadata| metadata.origin))
1163    }
1164
1165    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1166        let this = self.inner();
1167        if this.call_stack.len() <= 1 {
1168            return Ok(None);
1169        }
1170        Ok(this.current_application().caller_id)
1171    }
1172
1173    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1174        Ok(match vm_runtime {
1175            VmRuntime::Wasm => {
1176                self.inner()
1177                    .resource_controller
1178                    .policy()
1179                    .maximum_wasm_fuel_per_block
1180            }
1181            VmRuntime::Evm => {
1182                self.inner()
1183                    .resource_controller
1184                    .policy()
1185                    .maximum_evm_fuel_per_block
1186            }
1187        })
1188    }
1189
1190    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1191        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1192    }
1193
1194    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1195        let mut this = self.inner();
1196        this.resource_controller.track_fuel(fuel, vm_runtime)
1197    }
1198
1199    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1200        let mut this = self.inner();
1201        let application = this.current_application();
1202        let application_id = application.id;
1203        let authenticated_signer = application.signer;
1204        let mut refund_grant_to = this.refund_grant_to;
1205
1206        let grant = this
1207            .resource_controller
1208            .policy()
1209            .total_price(&message.grant)?;
1210        if grant.is_zero() {
1211            refund_grant_to = None;
1212        } else {
1213            this.resource_controller.track_grant(grant)?;
1214        }
1215        let kind = if message.is_tracked {
1216            MessageKind::Tracked
1217        } else {
1218            MessageKind::Simple
1219        };
1220
1221        this.execution_state_sender
1222            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1223                message: OutgoingMessage {
1224                    destination: message.destination,
1225                    authenticated_signer,
1226                    refund_grant_to,
1227                    grant,
1228                    kind,
1229                    message: Message::User {
1230                        application_id,
1231                        bytes: message.message,
1232                    },
1233                },
1234                callback,
1235            })?
1236            .recv_response()?;
1237
1238        Ok(())
1239    }
1240
1241    fn transfer(
1242        &mut self,
1243        source: AccountOwner,
1244        destination: Account,
1245        amount: Amount,
1246    ) -> Result<(), ExecutionError> {
1247        let this = self.inner();
1248        let current_application = this.current_application();
1249        let application_id = current_application.id;
1250        let signer = current_application.signer;
1251
1252        this.execution_state_sender
1253            .send_request(|callback| ExecutionRequest::Transfer {
1254                source,
1255                destination,
1256                amount,
1257                signer,
1258                application_id,
1259                callback,
1260            })?
1261            .recv_response()?;
1262        Ok(())
1263    }
1264
1265    fn claim(
1266        &mut self,
1267        source: Account,
1268        destination: Account,
1269        amount: Amount,
1270    ) -> Result<(), ExecutionError> {
1271        let this = self.inner();
1272        let current_application = this.current_application();
1273        let application_id = current_application.id;
1274        let signer = current_application.signer;
1275
1276        this.execution_state_sender
1277            .send_request(|callback| ExecutionRequest::Claim {
1278                source,
1279                destination,
1280                amount,
1281                signer,
1282                application_id,
1283                callback,
1284            })?
1285            .recv_response()?;
1286        Ok(())
1287    }
1288
1289    fn try_call_application(
1290        &mut self,
1291        authenticated: bool,
1292        callee_id: ApplicationId,
1293        argument: Vec<u8>,
1294    ) -> Result<Vec<u8>, ExecutionError> {
1295        let contract = self
1296            .inner()
1297            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1298
1299        let value = contract
1300            .try_lock()
1301            .expect("Applications should not have reentrant calls")
1302            .execute_operation(argument)?;
1303
1304        self.inner().finish_call()?;
1305
1306        Ok(value)
1307    }
1308
1309    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1310        let mut this = self.inner();
1311        ensure!(
1312            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1313            ExecutionError::StreamNameTooLong
1314        );
1315        let application_id = GenericApplicationId::User(this.current_application().id);
1316        let stream_id = StreamId {
1317            stream_name,
1318            application_id,
1319        };
1320        let value_len = value.len() as u64;
1321        let index = this
1322            .execution_state_sender
1323            .send_request(|callback| ExecutionRequest::Emit {
1324                stream_id,
1325                value,
1326                callback,
1327            })?
1328            .recv_response()?;
1329        // TODO(#365): Consider separate event fee categories.
1330        this.resource_controller.track_bytes_written(value_len)?;
1331        Ok(index)
1332    }
1333
1334    fn read_event(
1335        &mut self,
1336        chain_id: ChainId,
1337        stream_name: StreamName,
1338        index: u32,
1339    ) -> Result<Vec<u8>, ExecutionError> {
1340        let mut this = self.inner();
1341        ensure!(
1342            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1343            ExecutionError::StreamNameTooLong
1344        );
1345        let application_id = GenericApplicationId::User(this.current_application().id);
1346        let stream_id = StreamId {
1347            stream_name,
1348            application_id,
1349        };
1350        let event_id = EventId {
1351            stream_id,
1352            index,
1353            chain_id,
1354        };
1355        let event = this
1356            .execution_state_sender
1357            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1358            .recv_response()?;
1359        // TODO(#365): Consider separate event fee categories.
1360        this.resource_controller
1361            .track_bytes_read(event.len() as u64)?;
1362        Ok(event)
1363    }
1364
1365    fn subscribe_to_events(
1366        &mut self,
1367        chain_id: ChainId,
1368        application_id: ApplicationId,
1369        stream_name: StreamName,
1370    ) -> Result<(), ExecutionError> {
1371        let this = self.inner();
1372        ensure!(
1373            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1374            ExecutionError::StreamNameTooLong
1375        );
1376        let stream_id = StreamId {
1377            stream_name,
1378            application_id: application_id.into(),
1379        };
1380        let subscriber_app_id = this.current_application().id;
1381        this.execution_state_sender
1382            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1383                chain_id,
1384                stream_id,
1385                subscriber_app_id,
1386                callback,
1387            })?
1388            .recv_response()?;
1389        Ok(())
1390    }
1391
1392    fn unsubscribe_from_events(
1393        &mut self,
1394        chain_id: ChainId,
1395        application_id: ApplicationId,
1396        stream_name: StreamName,
1397    ) -> Result<(), ExecutionError> {
1398        let this = self.inner();
1399        ensure!(
1400            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1401            ExecutionError::StreamNameTooLong
1402        );
1403        let stream_id = StreamId {
1404            stream_name,
1405            application_id: application_id.into(),
1406        };
1407        let subscriber_app_id = this.current_application().id;
1408        this.execution_state_sender
1409            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1410                chain_id,
1411                stream_id,
1412                subscriber_app_id,
1413                callback,
1414            })?
1415            .recv_response()?;
1416        Ok(())
1417    }
1418
1419    fn query_service(
1420        &mut self,
1421        application_id: ApplicationId,
1422        query: Vec<u8>,
1423    ) -> Result<Vec<u8>, ExecutionError> {
1424        let mut this = self.inner();
1425
1426        let app_permissions = this
1427            .execution_state_sender
1428            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1429            .recv_response()?;
1430
1431        let app_id = this.current_application().id;
1432        ensure!(
1433            app_permissions.can_call_services(&app_id),
1434            ExecutionError::UnauthorizedApplication(app_id)
1435        );
1436
1437        this.resource_controller.track_service_oracle_call()?;
1438
1439        this.run_service_oracle_query(application_id, query)
1440    }
1441
1442    fn open_chain(
1443        &mut self,
1444        ownership: ChainOwnership,
1445        application_permissions: ApplicationPermissions,
1446        balance: Amount,
1447    ) -> Result<ChainId, ExecutionError> {
1448        let parent_id = self.inner().chain_id;
1449        let block_height = self.block_height()?;
1450
1451        let timestamp = self.inner().user_context;
1452
1453        let chain_id = self
1454            .inner()
1455            .execution_state_sender
1456            .send_request(|callback| ExecutionRequest::OpenChain {
1457                ownership,
1458                balance,
1459                parent_id,
1460                block_height,
1461                timestamp,
1462                application_permissions,
1463                callback,
1464            })?
1465            .recv_response()?;
1466
1467        Ok(chain_id)
1468    }
1469
1470    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1471        let this = self.inner();
1472        let application_id = this.current_application().id;
1473        this.execution_state_sender
1474            .send_request(|callback| ExecutionRequest::CloseChain {
1475                application_id,
1476                callback,
1477            })?
1478            .recv_response()?
1479    }
1480
1481    fn change_application_permissions(
1482        &mut self,
1483        application_permissions: ApplicationPermissions,
1484    ) -> Result<(), ExecutionError> {
1485        let this = self.inner();
1486        let application_id = this.current_application().id;
1487        this.execution_state_sender
1488            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1489                application_id,
1490                application_permissions,
1491                callback,
1492            })?
1493            .recv_response()?
1494    }
1495
1496    fn create_application(
1497        &mut self,
1498        module_id: ModuleId,
1499        parameters: Vec<u8>,
1500        argument: Vec<u8>,
1501        required_application_ids: Vec<ApplicationId>,
1502    ) -> Result<ApplicationId, ExecutionError> {
1503        let chain_id = self.inner().chain_id;
1504        let block_height = self.block_height()?;
1505
1506        let CreateApplicationResult { app_id } = self
1507            .inner()
1508            .execution_state_sender
1509            .send_request(move |callback| ExecutionRequest::CreateApplication {
1510                chain_id,
1511                block_height,
1512                module_id,
1513                parameters,
1514                required_application_ids,
1515                callback,
1516            })?
1517            .recv_response()??;
1518
1519        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1520
1521        contract
1522            .try_lock()
1523            .expect("Applications should not have reentrant calls")
1524            .instantiate(argument)?;
1525
1526        self.inner().finish_call()?;
1527
1528        Ok(app_id)
1529    }
1530
1531    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1532        let blob = Blob::new_data(bytes);
1533        let blob_id = blob.id();
1534        let this = self.inner();
1535        this.execution_state_sender
1536            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1537            .recv_response()?;
1538        Ok(DataBlobHash(blob_id.hash))
1539    }
1540
1541    fn publish_module(
1542        &mut self,
1543        contract: Bytecode,
1544        service: Bytecode,
1545        vm_runtime: VmRuntime,
1546    ) -> Result<ModuleId, ExecutionError> {
1547        let (blobs, module_id) =
1548            crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1549        let this = self.inner();
1550        for blob in blobs {
1551            this.execution_state_sender
1552                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1553                .recv_response()?;
1554        }
1555        Ok(module_id)
1556    }
1557
1558    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1559        let this = self.inner();
1560        let round = this.round;
1561        this.execution_state_sender
1562            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1563            .recv_response()
1564    }
1565
1566    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1567        let mut this = self.inner();
1568        let id = this.current_application().id;
1569        let state = this.view_user_states.entry(id).or_default();
1570        state.force_all_pending_queries()?;
1571        this.resource_controller.track_write_operations(
1572            batch
1573                .num_operations()
1574                .try_into()
1575                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1576        )?;
1577        this.resource_controller
1578            .track_bytes_written(batch.size() as u64)?;
1579        this.execution_state_sender
1580            .send_request(|callback| ExecutionRequest::WriteBatch {
1581                id,
1582                batch,
1583                callback,
1584            })?
1585            .recv_response()?;
1586        Ok(())
1587    }
1588}
1589
1590impl ServiceSyncRuntime {
1591    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1592    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1593        Self::new_with_deadline(execution_state_sender, context, None)
1594    }
1595
1596    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1597    pub fn new_with_deadline(
1598        execution_state_sender: ExecutionStateSender,
1599        context: QueryContext,
1600        deadline: Option<Instant>,
1601    ) -> Self {
1602        // Query the allow_application_logs setting from the execution state.
1603        let allow_application_logs = execution_state_sender
1604            .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1605            .ok()
1606            .and_then(|receiver| receiver.recv_response().ok())
1607            .unwrap_or(false);
1608
1609        let runtime = SyncRuntime(Some(
1610            SyncRuntimeInternal::new(
1611                context.chain_id,
1612                context.next_block_height,
1613                None,
1614                None,
1615                execution_state_sender,
1616                deadline,
1617                None,
1618                ResourceController::default(),
1619                (),
1620                allow_application_logs,
1621            )
1622            .into(),
1623        ));
1624
1625        ServiceSyncRuntime {
1626            runtime,
1627            current_context: context,
1628        }
1629    }
1630
1631    /// Preloads the code of a service into the runtime's memory.
1632    pub(crate) fn preload_service(
1633        &self,
1634        id: ApplicationId,
1635        code: UserServiceCode,
1636        description: ApplicationDescription,
1637    ) -> Result<(), ExecutionError> {
1638        let this = self
1639            .runtime
1640            .0
1641            .as_ref()
1642            .expect("services shouldn't be preloaded while the runtime is being dropped");
1643        let mut this_guard = this.inner();
1644
1645        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1646            entry.insert((code, description));
1647        }
1648
1649        Ok(())
1650    }
1651
1652    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1653    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1654        while let Ok(request) = incoming_requests.recv() {
1655            let ServiceRuntimeRequest::Query {
1656                application_id,
1657                context,
1658                query,
1659                callback,
1660            } = request;
1661
1662            let result = self
1663                .prepare_for_query(context)
1664                .and_then(|()| self.run_query(application_id, query));
1665
1666            if let Err(err) = callback.send(result) {
1667                tracing::debug!(%err, "Receiver for query result has been dropped");
1668            }
1669        }
1670    }
1671
1672    /// Prepares the runtime to query an application.
1673    pub(crate) fn prepare_for_query(
1674        &mut self,
1675        new_context: QueryContext,
1676    ) -> Result<(), ExecutionError> {
1677        let expected_context = QueryContext {
1678            local_time: new_context.local_time,
1679            ..self.current_context
1680        };
1681
1682        if new_context != expected_context {
1683            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1684            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1685        } else {
1686            self.handle_mut()
1687                .inner()
1688                .execution_state_sender
1689                .send_request(|callback| ExecutionRequest::SetLocalTime {
1690                    local_time: new_context.local_time,
1691                    callback,
1692                })?
1693                .recv_response()?;
1694        }
1695        Ok(())
1696    }
1697
1698    /// Queries an application specified by its [`ApplicationId`].
1699    pub(crate) fn run_query(
1700        &mut self,
1701        application_id: ApplicationId,
1702        query: Vec<u8>,
1703    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1704        let this = self.handle_mut();
1705        let response = this.try_query_application(application_id, query)?;
1706        let operations = mem::take(&mut this.inner().scheduled_operations);
1707
1708        Ok(QueryOutcome {
1709            response,
1710            operations,
1711        })
1712    }
1713
1714    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1715    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1716        self.runtime.0.as_mut().expect(
1717            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1718        )
1719    }
1720}
1721
1722impl ServiceRuntime for ServiceSyncRuntimeHandle {
1723    /// Note that queries are not available from writable contexts.
1724    fn try_query_application(
1725        &mut self,
1726        queried_id: ApplicationId,
1727        argument: Vec<u8>,
1728    ) -> Result<Vec<u8>, ExecutionError> {
1729        let service = {
1730            let mut this = self.inner();
1731
1732            // Load the application.
1733            let application = this.load_service_instance(self.clone(), queried_id)?;
1734            // Make the call to user code.
1735            this.push_application(ApplicationStatus {
1736                caller_id: None,
1737                id: queried_id,
1738                description: application.description,
1739                signer: None,
1740            });
1741            application.instance
1742        };
1743        let response = service
1744            .try_lock()
1745            .expect("Applications should not have reentrant calls")
1746            .handle_query(argument)?;
1747        self.inner().pop_application();
1748        Ok(response)
1749    }
1750
1751    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1752        let mut this = self.inner();
1753        let application_id = this.current_application().id;
1754
1755        this.scheduled_operations.push(Operation::User {
1756            application_id,
1757            bytes: operation,
1758        });
1759
1760        Ok(())
1761    }
1762
1763    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1764        if let Some(deadline) = self.inner().deadline {
1765            if Instant::now() >= deadline {
1766                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1767            }
1768        }
1769        Ok(())
1770    }
1771}
1772
1773/// A request to the service runtime actor.
1774pub enum ServiceRuntimeRequest {
1775    Query {
1776        application_id: ApplicationId,
1777        context: QueryContext,
1778        query: Vec<u8>,
1779        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1780    },
1781}
1782
1783/// The origin of the execution.
1784#[derive(Clone, Copy, Debug)]
1785struct ExecutingMessage {
1786    is_bouncing: bool,
1787    origin: ChainId,
1788}
1789
1790impl From<&MessageContext> for ExecutingMessage {
1791    fn from(context: &MessageContext) -> Self {
1792        ExecutingMessage {
1793            is_bouncing: context.is_bouncing,
1794            origin: context.origin,
1795        }
1796    }
1797}
1798
1799/// Creates a compressed contract and service bytecode synchronously.
1800pub fn create_bytecode_blobs_sync(
1801    contract: Bytecode,
1802    service: Bytecode,
1803    vm_runtime: VmRuntime,
1804) -> (Vec<Blob>, ModuleId) {
1805    match vm_runtime {
1806        VmRuntime::Wasm => {
1807            let compressed_contract = contract.compress();
1808            let compressed_service = service.compress();
1809            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1810            let service_blob = Blob::new_service_bytecode(compressed_service);
1811            let module_id =
1812                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1813            (vec![contract_blob, service_blob], module_id)
1814        }
1815        VmRuntime::Evm => {
1816            let compressed_contract = contract.compress();
1817            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1818            let module_id = ModuleId::new(
1819                evm_contract_blob.id().hash,
1820                evm_contract_blob.id().hash,
1821                vm_runtime,
1822            );
1823            (vec![evm_contract_blob], module_id)
1824        }
1825    }
1826}