Skip to main content

linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27use tracing::instrument;
28
29use crate::{
30    execution::UserAction,
31    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
32    resources::ResourceController,
33    system::CreateApplicationResult,
34    util::{ReceiverExt, UnboundedSenderExt},
35    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
36    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
37    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
38    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
39};
40
41#[cfg(test)]
42#[path = "unit_tests/runtime_tests.rs"]
43mod tests;
44
/// Associates a user instance type with the auxiliary types its runtime needs.
pub trait WithContext {
    /// Extra data stored in the `user_context` field of [`SyncRuntimeInternal`].
    type UserContext;
    /// The code type cached in `preloaded_applications`, from which user instances are
    /// instantiated.
    type Code;
}
49
// Contract runtimes carry a `Timestamp` as user context and are instantiated from
// `UserContractCode`.
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
    type Code = UserContractCode;
}
54
// Service runtimes need no extra user context and are instantiated from `UserServiceCode`.
impl WithContext for UserServiceInstance {
    type UserContext = ();
    type Code = UserServiceCode;
}
59
// Test-only implementation for a type-erased instance; it has neither user context nor code.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
    type Code = ();
}
65
/// Owning wrapper around a [`SyncRuntimeHandle`].
///
/// The inner `Option` becomes `None` once the handle has been taken out (by `into_inner` or
/// by `Drop`); dereferencing the wrapper after that point panics.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
68
/// A [`SyncRuntime`] specialized for contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
70
/// A [`SyncRuntime`] for service instances bundled with a [`QueryContext`].
pub struct ServiceSyncRuntime {
    /// The underlying runtime for service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// NOTE(review): presumably the context of the query currently being handled; its use is
    /// not visible in this part of the file.
    current_context: QueryContext,
}
75
/// A shared, lockable handle to a [`SyncRuntimeInternal`].
///
/// The state is reached through `inner`, which uses `try_lock` and panics if the lock cannot
/// be acquired.
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);
80
/// A [`SyncRuntimeHandle`] specialized for contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] specialized for service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
83
/// Runtime data tracked during the execution of a transaction on the synchronous thread.
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// The current chain ID.
    chain_id: ChainId,
    /// The height of the next block that will be added to this chain. During operations
    /// and messages, this is the current block height.
    height: BlockHeight,
    /// The current consensus round. Only available during block validation in multi-leader rounds.
    round: Option<u32>,
    /// The current message being executed, if there is one.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// How to interact with the storage view of the execution state.
    execution_state_sender: ExecutionStateSender,

    /// If applications are being finalized.
    ///
    /// If [`true`], disables cross-application calls.
    is_finalizing: bool,
    /// Applications that need to be finalized.
    applications_to_finalize: Vec<ApplicationId>,

    /// Application code and descriptions that are already available but not yet
    /// instantiated. Consulted before requesting the code from the execution state.
    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
    /// Application instances loaded in this transaction.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// The current stack of application statuses; the last entry is the current application.
    call_stack: Vec<ApplicationStatus>,
    /// The set of the IDs of the applications that are in the `call_stack`.
    active_applications: HashSet<ApplicationId>,
    /// The operations scheduled during this query.
    scheduled_operations: Vec<Operation>,

    /// Track application states based on views.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// The deadline this runtime should finish executing.
    ///
    /// Used to limit the execution time of services running as oracles.
    deadline: Option<Instant>,

    /// Where to send a refund for the unused part of the grant after execution, if any.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// Controller to track fuel and storage consumption.
    resource_controller: ResourceController,
    /// Additional context for the runtime.
    user_context: UserInstance::UserContext,
    /// Whether contract log messages should be output.
    allow_application_logs: bool,
}
137
/// The runtime status of an application: one entry of the runtime's `call_stack`.
#[derive(Debug)]
struct ApplicationStatus {
    /// The caller application ID, if forwarded during the call.
    ///
    /// Cross-application calls only forward it when the call is authenticated.
    caller_id: Option<ApplicationId>,
    /// The application ID.
    id: ApplicationId,
    /// The application description.
    description: ApplicationDescription,
    /// The authenticated signer for the execution thread, if any.
    signer: Option<AccountOwner>,
}
150
/// A loaded application instance.
///
/// Cloning an entry shares the same underlying instance and copies the description.
#[derive(Debug)]
struct LoadedApplication<Instance> {
    /// The instance, shared behind a reference-counted mutex.
    instance: Arc<Mutex<Instance>>,
    /// The description of the application this instance belongs to.
    description: ApplicationDescription,
}
157
158impl<Instance> LoadedApplication<Instance> {
159    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
160    fn new(instance: Instance, description: ApplicationDescription) -> Self {
161        LoadedApplication {
162            instance: Arc::new(Mutex::new(instance)),
163            description,
164        }
165    }
166}
167
168impl<Instance> Clone for LoadedApplication<Instance> {
169    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
170    // bound
171    fn clone(&self) -> Self {
172        LoadedApplication {
173            instance: self.instance.clone(),
174            description: self.description.clone(),
175        }
176    }
177}
178
/// A value that is either already available or still expected from a oneshot channel.
#[derive(Debug)]
enum Promise<T> {
    /// The value has been received and is stored inline.
    Ready(T),
    /// The value has not been received yet; it will arrive through this receiver.
    Pending(Receiver<T>),
}
184
185impl<T> Promise<T> {
186    fn force(&mut self) -> Result<(), ExecutionError> {
187        if let Promise::Pending(receiver) = self {
188            let value = receiver
189                .recv_ref()
190                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
191            *self = Promise::Ready(value);
192        }
193        Ok(())
194    }
195
196    fn read(self) -> Result<T, ExecutionError> {
197        match self {
198            Promise::Pending(receiver) => {
199                let value = receiver.recv_response()?;
200                Ok(value)
201            }
202            Promise::Ready(value) => Ok(value),
203        }
204    }
205}
206
/// Manages a set of pending queries returning values of type `T`.
///
/// Queries are identified by the `u32` index handed out when they are registered.
#[derive(Debug, Default)]
struct QueryManager<T> {
    /// The queries in progress, keyed by their index.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// The number of queries ever registered so far. Used for the index of the next query.
    query_count: u32,
    /// The number of active queries, i.e. queries registered and not yet waited on.
    active_query_count: u32,
}
217
218impl<T> QueryManager<T> {
219    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
220        let id = self.query_count;
221        self.pending_queries.insert(id, Promise::Pending(receiver));
222        self.query_count = self
223            .query_count
224            .checked_add(1)
225            .ok_or(ArithmeticError::Overflow)?;
226        self.active_query_count = self
227            .active_query_count
228            .checked_add(1)
229            .ok_or(ArithmeticError::Overflow)?;
230        Ok(id)
231    }
232
233    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
234        let promise = self
235            .pending_queries
236            .remove(&id)
237            .ok_or(ExecutionError::InvalidPromise)?;
238        let value = promise.read()?;
239        self.active_query_count -= 1;
240        Ok(value)
241    }
242
243    fn force_all(&mut self) -> Result<(), ExecutionError> {
244        for promise in self.pending_queries.values_mut() {
245            promise.force()?;
246        }
247        Ok(())
248    }
249}
250
/// A list of keys, each a byte string.
type Keys = Vec<Vec<u8>>;
/// A single value, as bytes.
type Value = Vec<u8>;
/// A list of `(key, value)` pairs, both as bytes.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
254
/// The pending storage queries of one application, grouped by query kind.
///
/// The runtime keeps one of these per application ID in `view_user_states`.
#[derive(Debug, Default)]
struct ViewUserState {
    /// The contains-key queries in progress.
    contains_key_queries: QueryManager<bool>,
    /// The contains-keys queries in progress.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// The read-value queries in progress.
    read_value_queries: QueryManager<Option<Value>>,
    /// The read-multi-values queries in progress.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// The find-keys queries in progress.
    find_keys_queries: QueryManager<Keys>,
    /// The find-key-values queries in progress.
    find_key_values_queries: QueryManager<KeyValues>,
}
270
271impl ViewUserState {
272    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
273        self.contains_key_queries.force_all()?;
274        self.contains_keys_queries.force_all()?;
275        self.read_value_queries.force_all()?;
276        self.read_multi_values_queries.force_all()?;
277        self.find_keys_queries.force_all()?;
278        self.find_key_values_queries.force_all()?;
279        Ok(())
280    }
281}
282
283impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
284    type Target = SyncRuntimeHandle<UserInstance>;
285
286    fn deref(&self) -> &Self::Target {
287        self.0.as_ref().expect(
288            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
289        )
290    }
291}
292
293impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
294    fn deref_mut(&mut self) -> &mut Self::Target {
295        self.0.as_mut().expect(
296            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
297        )
298    }
299}
300
301impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
302    fn drop(&mut self) {
303        // Ensure the `loaded_applications` are cleared to prevent circular references in
304        // the runtime
305        if let Some(handle) = self.0.take() {
306            handle.inner().loaded_applications.clear();
307        }
308    }
309}
310
311impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
312    #[expect(clippy::too_many_arguments)]
313    fn new(
314        chain_id: ChainId,
315        height: BlockHeight,
316        round: Option<u32>,
317        executing_message: Option<ExecutingMessage>,
318        execution_state_sender: ExecutionStateSender,
319        deadline: Option<Instant>,
320        refund_grant_to: Option<Account>,
321        resource_controller: ResourceController,
322        user_context: UserInstance::UserContext,
323        allow_application_logs: bool,
324    ) -> Self {
325        Self {
326            chain_id,
327            height,
328            round,
329            executing_message,
330            execution_state_sender,
331            is_finalizing: false,
332            applications_to_finalize: Vec::new(),
333            preloaded_applications: HashMap::new(),
334            loaded_applications: HashMap::new(),
335            call_stack: Vec::new(),
336            active_applications: HashSet::new(),
337            view_user_states: BTreeMap::new(),
338            deadline,
339            refund_grant_to,
340            resource_controller,
341            scheduled_operations: Vec::new(),
342            user_context,
343            allow_application_logs,
344        }
345    }
346
347    /// Returns the [`ApplicationStatus`] of the current application.
348    ///
349    /// The current application is the last to be pushed to the `call_stack`.
350    ///
351    /// # Panics
352    ///
353    /// If the call stack is empty.
354    fn current_application(&self) -> &ApplicationStatus {
355        self.call_stack
356            .last()
357            .expect("Call stack is unexpectedly empty")
358    }
359
360    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
361    ///
362    /// Ensures the application's ID is also tracked in the `active_applications` set.
363    fn push_application(&mut self, status: ApplicationStatus) {
364        self.active_applications.insert(status.id);
365        self.call_stack.push(status);
366    }
367
368    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
369    ///
370    /// Ensures the application's ID is also removed from the `active_applications` set.
371    ///
372    /// # Panics
373    ///
374    /// If the call stack is empty.
375    fn pop_application(&mut self) -> ApplicationStatus {
376        let status = self
377            .call_stack
378            .pop()
379            .expect("Can't remove application from empty call stack");
380        assert!(self.active_applications.remove(&status.id));
381        status
382    }
383
384    /// Ensures that a call to `application_id` is not-reentrant.
385    ///
386    /// Returns an error if there already is an entry for `application_id` in the call stack.
387    fn check_for_reentrancy(
388        &mut self,
389        application_id: ApplicationId,
390    ) -> Result<(), ExecutionError> {
391        ensure!(
392            !self.active_applications.contains(&application_id),
393            ExecutionError::ReentrantCall(application_id)
394        );
395        Ok(())
396    }
397}
398
399impl SyncRuntimeInternal<UserContractInstance> {
400    /// Loads a contract instance, initializing it with this runtime if needed.
401    #[instrument(skip_all, fields(application_id = %id))]
402    fn load_contract_instance(
403        &mut self,
404        this: SyncRuntimeHandle<UserContractInstance>,
405        id: ApplicationId,
406    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
407        match self.loaded_applications.entry(id) {
408            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
409
410            hash_map::Entry::Vacant(entry) => {
411                // First time actually using the application. Let's see if the code was
412                // pre-loaded.
413                let (code, description) = match self.preloaded_applications.entry(id) {
414                    // TODO(#2927): support dynamic loading of modules on the Web
415                    #[cfg(web)]
416                    hash_map::Entry::Vacant(_) => {
417                        drop(this);
418                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
419                            id,
420                        )));
421                    }
422                    #[cfg(not(web))]
423                    hash_map::Entry::Vacant(entry) => {
424                        let (code, description) = self
425                            .execution_state_sender
426                            .send_request(move |callback| ExecutionRequest::LoadContract {
427                                id,
428                                callback,
429                            })?
430                            .recv_response()?;
431                        entry.insert((code, description)).clone()
432                    }
433                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
434                };
435                let instance = code.instantiate(this)?;
436
437                self.applications_to_finalize.push(id);
438                Ok(entry
439                    .insert(LoadedApplication::new(instance, description))
440                    .clone())
441            }
442        }
443    }
444
445    /// Configures the runtime for executing a call to a different contract.
446    fn prepare_for_call(
447        &mut self,
448        this: ContractSyncRuntimeHandle,
449        authenticated: bool,
450        callee_id: ApplicationId,
451    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
452        self.check_for_reentrancy(callee_id)?;
453
454        ensure!(
455            !self.is_finalizing,
456            ExecutionError::CrossApplicationCallInFinalize {
457                caller_id: Box::new(self.current_application().id),
458                callee_id: Box::new(callee_id),
459            }
460        );
461
462        // Load the application.
463        let application = self.load_contract_instance(this, callee_id)?;
464
465        let caller = self.current_application();
466        let caller_id = caller.id;
467        let caller_signer = caller.signer;
468        // Make the call to user code.
469        let authenticated_signer = match caller_signer {
470            Some(signer) if authenticated => Some(signer),
471            _ => None,
472        };
473        let authenticated_caller_id = authenticated.then_some(caller_id);
474        self.push_application(ApplicationStatus {
475            caller_id: authenticated_caller_id,
476            id: callee_id,
477            description: application.description,
478            // Allow further nested calls to be authenticated if this one is.
479            signer: authenticated_signer,
480        });
481        Ok(application.instance)
482    }
483
484    /// Cleans up the runtime after the execution of a call to a different contract.
485    fn finish_call(&mut self) -> Result<(), ExecutionError> {
486        self.pop_application();
487        Ok(())
488    }
489
490    /// Runs the service in a separate thread as an oracle.
491    fn run_service_oracle_query(
492        &mut self,
493        application_id: ApplicationId,
494        query: Vec<u8>,
495    ) -> Result<Vec<u8>, ExecutionError> {
496        let timeout = self
497            .resource_controller
498            .remaining_service_oracle_execution_time()?;
499        let execution_start = Instant::now();
500        let deadline = Some(execution_start + timeout);
501        let response = self
502            .execution_state_sender
503            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
504                deadline,
505                application_id,
506                next_block_height: self.height,
507                query,
508                callback,
509            })?
510            .recv_response()?;
511
512        self.resource_controller
513            .track_service_oracle_execution(execution_start.elapsed())?;
514        self.resource_controller
515            .track_service_oracle_response(response.len())?;
516
517        Ok(response)
518    }
519}
520
521impl SyncRuntimeInternal<UserServiceInstance> {
522    /// Initializes a service instance with this runtime.
523    fn load_service_instance(
524        &mut self,
525        this: SyncRuntimeHandle<UserServiceInstance>,
526        id: ApplicationId,
527    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
528        match self.loaded_applications.entry(id) {
529            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
530
531            hash_map::Entry::Vacant(entry) => {
532                // First time actually using the application. Let's see if the code was
533                // pre-loaded.
534                let (code, description) = match self.preloaded_applications.entry(id) {
535                    // TODO(#2927): support dynamic loading of modules on the Web
536                    #[cfg(web)]
537                    hash_map::Entry::Vacant(_) => {
538                        drop(this);
539                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
540                            id,
541                        )));
542                    }
543                    #[cfg(not(web))]
544                    hash_map::Entry::Vacant(entry) => {
545                        let (code, description) = self
546                            .execution_state_sender
547                            .send_request(move |callback| ExecutionRequest::LoadService {
548                                id,
549                                callback,
550                            })?
551                            .recv_response()?;
552                        entry.insert((code, description)).clone()
553                    }
554                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
555                };
556                let instance = code.instantiate(this)?;
557
558                self.applications_to_finalize.push(id);
559                Ok(entry
560                    .insert(LoadedApplication::new(instance, description))
561                    .clone())
562            }
563        }
564    }
565}
566
567impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
568    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
569        let handle = self.0.take().expect(
570            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
571        );
572        let runtime = Arc::into_inner(handle.0)?
573            .into_inner()
574            .expect("`SyncRuntime` should run in a single thread which should not panic");
575        Some(runtime)
576    }
577}
578
579impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
580    for SyncRuntimeHandle<UserInstance>
581{
582    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
583        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
584    }
585}
586
587impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
588    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
589        self.0
590            .try_lock()
591            .expect("Synchronous runtimes run on a single execution thread")
592    }
593}
594
595impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
596where
597    Self: ContractOrServiceRuntime,
598{
    type Read = ();
    // Each pending storage query is identified by the `u32` index returned when its receiver
    // is registered with the matching `QueryManager`.
    type ReadValueBytes = u32;
    type ContainsKey = u32;
    type ContainsKeys = u32;
    type ReadMultiValuesBytes = u32;
    type FindKeysByPrefix = u32;
    type FindKeyValuesByPrefix = u32;
606
607    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
608        let mut this = self.inner();
609        let chain_id = this.chain_id;
610        this.resource_controller.track_runtime_chain_id()?;
611        Ok(chain_id)
612    }
613
614    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
615        let mut this = self.inner();
616        let height = this.height;
617        this.resource_controller.track_runtime_block_height()?;
618        Ok(height)
619    }
620
621    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
622        let mut this = self.inner();
623        let application_id = this.current_application().id;
624        this.resource_controller.track_runtime_application_id()?;
625        Ok(application_id)
626    }
627
628    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
629        let mut this = self.inner();
630        let application_creator_chain_id = this.current_application().description.creator_chain_id;
631        this.resource_controller.track_runtime_application_id()?;
632        Ok(application_creator_chain_id)
633    }
634
635    fn read_application_description(
636        &mut self,
637        application_id: ApplicationId,
638    ) -> Result<ApplicationDescription, ExecutionError> {
639        let mut this = self.inner();
640        let description = this
641            .execution_state_sender
642            .send_request(|callback| ExecutionRequest::ReadApplicationDescription {
643                application_id,
644                callback,
645            })?
646            .recv_response()?;
647        this.resource_controller
648            .track_runtime_application_description(&description)?;
649        Ok(description)
650    }
651
652    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
653        let mut this = self.inner();
654        let parameters = this.current_application().description.parameters.clone();
655        this.resource_controller
656            .track_runtime_application_parameters(&parameters)?;
657        Ok(parameters)
658    }
659
660    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
661        let mut this = self.inner();
662        let timestamp = this
663            .execution_state_sender
664            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
665            .recv_response()?;
666        this.resource_controller.track_runtime_timestamp()?;
667        Ok(timestamp)
668    }
669
670    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
671        let mut this = self.inner();
672        let balance = this
673            .execution_state_sender
674            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
675            .recv_response()?;
676        this.resource_controller.track_runtime_balance()?;
677        Ok(balance)
678    }
679
680    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
681        let mut this = self.inner();
682        let balance = this
683            .execution_state_sender
684            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
685            .recv_response()?;
686        this.resource_controller.track_runtime_balance()?;
687        Ok(balance)
688    }
689
690    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
691        let mut this = self.inner();
692        let owner_balances = this
693            .execution_state_sender
694            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
695            .recv_response()?;
696        this.resource_controller
697            .track_runtime_owner_balances(&owner_balances)?;
698        Ok(owner_balances)
699    }
700
701    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
702        let mut this = self.inner();
703        let owners = this
704            .execution_state_sender
705            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
706            .recv_response()?;
707        this.resource_controller.track_runtime_owners(&owners)?;
708        Ok(owners)
709    }
710
711    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
712        let mut this = self.inner();
713        let chain_ownership = this
714            .execution_state_sender
715            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
716            .recv_response()?;
717        this.resource_controller
718            .track_runtime_chain_ownership(&chain_ownership)?;
719        Ok(chain_ownership)
720    }
721
722    fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
723        let this = self.inner();
724        let application_permissions = this
725            .execution_state_sender
726            .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
727            .recv_response()?;
728        Ok(application_permissions)
729    }
730
731    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
732        let mut this = self.inner();
733        let id = this.current_application().id;
734        this.resource_controller.track_read_operation()?;
735        let receiver = this
736            .execution_state_sender
737            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
738        let state = this.view_user_states.entry(id).or_default();
739        state.contains_key_queries.register(receiver)
740    }
741
742    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
743        let mut this = self.inner();
744        let id = this.current_application().id;
745        let state = this.view_user_states.entry(id).or_default();
746        let value = state.contains_key_queries.wait(*promise)?;
747        Ok(value)
748    }
749
750    fn contains_keys_new(
751        &mut self,
752        keys: Vec<Vec<u8>>,
753    ) -> Result<Self::ContainsKeys, ExecutionError> {
754        let mut this = self.inner();
755        let id = this.current_application().id;
756        this.resource_controller.track_read_operation()?;
757        let receiver = this
758            .execution_state_sender
759            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
760        let state = this.view_user_states.entry(id).or_default();
761        state.contains_keys_queries.register(receiver)
762    }
763
764    fn contains_keys_wait(
765        &mut self,
766        promise: &Self::ContainsKeys,
767    ) -> Result<Vec<bool>, ExecutionError> {
768        let mut this = self.inner();
769        let id = this.current_application().id;
770        let state = this.view_user_states.entry(id).or_default();
771        let value = state.contains_keys_queries.wait(*promise)?;
772        Ok(value)
773    }
774
775    fn read_multi_values_bytes_new(
776        &mut self,
777        keys: Vec<Vec<u8>>,
778    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
779        let mut this = self.inner();
780        let id = this.current_application().id;
781        this.resource_controller.track_read_operation()?;
782        let receiver = this.execution_state_sender.send_request(move |callback| {
783            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
784        })?;
785        let state = this.view_user_states.entry(id).or_default();
786        state.read_multi_values_queries.register(receiver)
787    }
788
789    fn read_multi_values_bytes_wait(
790        &mut self,
791        promise: &Self::ReadMultiValuesBytes,
792    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
793        let mut this = self.inner();
794        let id = this.current_application().id;
795        let state = this.view_user_states.entry(id).or_default();
796        let values = state.read_multi_values_queries.wait(*promise)?;
797        for value in &values {
798            if let Some(value) = &value {
799                this.resource_controller
800                    .track_bytes_read(value.len() as u64)?;
801            }
802        }
803        Ok(values)
804    }
805
806    fn read_value_bytes_new(
807        &mut self,
808        key: Vec<u8>,
809    ) -> Result<Self::ReadValueBytes, ExecutionError> {
810        let mut this = self.inner();
811        let id = this.current_application().id;
812        this.resource_controller.track_read_operation()?;
813        let receiver = this
814            .execution_state_sender
815            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
816        let state = this.view_user_states.entry(id).or_default();
817        state.read_value_queries.register(receiver)
818    }
819
820    fn read_value_bytes_wait(
821        &mut self,
822        promise: &Self::ReadValueBytes,
823    ) -> Result<Option<Vec<u8>>, ExecutionError> {
824        let mut this = self.inner();
825        let id = this.current_application().id;
826        let value = {
827            let state = this.view_user_states.entry(id).or_default();
828            state.read_value_queries.wait(*promise)?
829        };
830        if let Some(value) = &value {
831            this.resource_controller
832                .track_bytes_read(value.len() as u64)?;
833        }
834        Ok(value)
835    }
836
837    fn find_keys_by_prefix_new(
838        &mut self,
839        key_prefix: Vec<u8>,
840    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
841        let mut this = self.inner();
842        let id = this.current_application().id;
843        this.resource_controller.track_read_operation()?;
844        let receiver = this.execution_state_sender.send_request(move |callback| {
845            ExecutionRequest::FindKeysByPrefix {
846                id,
847                key_prefix,
848                callback,
849            }
850        })?;
851        let state = this.view_user_states.entry(id).or_default();
852        state.find_keys_queries.register(receiver)
853    }
854
855    fn find_keys_by_prefix_wait(
856        &mut self,
857        promise: &Self::FindKeysByPrefix,
858    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
859        let mut this = self.inner();
860        let id = this.current_application().id;
861        let keys = {
862            let state = this.view_user_states.entry(id).or_default();
863            state.find_keys_queries.wait(*promise)?
864        };
865        let mut read_size = 0;
866        for key in &keys {
867            read_size += key.len();
868        }
869        this.resource_controller
870            .track_bytes_read(read_size as u64)?;
871        Ok(keys)
872    }
873
874    fn find_key_values_by_prefix_new(
875        &mut self,
876        key_prefix: Vec<u8>,
877    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
878        let mut this = self.inner();
879        let id = this.current_application().id;
880        this.resource_controller.track_read_operation()?;
881        let receiver = this.execution_state_sender.send_request(move |callback| {
882            ExecutionRequest::FindKeyValuesByPrefix {
883                id,
884                key_prefix,
885                callback,
886            }
887        })?;
888        let state = this.view_user_states.entry(id).or_default();
889        state.find_key_values_queries.register(receiver)
890    }
891
892    fn find_key_values_by_prefix_wait(
893        &mut self,
894        promise: &Self::FindKeyValuesByPrefix,
895    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
896        let mut this = self.inner();
897        let id = this.current_application().id;
898        let state = this.view_user_states.entry(id).or_default();
899        let key_values = state.find_key_values_queries.wait(*promise)?;
900        let mut read_size = 0;
901        for (key, value) in &key_values {
902            read_size += key.len() + value.len();
903        }
904        this.resource_controller
905            .track_bytes_read(read_size as u64)?;
906        Ok(key_values)
907    }
908
909    fn perform_http_request(
910        &mut self,
911        request: http::Request,
912    ) -> Result<http::Response, ExecutionError> {
913        let mut this = self.inner();
914        let app_permissions = this
915            .execution_state_sender
916            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
917            .recv_response()?;
918
919        let app_id = this.current_application().id;
920        ensure!(
921            app_permissions.can_make_http_requests(&app_id),
922            ExecutionError::UnauthorizedApplication(app_id)
923        );
924
925        this.resource_controller.track_http_request()?;
926
927        let response = this
928            .execution_state_sender
929            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
930                request,
931                http_responses_are_oracle_responses:
932                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
933                callback,
934            })?
935            .recv_response()?;
936        Ok(response)
937    }
938
939    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
940        let this = self.inner();
941        this.execution_state_sender
942            .send_request(|callback| ExecutionRequest::AssertBefore {
943                timestamp,
944                callback,
945            })?
946            .recv_response()?
947    }
948
949    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
950        let this = self.inner();
951        let blob_id = hash.into();
952        let content = this
953            .execution_state_sender
954            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
955            .recv_response()?;
956        Ok(content.into_vec_or_clone())
957    }
958
959    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
960        let this = self.inner();
961        let blob_id = hash.into();
962        this.execution_state_sender
963            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
964            .recv_response()?;
965        Ok(())
966    }
967
968    fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
969        Ok(self.inner().allow_application_logs)
970    }
971
972    #[cfg(web)]
973    fn send_log(&mut self, message: String, level: tracing::log::Level) {
974        let this = self.inner();
975        // Fire-and-forget: ignore errors since logging shouldn't affect execution.
976        this.execution_state_sender
977            .unbounded_send(ExecutionRequest::Log { message, level })
978            .ok();
979    }
980}
981
/// Compile-time switch for the places where the [`BaseRuntime`] implementation must behave
/// differently for contracts and for services.
trait ContractOrServiceRuntime {
    /// Whether HTTP responses must be limited to the oracle response size.
    ///
    /// Services set this to `false`, which potentially lets them receive a larger HTTP
    /// response while only a shorter oracle response is stored in the block.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
992
993impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
994    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
995}
996
997impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
998    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
999}
1000
1001impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
1002    fn clone(&self) -> Self {
1003        SyncRuntimeHandle(self.0.clone())
1004    }
1005}
1006
1007impl ContractSyncRuntime {
1008    pub(crate) fn new(
1009        execution_state_sender: ExecutionStateSender,
1010        chain_id: ChainId,
1011        refund_grant_to: Option<Account>,
1012        resource_controller: ResourceController,
1013        action: &UserAction,
1014        allow_application_logs: bool,
1015    ) -> Self {
1016        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1017            SyncRuntimeInternal::new(
1018                chain_id,
1019                action.height(),
1020                action.round(),
1021                if let UserAction::Message(context, _) = action {
1022                    Some(context.into())
1023                } else {
1024                    None
1025                },
1026                execution_state_sender,
1027                None,
1028                refund_grant_to,
1029                resource_controller,
1030                action.timestamp(),
1031                allow_application_logs,
1032            ),
1033        )))
1034    }
1035
1036    /// Preloads the code of a contract into the runtime's memory.
1037    pub(crate) fn preload_contract(
1038        &self,
1039        id: ApplicationId,
1040        code: UserContractCode,
1041        description: ApplicationDescription,
1042    ) -> Result<(), ExecutionError> {
1043        let this = self
1044            .0
1045            .as_ref()
1046            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1047        let mut this_guard = this.inner();
1048
1049        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1050            entry.insert((code, description));
1051        }
1052
1053        Ok(())
1054    }
1055
1056    /// Main entry point to start executing a user action.
1057    pub(crate) fn run_action(
1058        mut self,
1059        application_id: ApplicationId,
1060        chain_id: ChainId,
1061        action: UserAction,
1062    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1063        let result = self
1064            .deref_mut()
1065            .run_action(application_id, chain_id, action)?;
1066        let runtime = self
1067            .into_inner()
1068            .expect("Runtime clones should have been freed by now");
1069
1070        Ok((result, runtime.resource_controller))
1071    }
1072}
1073
1074impl ContractSyncRuntimeHandle {
1075    #[instrument(skip_all, fields(application_id = %application_id))]
1076    fn run_action(
1077        &mut self,
1078        application_id: ApplicationId,
1079        chain_id: ChainId,
1080        action: UserAction,
1081    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1082        let finalize_context = FinalizeContext {
1083            authenticated_signer: action.signer(),
1084            chain_id,
1085            height: action.height(),
1086            round: action.round(),
1087        };
1088
1089        {
1090            let runtime = self.inner();
1091            assert_eq!(runtime.chain_id, chain_id);
1092            assert_eq!(runtime.height, action.height());
1093        }
1094
1095        let signer = action.signer();
1096        let closure = move |code: &mut UserContractInstance| match action {
1097            UserAction::Instantiate(_context, argument) => {
1098                code.instantiate(argument).map(|()| None)
1099            }
1100            UserAction::Operation(_context, operation) => {
1101                code.execute_operation(operation).map(Option::Some)
1102            }
1103            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1104            UserAction::ProcessStreams(_context, updates) => {
1105                code.process_streams(updates).map(|()| None)
1106            }
1107        };
1108
1109        let result = self.execute(application_id, signer, closure)?;
1110        self.finalize(finalize_context)?;
1111        Ok(result)
1112    }
1113
1114    /// Notifies all loaded applications that execution is finalizing.
1115    #[instrument(skip_all)]
1116    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1117        let applications = mem::take(&mut self.inner().applications_to_finalize)
1118            .into_iter()
1119            .rev();
1120
1121        self.inner().is_finalizing = true;
1122
1123        for application in applications {
1124            self.execute(application, context.authenticated_signer, |contract| {
1125                contract.finalize().map(|_| None)
1126            })?;
1127            self.inner().loaded_applications.remove(&application);
1128        }
1129
1130        Ok(())
1131    }
1132
1133    /// Executes a `closure` with the contract code for the `application_id`.
1134    #[instrument(skip_all, fields(application_id = %application_id))]
1135    fn execute(
1136        &mut self,
1137        application_id: ApplicationId,
1138        signer: Option<AccountOwner>,
1139        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1140    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1141        let contract = {
1142            let mut runtime = self.inner();
1143            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1144
1145            let status = ApplicationStatus {
1146                caller_id: None,
1147                id: application_id,
1148                description: application.description.clone(),
1149                signer,
1150            };
1151
1152            runtime.push_application(status);
1153
1154            application
1155        };
1156
1157        let result = closure(
1158            &mut contract
1159                .instance
1160                .try_lock()
1161                .expect("Application should not be already executing"),
1162        )?;
1163
1164        let mut runtime = self.inner();
1165        let application_status = runtime.pop_application();
1166        assert_eq!(application_status.caller_id, None);
1167        assert_eq!(application_status.id, application_id);
1168        assert_eq!(application_status.description, contract.description);
1169        assert_eq!(application_status.signer, signer);
1170        assert!(runtime.call_stack.is_empty());
1171
1172        Ok(result)
1173    }
1174}
1175
1176impl ContractRuntime for ContractSyncRuntimeHandle {
1177    fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1178        let this = self.inner();
1179        Ok(this.current_application().signer)
1180    }
1181
1182    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1183        Ok(self
1184            .inner()
1185            .executing_message
1186            .map(|metadata| metadata.is_bouncing))
1187    }
1188
1189    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1190        Ok(self
1191            .inner()
1192            .executing_message
1193            .map(|metadata| metadata.origin))
1194    }
1195
1196    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1197        let this = self.inner();
1198        if this.call_stack.len() <= 1 {
1199            return Ok(None);
1200        }
1201        Ok(this.current_application().caller_id)
1202    }
1203
1204    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1205        Ok(match vm_runtime {
1206            VmRuntime::Wasm => {
1207                self.inner()
1208                    .resource_controller
1209                    .policy()
1210                    .maximum_wasm_fuel_per_block
1211            }
1212            VmRuntime::Evm => {
1213                self.inner()
1214                    .resource_controller
1215                    .policy()
1216                    .maximum_evm_fuel_per_block
1217            }
1218        })
1219    }
1220
1221    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1222        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1223    }
1224
1225    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1226        let mut this = self.inner();
1227        this.resource_controller.track_fuel(fuel, vm_runtime)
1228    }
1229
1230    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1231        let mut this = self.inner();
1232        let application = this.current_application();
1233        let application_id = application.id;
1234        let authenticated_signer = application.signer;
1235        let mut refund_grant_to = this.refund_grant_to;
1236
1237        let grant = this
1238            .resource_controller
1239            .policy()
1240            .total_price(&message.grant)?;
1241        if grant.is_zero() {
1242            refund_grant_to = None;
1243        } else {
1244            this.resource_controller.track_grant(grant)?;
1245        }
1246        let kind = if message.is_tracked {
1247            MessageKind::Tracked
1248        } else {
1249            MessageKind::Simple
1250        };
1251
1252        this.execution_state_sender
1253            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1254                message: OutgoingMessage {
1255                    destination: message.destination,
1256                    authenticated_signer,
1257                    refund_grant_to,
1258                    grant,
1259                    kind,
1260                    message: Message::User {
1261                        application_id,
1262                        bytes: message.message,
1263                    },
1264                },
1265                callback,
1266            })?
1267            .recv_response()?;
1268
1269        Ok(())
1270    }
1271
1272    fn transfer(
1273        &mut self,
1274        source: AccountOwner,
1275        destination: Account,
1276        amount: Amount,
1277    ) -> Result<(), ExecutionError> {
1278        let this = self.inner();
1279        let current_application = this.current_application();
1280        let application_id = current_application.id;
1281        let signer = current_application.signer;
1282
1283        this.execution_state_sender
1284            .send_request(|callback| ExecutionRequest::Transfer {
1285                source,
1286                destination,
1287                amount,
1288                signer,
1289                application_id,
1290                callback,
1291            })?
1292            .recv_response()?;
1293        Ok(())
1294    }
1295
1296    fn claim(
1297        &mut self,
1298        source: Account,
1299        destination: Account,
1300        amount: Amount,
1301    ) -> Result<(), ExecutionError> {
1302        let this = self.inner();
1303        let current_application = this.current_application();
1304        let application_id = current_application.id;
1305        let signer = current_application.signer;
1306
1307        this.execution_state_sender
1308            .send_request(|callback| ExecutionRequest::Claim {
1309                source,
1310                destination,
1311                amount,
1312                signer,
1313                application_id,
1314                callback,
1315            })?
1316            .recv_response()?;
1317        Ok(())
1318    }
1319
1320    fn try_call_application(
1321        &mut self,
1322        authenticated: bool,
1323        callee_id: ApplicationId,
1324        argument: Vec<u8>,
1325    ) -> Result<Vec<u8>, ExecutionError> {
1326        let contract = self
1327            .inner()
1328            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1329
1330        let value = contract
1331            .try_lock()
1332            .expect("Applications should not have reentrant calls")
1333            .execute_operation(argument)?;
1334
1335        self.inner().finish_call()?;
1336
1337        Ok(value)
1338    }
1339
1340    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1341        let mut this = self.inner();
1342        ensure!(
1343            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1344            ExecutionError::StreamNameTooLong
1345        );
1346        let application_id = GenericApplicationId::User(this.current_application().id);
1347        let stream_id = StreamId {
1348            stream_name,
1349            application_id,
1350        };
1351        let value_len = value.len() as u64;
1352        let index = this
1353            .execution_state_sender
1354            .send_request(|callback| ExecutionRequest::Emit {
1355                stream_id,
1356                value,
1357                callback,
1358            })?
1359            .recv_response()?;
1360        // TODO(#365): Consider separate event fee categories.
1361        this.resource_controller.track_bytes_written(value_len)?;
1362        Ok(index)
1363    }
1364
1365    fn read_event(
1366        &mut self,
1367        chain_id: ChainId,
1368        stream_name: StreamName,
1369        index: u32,
1370    ) -> Result<Vec<u8>, ExecutionError> {
1371        let mut this = self.inner();
1372        ensure!(
1373            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1374            ExecutionError::StreamNameTooLong
1375        );
1376        let application_id = GenericApplicationId::User(this.current_application().id);
1377        let stream_id = StreamId {
1378            stream_name,
1379            application_id,
1380        };
1381        let event_id = EventId {
1382            stream_id,
1383            index,
1384            chain_id,
1385        };
1386        let event = this
1387            .execution_state_sender
1388            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1389            .recv_response()?;
1390        // TODO(#365): Consider separate event fee categories.
1391        this.resource_controller
1392            .track_bytes_read(event.len() as u64)?;
1393        Ok(event)
1394    }
1395
1396    fn subscribe_to_events(
1397        &mut self,
1398        chain_id: ChainId,
1399        application_id: ApplicationId,
1400        stream_name: StreamName,
1401    ) -> Result<(), ExecutionError> {
1402        let this = self.inner();
1403        ensure!(
1404            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1405            ExecutionError::StreamNameTooLong
1406        );
1407        let stream_id = StreamId {
1408            stream_name,
1409            application_id: application_id.into(),
1410        };
1411        let subscriber_app_id = this.current_application().id;
1412        this.execution_state_sender
1413            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1414                chain_id,
1415                stream_id,
1416                subscriber_app_id,
1417                callback,
1418            })?
1419            .recv_response()?;
1420        Ok(())
1421    }
1422
1423    fn unsubscribe_from_events(
1424        &mut self,
1425        chain_id: ChainId,
1426        application_id: ApplicationId,
1427        stream_name: StreamName,
1428    ) -> Result<(), ExecutionError> {
1429        let this = self.inner();
1430        ensure!(
1431            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1432            ExecutionError::StreamNameTooLong
1433        );
1434        let stream_id = StreamId {
1435            stream_name,
1436            application_id: application_id.into(),
1437        };
1438        let subscriber_app_id = this.current_application().id;
1439        this.execution_state_sender
1440            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1441                chain_id,
1442                stream_id,
1443                subscriber_app_id,
1444                callback,
1445            })?
1446            .recv_response()?;
1447        Ok(())
1448    }
1449
1450    fn query_service(
1451        &mut self,
1452        application_id: ApplicationId,
1453        query: Vec<u8>,
1454    ) -> Result<Vec<u8>, ExecutionError> {
1455        let mut this = self.inner();
1456
1457        let app_permissions = this
1458            .execution_state_sender
1459            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1460            .recv_response()?;
1461
1462        let app_id = this.current_application().id;
1463        ensure!(
1464            app_permissions.can_call_services(&app_id),
1465            ExecutionError::UnauthorizedApplication(app_id)
1466        );
1467
1468        this.resource_controller.track_service_oracle_call()?;
1469
1470        this.run_service_oracle_query(application_id, query)
1471    }
1472
1473    fn open_chain(
1474        &mut self,
1475        ownership: ChainOwnership,
1476        application_permissions: ApplicationPermissions,
1477        balance: Amount,
1478    ) -> Result<ChainId, ExecutionError> {
1479        let parent_id = self.inner().chain_id;
1480        let block_height = self.block_height()?;
1481
1482        let timestamp = self.inner().user_context;
1483
1484        let chain_id = self
1485            .inner()
1486            .execution_state_sender
1487            .send_request(|callback| ExecutionRequest::OpenChain {
1488                ownership,
1489                balance,
1490                parent_id,
1491                block_height,
1492                timestamp,
1493                application_permissions,
1494                callback,
1495            })?
1496            .recv_response()?;
1497
1498        Ok(chain_id)
1499    }
1500
1501    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1502        let this = self.inner();
1503        let application_id = this.current_application().id;
1504        this.execution_state_sender
1505            .send_request(|callback| ExecutionRequest::CloseChain {
1506                application_id,
1507                callback,
1508            })?
1509            .recv_response()?
1510    }
1511
1512    fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1513        let this = self.inner();
1514        let application_id = this.current_application().id;
1515        this.execution_state_sender
1516            .send_request(|callback| ExecutionRequest::ChangeOwnership {
1517                application_id,
1518                ownership,
1519                callback,
1520            })?
1521            .recv_response()?
1522    }
1523
1524    fn change_application_permissions(
1525        &mut self,
1526        application_permissions: ApplicationPermissions,
1527    ) -> Result<(), ExecutionError> {
1528        let this = self.inner();
1529        let application_id = this.current_application().id;
1530        this.execution_state_sender
1531            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1532                application_id,
1533                application_permissions,
1534                callback,
1535            })?
1536            .recv_response()?
1537    }
1538
1539    fn create_application(
1540        &mut self,
1541        module_id: ModuleId,
1542        parameters: Vec<u8>,
1543        argument: Vec<u8>,
1544        required_application_ids: Vec<ApplicationId>,
1545    ) -> Result<ApplicationId, ExecutionError> {
1546        let chain_id = self.inner().chain_id;
1547        let block_height = self.block_height()?;
1548
1549        let CreateApplicationResult { app_id } = self
1550            .inner()
1551            .execution_state_sender
1552            .send_request(move |callback| ExecutionRequest::CreateApplication {
1553                chain_id,
1554                block_height,
1555                module_id,
1556                parameters,
1557                required_application_ids,
1558                callback,
1559            })?
1560            .recv_response()??;
1561
1562        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1563
1564        contract
1565            .try_lock()
1566            .expect("Applications should not have reentrant calls")
1567            .instantiate(argument)?;
1568
1569        self.inner().finish_call()?;
1570
1571        Ok(app_id)
1572    }
1573
1574    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1575        let blob = Blob::new_data(bytes);
1576        let blob_id = blob.id();
1577        let this = self.inner();
1578        this.execution_state_sender
1579            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1580            .recv_response()?;
1581        Ok(DataBlobHash(blob_id.hash))
1582    }
1583
1584    fn publish_module(
1585        &mut self,
1586        contract: Bytecode,
1587        service: Bytecode,
1588        vm_runtime: VmRuntime,
1589    ) -> Result<ModuleId, ExecutionError> {
1590        let (blobs, module_id) =
1591            crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1592        let this = self.inner();
1593        for blob in blobs {
1594            this.execution_state_sender
1595                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1596                .recv_response()?;
1597        }
1598        Ok(module_id)
1599    }
1600
1601    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1602        let this = self.inner();
1603        let round = this.round;
1604        this.execution_state_sender
1605            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1606            .recv_response()
1607    }
1608
1609    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1610        let mut this = self.inner();
1611        let id = this.current_application().id;
1612        let state = this.view_user_states.entry(id).or_default();
1613        state.force_all_pending_queries()?;
1614        this.resource_controller.track_write_operations(
1615            batch
1616                .num_operations()
1617                .try_into()
1618                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1619        )?;
1620        this.resource_controller
1621            .track_bytes_written(batch.size() as u64)?;
1622        this.execution_state_sender
1623            .send_request(|callback| ExecutionRequest::WriteBatch {
1624                id,
1625                batch,
1626                callback,
1627            })?
1628            .recv_response()?;
1629        Ok(())
1630    }
1631}
1632
1633impl ServiceSyncRuntime {
1634    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1635    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1636        Self::new_with_deadline(execution_state_sender, context, None)
1637    }
1638
1639    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1640    pub fn new_with_deadline(
1641        execution_state_sender: ExecutionStateSender,
1642        context: QueryContext,
1643        deadline: Option<Instant>,
1644    ) -> Self {
1645        // Query the allow_application_logs setting from the execution state.
1646        let allow_application_logs = execution_state_sender
1647            .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1648            .ok()
1649            .and_then(|receiver| receiver.recv_response().ok())
1650            .unwrap_or(false);
1651
1652        let runtime = SyncRuntime(Some(
1653            SyncRuntimeInternal::new(
1654                context.chain_id,
1655                context.next_block_height,
1656                None,
1657                None,
1658                execution_state_sender,
1659                deadline,
1660                None,
1661                ResourceController::default(),
1662                (),
1663                allow_application_logs,
1664            )
1665            .into(),
1666        ));
1667
1668        ServiceSyncRuntime {
1669            runtime,
1670            current_context: context,
1671        }
1672    }
1673
1674    /// Preloads the code of a service into the runtime's memory.
1675    pub(crate) fn preload_service(
1676        &self,
1677        id: ApplicationId,
1678        code: UserServiceCode,
1679        description: ApplicationDescription,
1680    ) -> Result<(), ExecutionError> {
1681        let this = self
1682            .runtime
1683            .0
1684            .as_ref()
1685            .expect("services shouldn't be preloaded while the runtime is being dropped");
1686        let mut this_guard = this.inner();
1687
1688        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1689            entry.insert((code, description));
1690        }
1691
1692        Ok(())
1693    }
1694
1695    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1696    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1697        while let Ok(request) = incoming_requests.recv() {
1698            let ServiceRuntimeRequest::Query {
1699                application_id,
1700                context,
1701                query,
1702                callback,
1703            } = request;
1704
1705            let result = self
1706                .prepare_for_query(context)
1707                .and_then(|()| self.run_query(application_id, query));
1708
1709            if let Err(err) = callback.send(result) {
1710                tracing::debug!(%err, "Receiver for query result has been dropped");
1711            }
1712        }
1713    }
1714
1715    /// Prepares the runtime to query an application.
1716    pub(crate) fn prepare_for_query(
1717        &mut self,
1718        new_context: QueryContext,
1719    ) -> Result<(), ExecutionError> {
1720        let expected_context = QueryContext {
1721            local_time: new_context.local_time,
1722            ..self.current_context
1723        };
1724
1725        if new_context != expected_context {
1726            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1727            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1728        } else {
1729            self.handle_mut()
1730                .inner()
1731                .execution_state_sender
1732                .send_request(|callback| ExecutionRequest::SetLocalTime {
1733                    local_time: new_context.local_time,
1734                    callback,
1735                })?
1736                .recv_response()?;
1737        }
1738        Ok(())
1739    }
1740
1741    /// Queries an application specified by its [`ApplicationId`].
1742    pub(crate) fn run_query(
1743        &mut self,
1744        application_id: ApplicationId,
1745        query: Vec<u8>,
1746    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1747        let this = self.handle_mut();
1748        let response = this.try_query_application(application_id, query)?;
1749        let operations = mem::take(&mut this.inner().scheduled_operations);
1750
1751        Ok(QueryOutcome {
1752            response,
1753            operations,
1754        })
1755    }
1756
1757    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1758    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1759        self.runtime.0.as_mut().expect(
1760            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1761        )
1762    }
1763}
1764
1765impl ServiceRuntime for ServiceSyncRuntimeHandle {
1766    /// Note that queries are not available from writable contexts.
1767    fn try_query_application(
1768        &mut self,
1769        queried_id: ApplicationId,
1770        argument: Vec<u8>,
1771    ) -> Result<Vec<u8>, ExecutionError> {
1772        let service = {
1773            let mut this = self.inner();
1774
1775            // Load the application.
1776            let application = this.load_service_instance(self.clone(), queried_id)?;
1777            // Make the call to user code.
1778            this.push_application(ApplicationStatus {
1779                caller_id: None,
1780                id: queried_id,
1781                description: application.description,
1782                signer: None,
1783            });
1784            application.instance
1785        };
1786        let response = service
1787            .try_lock()
1788            .expect("Applications should not have reentrant calls")
1789            .handle_query(argument)?;
1790        self.inner().pop_application();
1791        Ok(response)
1792    }
1793
1794    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1795        let mut this = self.inner();
1796        let application_id = this.current_application().id;
1797
1798        this.scheduled_operations.push(Operation::User {
1799            application_id,
1800            bytes: operation,
1801        });
1802
1803        Ok(())
1804    }
1805
1806    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1807        if let Some(deadline) = self.inner().deadline {
1808            if Instant::now() >= deadline {
1809                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1810            }
1811        }
1812        Ok(())
1813    }
1814}
1815
1816/// A request to the service runtime actor.
1817pub enum ServiceRuntimeRequest {
1818    Query {
1819        application_id: ApplicationId,
1820        context: QueryContext,
1821        query: Vec<u8>,
1822        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1823    },
1824}
1825
1826/// The origin of the execution.
1827#[derive(Clone, Copy, Debug)]
1828struct ExecutingMessage {
1829    is_bouncing: bool,
1830    origin: ChainId,
1831}
1832
1833impl From<&MessageContext> for ExecutingMessage {
1834    fn from(context: &MessageContext) -> Self {
1835        ExecutingMessage {
1836            is_bouncing: context.is_bouncing,
1837            origin: context.origin,
1838        }
1839    }
1840}
1841
1842/// Creates a compressed contract and service bytecode synchronously.
1843pub fn create_bytecode_blobs_sync(
1844    contract: Bytecode,
1845    service: Bytecode,
1846    vm_runtime: VmRuntime,
1847) -> (Vec<Blob>, ModuleId) {
1848    match vm_runtime {
1849        VmRuntime::Wasm => {
1850            let compressed_contract = contract.compress();
1851            let compressed_service = service.compress();
1852            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1853            let service_blob = Blob::new_service_bytecode(compressed_service);
1854            let module_id =
1855                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1856            (vec![contract_blob, service_blob], module_id)
1857        }
1858        VmRuntime::Evm => {
1859            let compressed_contract = contract.compress();
1860            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1861            let module_id = ModuleId::new(
1862                evm_contract_blob.id().hash,
1863                evm_contract_blob.id().hash,
1864                vm_runtime,
1865            );
1866            (vec![evm_contract_blob], module_id)
1867        }
1868    }
1869}