1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15 OracleResponse, SendMessageRequest, Timestamp,
16 },
17 ensure, http,
18 identifiers::{
19 Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20 },
21 ownership::ChainOwnership,
22 time::Instant,
23 vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29 execution::UserAction,
30 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31 resources::ResourceController,
32 system::CreateApplicationResult,
33 util::{ReceiverExt, UnboundedSenderExt},
34 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35 ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36 OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, TransactionTracker,
37 UserContractCode, UserContractInstance, UserServiceCode, UserServiceInstance,
38 MAX_STREAM_NAME_LEN,
39};
40
41#[cfg(test)]
42#[path = "unit_tests/runtime_tests.rs"]
43mod tests;
44
/// Associates a user instance type with the extra context value its runtime carries.
///
/// The selected [`WithContext::UserContext`] is the type of the `user_context` field of
/// [`SyncRuntimeInternal`].
pub trait WithContext {
    /// The type of the additional context stored alongside the runtime state.
    type UserContext;
}

/// Runtimes for user contracts carry a [`Timestamp`] as their user context.
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
}

/// Runtimes for user services carry no additional user context.
impl WithContext for UserServiceInstance {
    type UserContext = ();
}

/// Test-only implementation for a type-erased instance, without additional user context.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
}
61
/// Owning wrapper around a [`SyncRuntimeHandle`].
///
/// The handle is kept in an `Option` so that it can be moved out of the wrapper; dereferencing
/// the wrapper after that panics.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);

/// A [`SyncRuntime`] for user contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
66
/// A runtime for user services: a service [`SyncRuntime`] paired with a query context.
pub struct ServiceSyncRuntime {
    /// The underlying runtime for user service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// The query context.
    // NOTE(review): presumably the context of the query currently being handled; its use is
    // not visible in this part of the file.
    current_context: QueryContext,
}
71
/// A shared handle to the runtime state.
///
/// The state lives behind an `Arc<Mutex<_>>`, so every clone of the handle refers to the same
/// [`SyncRuntimeInternal`].
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);

/// A [`SyncRuntimeHandle`] for user contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] for user service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
79
/// The state shared by all handles of a synchronous runtime.
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// The ID of the chain this runtime executes on.
    chain_id: ChainId,
    /// The block height; also used as `next_block_height` for service oracle queries.
    height: BlockHeight,
    /// The round, if any; contract runtimes take it from the action being run.
    round: Option<u32>,
    /// Metadata of the message being executed, if the action being run is a message.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// Sender used to submit [`ExecutionRequest`]s and obtain their responses.
    execution_state_sender: ExecutionStateSender,

    /// Set once finalization has started; cross-application calls are rejected from then on.
    is_finalizing: bool,
    /// IDs of the loaded contracts that still have to be finalized (done in reverse order).
    applications_to_finalize: Vec<ApplicationId>,

    /// The application instances loaded so far, by application ID.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// The stack of applications being executed; the last entry is the current application.
    call_stack: Vec<ApplicationStatus>,
    /// The IDs of the applications on the call stack, used to detect reentrant calls.
    active_applications: HashSet<ApplicationId>,
    /// The tracker of the current transaction (oracle responses, created blobs, local time).
    transaction_tracker: TransactionTracker,
    /// Operations collected from the outcomes of service oracle queries.
    scheduled_operations: Vec<Operation>,

    /// The pending storage queries of each application.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// An optional deadline; `None` for contract runtimes, set for service oracle queries.
    // NOTE(review): where the deadline is enforced is not visible in this part of the file.
    deadline: Option<Instant>,

    /// The account to refund grants to, if any.
    // NOTE(review): semantics inferred from the field name — confirm against its users.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// The controller recording the resources consumed through the runtime.
    resource_controller: ResourceController,
    /// Extra context that depends on the kind of user instance (see [`WithContext`]).
    user_context: UserInstance::UserContext,
}
131
/// An entry of the call stack, describing one application that is being executed.
#[derive(Debug)]
struct ApplicationStatus {
    /// The ID of the calling application, if the call was authenticated; `None` for the
    /// application at the bottom of the stack.
    caller_id: Option<ApplicationId>,
    /// The ID of the application being executed.
    id: ApplicationId,
    /// The description of the application being executed.
    description: ApplicationDescription,
    /// The signer that this application may act on behalf of, if any.
    signer: Option<AccountOwner>,
}
144
145#[derive(Debug)]
147struct LoadedApplication<Instance> {
148 instance: Arc<Mutex<Instance>>,
149 description: ApplicationDescription,
150}
151
152impl<Instance> LoadedApplication<Instance> {
153 fn new(instance: Instance, description: ApplicationDescription) -> Self {
155 LoadedApplication {
156 instance: Arc::new(Mutex::new(instance)),
157 description,
158 }
159 }
160}
161
162impl<Instance> Clone for LoadedApplication<Instance> {
163 fn clone(&self) -> Self {
166 LoadedApplication {
167 instance: self.instance.clone(),
168 description: self.description.clone(),
169 }
170 }
171}
172
173#[derive(Debug)]
174enum Promise<T> {
175 Ready(T),
176 Pending(Receiver<T>),
177}
178
179impl<T> Promise<T> {
180 fn force(&mut self) -> Result<(), ExecutionError> {
181 if let Promise::Pending(receiver) = self {
182 let value = receiver
183 .recv_ref()
184 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
185 *self = Promise::Ready(value);
186 }
187 Ok(())
188 }
189
190 fn read(self) -> Result<T, ExecutionError> {
191 match self {
192 Promise::Pending(receiver) => {
193 let value = receiver.recv_response()?;
194 Ok(value)
195 }
196 Promise::Ready(value) => Ok(value),
197 }
198 }
199}
200
201#[derive(Debug, Default)]
203struct QueryManager<T> {
204 pending_queries: BTreeMap<u32, Promise<T>>,
206 query_count: u32,
208 active_query_count: u32,
210}
211
212impl<T> QueryManager<T> {
213 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
214 let id = self.query_count;
215 self.pending_queries.insert(id, Promise::Pending(receiver));
216 self.query_count = self
217 .query_count
218 .checked_add(1)
219 .ok_or(ArithmeticError::Overflow)?;
220 self.active_query_count = self
221 .active_query_count
222 .checked_add(1)
223 .ok_or(ArithmeticError::Overflow)?;
224 Ok(id)
225 }
226
227 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
228 let promise = self
229 .pending_queries
230 .remove(&id)
231 .ok_or(ExecutionError::InvalidPromise)?;
232 let value = promise.read()?;
233 self.active_query_count -= 1;
234 Ok(value)
235 }
236
237 fn force_all(&mut self) -> Result<(), ExecutionError> {
238 for promise in self.pending_queries.values_mut() {
239 promise.force()?;
240 }
241 Ok(())
242 }
243}
244
/// A list of keys, as produced by queries that find keys by prefix.
type Keys = Vec<Vec<u8>>;
/// The raw bytes of a value.
type Value = Vec<u8>;
/// A list of key-value pairs, as produced by queries that find key-values by prefix.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
248
249#[derive(Debug, Default)]
250struct ViewUserState {
251 contains_key_queries: QueryManager<bool>,
253 contains_keys_queries: QueryManager<Vec<bool>>,
255 read_value_queries: QueryManager<Option<Value>>,
257 read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
259 find_keys_queries: QueryManager<Keys>,
261 find_key_values_queries: QueryManager<KeyValues>,
263}
264
265impl ViewUserState {
266 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
267 self.contains_key_queries.force_all()?;
268 self.contains_keys_queries.force_all()?;
269 self.read_value_queries.force_all()?;
270 self.read_multi_values_queries.force_all()?;
271 self.find_keys_queries.force_all()?;
272 self.find_key_values_queries.force_all()?;
273 Ok(())
274 }
275}
276
277impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
278 type Target = SyncRuntimeHandle<UserInstance>;
279
280 fn deref(&self) -> &Self::Target {
281 self.0.as_ref().expect(
282 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
283 )
284 }
285}
286
287impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
288 fn deref_mut(&mut self) -> &mut Self::Target {
289 self.0.as_mut().expect(
290 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
291 )
292 }
293}
294
295impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
296 fn drop(&mut self) {
297 if let Some(handle) = self.0.take() {
300 handle.inner().loaded_applications.clear();
301 }
302 }
303}
304
305impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
306 #[expect(clippy::too_many_arguments)]
307 fn new(
308 chain_id: ChainId,
309 height: BlockHeight,
310 round: Option<u32>,
311 executing_message: Option<ExecutingMessage>,
312 execution_state_sender: ExecutionStateSender,
313 deadline: Option<Instant>,
314 refund_grant_to: Option<Account>,
315 resource_controller: ResourceController,
316 transaction_tracker: TransactionTracker,
317 user_context: UserInstance::UserContext,
318 ) -> Self {
319 Self {
320 chain_id,
321 height,
322 round,
323 executing_message,
324 execution_state_sender,
325 is_finalizing: false,
326 applications_to_finalize: Vec::new(),
327 loaded_applications: HashMap::new(),
328 call_stack: Vec::new(),
329 active_applications: HashSet::new(),
330 view_user_states: BTreeMap::new(),
331 deadline,
332 refund_grant_to,
333 resource_controller,
334 transaction_tracker,
335 scheduled_operations: Vec::new(),
336 user_context,
337 }
338 }
339
340 fn current_application(&self) -> &ApplicationStatus {
348 self.call_stack
349 .last()
350 .expect("Call stack is unexpectedly empty")
351 }
352
353 fn push_application(&mut self, status: ApplicationStatus) {
357 self.active_applications.insert(status.id);
358 self.call_stack.push(status);
359 }
360
361 fn pop_application(&mut self) -> ApplicationStatus {
369 let status = self
370 .call_stack
371 .pop()
372 .expect("Can't remove application from empty call stack");
373 assert!(self.active_applications.remove(&status.id));
374 status
375 }
376
377 fn check_for_reentrancy(
381 &mut self,
382 application_id: ApplicationId,
383 ) -> Result<(), ExecutionError> {
384 ensure!(
385 !self.active_applications.contains(&application_id),
386 ExecutionError::ReentrantCall(application_id)
387 );
388 Ok(())
389 }
390}
391
392impl SyncRuntimeInternal<UserContractInstance> {
393 fn load_contract_instance(
395 &mut self,
396 this: SyncRuntimeHandle<UserContractInstance>,
397 id: ApplicationId,
398 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
399 match self.loaded_applications.entry(id) {
400 #[cfg(web)]
402 hash_map::Entry::Vacant(_) => {
403 drop(this);
404 Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
405 id,
406 )))
407 }
408 #[cfg(not(web))]
409 hash_map::Entry::Vacant(entry) => {
410 let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
411 let (code, description, txn_tracker_moved) = self
412 .execution_state_sender
413 .send_request(move |callback| ExecutionRequest::LoadContract {
414 id,
415 callback,
416 txn_tracker: txn_tracker_moved,
417 })?
418 .recv_response()?;
419 self.transaction_tracker = txn_tracker_moved;
420
421 let instance = code.instantiate(this)?;
422
423 self.applications_to_finalize.push(id);
424 Ok(entry
425 .insert(LoadedApplication::new(instance, description))
426 .clone())
427 }
428 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
429 }
430 }
431
432 fn prepare_for_call(
434 &mut self,
435 this: ContractSyncRuntimeHandle,
436 authenticated: bool,
437 callee_id: ApplicationId,
438 ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
439 self.check_for_reentrancy(callee_id)?;
440
441 ensure!(
442 !self.is_finalizing,
443 ExecutionError::CrossApplicationCallInFinalize {
444 caller_id: Box::new(self.current_application().id),
445 callee_id: Box::new(callee_id),
446 }
447 );
448
449 let application = self.load_contract_instance(this, callee_id)?;
451
452 let caller = self.current_application();
453 let caller_id = caller.id;
454 let caller_signer = caller.signer;
455 let authenticated_signer = match caller_signer {
457 Some(signer) if authenticated => Some(signer),
458 _ => None,
459 };
460 let authenticated_caller_id = authenticated.then_some(caller_id);
461 self.push_application(ApplicationStatus {
462 caller_id: authenticated_caller_id,
463 id: callee_id,
464 description: application.description,
465 signer: authenticated_signer,
467 });
468 Ok(application.instance)
469 }
470
471 fn finish_call(&mut self) -> Result<(), ExecutionError> {
473 self.pop_application();
474 Ok(())
475 }
476
477 fn run_service_oracle_query(
479 &mut self,
480 application_id: ApplicationId,
481 query: Vec<u8>,
482 ) -> Result<Vec<u8>, ExecutionError> {
483 let context = QueryContext {
484 chain_id: self.chain_id,
485 next_block_height: self.height,
486 local_time: self.transaction_tracker.local_time(),
487 };
488 let sender = self.execution_state_sender.clone();
489
490 let txn_tracker = TransactionTracker::default()
491 .with_blobs(self.transaction_tracker.created_blobs().clone());
492
493 let timeout = self
494 .resource_controller
495 .remaining_service_oracle_execution_time()?;
496 let execution_start = Instant::now();
497 let deadline = Some(execution_start + timeout);
498
499 let mut service_runtime =
500 ServiceSyncRuntime::new_with_txn_tracker(sender, context, deadline, txn_tracker);
501
502 let result = service_runtime.run_query(application_id, query);
503
504 self.resource_controller
507 .track_service_oracle_execution(execution_start.elapsed())?;
508
509 let QueryOutcome {
510 response,
511 operations,
512 } = result?;
513
514 self.resource_controller
515 .track_service_oracle_response(response.len())?;
516
517 self.scheduled_operations.extend(operations);
518 Ok(response)
519 }
520}
521
522impl SyncRuntimeInternal<UserServiceInstance> {
523 fn load_service_instance(
525 &mut self,
526 this: ServiceSyncRuntimeHandle,
527 id: ApplicationId,
528 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
529 match self.loaded_applications.entry(id) {
530 #[cfg(web)]
532 hash_map::Entry::Vacant(_) => {
533 drop(this);
534 Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
535 id,
536 )))
537 }
538 #[cfg(not(web))]
539 hash_map::Entry::Vacant(entry) => {
540 let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
541 let (code, description, txn_tracker_moved) = self
542 .execution_state_sender
543 .send_request(move |callback| ExecutionRequest::LoadService {
544 id,
545 callback,
546 txn_tracker: txn_tracker_moved,
547 })?
548 .recv_response()?;
549 self.transaction_tracker = txn_tracker_moved;
550
551 let instance = code.instantiate(this)?;
552 Ok(entry
553 .insert(LoadedApplication::new(instance, description))
554 .clone())
555 }
556 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
557 }
558 }
559}
560
561impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
562 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
563 let handle = self.0.take().expect(
564 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
565 );
566 let runtime = Arc::into_inner(handle.0)?
567 .into_inner()
568 .expect("`SyncRuntime` should run in a single thread which should not panic");
569 Some(runtime)
570 }
571}
572
573impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
574 for SyncRuntimeHandle<UserInstance>
575{
576 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
577 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
578 }
579}
580
581impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
582 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
583 self.0
584 .try_lock()
585 .expect("Synchronous runtimes run on a single execution thread")
586 }
587}
588
589impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
590where
591 Self: ContractOrServiceRuntime,
592{
593 type Read = ();
594 type ReadValueBytes = u32;
595 type ContainsKey = u32;
596 type ContainsKeys = u32;
597 type ReadMultiValuesBytes = u32;
598 type FindKeysByPrefix = u32;
599 type FindKeyValuesByPrefix = u32;
600
601 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
602 let mut this = self.inner();
603 let chain_id = this.chain_id;
604 this.resource_controller.track_runtime_chain_id()?;
605 Ok(chain_id)
606 }
607
608 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
609 let mut this = self.inner();
610 let height = this.height;
611 this.resource_controller.track_runtime_block_height()?;
612 Ok(height)
613 }
614
615 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
616 let mut this = self.inner();
617 let application_id = this.current_application().id;
618 this.resource_controller.track_runtime_application_id()?;
619 Ok(application_id)
620 }
621
622 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
623 let mut this = self.inner();
624 let application_creator_chain_id = this.current_application().description.creator_chain_id;
625 this.resource_controller.track_runtime_application_id()?;
626 Ok(application_creator_chain_id)
627 }
628
629 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
630 let mut this = self.inner();
631 let parameters = this.current_application().description.parameters.clone();
632 this.resource_controller
633 .track_runtime_application_parameters(¶meters)?;
634 Ok(parameters)
635 }
636
637 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
638 let mut this = self.inner();
639 let timestamp = this
640 .execution_state_sender
641 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
642 .recv_response()?;
643 this.resource_controller.track_runtime_timestamp()?;
644 Ok(timestamp)
645 }
646
647 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
648 let mut this = self.inner();
649 let balance = this
650 .execution_state_sender
651 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
652 .recv_response()?;
653 this.resource_controller.track_runtime_balance()?;
654 Ok(balance)
655 }
656
657 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
658 let mut this = self.inner();
659 let balance = this
660 .execution_state_sender
661 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
662 .recv_response()?;
663 this.resource_controller.track_runtime_balance()?;
664 Ok(balance)
665 }
666
667 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
668 let mut this = self.inner();
669 let owner_balances = this
670 .execution_state_sender
671 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
672 .recv_response()?;
673 this.resource_controller
674 .track_runtime_owner_balances(&owner_balances)?;
675 Ok(owner_balances)
676 }
677
678 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
679 let mut this = self.inner();
680 let owners = this
681 .execution_state_sender
682 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
683 .recv_response()?;
684 this.resource_controller.track_runtime_owners(&owners)?;
685 Ok(owners)
686 }
687
688 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
689 let mut this = self.inner();
690 let chain_ownership = this
691 .execution_state_sender
692 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
693 .recv_response()?;
694 this.resource_controller
695 .track_runtime_chain_ownership(&chain_ownership)?;
696 Ok(chain_ownership)
697 }
698
699 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
700 let mut this = self.inner();
701 let id = this.current_application().id;
702 this.resource_controller.track_read_operation()?;
703 let receiver = this
704 .execution_state_sender
705 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
706 let state = this.view_user_states.entry(id).or_default();
707 state.contains_key_queries.register(receiver)
708 }
709
710 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
711 let mut this = self.inner();
712 let id = this.current_application().id;
713 let state = this.view_user_states.entry(id).or_default();
714 let value = state.contains_key_queries.wait(*promise)?;
715 Ok(value)
716 }
717
718 fn contains_keys_new(
719 &mut self,
720 keys: Vec<Vec<u8>>,
721 ) -> Result<Self::ContainsKeys, ExecutionError> {
722 let mut this = self.inner();
723 let id = this.current_application().id;
724 this.resource_controller.track_read_operation()?;
725 let receiver = this
726 .execution_state_sender
727 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
728 let state = this.view_user_states.entry(id).or_default();
729 state.contains_keys_queries.register(receiver)
730 }
731
732 fn contains_keys_wait(
733 &mut self,
734 promise: &Self::ContainsKeys,
735 ) -> Result<Vec<bool>, ExecutionError> {
736 let mut this = self.inner();
737 let id = this.current_application().id;
738 let state = this.view_user_states.entry(id).or_default();
739 let value = state.contains_keys_queries.wait(*promise)?;
740 Ok(value)
741 }
742
743 fn read_multi_values_bytes_new(
744 &mut self,
745 keys: Vec<Vec<u8>>,
746 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
747 let mut this = self.inner();
748 let id = this.current_application().id;
749 this.resource_controller.track_read_operation()?;
750 let receiver = this.execution_state_sender.send_request(move |callback| {
751 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
752 })?;
753 let state = this.view_user_states.entry(id).or_default();
754 state.read_multi_values_queries.register(receiver)
755 }
756
757 fn read_multi_values_bytes_wait(
758 &mut self,
759 promise: &Self::ReadMultiValuesBytes,
760 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
761 let mut this = self.inner();
762 let id = this.current_application().id;
763 let state = this.view_user_states.entry(id).or_default();
764 let values = state.read_multi_values_queries.wait(*promise)?;
765 for value in &values {
766 if let Some(value) = &value {
767 this.resource_controller
768 .track_bytes_read(value.len() as u64)?;
769 }
770 }
771 Ok(values)
772 }
773
774 fn read_value_bytes_new(
775 &mut self,
776 key: Vec<u8>,
777 ) -> Result<Self::ReadValueBytes, ExecutionError> {
778 let mut this = self.inner();
779 let id = this.current_application().id;
780 this.resource_controller.track_read_operation()?;
781 let receiver = this
782 .execution_state_sender
783 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
784 let state = this.view_user_states.entry(id).or_default();
785 state.read_value_queries.register(receiver)
786 }
787
788 fn read_value_bytes_wait(
789 &mut self,
790 promise: &Self::ReadValueBytes,
791 ) -> Result<Option<Vec<u8>>, ExecutionError> {
792 let mut this = self.inner();
793 let id = this.current_application().id;
794 let value = {
795 let state = this.view_user_states.entry(id).or_default();
796 state.read_value_queries.wait(*promise)?
797 };
798 if let Some(value) = &value {
799 this.resource_controller
800 .track_bytes_read(value.len() as u64)?;
801 }
802 Ok(value)
803 }
804
805 fn find_keys_by_prefix_new(
806 &mut self,
807 key_prefix: Vec<u8>,
808 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
809 let mut this = self.inner();
810 let id = this.current_application().id;
811 this.resource_controller.track_read_operation()?;
812 let receiver = this.execution_state_sender.send_request(move |callback| {
813 ExecutionRequest::FindKeysByPrefix {
814 id,
815 key_prefix,
816 callback,
817 }
818 })?;
819 let state = this.view_user_states.entry(id).or_default();
820 state.find_keys_queries.register(receiver)
821 }
822
823 fn find_keys_by_prefix_wait(
824 &mut self,
825 promise: &Self::FindKeysByPrefix,
826 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
827 let mut this = self.inner();
828 let id = this.current_application().id;
829 let keys = {
830 let state = this.view_user_states.entry(id).or_default();
831 state.find_keys_queries.wait(*promise)?
832 };
833 let mut read_size = 0;
834 for key in &keys {
835 read_size += key.len();
836 }
837 this.resource_controller
838 .track_bytes_read(read_size as u64)?;
839 Ok(keys)
840 }
841
842 fn find_key_values_by_prefix_new(
843 &mut self,
844 key_prefix: Vec<u8>,
845 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
846 let mut this = self.inner();
847 let id = this.current_application().id;
848 this.resource_controller.track_read_operation()?;
849 let receiver = this.execution_state_sender.send_request(move |callback| {
850 ExecutionRequest::FindKeyValuesByPrefix {
851 id,
852 key_prefix,
853 callback,
854 }
855 })?;
856 let state = this.view_user_states.entry(id).or_default();
857 state.find_key_values_queries.register(receiver)
858 }
859
860 fn find_key_values_by_prefix_wait(
861 &mut self,
862 promise: &Self::FindKeyValuesByPrefix,
863 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
864 let mut this = self.inner();
865 let id = this.current_application().id;
866 let state = this.view_user_states.entry(id).or_default();
867 let key_values = state.find_key_values_queries.wait(*promise)?;
868 let mut read_size = 0;
869 for (key, value) in &key_values {
870 read_size += key.len() + value.len();
871 }
872 this.resource_controller
873 .track_bytes_read(read_size as u64)?;
874 Ok(key_values)
875 }
876
877 fn perform_http_request(
878 &mut self,
879 request: http::Request,
880 ) -> Result<http::Response, ExecutionError> {
881 let mut this = self.inner();
882 let app_permissions = this
883 .execution_state_sender
884 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
885 .recv_response()?;
886
887 let app_id = this.current_application().id;
888 ensure!(
889 app_permissions.can_make_http_requests(&app_id),
890 ExecutionError::UnauthorizedApplication(app_id)
891 );
892
893 this.resource_controller.track_http_request()?;
894
895 let response =
896 if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
897 match response {
898 OracleResponse::Http(response) => response,
899 _ => return Err(ExecutionError::OracleResponseMismatch),
900 }
901 } else {
902 this.execution_state_sender
903 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
904 request,
905 http_responses_are_oracle_responses:
906 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
907 callback,
908 })?
909 .recv_response()?
910 };
911 this.transaction_tracker
912 .add_oracle_response(OracleResponse::Http(response.clone()));
913 Ok(response)
914 }
915
916 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
917 let mut this = self.inner();
918 if !this
919 .transaction_tracker
920 .replay_oracle_response(OracleResponse::Assert)?
921 {
922 let local_time = this.transaction_tracker.local_time();
924 ensure!(
925 local_time < timestamp,
926 ExecutionError::AssertBefore {
927 timestamp,
928 local_time,
929 }
930 );
931 }
932 Ok(())
933 }
934
935 fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
936 let mut this = self.inner();
937 let blob_id = hash.into();
938 if let Some(content) = this.transaction_tracker.get_blob_content(&blob_id) {
939 return Ok(content.bytes().to_vec());
940 };
941 let (content, is_new) = this
942 .execution_state_sender
943 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
944 .recv_response()?;
945 if is_new {
946 this.transaction_tracker
947 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
948 }
949 Ok(content.into_vec_or_clone())
950 }
951
952 fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
953 let mut this = self.inner();
954 let blob_id = hash.into();
955 let is_new = this
956 .execution_state_sender
957 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
958 .recv_response()?;
959 if is_new {
960 this.transaction_tracker
961 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
962 }
963 Ok(())
964 }
965}
966
/// Settings that differ between contract and service runtime handles.
trait ContractOrServiceRuntime {
    /// The value passed as `http_responses_are_oracle_responses` when performing an HTTP
    /// request.
    // NOTE(review): judging by the name, this makes HTTP responses subject to the oracle
    // response size limit — confirm against the request handler.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}

/// Contract runtimes enable the limit.
impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
}

/// Service runtimes disable the limit.
impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
}
985
986impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
987 fn clone(&self) -> Self {
988 SyncRuntimeHandle(self.0.clone())
989 }
990}
991
992impl ContractSyncRuntime {
993 pub(crate) fn new(
994 execution_state_sender: ExecutionStateSender,
995 chain_id: ChainId,
996 refund_grant_to: Option<Account>,
997 resource_controller: ResourceController,
998 action: &UserAction,
999 txn_tracker: TransactionTracker,
1000 ) -> Self {
1001 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1002 SyncRuntimeInternal::new(
1003 chain_id,
1004 action.height(),
1005 action.round(),
1006 if let UserAction::Message(context, _) = action {
1007 Some(context.into())
1008 } else {
1009 None
1010 },
1011 execution_state_sender,
1012 None,
1013 refund_grant_to,
1014 resource_controller,
1015 txn_tracker,
1016 action.timestamp(),
1017 ),
1018 )))
1019 }
1020
1021 pub(crate) fn preload_contract(
1022 &self,
1023 id: ApplicationId,
1024 code: UserContractCode,
1025 description: ApplicationDescription,
1026 ) -> Result<(), ExecutionError> {
1027 let this = self
1028 .0
1029 .as_ref()
1030 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1031 let runtime_handle = this.clone();
1032 let mut this_guard = this.inner();
1033
1034 if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1035 entry.insert(LoadedApplication::new(
1036 code.instantiate(runtime_handle)?,
1037 description,
1038 ));
1039 this_guard.applications_to_finalize.push(id);
1040 }
1041
1042 Ok(())
1043 }
1044
1045 pub(crate) fn run_action(
1047 mut self,
1048 application_id: ApplicationId,
1049 chain_id: ChainId,
1050 action: UserAction,
1051 ) -> Result<(Option<Vec<u8>>, ResourceController, TransactionTracker), ExecutionError> {
1052 let result = self
1053 .deref_mut()
1054 .run_action(application_id, chain_id, action)?;
1055 let runtime = self
1056 .into_inner()
1057 .expect("Runtime clones should have been freed by now");
1058 Ok((
1059 result,
1060 runtime.resource_controller,
1061 runtime.transaction_tracker,
1062 ))
1063 }
1064}
1065
1066impl ContractSyncRuntimeHandle {
1067 fn run_action(
1068 &mut self,
1069 application_id: ApplicationId,
1070 chain_id: ChainId,
1071 action: UserAction,
1072 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1073 let finalize_context = FinalizeContext {
1074 authenticated_signer: action.signer(),
1075 chain_id,
1076 height: action.height(),
1077 round: action.round(),
1078 };
1079
1080 {
1081 let runtime = self.inner();
1082 assert_eq!(runtime.chain_id, chain_id);
1083 assert_eq!(runtime.height, action.height());
1084 }
1085
1086 let signer = action.signer();
1087 let closure = move |code: &mut UserContractInstance| match action {
1088 UserAction::Instantiate(_context, argument) => {
1089 code.instantiate(argument).map(|()| None)
1090 }
1091 UserAction::Operation(_context, operation) => {
1092 code.execute_operation(operation).map(Option::Some)
1093 }
1094 UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1095 UserAction::ProcessStreams(_context, updates) => {
1096 code.process_streams(updates).map(|()| None)
1097 }
1098 };
1099
1100 let result = self.execute(application_id, signer, closure)?;
1101 self.finalize(finalize_context)?;
1102 Ok(result)
1103 }
1104
1105 fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1107 let applications = mem::take(&mut self.inner().applications_to_finalize)
1108 .into_iter()
1109 .rev();
1110
1111 self.inner().is_finalizing = true;
1112
1113 for application in applications {
1114 self.execute(application, context.authenticated_signer, |contract| {
1115 contract.finalize().map(|_| None)
1116 })?;
1117 self.inner().loaded_applications.remove(&application);
1118 }
1119
1120 Ok(())
1121 }
1122
1123 fn execute(
1125 &mut self,
1126 application_id: ApplicationId,
1127 signer: Option<AccountOwner>,
1128 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1129 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1130 let contract = {
1131 let mut runtime = self.inner();
1132 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1133
1134 let status = ApplicationStatus {
1135 caller_id: None,
1136 id: application_id,
1137 description: application.description.clone(),
1138 signer,
1139 };
1140
1141 runtime.push_application(status);
1142
1143 application
1144 };
1145
1146 let result = closure(
1147 &mut contract
1148 .instance
1149 .try_lock()
1150 .expect("Application should not be already executing"),
1151 )?;
1152
1153 let mut runtime = self.inner();
1154 let application_status = runtime.pop_application();
1155 assert_eq!(application_status.caller_id, None);
1156 assert_eq!(application_status.id, application_id);
1157 assert_eq!(application_status.description, contract.description);
1158 assert_eq!(application_status.signer, signer);
1159 assert!(runtime.call_stack.is_empty());
1160
1161 Ok(result)
1162 }
1163}
1164
1165impl ContractRuntime for ContractSyncRuntimeHandle {
1166 fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1167 let this = self.inner();
1168 Ok(this.current_application().signer)
1169 }
1170
1171 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1172 Ok(self
1173 .inner()
1174 .executing_message
1175 .map(|metadata| metadata.is_bouncing))
1176 }
1177
1178 fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1179 Ok(self
1180 .inner()
1181 .executing_message
1182 .map(|metadata| metadata.origin))
1183 }
1184
1185 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1186 let this = self.inner();
1187 if this.call_stack.len() <= 1 {
1188 return Ok(None);
1189 }
1190 Ok(this.current_application().caller_id)
1191 }
1192
1193 fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1194 Ok(match vm_runtime {
1195 VmRuntime::Wasm => {
1196 self.inner()
1197 .resource_controller
1198 .policy()
1199 .maximum_wasm_fuel_per_block
1200 }
1201 VmRuntime::Evm => {
1202 self.inner()
1203 .resource_controller
1204 .policy()
1205 .maximum_evm_fuel_per_block
1206 }
1207 })
1208 }
1209
1210 fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1211 Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1212 }
1213
1214 fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1215 let mut this = self.inner();
1216 this.resource_controller.track_fuel(fuel, vm_runtime)
1217 }
1218
1219 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1220 let mut this = self.inner();
1221 let application = this.current_application();
1222 let application_id = application.id;
1223 let authenticated_signer = application.signer;
1224 let mut refund_grant_to = this.refund_grant_to;
1225
1226 let grant = this
1227 .resource_controller
1228 .policy()
1229 .total_price(&message.grant)?;
1230 if grant.is_zero() {
1231 refund_grant_to = None;
1232 } else {
1233 this.resource_controller.track_grant(grant)?;
1234 }
1235 let kind = if message.is_tracked {
1236 MessageKind::Tracked
1237 } else {
1238 MessageKind::Simple
1239 };
1240
1241 this.transaction_tracker
1242 .add_outgoing_message(OutgoingMessage {
1243 destination: message.destination,
1244 authenticated_signer,
1245 refund_grant_to,
1246 grant,
1247 kind,
1248 message: Message::User {
1249 application_id,
1250 bytes: message.message,
1251 },
1252 });
1253
1254 Ok(())
1255 }
1256
1257 fn transfer(
1258 &mut self,
1259 source: AccountOwner,
1260 destination: Account,
1261 amount: Amount,
1262 ) -> Result<(), ExecutionError> {
1263 let mut this = self.inner();
1264 let current_application = this.current_application();
1265 let application_id = current_application.id;
1266 let signer = current_application.signer;
1267
1268 let maybe_message = this
1269 .execution_state_sender
1270 .send_request(|callback| ExecutionRequest::Transfer {
1271 source,
1272 destination,
1273 amount,
1274 signer,
1275 application_id,
1276 callback,
1277 })?
1278 .recv_response()?;
1279
1280 this.transaction_tracker
1281 .add_outgoing_messages(maybe_message);
1282 Ok(())
1283 }
1284
1285 fn claim(
1286 &mut self,
1287 source: Account,
1288 destination: Account,
1289 amount: Amount,
1290 ) -> Result<(), ExecutionError> {
1291 let mut this = self.inner();
1292 let current_application = this.current_application();
1293 let application_id = current_application.id;
1294 let signer = current_application.signer;
1295
1296 let maybe_message = this
1297 .execution_state_sender
1298 .send_request(|callback| ExecutionRequest::Claim {
1299 source,
1300 destination,
1301 amount,
1302 signer,
1303 application_id,
1304 callback,
1305 })?
1306 .recv_response()?;
1307 this.transaction_tracker
1308 .add_outgoing_messages(maybe_message);
1309 Ok(())
1310 }
1311
1312 fn try_call_application(
1313 &mut self,
1314 authenticated: bool,
1315 callee_id: ApplicationId,
1316 argument: Vec<u8>,
1317 ) -> Result<Vec<u8>, ExecutionError> {
1318 let contract = self
1319 .inner()
1320 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1321
1322 let value = contract
1323 .try_lock()
1324 .expect("Applications should not have reentrant calls")
1325 .execute_operation(argument)?;
1326
1327 self.inner().finish_call()?;
1328
1329 Ok(value)
1330 }
1331
1332 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1333 let mut this = self.inner();
1334 ensure!(
1335 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1336 ExecutionError::StreamNameTooLong
1337 );
1338 let application_id = GenericApplicationId::User(this.current_application().id);
1339 let stream_id = StreamId {
1340 stream_name,
1341 application_id,
1342 };
1343 let index = this
1344 .execution_state_sender
1345 .send_request(|callback| ExecutionRequest::NextEventIndex {
1346 stream_id: stream_id.clone(),
1347 callback,
1348 })?
1349 .recv_response()?;
1350 this.resource_controller
1352 .track_bytes_written(value.len() as u64)?;
1353 this.transaction_tracker.add_event(stream_id, index, value);
1354 Ok(index)
1355 }
1356
1357 fn read_event(
1358 &mut self,
1359 chain_id: ChainId,
1360 stream_name: StreamName,
1361 index: u32,
1362 ) -> Result<Vec<u8>, ExecutionError> {
1363 let mut this = self.inner();
1364 ensure!(
1365 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1366 ExecutionError::StreamNameTooLong
1367 );
1368 let application_id = GenericApplicationId::User(this.current_application().id);
1369 let stream_id = StreamId {
1370 stream_name,
1371 application_id,
1372 };
1373 let event_id = EventId {
1374 stream_id,
1375 index,
1376 chain_id,
1377 };
1378 let event = match this.transaction_tracker.next_replayed_oracle_response()? {
1379 None => this
1380 .execution_state_sender
1381 .send_request(|callback| ExecutionRequest::ReadEvent {
1382 event_id: event_id.clone(),
1383 callback,
1384 })?
1385 .recv_response()?,
1386 Some(OracleResponse::Event(recorded_event_id, event))
1387 if recorded_event_id == event_id =>
1388 {
1389 event
1390 }
1391 Some(_) => return Err(ExecutionError::OracleResponseMismatch),
1392 };
1393 this.transaction_tracker
1394 .add_oracle_response(OracleResponse::Event(event_id, event.clone()));
1395 this.resource_controller
1397 .track_bytes_read(event.len() as u64)?;
1398 Ok(event)
1399 }
1400
1401 fn subscribe_to_events(
1402 &mut self,
1403 chain_id: ChainId,
1404 application_id: ApplicationId,
1405 stream_name: StreamName,
1406 ) -> Result<(), ExecutionError> {
1407 let mut this = self.inner();
1408 ensure!(
1409 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1410 ExecutionError::StreamNameTooLong
1411 );
1412 let stream_id = StreamId {
1413 stream_name,
1414 application_id: application_id.into(),
1415 };
1416 let subscriber_app_id = this.current_application().id;
1417 let next_index = this
1418 .execution_state_sender
1419 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1420 chain_id,
1421 stream_id: stream_id.clone(),
1422 subscriber_app_id,
1423 callback,
1424 })?
1425 .recv_response()?;
1426 this.transaction_tracker.add_stream_to_process(
1427 subscriber_app_id,
1428 chain_id,
1429 stream_id,
1430 0,
1431 next_index,
1432 );
1433 Ok(())
1434 }
1435
1436 fn unsubscribe_from_events(
1437 &mut self,
1438 chain_id: ChainId,
1439 application_id: ApplicationId,
1440 stream_name: StreamName,
1441 ) -> Result<(), ExecutionError> {
1442 let mut this = self.inner();
1443 ensure!(
1444 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1445 ExecutionError::StreamNameTooLong
1446 );
1447 let stream_id = StreamId {
1448 stream_name,
1449 application_id: application_id.into(),
1450 };
1451 let subscriber_app_id = this.current_application().id;
1452 this.execution_state_sender
1453 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1454 chain_id,
1455 stream_id: stream_id.clone(),
1456 subscriber_app_id,
1457 callback,
1458 })?
1459 .recv_response()?;
1460 this.transaction_tracker
1461 .remove_stream_to_process(application_id, chain_id, stream_id);
1462 Ok(())
1463 }
1464
1465 fn query_service(
1466 &mut self,
1467 application_id: ApplicationId,
1468 query: Vec<u8>,
1469 ) -> Result<Vec<u8>, ExecutionError> {
1470 let mut this = self.inner();
1471
1472 let app_permissions = this
1473 .execution_state_sender
1474 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1475 .recv_response()?;
1476
1477 let app_id = this.current_application().id;
1478 ensure!(
1479 app_permissions.can_call_services(&app_id),
1480 ExecutionError::UnauthorizedApplication(app_id)
1481 );
1482
1483 this.resource_controller.track_service_oracle_call()?;
1484 let response =
1485 if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1486 match response {
1487 OracleResponse::Service(bytes) => bytes,
1488 _ => return Err(ExecutionError::OracleResponseMismatch),
1489 }
1490 } else {
1491 this.run_service_oracle_query(application_id, query)?
1492 };
1493
1494 this.transaction_tracker
1495 .add_oracle_response(OracleResponse::Service(response.clone()));
1496
1497 Ok(response)
1498 }
1499
1500 fn open_chain(
1501 &mut self,
1502 ownership: ChainOwnership,
1503 application_permissions: ApplicationPermissions,
1504 balance: Amount,
1505 ) -> Result<ChainId, ExecutionError> {
1506 let parent_id = self.inner().chain_id;
1507 let block_height = self.block_height()?;
1508
1509 let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1510 let timestamp = self.inner().user_context;
1511
1512 let (chain_id, txn_tracker_moved) = self
1513 .inner()
1514 .execution_state_sender
1515 .send_request(|callback| ExecutionRequest::OpenChain {
1516 ownership,
1517 balance,
1518 parent_id,
1519 block_height,
1520 timestamp,
1521 application_permissions,
1522 callback,
1523 txn_tracker: txn_tracker_moved,
1524 })?
1525 .recv_response()?;
1526
1527 self.inner().transaction_tracker = txn_tracker_moved;
1528
1529 Ok(chain_id)
1530 }
1531
1532 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1533 let this = self.inner();
1534 let application_id = this.current_application().id;
1535 this.execution_state_sender
1536 .send_request(|callback| ExecutionRequest::CloseChain {
1537 application_id,
1538 callback,
1539 })?
1540 .recv_response()?
1541 }
1542
1543 fn change_application_permissions(
1544 &mut self,
1545 application_permissions: ApplicationPermissions,
1546 ) -> Result<(), ExecutionError> {
1547 let this = self.inner();
1548 let application_id = this.current_application().id;
1549 this.execution_state_sender
1550 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1551 application_id,
1552 application_permissions,
1553 callback,
1554 })?
1555 .recv_response()?
1556 }
1557
1558 fn create_application(
1559 &mut self,
1560 module_id: ModuleId,
1561 parameters: Vec<u8>,
1562 argument: Vec<u8>,
1563 required_application_ids: Vec<ApplicationId>,
1564 ) -> Result<ApplicationId, ExecutionError> {
1565 let chain_id = self.inner().chain_id;
1566 let block_height = self.block_height()?;
1567
1568 let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1569
1570 let CreateApplicationResult {
1571 app_id,
1572 txn_tracker: txn_tracker_moved,
1573 } = self
1574 .inner()
1575 .execution_state_sender
1576 .send_request(move |callback| ExecutionRequest::CreateApplication {
1577 chain_id,
1578 block_height,
1579 module_id,
1580 parameters,
1581 required_application_ids,
1582 callback,
1583 txn_tracker: txn_tracker_moved,
1584 })?
1585 .recv_response()??;
1586
1587 self.inner().transaction_tracker = txn_tracker_moved;
1588
1589 let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1590
1591 contract
1592 .try_lock()
1593 .expect("Applications should not have reentrant calls")
1594 .instantiate(argument)?;
1595
1596 self.inner().finish_call()?;
1597
1598 Ok(app_id)
1599 }
1600
1601 fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1602 let blob = Blob::new_data(bytes);
1603 let blob_id = blob.id();
1604 self.inner().transaction_tracker.add_created_blob(blob);
1605 Ok(DataBlobHash(blob_id.hash))
1606 }
1607
1608 fn publish_module(
1609 &mut self,
1610 contract: Bytecode,
1611 service: Bytecode,
1612 vm_runtime: VmRuntime,
1613 ) -> Result<ModuleId, ExecutionError> {
1614 let (blobs, module_id) =
1615 crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1616 for blob in blobs {
1617 self.inner().transaction_tracker.add_created_blob(blob);
1618 }
1619 Ok(module_id)
1620 }
1621
1622 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1623 let mut this = self.inner();
1624 let round =
1625 if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1626 match response {
1627 OracleResponse::Round(round) => round,
1628 _ => return Err(ExecutionError::OracleResponseMismatch),
1629 }
1630 } else {
1631 this.round
1632 };
1633 this.transaction_tracker
1634 .add_oracle_response(OracleResponse::Round(round));
1635 Ok(round)
1636 }
1637
1638 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1639 let mut this = self.inner();
1640 let id = this.current_application().id;
1641 let state = this.view_user_states.entry(id).or_default();
1642 state.force_all_pending_queries()?;
1643 this.resource_controller.track_write_operations(
1644 batch
1645 .num_operations()
1646 .try_into()
1647 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1648 )?;
1649 this.resource_controller
1650 .track_bytes_written(batch.size() as u64)?;
1651 this.execution_state_sender
1652 .send_request(|callback| ExecutionRequest::WriteBatch {
1653 id,
1654 batch,
1655 callback,
1656 })?
1657 .recv_response()?;
1658 Ok(())
1659 }
1660}
1661
1662impl ServiceSyncRuntime {
1663 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1665 let mut txn_tracker = TransactionTracker::default();
1666 txn_tracker.set_local_time(context.local_time);
1667 Self::new_with_txn_tracker(execution_state_sender, context, None, txn_tracker)
1668 }
1669
1670 pub fn new_with_txn_tracker(
1672 execution_state_sender: ExecutionStateSender,
1673 context: QueryContext,
1674 deadline: Option<Instant>,
1675 txn_tracker: TransactionTracker,
1676 ) -> Self {
1677 let runtime = SyncRuntime(Some(
1678 SyncRuntimeInternal::new(
1679 context.chain_id,
1680 context.next_block_height,
1681 None,
1682 None,
1683 execution_state_sender,
1684 deadline,
1685 None,
1686 ResourceController::default(),
1687 txn_tracker,
1688 (),
1689 )
1690 .into(),
1691 ));
1692
1693 ServiceSyncRuntime {
1694 runtime,
1695 current_context: context,
1696 }
1697 }
1698
1699 pub(crate) fn preload_service(
1701 &self,
1702 id: ApplicationId,
1703 code: UserServiceCode,
1704 description: ApplicationDescription,
1705 ) -> Result<(), ExecutionError> {
1706 let this = self
1707 .runtime
1708 .0
1709 .as_ref()
1710 .expect("services shouldn't be preloaded while the runtime is being dropped");
1711 let runtime_handle = this.clone();
1712 let mut this_guard = this.inner();
1713
1714 if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1715 entry.insert(LoadedApplication::new(
1716 code.instantiate(runtime_handle)?,
1717 description,
1718 ));
1719 this_guard.applications_to_finalize.push(id);
1720 }
1721
1722 Ok(())
1723 }
1724
1725 pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1727 while let Ok(request) = incoming_requests.recv() {
1728 let ServiceRuntimeRequest::Query {
1729 application_id,
1730 context,
1731 query,
1732 callback,
1733 } = request;
1734
1735 self.prepare_for_query(context);
1736
1737 let _ = callback.send(self.run_query(application_id, query));
1738 }
1739 }
1740
1741 pub(crate) fn prepare_for_query(&mut self, new_context: QueryContext) {
1743 let expected_context = QueryContext {
1744 local_time: new_context.local_time,
1745 ..self.current_context
1746 };
1747
1748 if new_context != expected_context {
1749 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1750 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1751 } else {
1752 self.handle_mut()
1753 .inner()
1754 .transaction_tracker
1755 .set_local_time(new_context.local_time);
1756 }
1757 }
1758
1759 pub(crate) fn run_query(
1761 &mut self,
1762 application_id: ApplicationId,
1763 query: Vec<u8>,
1764 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1765 let this = self.handle_mut();
1766 let response = this.try_query_application(application_id, query)?;
1767 let operations = mem::take(&mut this.inner().scheduled_operations);
1768
1769 Ok(QueryOutcome {
1770 response,
1771 operations,
1772 })
1773 }
1774
1775 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1777 self.runtime.0.as_mut().expect(
1778 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1779 )
1780 }
1781}
1782
1783impl ServiceRuntime for ServiceSyncRuntimeHandle {
1784 fn try_query_application(
1786 &mut self,
1787 queried_id: ApplicationId,
1788 argument: Vec<u8>,
1789 ) -> Result<Vec<u8>, ExecutionError> {
1790 let service = {
1791 let mut this = self.inner();
1792
1793 let application = this.load_service_instance(self.clone(), queried_id)?;
1795 this.push_application(ApplicationStatus {
1797 caller_id: None,
1798 id: queried_id,
1799 description: application.description,
1800 signer: None,
1801 });
1802 application.instance
1803 };
1804 let response = service
1805 .try_lock()
1806 .expect("Applications should not have reentrant calls")
1807 .handle_query(argument)?;
1808 self.inner().pop_application();
1809 Ok(response)
1810 }
1811
1812 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1813 let mut this = self.inner();
1814 let application_id = this.current_application().id;
1815
1816 this.scheduled_operations.push(Operation::User {
1817 application_id,
1818 bytes: operation,
1819 });
1820
1821 Ok(())
1822 }
1823
1824 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1825 if let Some(deadline) = self.inner().deadline {
1826 if Instant::now() >= deadline {
1827 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1828 }
1829 }
1830 Ok(())
1831 }
1832}
1833
/// A request consumed by [`ServiceSyncRuntime::run`].
pub enum ServiceRuntimeRequest {
    /// Run a query against an application's service.
    Query {
        /// The application whose service should handle the query.
        application_id: ApplicationId,
        /// The context the query is made in; the runtime is prepared for it before the
        /// query runs.
        context: QueryContext,
        /// The serialized query.
        query: Vec<u8>,
        /// Channel on which the outcome of the query, or the error, is sent back.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1843
/// The parts of a [`MessageContext`] that the runtime exposes to contracts through
/// `message_is_bouncing` and `message_origin_chain_id`.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// Copied from `MessageContext::is_bouncing`.
    is_bouncing: bool,
    /// Copied from `MessageContext::origin`.
    origin: ChainId,
}
1850
1851impl From<&MessageContext> for ExecutingMessage {
1852 fn from(context: &MessageContext) -> Self {
1853 ExecutingMessage {
1854 is_bouncing: context.is_bouncing,
1855 origin: context.origin,
1856 }
1857 }
1858}
1859
1860pub fn create_bytecode_blobs_sync(
1862 contract: Bytecode,
1863 service: Bytecode,
1864 vm_runtime: VmRuntime,
1865) -> (Vec<Blob>, ModuleId) {
1866 match vm_runtime {
1867 VmRuntime::Wasm => {
1868 let compressed_contract = contract.compress();
1869 let compressed_service = service.compress();
1870 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1871 let service_blob = Blob::new_service_bytecode(compressed_service);
1872 let module_id =
1873 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1874 (vec![contract_blob, service_blob], module_id)
1875 }
1876 VmRuntime::Evm => {
1877 let compressed_contract = contract.compress();
1878 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1879 let module_id = ModuleId::new(
1880 evm_contract_blob.id().hash,
1881 evm_contract_blob.id().hash,
1882 vm_runtime,
1883 );
1884 (vec![evm_contract_blob], module_id)
1885 }
1886 }
1887}