linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        OracleResponse, SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29    execution::UserAction,
30    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31    resources::ResourceController,
32    system::CreateApplicationResult,
33    util::{ReceiverExt, UnboundedSenderExt},
34    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, TransactionTracker,
37    UserContractCode, UserContractInstance, UserServiceCode, UserServiceInstance,
38    MAX_STREAM_NAME_LEN,
39};
40
41#[cfg(test)]
42#[path = "unit_tests/runtime_tests.rs"]
43mod tests;
44
45pub trait WithContext {
46    type UserContext;
47}
48
49impl WithContext for UserContractInstance {
50    type UserContext = Timestamp;
51}
52
53impl WithContext for UserServiceInstance {
54    type UserContext = ();
55}
56
57#[cfg(test)]
58impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
59    type UserContext = ();
60}
61
62#[derive(Debug)]
63pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
64
65pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
66
67pub struct ServiceSyncRuntime {
68    runtime: SyncRuntime<UserServiceInstance>,
69    current_context: QueryContext,
70}
71
72#[derive(Debug)]
73pub struct SyncRuntimeHandle<UserInstance: WithContext>(
74    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
75);
76
77pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
78pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
79
80/// Runtime data tracked during the execution of a transaction on the synchronous thread.
81#[derive(Debug)]
82pub struct SyncRuntimeInternal<UserInstance: WithContext> {
83    /// The current chain ID.
84    chain_id: ChainId,
85    /// The height of the next block that will be added to this chain. During operations
86    /// and messages, this is the current block height.
87    height: BlockHeight,
88    /// The current consensus round. Only available during block validation in multi-leader rounds.
89    round: Option<u32>,
90    /// The current message being executed, if there is one.
91    #[debug(skip_if = Option::is_none)]
92    executing_message: Option<ExecutingMessage>,
93
94    /// How to interact with the storage view of the execution state.
95    execution_state_sender: ExecutionStateSender,
96
97    /// If applications are being finalized.
98    ///
99    /// If [`true`], disables cross-application calls.
100    is_finalizing: bool,
101    /// Applications that need to be finalized.
102    applications_to_finalize: Vec<ApplicationId>,
103
104    /// Application instances loaded in this transaction.
105    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
106    /// The current stack of application descriptions.
107    call_stack: Vec<ApplicationStatus>,
108    /// The set of the IDs of the applications that are in the `call_stack`.
109    active_applications: HashSet<ApplicationId>,
110    /// The tracking information for this transaction.
111    transaction_tracker: TransactionTracker,
112    /// The operations scheduled during this query.
113    scheduled_operations: Vec<Operation>,
114
115    /// Track application states based on views.
116    view_user_states: BTreeMap<ApplicationId, ViewUserState>,
117
118    /// The deadline this runtime should finish executing.
119    ///
120    /// Used to limit the execution time of services running as oracles.
121    deadline: Option<Instant>,
122
123    /// Where to send a refund for the unused part of the grant after execution, if any.
124    #[debug(skip_if = Option::is_none)]
125    refund_grant_to: Option<Account>,
126    /// Controller to track fuel and storage consumption.
127    resource_controller: ResourceController,
128    /// Additional context for the runtime.
129    user_context: UserInstance::UserContext,
130}
131
132/// The runtime status of an application.
133#[derive(Debug)]
134struct ApplicationStatus {
135    /// The caller application ID, if forwarded during the call.
136    caller_id: Option<ApplicationId>,
137    /// The application ID.
138    id: ApplicationId,
139    /// The application description.
140    description: ApplicationDescription,
141    /// The authenticated signer for the execution thread, if any.
142    signer: Option<AccountOwner>,
143}
144
145/// A loaded application instance.
146#[derive(Debug)]
147struct LoadedApplication<Instance> {
148    instance: Arc<Mutex<Instance>>,
149    description: ApplicationDescription,
150}
151
152impl<Instance> LoadedApplication<Instance> {
153    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
154    fn new(instance: Instance, description: ApplicationDescription) -> Self {
155        LoadedApplication {
156            instance: Arc::new(Mutex::new(instance)),
157            description,
158        }
159    }
160}
161
162impl<Instance> Clone for LoadedApplication<Instance> {
163    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
164    // bound
165    fn clone(&self) -> Self {
166        LoadedApplication {
167            instance: self.instance.clone(),
168            description: self.description.clone(),
169        }
170    }
171}
172
173#[derive(Debug)]
174enum Promise<T> {
175    Ready(T),
176    Pending(Receiver<T>),
177}
178
179impl<T> Promise<T> {
180    fn force(&mut self) -> Result<(), ExecutionError> {
181        if let Promise::Pending(receiver) = self {
182            let value = receiver
183                .recv_ref()
184                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
185            *self = Promise::Ready(value);
186        }
187        Ok(())
188    }
189
190    fn read(self) -> Result<T, ExecutionError> {
191        match self {
192            Promise::Pending(receiver) => {
193                let value = receiver.recv_response()?;
194                Ok(value)
195            }
196            Promise::Ready(value) => Ok(value),
197        }
198    }
199}
200
201/// Manages a set of pending queries returning values of type `T`.
202#[derive(Debug, Default)]
203struct QueryManager<T> {
204    /// The queries in progress.
205    pending_queries: BTreeMap<u32, Promise<T>>,
206    /// The number of queries ever registered so far. Used for the index of the next query.
207    query_count: u32,
208    /// The number of active queries.
209    active_query_count: u32,
210}
211
212impl<T> QueryManager<T> {
213    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
214        let id = self.query_count;
215        self.pending_queries.insert(id, Promise::Pending(receiver));
216        self.query_count = self
217            .query_count
218            .checked_add(1)
219            .ok_or(ArithmeticError::Overflow)?;
220        self.active_query_count = self
221            .active_query_count
222            .checked_add(1)
223            .ok_or(ArithmeticError::Overflow)?;
224        Ok(id)
225    }
226
227    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
228        let promise = self
229            .pending_queries
230            .remove(&id)
231            .ok_or(ExecutionError::InvalidPromise)?;
232        let value = promise.read()?;
233        self.active_query_count -= 1;
234        Ok(value)
235    }
236
237    fn force_all(&mut self) -> Result<(), ExecutionError> {
238        for promise in self.pending_queries.values_mut() {
239            promise.force()?;
240        }
241        Ok(())
242    }
243}
244
245type Keys = Vec<Vec<u8>>;
246type Value = Vec<u8>;
247type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
248
249#[derive(Debug, Default)]
250struct ViewUserState {
251    /// The contains-key queries in progress.
252    contains_key_queries: QueryManager<bool>,
253    /// The contains-keys queries in progress.
254    contains_keys_queries: QueryManager<Vec<bool>>,
255    /// The read-value queries in progress.
256    read_value_queries: QueryManager<Option<Value>>,
257    /// The read-multi-values queries in progress.
258    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
259    /// The find-keys queries in progress.
260    find_keys_queries: QueryManager<Keys>,
261    /// The find-key-values queries in progress.
262    find_key_values_queries: QueryManager<KeyValues>,
263}
264
265impl ViewUserState {
266    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
267        self.contains_key_queries.force_all()?;
268        self.contains_keys_queries.force_all()?;
269        self.read_value_queries.force_all()?;
270        self.read_multi_values_queries.force_all()?;
271        self.find_keys_queries.force_all()?;
272        self.find_key_values_queries.force_all()?;
273        Ok(())
274    }
275}
276
277impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
278    type Target = SyncRuntimeHandle<UserInstance>;
279
280    fn deref(&self) -> &Self::Target {
281        self.0.as_ref().expect(
282            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
283        )
284    }
285}
286
287impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
288    fn deref_mut(&mut self) -> &mut Self::Target {
289        self.0.as_mut().expect(
290            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
291        )
292    }
293}
294
295impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
296    fn drop(&mut self) {
297        // Ensure the `loaded_applications` are cleared to prevent circular references in
298        // the runtime
299        if let Some(handle) = self.0.take() {
300            handle.inner().loaded_applications.clear();
301        }
302    }
303}
304
305impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
306    #[expect(clippy::too_many_arguments)]
307    fn new(
308        chain_id: ChainId,
309        height: BlockHeight,
310        round: Option<u32>,
311        executing_message: Option<ExecutingMessage>,
312        execution_state_sender: ExecutionStateSender,
313        deadline: Option<Instant>,
314        refund_grant_to: Option<Account>,
315        resource_controller: ResourceController,
316        transaction_tracker: TransactionTracker,
317        user_context: UserInstance::UserContext,
318    ) -> Self {
319        Self {
320            chain_id,
321            height,
322            round,
323            executing_message,
324            execution_state_sender,
325            is_finalizing: false,
326            applications_to_finalize: Vec::new(),
327            loaded_applications: HashMap::new(),
328            call_stack: Vec::new(),
329            active_applications: HashSet::new(),
330            view_user_states: BTreeMap::new(),
331            deadline,
332            refund_grant_to,
333            resource_controller,
334            transaction_tracker,
335            scheduled_operations: Vec::new(),
336            user_context,
337        }
338    }
339
340    /// Returns the [`ApplicationStatus`] of the current application.
341    ///
342    /// The current application is the last to be pushed to the `call_stack`.
343    ///
344    /// # Panics
345    ///
346    /// If the call stack is empty.
347    fn current_application(&self) -> &ApplicationStatus {
348        self.call_stack
349            .last()
350            .expect("Call stack is unexpectedly empty")
351    }
352
353    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
354    ///
355    /// Ensures the application's ID is also tracked in the `active_applications` set.
356    fn push_application(&mut self, status: ApplicationStatus) {
357        self.active_applications.insert(status.id);
358        self.call_stack.push(status);
359    }
360
361    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
362    ///
363    /// Ensures the application's ID is also removed from the `active_applications` set.
364    ///
365    /// # Panics
366    ///
367    /// If the call stack is empty.
368    fn pop_application(&mut self) -> ApplicationStatus {
369        let status = self
370            .call_stack
371            .pop()
372            .expect("Can't remove application from empty call stack");
373        assert!(self.active_applications.remove(&status.id));
374        status
375    }
376
377    /// Ensures that a call to `application_id` is not-reentrant.
378    ///
379    /// Returns an error if there already is an entry for `application_id` in the call stack.
380    fn check_for_reentrancy(
381        &mut self,
382        application_id: ApplicationId,
383    ) -> Result<(), ExecutionError> {
384        ensure!(
385            !self.active_applications.contains(&application_id),
386            ExecutionError::ReentrantCall(application_id)
387        );
388        Ok(())
389    }
390}
391
392impl SyncRuntimeInternal<UserContractInstance> {
393    /// Loads a contract instance, initializing it with this runtime if needed.
394    fn load_contract_instance(
395        &mut self,
396        this: SyncRuntimeHandle<UserContractInstance>,
397        id: ApplicationId,
398    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
399        match self.loaded_applications.entry(id) {
400            // TODO(#2927): support dynamic loading of modules on the Web
401            #[cfg(web)]
402            hash_map::Entry::Vacant(_) => {
403                drop(this);
404                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
405                    id,
406                )))
407            }
408            #[cfg(not(web))]
409            hash_map::Entry::Vacant(entry) => {
410                let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
411                let (code, description, txn_tracker_moved) = self
412                    .execution_state_sender
413                    .send_request(move |callback| ExecutionRequest::LoadContract {
414                        id,
415                        callback,
416                        txn_tracker: txn_tracker_moved,
417                    })?
418                    .recv_response()?;
419                self.transaction_tracker = txn_tracker_moved;
420
421                let instance = code.instantiate(this)?;
422
423                self.applications_to_finalize.push(id);
424                Ok(entry
425                    .insert(LoadedApplication::new(instance, description))
426                    .clone())
427            }
428            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
429        }
430    }
431
432    /// Configures the runtime for executing a call to a different contract.
433    fn prepare_for_call(
434        &mut self,
435        this: ContractSyncRuntimeHandle,
436        authenticated: bool,
437        callee_id: ApplicationId,
438    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
439        self.check_for_reentrancy(callee_id)?;
440
441        ensure!(
442            !self.is_finalizing,
443            ExecutionError::CrossApplicationCallInFinalize {
444                caller_id: Box::new(self.current_application().id),
445                callee_id: Box::new(callee_id),
446            }
447        );
448
449        // Load the application.
450        let application = self.load_contract_instance(this, callee_id)?;
451
452        let caller = self.current_application();
453        let caller_id = caller.id;
454        let caller_signer = caller.signer;
455        // Make the call to user code.
456        let authenticated_signer = match caller_signer {
457            Some(signer) if authenticated => Some(signer),
458            _ => None,
459        };
460        let authenticated_caller_id = authenticated.then_some(caller_id);
461        self.push_application(ApplicationStatus {
462            caller_id: authenticated_caller_id,
463            id: callee_id,
464            description: application.description,
465            // Allow further nested calls to be authenticated if this one is.
466            signer: authenticated_signer,
467        });
468        Ok(application.instance)
469    }
470
471    /// Cleans up the runtime after the execution of a call to a different contract.
472    fn finish_call(&mut self) -> Result<(), ExecutionError> {
473        self.pop_application();
474        Ok(())
475    }
476
477    /// Runs the service in a separate thread as an oracle.
478    fn run_service_oracle_query(
479        &mut self,
480        application_id: ApplicationId,
481        query: Vec<u8>,
482    ) -> Result<Vec<u8>, ExecutionError> {
483        let context = QueryContext {
484            chain_id: self.chain_id,
485            next_block_height: self.height,
486            local_time: self.transaction_tracker.local_time(),
487        };
488        let sender = self.execution_state_sender.clone();
489
490        let txn_tracker = TransactionTracker::default()
491            .with_blobs(self.transaction_tracker.created_blobs().clone());
492
493        let timeout = self
494            .resource_controller
495            .remaining_service_oracle_execution_time()?;
496        let execution_start = Instant::now();
497        let deadline = Some(execution_start + timeout);
498
499        let mut service_runtime =
500            ServiceSyncRuntime::new_with_txn_tracker(sender, context, deadline, txn_tracker);
501
502        let result = service_runtime.run_query(application_id, query);
503
504        // Always track the execution time, irrespective to whether the service ran successfully or
505        // timed out
506        self.resource_controller
507            .track_service_oracle_execution(execution_start.elapsed())?;
508
509        let QueryOutcome {
510            response,
511            operations,
512        } = result?;
513
514        self.resource_controller
515            .track_service_oracle_response(response.len())?;
516
517        self.scheduled_operations.extend(operations);
518        Ok(response)
519    }
520}
521
522impl SyncRuntimeInternal<UserServiceInstance> {
523    /// Initializes a service instance with this runtime.
524    fn load_service_instance(
525        &mut self,
526        this: ServiceSyncRuntimeHandle,
527        id: ApplicationId,
528    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
529        match self.loaded_applications.entry(id) {
530            // TODO(#2927): support dynamic loading of modules on the Web
531            #[cfg(web)]
532            hash_map::Entry::Vacant(_) => {
533                drop(this);
534                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
535                    id,
536                )))
537            }
538            #[cfg(not(web))]
539            hash_map::Entry::Vacant(entry) => {
540                let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
541                let (code, description, txn_tracker_moved) = self
542                    .execution_state_sender
543                    .send_request(move |callback| ExecutionRequest::LoadService {
544                        id,
545                        callback,
546                        txn_tracker: txn_tracker_moved,
547                    })?
548                    .recv_response()?;
549                self.transaction_tracker = txn_tracker_moved;
550
551                let instance = code.instantiate(this)?;
552                Ok(entry
553                    .insert(LoadedApplication::new(instance, description))
554                    .clone())
555            }
556            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
557        }
558    }
559}
560
561impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
562    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
563        let handle = self.0.take().expect(
564            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
565        );
566        let runtime = Arc::into_inner(handle.0)?
567            .into_inner()
568            .expect("`SyncRuntime` should run in a single thread which should not panic");
569        Some(runtime)
570    }
571}
572
573impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
574    for SyncRuntimeHandle<UserInstance>
575{
576    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
577        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
578    }
579}
580
581impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
582    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
583        self.0
584            .try_lock()
585            .expect("Synchronous runtimes run on a single execution thread")
586    }
587}
588
589impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
590where
591    Self: ContractOrServiceRuntime,
592{
593    type Read = ();
594    type ReadValueBytes = u32;
595    type ContainsKey = u32;
596    type ContainsKeys = u32;
597    type ReadMultiValuesBytes = u32;
598    type FindKeysByPrefix = u32;
599    type FindKeyValuesByPrefix = u32;
600
601    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
602        let mut this = self.inner();
603        let chain_id = this.chain_id;
604        this.resource_controller.track_runtime_chain_id()?;
605        Ok(chain_id)
606    }
607
608    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
609        let mut this = self.inner();
610        let height = this.height;
611        this.resource_controller.track_runtime_block_height()?;
612        Ok(height)
613    }
614
615    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
616        let mut this = self.inner();
617        let application_id = this.current_application().id;
618        this.resource_controller.track_runtime_application_id()?;
619        Ok(application_id)
620    }
621
622    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
623        let mut this = self.inner();
624        let application_creator_chain_id = this.current_application().description.creator_chain_id;
625        this.resource_controller.track_runtime_application_id()?;
626        Ok(application_creator_chain_id)
627    }
628
629    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
630        let mut this = self.inner();
631        let parameters = this.current_application().description.parameters.clone();
632        this.resource_controller
633            .track_runtime_application_parameters(&parameters)?;
634        Ok(parameters)
635    }
636
637    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
638        let mut this = self.inner();
639        let timestamp = this
640            .execution_state_sender
641            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
642            .recv_response()?;
643        this.resource_controller.track_runtime_timestamp()?;
644        Ok(timestamp)
645    }
646
647    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
648        let mut this = self.inner();
649        let balance = this
650            .execution_state_sender
651            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
652            .recv_response()?;
653        this.resource_controller.track_runtime_balance()?;
654        Ok(balance)
655    }
656
657    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
658        let mut this = self.inner();
659        let balance = this
660            .execution_state_sender
661            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
662            .recv_response()?;
663        this.resource_controller.track_runtime_balance()?;
664        Ok(balance)
665    }
666
667    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
668        let mut this = self.inner();
669        let owner_balances = this
670            .execution_state_sender
671            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
672            .recv_response()?;
673        this.resource_controller
674            .track_runtime_owner_balances(&owner_balances)?;
675        Ok(owner_balances)
676    }
677
678    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
679        let mut this = self.inner();
680        let owners = this
681            .execution_state_sender
682            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
683            .recv_response()?;
684        this.resource_controller.track_runtime_owners(&owners)?;
685        Ok(owners)
686    }
687
688    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
689        let mut this = self.inner();
690        let chain_ownership = this
691            .execution_state_sender
692            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
693            .recv_response()?;
694        this.resource_controller
695            .track_runtime_chain_ownership(&chain_ownership)?;
696        Ok(chain_ownership)
697    }
698
699    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
700        let mut this = self.inner();
701        let id = this.current_application().id;
702        this.resource_controller.track_read_operation()?;
703        let receiver = this
704            .execution_state_sender
705            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
706        let state = this.view_user_states.entry(id).or_default();
707        state.contains_key_queries.register(receiver)
708    }
709
710    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
711        let mut this = self.inner();
712        let id = this.current_application().id;
713        let state = this.view_user_states.entry(id).or_default();
714        let value = state.contains_key_queries.wait(*promise)?;
715        Ok(value)
716    }
717
718    fn contains_keys_new(
719        &mut self,
720        keys: Vec<Vec<u8>>,
721    ) -> Result<Self::ContainsKeys, ExecutionError> {
722        let mut this = self.inner();
723        let id = this.current_application().id;
724        this.resource_controller.track_read_operation()?;
725        let receiver = this
726            .execution_state_sender
727            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
728        let state = this.view_user_states.entry(id).or_default();
729        state.contains_keys_queries.register(receiver)
730    }
731
732    fn contains_keys_wait(
733        &mut self,
734        promise: &Self::ContainsKeys,
735    ) -> Result<Vec<bool>, ExecutionError> {
736        let mut this = self.inner();
737        let id = this.current_application().id;
738        let state = this.view_user_states.entry(id).or_default();
739        let value = state.contains_keys_queries.wait(*promise)?;
740        Ok(value)
741    }
742
743    fn read_multi_values_bytes_new(
744        &mut self,
745        keys: Vec<Vec<u8>>,
746    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
747        let mut this = self.inner();
748        let id = this.current_application().id;
749        this.resource_controller.track_read_operation()?;
750        let receiver = this.execution_state_sender.send_request(move |callback| {
751            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
752        })?;
753        let state = this.view_user_states.entry(id).or_default();
754        state.read_multi_values_queries.register(receiver)
755    }
756
757    fn read_multi_values_bytes_wait(
758        &mut self,
759        promise: &Self::ReadMultiValuesBytes,
760    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
761        let mut this = self.inner();
762        let id = this.current_application().id;
763        let state = this.view_user_states.entry(id).or_default();
764        let values = state.read_multi_values_queries.wait(*promise)?;
765        for value in &values {
766            if let Some(value) = &value {
767                this.resource_controller
768                    .track_bytes_read(value.len() as u64)?;
769            }
770        }
771        Ok(values)
772    }
773
774    fn read_value_bytes_new(
775        &mut self,
776        key: Vec<u8>,
777    ) -> Result<Self::ReadValueBytes, ExecutionError> {
778        let mut this = self.inner();
779        let id = this.current_application().id;
780        this.resource_controller.track_read_operation()?;
781        let receiver = this
782            .execution_state_sender
783            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
784        let state = this.view_user_states.entry(id).or_default();
785        state.read_value_queries.register(receiver)
786    }
787
788    fn read_value_bytes_wait(
789        &mut self,
790        promise: &Self::ReadValueBytes,
791    ) -> Result<Option<Vec<u8>>, ExecutionError> {
792        let mut this = self.inner();
793        let id = this.current_application().id;
794        let value = {
795            let state = this.view_user_states.entry(id).or_default();
796            state.read_value_queries.wait(*promise)?
797        };
798        if let Some(value) = &value {
799            this.resource_controller
800                .track_bytes_read(value.len() as u64)?;
801        }
802        Ok(value)
803    }
804
805    fn find_keys_by_prefix_new(
806        &mut self,
807        key_prefix: Vec<u8>,
808    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
809        let mut this = self.inner();
810        let id = this.current_application().id;
811        this.resource_controller.track_read_operation()?;
812        let receiver = this.execution_state_sender.send_request(move |callback| {
813            ExecutionRequest::FindKeysByPrefix {
814                id,
815                key_prefix,
816                callback,
817            }
818        })?;
819        let state = this.view_user_states.entry(id).or_default();
820        state.find_keys_queries.register(receiver)
821    }
822
823    fn find_keys_by_prefix_wait(
824        &mut self,
825        promise: &Self::FindKeysByPrefix,
826    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
827        let mut this = self.inner();
828        let id = this.current_application().id;
829        let keys = {
830            let state = this.view_user_states.entry(id).or_default();
831            state.find_keys_queries.wait(*promise)?
832        };
833        let mut read_size = 0;
834        for key in &keys {
835            read_size += key.len();
836        }
837        this.resource_controller
838            .track_bytes_read(read_size as u64)?;
839        Ok(keys)
840    }
841
842    fn find_key_values_by_prefix_new(
843        &mut self,
844        key_prefix: Vec<u8>,
845    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
846        let mut this = self.inner();
847        let id = this.current_application().id;
848        this.resource_controller.track_read_operation()?;
849        let receiver = this.execution_state_sender.send_request(move |callback| {
850            ExecutionRequest::FindKeyValuesByPrefix {
851                id,
852                key_prefix,
853                callback,
854            }
855        })?;
856        let state = this.view_user_states.entry(id).or_default();
857        state.find_key_values_queries.register(receiver)
858    }
859
860    fn find_key_values_by_prefix_wait(
861        &mut self,
862        promise: &Self::FindKeyValuesByPrefix,
863    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
864        let mut this = self.inner();
865        let id = this.current_application().id;
866        let state = this.view_user_states.entry(id).or_default();
867        let key_values = state.find_key_values_queries.wait(*promise)?;
868        let mut read_size = 0;
869        for (key, value) in &key_values {
870            read_size += key.len() + value.len();
871        }
872        this.resource_controller
873            .track_bytes_read(read_size as u64)?;
874        Ok(key_values)
875    }
876
877    fn perform_http_request(
878        &mut self,
879        request: http::Request,
880    ) -> Result<http::Response, ExecutionError> {
881        let mut this = self.inner();
882        let app_permissions = this
883            .execution_state_sender
884            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
885            .recv_response()?;
886
887        let app_id = this.current_application().id;
888        ensure!(
889            app_permissions.can_make_http_requests(&app_id),
890            ExecutionError::UnauthorizedApplication(app_id)
891        );
892
893        this.resource_controller.track_http_request()?;
894
895        let response =
896            if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
897                match response {
898                    OracleResponse::Http(response) => response,
899                    _ => return Err(ExecutionError::OracleResponseMismatch),
900                }
901            } else {
902                this.execution_state_sender
903                    .send_request(|callback| ExecutionRequest::PerformHttpRequest {
904                        request,
905                        http_responses_are_oracle_responses:
906                            Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
907                        callback,
908                    })?
909                    .recv_response()?
910            };
911        this.transaction_tracker
912            .add_oracle_response(OracleResponse::Http(response.clone()));
913        Ok(response)
914    }
915
916    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
917        let mut this = self.inner();
918        if !this
919            .transaction_tracker
920            .replay_oracle_response(OracleResponse::Assert)?
921        {
922            // There are no recorded oracle responses, so we check the local time.
923            let local_time = this.transaction_tracker.local_time();
924            ensure!(
925                local_time < timestamp,
926                ExecutionError::AssertBefore {
927                    timestamp,
928                    local_time,
929                }
930            );
931        }
932        Ok(())
933    }
934
935    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
936        let mut this = self.inner();
937        let blob_id = hash.into();
938        if let Some(content) = this.transaction_tracker.get_blob_content(&blob_id) {
939            return Ok(content.bytes().to_vec());
940        };
941        let (content, is_new) = this
942            .execution_state_sender
943            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
944            .recv_response()?;
945        if is_new {
946            this.transaction_tracker
947                .replay_oracle_response(OracleResponse::Blob(blob_id))?;
948        }
949        Ok(content.into_vec_or_clone())
950    }
951
952    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
953        let mut this = self.inner();
954        let blob_id = hash.into();
955        let is_new = this
956            .execution_state_sender
957            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
958            .recv_response()?;
959        if is_new {
960            this.transaction_tracker
961                .replay_oracle_response(OracleResponse::Blob(blob_id))?;
962        }
963        Ok(())
964    }
965}
966
967/// An extension trait to determine in compile time the different behaviors between contract and
968/// services in the implementation of [`BaseRuntime`].
969trait ContractOrServiceRuntime {
970    /// Configured to `true` if the HTTP response size should be limited to the oracle response
971    /// size.
972    ///
973    /// This is `false` for services, potentially allowing them to receive a larger HTTP response
974    /// and only storing in the block a shorter oracle response.
975    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
976}
977
978impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
979    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
980}
981
982impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
983    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
984}
985
986impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
987    fn clone(&self) -> Self {
988        SyncRuntimeHandle(self.0.clone())
989    }
990}
991
992impl ContractSyncRuntime {
993    pub(crate) fn new(
994        execution_state_sender: ExecutionStateSender,
995        chain_id: ChainId,
996        refund_grant_to: Option<Account>,
997        resource_controller: ResourceController,
998        action: &UserAction,
999        txn_tracker: TransactionTracker,
1000    ) -> Self {
1001        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1002            SyncRuntimeInternal::new(
1003                chain_id,
1004                action.height(),
1005                action.round(),
1006                if let UserAction::Message(context, _) = action {
1007                    Some(context.into())
1008                } else {
1009                    None
1010                },
1011                execution_state_sender,
1012                None,
1013                refund_grant_to,
1014                resource_controller,
1015                txn_tracker,
1016                action.timestamp(),
1017            ),
1018        )))
1019    }
1020
1021    pub(crate) fn preload_contract(
1022        &self,
1023        id: ApplicationId,
1024        code: UserContractCode,
1025        description: ApplicationDescription,
1026    ) -> Result<(), ExecutionError> {
1027        let this = self
1028            .0
1029            .as_ref()
1030            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1031        let runtime_handle = this.clone();
1032        let mut this_guard = this.inner();
1033
1034        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1035            entry.insert(LoadedApplication::new(
1036                code.instantiate(runtime_handle)?,
1037                description,
1038            ));
1039            this_guard.applications_to_finalize.push(id);
1040        }
1041
1042        Ok(())
1043    }
1044
1045    /// Main entry point to start executing a user action.
1046    pub(crate) fn run_action(
1047        mut self,
1048        application_id: ApplicationId,
1049        chain_id: ChainId,
1050        action: UserAction,
1051    ) -> Result<(Option<Vec<u8>>, ResourceController, TransactionTracker), ExecutionError> {
1052        let result = self
1053            .deref_mut()
1054            .run_action(application_id, chain_id, action)?;
1055        let runtime = self
1056            .into_inner()
1057            .expect("Runtime clones should have been freed by now");
1058        Ok((
1059            result,
1060            runtime.resource_controller,
1061            runtime.transaction_tracker,
1062        ))
1063    }
1064}
1065
1066impl ContractSyncRuntimeHandle {
1067    fn run_action(
1068        &mut self,
1069        application_id: ApplicationId,
1070        chain_id: ChainId,
1071        action: UserAction,
1072    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1073        let finalize_context = FinalizeContext {
1074            authenticated_signer: action.signer(),
1075            chain_id,
1076            height: action.height(),
1077            round: action.round(),
1078        };
1079
1080        {
1081            let runtime = self.inner();
1082            assert_eq!(runtime.chain_id, chain_id);
1083            assert_eq!(runtime.height, action.height());
1084        }
1085
1086        let signer = action.signer();
1087        let closure = move |code: &mut UserContractInstance| match action {
1088            UserAction::Instantiate(_context, argument) => {
1089                code.instantiate(argument).map(|()| None)
1090            }
1091            UserAction::Operation(_context, operation) => {
1092                code.execute_operation(operation).map(Option::Some)
1093            }
1094            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1095            UserAction::ProcessStreams(_context, updates) => {
1096                code.process_streams(updates).map(|()| None)
1097            }
1098        };
1099
1100        let result = self.execute(application_id, signer, closure)?;
1101        self.finalize(finalize_context)?;
1102        Ok(result)
1103    }
1104
1105    /// Notifies all loaded applications that execution is finalizing.
1106    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1107        let applications = mem::take(&mut self.inner().applications_to_finalize)
1108            .into_iter()
1109            .rev();
1110
1111        self.inner().is_finalizing = true;
1112
1113        for application in applications {
1114            self.execute(application, context.authenticated_signer, |contract| {
1115                contract.finalize().map(|_| None)
1116            })?;
1117            self.inner().loaded_applications.remove(&application);
1118        }
1119
1120        Ok(())
1121    }
1122
1123    /// Executes a `closure` with the contract code for the `application_id`.
1124    fn execute(
1125        &mut self,
1126        application_id: ApplicationId,
1127        signer: Option<AccountOwner>,
1128        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1129    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1130        let contract = {
1131            let mut runtime = self.inner();
1132            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1133
1134            let status = ApplicationStatus {
1135                caller_id: None,
1136                id: application_id,
1137                description: application.description.clone(),
1138                signer,
1139            };
1140
1141            runtime.push_application(status);
1142
1143            application
1144        };
1145
1146        let result = closure(
1147            &mut contract
1148                .instance
1149                .try_lock()
1150                .expect("Application should not be already executing"),
1151        )?;
1152
1153        let mut runtime = self.inner();
1154        let application_status = runtime.pop_application();
1155        assert_eq!(application_status.caller_id, None);
1156        assert_eq!(application_status.id, application_id);
1157        assert_eq!(application_status.description, contract.description);
1158        assert_eq!(application_status.signer, signer);
1159        assert!(runtime.call_stack.is_empty());
1160
1161        Ok(result)
1162    }
1163}
1164
1165impl ContractRuntime for ContractSyncRuntimeHandle {
1166    fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1167        let this = self.inner();
1168        Ok(this.current_application().signer)
1169    }
1170
1171    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1172        Ok(self
1173            .inner()
1174            .executing_message
1175            .map(|metadata| metadata.is_bouncing))
1176    }
1177
1178    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1179        Ok(self
1180            .inner()
1181            .executing_message
1182            .map(|metadata| metadata.origin))
1183    }
1184
1185    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1186        let this = self.inner();
1187        if this.call_stack.len() <= 1 {
1188            return Ok(None);
1189        }
1190        Ok(this.current_application().caller_id)
1191    }
1192
1193    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1194        Ok(match vm_runtime {
1195            VmRuntime::Wasm => {
1196                self.inner()
1197                    .resource_controller
1198                    .policy()
1199                    .maximum_wasm_fuel_per_block
1200            }
1201            VmRuntime::Evm => {
1202                self.inner()
1203                    .resource_controller
1204                    .policy()
1205                    .maximum_evm_fuel_per_block
1206            }
1207        })
1208    }
1209
1210    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1211        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1212    }
1213
1214    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1215        let mut this = self.inner();
1216        this.resource_controller.track_fuel(fuel, vm_runtime)
1217    }
1218
1219    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1220        let mut this = self.inner();
1221        let application = this.current_application();
1222        let application_id = application.id;
1223        let authenticated_signer = application.signer;
1224        let mut refund_grant_to = this.refund_grant_to;
1225
1226        let grant = this
1227            .resource_controller
1228            .policy()
1229            .total_price(&message.grant)?;
1230        if grant.is_zero() {
1231            refund_grant_to = None;
1232        } else {
1233            this.resource_controller.track_grant(grant)?;
1234        }
1235        let kind = if message.is_tracked {
1236            MessageKind::Tracked
1237        } else {
1238            MessageKind::Simple
1239        };
1240
1241        this.transaction_tracker
1242            .add_outgoing_message(OutgoingMessage {
1243                destination: message.destination,
1244                authenticated_signer,
1245                refund_grant_to,
1246                grant,
1247                kind,
1248                message: Message::User {
1249                    application_id,
1250                    bytes: message.message,
1251                },
1252            });
1253
1254        Ok(())
1255    }
1256
1257    fn transfer(
1258        &mut self,
1259        source: AccountOwner,
1260        destination: Account,
1261        amount: Amount,
1262    ) -> Result<(), ExecutionError> {
1263        let mut this = self.inner();
1264        let current_application = this.current_application();
1265        let application_id = current_application.id;
1266        let signer = current_application.signer;
1267
1268        let maybe_message = this
1269            .execution_state_sender
1270            .send_request(|callback| ExecutionRequest::Transfer {
1271                source,
1272                destination,
1273                amount,
1274                signer,
1275                application_id,
1276                callback,
1277            })?
1278            .recv_response()?;
1279
1280        this.transaction_tracker
1281            .add_outgoing_messages(maybe_message);
1282        Ok(())
1283    }
1284
1285    fn claim(
1286        &mut self,
1287        source: Account,
1288        destination: Account,
1289        amount: Amount,
1290    ) -> Result<(), ExecutionError> {
1291        let mut this = self.inner();
1292        let current_application = this.current_application();
1293        let application_id = current_application.id;
1294        let signer = current_application.signer;
1295
1296        let maybe_message = this
1297            .execution_state_sender
1298            .send_request(|callback| ExecutionRequest::Claim {
1299                source,
1300                destination,
1301                amount,
1302                signer,
1303                application_id,
1304                callback,
1305            })?
1306            .recv_response()?;
1307        this.transaction_tracker
1308            .add_outgoing_messages(maybe_message);
1309        Ok(())
1310    }
1311
1312    fn try_call_application(
1313        &mut self,
1314        authenticated: bool,
1315        callee_id: ApplicationId,
1316        argument: Vec<u8>,
1317    ) -> Result<Vec<u8>, ExecutionError> {
1318        let contract = self
1319            .inner()
1320            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1321
1322        let value = contract
1323            .try_lock()
1324            .expect("Applications should not have reentrant calls")
1325            .execute_operation(argument)?;
1326
1327        self.inner().finish_call()?;
1328
1329        Ok(value)
1330    }
1331
1332    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1333        let mut this = self.inner();
1334        ensure!(
1335            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1336            ExecutionError::StreamNameTooLong
1337        );
1338        let application_id = GenericApplicationId::User(this.current_application().id);
1339        let stream_id = StreamId {
1340            stream_name,
1341            application_id,
1342        };
1343        let index = this
1344            .execution_state_sender
1345            .send_request(|callback| ExecutionRequest::NextEventIndex {
1346                stream_id: stream_id.clone(),
1347                callback,
1348            })?
1349            .recv_response()?;
1350        // TODO(#365): Consider separate event fee categories.
1351        this.resource_controller
1352            .track_bytes_written(value.len() as u64)?;
1353        this.transaction_tracker.add_event(stream_id, index, value);
1354        Ok(index)
1355    }
1356
1357    fn read_event(
1358        &mut self,
1359        chain_id: ChainId,
1360        stream_name: StreamName,
1361        index: u32,
1362    ) -> Result<Vec<u8>, ExecutionError> {
1363        let mut this = self.inner();
1364        ensure!(
1365            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1366            ExecutionError::StreamNameTooLong
1367        );
1368        let application_id = GenericApplicationId::User(this.current_application().id);
1369        let stream_id = StreamId {
1370            stream_name,
1371            application_id,
1372        };
1373        let event_id = EventId {
1374            stream_id,
1375            index,
1376            chain_id,
1377        };
1378        let event = match this.transaction_tracker.next_replayed_oracle_response()? {
1379            None => this
1380                .execution_state_sender
1381                .send_request(|callback| ExecutionRequest::ReadEvent {
1382                    event_id: event_id.clone(),
1383                    callback,
1384                })?
1385                .recv_response()?,
1386            Some(OracleResponse::Event(recorded_event_id, event))
1387                if recorded_event_id == event_id =>
1388            {
1389                event
1390            }
1391            Some(_) => return Err(ExecutionError::OracleResponseMismatch),
1392        };
1393        this.transaction_tracker
1394            .add_oracle_response(OracleResponse::Event(event_id, event.clone()));
1395        // TODO(#365): Consider separate event fee categories.
1396        this.resource_controller
1397            .track_bytes_read(event.len() as u64)?;
1398        Ok(event)
1399    }
1400
1401    fn subscribe_to_events(
1402        &mut self,
1403        chain_id: ChainId,
1404        application_id: ApplicationId,
1405        stream_name: StreamName,
1406    ) -> Result<(), ExecutionError> {
1407        let mut this = self.inner();
1408        ensure!(
1409            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1410            ExecutionError::StreamNameTooLong
1411        );
1412        let stream_id = StreamId {
1413            stream_name,
1414            application_id: application_id.into(),
1415        };
1416        let subscriber_app_id = this.current_application().id;
1417        let next_index = this
1418            .execution_state_sender
1419            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1420                chain_id,
1421                stream_id: stream_id.clone(),
1422                subscriber_app_id,
1423                callback,
1424            })?
1425            .recv_response()?;
1426        this.transaction_tracker.add_stream_to_process(
1427            subscriber_app_id,
1428            chain_id,
1429            stream_id,
1430            0,
1431            next_index,
1432        );
1433        Ok(())
1434    }
1435
1436    fn unsubscribe_from_events(
1437        &mut self,
1438        chain_id: ChainId,
1439        application_id: ApplicationId,
1440        stream_name: StreamName,
1441    ) -> Result<(), ExecutionError> {
1442        let mut this = self.inner();
1443        ensure!(
1444            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1445            ExecutionError::StreamNameTooLong
1446        );
1447        let stream_id = StreamId {
1448            stream_name,
1449            application_id: application_id.into(),
1450        };
1451        let subscriber_app_id = this.current_application().id;
1452        this.execution_state_sender
1453            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1454                chain_id,
1455                stream_id: stream_id.clone(),
1456                subscriber_app_id,
1457                callback,
1458            })?
1459            .recv_response()?;
1460        this.transaction_tracker
1461            .remove_stream_to_process(application_id, chain_id, stream_id);
1462        Ok(())
1463    }
1464
1465    fn query_service(
1466        &mut self,
1467        application_id: ApplicationId,
1468        query: Vec<u8>,
1469    ) -> Result<Vec<u8>, ExecutionError> {
1470        let mut this = self.inner();
1471
1472        let app_permissions = this
1473            .execution_state_sender
1474            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1475            .recv_response()?;
1476
1477        let app_id = this.current_application().id;
1478        ensure!(
1479            app_permissions.can_call_services(&app_id),
1480            ExecutionError::UnauthorizedApplication(app_id)
1481        );
1482
1483        this.resource_controller.track_service_oracle_call()?;
1484        let response =
1485            if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1486                match response {
1487                    OracleResponse::Service(bytes) => bytes,
1488                    _ => return Err(ExecutionError::OracleResponseMismatch),
1489                }
1490            } else {
1491                this.run_service_oracle_query(application_id, query)?
1492            };
1493
1494        this.transaction_tracker
1495            .add_oracle_response(OracleResponse::Service(response.clone()));
1496
1497        Ok(response)
1498    }
1499
1500    fn open_chain(
1501        &mut self,
1502        ownership: ChainOwnership,
1503        application_permissions: ApplicationPermissions,
1504        balance: Amount,
1505    ) -> Result<ChainId, ExecutionError> {
1506        let parent_id = self.inner().chain_id;
1507        let block_height = self.block_height()?;
1508
1509        let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1510        let timestamp = self.inner().user_context;
1511
1512        let (chain_id, txn_tracker_moved) = self
1513            .inner()
1514            .execution_state_sender
1515            .send_request(|callback| ExecutionRequest::OpenChain {
1516                ownership,
1517                balance,
1518                parent_id,
1519                block_height,
1520                timestamp,
1521                application_permissions,
1522                callback,
1523                txn_tracker: txn_tracker_moved,
1524            })?
1525            .recv_response()?;
1526
1527        self.inner().transaction_tracker = txn_tracker_moved;
1528
1529        Ok(chain_id)
1530    }
1531
1532    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1533        let this = self.inner();
1534        let application_id = this.current_application().id;
1535        this.execution_state_sender
1536            .send_request(|callback| ExecutionRequest::CloseChain {
1537                application_id,
1538                callback,
1539            })?
1540            .recv_response()?
1541    }
1542
1543    fn change_application_permissions(
1544        &mut self,
1545        application_permissions: ApplicationPermissions,
1546    ) -> Result<(), ExecutionError> {
1547        let this = self.inner();
1548        let application_id = this.current_application().id;
1549        this.execution_state_sender
1550            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1551                application_id,
1552                application_permissions,
1553                callback,
1554            })?
1555            .recv_response()?
1556    }
1557
1558    fn create_application(
1559        &mut self,
1560        module_id: ModuleId,
1561        parameters: Vec<u8>,
1562        argument: Vec<u8>,
1563        required_application_ids: Vec<ApplicationId>,
1564    ) -> Result<ApplicationId, ExecutionError> {
1565        let chain_id = self.inner().chain_id;
1566        let block_height = self.block_height()?;
1567
1568        let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1569
1570        let CreateApplicationResult {
1571            app_id,
1572            txn_tracker: txn_tracker_moved,
1573        } = self
1574            .inner()
1575            .execution_state_sender
1576            .send_request(move |callback| ExecutionRequest::CreateApplication {
1577                chain_id,
1578                block_height,
1579                module_id,
1580                parameters,
1581                required_application_ids,
1582                callback,
1583                txn_tracker: txn_tracker_moved,
1584            })?
1585            .recv_response()??;
1586
1587        self.inner().transaction_tracker = txn_tracker_moved;
1588
1589        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1590
1591        contract
1592            .try_lock()
1593            .expect("Applications should not have reentrant calls")
1594            .instantiate(argument)?;
1595
1596        self.inner().finish_call()?;
1597
1598        Ok(app_id)
1599    }
1600
1601    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1602        let blob = Blob::new_data(bytes);
1603        let blob_id = blob.id();
1604        self.inner().transaction_tracker.add_created_blob(blob);
1605        Ok(DataBlobHash(blob_id.hash))
1606    }
1607
1608    fn publish_module(
1609        &mut self,
1610        contract: Bytecode,
1611        service: Bytecode,
1612        vm_runtime: VmRuntime,
1613    ) -> Result<ModuleId, ExecutionError> {
1614        let (blobs, module_id) =
1615            crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1616        for blob in blobs {
1617            self.inner().transaction_tracker.add_created_blob(blob);
1618        }
1619        Ok(module_id)
1620    }
1621
1622    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1623        let mut this = self.inner();
1624        let round =
1625            if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1626                match response {
1627                    OracleResponse::Round(round) => round,
1628                    _ => return Err(ExecutionError::OracleResponseMismatch),
1629                }
1630            } else {
1631                this.round
1632            };
1633        this.transaction_tracker
1634            .add_oracle_response(OracleResponse::Round(round));
1635        Ok(round)
1636    }
1637
1638    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1639        let mut this = self.inner();
1640        let id = this.current_application().id;
1641        let state = this.view_user_states.entry(id).or_default();
1642        state.force_all_pending_queries()?;
1643        this.resource_controller.track_write_operations(
1644            batch
1645                .num_operations()
1646                .try_into()
1647                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1648        )?;
1649        this.resource_controller
1650            .track_bytes_written(batch.size() as u64)?;
1651        this.execution_state_sender
1652            .send_request(|callback| ExecutionRequest::WriteBatch {
1653                id,
1654                batch,
1655                callback,
1656            })?
1657            .recv_response()?;
1658        Ok(())
1659    }
1660}
1661
1662impl ServiceSyncRuntime {
1663    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1664    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1665        let mut txn_tracker = TransactionTracker::default();
1666        txn_tracker.set_local_time(context.local_time);
1667        Self::new_with_txn_tracker(execution_state_sender, context, None, txn_tracker)
1668    }
1669
1670    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1671    pub fn new_with_txn_tracker(
1672        execution_state_sender: ExecutionStateSender,
1673        context: QueryContext,
1674        deadline: Option<Instant>,
1675        txn_tracker: TransactionTracker,
1676    ) -> Self {
1677        let runtime = SyncRuntime(Some(
1678            SyncRuntimeInternal::new(
1679                context.chain_id,
1680                context.next_block_height,
1681                None,
1682                None,
1683                execution_state_sender,
1684                deadline,
1685                None,
1686                ResourceController::default(),
1687                txn_tracker,
1688                (),
1689            )
1690            .into(),
1691        ));
1692
1693        ServiceSyncRuntime {
1694            runtime,
1695            current_context: context,
1696        }
1697    }
1698
1699    /// Loads a service into the runtime's memory.
1700    pub(crate) fn preload_service(
1701        &self,
1702        id: ApplicationId,
1703        code: UserServiceCode,
1704        description: ApplicationDescription,
1705    ) -> Result<(), ExecutionError> {
1706        let this = self
1707            .runtime
1708            .0
1709            .as_ref()
1710            .expect("services shouldn't be preloaded while the runtime is being dropped");
1711        let runtime_handle = this.clone();
1712        let mut this_guard = this.inner();
1713
1714        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1715            entry.insert(LoadedApplication::new(
1716                code.instantiate(runtime_handle)?,
1717                description,
1718            ));
1719            this_guard.applications_to_finalize.push(id);
1720        }
1721
1722        Ok(())
1723    }
1724
1725    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1726    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1727        while let Ok(request) = incoming_requests.recv() {
1728            let ServiceRuntimeRequest::Query {
1729                application_id,
1730                context,
1731                query,
1732                callback,
1733            } = request;
1734
1735            self.prepare_for_query(context);
1736
1737            let _ = callback.send(self.run_query(application_id, query));
1738        }
1739    }
1740
1741    /// Prepares the runtime to query an application.
1742    pub(crate) fn prepare_for_query(&mut self, new_context: QueryContext) {
1743        let expected_context = QueryContext {
1744            local_time: new_context.local_time,
1745            ..self.current_context
1746        };
1747
1748        if new_context != expected_context {
1749            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1750            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1751        } else {
1752            self.handle_mut()
1753                .inner()
1754                .transaction_tracker
1755                .set_local_time(new_context.local_time);
1756        }
1757    }
1758
1759    /// Queries an application specified by its [`ApplicationId`].
1760    pub(crate) fn run_query(
1761        &mut self,
1762        application_id: ApplicationId,
1763        query: Vec<u8>,
1764    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1765        let this = self.handle_mut();
1766        let response = this.try_query_application(application_id, query)?;
1767        let operations = mem::take(&mut this.inner().scheduled_operations);
1768
1769        Ok(QueryOutcome {
1770            response,
1771            operations,
1772        })
1773    }
1774
1775    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1776    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1777        self.runtime.0.as_mut().expect(
1778            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1779        )
1780    }
1781}
1782
1783impl ServiceRuntime for ServiceSyncRuntimeHandle {
1784    /// Note that queries are not available from writable contexts.
1785    fn try_query_application(
1786        &mut self,
1787        queried_id: ApplicationId,
1788        argument: Vec<u8>,
1789    ) -> Result<Vec<u8>, ExecutionError> {
1790        let service = {
1791            let mut this = self.inner();
1792
1793            // Load the application.
1794            let application = this.load_service_instance(self.clone(), queried_id)?;
1795            // Make the call to user code.
1796            this.push_application(ApplicationStatus {
1797                caller_id: None,
1798                id: queried_id,
1799                description: application.description,
1800                signer: None,
1801            });
1802            application.instance
1803        };
1804        let response = service
1805            .try_lock()
1806            .expect("Applications should not have reentrant calls")
1807            .handle_query(argument)?;
1808        self.inner().pop_application();
1809        Ok(response)
1810    }
1811
1812    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1813        let mut this = self.inner();
1814        let application_id = this.current_application().id;
1815
1816        this.scheduled_operations.push(Operation::User {
1817            application_id,
1818            bytes: operation,
1819        });
1820
1821        Ok(())
1822    }
1823
1824    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1825        if let Some(deadline) = self.inner().deadline {
1826            if Instant::now() >= deadline {
1827                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1828            }
1829        }
1830        Ok(())
1831    }
1832}
1833
1834/// A request to the service runtime actor.
1835pub enum ServiceRuntimeRequest {
1836    Query {
1837        application_id: ApplicationId,
1838        context: QueryContext,
1839        query: Vec<u8>,
1840        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1841    },
1842}
1843
1844/// The origin of the execution.
1845#[derive(Clone, Copy, Debug)]
1846struct ExecutingMessage {
1847    is_bouncing: bool,
1848    origin: ChainId,
1849}
1850
1851impl From<&MessageContext> for ExecutingMessage {
1852    fn from(context: &MessageContext) -> Self {
1853        ExecutingMessage {
1854            is_bouncing: context.is_bouncing,
1855            origin: context.origin,
1856        }
1857    }
1858}
1859
1860/// Creates a compressed contract and service bytecode synchronously.
1861pub fn create_bytecode_blobs_sync(
1862    contract: Bytecode,
1863    service: Bytecode,
1864    vm_runtime: VmRuntime,
1865) -> (Vec<Blob>, ModuleId) {
1866    match vm_runtime {
1867        VmRuntime::Wasm => {
1868            let compressed_contract = contract.compress();
1869            let compressed_service = service.compress();
1870            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1871            let service_blob = Blob::new_service_bytecode(compressed_service);
1872            let module_id =
1873                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1874            (vec![contract_blob, service_blob], module_id)
1875        }
1876        VmRuntime::Evm => {
1877            let compressed_contract = contract.compress();
1878            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1879            let module_id = ModuleId::new(
1880                evm_contract_blob.id().hash,
1881                evm_contract_blob.id().hash,
1882                vm_runtime,
1883            );
1884            (vec![evm_contract_blob], module_id)
1885        }
1886    }
1887}