1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9 time::Instant,
10};
11
12use custom_debug_derive::Debug;
13use linera_base::{
14 crypto::CryptoHash,
15 data_types::{
16 Amount, ApplicationPermissions, ArithmeticError, BlockHeight, OracleResponse,
17 SendMessageRequest, Timestamp,
18 },
19 ensure, http,
20 identifiers::{
21 Account, AccountOwner, BlobId, BlobType, ChainId, ChannelFullName, ChannelName, EventId,
22 GenericApplicationId, MessageId, StreamId, StreamName,
23 },
24 ownership::ChainOwnership,
25};
26use linera_views::batch::Batch;
27use oneshot::Receiver;
28
29use crate::{
30 execution::UserAction,
31 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
32 resources::ResourceController,
33 system::CreateApplicationResult,
34 util::{ReceiverExt, UnboundedSenderExt},
35 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, ExecutionError,
36 FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation, OperationContext,
37 OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, TransactionTracker,
38 UserContractCode, UserContractInstance, UserServiceCode, UserServiceInstance,
39 MAX_STREAM_NAME_LEN,
40};
41
42#[cfg(test)]
43#[path = "unit_tests/runtime_tests.rs"]
44mod tests;
45
46#[derive(Debug)]
47pub struct SyncRuntime<UserInstance>(Option<SyncRuntimeHandle<UserInstance>>);
48
49pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
50
51pub struct ServiceSyncRuntime {
52 runtime: SyncRuntime<UserServiceInstance>,
53 current_context: QueryContext,
54}
55
56#[derive(Debug)]
57pub struct SyncRuntimeHandle<UserInstance>(Arc<Mutex<SyncRuntimeInternal<UserInstance>>>);
58
59pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
60pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
61
62#[derive(Debug)]
64pub struct SyncRuntimeInternal<UserInstance> {
65 chain_id: ChainId,
67 height: BlockHeight,
70 round: Option<u32>,
72 #[debug(skip_if = Option::is_none)]
74 authenticated_signer: Option<AccountOwner>,
75 #[debug(skip_if = Option::is_none)]
77 executing_message: Option<ExecutingMessage>,
78
79 execution_state_sender: ExecutionStateSender,
81
82 is_finalizing: bool,
86 applications_to_finalize: Vec<ApplicationId>,
88
89 loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
91 call_stack: Vec<ApplicationStatus>,
93 active_applications: HashSet<ApplicationId>,
95 transaction_tracker: TransactionTracker,
97 scheduled_operations: Vec<Operation>,
99
100 view_user_states: BTreeMap<ApplicationId, ViewUserState>,
102
103 deadline: Option<Instant>,
107
108 #[debug(skip_if = Option::is_none)]
110 refund_grant_to: Option<Account>,
111 resource_controller: ResourceController,
113}
114
115#[derive(Debug)]
117struct ApplicationStatus {
118 caller_id: Option<ApplicationId>,
120 id: ApplicationId,
122 description: ApplicationDescription,
124 signer: Option<AccountOwner>,
126}
127
128#[derive(Debug)]
130struct LoadedApplication<Instance> {
131 instance: Arc<Mutex<Instance>>,
132 description: ApplicationDescription,
133}
134
135impl<Instance> LoadedApplication<Instance> {
136 fn new(instance: Instance, description: ApplicationDescription) -> Self {
138 LoadedApplication {
139 instance: Arc::new(Mutex::new(instance)),
140 description,
141 }
142 }
143}
144
145impl<Instance> Clone for LoadedApplication<Instance> {
146 fn clone(&self) -> Self {
149 LoadedApplication {
150 instance: self.instance.clone(),
151 description: self.description.clone(),
152 }
153 }
154}
155
156#[derive(Debug)]
157enum Promise<T> {
158 Ready(T),
159 Pending(Receiver<T>),
160}
161
162impl<T> Promise<T> {
163 fn force(&mut self) -> Result<(), ExecutionError> {
164 if let Promise::Pending(receiver) = self {
165 let value = receiver
166 .recv_ref()
167 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
168 *self = Promise::Ready(value);
169 }
170 Ok(())
171 }
172
173 fn read(self) -> Result<T, ExecutionError> {
174 match self {
175 Promise::Pending(receiver) => {
176 let value = receiver.recv_response()?;
177 Ok(value)
178 }
179 Promise::Ready(value) => Ok(value),
180 }
181 }
182}
183
184#[derive(Debug, Default)]
186struct QueryManager<T> {
187 pending_queries: BTreeMap<u32, Promise<T>>,
189 query_count: u32,
191 active_query_count: u32,
193}
194
195impl<T> QueryManager<T> {
196 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
197 let id = self.query_count;
198 self.pending_queries.insert(id, Promise::Pending(receiver));
199 self.query_count = self
200 .query_count
201 .checked_add(1)
202 .ok_or(ArithmeticError::Overflow)?;
203 self.active_query_count = self
204 .active_query_count
205 .checked_add(1)
206 .ok_or(ArithmeticError::Overflow)?;
207 Ok(id)
208 }
209
210 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
211 let promise = self
212 .pending_queries
213 .remove(&id)
214 .ok_or(ExecutionError::InvalidPromise)?;
215 let value = promise.read()?;
216 self.active_query_count -= 1;
217 Ok(value)
218 }
219
220 fn force_all(&mut self) -> Result<(), ExecutionError> {
221 for promise in self.pending_queries.values_mut() {
222 promise.force()?;
223 }
224 Ok(())
225 }
226}
227
228type Keys = Vec<Vec<u8>>;
229type Value = Vec<u8>;
230type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
231
232#[derive(Debug, Default)]
233struct ViewUserState {
234 contains_key_queries: QueryManager<bool>,
236 contains_keys_queries: QueryManager<Vec<bool>>,
238 read_value_queries: QueryManager<Option<Value>>,
240 read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
242 find_keys_queries: QueryManager<Keys>,
244 find_key_values_queries: QueryManager<KeyValues>,
246}
247
248impl ViewUserState {
249 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
250 self.contains_key_queries.force_all()?;
251 self.contains_keys_queries.force_all()?;
252 self.read_value_queries.force_all()?;
253 self.read_multi_values_queries.force_all()?;
254 self.find_keys_queries.force_all()?;
255 self.find_key_values_queries.force_all()?;
256 Ok(())
257 }
258}
259
260impl<UserInstance> Deref for SyncRuntime<UserInstance> {
261 type Target = SyncRuntimeHandle<UserInstance>;
262
263 fn deref(&self) -> &Self::Target {
264 self.0.as_ref().expect(
265 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
266 )
267 }
268}
269
270impl<UserInstance> DerefMut for SyncRuntime<UserInstance> {
271 fn deref_mut(&mut self) -> &mut Self::Target {
272 self.0.as_mut().expect(
273 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
274 )
275 }
276}
277
278impl<UserInstance> Drop for SyncRuntime<UserInstance> {
279 fn drop(&mut self) {
280 if let Some(handle) = self.0.take() {
283 handle.inner().loaded_applications.clear();
284 }
285 }
286}
287
288impl<UserInstance> SyncRuntimeInternal<UserInstance> {
289 #[expect(clippy::too_many_arguments)]
290 fn new(
291 chain_id: ChainId,
292 height: BlockHeight,
293 round: Option<u32>,
294 authenticated_signer: Option<AccountOwner>,
295 executing_message: Option<ExecutingMessage>,
296 execution_state_sender: ExecutionStateSender,
297 deadline: Option<Instant>,
298 refund_grant_to: Option<Account>,
299 resource_controller: ResourceController,
300 transaction_tracker: TransactionTracker,
301 ) -> Self {
302 Self {
303 chain_id,
304 height,
305 round,
306 authenticated_signer,
307 executing_message,
308 execution_state_sender,
309 is_finalizing: false,
310 applications_to_finalize: Vec::new(),
311 loaded_applications: HashMap::new(),
312 call_stack: Vec::new(),
313 active_applications: HashSet::new(),
314 view_user_states: BTreeMap::new(),
315 deadline,
316 refund_grant_to,
317 resource_controller,
318 transaction_tracker,
319 scheduled_operations: Vec::new(),
320 }
321 }
322
323 fn current_application(&self) -> &ApplicationStatus {
331 self.call_stack
332 .last()
333 .expect("Call stack is unexpectedly empty")
334 }
335
336 fn push_application(&mut self, status: ApplicationStatus) {
340 self.active_applications.insert(status.id);
341 self.call_stack.push(status);
342 }
343
344 fn pop_application(&mut self) -> ApplicationStatus {
352 let status = self
353 .call_stack
354 .pop()
355 .expect("Can't remove application from empty call stack");
356 assert!(self.active_applications.remove(&status.id));
357 status
358 }
359
360 fn check_for_reentrancy(
364 &mut self,
365 application_id: ApplicationId,
366 ) -> Result<(), ExecutionError> {
367 ensure!(
368 !self.active_applications.contains(&application_id),
369 ExecutionError::ReentrantCall(application_id)
370 );
371 Ok(())
372 }
373}
374
375impl SyncRuntimeInternal<UserContractInstance> {
376 fn load_contract_instance(
378 &mut self,
379 this: SyncRuntimeHandle<UserContractInstance>,
380 id: ApplicationId,
381 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
382 match self.loaded_applications.entry(id) {
383 #[cfg(web)]
385 hash_map::Entry::Vacant(_) => {
386 drop(this);
387 Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
388 id,
389 )))
390 }
391 #[cfg(not(web))]
392 hash_map::Entry::Vacant(entry) => {
393 let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
394 let (code, description, txn_tracker_moved) = self
395 .execution_state_sender
396 .send_request(move |callback| ExecutionRequest::LoadContract {
397 id,
398 callback,
399 txn_tracker: txn_tracker_moved,
400 })?
401 .recv_response()?;
402 self.transaction_tracker = txn_tracker_moved;
403
404 let instance = code.instantiate(this)?;
405
406 self.applications_to_finalize.push(id);
407 Ok(entry
408 .insert(LoadedApplication::new(instance, description))
409 .clone())
410 }
411 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
412 }
413 }
414
415 fn prepare_for_call(
417 &mut self,
418 this: ContractSyncRuntimeHandle,
419 authenticated: bool,
420 callee_id: ApplicationId,
421 ) -> Result<(Arc<Mutex<UserContractInstance>>, OperationContext), ExecutionError> {
422 self.check_for_reentrancy(callee_id)?;
423
424 ensure!(
425 !self.is_finalizing,
426 ExecutionError::CrossApplicationCallInFinalize {
427 caller_id: Box::new(self.current_application().id),
428 callee_id: Box::new(callee_id),
429 }
430 );
431
432 let application = self.load_contract_instance(this, callee_id)?;
434
435 let caller = self.current_application();
436 let caller_id = caller.id;
437 let caller_signer = caller.signer;
438 let authenticated_signer = match caller_signer {
440 Some(signer) if authenticated => Some(signer),
441 _ => None,
442 };
443 let authenticated_caller_id = authenticated.then_some(caller_id);
444 let callee_context = OperationContext {
445 chain_id: self.chain_id,
446 authenticated_signer,
447 authenticated_caller_id,
448 height: self.height,
449 round: self.round,
450 index: None,
451 };
452 self.push_application(ApplicationStatus {
453 caller_id: authenticated_caller_id,
454 id: callee_id,
455 description: application.description,
456 signer: authenticated_signer,
458 });
459 Ok((application.instance, callee_context))
460 }
461
462 fn finish_call(&mut self) -> Result<(), ExecutionError> {
464 self.pop_application();
465 Ok(())
466 }
467
468 fn run_service_oracle_query(
470 &mut self,
471 application_id: ApplicationId,
472 query: Vec<u8>,
473 ) -> Result<Vec<u8>, ExecutionError> {
474 let context = QueryContext {
475 chain_id: self.chain_id,
476 next_block_height: self.height,
477 local_time: self.transaction_tracker.local_time(),
478 };
479 let sender = self.execution_state_sender.clone();
480
481 let txn_tracker = TransactionTracker::default()
482 .with_blobs(self.transaction_tracker.created_blobs().clone());
483
484 let timeout = self
485 .resource_controller
486 .remaining_service_oracle_execution_time()?;
487 let execution_start = Instant::now();
488 let deadline = Some(execution_start + timeout);
489
490 let mut service_runtime =
491 ServiceSyncRuntime::new_with_txn_tracker(sender, context, deadline, txn_tracker);
492
493 let result = service_runtime.run_query(application_id, query);
494
495 self.resource_controller
498 .track_service_oracle_execution(execution_start.elapsed())?;
499
500 let QueryOutcome {
501 response,
502 operations,
503 } = result?;
504
505 self.resource_controller
506 .track_service_oracle_response(response.len())?;
507
508 self.scheduled_operations.extend(operations);
509 Ok(response)
510 }
511}
512
513impl SyncRuntimeInternal<UserServiceInstance> {
514 fn load_service_instance(
516 &mut self,
517 this: ServiceSyncRuntimeHandle,
518 id: ApplicationId,
519 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
520 match self.loaded_applications.entry(id) {
521 #[cfg(web)]
523 hash_map::Entry::Vacant(_) => {
524 drop(this);
525 Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
526 id,
527 )))
528 }
529 #[cfg(not(web))]
530 hash_map::Entry::Vacant(entry) => {
531 let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
532 let (code, description, txn_tracker_moved) = self
533 .execution_state_sender
534 .send_request(move |callback| ExecutionRequest::LoadService {
535 id,
536 callback,
537 txn_tracker: txn_tracker_moved,
538 })?
539 .recv_response()?;
540 self.transaction_tracker = txn_tracker_moved;
541
542 let instance = code.instantiate(this)?;
543 Ok(entry
544 .insert(LoadedApplication::new(instance, description))
545 .clone())
546 }
547 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
548 }
549 }
550}
551
552impl<UserInstance> SyncRuntime<UserInstance> {
553 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
554 let handle = self.0.take().expect(
555 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
556 );
557 let runtime = Arc::into_inner(handle.0)?
558 .into_inner()
559 .expect("`SyncRuntime` should run in a single thread which should not panic");
560 Some(runtime)
561 }
562}
563
564impl<UserInstance> From<SyncRuntimeInternal<UserInstance>> for SyncRuntimeHandle<UserInstance> {
565 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
566 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
567 }
568}
569
570impl<UserInstance> SyncRuntimeHandle<UserInstance> {
571 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
572 self.0
573 .try_lock()
574 .expect("Synchronous runtimes run on a single execution thread")
575 }
576}
577
578impl<UserInstance> BaseRuntime for SyncRuntimeHandle<UserInstance>
579where
580 Self: ContractOrServiceRuntime,
581{
582 type Read = ();
583 type ReadValueBytes = u32;
584 type ContainsKey = u32;
585 type ContainsKeys = u32;
586 type ReadMultiValuesBytes = u32;
587 type FindKeysByPrefix = u32;
588 type FindKeyValuesByPrefix = u32;
589
590 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
591 Ok(self.inner().chain_id)
592 }
593
594 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
595 Ok(self.inner().height)
596 }
597
598 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
599 Ok(self.inner().current_application().id)
600 }
601
602 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
603 Ok(self
604 .inner()
605 .current_application()
606 .description
607 .creator_chain_id)
608 }
609
610 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
611 Ok(self
612 .inner()
613 .current_application()
614 .description
615 .parameters
616 .clone())
617 }
618
619 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
620 self.inner()
621 .execution_state_sender
622 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
623 .recv_response()
624 }
625
626 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
627 self.inner()
628 .execution_state_sender
629 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
630 .recv_response()
631 }
632
633 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
634 self.inner()
635 .execution_state_sender
636 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
637 .recv_response()
638 }
639
640 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
641 self.inner()
642 .execution_state_sender
643 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
644 .recv_response()
645 }
646
647 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
648 self.inner()
649 .execution_state_sender
650 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
651 .recv_response()
652 }
653
654 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
655 self.inner()
656 .execution_state_sender
657 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
658 .recv_response()
659 }
660
661 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
662 let mut this = self.inner();
663 let id = this.current_application().id;
664 this.resource_controller.track_read_operations(1)?;
665 let receiver = this
666 .execution_state_sender
667 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
668 let state = this.view_user_states.entry(id).or_default();
669 state.contains_key_queries.register(receiver)
670 }
671
672 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
673 let mut this = self.inner();
674 let id = this.current_application().id;
675 let state = this.view_user_states.entry(id).or_default();
676 let value = state.contains_key_queries.wait(*promise)?;
677 Ok(value)
678 }
679
680 fn contains_keys_new(
681 &mut self,
682 keys: Vec<Vec<u8>>,
683 ) -> Result<Self::ContainsKeys, ExecutionError> {
684 let mut this = self.inner();
685 let id = this.current_application().id;
686 this.resource_controller.track_read_operations(1)?;
687 let receiver = this
688 .execution_state_sender
689 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
690 let state = this.view_user_states.entry(id).or_default();
691 state.contains_keys_queries.register(receiver)
692 }
693
694 fn contains_keys_wait(
695 &mut self,
696 promise: &Self::ContainsKeys,
697 ) -> Result<Vec<bool>, ExecutionError> {
698 let mut this = self.inner();
699 let id = this.current_application().id;
700 let state = this.view_user_states.entry(id).or_default();
701 let value = state.contains_keys_queries.wait(*promise)?;
702 Ok(value)
703 }
704
705 fn read_multi_values_bytes_new(
706 &mut self,
707 keys: Vec<Vec<u8>>,
708 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
709 let mut this = self.inner();
710 let id = this.current_application().id;
711 this.resource_controller.track_read_operations(1)?;
712 let receiver = this.execution_state_sender.send_request(move |callback| {
713 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
714 })?;
715 let state = this.view_user_states.entry(id).or_default();
716 state.read_multi_values_queries.register(receiver)
717 }
718
719 fn read_multi_values_bytes_wait(
720 &mut self,
721 promise: &Self::ReadMultiValuesBytes,
722 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
723 let mut this = self.inner();
724 let id = this.current_application().id;
725 let state = this.view_user_states.entry(id).or_default();
726 let values = state.read_multi_values_queries.wait(*promise)?;
727 for value in &values {
728 if let Some(value) = &value {
729 this.resource_controller
730 .track_bytes_read(value.len() as u64)?;
731 }
732 }
733 Ok(values)
734 }
735
736 fn read_value_bytes_new(
737 &mut self,
738 key: Vec<u8>,
739 ) -> Result<Self::ReadValueBytes, ExecutionError> {
740 let mut this = self.inner();
741 let id = this.current_application().id;
742 this.resource_controller.track_read_operations(1)?;
743 let receiver = this
744 .execution_state_sender
745 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
746 let state = this.view_user_states.entry(id).or_default();
747 state.read_value_queries.register(receiver)
748 }
749
750 fn read_value_bytes_wait(
751 &mut self,
752 promise: &Self::ReadValueBytes,
753 ) -> Result<Option<Vec<u8>>, ExecutionError> {
754 let mut this = self.inner();
755 let id = this.current_application().id;
756 let value = {
757 let state = this.view_user_states.entry(id).or_default();
758 state.read_value_queries.wait(*promise)?
759 };
760 if let Some(value) = &value {
761 this.resource_controller
762 .track_bytes_read(value.len() as u64)?;
763 }
764 Ok(value)
765 }
766
767 fn find_keys_by_prefix_new(
768 &mut self,
769 key_prefix: Vec<u8>,
770 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
771 let mut this = self.inner();
772 let id = this.current_application().id;
773 this.resource_controller.track_read_operations(1)?;
774 let receiver = this.execution_state_sender.send_request(move |callback| {
775 ExecutionRequest::FindKeysByPrefix {
776 id,
777 key_prefix,
778 callback,
779 }
780 })?;
781 let state = this.view_user_states.entry(id).or_default();
782 state.find_keys_queries.register(receiver)
783 }
784
785 fn find_keys_by_prefix_wait(
786 &mut self,
787 promise: &Self::FindKeysByPrefix,
788 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
789 let mut this = self.inner();
790 let id = this.current_application().id;
791 let keys = {
792 let state = this.view_user_states.entry(id).or_default();
793 state.find_keys_queries.wait(*promise)?
794 };
795 let mut read_size = 0;
796 for key in &keys {
797 read_size += key.len();
798 }
799 this.resource_controller
800 .track_bytes_read(read_size as u64)?;
801 Ok(keys)
802 }
803
804 fn find_key_values_by_prefix_new(
805 &mut self,
806 key_prefix: Vec<u8>,
807 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
808 let mut this = self.inner();
809 let id = this.current_application().id;
810 this.resource_controller.track_read_operations(1)?;
811 let receiver = this.execution_state_sender.send_request(move |callback| {
812 ExecutionRequest::FindKeyValuesByPrefix {
813 id,
814 key_prefix,
815 callback,
816 }
817 })?;
818 let state = this.view_user_states.entry(id).or_default();
819 state.find_key_values_queries.register(receiver)
820 }
821
822 fn find_key_values_by_prefix_wait(
823 &mut self,
824 promise: &Self::FindKeyValuesByPrefix,
825 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
826 let mut this = self.inner();
827 let id = this.current_application().id;
828 let state = this.view_user_states.entry(id).or_default();
829 let key_values = state.find_key_values_queries.wait(*promise)?;
830 let mut read_size = 0;
831 for (key, value) in &key_values {
832 read_size += key.len() + value.len();
833 }
834 this.resource_controller
835 .track_bytes_read(read_size as u64)?;
836 Ok(key_values)
837 }
838
839 fn perform_http_request(
840 &mut self,
841 request: http::Request,
842 ) -> Result<http::Response, ExecutionError> {
843 let mut this = self.inner();
844 let app_permissions = this
845 .execution_state_sender
846 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
847 .recv_response()?;
848
849 let app_id = this.current_application().id;
850 ensure!(
851 app_permissions.can_make_http_requests(&app_id),
852 ExecutionError::UnauthorizedApplication(app_id)
853 );
854
855 this.resource_controller.track_http_request()?;
856
857 let response =
858 if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
859 match response {
860 OracleResponse::Http(response) => response,
861 _ => return Err(ExecutionError::OracleResponseMismatch),
862 }
863 } else {
864 this.execution_state_sender
865 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
866 request,
867 http_responses_are_oracle_responses:
868 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
869 callback,
870 })?
871 .recv_response()?
872 };
873 this.transaction_tracker
874 .add_oracle_response(OracleResponse::Http(response.clone()));
875 Ok(response)
876 }
877
878 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
879 let mut this = self.inner();
880 if !this
881 .transaction_tracker
882 .replay_oracle_response(OracleResponse::Assert)?
883 {
884 let local_time = this.transaction_tracker.local_time();
886 ensure!(
887 local_time < timestamp,
888 ExecutionError::AssertBefore {
889 timestamp,
890 local_time,
891 }
892 );
893 }
894 Ok(())
895 }
896
897 fn read_data_blob(&mut self, hash: &CryptoHash) -> Result<Vec<u8>, ExecutionError> {
898 let mut this = self.inner();
899 let blob_id = BlobId::new(*hash, BlobType::Data);
900 let (blob_content, is_new) = this
901 .execution_state_sender
902 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
903 .recv_response()?;
904 if is_new {
905 this.transaction_tracker
906 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
907 }
908 Ok(blob_content.into_bytes().into_vec())
909 }
910
911 fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError> {
912 let mut this = self.inner();
913 let blob_id = BlobId::new(*hash, BlobType::Data);
914 let is_new = this
915 .execution_state_sender
916 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
917 .recv_response()?;
918 if is_new {
919 this.transaction_tracker
920 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
921 }
922 Ok(())
923 }
924}
925
926trait ContractOrServiceRuntime {
929 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
935}
936
937impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
938 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
939}
940
941impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
942 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
943}
944
945impl<UserInstance> Clone for SyncRuntimeHandle<UserInstance> {
946 fn clone(&self) -> Self {
947 SyncRuntimeHandle(self.0.clone())
948 }
949}
950
951impl ContractSyncRuntime {
952 pub(crate) fn new(
953 execution_state_sender: ExecutionStateSender,
954 chain_id: ChainId,
955 refund_grant_to: Option<Account>,
956 resource_controller: ResourceController,
957 action: &UserAction,
958 txn_tracker: TransactionTracker,
959 ) -> Self {
960 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
961 SyncRuntimeInternal::new(
962 chain_id,
963 action.height(),
964 action.round(),
965 action.signer(),
966 if let UserAction::Message(context, _) = action {
967 Some(context.into())
968 } else {
969 None
970 },
971 execution_state_sender,
972 None,
973 refund_grant_to,
974 resource_controller,
975 txn_tracker,
976 ),
977 )))
978 }
979
980 pub(crate) fn preload_contract(
981 &self,
982 id: ApplicationId,
983 code: UserContractCode,
984 description: ApplicationDescription,
985 ) -> Result<(), ExecutionError> {
986 let this = self
987 .0
988 .as_ref()
989 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
990 let runtime_handle = this.clone();
991 let mut this_guard = this.inner();
992
993 if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
994 entry.insert(LoadedApplication::new(
995 code.instantiate(runtime_handle)?,
996 description,
997 ));
998 this_guard.applications_to_finalize.push(id);
999 }
1000
1001 Ok(())
1002 }
1003
1004 pub(crate) fn run_action(
1006 mut self,
1007 application_id: ApplicationId,
1008 chain_id: ChainId,
1009 action: UserAction,
1010 ) -> Result<(Option<Vec<u8>>, ResourceController, TransactionTracker), ExecutionError> {
1011 let result = self
1012 .deref_mut()
1013 .run_action(application_id, chain_id, action)?;
1014 let runtime = self
1015 .into_inner()
1016 .expect("Runtime clones should have been freed by now");
1017 Ok((
1018 result,
1019 runtime.resource_controller,
1020 runtime.transaction_tracker,
1021 ))
1022 }
1023}
1024
1025impl ContractSyncRuntimeHandle {
1026 fn run_action(
1027 &mut self,
1028 application_id: ApplicationId,
1029 chain_id: ChainId,
1030 action: UserAction,
1031 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1032 let finalize_context = FinalizeContext {
1033 authenticated_signer: action.signer(),
1034 chain_id,
1035 height: action.height(),
1036 round: action.round(),
1037 };
1038
1039 {
1040 let runtime = self.inner();
1041 assert_eq!(runtime.authenticated_signer, action.signer());
1042 assert_eq!(runtime.chain_id, chain_id);
1043 assert_eq!(runtime.height, action.height());
1044 }
1045
1046 let signer = action.signer();
1047 let closure = move |code: &mut UserContractInstance| match action {
1048 UserAction::Instantiate(context, argument) => {
1049 code.instantiate(context, argument).map(|()| None)
1050 }
1051 UserAction::Operation(context, operation) => {
1052 code.execute_operation(context, operation).map(Option::Some)
1053 }
1054 UserAction::Message(context, message) => {
1055 code.execute_message(context, message).map(|()| None)
1056 }
1057 };
1058
1059 let result = self.execute(application_id, signer, closure)?;
1060 self.finalize(finalize_context)?;
1061 Ok(result)
1062 }
1063
1064 fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1066 let applications = mem::take(&mut self.inner().applications_to_finalize)
1067 .into_iter()
1068 .rev();
1069
1070 self.inner().is_finalizing = true;
1071
1072 for application in applications {
1073 self.execute(application, context.authenticated_signer, |contract| {
1074 contract.finalize(context).map(|_| None)
1075 })?;
1076 self.inner().loaded_applications.remove(&application);
1077 }
1078
1079 Ok(())
1080 }
1081
1082 fn execute(
1084 &mut self,
1085 application_id: ApplicationId,
1086 signer: Option<AccountOwner>,
1087 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1088 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1089 let contract = {
1090 let mut runtime = self.inner();
1091 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1092
1093 let status = ApplicationStatus {
1094 caller_id: None,
1095 id: application_id,
1096 description: application.description.clone(),
1097 signer,
1098 };
1099
1100 runtime.push_application(status);
1101
1102 application
1103 };
1104
1105 let result = closure(
1106 &mut contract
1107 .instance
1108 .try_lock()
1109 .expect("Application should not be already executing"),
1110 )?;
1111
1112 let mut runtime = self.inner();
1113 let application_status = runtime.pop_application();
1114 assert_eq!(application_status.caller_id, None);
1115 assert_eq!(application_status.id, application_id);
1116 assert_eq!(application_status.description, contract.description);
1117 assert_eq!(application_status.signer, signer);
1118 assert!(runtime.call_stack.is_empty());
1119
1120 Ok(result)
1121 }
1122}
1123
1124impl ContractRuntime for ContractSyncRuntimeHandle {
1125 fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1126 Ok(self.inner().authenticated_signer)
1127 }
1128
1129 fn message_id(&mut self) -> Result<Option<MessageId>, ExecutionError> {
1130 Ok(self.inner().executing_message.map(|metadata| metadata.id))
1131 }
1132
1133 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1134 Ok(self
1135 .inner()
1136 .executing_message
1137 .map(|metadata| metadata.is_bouncing))
1138 }
1139
1140 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1141 let this = self.inner();
1142 if this.call_stack.len() <= 1 {
1143 return Ok(None);
1144 }
1145 Ok(this.current_application().caller_id)
1146 }
1147
1148 fn remaining_fuel(&mut self) -> Result<u64, ExecutionError> {
1149 Ok(self.inner().resource_controller.remaining_fuel())
1150 }
1151
1152 fn consume_fuel(&mut self, fuel: u64) -> Result<(), ExecutionError> {
1153 let mut this = self.inner();
1154 this.resource_controller.track_fuel(fuel)
1155 }
1156
1157 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1158 let mut this = self.inner();
1159 let application = this.current_application();
1160 let application_id = application.id;
1161 let authenticated_signer = application.signer;
1162 let mut refund_grant_to = this.refund_grant_to;
1163
1164 let grant = this
1165 .resource_controller
1166 .policy
1167 .total_price(&message.grant)?;
1168 if grant.is_zero() {
1169 refund_grant_to = None;
1170 } else {
1171 this.resource_controller.track_grant(grant)?;
1172 }
1173 let kind = if message.is_tracked {
1174 MessageKind::Tracked
1175 } else {
1176 MessageKind::Simple
1177 };
1178
1179 this.transaction_tracker
1180 .add_outgoing_message(OutgoingMessage {
1181 destination: message.destination,
1182 authenticated_signer,
1183 refund_grant_to,
1184 grant,
1185 kind,
1186 message: Message::User {
1187 application_id,
1188 bytes: message.message,
1189 },
1190 })?;
1191
1192 Ok(())
1193 }
1194
1195 fn subscribe(&mut self, chain: ChainId, name: ChannelName) -> Result<(), ExecutionError> {
1196 let mut this = self.inner();
1197 let application_id = this.current_application().id;
1198 let full_name = ChannelFullName::new(name, application_id);
1199 this.transaction_tracker.subscribe(full_name, chain);
1200
1201 Ok(())
1202 }
1203
1204 fn unsubscribe(&mut self, chain: ChainId, name: ChannelName) -> Result<(), ExecutionError> {
1205 let mut this = self.inner();
1206 let application_id = this.current_application().id;
1207 let full_name = ChannelFullName::new(name, application_id);
1208 this.transaction_tracker.unsubscribe(full_name, chain);
1209
1210 Ok(())
1211 }
1212
1213 fn transfer(
1214 &mut self,
1215 source: AccountOwner,
1216 destination: Account,
1217 amount: Amount,
1218 ) -> Result<(), ExecutionError> {
1219 let mut this = self.inner();
1220 let current_application = this.current_application();
1221 let application_id = current_application.id;
1222 let signer = current_application.signer;
1223
1224 let maybe_message = this
1225 .execution_state_sender
1226 .send_request(|callback| ExecutionRequest::Transfer {
1227 source,
1228 destination,
1229 amount,
1230 signer,
1231 application_id,
1232 callback,
1233 })?
1234 .recv_response()?;
1235
1236 this.transaction_tracker
1237 .add_outgoing_messages(maybe_message)?;
1238 Ok(())
1239 }
1240
1241 fn claim(
1242 &mut self,
1243 source: Account,
1244 destination: Account,
1245 amount: Amount,
1246 ) -> Result<(), ExecutionError> {
1247 let mut this = self.inner();
1248 let current_application = this.current_application();
1249 let application_id = current_application.id;
1250 let signer = current_application.signer;
1251
1252 let message = this
1253 .execution_state_sender
1254 .send_request(|callback| ExecutionRequest::Claim {
1255 source,
1256 destination,
1257 amount,
1258 signer,
1259 application_id,
1260 callback,
1261 })?
1262 .recv_response()?;
1263 this.transaction_tracker.add_outgoing_message(message)?;
1264 Ok(())
1265 }
1266
1267 fn try_call_application(
1268 &mut self,
1269 authenticated: bool,
1270 callee_id: ApplicationId,
1271 argument: Vec<u8>,
1272 ) -> Result<Vec<u8>, ExecutionError> {
1273 let (contract, context) =
1274 self.inner()
1275 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1276
1277 let value = contract
1278 .try_lock()
1279 .expect("Applications should not have reentrant calls")
1280 .execute_operation(context, argument)?;
1281
1282 self.inner().finish_call()?;
1283
1284 Ok(value)
1285 }
1286
1287 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1288 let mut this = self.inner();
1289 ensure!(
1290 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1291 ExecutionError::StreamNameTooLong
1292 );
1293 let application_id = GenericApplicationId::User(this.current_application().id);
1294 let stream_id = StreamId {
1295 stream_name,
1296 application_id,
1297 };
1298 let index = this
1299 .execution_state_sender
1300 .send_request(|callback| ExecutionRequest::NextEventIndex {
1301 stream_id: stream_id.clone(),
1302 callback,
1303 })?
1304 .recv_response()?;
1305 this.resource_controller
1307 .track_bytes_written(value.len() as u64)?;
1308 this.transaction_tracker.add_event(stream_id, index, value);
1309 Ok(index)
1310 }
1311
1312 fn read_event(
1313 &mut self,
1314 chain_id: ChainId,
1315 stream_name: StreamName,
1316 index: u32,
1317 ) -> Result<Vec<u8>, ExecutionError> {
1318 let mut this = self.inner();
1319 ensure!(
1320 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1321 ExecutionError::StreamNameTooLong
1322 );
1323 let application_id = GenericApplicationId::User(this.current_application().id);
1324 let stream_id = StreamId {
1325 stream_name,
1326 application_id,
1327 };
1328 let event_id = EventId {
1329 stream_id,
1330 index,
1331 chain_id,
1332 };
1333 let event = this
1334 .execution_state_sender
1335 .send_request(|callback| ExecutionRequest::ReadEvent {
1336 event_id: event_id.clone(),
1337 callback,
1338 })?
1339 .recv_response()?;
1340 this.resource_controller
1342 .track_bytes_read(event.len() as u64)?;
1343 this.transaction_tracker
1344 .replay_oracle_response(OracleResponse::Event(event_id, event.clone()))?;
1345 Ok(event)
1346 }
1347
1348 fn subscribe_to_events(
1349 &mut self,
1350 chain_id: ChainId,
1351 application_id: ApplicationId,
1352 stream_name: StreamName,
1353 ) -> Result<(), ExecutionError> {
1354 let this = self.inner();
1355 ensure!(
1356 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1357 ExecutionError::StreamNameTooLong
1358 );
1359 let stream_id = StreamId {
1360 stream_name,
1361 application_id: application_id.into(),
1362 };
1363 let subscriber_app_id = this.current_application().id;
1364 this.execution_state_sender
1365 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1366 chain_id,
1367 stream_id,
1368 subscriber_app_id,
1369 callback,
1370 })?
1371 .recv_response()?;
1372 Ok(())
1373 }
1374
1375 fn unsubscribe_from_events(
1376 &mut self,
1377 chain_id: ChainId,
1378 application_id: ApplicationId,
1379 stream_name: StreamName,
1380 ) -> Result<(), ExecutionError> {
1381 let this = self.inner();
1382 ensure!(
1383 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1384 ExecutionError::StreamNameTooLong
1385 );
1386 let stream_id = StreamId {
1387 stream_name,
1388 application_id: application_id.into(),
1389 };
1390 let subscriber_app_id = this.current_application().id;
1391 this.execution_state_sender
1392 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1393 chain_id,
1394 stream_id,
1395 subscriber_app_id,
1396 callback,
1397 })?
1398 .recv_response()?;
1399 Ok(())
1400 }
1401
1402 fn query_service(
1403 &mut self,
1404 application_id: ApplicationId,
1405 query: Vec<u8>,
1406 ) -> Result<Vec<u8>, ExecutionError> {
1407 let mut this = self.inner();
1408
1409 let app_permissions = this
1410 .execution_state_sender
1411 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1412 .recv_response()?;
1413
1414 let app_id = this.current_application().id;
1415 ensure!(
1416 app_permissions.can_call_services(&app_id),
1417 ExecutionError::UnauthorizedApplication(app_id)
1418 );
1419
1420 this.resource_controller.track_service_oracle_call()?;
1421 let response =
1422 if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1423 match response {
1424 OracleResponse::Service(bytes) => bytes,
1425 _ => return Err(ExecutionError::OracleResponseMismatch),
1426 }
1427 } else {
1428 this.run_service_oracle_query(application_id, query)?
1429 };
1430
1431 this.transaction_tracker
1432 .add_oracle_response(OracleResponse::Service(response.clone()));
1433
1434 Ok(response)
1435 }
1436
1437 fn open_chain(
1438 &mut self,
1439 ownership: ChainOwnership,
1440 application_permissions: ApplicationPermissions,
1441 balance: Amount,
1442 ) -> Result<(MessageId, ChainId), ExecutionError> {
1443 let mut this = self.inner();
1444 let message_id = MessageId {
1445 chain_id: this.chain_id,
1446 height: this.height,
1447 index: this.transaction_tracker.next_message_index(),
1448 };
1449 let chain_id = ChainId::child(message_id);
1450 let open_chain_message = this
1451 .execution_state_sender
1452 .send_request(|callback| ExecutionRequest::OpenChain {
1453 ownership,
1454 balance,
1455 next_message_id: message_id,
1456 application_permissions,
1457 callback,
1458 })?
1459 .recv_response()?;
1460 this.transaction_tracker
1461 .add_outgoing_message(open_chain_message)?;
1462 Ok((message_id, chain_id))
1463 }
1464
1465 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1466 let this = self.inner();
1467 let application_id = this.current_application().id;
1468 this.execution_state_sender
1469 .send_request(|callback| ExecutionRequest::CloseChain {
1470 application_id,
1471 callback,
1472 })?
1473 .recv_response()?
1474 }
1475
1476 fn change_application_permissions(
1477 &mut self,
1478 application_permissions: ApplicationPermissions,
1479 ) -> Result<(), ExecutionError> {
1480 let this = self.inner();
1481 let application_id = this.current_application().id;
1482 this.execution_state_sender
1483 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1484 application_id,
1485 application_permissions,
1486 callback,
1487 })?
1488 .recv_response()?
1489 }
1490
1491 fn create_application(
1492 &mut self,
1493 module_id: ModuleId,
1494 parameters: Vec<u8>,
1495 argument: Vec<u8>,
1496 required_application_ids: Vec<ApplicationId>,
1497 ) -> Result<ApplicationId, ExecutionError> {
1498 let chain_id = self.inner().chain_id;
1499 let block_height = self.block_height()?;
1500
1501 let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1502
1503 let CreateApplicationResult {
1504 app_id,
1505 txn_tracker: txn_tracker_moved,
1506 } = self
1507 .inner()
1508 .execution_state_sender
1509 .send_request(move |callback| ExecutionRequest::CreateApplication {
1510 chain_id,
1511 block_height,
1512 module_id,
1513 parameters,
1514 required_application_ids,
1515 callback,
1516 txn_tracker: txn_tracker_moved,
1517 })?
1518 .recv_response()??;
1519
1520 self.inner().transaction_tracker = txn_tracker_moved;
1521
1522 let (contract, context) = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1523
1524 contract
1525 .try_lock()
1526 .expect("Applications should not have reentrant calls")
1527 .instantiate(context, argument)?;
1528
1529 self.inner().finish_call()?;
1530
1531 Ok(app_id)
1532 }
1533
1534 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1535 let mut this = self.inner();
1536 let round =
1537 if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1538 match response {
1539 OracleResponse::Round(round) => round,
1540 _ => return Err(ExecutionError::OracleResponseMismatch),
1541 }
1542 } else {
1543 this.round
1544 };
1545 this.transaction_tracker
1546 .add_oracle_response(OracleResponse::Round(round));
1547 Ok(round)
1548 }
1549
1550 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1551 let mut this = self.inner();
1552 let id = this.current_application().id;
1553 let state = this.view_user_states.entry(id).or_default();
1554 state.force_all_pending_queries()?;
1555 this.resource_controller.track_write_operations(
1556 batch
1557 .num_operations()
1558 .try_into()
1559 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1560 )?;
1561 this.resource_controller
1562 .track_bytes_written(batch.size() as u64)?;
1563 this.execution_state_sender
1564 .send_request(|callback| ExecutionRequest::WriteBatch {
1565 id,
1566 batch,
1567 callback,
1568 })?
1569 .recv_response()?;
1570 Ok(())
1571 }
1572}
1573
1574impl ServiceSyncRuntime {
1575 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1577 let mut txn_tracker = TransactionTracker::default();
1578 txn_tracker.set_local_time(context.local_time);
1579 Self::new_with_txn_tracker(execution_state_sender, context, None, txn_tracker)
1580 }
1581
1582 pub fn new_with_txn_tracker(
1584 execution_state_sender: ExecutionStateSender,
1585 context: QueryContext,
1586 deadline: Option<Instant>,
1587 txn_tracker: TransactionTracker,
1588 ) -> Self {
1589 let runtime = SyncRuntime(Some(
1590 SyncRuntimeInternal::new(
1591 context.chain_id,
1592 context.next_block_height,
1593 None,
1594 None,
1595 None,
1596 execution_state_sender,
1597 deadline,
1598 None,
1599 ResourceController::default(),
1600 txn_tracker,
1601 )
1602 .into(),
1603 ));
1604
1605 ServiceSyncRuntime {
1606 runtime,
1607 current_context: context,
1608 }
1609 }
1610
1611 pub(crate) fn preload_service(
1613 &self,
1614 id: ApplicationId,
1615 code: UserServiceCode,
1616 description: ApplicationDescription,
1617 ) -> Result<(), ExecutionError> {
1618 let this = self
1619 .runtime
1620 .0
1621 .as_ref()
1622 .expect("services shouldn't be preloaded while the runtime is being dropped");
1623 let runtime_handle = this.clone();
1624 let mut this_guard = this.inner();
1625
1626 if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1627 entry.insert(LoadedApplication::new(
1628 code.instantiate(runtime_handle)?,
1629 description,
1630 ));
1631 this_guard.applications_to_finalize.push(id);
1632 }
1633
1634 Ok(())
1635 }
1636
1637 pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1639 while let Ok(request) = incoming_requests.recv() {
1640 let ServiceRuntimeRequest::Query {
1641 application_id,
1642 context,
1643 query,
1644 callback,
1645 } = request;
1646
1647 self.prepare_for_query(context);
1648
1649 let _ = callback.send(self.run_query(application_id, query));
1650 }
1651 }
1652
1653 pub(crate) fn prepare_for_query(&mut self, new_context: QueryContext) {
1655 let expected_context = QueryContext {
1656 local_time: new_context.local_time,
1657 ..self.current_context
1658 };
1659
1660 if new_context != expected_context {
1661 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1662 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1663 } else {
1664 self.handle_mut()
1665 .inner()
1666 .transaction_tracker
1667 .set_local_time(new_context.local_time);
1668 }
1669 }
1670
1671 pub(crate) fn run_query(
1673 &mut self,
1674 application_id: ApplicationId,
1675 query: Vec<u8>,
1676 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1677 let this = self.handle_mut();
1678 let response = this.try_query_application(application_id, query)?;
1679 let operations = mem::take(&mut this.inner().scheduled_operations);
1680
1681 Ok(QueryOutcome {
1682 response,
1683 operations,
1684 })
1685 }
1686
1687 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1689 self.runtime.0.as_mut().expect(
1690 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1691 )
1692 }
1693}
1694
1695impl ServiceRuntime for ServiceSyncRuntimeHandle {
1696 fn try_query_application(
1698 &mut self,
1699 queried_id: ApplicationId,
1700 argument: Vec<u8>,
1701 ) -> Result<Vec<u8>, ExecutionError> {
1702 let (query_context, service) = {
1703 let mut this = self.inner();
1704
1705 let application = this.load_service_instance(self.clone(), queried_id)?;
1707 let query_context = QueryContext {
1709 chain_id: this.chain_id,
1710 next_block_height: this.height,
1711 local_time: this.transaction_tracker.local_time(),
1712 };
1713 this.push_application(ApplicationStatus {
1714 caller_id: None,
1715 id: queried_id,
1716 description: application.description,
1717 signer: None,
1718 });
1719 (query_context, application.instance)
1720 };
1721 let response = service
1722 .try_lock()
1723 .expect("Applications should not have reentrant calls")
1724 .handle_query(query_context, argument)?;
1725 self.inner().pop_application();
1726 Ok(response)
1727 }
1728
1729 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1730 let mut this = self.inner();
1731 let application_id = this.current_application().id;
1732
1733 this.scheduled_operations.push(Operation::User {
1734 application_id,
1735 bytes: operation,
1736 });
1737
1738 Ok(())
1739 }
1740
1741 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1742 if let Some(deadline) = self.inner().deadline {
1743 if Instant::now() >= deadline {
1744 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1745 }
1746 }
1747 Ok(())
1748 }
1749}
1750
1751pub enum ServiceRuntimeRequest {
1753 Query {
1754 application_id: ApplicationId,
1755 context: QueryContext,
1756 query: Vec<u8>,
1757 callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1758 },
1759}
1760
1761#[derive(Clone, Copy, Debug)]
1763struct ExecutingMessage {
1764 id: MessageId,
1765 is_bouncing: bool,
1766}
1767
1768impl From<&MessageContext> for ExecutingMessage {
1769 fn from(context: &MessageContext) -> Self {
1770 ExecutingMessage {
1771 id: context.message_id,
1772 is_bouncing: context.is_bouncing,
1773 }
1774 }
1775}