linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29    execution::UserAction,
30    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31    resources::ResourceController,
32    system::CreateApplicationResult,
33    util::{ReceiverExt, UnboundedSenderExt},
34    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
40#[cfg(test)]
41#[path = "unit_tests/runtime_tests.rs"]
42mod tests;
43
/// Associates a user application instance type with the extra types the runtime keeps for it.
///
/// Used as the bound on the `UserInstance` parameter of [`SyncRuntime`],
/// [`SyncRuntimeHandle`] and [`SyncRuntimeInternal`].
pub trait WithContext {
    /// Type of the `user_context` field stored in [`SyncRuntimeInternal`].
    type UserContext;
    /// Type of the application code cached in `preloaded_applications`, from which instances
    /// are created.
    type Code;
}
48
/// Contract instances carry a [`Timestamp`] as user context and are created from
/// [`UserContractCode`].
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
    type Code = UserContractCode;
}
53
/// Service instances carry no additional user context and are created from
/// [`UserServiceCode`].
impl WithContext for UserServiceInstance {
    type UserContext = ();
    type Code = UserServiceCode;
}
58
/// Test-only implementation for a type-erased instance, with no user context and no code.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
    type Code = ();
}
64
/// Owning wrapper around a [`SyncRuntimeHandle`].
///
/// The inner `Option` is emptied when the handle is moved out by `into_inner` or when the
/// wrapper is dropped; dereferencing the wrapper after that panics.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
67
/// A [`SyncRuntime`] specialized for contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
69
/// Bundles a [`SyncRuntime`] for service instances with a [`QueryContext`].
pub struct ServiceSyncRuntime {
    /// The wrapped runtime for service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// NOTE(review): presumably the context of the query currently being handled; its use is
    /// not visible in this part of the file, so confirm against the methods of this type.
    current_context: QueryContext,
}
74
/// Handle to the runtime state: a [`SyncRuntimeInternal`] behind an `Arc<Mutex<_>>`.
///
/// The state is reached through `inner`, which uses `try_lock` and panics when the lock
/// cannot be acquired.
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);
79
/// A [`SyncRuntimeHandle`] specialized for contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] specialized for service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
82
/// Runtime data tracked during the execution of a transaction on the synchronous thread.
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// The current chain ID.
    chain_id: ChainId,
    /// The height of the next block that will be added to this chain. During operations
    /// and messages, this is the current block height.
    height: BlockHeight,
    /// The current consensus round. Only available during block validation in multi-leader rounds.
    round: Option<u32>,
    /// The current message being executed, if there is one.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// How to interact with the storage view of the execution state.
    execution_state_sender: ExecutionStateSender,

    /// If applications are being finalized.
    ///
    /// If [`true`], disables cross-application calls.
    is_finalizing: bool,
    /// Applications that need to be finalized.
    applications_to_finalize: Vec<ApplicationId>,

    /// Application code and descriptions that are available for instantiation.
    ///
    /// Consulted the first time an application is used; when the code is missing it is
    /// requested through the `execution_state_sender` and cached here.
    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
    /// Application instances loaded in this transaction.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// The current stack of application descriptions.
    call_stack: Vec<ApplicationStatus>,
    /// The set of the IDs of the applications that are in the `call_stack`.
    active_applications: HashSet<ApplicationId>,
    /// The operations scheduled during this query.
    scheduled_operations: Vec<Operation>,

    /// Track application states based on views.
    ///
    /// Holds, per application, the storage queries that were started and not yet waited for.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// The deadline this runtime should finish executing.
    ///
    /// Used to limit the execution time of services running as oracles.
    deadline: Option<Instant>,

    /// Where to send a refund for the unused part of the grant after execution, if any.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// Controller to track fuel and storage consumption.
    resource_controller: ResourceController,
    /// Additional context for the runtime.
    user_context: UserInstance::UserContext,
    /// Whether contract log messages should be output.
    allow_application_logs: bool,
}
136
/// The runtime status of an application.
///
/// One entry is pushed onto the `call_stack` for every application call in progress.
#[derive(Debug)]
struct ApplicationStatus {
    /// The caller application ID, if forwarded during the call.
    ///
    /// `prepare_for_call` only sets it when the call is authenticated.
    caller_id: Option<ApplicationId>,
    /// The application ID.
    id: ApplicationId,
    /// The application description.
    description: ApplicationDescription,
    /// The authenticated signer for the execution thread, if any.
    ///
    /// `prepare_for_call` passes the caller's signer on to the callee only when the call is
    /// authenticated.
    signer: Option<AccountOwner>,
}
149
/// A loaded application instance.
///
/// Cloning an entry shares the same underlying instance, since only the `Arc` is cloned.
#[derive(Debug)]
struct LoadedApplication<Instance> {
    /// The instantiated application, shared behind a mutex.
    instance: Arc<Mutex<Instance>>,
    /// The description of the application the instance was created from.
    description: ApplicationDescription,
}
156
157impl<Instance> LoadedApplication<Instance> {
158    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
159    fn new(instance: Instance, description: ApplicationDescription) -> Self {
160        LoadedApplication {
161            instance: Arc::new(Mutex::new(instance)),
162            description,
163        }
164    }
165}
166
167impl<Instance> Clone for LoadedApplication<Instance> {
168    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
169    // bound
170    fn clone(&self) -> Self {
171        LoadedApplication {
172            instance: self.instance.clone(),
173            description: self.description.clone(),
174        }
175    }
176}
177
/// A value that is either already available or still has to be received from a channel.
#[derive(Debug)]
enum Promise<T> {
    /// The value has already been received.
    Ready(T),
    /// The value has not been received yet; it will arrive through this receiver.
    Pending(Receiver<T>),
}
183
184impl<T> Promise<T> {
185    fn force(&mut self) -> Result<(), ExecutionError> {
186        if let Promise::Pending(receiver) = self {
187            let value = receiver
188                .recv_ref()
189                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
190            *self = Promise::Ready(value);
191        }
192        Ok(())
193    }
194
195    fn read(self) -> Result<T, ExecutionError> {
196        match self {
197            Promise::Pending(receiver) => {
198                let value = receiver.recv_response()?;
199                Ok(value)
200            }
201            Promise::Ready(value) => Ok(value),
202        }
203    }
204}
205
/// Manages a set of pending queries returning values of type `T`.
///
/// Queries are identified by the `u32` index handed out when they are registered.
#[derive(Debug, Default)]
struct QueryManager<T> {
    /// The queries in progress.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// The number of queries ever registered so far. Used for the index of the next query.
    query_count: u32,
    /// The number of active queries.
    active_query_count: u32,
}
216
217impl<T> QueryManager<T> {
218    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
219        let id = self.query_count;
220        self.pending_queries.insert(id, Promise::Pending(receiver));
221        self.query_count = self
222            .query_count
223            .checked_add(1)
224            .ok_or(ArithmeticError::Overflow)?;
225        self.active_query_count = self
226            .active_query_count
227            .checked_add(1)
228            .ok_or(ArithmeticError::Overflow)?;
229        Ok(id)
230    }
231
232    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
233        let promise = self
234            .pending_queries
235            .remove(&id)
236            .ok_or(ExecutionError::InvalidPromise)?;
237        let value = promise.read()?;
238        self.active_query_count -= 1;
239        Ok(value)
240    }
241
242    fn force_all(&mut self) -> Result<(), ExecutionError> {
243        for promise in self.pending_queries.values_mut() {
244            promise.force()?;
245        }
246        Ok(())
247    }
248}
249
/// A list of storage keys, each a byte string.
type Keys = Vec<Vec<u8>>;
/// A storage value, as a byte string.
type Value = Vec<u8>;
/// A list of `(key, value)` pairs, both as byte strings.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
253
/// The storage queries in progress for one application, with one [`QueryManager`] per kind
/// of query.
#[derive(Debug, Default)]
struct ViewUserState {
    /// The contains-key queries in progress.
    contains_key_queries: QueryManager<bool>,
    /// The contains-keys queries in progress.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// The read-value queries in progress.
    read_value_queries: QueryManager<Option<Value>>,
    /// The read-multi-values queries in progress.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// The find-keys queries in progress.
    find_keys_queries: QueryManager<Keys>,
    /// The find-key-values queries in progress.
    find_key_values_queries: QueryManager<KeyValues>,
}
269
270impl ViewUserState {
271    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
272        self.contains_key_queries.force_all()?;
273        self.contains_keys_queries.force_all()?;
274        self.read_value_queries.force_all()?;
275        self.read_multi_values_queries.force_all()?;
276        self.find_keys_queries.force_all()?;
277        self.find_key_values_queries.force_all()?;
278        Ok(())
279    }
280}
281
282impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
283    type Target = SyncRuntimeHandle<UserInstance>;
284
285    fn deref(&self) -> &Self::Target {
286        self.0.as_ref().expect(
287            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288        )
289    }
290}
291
292impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
293    fn deref_mut(&mut self) -> &mut Self::Target {
294        self.0.as_mut().expect(
295            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
296        )
297    }
298}
299
300impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
301    fn drop(&mut self) {
302        // Ensure the `loaded_applications` are cleared to prevent circular references in
303        // the runtime
304        if let Some(handle) = self.0.take() {
305            handle.inner().loaded_applications.clear();
306        }
307    }
308}
309
310impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
311    #[expect(clippy::too_many_arguments)]
312    fn new(
313        chain_id: ChainId,
314        height: BlockHeight,
315        round: Option<u32>,
316        executing_message: Option<ExecutingMessage>,
317        execution_state_sender: ExecutionStateSender,
318        deadline: Option<Instant>,
319        refund_grant_to: Option<Account>,
320        resource_controller: ResourceController,
321        user_context: UserInstance::UserContext,
322        allow_application_logs: bool,
323    ) -> Self {
324        Self {
325            chain_id,
326            height,
327            round,
328            executing_message,
329            execution_state_sender,
330            is_finalizing: false,
331            applications_to_finalize: Vec::new(),
332            preloaded_applications: HashMap::new(),
333            loaded_applications: HashMap::new(),
334            call_stack: Vec::new(),
335            active_applications: HashSet::new(),
336            view_user_states: BTreeMap::new(),
337            deadline,
338            refund_grant_to,
339            resource_controller,
340            scheduled_operations: Vec::new(),
341            user_context,
342            allow_application_logs,
343        }
344    }
345
346    /// Returns the [`ApplicationStatus`] of the current application.
347    ///
348    /// The current application is the last to be pushed to the `call_stack`.
349    ///
350    /// # Panics
351    ///
352    /// If the call stack is empty.
353    fn current_application(&self) -> &ApplicationStatus {
354        self.call_stack
355            .last()
356            .expect("Call stack is unexpectedly empty")
357    }
358
359    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
360    ///
361    /// Ensures the application's ID is also tracked in the `active_applications` set.
362    fn push_application(&mut self, status: ApplicationStatus) {
363        self.active_applications.insert(status.id);
364        self.call_stack.push(status);
365    }
366
367    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
368    ///
369    /// Ensures the application's ID is also removed from the `active_applications` set.
370    ///
371    /// # Panics
372    ///
373    /// If the call stack is empty.
374    fn pop_application(&mut self) -> ApplicationStatus {
375        let status = self
376            .call_stack
377            .pop()
378            .expect("Can't remove application from empty call stack");
379        assert!(self.active_applications.remove(&status.id));
380        status
381    }
382
383    /// Ensures that a call to `application_id` is not-reentrant.
384    ///
385    /// Returns an error if there already is an entry for `application_id` in the call stack.
386    fn check_for_reentrancy(
387        &mut self,
388        application_id: ApplicationId,
389    ) -> Result<(), ExecutionError> {
390        ensure!(
391            !self.active_applications.contains(&application_id),
392            ExecutionError::ReentrantCall(application_id)
393        );
394        Ok(())
395    }
396}
397
398impl SyncRuntimeInternal<UserContractInstance> {
399    /// Loads a contract instance, initializing it with this runtime if needed.
400    fn load_contract_instance(
401        &mut self,
402        this: SyncRuntimeHandle<UserContractInstance>,
403        id: ApplicationId,
404    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
405        match self.loaded_applications.entry(id) {
406            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
407
408            hash_map::Entry::Vacant(entry) => {
409                // First time actually using the application. Let's see if the code was
410                // pre-loaded.
411                let (code, description) = match self.preloaded_applications.entry(id) {
412                    // TODO(#2927): support dynamic loading of modules on the Web
413                    #[cfg(web)]
414                    hash_map::Entry::Vacant(_) => {
415                        drop(this);
416                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
417                            id,
418                        )));
419                    }
420                    #[cfg(not(web))]
421                    hash_map::Entry::Vacant(entry) => {
422                        let (code, description) = self
423                            .execution_state_sender
424                            .send_request(move |callback| ExecutionRequest::LoadContract {
425                                id,
426                                callback,
427                            })?
428                            .recv_response()?;
429                        entry.insert((code, description)).clone()
430                    }
431                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
432                };
433                let instance = code.instantiate(this)?;
434
435                self.applications_to_finalize.push(id);
436                Ok(entry
437                    .insert(LoadedApplication::new(instance, description))
438                    .clone())
439            }
440        }
441    }
442
443    /// Configures the runtime for executing a call to a different contract.
444    fn prepare_for_call(
445        &mut self,
446        this: ContractSyncRuntimeHandle,
447        authenticated: bool,
448        callee_id: ApplicationId,
449    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
450        self.check_for_reentrancy(callee_id)?;
451
452        ensure!(
453            !self.is_finalizing,
454            ExecutionError::CrossApplicationCallInFinalize {
455                caller_id: Box::new(self.current_application().id),
456                callee_id: Box::new(callee_id),
457            }
458        );
459
460        // Load the application.
461        let application = self.load_contract_instance(this, callee_id)?;
462
463        let caller = self.current_application();
464        let caller_id = caller.id;
465        let caller_signer = caller.signer;
466        // Make the call to user code.
467        let authenticated_signer = match caller_signer {
468            Some(signer) if authenticated => Some(signer),
469            _ => None,
470        };
471        let authenticated_caller_id = authenticated.then_some(caller_id);
472        self.push_application(ApplicationStatus {
473            caller_id: authenticated_caller_id,
474            id: callee_id,
475            description: application.description,
476            // Allow further nested calls to be authenticated if this one is.
477            signer: authenticated_signer,
478        });
479        Ok(application.instance)
480    }
481
482    /// Cleans up the runtime after the execution of a call to a different contract.
483    fn finish_call(&mut self) -> Result<(), ExecutionError> {
484        self.pop_application();
485        Ok(())
486    }
487
488    /// Runs the service in a separate thread as an oracle.
489    fn run_service_oracle_query(
490        &mut self,
491        application_id: ApplicationId,
492        query: Vec<u8>,
493    ) -> Result<Vec<u8>, ExecutionError> {
494        let timeout = self
495            .resource_controller
496            .remaining_service_oracle_execution_time()?;
497        let execution_start = Instant::now();
498        let deadline = Some(execution_start + timeout);
499        let response = self
500            .execution_state_sender
501            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
502                deadline,
503                application_id,
504                next_block_height: self.height,
505                query,
506                callback,
507            })?
508            .recv_response()?;
509
510        self.resource_controller
511            .track_service_oracle_execution(execution_start.elapsed())?;
512        self.resource_controller
513            .track_service_oracle_response(response.len())?;
514
515        Ok(response)
516    }
517}
518
519impl SyncRuntimeInternal<UserServiceInstance> {
520    /// Initializes a service instance with this runtime.
521    fn load_service_instance(
522        &mut self,
523        this: SyncRuntimeHandle<UserServiceInstance>,
524        id: ApplicationId,
525    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
526        match self.loaded_applications.entry(id) {
527            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
528
529            hash_map::Entry::Vacant(entry) => {
530                // First time actually using the application. Let's see if the code was
531                // pre-loaded.
532                let (code, description) = match self.preloaded_applications.entry(id) {
533                    // TODO(#2927): support dynamic loading of modules on the Web
534                    #[cfg(web)]
535                    hash_map::Entry::Vacant(_) => {
536                        drop(this);
537                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
538                            id,
539                        )));
540                    }
541                    #[cfg(not(web))]
542                    hash_map::Entry::Vacant(entry) => {
543                        let (code, description) = self
544                            .execution_state_sender
545                            .send_request(move |callback| ExecutionRequest::LoadService {
546                                id,
547                                callback,
548                            })?
549                            .recv_response()?;
550                        entry.insert((code, description)).clone()
551                    }
552                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
553                };
554                let instance = code.instantiate(this)?;
555
556                self.applications_to_finalize.push(id);
557                Ok(entry
558                    .insert(LoadedApplication::new(instance, description))
559                    .clone())
560            }
561        }
562    }
563}
564
565impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
566    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
567        let handle = self.0.take().expect(
568            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
569        );
570        let runtime = Arc::into_inner(handle.0)?
571            .into_inner()
572            .expect("`SyncRuntime` should run in a single thread which should not panic");
573        Some(runtime)
574    }
575}
576
577impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
578    for SyncRuntimeHandle<UserInstance>
579{
580    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
581        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
582    }
583}
584
585impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
586    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
587        self.0
588            .try_lock()
589            .expect("Synchronous runtimes run on a single execution thread")
590    }
591}
592
593impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
594where
595    Self: ContractOrServiceRuntime,
596{
597    type Read = ();
598    type ReadValueBytes = u32;
599    type ContainsKey = u32;
600    type ContainsKeys = u32;
601    type ReadMultiValuesBytes = u32;
602    type FindKeysByPrefix = u32;
603    type FindKeyValuesByPrefix = u32;
604
605    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
606        let mut this = self.inner();
607        let chain_id = this.chain_id;
608        this.resource_controller.track_runtime_chain_id()?;
609        Ok(chain_id)
610    }
611
612    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
613        let mut this = self.inner();
614        let height = this.height;
615        this.resource_controller.track_runtime_block_height()?;
616        Ok(height)
617    }
618
619    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
620        let mut this = self.inner();
621        let application_id = this.current_application().id;
622        this.resource_controller.track_runtime_application_id()?;
623        Ok(application_id)
624    }
625
626    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
627        let mut this = self.inner();
628        let application_creator_chain_id = this.current_application().description.creator_chain_id;
629        this.resource_controller.track_runtime_application_id()?;
630        Ok(application_creator_chain_id)
631    }
632
633    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
634        let mut this = self.inner();
635        let parameters = this.current_application().description.parameters.clone();
636        this.resource_controller
637            .track_runtime_application_parameters(&parameters)?;
638        Ok(parameters)
639    }
640
641    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
642        let mut this = self.inner();
643        let timestamp = this
644            .execution_state_sender
645            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
646            .recv_response()?;
647        this.resource_controller.track_runtime_timestamp()?;
648        Ok(timestamp)
649    }
650
651    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
652        let mut this = self.inner();
653        let balance = this
654            .execution_state_sender
655            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
656            .recv_response()?;
657        this.resource_controller.track_runtime_balance()?;
658        Ok(balance)
659    }
660
661    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
662        let mut this = self.inner();
663        let balance = this
664            .execution_state_sender
665            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
666            .recv_response()?;
667        this.resource_controller.track_runtime_balance()?;
668        Ok(balance)
669    }
670
671    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
672        let mut this = self.inner();
673        let owner_balances = this
674            .execution_state_sender
675            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
676            .recv_response()?;
677        this.resource_controller
678            .track_runtime_owner_balances(&owner_balances)?;
679        Ok(owner_balances)
680    }
681
682    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
683        let mut this = self.inner();
684        let owners = this
685            .execution_state_sender
686            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
687            .recv_response()?;
688        this.resource_controller.track_runtime_owners(&owners)?;
689        Ok(owners)
690    }
691
692    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
693        let mut this = self.inner();
694        let chain_ownership = this
695            .execution_state_sender
696            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
697            .recv_response()?;
698        this.resource_controller
699            .track_runtime_chain_ownership(&chain_ownership)?;
700        Ok(chain_ownership)
701    }
702
703    fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
704        let this = self.inner();
705        let application_permissions = this
706            .execution_state_sender
707            .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
708            .recv_response()?;
709        Ok(application_permissions)
710    }
711
712    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
713        let mut this = self.inner();
714        let id = this.current_application().id;
715        this.resource_controller.track_read_operation()?;
716        let receiver = this
717            .execution_state_sender
718            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
719        let state = this.view_user_states.entry(id).or_default();
720        state.contains_key_queries.register(receiver)
721    }
722
723    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
724        let mut this = self.inner();
725        let id = this.current_application().id;
726        let state = this.view_user_states.entry(id).or_default();
727        let value = state.contains_key_queries.wait(*promise)?;
728        Ok(value)
729    }
730
731    fn contains_keys_new(
732        &mut self,
733        keys: Vec<Vec<u8>>,
734    ) -> Result<Self::ContainsKeys, ExecutionError> {
735        let mut this = self.inner();
736        let id = this.current_application().id;
737        this.resource_controller.track_read_operation()?;
738        let receiver = this
739            .execution_state_sender
740            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
741        let state = this.view_user_states.entry(id).or_default();
742        state.contains_keys_queries.register(receiver)
743    }
744
745    fn contains_keys_wait(
746        &mut self,
747        promise: &Self::ContainsKeys,
748    ) -> Result<Vec<bool>, ExecutionError> {
749        let mut this = self.inner();
750        let id = this.current_application().id;
751        let state = this.view_user_states.entry(id).or_default();
752        let value = state.contains_keys_queries.wait(*promise)?;
753        Ok(value)
754    }
755
756    fn read_multi_values_bytes_new(
757        &mut self,
758        keys: Vec<Vec<u8>>,
759    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
760        let mut this = self.inner();
761        let id = this.current_application().id;
762        this.resource_controller.track_read_operation()?;
763        let receiver = this.execution_state_sender.send_request(move |callback| {
764            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
765        })?;
766        let state = this.view_user_states.entry(id).or_default();
767        state.read_multi_values_queries.register(receiver)
768    }
769
770    fn read_multi_values_bytes_wait(
771        &mut self,
772        promise: &Self::ReadMultiValuesBytes,
773    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
774        let mut this = self.inner();
775        let id = this.current_application().id;
776        let state = this.view_user_states.entry(id).or_default();
777        let values = state.read_multi_values_queries.wait(*promise)?;
778        for value in &values {
779            if let Some(value) = &value {
780                this.resource_controller
781                    .track_bytes_read(value.len() as u64)?;
782            }
783        }
784        Ok(values)
785    }
786
787    fn read_value_bytes_new(
788        &mut self,
789        key: Vec<u8>,
790    ) -> Result<Self::ReadValueBytes, ExecutionError> {
791        let mut this = self.inner();
792        let id = this.current_application().id;
793        this.resource_controller.track_read_operation()?;
794        let receiver = this
795            .execution_state_sender
796            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
797        let state = this.view_user_states.entry(id).or_default();
798        state.read_value_queries.register(receiver)
799    }
800
801    fn read_value_bytes_wait(
802        &mut self,
803        promise: &Self::ReadValueBytes,
804    ) -> Result<Option<Vec<u8>>, ExecutionError> {
805        let mut this = self.inner();
806        let id = this.current_application().id;
807        let value = {
808            let state = this.view_user_states.entry(id).or_default();
809            state.read_value_queries.wait(*promise)?
810        };
811        if let Some(value) = &value {
812            this.resource_controller
813                .track_bytes_read(value.len() as u64)?;
814        }
815        Ok(value)
816    }
817
818    fn find_keys_by_prefix_new(
819        &mut self,
820        key_prefix: Vec<u8>,
821    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
822        let mut this = self.inner();
823        let id = this.current_application().id;
824        this.resource_controller.track_read_operation()?;
825        let receiver = this.execution_state_sender.send_request(move |callback| {
826            ExecutionRequest::FindKeysByPrefix {
827                id,
828                key_prefix,
829                callback,
830            }
831        })?;
832        let state = this.view_user_states.entry(id).or_default();
833        state.find_keys_queries.register(receiver)
834    }
835
836    fn find_keys_by_prefix_wait(
837        &mut self,
838        promise: &Self::FindKeysByPrefix,
839    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
840        let mut this = self.inner();
841        let id = this.current_application().id;
842        let keys = {
843            let state = this.view_user_states.entry(id).or_default();
844            state.find_keys_queries.wait(*promise)?
845        };
846        let mut read_size = 0;
847        for key in &keys {
848            read_size += key.len();
849        }
850        this.resource_controller
851            .track_bytes_read(read_size as u64)?;
852        Ok(keys)
853    }
854
855    fn find_key_values_by_prefix_new(
856        &mut self,
857        key_prefix: Vec<u8>,
858    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
859        let mut this = self.inner();
860        let id = this.current_application().id;
861        this.resource_controller.track_read_operation()?;
862        let receiver = this.execution_state_sender.send_request(move |callback| {
863            ExecutionRequest::FindKeyValuesByPrefix {
864                id,
865                key_prefix,
866                callback,
867            }
868        })?;
869        let state = this.view_user_states.entry(id).or_default();
870        state.find_key_values_queries.register(receiver)
871    }
872
873    fn find_key_values_by_prefix_wait(
874        &mut self,
875        promise: &Self::FindKeyValuesByPrefix,
876    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
877        let mut this = self.inner();
878        let id = this.current_application().id;
879        let state = this.view_user_states.entry(id).or_default();
880        let key_values = state.find_key_values_queries.wait(*promise)?;
881        let mut read_size = 0;
882        for (key, value) in &key_values {
883            read_size += key.len() + value.len();
884        }
885        this.resource_controller
886            .track_bytes_read(read_size as u64)?;
887        Ok(key_values)
888    }
889
890    fn perform_http_request(
891        &mut self,
892        request: http::Request,
893    ) -> Result<http::Response, ExecutionError> {
894        let mut this = self.inner();
895        let app_permissions = this
896            .execution_state_sender
897            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
898            .recv_response()?;
899
900        let app_id = this.current_application().id;
901        ensure!(
902            app_permissions.can_make_http_requests(&app_id),
903            ExecutionError::UnauthorizedApplication(app_id)
904        );
905
906        this.resource_controller.track_http_request()?;
907
908        let response = this
909            .execution_state_sender
910            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
911                request,
912                http_responses_are_oracle_responses:
913                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
914                callback,
915            })?
916            .recv_response()?;
917        Ok(response)
918    }
919
920    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
921        let this = self.inner();
922        this.execution_state_sender
923            .send_request(|callback| ExecutionRequest::AssertBefore {
924                timestamp,
925                callback,
926            })?
927            .recv_response()?
928    }
929
930    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
931        let this = self.inner();
932        let blob_id = hash.into();
933        let content = this
934            .execution_state_sender
935            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
936            .recv_response()?;
937        Ok(content.into_vec_or_clone())
938    }
939
940    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
941        let this = self.inner();
942        let blob_id = hash.into();
943        this.execution_state_sender
944            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
945            .recv_response()?;
946        Ok(())
947    }
948
949    fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
950        Ok(self.inner().allow_application_logs)
951    }
952
953    #[cfg(web)]
954    fn send_log(&mut self, message: String, level: tracing::log::Level) {
955        let this = self.inner();
956        // Fire-and-forget: ignore errors since logging shouldn't affect execution
957        let _ = this
958            .execution_state_sender
959            .unbounded_send(ExecutionRequest::Log { message, level });
960    }
961}
962
/// Compile-time switch for the places where the [`BaseRuntime`] implementation must behave
/// differently for contracts and for services.
trait ContractOrServiceRuntime {
    /// Whether HTTP responses must respect the oracle response size limit.
    ///
    /// Contracts set this to `true`. Services set it to `false`, which potentially lets them
    /// receive a larger HTTP response while only a shorter oracle response is stored in the
    /// block.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
973
974impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
975    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
976}
977
978impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
979    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
980}
981
982impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
983    fn clone(&self) -> Self {
984        SyncRuntimeHandle(self.0.clone())
985    }
986}
987
988impl ContractSyncRuntime {
989    pub(crate) fn new(
990        execution_state_sender: ExecutionStateSender,
991        chain_id: ChainId,
992        refund_grant_to: Option<Account>,
993        resource_controller: ResourceController,
994        action: &UserAction,
995        allow_application_logs: bool,
996    ) -> Self {
997        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
998            SyncRuntimeInternal::new(
999                chain_id,
1000                action.height(),
1001                action.round(),
1002                if let UserAction::Message(context, _) = action {
1003                    Some(context.into())
1004                } else {
1005                    None
1006                },
1007                execution_state_sender,
1008                None,
1009                refund_grant_to,
1010                resource_controller,
1011                action.timestamp(),
1012                allow_application_logs,
1013            ),
1014        )))
1015    }
1016
1017    /// Preloads the code of a contract into the runtime's memory.
1018    pub(crate) fn preload_contract(
1019        &self,
1020        id: ApplicationId,
1021        code: UserContractCode,
1022        description: ApplicationDescription,
1023    ) -> Result<(), ExecutionError> {
1024        let this = self
1025            .0
1026            .as_ref()
1027            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1028        let mut this_guard = this.inner();
1029
1030        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1031            entry.insert((code, description));
1032        }
1033
1034        Ok(())
1035    }
1036
1037    /// Main entry point to start executing a user action.
1038    pub(crate) fn run_action(
1039        mut self,
1040        application_id: ApplicationId,
1041        chain_id: ChainId,
1042        action: UserAction,
1043    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1044        let result = self
1045            .deref_mut()
1046            .run_action(application_id, chain_id, action)?;
1047        let runtime = self
1048            .into_inner()
1049            .expect("Runtime clones should have been freed by now");
1050
1051        Ok((result, runtime.resource_controller))
1052    }
1053}
1054
1055impl ContractSyncRuntimeHandle {
1056    fn run_action(
1057        &mut self,
1058        application_id: ApplicationId,
1059        chain_id: ChainId,
1060        action: UserAction,
1061    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1062        let finalize_context = FinalizeContext {
1063            authenticated_signer: action.signer(),
1064            chain_id,
1065            height: action.height(),
1066            round: action.round(),
1067        };
1068
1069        {
1070            let runtime = self.inner();
1071            assert_eq!(runtime.chain_id, chain_id);
1072            assert_eq!(runtime.height, action.height());
1073        }
1074
1075        let signer = action.signer();
1076        let closure = move |code: &mut UserContractInstance| match action {
1077            UserAction::Instantiate(_context, argument) => {
1078                code.instantiate(argument).map(|()| None)
1079            }
1080            UserAction::Operation(_context, operation) => {
1081                code.execute_operation(operation).map(Option::Some)
1082            }
1083            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1084            UserAction::ProcessStreams(_context, updates) => {
1085                code.process_streams(updates).map(|()| None)
1086            }
1087        };
1088
1089        let result = self.execute(application_id, signer, closure)?;
1090        self.finalize(finalize_context)?;
1091        Ok(result)
1092    }
1093
1094    /// Notifies all loaded applications that execution is finalizing.
1095    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1096        let applications = mem::take(&mut self.inner().applications_to_finalize)
1097            .into_iter()
1098            .rev();
1099
1100        self.inner().is_finalizing = true;
1101
1102        for application in applications {
1103            self.execute(application, context.authenticated_signer, |contract| {
1104                contract.finalize().map(|_| None)
1105            })?;
1106            self.inner().loaded_applications.remove(&application);
1107        }
1108
1109        Ok(())
1110    }
1111
1112    /// Executes a `closure` with the contract code for the `application_id`.
1113    fn execute(
1114        &mut self,
1115        application_id: ApplicationId,
1116        signer: Option<AccountOwner>,
1117        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1118    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1119        let contract = {
1120            let mut runtime = self.inner();
1121            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1122
1123            let status = ApplicationStatus {
1124                caller_id: None,
1125                id: application_id,
1126                description: application.description.clone(),
1127                signer,
1128            };
1129
1130            runtime.push_application(status);
1131
1132            application
1133        };
1134
1135        let result = closure(
1136            &mut contract
1137                .instance
1138                .try_lock()
1139                .expect("Application should not be already executing"),
1140        )?;
1141
1142        let mut runtime = self.inner();
1143        let application_status = runtime.pop_application();
1144        assert_eq!(application_status.caller_id, None);
1145        assert_eq!(application_status.id, application_id);
1146        assert_eq!(application_status.description, contract.description);
1147        assert_eq!(application_status.signer, signer);
1148        assert!(runtime.call_stack.is_empty());
1149
1150        Ok(result)
1151    }
1152}
1153
1154impl ContractRuntime for ContractSyncRuntimeHandle {
1155    fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1156        let this = self.inner();
1157        Ok(this.current_application().signer)
1158    }
1159
1160    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1161        Ok(self
1162            .inner()
1163            .executing_message
1164            .map(|metadata| metadata.is_bouncing))
1165    }
1166
1167    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1168        Ok(self
1169            .inner()
1170            .executing_message
1171            .map(|metadata| metadata.origin))
1172    }
1173
1174    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1175        let this = self.inner();
1176        if this.call_stack.len() <= 1 {
1177            return Ok(None);
1178        }
1179        Ok(this.current_application().caller_id)
1180    }
1181
1182    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1183        Ok(match vm_runtime {
1184            VmRuntime::Wasm => {
1185                self.inner()
1186                    .resource_controller
1187                    .policy()
1188                    .maximum_wasm_fuel_per_block
1189            }
1190            VmRuntime::Evm => {
1191                self.inner()
1192                    .resource_controller
1193                    .policy()
1194                    .maximum_evm_fuel_per_block
1195            }
1196        })
1197    }
1198
1199    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1200        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1201    }
1202
1203    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1204        let mut this = self.inner();
1205        this.resource_controller.track_fuel(fuel, vm_runtime)
1206    }
1207
1208    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1209        let mut this = self.inner();
1210        let application = this.current_application();
1211        let application_id = application.id;
1212        let authenticated_signer = application.signer;
1213        let mut refund_grant_to = this.refund_grant_to;
1214
1215        let grant = this
1216            .resource_controller
1217            .policy()
1218            .total_price(&message.grant)?;
1219        if grant.is_zero() {
1220            refund_grant_to = None;
1221        } else {
1222            this.resource_controller.track_grant(grant)?;
1223        }
1224        let kind = if message.is_tracked {
1225            MessageKind::Tracked
1226        } else {
1227            MessageKind::Simple
1228        };
1229
1230        this.execution_state_sender
1231            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1232                message: OutgoingMessage {
1233                    destination: message.destination,
1234                    authenticated_signer,
1235                    refund_grant_to,
1236                    grant,
1237                    kind,
1238                    message: Message::User {
1239                        application_id,
1240                        bytes: message.message,
1241                    },
1242                },
1243                callback,
1244            })?
1245            .recv_response()?;
1246
1247        Ok(())
1248    }
1249
1250    fn transfer(
1251        &mut self,
1252        source: AccountOwner,
1253        destination: Account,
1254        amount: Amount,
1255    ) -> Result<(), ExecutionError> {
1256        let this = self.inner();
1257        let current_application = this.current_application();
1258        let application_id = current_application.id;
1259        let signer = current_application.signer;
1260
1261        this.execution_state_sender
1262            .send_request(|callback| ExecutionRequest::Transfer {
1263                source,
1264                destination,
1265                amount,
1266                signer,
1267                application_id,
1268                callback,
1269            })?
1270            .recv_response()?;
1271        Ok(())
1272    }
1273
1274    fn claim(
1275        &mut self,
1276        source: Account,
1277        destination: Account,
1278        amount: Amount,
1279    ) -> Result<(), ExecutionError> {
1280        let this = self.inner();
1281        let current_application = this.current_application();
1282        let application_id = current_application.id;
1283        let signer = current_application.signer;
1284
1285        this.execution_state_sender
1286            .send_request(|callback| ExecutionRequest::Claim {
1287                source,
1288                destination,
1289                amount,
1290                signer,
1291                application_id,
1292                callback,
1293            })?
1294            .recv_response()?;
1295        Ok(())
1296    }
1297
1298    fn try_call_application(
1299        &mut self,
1300        authenticated: bool,
1301        callee_id: ApplicationId,
1302        argument: Vec<u8>,
1303    ) -> Result<Vec<u8>, ExecutionError> {
1304        let contract = self
1305            .inner()
1306            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1307
1308        let value = contract
1309            .try_lock()
1310            .expect("Applications should not have reentrant calls")
1311            .execute_operation(argument)?;
1312
1313        self.inner().finish_call()?;
1314
1315        Ok(value)
1316    }
1317
1318    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1319        let mut this = self.inner();
1320        ensure!(
1321            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1322            ExecutionError::StreamNameTooLong
1323        );
1324        let application_id = GenericApplicationId::User(this.current_application().id);
1325        let stream_id = StreamId {
1326            stream_name,
1327            application_id,
1328        };
1329        let value_len = value.len() as u64;
1330        let index = this
1331            .execution_state_sender
1332            .send_request(|callback| ExecutionRequest::Emit {
1333                stream_id,
1334                value,
1335                callback,
1336            })?
1337            .recv_response()?;
1338        // TODO(#365): Consider separate event fee categories.
1339        this.resource_controller.track_bytes_written(value_len)?;
1340        Ok(index)
1341    }
1342
1343    fn read_event(
1344        &mut self,
1345        chain_id: ChainId,
1346        stream_name: StreamName,
1347        index: u32,
1348    ) -> Result<Vec<u8>, ExecutionError> {
1349        let mut this = self.inner();
1350        ensure!(
1351            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1352            ExecutionError::StreamNameTooLong
1353        );
1354        let application_id = GenericApplicationId::User(this.current_application().id);
1355        let stream_id = StreamId {
1356            stream_name,
1357            application_id,
1358        };
1359        let event_id = EventId {
1360            stream_id,
1361            index,
1362            chain_id,
1363        };
1364        let event = this
1365            .execution_state_sender
1366            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1367            .recv_response()?;
1368        // TODO(#365): Consider separate event fee categories.
1369        this.resource_controller
1370            .track_bytes_read(event.len() as u64)?;
1371        Ok(event)
1372    }
1373
1374    fn subscribe_to_events(
1375        &mut self,
1376        chain_id: ChainId,
1377        application_id: ApplicationId,
1378        stream_name: StreamName,
1379    ) -> Result<(), ExecutionError> {
1380        let this = self.inner();
1381        ensure!(
1382            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1383            ExecutionError::StreamNameTooLong
1384        );
1385        let stream_id = StreamId {
1386            stream_name,
1387            application_id: application_id.into(),
1388        };
1389        let subscriber_app_id = this.current_application().id;
1390        this.execution_state_sender
1391            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1392                chain_id,
1393                stream_id,
1394                subscriber_app_id,
1395                callback,
1396            })?
1397            .recv_response()?;
1398        Ok(())
1399    }
1400
1401    fn unsubscribe_from_events(
1402        &mut self,
1403        chain_id: ChainId,
1404        application_id: ApplicationId,
1405        stream_name: StreamName,
1406    ) -> Result<(), ExecutionError> {
1407        let this = self.inner();
1408        ensure!(
1409            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1410            ExecutionError::StreamNameTooLong
1411        );
1412        let stream_id = StreamId {
1413            stream_name,
1414            application_id: application_id.into(),
1415        };
1416        let subscriber_app_id = this.current_application().id;
1417        this.execution_state_sender
1418            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1419                chain_id,
1420                stream_id,
1421                subscriber_app_id,
1422                callback,
1423            })?
1424            .recv_response()?;
1425        Ok(())
1426    }
1427
1428    fn query_service(
1429        &mut self,
1430        application_id: ApplicationId,
1431        query: Vec<u8>,
1432    ) -> Result<Vec<u8>, ExecutionError> {
1433        let mut this = self.inner();
1434
1435        let app_permissions = this
1436            .execution_state_sender
1437            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1438            .recv_response()?;
1439
1440        let app_id = this.current_application().id;
1441        ensure!(
1442            app_permissions.can_call_services(&app_id),
1443            ExecutionError::UnauthorizedApplication(app_id)
1444        );
1445
1446        this.resource_controller.track_service_oracle_call()?;
1447
1448        this.run_service_oracle_query(application_id, query)
1449    }
1450
1451    fn open_chain(
1452        &mut self,
1453        ownership: ChainOwnership,
1454        application_permissions: ApplicationPermissions,
1455        balance: Amount,
1456    ) -> Result<ChainId, ExecutionError> {
1457        let parent_id = self.inner().chain_id;
1458        let block_height = self.block_height()?;
1459
1460        let timestamp = self.inner().user_context;
1461
1462        let chain_id = self
1463            .inner()
1464            .execution_state_sender
1465            .send_request(|callback| ExecutionRequest::OpenChain {
1466                ownership,
1467                balance,
1468                parent_id,
1469                block_height,
1470                timestamp,
1471                application_permissions,
1472                callback,
1473            })?
1474            .recv_response()?;
1475
1476        Ok(chain_id)
1477    }
1478
1479    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1480        let this = self.inner();
1481        let application_id = this.current_application().id;
1482        this.execution_state_sender
1483            .send_request(|callback| ExecutionRequest::CloseChain {
1484                application_id,
1485                callback,
1486            })?
1487            .recv_response()?
1488    }
1489
1490    fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1491        let this = self.inner();
1492        let application_id = this.current_application().id;
1493        this.execution_state_sender
1494            .send_request(|callback| ExecutionRequest::ChangeOwnership {
1495                application_id,
1496                ownership,
1497                callback,
1498            })?
1499            .recv_response()?
1500    }
1501
1502    fn change_application_permissions(
1503        &mut self,
1504        application_permissions: ApplicationPermissions,
1505    ) -> Result<(), ExecutionError> {
1506        let this = self.inner();
1507        let application_id = this.current_application().id;
1508        this.execution_state_sender
1509            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1510                application_id,
1511                application_permissions,
1512                callback,
1513            })?
1514            .recv_response()?
1515    }
1516
1517    fn create_application(
1518        &mut self,
1519        module_id: ModuleId,
1520        parameters: Vec<u8>,
1521        argument: Vec<u8>,
1522        required_application_ids: Vec<ApplicationId>,
1523    ) -> Result<ApplicationId, ExecutionError> {
1524        let chain_id = self.inner().chain_id;
1525        let block_height = self.block_height()?;
1526
1527        let CreateApplicationResult { app_id } = self
1528            .inner()
1529            .execution_state_sender
1530            .send_request(move |callback| ExecutionRequest::CreateApplication {
1531                chain_id,
1532                block_height,
1533                module_id,
1534                parameters,
1535                required_application_ids,
1536                callback,
1537            })?
1538            .recv_response()??;
1539
1540        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1541
1542        contract
1543            .try_lock()
1544            .expect("Applications should not have reentrant calls")
1545            .instantiate(argument)?;
1546
1547        self.inner().finish_call()?;
1548
1549        Ok(app_id)
1550    }
1551
1552    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1553        let blob = Blob::new_data(bytes);
1554        let blob_id = blob.id();
1555        let this = self.inner();
1556        this.execution_state_sender
1557            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1558            .recv_response()?;
1559        Ok(DataBlobHash(blob_id.hash))
1560    }
1561
1562    fn publish_module(
1563        &mut self,
1564        contract: Bytecode,
1565        service: Bytecode,
1566        vm_runtime: VmRuntime,
1567    ) -> Result<ModuleId, ExecutionError> {
1568        let (blobs, module_id) =
1569            crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1570        let this = self.inner();
1571        for blob in blobs {
1572            this.execution_state_sender
1573                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1574                .recv_response()?;
1575        }
1576        Ok(module_id)
1577    }
1578
1579    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1580        let this = self.inner();
1581        let round = this.round;
1582        this.execution_state_sender
1583            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1584            .recv_response()
1585    }
1586
1587    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1588        let mut this = self.inner();
1589        let id = this.current_application().id;
1590        let state = this.view_user_states.entry(id).or_default();
1591        state.force_all_pending_queries()?;
1592        this.resource_controller.track_write_operations(
1593            batch
1594                .num_operations()
1595                .try_into()
1596                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1597        )?;
1598        this.resource_controller
1599            .track_bytes_written(batch.size() as u64)?;
1600        this.execution_state_sender
1601            .send_request(|callback| ExecutionRequest::WriteBatch {
1602                id,
1603                batch,
1604                callback,
1605            })?
1606            .recv_response()?;
1607        Ok(())
1608    }
1609}
1610
1611impl ServiceSyncRuntime {
1612    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1613    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1614        Self::new_with_deadline(execution_state_sender, context, None)
1615    }
1616
1617    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1618    pub fn new_with_deadline(
1619        execution_state_sender: ExecutionStateSender,
1620        context: QueryContext,
1621        deadline: Option<Instant>,
1622    ) -> Self {
1623        // Query the allow_application_logs setting from the execution state.
1624        let allow_application_logs = execution_state_sender
1625            .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1626            .ok()
1627            .and_then(|receiver| receiver.recv_response().ok())
1628            .unwrap_or(false);
1629
1630        let runtime = SyncRuntime(Some(
1631            SyncRuntimeInternal::new(
1632                context.chain_id,
1633                context.next_block_height,
1634                None,
1635                None,
1636                execution_state_sender,
1637                deadline,
1638                None,
1639                ResourceController::default(),
1640                (),
1641                allow_application_logs,
1642            )
1643            .into(),
1644        ));
1645
1646        ServiceSyncRuntime {
1647            runtime,
1648            current_context: context,
1649        }
1650    }
1651
1652    /// Preloads the code of a service into the runtime's memory.
1653    pub(crate) fn preload_service(
1654        &self,
1655        id: ApplicationId,
1656        code: UserServiceCode,
1657        description: ApplicationDescription,
1658    ) -> Result<(), ExecutionError> {
1659        let this = self
1660            .runtime
1661            .0
1662            .as_ref()
1663            .expect("services shouldn't be preloaded while the runtime is being dropped");
1664        let mut this_guard = this.inner();
1665
1666        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1667            entry.insert((code, description));
1668        }
1669
1670        Ok(())
1671    }
1672
1673    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1674    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1675        while let Ok(request) = incoming_requests.recv() {
1676            let ServiceRuntimeRequest::Query {
1677                application_id,
1678                context,
1679                query,
1680                callback,
1681            } = request;
1682
1683            let result = self
1684                .prepare_for_query(context)
1685                .and_then(|()| self.run_query(application_id, query));
1686
1687            if let Err(err) = callback.send(result) {
1688                tracing::debug!(%err, "Receiver for query result has been dropped");
1689            }
1690        }
1691    }
1692
1693    /// Prepares the runtime to query an application.
1694    pub(crate) fn prepare_for_query(
1695        &mut self,
1696        new_context: QueryContext,
1697    ) -> Result<(), ExecutionError> {
1698        let expected_context = QueryContext {
1699            local_time: new_context.local_time,
1700            ..self.current_context
1701        };
1702
1703        if new_context != expected_context {
1704            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1705            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1706        } else {
1707            self.handle_mut()
1708                .inner()
1709                .execution_state_sender
1710                .send_request(|callback| ExecutionRequest::SetLocalTime {
1711                    local_time: new_context.local_time,
1712                    callback,
1713                })?
1714                .recv_response()?;
1715        }
1716        Ok(())
1717    }
1718
1719    /// Queries an application specified by its [`ApplicationId`].
1720    pub(crate) fn run_query(
1721        &mut self,
1722        application_id: ApplicationId,
1723        query: Vec<u8>,
1724    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1725        let this = self.handle_mut();
1726        let response = this.try_query_application(application_id, query)?;
1727        let operations = mem::take(&mut this.inner().scheduled_operations);
1728
1729        Ok(QueryOutcome {
1730            response,
1731            operations,
1732        })
1733    }
1734
1735    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1736    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1737        self.runtime.0.as_mut().expect(
1738            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1739        )
1740    }
1741}
1742
1743impl ServiceRuntime for ServiceSyncRuntimeHandle {
1744    /// Note that queries are not available from writable contexts.
1745    fn try_query_application(
1746        &mut self,
1747        queried_id: ApplicationId,
1748        argument: Vec<u8>,
1749    ) -> Result<Vec<u8>, ExecutionError> {
1750        let service = {
1751            let mut this = self.inner();
1752
1753            // Load the application.
1754            let application = this.load_service_instance(self.clone(), queried_id)?;
1755            // Make the call to user code.
1756            this.push_application(ApplicationStatus {
1757                caller_id: None,
1758                id: queried_id,
1759                description: application.description,
1760                signer: None,
1761            });
1762            application.instance
1763        };
1764        let response = service
1765            .try_lock()
1766            .expect("Applications should not have reentrant calls")
1767            .handle_query(argument)?;
1768        self.inner().pop_application();
1769        Ok(response)
1770    }
1771
1772    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1773        let mut this = self.inner();
1774        let application_id = this.current_application().id;
1775
1776        this.scheduled_operations.push(Operation::User {
1777            application_id,
1778            bytes: operation,
1779        });
1780
1781        Ok(())
1782    }
1783
1784    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1785        if let Some(deadline) = self.inner().deadline {
1786            if Instant::now() >= deadline {
1787                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1788            }
1789        }
1790        Ok(())
1791    }
1792}
1793
/// A request to the service runtime actor.
pub enum ServiceRuntimeRequest {
    /// Asks the runtime to query an application and to report the result through
    /// `callback`.
    Query {
        /// The application whose service should be queried.
        application_id: ApplicationId,
        /// The context the runtime is prepared for before the query is executed.
        context: QueryContext,
        /// The raw bytes of the query passed to the application.
        query: Vec<u8>,
        /// The channel on which the outcome of the query, or its error, is sent back.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1803
/// The origin of the execution.
///
/// Can be built from a `MessageContext`, whose `is_bouncing` and `origin` fields are
/// copied over.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// Mirrors `MessageContext::is_bouncing`.
    // NOTE(review): presumably `true` when the message bounced back to its sender — confirm.
    is_bouncing: bool,
    /// Mirrors `MessageContext::origin`.
    // NOTE(review): presumably the chain the message was sent from — confirm.
    origin: ChainId,
}
1810
1811impl From<&MessageContext> for ExecutingMessage {
1812    fn from(context: &MessageContext) -> Self {
1813        ExecutingMessage {
1814            is_bouncing: context.is_bouncing,
1815            origin: context.origin,
1816        }
1817    }
1818}
1819
1820/// Creates a compressed contract and service bytecode synchronously.
1821pub fn create_bytecode_blobs_sync(
1822    contract: Bytecode,
1823    service: Bytecode,
1824    vm_runtime: VmRuntime,
1825) -> (Vec<Blob>, ModuleId) {
1826    match vm_runtime {
1827        VmRuntime::Wasm => {
1828            let compressed_contract = contract.compress();
1829            let compressed_service = service.compress();
1830            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1831            let service_blob = Blob::new_service_bytecode(compressed_service);
1832            let module_id =
1833                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1834            (vec![contract_blob, service_blob], module_id)
1835        }
1836        VmRuntime::Evm => {
1837            let compressed_contract = contract.compress();
1838            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1839            let module_id = ModuleId::new(
1840                evm_contract_blob.id().hash,
1841                evm_contract_blob.id().hash,
1842                vm_runtime,
1843            );
1844            (vec![evm_contract_blob], module_id)
1845        }
1846    }
1847}