linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29    execution::UserAction,
30    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31    resources::ResourceController,
32    system::CreateApplicationResult,
33    util::{ReceiverExt, UnboundedSenderExt},
34    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
40#[cfg(test)]
41#[path = "unit_tests/runtime_tests.rs"]
42mod tests;
43
/// Associates a user instance type with the extra context its runtime stores.
///
/// The associated type is the type of the `user_context` field of
/// [`SyncRuntimeInternal`].
pub trait WithContext {
    /// The additional context data kept by the runtime for this kind of instance.
    type UserContext;
}
47
/// Runtimes for contract instances carry a [`Timestamp`] as their user context.
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
}
51
/// Runtimes for service instances carry no additional user context.
impl WithContext for UserServiceInstance {
    type UserContext = ();
}
55
/// Test-only implementation for a type-erased instance, with no user context.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
}
60
/// Owning wrapper around a [`SyncRuntimeHandle`].
///
/// The handle is kept in an [`Option`] so that it can be taken out (by `into_inner` and by the
/// `Drop` implementation). Dereferencing the wrapper after the handle was taken panics.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
63
/// A [`SyncRuntime`] whose user instances are contracts.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
65
/// A [`SyncRuntime`] for service instances, bundled with a [`QueryContext`].
pub struct ServiceSyncRuntime {
    /// The underlying runtime for service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// The query context stored alongside the runtime.
    current_context: QueryContext,
}
70
/// A shared handle to a [`SyncRuntimeInternal`], guarded by a [`Mutex`] behind an [`Arc`].
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);
75
/// A [`SyncRuntimeHandle`] whose user instances are contracts.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] whose user instances are services.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
78
/// Runtime data tracked during the execution of a transaction on the synchronous thread.
///
/// Instances are shared through a [`SyncRuntimeHandle`], which wraps this struct in an
/// `Arc<Mutex<_>>`.
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// The current chain ID.
    chain_id: ChainId,
    /// The height of the next block that will be added to this chain. During operations
    /// and messages, this is the current block height.
    height: BlockHeight,
    /// The current consensus round. Only available during block validation in multi-leader rounds.
    round: Option<u32>,
    /// The current message being executed, if there is one.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// How to interact with the storage view of the execution state.
    execution_state_sender: ExecutionStateSender,

    /// If applications are being finalized.
    ///
    /// If [`true`], disables cross-application calls.
    is_finalizing: bool,
    /// Applications that need to be finalized.
    applications_to_finalize: Vec<ApplicationId>,

    /// Application instances loaded in this transaction.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// The current stack of application descriptions.
    ///
    /// The last element is the application that is currently executing.
    call_stack: Vec<ApplicationStatus>,
    /// The set of the IDs of the applications that are in the `call_stack`.
    ///
    /// Used to detect reentrant calls.
    active_applications: HashSet<ApplicationId>,
    /// The operations scheduled during this query.
    scheduled_operations: Vec<Operation>,

    /// Track application states based on views.
    ///
    /// Holds the pending key-value store queries of each application.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// The deadline this runtime should finish executing.
    ///
    /// Used to limit the execution time of services running as oracles.
    deadline: Option<Instant>,

    /// Where to send a refund for the unused part of the grant after execution, if any.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// Controller to track fuel and storage consumption.
    resource_controller: ResourceController,
    /// Additional context for the runtime.
    ///
    /// Its type is chosen by the [`WithContext`] implementation of `UserInstance`.
    user_context: UserInstance::UserContext,
}
128
/// The runtime status of an application.
///
/// One entry is pushed to the runtime's `call_stack` for every application being executed.
#[derive(Debug)]
struct ApplicationStatus {
    /// The caller application ID, if forwarded during the call.
    caller_id: Option<ApplicationId>,
    /// The application ID.
    id: ApplicationId,
    /// The application description.
    description: ApplicationDescription,
    /// The authenticated signer for the execution thread, if any.
    signer: Option<AccountOwner>,
}
141
/// A loaded application instance.
///
/// The instance is shared behind an `Arc<Mutex<_>>`, so cloning this entry does not clone the
/// instance itself.
#[derive(Debug)]
struct LoadedApplication<Instance> {
    /// The shared, lockable application instance.
    instance: Arc<Mutex<Instance>>,
    /// The description of the loaded application.
    description: ApplicationDescription,
}
148
149impl<Instance> LoadedApplication<Instance> {
150    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
151    fn new(instance: Instance, description: ApplicationDescription) -> Self {
152        LoadedApplication {
153            instance: Arc::new(Mutex::new(instance)),
154            description,
155        }
156    }
157}
158
159impl<Instance> Clone for LoadedApplication<Instance> {
160    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
161    // bound
162    fn clone(&self) -> Self {
163        LoadedApplication {
164            instance: self.instance.clone(),
165            description: self.description.clone(),
166        }
167    }
168}
169
/// A value that is either already available or still has to be received.
#[derive(Debug)]
enum Promise<T> {
    /// The value is available.
    Ready(T),
    /// The value has not been received yet; it will arrive through the receiver.
    Pending(Receiver<T>),
}
175
176impl<T> Promise<T> {
177    fn force(&mut self) -> Result<(), ExecutionError> {
178        if let Promise::Pending(receiver) = self {
179            let value = receiver
180                .recv_ref()
181                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
182            *self = Promise::Ready(value);
183        }
184        Ok(())
185    }
186
187    fn read(self) -> Result<T, ExecutionError> {
188        match self {
189            Promise::Pending(receiver) => {
190                let value = receiver.recv_response()?;
191                Ok(value)
192            }
193            Promise::Ready(value) => Ok(value),
194        }
195    }
196}
197
/// Manages a set of pending queries returning values of type `T`.
///
/// Queries are identified by the `u32` index handed out when they are registered.
#[derive(Debug, Default)]
struct QueryManager<T> {
    /// The queries in progress.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// The number of queries ever registered so far. Used for the index of the next query.
    query_count: u32,
    /// The number of active queries.
    active_query_count: u32,
}
208
209impl<T> QueryManager<T> {
210    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
211        let id = self.query_count;
212        self.pending_queries.insert(id, Promise::Pending(receiver));
213        self.query_count = self
214            .query_count
215            .checked_add(1)
216            .ok_or(ArithmeticError::Overflow)?;
217        self.active_query_count = self
218            .active_query_count
219            .checked_add(1)
220            .ok_or(ArithmeticError::Overflow)?;
221        Ok(id)
222    }
223
224    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
225        let promise = self
226            .pending_queries
227            .remove(&id)
228            .ok_or(ExecutionError::InvalidPromise)?;
229        let value = promise.read()?;
230        self.active_query_count -= 1;
231        Ok(value)
232    }
233
234    fn force_all(&mut self) -> Result<(), ExecutionError> {
235        for promise in self.pending_queries.values_mut() {
236            promise.force()?;
237        }
238        Ok(())
239    }
240}
241
/// A list of keys, each a byte string.
type Keys = Vec<Vec<u8>>;
/// A value, as a byte string.
type Value = Vec<u8>;
/// A list of key-value pairs, both as byte strings.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
245
/// The pending key-value store queries of one application, one manager per kind of query.
#[derive(Debug, Default)]
struct ViewUserState {
    /// The contains-key queries in progress.
    contains_key_queries: QueryManager<bool>,
    /// The contains-keys queries in progress.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// The read-value queries in progress.
    read_value_queries: QueryManager<Option<Value>>,
    /// The read-multi-values queries in progress.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// The find-keys queries in progress.
    find_keys_queries: QueryManager<Keys>,
    /// The find-key-values queries in progress.
    find_key_values_queries: QueryManager<KeyValues>,
}
261
262impl ViewUserState {
263    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
264        self.contains_key_queries.force_all()?;
265        self.contains_keys_queries.force_all()?;
266        self.read_value_queries.force_all()?;
267        self.read_multi_values_queries.force_all()?;
268        self.find_keys_queries.force_all()?;
269        self.find_key_values_queries.force_all()?;
270        Ok(())
271    }
272}
273
274impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
275    type Target = SyncRuntimeHandle<UserInstance>;
276
277    fn deref(&self) -> &Self::Target {
278        self.0.as_ref().expect(
279            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
280        )
281    }
282}
283
284impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
285    fn deref_mut(&mut self) -> &mut Self::Target {
286        self.0.as_mut().expect(
287            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288        )
289    }
290}
291
292impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
293    fn drop(&mut self) {
294        // Ensure the `loaded_applications` are cleared to prevent circular references in
295        // the runtime
296        if let Some(handle) = self.0.take() {
297            handle.inner().loaded_applications.clear();
298        }
299    }
300}
301
302impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
303    #[expect(clippy::too_many_arguments)]
304    fn new(
305        chain_id: ChainId,
306        height: BlockHeight,
307        round: Option<u32>,
308        executing_message: Option<ExecutingMessage>,
309        execution_state_sender: ExecutionStateSender,
310        deadline: Option<Instant>,
311        refund_grant_to: Option<Account>,
312        resource_controller: ResourceController,
313        user_context: UserInstance::UserContext,
314    ) -> Self {
315        Self {
316            chain_id,
317            height,
318            round,
319            executing_message,
320            execution_state_sender,
321            is_finalizing: false,
322            applications_to_finalize: Vec::new(),
323            loaded_applications: HashMap::new(),
324            call_stack: Vec::new(),
325            active_applications: HashSet::new(),
326            view_user_states: BTreeMap::new(),
327            deadline,
328            refund_grant_to,
329            resource_controller,
330            scheduled_operations: Vec::new(),
331            user_context,
332        }
333    }
334
335    /// Returns the [`ApplicationStatus`] of the current application.
336    ///
337    /// The current application is the last to be pushed to the `call_stack`.
338    ///
339    /// # Panics
340    ///
341    /// If the call stack is empty.
342    fn current_application(&self) -> &ApplicationStatus {
343        self.call_stack
344            .last()
345            .expect("Call stack is unexpectedly empty")
346    }
347
348    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
349    ///
350    /// Ensures the application's ID is also tracked in the `active_applications` set.
351    fn push_application(&mut self, status: ApplicationStatus) {
352        self.active_applications.insert(status.id);
353        self.call_stack.push(status);
354    }
355
356    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
357    ///
358    /// Ensures the application's ID is also removed from the `active_applications` set.
359    ///
360    /// # Panics
361    ///
362    /// If the call stack is empty.
363    fn pop_application(&mut self) -> ApplicationStatus {
364        let status = self
365            .call_stack
366            .pop()
367            .expect("Can't remove application from empty call stack");
368        assert!(self.active_applications.remove(&status.id));
369        status
370    }
371
372    /// Ensures that a call to `application_id` is not-reentrant.
373    ///
374    /// Returns an error if there already is an entry for `application_id` in the call stack.
375    fn check_for_reentrancy(
376        &mut self,
377        application_id: ApplicationId,
378    ) -> Result<(), ExecutionError> {
379        ensure!(
380            !self.active_applications.contains(&application_id),
381            ExecutionError::ReentrantCall(application_id)
382        );
383        Ok(())
384    }
385}
386
387impl SyncRuntimeInternal<UserContractInstance> {
388    /// Loads a contract instance, initializing it with this runtime if needed.
389    fn load_contract_instance(
390        &mut self,
391        this: SyncRuntimeHandle<UserContractInstance>,
392        id: ApplicationId,
393    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
394        match self.loaded_applications.entry(id) {
395            // TODO(#2927): support dynamic loading of modules on the Web
396            #[cfg(web)]
397            hash_map::Entry::Vacant(_) => {
398                drop(this);
399                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
400                    id,
401                )))
402            }
403            #[cfg(not(web))]
404            hash_map::Entry::Vacant(entry) => {
405                let (code, description) = self
406                    .execution_state_sender
407                    .send_request(move |callback| ExecutionRequest::LoadContract { id, callback })?
408                    .recv_response()?;
409
410                let instance = code.instantiate(this)?;
411
412                self.applications_to_finalize.push(id);
413                Ok(entry
414                    .insert(LoadedApplication::new(instance, description))
415                    .clone())
416            }
417            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
418        }
419    }
420
421    /// Configures the runtime for executing a call to a different contract.
422    fn prepare_for_call(
423        &mut self,
424        this: ContractSyncRuntimeHandle,
425        authenticated: bool,
426        callee_id: ApplicationId,
427    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
428        self.check_for_reentrancy(callee_id)?;
429
430        ensure!(
431            !self.is_finalizing,
432            ExecutionError::CrossApplicationCallInFinalize {
433                caller_id: Box::new(self.current_application().id),
434                callee_id: Box::new(callee_id),
435            }
436        );
437
438        // Load the application.
439        let application = self.load_contract_instance(this, callee_id)?;
440
441        let caller = self.current_application();
442        let caller_id = caller.id;
443        let caller_signer = caller.signer;
444        // Make the call to user code.
445        let authenticated_signer = match caller_signer {
446            Some(signer) if authenticated => Some(signer),
447            _ => None,
448        };
449        let authenticated_caller_id = authenticated.then_some(caller_id);
450        self.push_application(ApplicationStatus {
451            caller_id: authenticated_caller_id,
452            id: callee_id,
453            description: application.description,
454            // Allow further nested calls to be authenticated if this one is.
455            signer: authenticated_signer,
456        });
457        Ok(application.instance)
458    }
459
460    /// Cleans up the runtime after the execution of a call to a different contract.
461    fn finish_call(&mut self) -> Result<(), ExecutionError> {
462        self.pop_application();
463        Ok(())
464    }
465
466    /// Runs the service in a separate thread as an oracle.
467    fn run_service_oracle_query(
468        &mut self,
469        application_id: ApplicationId,
470        query: Vec<u8>,
471    ) -> Result<Vec<u8>, ExecutionError> {
472        let timeout = self
473            .resource_controller
474            .remaining_service_oracle_execution_time()?;
475        let execution_start = Instant::now();
476        let deadline = Some(execution_start + timeout);
477        let response = self
478            .execution_state_sender
479            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
480                deadline,
481                application_id,
482                next_block_height: self.height,
483                query,
484                callback,
485            })?
486            .recv_response()?;
487
488        self.resource_controller
489            .track_service_oracle_execution(execution_start.elapsed())?;
490        self.resource_controller
491            .track_service_oracle_response(response.len())?;
492
493        Ok(response)
494    }
495}
496
497impl SyncRuntimeInternal<UserServiceInstance> {
498    /// Initializes a service instance with this runtime.
499    fn load_service_instance(
500        &mut self,
501        this: ServiceSyncRuntimeHandle,
502        id: ApplicationId,
503    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
504        match self.loaded_applications.entry(id) {
505            // TODO(#2927): support dynamic loading of modules on the Web
506            #[cfg(web)]
507            hash_map::Entry::Vacant(_) => {
508                drop(this);
509                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
510                    id,
511                )))
512            }
513            #[cfg(not(web))]
514            hash_map::Entry::Vacant(entry) => {
515                let (code, description) = self
516                    .execution_state_sender
517                    .send_request(move |callback| ExecutionRequest::LoadService { id, callback })?
518                    .recv_response()?;
519
520                let instance = code.instantiate(this)?;
521                Ok(entry
522                    .insert(LoadedApplication::new(instance, description))
523                    .clone())
524            }
525            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
526        }
527    }
528}
529
530impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
531    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
532        let handle = self.0.take().expect(
533            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
534        );
535        let runtime = Arc::into_inner(handle.0)?
536            .into_inner()
537            .expect("`SyncRuntime` should run in a single thread which should not panic");
538        Some(runtime)
539    }
540}
541
542impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
543    for SyncRuntimeHandle<UserInstance>
544{
545    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
546        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
547    }
548}
549
550impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
551    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
552        self.0
553            .try_lock()
554            .expect("Synchronous runtimes run on a single execution thread")
555    }
556}
557
558impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
559where
560    Self: ContractOrServiceRuntime,
561{
    type Read = ();
    // The promise types below are `u32` IDs, as returned by `QueryManager::register` and
    // accepted by `QueryManager::wait`.
    type ReadValueBytes = u32;
    type ContainsKey = u32;
    type ContainsKeys = u32;
    type ReadMultiValuesBytes = u32;
    type FindKeysByPrefix = u32;
    type FindKeyValuesByPrefix = u32;
569
570    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
571        let mut this = self.inner();
572        let chain_id = this.chain_id;
573        this.resource_controller.track_runtime_chain_id()?;
574        Ok(chain_id)
575    }
576
577    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
578        let mut this = self.inner();
579        let height = this.height;
580        this.resource_controller.track_runtime_block_height()?;
581        Ok(height)
582    }
583
584    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
585        let mut this = self.inner();
586        let application_id = this.current_application().id;
587        this.resource_controller.track_runtime_application_id()?;
588        Ok(application_id)
589    }
590
591    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
592        let mut this = self.inner();
593        let application_creator_chain_id = this.current_application().description.creator_chain_id;
594        this.resource_controller.track_runtime_application_id()?;
595        Ok(application_creator_chain_id)
596    }
597
598    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
599        let mut this = self.inner();
600        let parameters = this.current_application().description.parameters.clone();
601        this.resource_controller
602            .track_runtime_application_parameters(&parameters)?;
603        Ok(parameters)
604    }
605
606    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
607        let mut this = self.inner();
608        let timestamp = this
609            .execution_state_sender
610            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
611            .recv_response()?;
612        this.resource_controller.track_runtime_timestamp()?;
613        Ok(timestamp)
614    }
615
616    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
617        let mut this = self.inner();
618        let balance = this
619            .execution_state_sender
620            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
621            .recv_response()?;
622        this.resource_controller.track_runtime_balance()?;
623        Ok(balance)
624    }
625
626    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
627        let mut this = self.inner();
628        let balance = this
629            .execution_state_sender
630            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
631            .recv_response()?;
632        this.resource_controller.track_runtime_balance()?;
633        Ok(balance)
634    }
635
636    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
637        let mut this = self.inner();
638        let owner_balances = this
639            .execution_state_sender
640            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
641            .recv_response()?;
642        this.resource_controller
643            .track_runtime_owner_balances(&owner_balances)?;
644        Ok(owner_balances)
645    }
646
647    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
648        let mut this = self.inner();
649        let owners = this
650            .execution_state_sender
651            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
652            .recv_response()?;
653        this.resource_controller.track_runtime_owners(&owners)?;
654        Ok(owners)
655    }
656
657    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
658        let mut this = self.inner();
659        let chain_ownership = this
660            .execution_state_sender
661            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
662            .recv_response()?;
663        this.resource_controller
664            .track_runtime_chain_ownership(&chain_ownership)?;
665        Ok(chain_ownership)
666    }
667
668    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
669        let mut this = self.inner();
670        let id = this.current_application().id;
671        this.resource_controller.track_read_operation()?;
672        let receiver = this
673            .execution_state_sender
674            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
675        let state = this.view_user_states.entry(id).or_default();
676        state.contains_key_queries.register(receiver)
677    }
678
679    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
680        let mut this = self.inner();
681        let id = this.current_application().id;
682        let state = this.view_user_states.entry(id).or_default();
683        let value = state.contains_key_queries.wait(*promise)?;
684        Ok(value)
685    }
686
687    fn contains_keys_new(
688        &mut self,
689        keys: Vec<Vec<u8>>,
690    ) -> Result<Self::ContainsKeys, ExecutionError> {
691        let mut this = self.inner();
692        let id = this.current_application().id;
693        this.resource_controller.track_read_operation()?;
694        let receiver = this
695            .execution_state_sender
696            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
697        let state = this.view_user_states.entry(id).or_default();
698        state.contains_keys_queries.register(receiver)
699    }
700
701    fn contains_keys_wait(
702        &mut self,
703        promise: &Self::ContainsKeys,
704    ) -> Result<Vec<bool>, ExecutionError> {
705        let mut this = self.inner();
706        let id = this.current_application().id;
707        let state = this.view_user_states.entry(id).or_default();
708        let value = state.contains_keys_queries.wait(*promise)?;
709        Ok(value)
710    }
711
712    fn read_multi_values_bytes_new(
713        &mut self,
714        keys: Vec<Vec<u8>>,
715    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
716        let mut this = self.inner();
717        let id = this.current_application().id;
718        this.resource_controller.track_read_operation()?;
719        let receiver = this.execution_state_sender.send_request(move |callback| {
720            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
721        })?;
722        let state = this.view_user_states.entry(id).or_default();
723        state.read_multi_values_queries.register(receiver)
724    }
725
726    fn read_multi_values_bytes_wait(
727        &mut self,
728        promise: &Self::ReadMultiValuesBytes,
729    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
730        let mut this = self.inner();
731        let id = this.current_application().id;
732        let state = this.view_user_states.entry(id).or_default();
733        let values = state.read_multi_values_queries.wait(*promise)?;
734        for value in &values {
735            if let Some(value) = &value {
736                this.resource_controller
737                    .track_bytes_read(value.len() as u64)?;
738            }
739        }
740        Ok(values)
741    }
742
743    fn read_value_bytes_new(
744        &mut self,
745        key: Vec<u8>,
746    ) -> Result<Self::ReadValueBytes, ExecutionError> {
747        let mut this = self.inner();
748        let id = this.current_application().id;
749        this.resource_controller.track_read_operation()?;
750        let receiver = this
751            .execution_state_sender
752            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
753        let state = this.view_user_states.entry(id).or_default();
754        state.read_value_queries.register(receiver)
755    }
756
757    fn read_value_bytes_wait(
758        &mut self,
759        promise: &Self::ReadValueBytes,
760    ) -> Result<Option<Vec<u8>>, ExecutionError> {
761        let mut this = self.inner();
762        let id = this.current_application().id;
763        let value = {
764            let state = this.view_user_states.entry(id).or_default();
765            state.read_value_queries.wait(*promise)?
766        };
767        if let Some(value) = &value {
768            this.resource_controller
769                .track_bytes_read(value.len() as u64)?;
770        }
771        Ok(value)
772    }
773
774    fn find_keys_by_prefix_new(
775        &mut self,
776        key_prefix: Vec<u8>,
777    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
778        let mut this = self.inner();
779        let id = this.current_application().id;
780        this.resource_controller.track_read_operation()?;
781        let receiver = this.execution_state_sender.send_request(move |callback| {
782            ExecutionRequest::FindKeysByPrefix {
783                id,
784                key_prefix,
785                callback,
786            }
787        })?;
788        let state = this.view_user_states.entry(id).or_default();
789        state.find_keys_queries.register(receiver)
790    }
791
792    fn find_keys_by_prefix_wait(
793        &mut self,
794        promise: &Self::FindKeysByPrefix,
795    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
796        let mut this = self.inner();
797        let id = this.current_application().id;
798        let keys = {
799            let state = this.view_user_states.entry(id).or_default();
800            state.find_keys_queries.wait(*promise)?
801        };
802        let mut read_size = 0;
803        for key in &keys {
804            read_size += key.len();
805        }
806        this.resource_controller
807            .track_bytes_read(read_size as u64)?;
808        Ok(keys)
809    }
810
811    fn find_key_values_by_prefix_new(
812        &mut self,
813        key_prefix: Vec<u8>,
814    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
815        let mut this = self.inner();
816        let id = this.current_application().id;
817        this.resource_controller.track_read_operation()?;
818        let receiver = this.execution_state_sender.send_request(move |callback| {
819            ExecutionRequest::FindKeyValuesByPrefix {
820                id,
821                key_prefix,
822                callback,
823            }
824        })?;
825        let state = this.view_user_states.entry(id).or_default();
826        state.find_key_values_queries.register(receiver)
827    }
828
829    fn find_key_values_by_prefix_wait(
830        &mut self,
831        promise: &Self::FindKeyValuesByPrefix,
832    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
833        let mut this = self.inner();
834        let id = this.current_application().id;
835        let state = this.view_user_states.entry(id).or_default();
836        let key_values = state.find_key_values_queries.wait(*promise)?;
837        let mut read_size = 0;
838        for (key, value) in &key_values {
839            read_size += key.len() + value.len();
840        }
841        this.resource_controller
842            .track_bytes_read(read_size as u64)?;
843        Ok(key_values)
844    }
845
846    fn perform_http_request(
847        &mut self,
848        request: http::Request,
849    ) -> Result<http::Response, ExecutionError> {
850        let mut this = self.inner();
851        let app_permissions = this
852            .execution_state_sender
853            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
854            .recv_response()?;
855
856        let app_id = this.current_application().id;
857        ensure!(
858            app_permissions.can_make_http_requests(&app_id),
859            ExecutionError::UnauthorizedApplication(app_id)
860        );
861
862        this.resource_controller.track_http_request()?;
863
864        let response = this
865            .execution_state_sender
866            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
867                request,
868                http_responses_are_oracle_responses:
869                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
870                callback,
871            })?
872            .recv_response()?;
873        Ok(response)
874    }
875
876    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
877        let this = self.inner();
878        this.execution_state_sender
879            .send_request(|callback| ExecutionRequest::AssertBefore {
880                timestamp,
881                callback,
882            })?
883            .recv_response()?
884    }
885
886    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
887        let this = self.inner();
888        let blob_id = hash.into();
889        let content = this
890            .execution_state_sender
891            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
892            .recv_response()?;
893        Ok(content.into_vec_or_clone())
894    }
895
896    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
897        let this = self.inner();
898        let blob_id = hash.into();
899        this.execution_state_sender
900            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
901            .recv_response()?;
902        Ok(())
903    }
904}
905
/// An extension trait to determine at compile time the different behaviors between contracts
/// and services in the implementation of [`BaseRuntime`].
trait ContractOrServiceRuntime {
    /// Configured to `true` if the HTTP response size should be limited to the oracle response
    /// size.
    ///
    /// This is `false` for services, potentially allowing them to receive a larger HTTP response
    /// and only storing in the block a shorter oracle response.
    ///
    /// The value is forwarded as `http_responses_are_oracle_responses` when an HTTP request is
    /// performed.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
916
// Contract runtimes limit the HTTP response size to the oracle response size.
impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
}
920
// Service runtimes do not limit the HTTP response size to the oracle response size.
impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
}
924
925impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
926    fn clone(&self) -> Self {
927        SyncRuntimeHandle(self.0.clone())
928    }
929}
930
931impl ContractSyncRuntime {
932    pub(crate) fn new(
933        execution_state_sender: ExecutionStateSender,
934        chain_id: ChainId,
935        refund_grant_to: Option<Account>,
936        resource_controller: ResourceController,
937        action: &UserAction,
938    ) -> Self {
939        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
940            SyncRuntimeInternal::new(
941                chain_id,
942                action.height(),
943                action.round(),
944                if let UserAction::Message(context, _) = action {
945                    Some(context.into())
946                } else {
947                    None
948                },
949                execution_state_sender,
950                None,
951                refund_grant_to,
952                resource_controller,
953                action.timestamp(),
954            ),
955        )))
956    }
957
958    pub(crate) fn preload_contract(
959        &self,
960        id: ApplicationId,
961        code: UserContractCode,
962        description: ApplicationDescription,
963    ) -> Result<(), ExecutionError> {
964        let this = self
965            .0
966            .as_ref()
967            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
968        let runtime_handle = this.clone();
969        let mut this_guard = this.inner();
970
971        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
972            entry.insert(LoadedApplication::new(
973                code.instantiate(runtime_handle)?,
974                description,
975            ));
976            this_guard.applications_to_finalize.push(id);
977        }
978
979        Ok(())
980    }
981
982    /// Main entry point to start executing a user action.
983    pub(crate) fn run_action(
984        mut self,
985        application_id: ApplicationId,
986        chain_id: ChainId,
987        action: UserAction,
988    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
989        let result = self
990            .deref_mut()
991            .run_action(application_id, chain_id, action)?;
992        let runtime = self
993            .into_inner()
994            .expect("Runtime clones should have been freed by now");
995
996        Ok((result, runtime.resource_controller))
997    }
998}
999
1000impl ContractSyncRuntimeHandle {
1001    fn run_action(
1002        &mut self,
1003        application_id: ApplicationId,
1004        chain_id: ChainId,
1005        action: UserAction,
1006    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1007        let finalize_context = FinalizeContext {
1008            authenticated_signer: action.signer(),
1009            chain_id,
1010            height: action.height(),
1011            round: action.round(),
1012        };
1013
1014        {
1015            let runtime = self.inner();
1016            assert_eq!(runtime.chain_id, chain_id);
1017            assert_eq!(runtime.height, action.height());
1018        }
1019
1020        let signer = action.signer();
1021        let closure = move |code: &mut UserContractInstance| match action {
1022            UserAction::Instantiate(_context, argument) => {
1023                code.instantiate(argument).map(|()| None)
1024            }
1025            UserAction::Operation(_context, operation) => {
1026                code.execute_operation(operation).map(Option::Some)
1027            }
1028            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1029            UserAction::ProcessStreams(_context, updates) => {
1030                code.process_streams(updates).map(|()| None)
1031            }
1032        };
1033
1034        let result = self.execute(application_id, signer, closure)?;
1035        self.finalize(finalize_context)?;
1036        Ok(result)
1037    }
1038
1039    /// Notifies all loaded applications that execution is finalizing.
1040    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1041        let applications = mem::take(&mut self.inner().applications_to_finalize)
1042            .into_iter()
1043            .rev();
1044
1045        self.inner().is_finalizing = true;
1046
1047        for application in applications {
1048            self.execute(application, context.authenticated_signer, |contract| {
1049                contract.finalize().map(|_| None)
1050            })?;
1051            self.inner().loaded_applications.remove(&application);
1052        }
1053
1054        Ok(())
1055    }
1056
1057    /// Executes a `closure` with the contract code for the `application_id`.
1058    fn execute(
1059        &mut self,
1060        application_id: ApplicationId,
1061        signer: Option<AccountOwner>,
1062        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1063    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1064        let contract = {
1065            let mut runtime = self.inner();
1066            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1067
1068            let status = ApplicationStatus {
1069                caller_id: None,
1070                id: application_id,
1071                description: application.description.clone(),
1072                signer,
1073            };
1074
1075            runtime.push_application(status);
1076
1077            application
1078        };
1079
1080        let result = closure(
1081            &mut contract
1082                .instance
1083                .try_lock()
1084                .expect("Application should not be already executing"),
1085        )?;
1086
1087        let mut runtime = self.inner();
1088        let application_status = runtime.pop_application();
1089        assert_eq!(application_status.caller_id, None);
1090        assert_eq!(application_status.id, application_id);
1091        assert_eq!(application_status.description, contract.description);
1092        assert_eq!(application_status.signer, signer);
1093        assert!(runtime.call_stack.is_empty());
1094
1095        Ok(result)
1096    }
1097}
1098
1099impl ContractRuntime for ContractSyncRuntimeHandle {
1100    fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1101        let this = self.inner();
1102        Ok(this.current_application().signer)
1103    }
1104
1105    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1106        Ok(self
1107            .inner()
1108            .executing_message
1109            .map(|metadata| metadata.is_bouncing))
1110    }
1111
1112    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1113        Ok(self
1114            .inner()
1115            .executing_message
1116            .map(|metadata| metadata.origin))
1117    }
1118
1119    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1120        let this = self.inner();
1121        if this.call_stack.len() <= 1 {
1122            return Ok(None);
1123        }
1124        Ok(this.current_application().caller_id)
1125    }
1126
1127    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1128        Ok(match vm_runtime {
1129            VmRuntime::Wasm => {
1130                self.inner()
1131                    .resource_controller
1132                    .policy()
1133                    .maximum_wasm_fuel_per_block
1134            }
1135            VmRuntime::Evm => {
1136                self.inner()
1137                    .resource_controller
1138                    .policy()
1139                    .maximum_evm_fuel_per_block
1140            }
1141        })
1142    }
1143
1144    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1145        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1146    }
1147
1148    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1149        let mut this = self.inner();
1150        this.resource_controller.track_fuel(fuel, vm_runtime)
1151    }
1152
1153    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1154        let mut this = self.inner();
1155        let application = this.current_application();
1156        let application_id = application.id;
1157        let authenticated_signer = application.signer;
1158        let mut refund_grant_to = this.refund_grant_to;
1159
1160        let grant = this
1161            .resource_controller
1162            .policy()
1163            .total_price(&message.grant)?;
1164        if grant.is_zero() {
1165            refund_grant_to = None;
1166        } else {
1167            this.resource_controller.track_grant(grant)?;
1168        }
1169        let kind = if message.is_tracked {
1170            MessageKind::Tracked
1171        } else {
1172            MessageKind::Simple
1173        };
1174
1175        this.execution_state_sender
1176            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1177                message: OutgoingMessage {
1178                    destination: message.destination,
1179                    authenticated_signer,
1180                    refund_grant_to,
1181                    grant,
1182                    kind,
1183                    message: Message::User {
1184                        application_id,
1185                        bytes: message.message,
1186                    },
1187                },
1188                callback,
1189            })?
1190            .recv_response()?;
1191
1192        Ok(())
1193    }
1194
1195    fn transfer(
1196        &mut self,
1197        source: AccountOwner,
1198        destination: Account,
1199        amount: Amount,
1200    ) -> Result<(), ExecutionError> {
1201        let this = self.inner();
1202        let current_application = this.current_application();
1203        let application_id = current_application.id;
1204        let signer = current_application.signer;
1205
1206        this.execution_state_sender
1207            .send_request(|callback| ExecutionRequest::Transfer {
1208                source,
1209                destination,
1210                amount,
1211                signer,
1212                application_id,
1213                callback,
1214            })?
1215            .recv_response()?;
1216        Ok(())
1217    }
1218
1219    fn claim(
1220        &mut self,
1221        source: Account,
1222        destination: Account,
1223        amount: Amount,
1224    ) -> Result<(), ExecutionError> {
1225        let this = self.inner();
1226        let current_application = this.current_application();
1227        let application_id = current_application.id;
1228        let signer = current_application.signer;
1229
1230        this.execution_state_sender
1231            .send_request(|callback| ExecutionRequest::Claim {
1232                source,
1233                destination,
1234                amount,
1235                signer,
1236                application_id,
1237                callback,
1238            })?
1239            .recv_response()?;
1240        Ok(())
1241    }
1242
1243    fn try_call_application(
1244        &mut self,
1245        authenticated: bool,
1246        callee_id: ApplicationId,
1247        argument: Vec<u8>,
1248    ) -> Result<Vec<u8>, ExecutionError> {
1249        let contract = self
1250            .inner()
1251            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1252
1253        let value = contract
1254            .try_lock()
1255            .expect("Applications should not have reentrant calls")
1256            .execute_operation(argument)?;
1257
1258        self.inner().finish_call()?;
1259
1260        Ok(value)
1261    }
1262
1263    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1264        let mut this = self.inner();
1265        ensure!(
1266            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1267            ExecutionError::StreamNameTooLong
1268        );
1269        let application_id = GenericApplicationId::User(this.current_application().id);
1270        let stream_id = StreamId {
1271            stream_name,
1272            application_id,
1273        };
1274        let value_len = value.len() as u64;
1275        let index = this
1276            .execution_state_sender
1277            .send_request(|callback| ExecutionRequest::Emit {
1278                stream_id,
1279                value,
1280                callback,
1281            })?
1282            .recv_response()?;
1283        // TODO(#365): Consider separate event fee categories.
1284        this.resource_controller.track_bytes_written(value_len)?;
1285        Ok(index)
1286    }
1287
1288    fn read_event(
1289        &mut self,
1290        chain_id: ChainId,
1291        stream_name: StreamName,
1292        index: u32,
1293    ) -> Result<Vec<u8>, ExecutionError> {
1294        let mut this = self.inner();
1295        ensure!(
1296            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1297            ExecutionError::StreamNameTooLong
1298        );
1299        let application_id = GenericApplicationId::User(this.current_application().id);
1300        let stream_id = StreamId {
1301            stream_name,
1302            application_id,
1303        };
1304        let event_id = EventId {
1305            stream_id,
1306            index,
1307            chain_id,
1308        };
1309        let event = this
1310            .execution_state_sender
1311            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1312            .recv_response()?;
1313        // TODO(#365): Consider separate event fee categories.
1314        this.resource_controller
1315            .track_bytes_read(event.len() as u64)?;
1316        Ok(event)
1317    }
1318
1319    fn subscribe_to_events(
1320        &mut self,
1321        chain_id: ChainId,
1322        application_id: ApplicationId,
1323        stream_name: StreamName,
1324    ) -> Result<(), ExecutionError> {
1325        let this = self.inner();
1326        ensure!(
1327            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1328            ExecutionError::StreamNameTooLong
1329        );
1330        let stream_id = StreamId {
1331            stream_name,
1332            application_id: application_id.into(),
1333        };
1334        let subscriber_app_id = this.current_application().id;
1335        this.execution_state_sender
1336            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1337                chain_id,
1338                stream_id,
1339                subscriber_app_id,
1340                callback,
1341            })?
1342            .recv_response()?;
1343        Ok(())
1344    }
1345
1346    fn unsubscribe_from_events(
1347        &mut self,
1348        chain_id: ChainId,
1349        application_id: ApplicationId,
1350        stream_name: StreamName,
1351    ) -> Result<(), ExecutionError> {
1352        let this = self.inner();
1353        ensure!(
1354            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1355            ExecutionError::StreamNameTooLong
1356        );
1357        let stream_id = StreamId {
1358            stream_name,
1359            application_id: application_id.into(),
1360        };
1361        let subscriber_app_id = this.current_application().id;
1362        this.execution_state_sender
1363            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1364                chain_id,
1365                stream_id,
1366                subscriber_app_id,
1367                callback,
1368            })?
1369            .recv_response()?;
1370        Ok(())
1371    }
1372
1373    fn query_service(
1374        &mut self,
1375        application_id: ApplicationId,
1376        query: Vec<u8>,
1377    ) -> Result<Vec<u8>, ExecutionError> {
1378        let mut this = self.inner();
1379
1380        let app_permissions = this
1381            .execution_state_sender
1382            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1383            .recv_response()?;
1384
1385        let app_id = this.current_application().id;
1386        ensure!(
1387            app_permissions.can_call_services(&app_id),
1388            ExecutionError::UnauthorizedApplication(app_id)
1389        );
1390
1391        this.resource_controller.track_service_oracle_call()?;
1392
1393        this.run_service_oracle_query(application_id, query)
1394    }
1395
1396    fn open_chain(
1397        &mut self,
1398        ownership: ChainOwnership,
1399        application_permissions: ApplicationPermissions,
1400        balance: Amount,
1401    ) -> Result<ChainId, ExecutionError> {
1402        let parent_id = self.inner().chain_id;
1403        let block_height = self.block_height()?;
1404
1405        let timestamp = self.inner().user_context;
1406
1407        let chain_id = self
1408            .inner()
1409            .execution_state_sender
1410            .send_request(|callback| ExecutionRequest::OpenChain {
1411                ownership,
1412                balance,
1413                parent_id,
1414                block_height,
1415                timestamp,
1416                application_permissions,
1417                callback,
1418            })?
1419            .recv_response()?;
1420
1421        Ok(chain_id)
1422    }
1423
1424    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1425        let this = self.inner();
1426        let application_id = this.current_application().id;
1427        this.execution_state_sender
1428            .send_request(|callback| ExecutionRequest::CloseChain {
1429                application_id,
1430                callback,
1431            })?
1432            .recv_response()?
1433    }
1434
1435    fn change_application_permissions(
1436        &mut self,
1437        application_permissions: ApplicationPermissions,
1438    ) -> Result<(), ExecutionError> {
1439        let this = self.inner();
1440        let application_id = this.current_application().id;
1441        this.execution_state_sender
1442            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1443                application_id,
1444                application_permissions,
1445                callback,
1446            })?
1447            .recv_response()?
1448    }
1449
1450    fn create_application(
1451        &mut self,
1452        module_id: ModuleId,
1453        parameters: Vec<u8>,
1454        argument: Vec<u8>,
1455        required_application_ids: Vec<ApplicationId>,
1456    ) -> Result<ApplicationId, ExecutionError> {
1457        let chain_id = self.inner().chain_id;
1458        let block_height = self.block_height()?;
1459
1460        let CreateApplicationResult { app_id } = self
1461            .inner()
1462            .execution_state_sender
1463            .send_request(move |callback| ExecutionRequest::CreateApplication {
1464                chain_id,
1465                block_height,
1466                module_id,
1467                parameters,
1468                required_application_ids,
1469                callback,
1470            })?
1471            .recv_response()??;
1472
1473        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1474
1475        contract
1476            .try_lock()
1477            .expect("Applications should not have reentrant calls")
1478            .instantiate(argument)?;
1479
1480        self.inner().finish_call()?;
1481
1482        Ok(app_id)
1483    }
1484
1485    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1486        let blob = Blob::new_data(bytes);
1487        let blob_id = blob.id();
1488        let this = self.inner();
1489        this.execution_state_sender
1490            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1491            .recv_response()?;
1492        Ok(DataBlobHash(blob_id.hash))
1493    }
1494
1495    fn publish_module(
1496        &mut self,
1497        contract: Bytecode,
1498        service: Bytecode,
1499        vm_runtime: VmRuntime,
1500    ) -> Result<ModuleId, ExecutionError> {
1501        let (blobs, module_id) =
1502            crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1503        let this = self.inner();
1504        for blob in blobs {
1505            this.execution_state_sender
1506                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1507                .recv_response()?;
1508        }
1509        Ok(module_id)
1510    }
1511
1512    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1513        let this = self.inner();
1514        let round = this.round;
1515        this.execution_state_sender
1516            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1517            .recv_response()
1518    }
1519
1520    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1521        let mut this = self.inner();
1522        let id = this.current_application().id;
1523        let state = this.view_user_states.entry(id).or_default();
1524        state.force_all_pending_queries()?;
1525        this.resource_controller.track_write_operations(
1526            batch
1527                .num_operations()
1528                .try_into()
1529                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1530        )?;
1531        this.resource_controller
1532            .track_bytes_written(batch.size() as u64)?;
1533        this.execution_state_sender
1534            .send_request(|callback| ExecutionRequest::WriteBatch {
1535                id,
1536                batch,
1537                callback,
1538            })?
1539            .recv_response()?;
1540        Ok(())
1541    }
1542}
1543
1544impl ServiceSyncRuntime {
1545    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1546    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1547        Self::new_with_deadline(execution_state_sender, context, None)
1548    }
1549
1550    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1551    pub fn new_with_deadline(
1552        execution_state_sender: ExecutionStateSender,
1553        context: QueryContext,
1554        deadline: Option<Instant>,
1555    ) -> Self {
1556        let runtime = SyncRuntime(Some(
1557            SyncRuntimeInternal::new(
1558                context.chain_id,
1559                context.next_block_height,
1560                None,
1561                None,
1562                execution_state_sender,
1563                deadline,
1564                None,
1565                ResourceController::default(),
1566                (),
1567            )
1568            .into(),
1569        ));
1570
1571        ServiceSyncRuntime {
1572            runtime,
1573            current_context: context,
1574        }
1575    }
1576
1577    /// Loads a service into the runtime's memory.
1578    pub(crate) fn preload_service(
1579        &self,
1580        id: ApplicationId,
1581        code: UserServiceCode,
1582        description: ApplicationDescription,
1583    ) -> Result<(), ExecutionError> {
1584        let this = self
1585            .runtime
1586            .0
1587            .as_ref()
1588            .expect("services shouldn't be preloaded while the runtime is being dropped");
1589        let runtime_handle = this.clone();
1590        let mut this_guard = this.inner();
1591
1592        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1593            entry.insert(LoadedApplication::new(
1594                code.instantiate(runtime_handle)?,
1595                description,
1596            ));
1597            this_guard.applications_to_finalize.push(id);
1598        }
1599
1600        Ok(())
1601    }
1602
1603    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1604    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1605        while let Ok(request) = incoming_requests.recv() {
1606            let ServiceRuntimeRequest::Query {
1607                application_id,
1608                context,
1609                query,
1610                callback,
1611            } = request;
1612
1613            let result = self
1614                .prepare_for_query(context)
1615                .and_then(|()| self.run_query(application_id, query));
1616
1617            if let Err(err) = callback.send(result) {
1618                tracing::debug!(%err, "Receiver for query result has been dropped");
1619            }
1620        }
1621    }
1622
1623    /// Prepares the runtime to query an application.
1624    pub(crate) fn prepare_for_query(
1625        &mut self,
1626        new_context: QueryContext,
1627    ) -> Result<(), ExecutionError> {
1628        let expected_context = QueryContext {
1629            local_time: new_context.local_time,
1630            ..self.current_context
1631        };
1632
1633        if new_context != expected_context {
1634            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1635            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1636        } else {
1637            self.handle_mut()
1638                .inner()
1639                .execution_state_sender
1640                .send_request(|callback| ExecutionRequest::SetLocalTime {
1641                    local_time: new_context.local_time,
1642                    callback,
1643                })?
1644                .recv_response()?;
1645        }
1646        Ok(())
1647    }
1648
1649    /// Queries an application specified by its [`ApplicationId`].
1650    pub(crate) fn run_query(
1651        &mut self,
1652        application_id: ApplicationId,
1653        query: Vec<u8>,
1654    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1655        let this = self.handle_mut();
1656        let response = this.try_query_application(application_id, query)?;
1657        let operations = mem::take(&mut this.inner().scheduled_operations);
1658
1659        Ok(QueryOutcome {
1660            response,
1661            operations,
1662        })
1663    }
1664
1665    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1666    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1667        self.runtime.0.as_mut().expect(
1668            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1669        )
1670    }
1671}
1672
1673impl ServiceRuntime for ServiceSyncRuntimeHandle {
1674    /// Note that queries are not available from writable contexts.
1675    fn try_query_application(
1676        &mut self,
1677        queried_id: ApplicationId,
1678        argument: Vec<u8>,
1679    ) -> Result<Vec<u8>, ExecutionError> {
1680        let service = {
1681            let mut this = self.inner();
1682
1683            // Load the application.
1684            let application = this.load_service_instance(self.clone(), queried_id)?;
1685            // Make the call to user code.
1686            this.push_application(ApplicationStatus {
1687                caller_id: None,
1688                id: queried_id,
1689                description: application.description,
1690                signer: None,
1691            });
1692            application.instance
1693        };
1694        let response = service
1695            .try_lock()
1696            .expect("Applications should not have reentrant calls")
1697            .handle_query(argument)?;
1698        self.inner().pop_application();
1699        Ok(response)
1700    }
1701
1702    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1703        let mut this = self.inner();
1704        let application_id = this.current_application().id;
1705
1706        this.scheduled_operations.push(Operation::User {
1707            application_id,
1708            bytes: operation,
1709        });
1710
1711        Ok(())
1712    }
1713
1714    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1715        if let Some(deadline) = self.inner().deadline {
1716            if Instant::now() >= deadline {
1717                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1718            }
1719        }
1720        Ok(())
1721    }
1722}
1723
1724/// A request to the service runtime actor.
1725pub enum ServiceRuntimeRequest {
1726    Query {
1727        application_id: ApplicationId,
1728        context: QueryContext,
1729        query: Vec<u8>,
1730        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1731    },
1732}
1733
1734/// The origin of the execution.
1735#[derive(Clone, Copy, Debug)]
1736struct ExecutingMessage {
1737    is_bouncing: bool,
1738    origin: ChainId,
1739}
1740
1741impl From<&MessageContext> for ExecutingMessage {
1742    fn from(context: &MessageContext) -> Self {
1743        ExecutingMessage {
1744            is_bouncing: context.is_bouncing,
1745            origin: context.origin,
1746        }
1747    }
1748}
1749
1750/// Creates a compressed contract and service bytecode synchronously.
1751pub fn create_bytecode_blobs_sync(
1752    contract: Bytecode,
1753    service: Bytecode,
1754    vm_runtime: VmRuntime,
1755) -> (Vec<Blob>, ModuleId) {
1756    match vm_runtime {
1757        VmRuntime::Wasm => {
1758            let compressed_contract = contract.compress();
1759            let compressed_service = service.compress();
1760            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1761            let service_blob = Blob::new_service_bytecode(compressed_service);
1762            let module_id =
1763                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1764            (vec![contract_blob, service_blob], module_id)
1765        }
1766        VmRuntime::Evm => {
1767            let compressed_contract = contract.compress();
1768            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1769            let module_id = ModuleId::new(
1770                evm_contract_blob.id().hash,
1771                evm_contract_blob.id().hash,
1772                vm_runtime,
1773            );
1774            (vec![evm_contract_blob], module_id)
1775        }
1776    }
1777}