1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15 SendMessageRequest, Timestamp,
16 },
17 ensure, http,
18 identifiers::{
19 Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20 },
21 ownership::ChainOwnership,
22 time::Instant,
23 vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29 execution::UserAction,
30 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31 resources::ResourceController,
32 system::CreateApplicationResult,
33 util::{ReceiverExt, UnboundedSenderExt},
34 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35 ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36 OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37 UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
40#[cfg(test)]
41#[path = "unit_tests/runtime_tests.rs"]
42mod tests;
43
/// Associates a user instance type with the companion types the runtime stores for it.
pub trait WithContext {
    /// Type of the `user_context` value kept in [`SyncRuntimeInternal`].
    type UserContext;
    /// Type of the code kept in the preloaded-applications cache and instantiated on demand.
    type Code;
}
48
/// Contract instances use a [`Timestamp`] as user context and are built from
/// [`UserContractCode`].
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
    type Code = UserContractCode;
}
53
/// Service instances carry no user context and are built from [`UserServiceCode`].
impl WithContext for UserServiceInstance {
    type UserContext = ();
    type Code = UserServiceCode;
}
58
/// Test-only implementation with unit context and unit code.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
    type Code = ();
}
64
/// Owning wrapper around a [`SyncRuntimeHandle`].
///
/// The inner `Option` becomes `None` once the handle has been taken out, which happens in
/// `into_inner` and in the `Drop` implementation.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
67
/// A [`SyncRuntime`] specialized for contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
69
/// A service runtime paired with a [`QueryContext`].
// NOTE(review): the code that reads `current_context` is outside this part of the file.
pub struct ServiceSyncRuntime {
    /// The wrapped runtime for service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// The query context currently associated with `runtime`.
    current_context: QueryContext,
}
74
/// Cloneable, shared handle to a [`SyncRuntimeInternal`] stored behind an `Arc<Mutex<_>>`.
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);
79
/// A [`SyncRuntimeHandle`] specialized for contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] specialized for service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
82
/// Mutable state of a synchronous runtime, shared through a [`SyncRuntimeHandle`].
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// Chain ID reported by `chain_id()`; `run_action` asserts it matches the action's chain.
    chain_id: ChainId,
    /// Block height reported by `block_height()` and sent as `next_block_height` with service
    /// oracle queries.
    height: BlockHeight,
    /// Round number taken from the executed action, if any.
    round: Option<u32>,
    /// Metadata of the message being executed; `ContractSyncRuntime::new` only sets it for
    /// `UserAction::Message`.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// Sender used to submit `ExecutionRequest`s.
    execution_state_sender: ExecutionStateSender,

    /// Set to `true` by `finalize`; `prepare_for_call` rejects cross-application calls while set.
    is_finalizing: bool,
    /// IDs of the applications instantiated so far; `finalize` drains them in reverse order.
    applications_to_finalize: Vec<ApplicationId>,

    /// Application code and descriptions that can be instantiated without a load request.
    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
    /// Applications that have already been instantiated, keyed by ID.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// Stack of executing applications; the last entry is the current application.
    call_stack: Vec<ApplicationStatus>,
    /// IDs of the applications on `call_stack`, consulted by the reentrancy check.
    active_applications: HashSet<ApplicationId>,
    /// Starts empty; the code that fills and consumes it is outside this part of the file.
    scheduled_operations: Vec<Operation>,

    /// Pending key-value store queries, tracked per application.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// Optional deadline; it is not read in the code visible here.
    deadline: Option<Instant>,

    /// Optional account read by `send_message`.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// Tracks resource usage; `ContractSyncRuntime::run_action` hands it back to the caller.
    resource_controller: ResourceController,
    /// Additional context whose type depends on the kind of user instance.
    user_context: UserInstance::UserContext,
    /// Value reported by `allow_application_logs()`.
    allow_application_logs: bool,
}
136
/// One entry of the runtime's call stack.
#[derive(Debug)]
struct ApplicationStatus {
    /// ID of the caller when the call was authenticated; `None` for top-level executions.
    caller_id: Option<ApplicationId>,
    /// ID of the application being executed.
    id: ApplicationId,
    /// Description of the application being executed.
    description: ApplicationDescription,
    /// Signer forwarded to this application, if any.
    signer: Option<AccountOwner>,
}
149
/// An instantiated application together with its description.
#[derive(Debug)]
struct LoadedApplication<Instance> {
    /// The shared, lockable application instance.
    instance: Arc<Mutex<Instance>>,
    /// The description the instance was loaded with.
    description: ApplicationDescription,
}
156
157impl<Instance> LoadedApplication<Instance> {
158 fn new(instance: Instance, description: ApplicationDescription) -> Self {
160 LoadedApplication {
161 instance: Arc::new(Mutex::new(instance)),
162 description,
163 }
164 }
165}
166
167impl<Instance> Clone for LoadedApplication<Instance> {
168 fn clone(&self) -> Self {
171 LoadedApplication {
172 instance: self.instance.clone(),
173 description: self.description.clone(),
174 }
175 }
176}
177
/// A value that is either already available or still expected from a oneshot receiver.
#[derive(Debug)]
enum Promise<T> {
    /// The value has been received.
    Ready(T),
    /// The value has not been received yet.
    Pending(Receiver<T>),
}
183
184impl<T> Promise<T> {
185 fn force(&mut self) -> Result<(), ExecutionError> {
186 if let Promise::Pending(receiver) = self {
187 let value = receiver
188 .recv_ref()
189 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
190 *self = Promise::Ready(value);
191 }
192 Ok(())
193 }
194
195 fn read(self) -> Result<T, ExecutionError> {
196 match self {
197 Promise::Pending(receiver) => {
198 let value = receiver.recv_response()?;
199 Ok(value)
200 }
201 Promise::Ready(value) => Ok(value),
202 }
203 }
204}
205
/// Bookkeeping for the outstanding queries of one kind.
#[derive(Debug, Default)]
struct QueryManager<T> {
    /// Registered promises that have not been waited on yet, keyed by query ID.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// Number of queries registered so far; also the ID given to the next query.
    query_count: u32,
    /// Number of registered queries that have not been successfully waited on.
    active_query_count: u32,
}
216
217impl<T> QueryManager<T> {
218 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
219 let id = self.query_count;
220 self.pending_queries.insert(id, Promise::Pending(receiver));
221 self.query_count = self
222 .query_count
223 .checked_add(1)
224 .ok_or(ArithmeticError::Overflow)?;
225 self.active_query_count = self
226 .active_query_count
227 .checked_add(1)
228 .ok_or(ArithmeticError::Overflow)?;
229 Ok(id)
230 }
231
232 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
233 let promise = self
234 .pending_queries
235 .remove(&id)
236 .ok_or(ExecutionError::InvalidPromise)?;
237 let value = promise.read()?;
238 self.active_query_count -= 1;
239 Ok(value)
240 }
241
242 fn force_all(&mut self) -> Result<(), ExecutionError> {
243 for promise in self.pending_queries.values_mut() {
244 promise.force()?;
245 }
246 Ok(())
247 }
248}
249
/// A list of keys, as returned by a find-keys-by-prefix query.
type Keys = Vec<Vec<u8>>;
/// The bytes of a single stored value.
type Value = Vec<u8>;
/// A list of key-value pairs, as returned by a find-key-values-by-prefix query.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
253
/// The outstanding key-value store queries of one application, grouped by query kind.
#[derive(Debug, Default)]
struct ViewUserState {
    /// Queries created by `contains_key_new`.
    contains_key_queries: QueryManager<bool>,
    /// Queries created by `contains_keys_new`.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// Queries created by `read_value_bytes_new`.
    read_value_queries: QueryManager<Option<Value>>,
    /// Queries created by `read_multi_values_bytes_new`.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// Queries created by `find_keys_by_prefix_new`.
    find_keys_queries: QueryManager<Keys>,
    /// Queries created by `find_key_values_by_prefix_new`.
    find_key_values_queries: QueryManager<KeyValues>,
}
269
270impl ViewUserState {
271 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
272 self.contains_key_queries.force_all()?;
273 self.contains_keys_queries.force_all()?;
274 self.read_value_queries.force_all()?;
275 self.read_multi_values_queries.force_all()?;
276 self.find_keys_queries.force_all()?;
277 self.find_key_values_queries.force_all()?;
278 Ok(())
279 }
280}
281
282impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
283 type Target = SyncRuntimeHandle<UserInstance>;
284
285 fn deref(&self) -> &Self::Target {
286 self.0.as_ref().expect(
287 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288 )
289 }
290}
291
292impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
293 fn deref_mut(&mut self) -> &mut Self::Target {
294 self.0.as_mut().expect(
295 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
296 )
297 }
298}
299
300impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
301 fn drop(&mut self) {
302 if let Some(handle) = self.0.take() {
305 handle.inner().loaded_applications.clear();
306 }
307 }
308}
309
310impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
311 #[expect(clippy::too_many_arguments)]
312 fn new(
313 chain_id: ChainId,
314 height: BlockHeight,
315 round: Option<u32>,
316 executing_message: Option<ExecutingMessage>,
317 execution_state_sender: ExecutionStateSender,
318 deadline: Option<Instant>,
319 refund_grant_to: Option<Account>,
320 resource_controller: ResourceController,
321 user_context: UserInstance::UserContext,
322 allow_application_logs: bool,
323 ) -> Self {
324 Self {
325 chain_id,
326 height,
327 round,
328 executing_message,
329 execution_state_sender,
330 is_finalizing: false,
331 applications_to_finalize: Vec::new(),
332 preloaded_applications: HashMap::new(),
333 loaded_applications: HashMap::new(),
334 call_stack: Vec::new(),
335 active_applications: HashSet::new(),
336 view_user_states: BTreeMap::new(),
337 deadline,
338 refund_grant_to,
339 resource_controller,
340 scheduled_operations: Vec::new(),
341 user_context,
342 allow_application_logs,
343 }
344 }
345
346 fn current_application(&self) -> &ApplicationStatus {
354 self.call_stack
355 .last()
356 .expect("Call stack is unexpectedly empty")
357 }
358
359 fn push_application(&mut self, status: ApplicationStatus) {
363 self.active_applications.insert(status.id);
364 self.call_stack.push(status);
365 }
366
367 fn pop_application(&mut self) -> ApplicationStatus {
375 let status = self
376 .call_stack
377 .pop()
378 .expect("Can't remove application from empty call stack");
379 assert!(self.active_applications.remove(&status.id));
380 status
381 }
382
383 fn check_for_reentrancy(
387 &mut self,
388 application_id: ApplicationId,
389 ) -> Result<(), ExecutionError> {
390 ensure!(
391 !self.active_applications.contains(&application_id),
392 ExecutionError::ReentrantCall(application_id)
393 );
394 Ok(())
395 }
396}
397
398impl SyncRuntimeInternal<UserContractInstance> {
399 fn load_contract_instance(
401 &mut self,
402 this: SyncRuntimeHandle<UserContractInstance>,
403 id: ApplicationId,
404 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
405 match self.loaded_applications.entry(id) {
406 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
407
408 hash_map::Entry::Vacant(entry) => {
409 let (code, description) = match self.preloaded_applications.entry(id) {
412 #[cfg(web)]
414 hash_map::Entry::Vacant(_) => {
415 drop(this);
416 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
417 id,
418 )));
419 }
420 #[cfg(not(web))]
421 hash_map::Entry::Vacant(entry) => {
422 let (code, description) = self
423 .execution_state_sender
424 .send_request(move |callback| ExecutionRequest::LoadContract {
425 id,
426 callback,
427 })?
428 .recv_response()?;
429 entry.insert((code, description)).clone()
430 }
431 hash_map::Entry::Occupied(entry) => entry.get().clone(),
432 };
433 let instance = code.instantiate(this)?;
434
435 self.applications_to_finalize.push(id);
436 Ok(entry
437 .insert(LoadedApplication::new(instance, description))
438 .clone())
439 }
440 }
441 }
442
443 fn prepare_for_call(
445 &mut self,
446 this: ContractSyncRuntimeHandle,
447 authenticated: bool,
448 callee_id: ApplicationId,
449 ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
450 self.check_for_reentrancy(callee_id)?;
451
452 ensure!(
453 !self.is_finalizing,
454 ExecutionError::CrossApplicationCallInFinalize {
455 caller_id: Box::new(self.current_application().id),
456 callee_id: Box::new(callee_id),
457 }
458 );
459
460 let application = self.load_contract_instance(this, callee_id)?;
462
463 let caller = self.current_application();
464 let caller_id = caller.id;
465 let caller_signer = caller.signer;
466 let authenticated_signer = match caller_signer {
468 Some(signer) if authenticated => Some(signer),
469 _ => None,
470 };
471 let authenticated_caller_id = authenticated.then_some(caller_id);
472 self.push_application(ApplicationStatus {
473 caller_id: authenticated_caller_id,
474 id: callee_id,
475 description: application.description,
476 signer: authenticated_signer,
478 });
479 Ok(application.instance)
480 }
481
482 fn finish_call(&mut self) -> Result<(), ExecutionError> {
484 self.pop_application();
485 Ok(())
486 }
487
488 fn run_service_oracle_query(
490 &mut self,
491 application_id: ApplicationId,
492 query: Vec<u8>,
493 ) -> Result<Vec<u8>, ExecutionError> {
494 let timeout = self
495 .resource_controller
496 .remaining_service_oracle_execution_time()?;
497 let execution_start = Instant::now();
498 let deadline = Some(execution_start + timeout);
499 let response = self
500 .execution_state_sender
501 .send_request(|callback| ExecutionRequest::QueryServiceOracle {
502 deadline,
503 application_id,
504 next_block_height: self.height,
505 query,
506 callback,
507 })?
508 .recv_response()?;
509
510 self.resource_controller
511 .track_service_oracle_execution(execution_start.elapsed())?;
512 self.resource_controller
513 .track_service_oracle_response(response.len())?;
514
515 Ok(response)
516 }
517}
518
519impl SyncRuntimeInternal<UserServiceInstance> {
520 fn load_service_instance(
522 &mut self,
523 this: SyncRuntimeHandle<UserServiceInstance>,
524 id: ApplicationId,
525 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
526 match self.loaded_applications.entry(id) {
527 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
528
529 hash_map::Entry::Vacant(entry) => {
530 let (code, description) = match self.preloaded_applications.entry(id) {
533 #[cfg(web)]
535 hash_map::Entry::Vacant(_) => {
536 drop(this);
537 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
538 id,
539 )));
540 }
541 #[cfg(not(web))]
542 hash_map::Entry::Vacant(entry) => {
543 let (code, description) = self
544 .execution_state_sender
545 .send_request(move |callback| ExecutionRequest::LoadService {
546 id,
547 callback,
548 })?
549 .recv_response()?;
550 entry.insert((code, description)).clone()
551 }
552 hash_map::Entry::Occupied(entry) => entry.get().clone(),
553 };
554 let instance = code.instantiate(this)?;
555
556 self.applications_to_finalize.push(id);
557 Ok(entry
558 .insert(LoadedApplication::new(instance, description))
559 .clone())
560 }
561 }
562 }
563}
564
565impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
566 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
567 let handle = self.0.take().expect(
568 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
569 );
570 let runtime = Arc::into_inner(handle.0)?
571 .into_inner()
572 .expect("`SyncRuntime` should run in a single thread which should not panic");
573 Some(runtime)
574 }
575}
576
577impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
578 for SyncRuntimeHandle<UserInstance>
579{
580 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
581 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
582 }
583}
584
585impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
586 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
587 self.0
588 .try_lock()
589 .expect("Synchronous runtimes run on a single execution thread")
590 }
591}
592
593impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
594where
595 Self: ContractOrServiceRuntime,
596{
597 type Read = ();
598 type ReadValueBytes = u32;
599 type ContainsKey = u32;
600 type ContainsKeys = u32;
601 type ReadMultiValuesBytes = u32;
602 type FindKeysByPrefix = u32;
603 type FindKeyValuesByPrefix = u32;
604
605 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
606 let mut this = self.inner();
607 let chain_id = this.chain_id;
608 this.resource_controller.track_runtime_chain_id()?;
609 Ok(chain_id)
610 }
611
612 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
613 let mut this = self.inner();
614 let height = this.height;
615 this.resource_controller.track_runtime_block_height()?;
616 Ok(height)
617 }
618
619 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
620 let mut this = self.inner();
621 let application_id = this.current_application().id;
622 this.resource_controller.track_runtime_application_id()?;
623 Ok(application_id)
624 }
625
626 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
627 let mut this = self.inner();
628 let application_creator_chain_id = this.current_application().description.creator_chain_id;
629 this.resource_controller.track_runtime_application_id()?;
630 Ok(application_creator_chain_id)
631 }
632
633 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
634 let mut this = self.inner();
635 let parameters = this.current_application().description.parameters.clone();
636 this.resource_controller
637 .track_runtime_application_parameters(¶meters)?;
638 Ok(parameters)
639 }
640
641 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
642 let mut this = self.inner();
643 let timestamp = this
644 .execution_state_sender
645 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
646 .recv_response()?;
647 this.resource_controller.track_runtime_timestamp()?;
648 Ok(timestamp)
649 }
650
651 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
652 let mut this = self.inner();
653 let balance = this
654 .execution_state_sender
655 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
656 .recv_response()?;
657 this.resource_controller.track_runtime_balance()?;
658 Ok(balance)
659 }
660
661 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
662 let mut this = self.inner();
663 let balance = this
664 .execution_state_sender
665 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
666 .recv_response()?;
667 this.resource_controller.track_runtime_balance()?;
668 Ok(balance)
669 }
670
671 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
672 let mut this = self.inner();
673 let owner_balances = this
674 .execution_state_sender
675 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
676 .recv_response()?;
677 this.resource_controller
678 .track_runtime_owner_balances(&owner_balances)?;
679 Ok(owner_balances)
680 }
681
682 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
683 let mut this = self.inner();
684 let owners = this
685 .execution_state_sender
686 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
687 .recv_response()?;
688 this.resource_controller.track_runtime_owners(&owners)?;
689 Ok(owners)
690 }
691
692 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
693 let mut this = self.inner();
694 let chain_ownership = this
695 .execution_state_sender
696 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
697 .recv_response()?;
698 this.resource_controller
699 .track_runtime_chain_ownership(&chain_ownership)?;
700 Ok(chain_ownership)
701 }
702
703 fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
704 let this = self.inner();
705 let application_permissions = this
706 .execution_state_sender
707 .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
708 .recv_response()?;
709 Ok(application_permissions)
710 }
711
712 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
713 let mut this = self.inner();
714 let id = this.current_application().id;
715 this.resource_controller.track_read_operation()?;
716 let receiver = this
717 .execution_state_sender
718 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
719 let state = this.view_user_states.entry(id).or_default();
720 state.contains_key_queries.register(receiver)
721 }
722
723 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
724 let mut this = self.inner();
725 let id = this.current_application().id;
726 let state = this.view_user_states.entry(id).or_default();
727 let value = state.contains_key_queries.wait(*promise)?;
728 Ok(value)
729 }
730
731 fn contains_keys_new(
732 &mut self,
733 keys: Vec<Vec<u8>>,
734 ) -> Result<Self::ContainsKeys, ExecutionError> {
735 let mut this = self.inner();
736 let id = this.current_application().id;
737 this.resource_controller.track_read_operation()?;
738 let receiver = this
739 .execution_state_sender
740 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
741 let state = this.view_user_states.entry(id).or_default();
742 state.contains_keys_queries.register(receiver)
743 }
744
745 fn contains_keys_wait(
746 &mut self,
747 promise: &Self::ContainsKeys,
748 ) -> Result<Vec<bool>, ExecutionError> {
749 let mut this = self.inner();
750 let id = this.current_application().id;
751 let state = this.view_user_states.entry(id).or_default();
752 let value = state.contains_keys_queries.wait(*promise)?;
753 Ok(value)
754 }
755
756 fn read_multi_values_bytes_new(
757 &mut self,
758 keys: Vec<Vec<u8>>,
759 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
760 let mut this = self.inner();
761 let id = this.current_application().id;
762 this.resource_controller.track_read_operation()?;
763 let receiver = this.execution_state_sender.send_request(move |callback| {
764 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
765 })?;
766 let state = this.view_user_states.entry(id).or_default();
767 state.read_multi_values_queries.register(receiver)
768 }
769
770 fn read_multi_values_bytes_wait(
771 &mut self,
772 promise: &Self::ReadMultiValuesBytes,
773 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
774 let mut this = self.inner();
775 let id = this.current_application().id;
776 let state = this.view_user_states.entry(id).or_default();
777 let values = state.read_multi_values_queries.wait(*promise)?;
778 for value in &values {
779 if let Some(value) = &value {
780 this.resource_controller
781 .track_bytes_read(value.len() as u64)?;
782 }
783 }
784 Ok(values)
785 }
786
787 fn read_value_bytes_new(
788 &mut self,
789 key: Vec<u8>,
790 ) -> Result<Self::ReadValueBytes, ExecutionError> {
791 let mut this = self.inner();
792 let id = this.current_application().id;
793 this.resource_controller.track_read_operation()?;
794 let receiver = this
795 .execution_state_sender
796 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
797 let state = this.view_user_states.entry(id).or_default();
798 state.read_value_queries.register(receiver)
799 }
800
801 fn read_value_bytes_wait(
802 &mut self,
803 promise: &Self::ReadValueBytes,
804 ) -> Result<Option<Vec<u8>>, ExecutionError> {
805 let mut this = self.inner();
806 let id = this.current_application().id;
807 let value = {
808 let state = this.view_user_states.entry(id).or_default();
809 state.read_value_queries.wait(*promise)?
810 };
811 if let Some(value) = &value {
812 this.resource_controller
813 .track_bytes_read(value.len() as u64)?;
814 }
815 Ok(value)
816 }
817
818 fn find_keys_by_prefix_new(
819 &mut self,
820 key_prefix: Vec<u8>,
821 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
822 let mut this = self.inner();
823 let id = this.current_application().id;
824 this.resource_controller.track_read_operation()?;
825 let receiver = this.execution_state_sender.send_request(move |callback| {
826 ExecutionRequest::FindKeysByPrefix {
827 id,
828 key_prefix,
829 callback,
830 }
831 })?;
832 let state = this.view_user_states.entry(id).or_default();
833 state.find_keys_queries.register(receiver)
834 }
835
836 fn find_keys_by_prefix_wait(
837 &mut self,
838 promise: &Self::FindKeysByPrefix,
839 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
840 let mut this = self.inner();
841 let id = this.current_application().id;
842 let keys = {
843 let state = this.view_user_states.entry(id).or_default();
844 state.find_keys_queries.wait(*promise)?
845 };
846 let mut read_size = 0;
847 for key in &keys {
848 read_size += key.len();
849 }
850 this.resource_controller
851 .track_bytes_read(read_size as u64)?;
852 Ok(keys)
853 }
854
855 fn find_key_values_by_prefix_new(
856 &mut self,
857 key_prefix: Vec<u8>,
858 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
859 let mut this = self.inner();
860 let id = this.current_application().id;
861 this.resource_controller.track_read_operation()?;
862 let receiver = this.execution_state_sender.send_request(move |callback| {
863 ExecutionRequest::FindKeyValuesByPrefix {
864 id,
865 key_prefix,
866 callback,
867 }
868 })?;
869 let state = this.view_user_states.entry(id).or_default();
870 state.find_key_values_queries.register(receiver)
871 }
872
873 fn find_key_values_by_prefix_wait(
874 &mut self,
875 promise: &Self::FindKeyValuesByPrefix,
876 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
877 let mut this = self.inner();
878 let id = this.current_application().id;
879 let state = this.view_user_states.entry(id).or_default();
880 let key_values = state.find_key_values_queries.wait(*promise)?;
881 let mut read_size = 0;
882 for (key, value) in &key_values {
883 read_size += key.len() + value.len();
884 }
885 this.resource_controller
886 .track_bytes_read(read_size as u64)?;
887 Ok(key_values)
888 }
889
890 fn perform_http_request(
891 &mut self,
892 request: http::Request,
893 ) -> Result<http::Response, ExecutionError> {
894 let mut this = self.inner();
895 let app_permissions = this
896 .execution_state_sender
897 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
898 .recv_response()?;
899
900 let app_id = this.current_application().id;
901 ensure!(
902 app_permissions.can_make_http_requests(&app_id),
903 ExecutionError::UnauthorizedApplication(app_id)
904 );
905
906 this.resource_controller.track_http_request()?;
907
908 let response = this
909 .execution_state_sender
910 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
911 request,
912 http_responses_are_oracle_responses:
913 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
914 callback,
915 })?
916 .recv_response()?;
917 Ok(response)
918 }
919
920 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
921 let this = self.inner();
922 this.execution_state_sender
923 .send_request(|callback| ExecutionRequest::AssertBefore {
924 timestamp,
925 callback,
926 })?
927 .recv_response()?
928 }
929
930 fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
931 let this = self.inner();
932 let blob_id = hash.into();
933 let content = this
934 .execution_state_sender
935 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
936 .recv_response()?;
937 Ok(content.into_vec_or_clone())
938 }
939
940 fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
941 let this = self.inner();
942 let blob_id = hash.into();
943 this.execution_state_sender
944 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
945 .recv_response()?;
946 Ok(())
947 }
948
949 fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
950 Ok(self.inner().allow_application_logs)
951 }
952
953 #[cfg(web)]
954 fn send_log(&mut self, message: String, level: tracing::log::Level) {
955 let this = self.inner();
956 let _ = this
958 .execution_state_sender
959 .unbounded_send(ExecutionRequest::Log { message, level });
960 }
961}
962
/// Behavior that differs between contract and service runtimes.
trait ContractOrServiceRuntime {
    /// Value sent as `http_responses_are_oracle_responses` with `PerformHttpRequest`.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
973
/// Contract runtimes mark HTTP responses as oracle responses.
impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
}
977
/// Service runtimes do not mark HTTP responses as oracle responses.
impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
}
981
982impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
983 fn clone(&self) -> Self {
984 SyncRuntimeHandle(self.0.clone())
985 }
986}
987
988impl ContractSyncRuntime {
989 pub(crate) fn new(
990 execution_state_sender: ExecutionStateSender,
991 chain_id: ChainId,
992 refund_grant_to: Option<Account>,
993 resource_controller: ResourceController,
994 action: &UserAction,
995 allow_application_logs: bool,
996 ) -> Self {
997 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
998 SyncRuntimeInternal::new(
999 chain_id,
1000 action.height(),
1001 action.round(),
1002 if let UserAction::Message(context, _) = action {
1003 Some(context.into())
1004 } else {
1005 None
1006 },
1007 execution_state_sender,
1008 None,
1009 refund_grant_to,
1010 resource_controller,
1011 action.timestamp(),
1012 allow_application_logs,
1013 ),
1014 )))
1015 }
1016
1017 pub(crate) fn preload_contract(
1019 &self,
1020 id: ApplicationId,
1021 code: UserContractCode,
1022 description: ApplicationDescription,
1023 ) -> Result<(), ExecutionError> {
1024 let this = self
1025 .0
1026 .as_ref()
1027 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1028 let mut this_guard = this.inner();
1029
1030 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1031 entry.insert((code, description));
1032 }
1033
1034 Ok(())
1035 }
1036
1037 pub(crate) fn run_action(
1039 mut self,
1040 application_id: ApplicationId,
1041 chain_id: ChainId,
1042 action: UserAction,
1043 ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1044 let result = self
1045 .deref_mut()
1046 .run_action(application_id, chain_id, action)?;
1047 let runtime = self
1048 .into_inner()
1049 .expect("Runtime clones should have been freed by now");
1050
1051 Ok((result, runtime.resource_controller))
1052 }
1053}
1054
1055impl ContractSyncRuntimeHandle {
1056 fn run_action(
1057 &mut self,
1058 application_id: ApplicationId,
1059 chain_id: ChainId,
1060 action: UserAction,
1061 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1062 let finalize_context = FinalizeContext {
1063 authenticated_signer: action.signer(),
1064 chain_id,
1065 height: action.height(),
1066 round: action.round(),
1067 };
1068
1069 {
1070 let runtime = self.inner();
1071 assert_eq!(runtime.chain_id, chain_id);
1072 assert_eq!(runtime.height, action.height());
1073 }
1074
1075 let signer = action.signer();
1076 let closure = move |code: &mut UserContractInstance| match action {
1077 UserAction::Instantiate(_context, argument) => {
1078 code.instantiate(argument).map(|()| None)
1079 }
1080 UserAction::Operation(_context, operation) => {
1081 code.execute_operation(operation).map(Option::Some)
1082 }
1083 UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1084 UserAction::ProcessStreams(_context, updates) => {
1085 code.process_streams(updates).map(|()| None)
1086 }
1087 };
1088
1089 let result = self.execute(application_id, signer, closure)?;
1090 self.finalize(finalize_context)?;
1091 Ok(result)
1092 }
1093
1094 fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1096 let applications = mem::take(&mut self.inner().applications_to_finalize)
1097 .into_iter()
1098 .rev();
1099
1100 self.inner().is_finalizing = true;
1101
1102 for application in applications {
1103 self.execute(application, context.authenticated_signer, |contract| {
1104 contract.finalize().map(|_| None)
1105 })?;
1106 self.inner().loaded_applications.remove(&application);
1107 }
1108
1109 Ok(())
1110 }
1111
1112 fn execute(
1114 &mut self,
1115 application_id: ApplicationId,
1116 signer: Option<AccountOwner>,
1117 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1118 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1119 let contract = {
1120 let mut runtime = self.inner();
1121 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1122
1123 let status = ApplicationStatus {
1124 caller_id: None,
1125 id: application_id,
1126 description: application.description.clone(),
1127 signer,
1128 };
1129
1130 runtime.push_application(status);
1131
1132 application
1133 };
1134
1135 let result = closure(
1136 &mut contract
1137 .instance
1138 .try_lock()
1139 .expect("Application should not be already executing"),
1140 )?;
1141
1142 let mut runtime = self.inner();
1143 let application_status = runtime.pop_application();
1144 assert_eq!(application_status.caller_id, None);
1145 assert_eq!(application_status.id, application_id);
1146 assert_eq!(application_status.description, contract.description);
1147 assert_eq!(application_status.signer, signer);
1148 assert!(runtime.call_stack.is_empty());
1149
1150 Ok(result)
1151 }
1152}
1153
1154impl ContractRuntime for ContractSyncRuntimeHandle {
1155 fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1156 let this = self.inner();
1157 Ok(this.current_application().signer)
1158 }
1159
1160 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1161 Ok(self
1162 .inner()
1163 .executing_message
1164 .map(|metadata| metadata.is_bouncing))
1165 }
1166
1167 fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1168 Ok(self
1169 .inner()
1170 .executing_message
1171 .map(|metadata| metadata.origin))
1172 }
1173
1174 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1175 let this = self.inner();
1176 if this.call_stack.len() <= 1 {
1177 return Ok(None);
1178 }
1179 Ok(this.current_application().caller_id)
1180 }
1181
1182 fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1183 Ok(match vm_runtime {
1184 VmRuntime::Wasm => {
1185 self.inner()
1186 .resource_controller
1187 .policy()
1188 .maximum_wasm_fuel_per_block
1189 }
1190 VmRuntime::Evm => {
1191 self.inner()
1192 .resource_controller
1193 .policy()
1194 .maximum_evm_fuel_per_block
1195 }
1196 })
1197 }
1198
1199 fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1200 Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1201 }
1202
1203 fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1204 let mut this = self.inner();
1205 this.resource_controller.track_fuel(fuel, vm_runtime)
1206 }
1207
1208 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1209 let mut this = self.inner();
1210 let application = this.current_application();
1211 let application_id = application.id;
1212 let authenticated_signer = application.signer;
1213 let mut refund_grant_to = this.refund_grant_to;
1214
1215 let grant = this
1216 .resource_controller
1217 .policy()
1218 .total_price(&message.grant)?;
1219 if grant.is_zero() {
1220 refund_grant_to = None;
1221 } else {
1222 this.resource_controller.track_grant(grant)?;
1223 }
1224 let kind = if message.is_tracked {
1225 MessageKind::Tracked
1226 } else {
1227 MessageKind::Simple
1228 };
1229
1230 this.execution_state_sender
1231 .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1232 message: OutgoingMessage {
1233 destination: message.destination,
1234 authenticated_signer,
1235 refund_grant_to,
1236 grant,
1237 kind,
1238 message: Message::User {
1239 application_id,
1240 bytes: message.message,
1241 },
1242 },
1243 callback,
1244 })?
1245 .recv_response()?;
1246
1247 Ok(())
1248 }
1249
1250 fn transfer(
1251 &mut self,
1252 source: AccountOwner,
1253 destination: Account,
1254 amount: Amount,
1255 ) -> Result<(), ExecutionError> {
1256 let this = self.inner();
1257 let current_application = this.current_application();
1258 let application_id = current_application.id;
1259 let signer = current_application.signer;
1260
1261 this.execution_state_sender
1262 .send_request(|callback| ExecutionRequest::Transfer {
1263 source,
1264 destination,
1265 amount,
1266 signer,
1267 application_id,
1268 callback,
1269 })?
1270 .recv_response()?;
1271 Ok(())
1272 }
1273
1274 fn claim(
1275 &mut self,
1276 source: Account,
1277 destination: Account,
1278 amount: Amount,
1279 ) -> Result<(), ExecutionError> {
1280 let this = self.inner();
1281 let current_application = this.current_application();
1282 let application_id = current_application.id;
1283 let signer = current_application.signer;
1284
1285 this.execution_state_sender
1286 .send_request(|callback| ExecutionRequest::Claim {
1287 source,
1288 destination,
1289 amount,
1290 signer,
1291 application_id,
1292 callback,
1293 })?
1294 .recv_response()?;
1295 Ok(())
1296 }
1297
1298 fn try_call_application(
1299 &mut self,
1300 authenticated: bool,
1301 callee_id: ApplicationId,
1302 argument: Vec<u8>,
1303 ) -> Result<Vec<u8>, ExecutionError> {
1304 let contract = self
1305 .inner()
1306 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1307
1308 let value = contract
1309 .try_lock()
1310 .expect("Applications should not have reentrant calls")
1311 .execute_operation(argument)?;
1312
1313 self.inner().finish_call()?;
1314
1315 Ok(value)
1316 }
1317
1318 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1319 let mut this = self.inner();
1320 ensure!(
1321 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1322 ExecutionError::StreamNameTooLong
1323 );
1324 let application_id = GenericApplicationId::User(this.current_application().id);
1325 let stream_id = StreamId {
1326 stream_name,
1327 application_id,
1328 };
1329 let value_len = value.len() as u64;
1330 let index = this
1331 .execution_state_sender
1332 .send_request(|callback| ExecutionRequest::Emit {
1333 stream_id,
1334 value,
1335 callback,
1336 })?
1337 .recv_response()?;
1338 this.resource_controller.track_bytes_written(value_len)?;
1340 Ok(index)
1341 }
1342
1343 fn read_event(
1344 &mut self,
1345 chain_id: ChainId,
1346 stream_name: StreamName,
1347 index: u32,
1348 ) -> Result<Vec<u8>, ExecutionError> {
1349 let mut this = self.inner();
1350 ensure!(
1351 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1352 ExecutionError::StreamNameTooLong
1353 );
1354 let application_id = GenericApplicationId::User(this.current_application().id);
1355 let stream_id = StreamId {
1356 stream_name,
1357 application_id,
1358 };
1359 let event_id = EventId {
1360 stream_id,
1361 index,
1362 chain_id,
1363 };
1364 let event = this
1365 .execution_state_sender
1366 .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1367 .recv_response()?;
1368 this.resource_controller
1370 .track_bytes_read(event.len() as u64)?;
1371 Ok(event)
1372 }
1373
1374 fn subscribe_to_events(
1375 &mut self,
1376 chain_id: ChainId,
1377 application_id: ApplicationId,
1378 stream_name: StreamName,
1379 ) -> Result<(), ExecutionError> {
1380 let this = self.inner();
1381 ensure!(
1382 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1383 ExecutionError::StreamNameTooLong
1384 );
1385 let stream_id = StreamId {
1386 stream_name,
1387 application_id: application_id.into(),
1388 };
1389 let subscriber_app_id = this.current_application().id;
1390 this.execution_state_sender
1391 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1392 chain_id,
1393 stream_id,
1394 subscriber_app_id,
1395 callback,
1396 })?
1397 .recv_response()?;
1398 Ok(())
1399 }
1400
1401 fn unsubscribe_from_events(
1402 &mut self,
1403 chain_id: ChainId,
1404 application_id: ApplicationId,
1405 stream_name: StreamName,
1406 ) -> Result<(), ExecutionError> {
1407 let this = self.inner();
1408 ensure!(
1409 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1410 ExecutionError::StreamNameTooLong
1411 );
1412 let stream_id = StreamId {
1413 stream_name,
1414 application_id: application_id.into(),
1415 };
1416 let subscriber_app_id = this.current_application().id;
1417 this.execution_state_sender
1418 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1419 chain_id,
1420 stream_id,
1421 subscriber_app_id,
1422 callback,
1423 })?
1424 .recv_response()?;
1425 Ok(())
1426 }
1427
1428 fn query_service(
1429 &mut self,
1430 application_id: ApplicationId,
1431 query: Vec<u8>,
1432 ) -> Result<Vec<u8>, ExecutionError> {
1433 let mut this = self.inner();
1434
1435 let app_permissions = this
1436 .execution_state_sender
1437 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1438 .recv_response()?;
1439
1440 let app_id = this.current_application().id;
1441 ensure!(
1442 app_permissions.can_call_services(&app_id),
1443 ExecutionError::UnauthorizedApplication(app_id)
1444 );
1445
1446 this.resource_controller.track_service_oracle_call()?;
1447
1448 this.run_service_oracle_query(application_id, query)
1449 }
1450
1451 fn open_chain(
1452 &mut self,
1453 ownership: ChainOwnership,
1454 application_permissions: ApplicationPermissions,
1455 balance: Amount,
1456 ) -> Result<ChainId, ExecutionError> {
1457 let parent_id = self.inner().chain_id;
1458 let block_height = self.block_height()?;
1459
1460 let timestamp = self.inner().user_context;
1461
1462 let chain_id = self
1463 .inner()
1464 .execution_state_sender
1465 .send_request(|callback| ExecutionRequest::OpenChain {
1466 ownership,
1467 balance,
1468 parent_id,
1469 block_height,
1470 timestamp,
1471 application_permissions,
1472 callback,
1473 })?
1474 .recv_response()?;
1475
1476 Ok(chain_id)
1477 }
1478
1479 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1480 let this = self.inner();
1481 let application_id = this.current_application().id;
1482 this.execution_state_sender
1483 .send_request(|callback| ExecutionRequest::CloseChain {
1484 application_id,
1485 callback,
1486 })?
1487 .recv_response()?
1488 }
1489
1490 fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1491 let this = self.inner();
1492 let application_id = this.current_application().id;
1493 this.execution_state_sender
1494 .send_request(|callback| ExecutionRequest::ChangeOwnership {
1495 application_id,
1496 ownership,
1497 callback,
1498 })?
1499 .recv_response()?
1500 }
1501
1502 fn change_application_permissions(
1503 &mut self,
1504 application_permissions: ApplicationPermissions,
1505 ) -> Result<(), ExecutionError> {
1506 let this = self.inner();
1507 let application_id = this.current_application().id;
1508 this.execution_state_sender
1509 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1510 application_id,
1511 application_permissions,
1512 callback,
1513 })?
1514 .recv_response()?
1515 }
1516
1517 fn create_application(
1518 &mut self,
1519 module_id: ModuleId,
1520 parameters: Vec<u8>,
1521 argument: Vec<u8>,
1522 required_application_ids: Vec<ApplicationId>,
1523 ) -> Result<ApplicationId, ExecutionError> {
1524 let chain_id = self.inner().chain_id;
1525 let block_height = self.block_height()?;
1526
1527 let CreateApplicationResult { app_id } = self
1528 .inner()
1529 .execution_state_sender
1530 .send_request(move |callback| ExecutionRequest::CreateApplication {
1531 chain_id,
1532 block_height,
1533 module_id,
1534 parameters,
1535 required_application_ids,
1536 callback,
1537 })?
1538 .recv_response()??;
1539
1540 let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1541
1542 contract
1543 .try_lock()
1544 .expect("Applications should not have reentrant calls")
1545 .instantiate(argument)?;
1546
1547 self.inner().finish_call()?;
1548
1549 Ok(app_id)
1550 }
1551
1552 fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1553 let blob = Blob::new_data(bytes);
1554 let blob_id = blob.id();
1555 let this = self.inner();
1556 this.execution_state_sender
1557 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1558 .recv_response()?;
1559 Ok(DataBlobHash(blob_id.hash))
1560 }
1561
1562 fn publish_module(
1563 &mut self,
1564 contract: Bytecode,
1565 service: Bytecode,
1566 vm_runtime: VmRuntime,
1567 ) -> Result<ModuleId, ExecutionError> {
1568 let (blobs, module_id) =
1569 crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1570 let this = self.inner();
1571 for blob in blobs {
1572 this.execution_state_sender
1573 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1574 .recv_response()?;
1575 }
1576 Ok(module_id)
1577 }
1578
1579 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1580 let this = self.inner();
1581 let round = this.round;
1582 this.execution_state_sender
1583 .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1584 .recv_response()
1585 }
1586
1587 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1588 let mut this = self.inner();
1589 let id = this.current_application().id;
1590 let state = this.view_user_states.entry(id).or_default();
1591 state.force_all_pending_queries()?;
1592 this.resource_controller.track_write_operations(
1593 batch
1594 .num_operations()
1595 .try_into()
1596 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1597 )?;
1598 this.resource_controller
1599 .track_bytes_written(batch.size() as u64)?;
1600 this.execution_state_sender
1601 .send_request(|callback| ExecutionRequest::WriteBatch {
1602 id,
1603 batch,
1604 callback,
1605 })?
1606 .recv_response()?;
1607 Ok(())
1608 }
1609}
1610
1611impl ServiceSyncRuntime {
1612 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1614 Self::new_with_deadline(execution_state_sender, context, None)
1615 }
1616
1617 pub fn new_with_deadline(
1619 execution_state_sender: ExecutionStateSender,
1620 context: QueryContext,
1621 deadline: Option<Instant>,
1622 ) -> Self {
1623 let allow_application_logs = execution_state_sender
1625 .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1626 .ok()
1627 .and_then(|receiver| receiver.recv_response().ok())
1628 .unwrap_or(false);
1629
1630 let runtime = SyncRuntime(Some(
1631 SyncRuntimeInternal::new(
1632 context.chain_id,
1633 context.next_block_height,
1634 None,
1635 None,
1636 execution_state_sender,
1637 deadline,
1638 None,
1639 ResourceController::default(),
1640 (),
1641 allow_application_logs,
1642 )
1643 .into(),
1644 ));
1645
1646 ServiceSyncRuntime {
1647 runtime,
1648 current_context: context,
1649 }
1650 }
1651
1652 pub(crate) fn preload_service(
1654 &self,
1655 id: ApplicationId,
1656 code: UserServiceCode,
1657 description: ApplicationDescription,
1658 ) -> Result<(), ExecutionError> {
1659 let this = self
1660 .runtime
1661 .0
1662 .as_ref()
1663 .expect("services shouldn't be preloaded while the runtime is being dropped");
1664 let mut this_guard = this.inner();
1665
1666 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1667 entry.insert((code, description));
1668 }
1669
1670 Ok(())
1671 }
1672
1673 pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1675 while let Ok(request) = incoming_requests.recv() {
1676 let ServiceRuntimeRequest::Query {
1677 application_id,
1678 context,
1679 query,
1680 callback,
1681 } = request;
1682
1683 let result = self
1684 .prepare_for_query(context)
1685 .and_then(|()| self.run_query(application_id, query));
1686
1687 if let Err(err) = callback.send(result) {
1688 tracing::debug!(%err, "Receiver for query result has been dropped");
1689 }
1690 }
1691 }
1692
1693 pub(crate) fn prepare_for_query(
1695 &mut self,
1696 new_context: QueryContext,
1697 ) -> Result<(), ExecutionError> {
1698 let expected_context = QueryContext {
1699 local_time: new_context.local_time,
1700 ..self.current_context
1701 };
1702
1703 if new_context != expected_context {
1704 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1705 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1706 } else {
1707 self.handle_mut()
1708 .inner()
1709 .execution_state_sender
1710 .send_request(|callback| ExecutionRequest::SetLocalTime {
1711 local_time: new_context.local_time,
1712 callback,
1713 })?
1714 .recv_response()?;
1715 }
1716 Ok(())
1717 }
1718
1719 pub(crate) fn run_query(
1721 &mut self,
1722 application_id: ApplicationId,
1723 query: Vec<u8>,
1724 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1725 let this = self.handle_mut();
1726 let response = this.try_query_application(application_id, query)?;
1727 let operations = mem::take(&mut this.inner().scheduled_operations);
1728
1729 Ok(QueryOutcome {
1730 response,
1731 operations,
1732 })
1733 }
1734
1735 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1737 self.runtime.0.as_mut().expect(
1738 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1739 )
1740 }
1741}
1742
1743impl ServiceRuntime for ServiceSyncRuntimeHandle {
1744 fn try_query_application(
1746 &mut self,
1747 queried_id: ApplicationId,
1748 argument: Vec<u8>,
1749 ) -> Result<Vec<u8>, ExecutionError> {
1750 let service = {
1751 let mut this = self.inner();
1752
1753 let application = this.load_service_instance(self.clone(), queried_id)?;
1755 this.push_application(ApplicationStatus {
1757 caller_id: None,
1758 id: queried_id,
1759 description: application.description,
1760 signer: None,
1761 });
1762 application.instance
1763 };
1764 let response = service
1765 .try_lock()
1766 .expect("Applications should not have reentrant calls")
1767 .handle_query(argument)?;
1768 self.inner().pop_application();
1769 Ok(response)
1770 }
1771
1772 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1773 let mut this = self.inner();
1774 let application_id = this.current_application().id;
1775
1776 this.scheduled_operations.push(Operation::User {
1777 application_id,
1778 bytes: operation,
1779 });
1780
1781 Ok(())
1782 }
1783
1784 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1785 if let Some(deadline) = self.inner().deadline {
1786 if Instant::now() >= deadline {
1787 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1788 }
1789 }
1790 Ok(())
1791 }
1792}
1793
/// A request handled by [`ServiceSyncRuntime::run`].
pub enum ServiceRuntimeRequest {
    /// Run a query against an application's service.
    Query {
        /// The application whose service is queried.
        application_id: ApplicationId,
        /// The context the runtime is prepared for before the query runs.
        context: QueryContext,
        /// The serialized query passed to the service.
        query: Vec<u8>,
        /// The channel on which the outcome, or the error, is sent back.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1803
/// The parts of a [`MessageContext`] that the runtime keeps for the message being executed.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// Copied from `MessageContext::is_bouncing`.
    is_bouncing: bool,
    /// Copied from `MessageContext::origin`.
    origin: ChainId,
}
1810
1811impl From<&MessageContext> for ExecutingMessage {
1812 fn from(context: &MessageContext) -> Self {
1813 ExecutingMessage {
1814 is_bouncing: context.is_bouncing,
1815 origin: context.origin,
1816 }
1817 }
1818}
1819
1820pub fn create_bytecode_blobs_sync(
1822 contract: Bytecode,
1823 service: Bytecode,
1824 vm_runtime: VmRuntime,
1825) -> (Vec<Blob>, ModuleId) {
1826 match vm_runtime {
1827 VmRuntime::Wasm => {
1828 let compressed_contract = contract.compress();
1829 let compressed_service = service.compress();
1830 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1831 let service_blob = Blob::new_service_bytecode(compressed_service);
1832 let module_id =
1833 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1834 (vec![contract_blob, service_blob], module_id)
1835 }
1836 VmRuntime::Evm => {
1837 let compressed_contract = contract.compress();
1838 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1839 let module_id = ModuleId::new(
1840 evm_contract_blob.id().hash,
1841 evm_contract_blob.id().hash,
1842 vm_runtime,
1843 );
1844 (vec![evm_contract_blob], module_id)
1845 }
1846 }
1847}