linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9    time::Instant,
10};
11
12use custom_debug_derive::Debug;
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{
16        Amount, ApplicationPermissions, ArithmeticError, BlockHeight, OracleResponse,
17        SendMessageRequest, Timestamp,
18    },
19    ensure, http,
20    identifiers::{
21        Account, AccountOwner, BlobId, BlobType, ChainId, ChannelFullName, ChannelName, EventId,
22        GenericApplicationId, MessageId, StreamId, StreamName,
23    },
24    ownership::ChainOwnership,
25};
26use linera_views::batch::Batch;
27use oneshot::Receiver;
28
29use crate::{
30    execution::UserAction,
31    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
32    resources::ResourceController,
33    system::CreateApplicationResult,
34    util::{ReceiverExt, UnboundedSenderExt},
35    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, ExecutionError,
36    FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation, OperationContext,
37    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, TransactionTracker,
38    UserContractCode, UserContractInstance, UserServiceCode, UserServiceInstance,
39    MAX_STREAM_NAME_LEN,
40};
41
42#[cfg(test)]
43#[path = "unit_tests/runtime_tests.rs"]
44mod tests;
45
46#[derive(Debug)]
47pub struct SyncRuntime<UserInstance>(Option<SyncRuntimeHandle<UserInstance>>);
48
49pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
50
51pub struct ServiceSyncRuntime {
52    runtime: SyncRuntime<UserServiceInstance>,
53    current_context: QueryContext,
54}
55
56#[derive(Debug)]
57pub struct SyncRuntimeHandle<UserInstance>(Arc<Mutex<SyncRuntimeInternal<UserInstance>>>);
58
59pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
60pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
61
62/// Runtime data tracked during the execution of a transaction on the synchronous thread.
63#[derive(Debug)]
64pub struct SyncRuntimeInternal<UserInstance> {
65    /// The current chain ID.
66    chain_id: ChainId,
67    /// The height of the next block that will be added to this chain. During operations
68    /// and messages, this is the current block height.
69    height: BlockHeight,
70    /// The current consensus round. Only available during block validation in multi-leader rounds.
71    round: Option<u32>,
72    /// The authenticated signer of the operation or message, if any.
73    #[debug(skip_if = Option::is_none)]
74    authenticated_signer: Option<AccountOwner>,
75    /// The current message being executed, if there is one.
76    #[debug(skip_if = Option::is_none)]
77    executing_message: Option<ExecutingMessage>,
78
79    /// How to interact with the storage view of the execution state.
80    execution_state_sender: ExecutionStateSender,
81
82    /// If applications are being finalized.
83    ///
84    /// If [`true`], disables cross-application calls.
85    is_finalizing: bool,
86    /// Applications that need to be finalized.
87    applications_to_finalize: Vec<ApplicationId>,
88
89    /// Application instances loaded in this transaction.
90    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
91    /// The current stack of application descriptions.
92    call_stack: Vec<ApplicationStatus>,
93    /// The set of the IDs of the applications that are in the `call_stack`.
94    active_applications: HashSet<ApplicationId>,
95    /// The tracking information for this transaction.
96    transaction_tracker: TransactionTracker,
97    /// The operations scheduled during this query.
98    scheduled_operations: Vec<Operation>,
99
100    /// Track application states based on views.
101    view_user_states: BTreeMap<ApplicationId, ViewUserState>,
102
103    /// The deadline this runtime should finish executing.
104    ///
105    /// Used to limit the execution time of services running as oracles.
106    deadline: Option<Instant>,
107
108    /// Where to send a refund for the unused part of the grant after execution, if any.
109    #[debug(skip_if = Option::is_none)]
110    refund_grant_to: Option<Account>,
111    /// Controller to track fuel and storage consumption.
112    resource_controller: ResourceController,
113}
114
115/// The runtime status of an application.
116#[derive(Debug)]
117struct ApplicationStatus {
118    /// The caller application ID, if forwarded during the call.
119    caller_id: Option<ApplicationId>,
120    /// The application ID.
121    id: ApplicationId,
122    /// The application description.
123    description: ApplicationDescription,
124    /// The authenticated signer for the execution thread, if any.
125    signer: Option<AccountOwner>,
126}
127
128/// A loaded application instance.
129#[derive(Debug)]
130struct LoadedApplication<Instance> {
131    instance: Arc<Mutex<Instance>>,
132    description: ApplicationDescription,
133}
134
135impl<Instance> LoadedApplication<Instance> {
136    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
137    fn new(instance: Instance, description: ApplicationDescription) -> Self {
138        LoadedApplication {
139            instance: Arc::new(Mutex::new(instance)),
140            description,
141        }
142    }
143}
144
145impl<Instance> Clone for LoadedApplication<Instance> {
146    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
147    // bound
148    fn clone(&self) -> Self {
149        LoadedApplication {
150            instance: self.instance.clone(),
151            description: self.description.clone(),
152        }
153    }
154}
155
156#[derive(Debug)]
157enum Promise<T> {
158    Ready(T),
159    Pending(Receiver<T>),
160}
161
162impl<T> Promise<T> {
163    fn force(&mut self) -> Result<(), ExecutionError> {
164        if let Promise::Pending(receiver) = self {
165            let value = receiver
166                .recv_ref()
167                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
168            *self = Promise::Ready(value);
169        }
170        Ok(())
171    }
172
173    fn read(self) -> Result<T, ExecutionError> {
174        match self {
175            Promise::Pending(receiver) => {
176                let value = receiver.recv_response()?;
177                Ok(value)
178            }
179            Promise::Ready(value) => Ok(value),
180        }
181    }
182}
183
184/// Manages a set of pending queries returning values of type `T`.
185#[derive(Debug, Default)]
186struct QueryManager<T> {
187    /// The queries in progress.
188    pending_queries: BTreeMap<u32, Promise<T>>,
189    /// The number of queries ever registered so far. Used for the index of the next query.
190    query_count: u32,
191    /// The number of active queries.
192    active_query_count: u32,
193}
194
195impl<T> QueryManager<T> {
196    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
197        let id = self.query_count;
198        self.pending_queries.insert(id, Promise::Pending(receiver));
199        self.query_count = self
200            .query_count
201            .checked_add(1)
202            .ok_or(ArithmeticError::Overflow)?;
203        self.active_query_count = self
204            .active_query_count
205            .checked_add(1)
206            .ok_or(ArithmeticError::Overflow)?;
207        Ok(id)
208    }
209
210    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
211        let promise = self
212            .pending_queries
213            .remove(&id)
214            .ok_or(ExecutionError::InvalidPromise)?;
215        let value = promise.read()?;
216        self.active_query_count -= 1;
217        Ok(value)
218    }
219
220    fn force_all(&mut self) -> Result<(), ExecutionError> {
221        for promise in self.pending_queries.values_mut() {
222            promise.force()?;
223        }
224        Ok(())
225    }
226}
227
/// A list of keys, each one a byte string.
type Keys = Vec<Vec<u8>>;
/// A single value, as a byte string.
type Value = Vec<u8>;
/// A list of `(key, value)` pairs of byte strings.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
231
232#[derive(Debug, Default)]
233struct ViewUserState {
234    /// The contains-key queries in progress.
235    contains_key_queries: QueryManager<bool>,
236    /// The contains-keys queries in progress.
237    contains_keys_queries: QueryManager<Vec<bool>>,
238    /// The read-value queries in progress.
239    read_value_queries: QueryManager<Option<Value>>,
240    /// The read-multi-values queries in progress.
241    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
242    /// The find-keys queries in progress.
243    find_keys_queries: QueryManager<Keys>,
244    /// The find-key-values queries in progress.
245    find_key_values_queries: QueryManager<KeyValues>,
246}
247
248impl ViewUserState {
249    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
250        self.contains_key_queries.force_all()?;
251        self.contains_keys_queries.force_all()?;
252        self.read_value_queries.force_all()?;
253        self.read_multi_values_queries.force_all()?;
254        self.find_keys_queries.force_all()?;
255        self.find_key_values_queries.force_all()?;
256        Ok(())
257    }
258}
259
260impl<UserInstance> Deref for SyncRuntime<UserInstance> {
261    type Target = SyncRuntimeHandle<UserInstance>;
262
263    fn deref(&self) -> &Self::Target {
264        self.0.as_ref().expect(
265            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
266        )
267    }
268}
269
270impl<UserInstance> DerefMut for SyncRuntime<UserInstance> {
271    fn deref_mut(&mut self) -> &mut Self::Target {
272        self.0.as_mut().expect(
273            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
274        )
275    }
276}
277
278impl<UserInstance> Drop for SyncRuntime<UserInstance> {
279    fn drop(&mut self) {
280        // Ensure the `loaded_applications` are cleared to prevent circular references in
281        // the runtime
282        if let Some(handle) = self.0.take() {
283            handle.inner().loaded_applications.clear();
284        }
285    }
286}
287
288impl<UserInstance> SyncRuntimeInternal<UserInstance> {
289    #[expect(clippy::too_many_arguments)]
290    fn new(
291        chain_id: ChainId,
292        height: BlockHeight,
293        round: Option<u32>,
294        authenticated_signer: Option<AccountOwner>,
295        executing_message: Option<ExecutingMessage>,
296        execution_state_sender: ExecutionStateSender,
297        deadline: Option<Instant>,
298        refund_grant_to: Option<Account>,
299        resource_controller: ResourceController,
300        transaction_tracker: TransactionTracker,
301    ) -> Self {
302        Self {
303            chain_id,
304            height,
305            round,
306            authenticated_signer,
307            executing_message,
308            execution_state_sender,
309            is_finalizing: false,
310            applications_to_finalize: Vec::new(),
311            loaded_applications: HashMap::new(),
312            call_stack: Vec::new(),
313            active_applications: HashSet::new(),
314            view_user_states: BTreeMap::new(),
315            deadline,
316            refund_grant_to,
317            resource_controller,
318            transaction_tracker,
319            scheduled_operations: Vec::new(),
320        }
321    }
322
323    /// Returns the [`ApplicationStatus`] of the current application.
324    ///
325    /// The current application is the last to be pushed to the `call_stack`.
326    ///
327    /// # Panics
328    ///
329    /// If the call stack is empty.
330    fn current_application(&self) -> &ApplicationStatus {
331        self.call_stack
332            .last()
333            .expect("Call stack is unexpectedly empty")
334    }
335
336    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
337    ///
338    /// Ensures the application's ID is also tracked in the `active_applications` set.
339    fn push_application(&mut self, status: ApplicationStatus) {
340        self.active_applications.insert(status.id);
341        self.call_stack.push(status);
342    }
343
344    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
345    ///
346    /// Ensures the application's ID is also removed from the `active_applications` set.
347    ///
348    /// # Panics
349    ///
350    /// If the call stack is empty.
351    fn pop_application(&mut self) -> ApplicationStatus {
352        let status = self
353            .call_stack
354            .pop()
355            .expect("Can't remove application from empty call stack");
356        assert!(self.active_applications.remove(&status.id));
357        status
358    }
359
360    /// Ensures that a call to `application_id` is not-reentrant.
361    ///
362    /// Returns an error if there already is an entry for `application_id` in the call stack.
363    fn check_for_reentrancy(
364        &mut self,
365        application_id: ApplicationId,
366    ) -> Result<(), ExecutionError> {
367        ensure!(
368            !self.active_applications.contains(&application_id),
369            ExecutionError::ReentrantCall(application_id)
370        );
371        Ok(())
372    }
373}
374
375impl SyncRuntimeInternal<UserContractInstance> {
376    /// Loads a contract instance, initializing it with this runtime if needed.
377    fn load_contract_instance(
378        &mut self,
379        this: SyncRuntimeHandle<UserContractInstance>,
380        id: ApplicationId,
381    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
382        match self.loaded_applications.entry(id) {
383            // TODO(#2927): support dynamic loading of modules on the Web
384            #[cfg(web)]
385            hash_map::Entry::Vacant(_) => {
386                drop(this);
387                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
388                    id,
389                )))
390            }
391            #[cfg(not(web))]
392            hash_map::Entry::Vacant(entry) => {
393                let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
394                let (code, description, txn_tracker_moved) = self
395                    .execution_state_sender
396                    .send_request(move |callback| ExecutionRequest::LoadContract {
397                        id,
398                        callback,
399                        txn_tracker: txn_tracker_moved,
400                    })?
401                    .recv_response()?;
402                self.transaction_tracker = txn_tracker_moved;
403
404                let instance = code.instantiate(this)?;
405
406                self.applications_to_finalize.push(id);
407                Ok(entry
408                    .insert(LoadedApplication::new(instance, description))
409                    .clone())
410            }
411            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
412        }
413    }
414
415    /// Configures the runtime for executing a call to a different contract.
416    fn prepare_for_call(
417        &mut self,
418        this: ContractSyncRuntimeHandle,
419        authenticated: bool,
420        callee_id: ApplicationId,
421    ) -> Result<(Arc<Mutex<UserContractInstance>>, OperationContext), ExecutionError> {
422        self.check_for_reentrancy(callee_id)?;
423
424        ensure!(
425            !self.is_finalizing,
426            ExecutionError::CrossApplicationCallInFinalize {
427                caller_id: Box::new(self.current_application().id),
428                callee_id: Box::new(callee_id),
429            }
430        );
431
432        // Load the application.
433        let application = self.load_contract_instance(this, callee_id)?;
434
435        let caller = self.current_application();
436        let caller_id = caller.id;
437        let caller_signer = caller.signer;
438        // Make the call to user code.
439        let authenticated_signer = match caller_signer {
440            Some(signer) if authenticated => Some(signer),
441            _ => None,
442        };
443        let authenticated_caller_id = authenticated.then_some(caller_id);
444        let callee_context = OperationContext {
445            chain_id: self.chain_id,
446            authenticated_signer,
447            authenticated_caller_id,
448            height: self.height,
449            round: self.round,
450            index: None,
451        };
452        self.push_application(ApplicationStatus {
453            caller_id: authenticated_caller_id,
454            id: callee_id,
455            description: application.description,
456            // Allow further nested calls to be authenticated if this one is.
457            signer: authenticated_signer,
458        });
459        Ok((application.instance, callee_context))
460    }
461
462    /// Cleans up the runtime after the execution of a call to a different contract.
463    fn finish_call(&mut self) -> Result<(), ExecutionError> {
464        self.pop_application();
465        Ok(())
466    }
467
468    /// Runs the service in a separate thread as an oracle.
469    fn run_service_oracle_query(
470        &mut self,
471        application_id: ApplicationId,
472        query: Vec<u8>,
473    ) -> Result<Vec<u8>, ExecutionError> {
474        let context = QueryContext {
475            chain_id: self.chain_id,
476            next_block_height: self.height,
477            local_time: self.transaction_tracker.local_time(),
478        };
479        let sender = self.execution_state_sender.clone();
480
481        let txn_tracker = TransactionTracker::default()
482            .with_blobs(self.transaction_tracker.created_blobs().clone());
483
484        let timeout = self
485            .resource_controller
486            .remaining_service_oracle_execution_time()?;
487        let execution_start = Instant::now();
488        let deadline = Some(execution_start + timeout);
489
490        let mut service_runtime =
491            ServiceSyncRuntime::new_with_txn_tracker(sender, context, deadline, txn_tracker);
492
493        let result = service_runtime.run_query(application_id, query);
494
495        // Always track the execution time, irrespective to whether the service ran successfully or
496        // timed out
497        self.resource_controller
498            .track_service_oracle_execution(execution_start.elapsed())?;
499
500        let QueryOutcome {
501            response,
502            operations,
503        } = result?;
504
505        self.resource_controller
506            .track_service_oracle_response(response.len())?;
507
508        self.scheduled_operations.extend(operations);
509        Ok(response)
510    }
511}
512
513impl SyncRuntimeInternal<UserServiceInstance> {
514    /// Initializes a service instance with this runtime.
515    fn load_service_instance(
516        &mut self,
517        this: ServiceSyncRuntimeHandle,
518        id: ApplicationId,
519    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
520        match self.loaded_applications.entry(id) {
521            // TODO(#2927): support dynamic loading of modules on the Web
522            #[cfg(web)]
523            hash_map::Entry::Vacant(_) => {
524                drop(this);
525                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
526                    id,
527                )))
528            }
529            #[cfg(not(web))]
530            hash_map::Entry::Vacant(entry) => {
531                let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
532                let (code, description, txn_tracker_moved) = self
533                    .execution_state_sender
534                    .send_request(move |callback| ExecutionRequest::LoadService {
535                        id,
536                        callback,
537                        txn_tracker: txn_tracker_moved,
538                    })?
539                    .recv_response()?;
540                self.transaction_tracker = txn_tracker_moved;
541
542                let instance = code.instantiate(this)?;
543                Ok(entry
544                    .insert(LoadedApplication::new(instance, description))
545                    .clone())
546            }
547            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
548        }
549    }
550}
551
552impl<UserInstance> SyncRuntime<UserInstance> {
553    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
554        let handle = self.0.take().expect(
555            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
556        );
557        let runtime = Arc::into_inner(handle.0)?
558            .into_inner()
559            .expect("`SyncRuntime` should run in a single thread which should not panic");
560        Some(runtime)
561    }
562}
563
564impl<UserInstance> From<SyncRuntimeInternal<UserInstance>> for SyncRuntimeHandle<UserInstance> {
565    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
566        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
567    }
568}
569
570impl<UserInstance> SyncRuntimeHandle<UserInstance> {
571    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
572        self.0
573            .try_lock()
574            .expect("Synchronous runtimes run on a single execution thread")
575    }
576}
577
578impl<UserInstance> BaseRuntime for SyncRuntimeHandle<UserInstance>
579where
580    Self: ContractOrServiceRuntime,
581{
582    type Read = ();
583    type ReadValueBytes = u32;
584    type ContainsKey = u32;
585    type ContainsKeys = u32;
586    type ReadMultiValuesBytes = u32;
587    type FindKeysByPrefix = u32;
588    type FindKeyValuesByPrefix = u32;
589
590    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
591        Ok(self.inner().chain_id)
592    }
593
594    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
595        Ok(self.inner().height)
596    }
597
598    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
599        Ok(self.inner().current_application().id)
600    }
601
602    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
603        Ok(self
604            .inner()
605            .current_application()
606            .description
607            .creator_chain_id)
608    }
609
610    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
611        Ok(self
612            .inner()
613            .current_application()
614            .description
615            .parameters
616            .clone())
617    }
618
619    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
620        self.inner()
621            .execution_state_sender
622            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
623            .recv_response()
624    }
625
626    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
627        self.inner()
628            .execution_state_sender
629            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
630            .recv_response()
631    }
632
633    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
634        self.inner()
635            .execution_state_sender
636            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
637            .recv_response()
638    }
639
640    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
641        self.inner()
642            .execution_state_sender
643            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
644            .recv_response()
645    }
646
647    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
648        self.inner()
649            .execution_state_sender
650            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
651            .recv_response()
652    }
653
654    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
655        self.inner()
656            .execution_state_sender
657            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
658            .recv_response()
659    }
660
661    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
662        let mut this = self.inner();
663        let id = this.current_application().id;
664        this.resource_controller.track_read_operations(1)?;
665        let receiver = this
666            .execution_state_sender
667            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
668        let state = this.view_user_states.entry(id).or_default();
669        state.contains_key_queries.register(receiver)
670    }
671
672    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
673        let mut this = self.inner();
674        let id = this.current_application().id;
675        let state = this.view_user_states.entry(id).or_default();
676        let value = state.contains_key_queries.wait(*promise)?;
677        Ok(value)
678    }
679
680    fn contains_keys_new(
681        &mut self,
682        keys: Vec<Vec<u8>>,
683    ) -> Result<Self::ContainsKeys, ExecutionError> {
684        let mut this = self.inner();
685        let id = this.current_application().id;
686        this.resource_controller.track_read_operations(1)?;
687        let receiver = this
688            .execution_state_sender
689            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
690        let state = this.view_user_states.entry(id).or_default();
691        state.contains_keys_queries.register(receiver)
692    }
693
694    fn contains_keys_wait(
695        &mut self,
696        promise: &Self::ContainsKeys,
697    ) -> Result<Vec<bool>, ExecutionError> {
698        let mut this = self.inner();
699        let id = this.current_application().id;
700        let state = this.view_user_states.entry(id).or_default();
701        let value = state.contains_keys_queries.wait(*promise)?;
702        Ok(value)
703    }
704
705    fn read_multi_values_bytes_new(
706        &mut self,
707        keys: Vec<Vec<u8>>,
708    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
709        let mut this = self.inner();
710        let id = this.current_application().id;
711        this.resource_controller.track_read_operations(1)?;
712        let receiver = this.execution_state_sender.send_request(move |callback| {
713            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
714        })?;
715        let state = this.view_user_states.entry(id).or_default();
716        state.read_multi_values_queries.register(receiver)
717    }
718
719    fn read_multi_values_bytes_wait(
720        &mut self,
721        promise: &Self::ReadMultiValuesBytes,
722    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
723        let mut this = self.inner();
724        let id = this.current_application().id;
725        let state = this.view_user_states.entry(id).or_default();
726        let values = state.read_multi_values_queries.wait(*promise)?;
727        for value in &values {
728            if let Some(value) = &value {
729                this.resource_controller
730                    .track_bytes_read(value.len() as u64)?;
731            }
732        }
733        Ok(values)
734    }
735
736    fn read_value_bytes_new(
737        &mut self,
738        key: Vec<u8>,
739    ) -> Result<Self::ReadValueBytes, ExecutionError> {
740        let mut this = self.inner();
741        let id = this.current_application().id;
742        this.resource_controller.track_read_operations(1)?;
743        let receiver = this
744            .execution_state_sender
745            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
746        let state = this.view_user_states.entry(id).or_default();
747        state.read_value_queries.register(receiver)
748    }
749
750    fn read_value_bytes_wait(
751        &mut self,
752        promise: &Self::ReadValueBytes,
753    ) -> Result<Option<Vec<u8>>, ExecutionError> {
754        let mut this = self.inner();
755        let id = this.current_application().id;
756        let value = {
757            let state = this.view_user_states.entry(id).or_default();
758            state.read_value_queries.wait(*promise)?
759        };
760        if let Some(value) = &value {
761            this.resource_controller
762                .track_bytes_read(value.len() as u64)?;
763        }
764        Ok(value)
765    }
766
767    fn find_keys_by_prefix_new(
768        &mut self,
769        key_prefix: Vec<u8>,
770    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
771        let mut this = self.inner();
772        let id = this.current_application().id;
773        this.resource_controller.track_read_operations(1)?;
774        let receiver = this.execution_state_sender.send_request(move |callback| {
775            ExecutionRequest::FindKeysByPrefix {
776                id,
777                key_prefix,
778                callback,
779            }
780        })?;
781        let state = this.view_user_states.entry(id).or_default();
782        state.find_keys_queries.register(receiver)
783    }
784
785    fn find_keys_by_prefix_wait(
786        &mut self,
787        promise: &Self::FindKeysByPrefix,
788    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
789        let mut this = self.inner();
790        let id = this.current_application().id;
791        let keys = {
792            let state = this.view_user_states.entry(id).or_default();
793            state.find_keys_queries.wait(*promise)?
794        };
795        let mut read_size = 0;
796        for key in &keys {
797            read_size += key.len();
798        }
799        this.resource_controller
800            .track_bytes_read(read_size as u64)?;
801        Ok(keys)
802    }
803
804    fn find_key_values_by_prefix_new(
805        &mut self,
806        key_prefix: Vec<u8>,
807    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
808        let mut this = self.inner();
809        let id = this.current_application().id;
810        this.resource_controller.track_read_operations(1)?;
811        let receiver = this.execution_state_sender.send_request(move |callback| {
812            ExecutionRequest::FindKeyValuesByPrefix {
813                id,
814                key_prefix,
815                callback,
816            }
817        })?;
818        let state = this.view_user_states.entry(id).or_default();
819        state.find_key_values_queries.register(receiver)
820    }
821
822    fn find_key_values_by_prefix_wait(
823        &mut self,
824        promise: &Self::FindKeyValuesByPrefix,
825    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
826        let mut this = self.inner();
827        let id = this.current_application().id;
828        let state = this.view_user_states.entry(id).or_default();
829        let key_values = state.find_key_values_queries.wait(*promise)?;
830        let mut read_size = 0;
831        for (key, value) in &key_values {
832            read_size += key.len() + value.len();
833        }
834        this.resource_controller
835            .track_bytes_read(read_size as u64)?;
836        Ok(key_values)
837    }
838
839    fn perform_http_request(
840        &mut self,
841        request: http::Request,
842    ) -> Result<http::Response, ExecutionError> {
843        let mut this = self.inner();
844        let app_permissions = this
845            .execution_state_sender
846            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
847            .recv_response()?;
848
849        let app_id = this.current_application().id;
850        ensure!(
851            app_permissions.can_make_http_requests(&app_id),
852            ExecutionError::UnauthorizedApplication(app_id)
853        );
854
855        this.resource_controller.track_http_request()?;
856
857        let response =
858            if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
859                match response {
860                    OracleResponse::Http(response) => response,
861                    _ => return Err(ExecutionError::OracleResponseMismatch),
862                }
863            } else {
864                this.execution_state_sender
865                    .send_request(|callback| ExecutionRequest::PerformHttpRequest {
866                        request,
867                        http_responses_are_oracle_responses:
868                            Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
869                        callback,
870                    })?
871                    .recv_response()?
872            };
873        this.transaction_tracker
874            .add_oracle_response(OracleResponse::Http(response.clone()));
875        Ok(response)
876    }
877
878    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
879        let mut this = self.inner();
880        if !this
881            .transaction_tracker
882            .replay_oracle_response(OracleResponse::Assert)?
883        {
884            // There are no recorded oracle responses, so we check the local time.
885            let local_time = this.transaction_tracker.local_time();
886            ensure!(
887                local_time < timestamp,
888                ExecutionError::AssertBefore {
889                    timestamp,
890                    local_time,
891                }
892            );
893        }
894        Ok(())
895    }
896
897    fn read_data_blob(&mut self, hash: &CryptoHash) -> Result<Vec<u8>, ExecutionError> {
898        let mut this = self.inner();
899        let blob_id = BlobId::new(*hash, BlobType::Data);
900        let (blob_content, is_new) = this
901            .execution_state_sender
902            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
903            .recv_response()?;
904        if is_new {
905            this.transaction_tracker
906                .replay_oracle_response(OracleResponse::Blob(blob_id))?;
907        }
908        Ok(blob_content.into_bytes().into_vec())
909    }
910
911    fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError> {
912        let mut this = self.inner();
913        let blob_id = BlobId::new(*hash, BlobType::Data);
914        let is_new = this
915            .execution_state_sender
916            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
917            .recv_response()?;
918        if is_new {
919            this.transaction_tracker
920                .replay_oracle_response(OracleResponse::Blob(blob_id))?;
921        }
922        Ok(())
923    }
924}
925
/// Compile-time switch for the places where the shared [`BaseRuntime`] implementation has to
/// behave differently for contracts and for services.
trait ContractOrServiceRuntime {
    /// Whether an HTTP response must fit within the oracle response size limit.
    ///
    /// Services set this to `false`, which may let them receive a larger HTTP response while
    /// only a shorter oracle response ends up stored in the block.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
936
937impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
938    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
939}
940
941impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
942    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
943}
944
945impl<UserInstance> Clone for SyncRuntimeHandle<UserInstance> {
946    fn clone(&self) -> Self {
947        SyncRuntimeHandle(self.0.clone())
948    }
949}
950
951impl ContractSyncRuntime {
952    pub(crate) fn new(
953        execution_state_sender: ExecutionStateSender,
954        chain_id: ChainId,
955        refund_grant_to: Option<Account>,
956        resource_controller: ResourceController,
957        action: &UserAction,
958        txn_tracker: TransactionTracker,
959    ) -> Self {
960        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
961            SyncRuntimeInternal::new(
962                chain_id,
963                action.height(),
964                action.round(),
965                action.signer(),
966                if let UserAction::Message(context, _) = action {
967                    Some(context.into())
968                } else {
969                    None
970                },
971                execution_state_sender,
972                None,
973                refund_grant_to,
974                resource_controller,
975                txn_tracker,
976            ),
977        )))
978    }
979
980    pub(crate) fn preload_contract(
981        &self,
982        id: ApplicationId,
983        code: UserContractCode,
984        description: ApplicationDescription,
985    ) -> Result<(), ExecutionError> {
986        let this = self
987            .0
988            .as_ref()
989            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
990        let runtime_handle = this.clone();
991        let mut this_guard = this.inner();
992
993        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
994            entry.insert(LoadedApplication::new(
995                code.instantiate(runtime_handle)?,
996                description,
997            ));
998            this_guard.applications_to_finalize.push(id);
999        }
1000
1001        Ok(())
1002    }
1003
1004    /// Main entry point to start executing a user action.
1005    pub(crate) fn run_action(
1006        mut self,
1007        application_id: ApplicationId,
1008        chain_id: ChainId,
1009        action: UserAction,
1010    ) -> Result<(Option<Vec<u8>>, ResourceController, TransactionTracker), ExecutionError> {
1011        let result = self
1012            .deref_mut()
1013            .run_action(application_id, chain_id, action)?;
1014        let runtime = self
1015            .into_inner()
1016            .expect("Runtime clones should have been freed by now");
1017        Ok((
1018            result,
1019            runtime.resource_controller,
1020            runtime.transaction_tracker,
1021        ))
1022    }
1023}
1024
1025impl ContractSyncRuntimeHandle {
1026    fn run_action(
1027        &mut self,
1028        application_id: ApplicationId,
1029        chain_id: ChainId,
1030        action: UserAction,
1031    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1032        let finalize_context = FinalizeContext {
1033            authenticated_signer: action.signer(),
1034            chain_id,
1035            height: action.height(),
1036            round: action.round(),
1037        };
1038
1039        {
1040            let runtime = self.inner();
1041            assert_eq!(runtime.authenticated_signer, action.signer());
1042            assert_eq!(runtime.chain_id, chain_id);
1043            assert_eq!(runtime.height, action.height());
1044        }
1045
1046        let signer = action.signer();
1047        let closure = move |code: &mut UserContractInstance| match action {
1048            UserAction::Instantiate(context, argument) => {
1049                code.instantiate(context, argument).map(|()| None)
1050            }
1051            UserAction::Operation(context, operation) => {
1052                code.execute_operation(context, operation).map(Option::Some)
1053            }
1054            UserAction::Message(context, message) => {
1055                code.execute_message(context, message).map(|()| None)
1056            }
1057        };
1058
1059        let result = self.execute(application_id, signer, closure)?;
1060        self.finalize(finalize_context)?;
1061        Ok(result)
1062    }
1063
1064    /// Notifies all loaded applications that execution is finalizing.
1065    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1066        let applications = mem::take(&mut self.inner().applications_to_finalize)
1067            .into_iter()
1068            .rev();
1069
1070        self.inner().is_finalizing = true;
1071
1072        for application in applications {
1073            self.execute(application, context.authenticated_signer, |contract| {
1074                contract.finalize(context).map(|_| None)
1075            })?;
1076            self.inner().loaded_applications.remove(&application);
1077        }
1078
1079        Ok(())
1080    }
1081
1082    /// Executes a `closure` with the contract code for the `application_id`.
1083    fn execute(
1084        &mut self,
1085        application_id: ApplicationId,
1086        signer: Option<AccountOwner>,
1087        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1088    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1089        let contract = {
1090            let mut runtime = self.inner();
1091            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1092
1093            let status = ApplicationStatus {
1094                caller_id: None,
1095                id: application_id,
1096                description: application.description.clone(),
1097                signer,
1098            };
1099
1100            runtime.push_application(status);
1101
1102            application
1103        };
1104
1105        let result = closure(
1106            &mut contract
1107                .instance
1108                .try_lock()
1109                .expect("Application should not be already executing"),
1110        )?;
1111
1112        let mut runtime = self.inner();
1113        let application_status = runtime.pop_application();
1114        assert_eq!(application_status.caller_id, None);
1115        assert_eq!(application_status.id, application_id);
1116        assert_eq!(application_status.description, contract.description);
1117        assert_eq!(application_status.signer, signer);
1118        assert!(runtime.call_stack.is_empty());
1119
1120        Ok(result)
1121    }
1122}
1123
1124impl ContractRuntime for ContractSyncRuntimeHandle {
1125    fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1126        Ok(self.inner().authenticated_signer)
1127    }
1128
1129    fn message_id(&mut self) -> Result<Option<MessageId>, ExecutionError> {
1130        Ok(self.inner().executing_message.map(|metadata| metadata.id))
1131    }
1132
1133    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1134        Ok(self
1135            .inner()
1136            .executing_message
1137            .map(|metadata| metadata.is_bouncing))
1138    }
1139
1140    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1141        let this = self.inner();
1142        if this.call_stack.len() <= 1 {
1143            return Ok(None);
1144        }
1145        Ok(this.current_application().caller_id)
1146    }
1147
1148    fn remaining_fuel(&mut self) -> Result<u64, ExecutionError> {
1149        Ok(self.inner().resource_controller.remaining_fuel())
1150    }
1151
1152    fn consume_fuel(&mut self, fuel: u64) -> Result<(), ExecutionError> {
1153        let mut this = self.inner();
1154        this.resource_controller.track_fuel(fuel)
1155    }
1156
1157    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1158        let mut this = self.inner();
1159        let application = this.current_application();
1160        let application_id = application.id;
1161        let authenticated_signer = application.signer;
1162        let mut refund_grant_to = this.refund_grant_to;
1163
1164        let grant = this
1165            .resource_controller
1166            .policy
1167            .total_price(&message.grant)?;
1168        if grant.is_zero() {
1169            refund_grant_to = None;
1170        } else {
1171            this.resource_controller.track_grant(grant)?;
1172        }
1173        let kind = if message.is_tracked {
1174            MessageKind::Tracked
1175        } else {
1176            MessageKind::Simple
1177        };
1178
1179        this.transaction_tracker
1180            .add_outgoing_message(OutgoingMessage {
1181                destination: message.destination,
1182                authenticated_signer,
1183                refund_grant_to,
1184                grant,
1185                kind,
1186                message: Message::User {
1187                    application_id,
1188                    bytes: message.message,
1189                },
1190            })?;
1191
1192        Ok(())
1193    }
1194
1195    fn subscribe(&mut self, chain: ChainId, name: ChannelName) -> Result<(), ExecutionError> {
1196        let mut this = self.inner();
1197        let application_id = this.current_application().id;
1198        let full_name = ChannelFullName::new(name, application_id);
1199        this.transaction_tracker.subscribe(full_name, chain);
1200
1201        Ok(())
1202    }
1203
1204    fn unsubscribe(&mut self, chain: ChainId, name: ChannelName) -> Result<(), ExecutionError> {
1205        let mut this = self.inner();
1206        let application_id = this.current_application().id;
1207        let full_name = ChannelFullName::new(name, application_id);
1208        this.transaction_tracker.unsubscribe(full_name, chain);
1209
1210        Ok(())
1211    }
1212
1213    fn transfer(
1214        &mut self,
1215        source: AccountOwner,
1216        destination: Account,
1217        amount: Amount,
1218    ) -> Result<(), ExecutionError> {
1219        let mut this = self.inner();
1220        let current_application = this.current_application();
1221        let application_id = current_application.id;
1222        let signer = current_application.signer;
1223
1224        let maybe_message = this
1225            .execution_state_sender
1226            .send_request(|callback| ExecutionRequest::Transfer {
1227                source,
1228                destination,
1229                amount,
1230                signer,
1231                application_id,
1232                callback,
1233            })?
1234            .recv_response()?;
1235
1236        this.transaction_tracker
1237            .add_outgoing_messages(maybe_message)?;
1238        Ok(())
1239    }
1240
1241    fn claim(
1242        &mut self,
1243        source: Account,
1244        destination: Account,
1245        amount: Amount,
1246    ) -> Result<(), ExecutionError> {
1247        let mut this = self.inner();
1248        let current_application = this.current_application();
1249        let application_id = current_application.id;
1250        let signer = current_application.signer;
1251
1252        let message = this
1253            .execution_state_sender
1254            .send_request(|callback| ExecutionRequest::Claim {
1255                source,
1256                destination,
1257                amount,
1258                signer,
1259                application_id,
1260                callback,
1261            })?
1262            .recv_response()?;
1263        this.transaction_tracker.add_outgoing_message(message)?;
1264        Ok(())
1265    }
1266
1267    fn try_call_application(
1268        &mut self,
1269        authenticated: bool,
1270        callee_id: ApplicationId,
1271        argument: Vec<u8>,
1272    ) -> Result<Vec<u8>, ExecutionError> {
1273        let (contract, context) =
1274            self.inner()
1275                .prepare_for_call(self.clone(), authenticated, callee_id)?;
1276
1277        let value = contract
1278            .try_lock()
1279            .expect("Applications should not have reentrant calls")
1280            .execute_operation(context, argument)?;
1281
1282        self.inner().finish_call()?;
1283
1284        Ok(value)
1285    }
1286
1287    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1288        let mut this = self.inner();
1289        ensure!(
1290            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1291            ExecutionError::StreamNameTooLong
1292        );
1293        let application_id = GenericApplicationId::User(this.current_application().id);
1294        let stream_id = StreamId {
1295            stream_name,
1296            application_id,
1297        };
1298        let index = this
1299            .execution_state_sender
1300            .send_request(|callback| ExecutionRequest::NextEventIndex {
1301                stream_id: stream_id.clone(),
1302                callback,
1303            })?
1304            .recv_response()?;
1305        // TODO(#365): Consider separate event fee categories.
1306        this.resource_controller
1307            .track_bytes_written(value.len() as u64)?;
1308        this.transaction_tracker.add_event(stream_id, index, value);
1309        Ok(index)
1310    }
1311
1312    fn read_event(
1313        &mut self,
1314        chain_id: ChainId,
1315        stream_name: StreamName,
1316        index: u32,
1317    ) -> Result<Vec<u8>, ExecutionError> {
1318        let mut this = self.inner();
1319        ensure!(
1320            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1321            ExecutionError::StreamNameTooLong
1322        );
1323        let application_id = GenericApplicationId::User(this.current_application().id);
1324        let stream_id = StreamId {
1325            stream_name,
1326            application_id,
1327        };
1328        let event_id = EventId {
1329            stream_id,
1330            index,
1331            chain_id,
1332        };
1333        let event = this
1334            .execution_state_sender
1335            .send_request(|callback| ExecutionRequest::ReadEvent {
1336                event_id: event_id.clone(),
1337                callback,
1338            })?
1339            .recv_response()?;
1340        // TODO(#365): Consider separate event fee categories.
1341        this.resource_controller
1342            .track_bytes_read(event.len() as u64)?;
1343        this.transaction_tracker
1344            .replay_oracle_response(OracleResponse::Event(event_id, event.clone()))?;
1345        Ok(event)
1346    }
1347
1348    fn subscribe_to_events(
1349        &mut self,
1350        chain_id: ChainId,
1351        application_id: ApplicationId,
1352        stream_name: StreamName,
1353    ) -> Result<(), ExecutionError> {
1354        let this = self.inner();
1355        ensure!(
1356            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1357            ExecutionError::StreamNameTooLong
1358        );
1359        let stream_id = StreamId {
1360            stream_name,
1361            application_id: application_id.into(),
1362        };
1363        let subscriber_app_id = this.current_application().id;
1364        this.execution_state_sender
1365            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1366                chain_id,
1367                stream_id,
1368                subscriber_app_id,
1369                callback,
1370            })?
1371            .recv_response()?;
1372        Ok(())
1373    }
1374
1375    fn unsubscribe_from_events(
1376        &mut self,
1377        chain_id: ChainId,
1378        application_id: ApplicationId,
1379        stream_name: StreamName,
1380    ) -> Result<(), ExecutionError> {
1381        let this = self.inner();
1382        ensure!(
1383            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1384            ExecutionError::StreamNameTooLong
1385        );
1386        let stream_id = StreamId {
1387            stream_name,
1388            application_id: application_id.into(),
1389        };
1390        let subscriber_app_id = this.current_application().id;
1391        this.execution_state_sender
1392            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1393                chain_id,
1394                stream_id,
1395                subscriber_app_id,
1396                callback,
1397            })?
1398            .recv_response()?;
1399        Ok(())
1400    }
1401
1402    fn query_service(
1403        &mut self,
1404        application_id: ApplicationId,
1405        query: Vec<u8>,
1406    ) -> Result<Vec<u8>, ExecutionError> {
1407        let mut this = self.inner();
1408
1409        let app_permissions = this
1410            .execution_state_sender
1411            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1412            .recv_response()?;
1413
1414        let app_id = this.current_application().id;
1415        ensure!(
1416            app_permissions.can_call_services(&app_id),
1417            ExecutionError::UnauthorizedApplication(app_id)
1418        );
1419
1420        this.resource_controller.track_service_oracle_call()?;
1421        let response =
1422            if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1423                match response {
1424                    OracleResponse::Service(bytes) => bytes,
1425                    _ => return Err(ExecutionError::OracleResponseMismatch),
1426                }
1427            } else {
1428                this.run_service_oracle_query(application_id, query)?
1429            };
1430
1431        this.transaction_tracker
1432            .add_oracle_response(OracleResponse::Service(response.clone()));
1433
1434        Ok(response)
1435    }
1436
1437    fn open_chain(
1438        &mut self,
1439        ownership: ChainOwnership,
1440        application_permissions: ApplicationPermissions,
1441        balance: Amount,
1442    ) -> Result<(MessageId, ChainId), ExecutionError> {
1443        let mut this = self.inner();
1444        let message_id = MessageId {
1445            chain_id: this.chain_id,
1446            height: this.height,
1447            index: this.transaction_tracker.next_message_index(),
1448        };
1449        let chain_id = ChainId::child(message_id);
1450        let open_chain_message = this
1451            .execution_state_sender
1452            .send_request(|callback| ExecutionRequest::OpenChain {
1453                ownership,
1454                balance,
1455                next_message_id: message_id,
1456                application_permissions,
1457                callback,
1458            })?
1459            .recv_response()?;
1460        this.transaction_tracker
1461            .add_outgoing_message(open_chain_message)?;
1462        Ok((message_id, chain_id))
1463    }
1464
1465    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1466        let this = self.inner();
1467        let application_id = this.current_application().id;
1468        this.execution_state_sender
1469            .send_request(|callback| ExecutionRequest::CloseChain {
1470                application_id,
1471                callback,
1472            })?
1473            .recv_response()?
1474    }
1475
1476    fn change_application_permissions(
1477        &mut self,
1478        application_permissions: ApplicationPermissions,
1479    ) -> Result<(), ExecutionError> {
1480        let this = self.inner();
1481        let application_id = this.current_application().id;
1482        this.execution_state_sender
1483            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1484                application_id,
1485                application_permissions,
1486                callback,
1487            })?
1488            .recv_response()?
1489    }
1490
1491    fn create_application(
1492        &mut self,
1493        module_id: ModuleId,
1494        parameters: Vec<u8>,
1495        argument: Vec<u8>,
1496        required_application_ids: Vec<ApplicationId>,
1497    ) -> Result<ApplicationId, ExecutionError> {
1498        let chain_id = self.inner().chain_id;
1499        let block_height = self.block_height()?;
1500
1501        let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1502
1503        let CreateApplicationResult {
1504            app_id,
1505            txn_tracker: txn_tracker_moved,
1506        } = self
1507            .inner()
1508            .execution_state_sender
1509            .send_request(move |callback| ExecutionRequest::CreateApplication {
1510                chain_id,
1511                block_height,
1512                module_id,
1513                parameters,
1514                required_application_ids,
1515                callback,
1516                txn_tracker: txn_tracker_moved,
1517            })?
1518            .recv_response()??;
1519
1520        self.inner().transaction_tracker = txn_tracker_moved;
1521
1522        let (contract, context) = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1523
1524        contract
1525            .try_lock()
1526            .expect("Applications should not have reentrant calls")
1527            .instantiate(context, argument)?;
1528
1529        self.inner().finish_call()?;
1530
1531        Ok(app_id)
1532    }
1533
1534    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1535        let mut this = self.inner();
1536        let round =
1537            if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1538                match response {
1539                    OracleResponse::Round(round) => round,
1540                    _ => return Err(ExecutionError::OracleResponseMismatch),
1541                }
1542            } else {
1543                this.round
1544            };
1545        this.transaction_tracker
1546            .add_oracle_response(OracleResponse::Round(round));
1547        Ok(round)
1548    }
1549
1550    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1551        let mut this = self.inner();
1552        let id = this.current_application().id;
1553        let state = this.view_user_states.entry(id).or_default();
1554        state.force_all_pending_queries()?;
1555        this.resource_controller.track_write_operations(
1556            batch
1557                .num_operations()
1558                .try_into()
1559                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1560        )?;
1561        this.resource_controller
1562            .track_bytes_written(batch.size() as u64)?;
1563        this.execution_state_sender
1564            .send_request(|callback| ExecutionRequest::WriteBatch {
1565                id,
1566                batch,
1567                callback,
1568            })?
1569            .recv_response()?;
1570        Ok(())
1571    }
1572}
1573
1574impl ServiceSyncRuntime {
1575    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1576    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1577        let mut txn_tracker = TransactionTracker::default();
1578        txn_tracker.set_local_time(context.local_time);
1579        Self::new_with_txn_tracker(execution_state_sender, context, None, txn_tracker)
1580    }
1581
1582    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1583    pub fn new_with_txn_tracker(
1584        execution_state_sender: ExecutionStateSender,
1585        context: QueryContext,
1586        deadline: Option<Instant>,
1587        txn_tracker: TransactionTracker,
1588    ) -> Self {
1589        let runtime = SyncRuntime(Some(
1590            SyncRuntimeInternal::new(
1591                context.chain_id,
1592                context.next_block_height,
1593                None,
1594                None,
1595                None,
1596                execution_state_sender,
1597                deadline,
1598                None,
1599                ResourceController::default(),
1600                txn_tracker,
1601            )
1602            .into(),
1603        ));
1604
1605        ServiceSyncRuntime {
1606            runtime,
1607            current_context: context,
1608        }
1609    }
1610
1611    /// Loads a service into the runtime's memory.
1612    pub(crate) fn preload_service(
1613        &self,
1614        id: ApplicationId,
1615        code: UserServiceCode,
1616        description: ApplicationDescription,
1617    ) -> Result<(), ExecutionError> {
1618        let this = self
1619            .runtime
1620            .0
1621            .as_ref()
1622            .expect("services shouldn't be preloaded while the runtime is being dropped");
1623        let runtime_handle = this.clone();
1624        let mut this_guard = this.inner();
1625
1626        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1627            entry.insert(LoadedApplication::new(
1628                code.instantiate(runtime_handle)?,
1629                description,
1630            ));
1631            this_guard.applications_to_finalize.push(id);
1632        }
1633
1634        Ok(())
1635    }
1636
1637    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1638    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1639        while let Ok(request) = incoming_requests.recv() {
1640            let ServiceRuntimeRequest::Query {
1641                application_id,
1642                context,
1643                query,
1644                callback,
1645            } = request;
1646
1647            self.prepare_for_query(context);
1648
1649            let _ = callback.send(self.run_query(application_id, query));
1650        }
1651    }
1652
1653    /// Prepares the runtime to query an application.
1654    pub(crate) fn prepare_for_query(&mut self, new_context: QueryContext) {
1655        let expected_context = QueryContext {
1656            local_time: new_context.local_time,
1657            ..self.current_context
1658        };
1659
1660        if new_context != expected_context {
1661            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1662            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1663        } else {
1664            self.handle_mut()
1665                .inner()
1666                .transaction_tracker
1667                .set_local_time(new_context.local_time);
1668        }
1669    }
1670
1671    /// Queries an application specified by its [`ApplicationId`].
1672    pub(crate) fn run_query(
1673        &mut self,
1674        application_id: ApplicationId,
1675        query: Vec<u8>,
1676    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1677        let this = self.handle_mut();
1678        let response = this.try_query_application(application_id, query)?;
1679        let operations = mem::take(&mut this.inner().scheduled_operations);
1680
1681        Ok(QueryOutcome {
1682            response,
1683            operations,
1684        })
1685    }
1686
1687    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1688    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1689        self.runtime.0.as_mut().expect(
1690            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1691        )
1692    }
1693}
1694
1695impl ServiceRuntime for ServiceSyncRuntimeHandle {
1696    /// Note that queries are not available from writable contexts.
1697    fn try_query_application(
1698        &mut self,
1699        queried_id: ApplicationId,
1700        argument: Vec<u8>,
1701    ) -> Result<Vec<u8>, ExecutionError> {
1702        let (query_context, service) = {
1703            let mut this = self.inner();
1704
1705            // Load the application.
1706            let application = this.load_service_instance(self.clone(), queried_id)?;
1707            // Make the call to user code.
1708            let query_context = QueryContext {
1709                chain_id: this.chain_id,
1710                next_block_height: this.height,
1711                local_time: this.transaction_tracker.local_time(),
1712            };
1713            this.push_application(ApplicationStatus {
1714                caller_id: None,
1715                id: queried_id,
1716                description: application.description,
1717                signer: None,
1718            });
1719            (query_context, application.instance)
1720        };
1721        let response = service
1722            .try_lock()
1723            .expect("Applications should not have reentrant calls")
1724            .handle_query(query_context, argument)?;
1725        self.inner().pop_application();
1726        Ok(response)
1727    }
1728
1729    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1730        let mut this = self.inner();
1731        let application_id = this.current_application().id;
1732
1733        this.scheduled_operations.push(Operation::User {
1734            application_id,
1735            bytes: operation,
1736        });
1737
1738        Ok(())
1739    }
1740
1741    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1742        if let Some(deadline) = self.inner().deadline {
1743            if Instant::now() >= deadline {
1744                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1745            }
1746        }
1747        Ok(())
1748    }
1749}
1750
1751/// A request to the service runtime actor.
1752pub enum ServiceRuntimeRequest {
1753    Query {
1754        application_id: ApplicationId,
1755        context: QueryContext,
1756        query: Vec<u8>,
1757        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1758    },
1759}
1760
1761/// The origin of the execution.
1762#[derive(Clone, Copy, Debug)]
1763struct ExecutingMessage {
1764    id: MessageId,
1765    is_bouncing: bool,
1766}
1767
1768impl From<&MessageContext> for ExecutingMessage {
1769    fn from(context: &MessageContext) -> Self {
1770        ExecutingMessage {
1771            id: context.message_id,
1772            is_bouncing: context.is_bouncing,
1773        }
1774    }
1775}