1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15 SendMessageRequest, Timestamp,
16 },
17 ensure, http,
18 identifiers::{
19 Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20 },
21 ownership::ChainOwnership,
22 time::Instant,
23 vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27use tracing::instrument;
28
29use crate::{
30 execution::UserAction,
31 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
32 resources::ResourceController,
33 system::CreateApplicationResult,
34 util::{ReceiverExt, UnboundedSenderExt},
35 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
36 ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
37 OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
38 UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
39};
40
41#[cfg(test)]
42#[path = "unit_tests/runtime_tests.rs"]
43mod tests;
44
/// Associates a user application instance type with the auxiliary types the runtime
/// stores alongside it.
pub trait WithContext {
    /// Extra data kept in the runtime's `user_context` field for this kind of instance.
    type UserContext;
    /// The code type kept for preloaded applications, from which instances are created.
    type Code;
}
49
/// Contract instances use a [`Timestamp`] as their user context and are created from
/// [`UserContractCode`].
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
    type Code = UserContractCode;
}
54
/// Service instances carry no user context and are created from [`UserServiceCode`].
impl WithContext for UserServiceInstance {
    type UserContext = ();
    type Code = UserServiceCode;
}
59
/// Test-only implementation for a type-erased instance, with neither context nor code.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
    type Code = ();
}
65
/// Owning wrapper around a [`SyncRuntimeHandle`].
///
/// The inner `Option` becomes `None` once the handle has been taken out, which happens in
/// `into_inner` and in the `Drop` implementation.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
68
/// A [`SyncRuntime`] specialized for contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
70
/// A synchronous runtime for services, stored together with a query context.
pub struct ServiceSyncRuntime {
    /// The underlying runtime, specialized for service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// NOTE(review): presumably the context of the query currently being handled; its use
    /// is not visible in this part of the file — confirm.
    current_context: QueryContext,
}
75
/// A cloneable handle to the runtime state, which is shared behind an `Arc<Mutex<_>>`.
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);
80
/// A [`SyncRuntimeHandle`] specialized for contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] specialized for service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
83
/// The state of a synchronous runtime, shared through [`SyncRuntimeHandle`]s.
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// The chain ID reported by `BaseRuntime::chain_id`.
    chain_id: ChainId,
    /// The block height reported by `BaseRuntime::block_height`; also sent as
    /// `next_block_height` with service oracle queries.
    height: BlockHeight,
    /// The round supplied at construction, if any.
    round: Option<u32>,
    /// Metadata of the message being executed, if the runtime was created for a message.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// Channel used to send `ExecutionRequest`s and receive their responses.
    execution_state_sender: ExecutionStateSender,

    /// Set once finalization has started; cross-application calls are rejected while set.
    is_finalizing: bool,
    /// Applications that were instantiated, in load order; finalized in reverse order.
    applications_to_finalize: Vec<ApplicationId>,

    /// Code and description of applications made available before they are first used.
    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
    /// Applications that have already been instantiated.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// The applications currently executing; the last entry is the current one.
    call_stack: Vec<ApplicationStatus>,
    /// The IDs of the applications on the call stack, used to detect reentrant calls.
    active_applications: HashSet<ApplicationId>,
    /// Starts out empty. NOTE(review): how operations get scheduled is not visible in this
    /// part of the file.
    scheduled_operations: Vec<Operation>,

    /// Pending view queries, tracked separately for each application.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// Optional deadline; `ContractSyncRuntime::new` passes `None`. NOTE(review): where the
    /// deadline is enforced is not visible in this part of the file.
    deadline: Option<Instant>,

    /// Optional account supplied at construction. NOTE(review): presumably the account
    /// that receives refunded grants — confirm.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// Tracks the resources consumed by runtime calls.
    resource_controller: ResourceController,
    /// Extra context whose type depends on the kind of user instance.
    user_context: UserInstance::UserContext,
    /// The value reported by `BaseRuntime::allow_application_logs`.
    allow_application_logs: bool,
}
137
/// An entry of the runtime's call stack, describing one executing application.
#[derive(Debug)]
struct ApplicationStatus {
    /// The calling application, recorded only for authenticated cross-application calls;
    /// `None` for applications executed directly by the runtime.
    caller_id: Option<ApplicationId>,
    /// The ID of the executing application.
    id: ApplicationId,
    /// The description of the executing application.
    description: ApplicationDescription,
    /// The signer this application may act on behalf of, if any.
    signer: Option<AccountOwner>,
}
150
/// An instantiated application together with its description.
#[derive(Debug)]
struct LoadedApplication<Instance> {
    /// The instance, shared so that clones refer to the same object.
    instance: Arc<Mutex<Instance>>,
    /// The description of the application.
    description: ApplicationDescription,
}
157
158impl<Instance> LoadedApplication<Instance> {
159 fn new(instance: Instance, description: ApplicationDescription) -> Self {
161 LoadedApplication {
162 instance: Arc::new(Mutex::new(instance)),
163 description,
164 }
165 }
166}
167
168impl<Instance> Clone for LoadedApplication<Instance> {
169 fn clone(&self) -> Self {
172 LoadedApplication {
173 instance: self.instance.clone(),
174 description: self.description.clone(),
175 }
176 }
177}
178
/// A value that is either already available or still has to be received from a channel.
#[derive(Debug)]
enum Promise<T> {
    /// The value has been received.
    Ready(T),
    /// The value has not been received yet; it will arrive through this receiver.
    Pending(Receiver<T>),
}
184
185impl<T> Promise<T> {
186 fn force(&mut self) -> Result<(), ExecutionError> {
187 if let Promise::Pending(receiver) = self {
188 let value = receiver
189 .recv_ref()
190 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
191 *self = Promise::Ready(value);
192 }
193 Ok(())
194 }
195
196 fn read(self) -> Result<T, ExecutionError> {
197 match self {
198 Promise::Pending(receiver) => {
199 let value = receiver.recv_response()?;
200 Ok(value)
201 }
202 Promise::Ready(value) => Ok(value),
203 }
204 }
205}
206
/// Keeps track of the queries of one kind that were started but not yet waited for.
#[derive(Debug, Default)]
struct QueryManager<T> {
    /// The promises of the registered queries, indexed by their ID.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// The number of queries registered so far; also the ID given to the next query.
    query_count: u32,
    /// The number of registered queries that have not been waited for yet.
    active_query_count: u32,
}
217
218impl<T> QueryManager<T> {
219 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
220 let id = self.query_count;
221 self.pending_queries.insert(id, Promise::Pending(receiver));
222 self.query_count = self
223 .query_count
224 .checked_add(1)
225 .ok_or(ArithmeticError::Overflow)?;
226 self.active_query_count = self
227 .active_query_count
228 .checked_add(1)
229 .ok_or(ArithmeticError::Overflow)?;
230 Ok(id)
231 }
232
233 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
234 let promise = self
235 .pending_queries
236 .remove(&id)
237 .ok_or(ExecutionError::InvalidPromise)?;
238 let value = promise.read()?;
239 self.active_query_count -= 1;
240 Ok(value)
241 }
242
243 fn force_all(&mut self) -> Result<(), ExecutionError> {
244 for promise in self.pending_queries.values_mut() {
245 promise.force()?;
246 }
247 Ok(())
248 }
249}
250
/// A list of raw keys.
type Keys = Vec<Vec<u8>>;
/// A raw value.
type Value = Vec<u8>;
/// A list of raw key-value pairs.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
254
/// The pending view queries of a single application, one manager per kind of query.
#[derive(Debug, Default)]
struct ViewUserState {
    /// Pending `contains_key` queries.
    contains_key_queries: QueryManager<bool>,
    /// Pending `contains_keys` queries.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// Pending `read_value_bytes` queries.
    read_value_queries: QueryManager<Option<Value>>,
    /// Pending `read_multi_values_bytes` queries.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// Pending `find_keys_by_prefix` queries.
    find_keys_queries: QueryManager<Keys>,
    /// Pending `find_key_values_by_prefix` queries.
    find_key_values_queries: QueryManager<KeyValues>,
}
270
271impl ViewUserState {
272 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
273 self.contains_key_queries.force_all()?;
274 self.contains_keys_queries.force_all()?;
275 self.read_value_queries.force_all()?;
276 self.read_multi_values_queries.force_all()?;
277 self.find_keys_queries.force_all()?;
278 self.find_key_values_queries.force_all()?;
279 Ok(())
280 }
281}
282
283impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
284 type Target = SyncRuntimeHandle<UserInstance>;
285
286 fn deref(&self) -> &Self::Target {
287 self.0.as_ref().expect(
288 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
289 )
290 }
291}
292
293impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
294 fn deref_mut(&mut self) -> &mut Self::Target {
295 self.0.as_mut().expect(
296 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
297 )
298 }
299}
300
301impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
302 fn drop(&mut self) {
303 if let Some(handle) = self.0.take() {
306 handle.inner().loaded_applications.clear();
307 }
308 }
309}
310
311impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
312 #[expect(clippy::too_many_arguments)]
313 fn new(
314 chain_id: ChainId,
315 height: BlockHeight,
316 round: Option<u32>,
317 executing_message: Option<ExecutingMessage>,
318 execution_state_sender: ExecutionStateSender,
319 deadline: Option<Instant>,
320 refund_grant_to: Option<Account>,
321 resource_controller: ResourceController,
322 user_context: UserInstance::UserContext,
323 allow_application_logs: bool,
324 ) -> Self {
325 Self {
326 chain_id,
327 height,
328 round,
329 executing_message,
330 execution_state_sender,
331 is_finalizing: false,
332 applications_to_finalize: Vec::new(),
333 preloaded_applications: HashMap::new(),
334 loaded_applications: HashMap::new(),
335 call_stack: Vec::new(),
336 active_applications: HashSet::new(),
337 view_user_states: BTreeMap::new(),
338 deadline,
339 refund_grant_to,
340 resource_controller,
341 scheduled_operations: Vec::new(),
342 user_context,
343 allow_application_logs,
344 }
345 }
346
347 fn current_application(&self) -> &ApplicationStatus {
355 self.call_stack
356 .last()
357 .expect("Call stack is unexpectedly empty")
358 }
359
360 fn push_application(&mut self, status: ApplicationStatus) {
364 self.active_applications.insert(status.id);
365 self.call_stack.push(status);
366 }
367
368 fn pop_application(&mut self) -> ApplicationStatus {
376 let status = self
377 .call_stack
378 .pop()
379 .expect("Can't remove application from empty call stack");
380 assert!(self.active_applications.remove(&status.id));
381 status
382 }
383
384 fn check_for_reentrancy(
388 &mut self,
389 application_id: ApplicationId,
390 ) -> Result<(), ExecutionError> {
391 ensure!(
392 !self.active_applications.contains(&application_id),
393 ExecutionError::ReentrantCall(application_id)
394 );
395 Ok(())
396 }
397}
398
399impl SyncRuntimeInternal<UserContractInstance> {
400 #[instrument(skip_all, fields(application_id = %id))]
402 fn load_contract_instance(
403 &mut self,
404 this: SyncRuntimeHandle<UserContractInstance>,
405 id: ApplicationId,
406 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
407 match self.loaded_applications.entry(id) {
408 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
409
410 hash_map::Entry::Vacant(entry) => {
411 let (code, description) = match self.preloaded_applications.entry(id) {
414 #[cfg(web)]
416 hash_map::Entry::Vacant(_) => {
417 drop(this);
418 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
419 id,
420 )));
421 }
422 #[cfg(not(web))]
423 hash_map::Entry::Vacant(entry) => {
424 let (code, description) = self
425 .execution_state_sender
426 .send_request(move |callback| ExecutionRequest::LoadContract {
427 id,
428 callback,
429 })?
430 .recv_response()?;
431 entry.insert((code, description)).clone()
432 }
433 hash_map::Entry::Occupied(entry) => entry.get().clone(),
434 };
435 let instance = code.instantiate(this)?;
436
437 self.applications_to_finalize.push(id);
438 Ok(entry
439 .insert(LoadedApplication::new(instance, description))
440 .clone())
441 }
442 }
443 }
444
445 fn prepare_for_call(
447 &mut self,
448 this: ContractSyncRuntimeHandle,
449 authenticated: bool,
450 callee_id: ApplicationId,
451 ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
452 self.check_for_reentrancy(callee_id)?;
453
454 ensure!(
455 !self.is_finalizing,
456 ExecutionError::CrossApplicationCallInFinalize {
457 caller_id: Box::new(self.current_application().id),
458 callee_id: Box::new(callee_id),
459 }
460 );
461
462 let application = self.load_contract_instance(this, callee_id)?;
464
465 let caller = self.current_application();
466 let caller_id = caller.id;
467 let caller_signer = caller.signer;
468 let authenticated_signer = match caller_signer {
470 Some(signer) if authenticated => Some(signer),
471 _ => None,
472 };
473 let authenticated_caller_id = authenticated.then_some(caller_id);
474 self.push_application(ApplicationStatus {
475 caller_id: authenticated_caller_id,
476 id: callee_id,
477 description: application.description,
478 signer: authenticated_signer,
480 });
481 Ok(application.instance)
482 }
483
484 fn finish_call(&mut self) -> Result<(), ExecutionError> {
486 self.pop_application();
487 Ok(())
488 }
489
490 fn run_service_oracle_query(
492 &mut self,
493 application_id: ApplicationId,
494 query: Vec<u8>,
495 ) -> Result<Vec<u8>, ExecutionError> {
496 let timeout = self
497 .resource_controller
498 .remaining_service_oracle_execution_time()?;
499 let execution_start = Instant::now();
500 let deadline = Some(execution_start + timeout);
501 let response = self
502 .execution_state_sender
503 .send_request(|callback| ExecutionRequest::QueryServiceOracle {
504 deadline,
505 application_id,
506 next_block_height: self.height,
507 query,
508 callback,
509 })?
510 .recv_response()?;
511
512 self.resource_controller
513 .track_service_oracle_execution(execution_start.elapsed())?;
514 self.resource_controller
515 .track_service_oracle_response(response.len())?;
516
517 Ok(response)
518 }
519}
520
521impl SyncRuntimeInternal<UserServiceInstance> {
522 fn load_service_instance(
524 &mut self,
525 this: SyncRuntimeHandle<UserServiceInstance>,
526 id: ApplicationId,
527 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
528 match self.loaded_applications.entry(id) {
529 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
530
531 hash_map::Entry::Vacant(entry) => {
532 let (code, description) = match self.preloaded_applications.entry(id) {
535 #[cfg(web)]
537 hash_map::Entry::Vacant(_) => {
538 drop(this);
539 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
540 id,
541 )));
542 }
543 #[cfg(not(web))]
544 hash_map::Entry::Vacant(entry) => {
545 let (code, description) = self
546 .execution_state_sender
547 .send_request(move |callback| ExecutionRequest::LoadService {
548 id,
549 callback,
550 })?
551 .recv_response()?;
552 entry.insert((code, description)).clone()
553 }
554 hash_map::Entry::Occupied(entry) => entry.get().clone(),
555 };
556 let instance = code.instantiate(this)?;
557
558 self.applications_to_finalize.push(id);
559 Ok(entry
560 .insert(LoadedApplication::new(instance, description))
561 .clone())
562 }
563 }
564 }
565}
566
567impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
568 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
569 let handle = self.0.take().expect(
570 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
571 );
572 let runtime = Arc::into_inner(handle.0)?
573 .into_inner()
574 .expect("`SyncRuntime` should run in a single thread which should not panic");
575 Some(runtime)
576 }
577}
578
579impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
580 for SyncRuntimeHandle<UserInstance>
581{
582 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
583 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
584 }
585}
586
587impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
588 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
589 self.0
590 .try_lock()
591 .expect("Synchronous runtimes run on a single execution thread")
592 }
593}
594
595impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
596where
597 Self: ContractOrServiceRuntime,
598{
599 type Read = ();
600 type ReadValueBytes = u32;
601 type ContainsKey = u32;
602 type ContainsKeys = u32;
603 type ReadMultiValuesBytes = u32;
604 type FindKeysByPrefix = u32;
605 type FindKeyValuesByPrefix = u32;
606
607 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
608 let mut this = self.inner();
609 let chain_id = this.chain_id;
610 this.resource_controller.track_runtime_chain_id()?;
611 Ok(chain_id)
612 }
613
614 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
615 let mut this = self.inner();
616 let height = this.height;
617 this.resource_controller.track_runtime_block_height()?;
618 Ok(height)
619 }
620
621 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
622 let mut this = self.inner();
623 let application_id = this.current_application().id;
624 this.resource_controller.track_runtime_application_id()?;
625 Ok(application_id)
626 }
627
628 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
629 let mut this = self.inner();
630 let application_creator_chain_id = this.current_application().description.creator_chain_id;
631 this.resource_controller.track_runtime_application_id()?;
632 Ok(application_creator_chain_id)
633 }
634
635 fn read_application_description(
636 &mut self,
637 application_id: ApplicationId,
638 ) -> Result<ApplicationDescription, ExecutionError> {
639 let mut this = self.inner();
640 let description = this
641 .execution_state_sender
642 .send_request(|callback| ExecutionRequest::ReadApplicationDescription {
643 application_id,
644 callback,
645 })?
646 .recv_response()?;
647 this.resource_controller
648 .track_runtime_application_description(&description)?;
649 Ok(description)
650 }
651
652 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
653 let mut this = self.inner();
654 let parameters = this.current_application().description.parameters.clone();
655 this.resource_controller
656 .track_runtime_application_parameters(¶meters)?;
657 Ok(parameters)
658 }
659
660 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
661 let mut this = self.inner();
662 let timestamp = this
663 .execution_state_sender
664 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
665 .recv_response()?;
666 this.resource_controller.track_runtime_timestamp()?;
667 Ok(timestamp)
668 }
669
670 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
671 let mut this = self.inner();
672 let balance = this
673 .execution_state_sender
674 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
675 .recv_response()?;
676 this.resource_controller.track_runtime_balance()?;
677 Ok(balance)
678 }
679
680 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
681 let mut this = self.inner();
682 let balance = this
683 .execution_state_sender
684 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
685 .recv_response()?;
686 this.resource_controller.track_runtime_balance()?;
687 Ok(balance)
688 }
689
690 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
691 let mut this = self.inner();
692 let owner_balances = this
693 .execution_state_sender
694 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
695 .recv_response()?;
696 this.resource_controller
697 .track_runtime_owner_balances(&owner_balances)?;
698 Ok(owner_balances)
699 }
700
701 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
702 let mut this = self.inner();
703 let owners = this
704 .execution_state_sender
705 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
706 .recv_response()?;
707 this.resource_controller.track_runtime_owners(&owners)?;
708 Ok(owners)
709 }
710
711 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
712 let mut this = self.inner();
713 let chain_ownership = this
714 .execution_state_sender
715 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
716 .recv_response()?;
717 this.resource_controller
718 .track_runtime_chain_ownership(&chain_ownership)?;
719 Ok(chain_ownership)
720 }
721
722 fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
723 let this = self.inner();
724 let application_permissions = this
725 .execution_state_sender
726 .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
727 .recv_response()?;
728 Ok(application_permissions)
729 }
730
731 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
732 let mut this = self.inner();
733 let id = this.current_application().id;
734 this.resource_controller.track_read_operation()?;
735 let receiver = this
736 .execution_state_sender
737 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
738 let state = this.view_user_states.entry(id).or_default();
739 state.contains_key_queries.register(receiver)
740 }
741
742 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
743 let mut this = self.inner();
744 let id = this.current_application().id;
745 let state = this.view_user_states.entry(id).or_default();
746 let value = state.contains_key_queries.wait(*promise)?;
747 Ok(value)
748 }
749
750 fn contains_keys_new(
751 &mut self,
752 keys: Vec<Vec<u8>>,
753 ) -> Result<Self::ContainsKeys, ExecutionError> {
754 let mut this = self.inner();
755 let id = this.current_application().id;
756 this.resource_controller.track_read_operation()?;
757 let receiver = this
758 .execution_state_sender
759 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
760 let state = this.view_user_states.entry(id).or_default();
761 state.contains_keys_queries.register(receiver)
762 }
763
764 fn contains_keys_wait(
765 &mut self,
766 promise: &Self::ContainsKeys,
767 ) -> Result<Vec<bool>, ExecutionError> {
768 let mut this = self.inner();
769 let id = this.current_application().id;
770 let state = this.view_user_states.entry(id).or_default();
771 let value = state.contains_keys_queries.wait(*promise)?;
772 Ok(value)
773 }
774
775 fn read_multi_values_bytes_new(
776 &mut self,
777 keys: Vec<Vec<u8>>,
778 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
779 let mut this = self.inner();
780 let id = this.current_application().id;
781 this.resource_controller.track_read_operation()?;
782 let receiver = this.execution_state_sender.send_request(move |callback| {
783 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
784 })?;
785 let state = this.view_user_states.entry(id).or_default();
786 state.read_multi_values_queries.register(receiver)
787 }
788
789 fn read_multi_values_bytes_wait(
790 &mut self,
791 promise: &Self::ReadMultiValuesBytes,
792 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
793 let mut this = self.inner();
794 let id = this.current_application().id;
795 let state = this.view_user_states.entry(id).or_default();
796 let values = state.read_multi_values_queries.wait(*promise)?;
797 for value in &values {
798 if let Some(value) = &value {
799 this.resource_controller
800 .track_bytes_read(value.len() as u64)?;
801 }
802 }
803 Ok(values)
804 }
805
806 fn read_value_bytes_new(
807 &mut self,
808 key: Vec<u8>,
809 ) -> Result<Self::ReadValueBytes, ExecutionError> {
810 let mut this = self.inner();
811 let id = this.current_application().id;
812 this.resource_controller.track_read_operation()?;
813 let receiver = this
814 .execution_state_sender
815 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
816 let state = this.view_user_states.entry(id).or_default();
817 state.read_value_queries.register(receiver)
818 }
819
820 fn read_value_bytes_wait(
821 &mut self,
822 promise: &Self::ReadValueBytes,
823 ) -> Result<Option<Vec<u8>>, ExecutionError> {
824 let mut this = self.inner();
825 let id = this.current_application().id;
826 let value = {
827 let state = this.view_user_states.entry(id).or_default();
828 state.read_value_queries.wait(*promise)?
829 };
830 if let Some(value) = &value {
831 this.resource_controller
832 .track_bytes_read(value.len() as u64)?;
833 }
834 Ok(value)
835 }
836
837 fn find_keys_by_prefix_new(
838 &mut self,
839 key_prefix: Vec<u8>,
840 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
841 let mut this = self.inner();
842 let id = this.current_application().id;
843 this.resource_controller.track_read_operation()?;
844 let receiver = this.execution_state_sender.send_request(move |callback| {
845 ExecutionRequest::FindKeysByPrefix {
846 id,
847 key_prefix,
848 callback,
849 }
850 })?;
851 let state = this.view_user_states.entry(id).or_default();
852 state.find_keys_queries.register(receiver)
853 }
854
855 fn find_keys_by_prefix_wait(
856 &mut self,
857 promise: &Self::FindKeysByPrefix,
858 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
859 let mut this = self.inner();
860 let id = this.current_application().id;
861 let keys = {
862 let state = this.view_user_states.entry(id).or_default();
863 state.find_keys_queries.wait(*promise)?
864 };
865 let mut read_size = 0;
866 for key in &keys {
867 read_size += key.len();
868 }
869 this.resource_controller
870 .track_bytes_read(read_size as u64)?;
871 Ok(keys)
872 }
873
874 fn find_key_values_by_prefix_new(
875 &mut self,
876 key_prefix: Vec<u8>,
877 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
878 let mut this = self.inner();
879 let id = this.current_application().id;
880 this.resource_controller.track_read_operation()?;
881 let receiver = this.execution_state_sender.send_request(move |callback| {
882 ExecutionRequest::FindKeyValuesByPrefix {
883 id,
884 key_prefix,
885 callback,
886 }
887 })?;
888 let state = this.view_user_states.entry(id).or_default();
889 state.find_key_values_queries.register(receiver)
890 }
891
892 fn find_key_values_by_prefix_wait(
893 &mut self,
894 promise: &Self::FindKeyValuesByPrefix,
895 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
896 let mut this = self.inner();
897 let id = this.current_application().id;
898 let state = this.view_user_states.entry(id).or_default();
899 let key_values = state.find_key_values_queries.wait(*promise)?;
900 let mut read_size = 0;
901 for (key, value) in &key_values {
902 read_size += key.len() + value.len();
903 }
904 this.resource_controller
905 .track_bytes_read(read_size as u64)?;
906 Ok(key_values)
907 }
908
909 fn perform_http_request(
910 &mut self,
911 request: http::Request,
912 ) -> Result<http::Response, ExecutionError> {
913 let mut this = self.inner();
914 let app_permissions = this
915 .execution_state_sender
916 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
917 .recv_response()?;
918
919 let app_id = this.current_application().id;
920 ensure!(
921 app_permissions.can_make_http_requests(&app_id),
922 ExecutionError::UnauthorizedApplication(app_id)
923 );
924
925 this.resource_controller.track_http_request()?;
926
927 let response = this
928 .execution_state_sender
929 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
930 request,
931 http_responses_are_oracle_responses:
932 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
933 callback,
934 })?
935 .recv_response()?;
936 Ok(response)
937 }
938
939 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
940 let this = self.inner();
941 this.execution_state_sender
942 .send_request(|callback| ExecutionRequest::AssertBefore {
943 timestamp,
944 callback,
945 })?
946 .recv_response()?
947 }
948
949 fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
950 let this = self.inner();
951 let blob_id = hash.into();
952 let content = this
953 .execution_state_sender
954 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
955 .recv_response()?;
956 Ok(content.into_vec_or_clone())
957 }
958
959 fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
960 let this = self.inner();
961 let blob_id = hash.into();
962 this.execution_state_sender
963 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
964 .recv_response()?;
965 Ok(())
966 }
967
968 fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
969 Ok(self.inner().allow_application_logs)
970 }
971
972 #[cfg(web)]
973 fn send_log(&mut self, message: String, level: tracing::log::Level) {
974 let this = self.inner();
975 this.execution_state_sender
977 .unbounded_send(ExecutionRequest::Log { message, level })
978 .ok();
979 }
980}
981
/// Settings that differ between the contract runtime and the service runtime.
trait ContractOrServiceRuntime {
    /// Sent as `http_responses_are_oracle_responses` with every HTTP request performed
    /// through the runtime.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
992
/// Contract runtimes treat HTTP responses as oracle responses.
impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
}
996
/// Service runtimes do not treat HTTP responses as oracle responses.
impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
}
1000
1001impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
1002 fn clone(&self) -> Self {
1003 SyncRuntimeHandle(self.0.clone())
1004 }
1005}
1006
1007impl ContractSyncRuntime {
1008 pub(crate) fn new(
1009 execution_state_sender: ExecutionStateSender,
1010 chain_id: ChainId,
1011 refund_grant_to: Option<Account>,
1012 resource_controller: ResourceController,
1013 action: &UserAction,
1014 allow_application_logs: bool,
1015 ) -> Self {
1016 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1017 SyncRuntimeInternal::new(
1018 chain_id,
1019 action.height(),
1020 action.round(),
1021 if let UserAction::Message(context, _) = action {
1022 Some(context.into())
1023 } else {
1024 None
1025 },
1026 execution_state_sender,
1027 None,
1028 refund_grant_to,
1029 resource_controller,
1030 action.timestamp(),
1031 allow_application_logs,
1032 ),
1033 )))
1034 }
1035
1036 pub(crate) fn preload_contract(
1038 &self,
1039 id: ApplicationId,
1040 code: UserContractCode,
1041 description: ApplicationDescription,
1042 ) -> Result<(), ExecutionError> {
1043 let this = self
1044 .0
1045 .as_ref()
1046 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1047 let mut this_guard = this.inner();
1048
1049 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1050 entry.insert((code, description));
1051 }
1052
1053 Ok(())
1054 }
1055
1056 pub(crate) fn run_action(
1058 mut self,
1059 application_id: ApplicationId,
1060 chain_id: ChainId,
1061 action: UserAction,
1062 ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1063 let result = self
1064 .deref_mut()
1065 .run_action(application_id, chain_id, action)?;
1066 let runtime = self
1067 .into_inner()
1068 .expect("Runtime clones should have been freed by now");
1069
1070 Ok((result, runtime.resource_controller))
1071 }
1072}
1073
1074impl ContractSyncRuntimeHandle {
1075 #[instrument(skip_all, fields(application_id = %application_id))]
1076 fn run_action(
1077 &mut self,
1078 application_id: ApplicationId,
1079 chain_id: ChainId,
1080 action: UserAction,
1081 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1082 let finalize_context = FinalizeContext {
1083 authenticated_signer: action.signer(),
1084 chain_id,
1085 height: action.height(),
1086 round: action.round(),
1087 };
1088
1089 {
1090 let runtime = self.inner();
1091 assert_eq!(runtime.chain_id, chain_id);
1092 assert_eq!(runtime.height, action.height());
1093 }
1094
1095 let signer = action.signer();
1096 let closure = move |code: &mut UserContractInstance| match action {
1097 UserAction::Instantiate(_context, argument) => {
1098 code.instantiate(argument).map(|()| None)
1099 }
1100 UserAction::Operation(_context, operation) => {
1101 code.execute_operation(operation).map(Option::Some)
1102 }
1103 UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1104 UserAction::ProcessStreams(_context, updates) => {
1105 code.process_streams(updates).map(|()| None)
1106 }
1107 };
1108
1109 let result = self.execute(application_id, signer, closure)?;
1110 self.finalize(finalize_context)?;
1111 Ok(result)
1112 }
1113
1114 #[instrument(skip_all)]
1116 fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1117 let applications = mem::take(&mut self.inner().applications_to_finalize)
1118 .into_iter()
1119 .rev();
1120
1121 self.inner().is_finalizing = true;
1122
1123 for application in applications {
1124 self.execute(application, context.authenticated_signer, |contract| {
1125 contract.finalize().map(|_| None)
1126 })?;
1127 self.inner().loaded_applications.remove(&application);
1128 }
1129
1130 Ok(())
1131 }
1132
1133 #[instrument(skip_all, fields(application_id = %application_id))]
1135 fn execute(
1136 &mut self,
1137 application_id: ApplicationId,
1138 signer: Option<AccountOwner>,
1139 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1140 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1141 let contract = {
1142 let mut runtime = self.inner();
1143 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1144
1145 let status = ApplicationStatus {
1146 caller_id: None,
1147 id: application_id,
1148 description: application.description.clone(),
1149 signer,
1150 };
1151
1152 runtime.push_application(status);
1153
1154 application
1155 };
1156
1157 let result = closure(
1158 &mut contract
1159 .instance
1160 .try_lock()
1161 .expect("Application should not be already executing"),
1162 )?;
1163
1164 let mut runtime = self.inner();
1165 let application_status = runtime.pop_application();
1166 assert_eq!(application_status.caller_id, None);
1167 assert_eq!(application_status.id, application_id);
1168 assert_eq!(application_status.description, contract.description);
1169 assert_eq!(application_status.signer, signer);
1170 assert!(runtime.call_stack.is_empty());
1171
1172 Ok(result)
1173 }
1174}
1175
1176impl ContractRuntime for ContractSyncRuntimeHandle {
1177 fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1178 let this = self.inner();
1179 Ok(this.current_application().signer)
1180 }
1181
1182 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1183 Ok(self
1184 .inner()
1185 .executing_message
1186 .map(|metadata| metadata.is_bouncing))
1187 }
1188
1189 fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1190 Ok(self
1191 .inner()
1192 .executing_message
1193 .map(|metadata| metadata.origin))
1194 }
1195
1196 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1197 let this = self.inner();
1198 if this.call_stack.len() <= 1 {
1199 return Ok(None);
1200 }
1201 Ok(this.current_application().caller_id)
1202 }
1203
1204 fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1205 Ok(match vm_runtime {
1206 VmRuntime::Wasm => {
1207 self.inner()
1208 .resource_controller
1209 .policy()
1210 .maximum_wasm_fuel_per_block
1211 }
1212 VmRuntime::Evm => {
1213 self.inner()
1214 .resource_controller
1215 .policy()
1216 .maximum_evm_fuel_per_block
1217 }
1218 })
1219 }
1220
1221 fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1222 Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1223 }
1224
1225 fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1226 let mut this = self.inner();
1227 this.resource_controller.track_fuel(fuel, vm_runtime)
1228 }
1229
1230 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1231 let mut this = self.inner();
1232 let application = this.current_application();
1233 let application_id = application.id;
1234 let authenticated_signer = application.signer;
1235 let mut refund_grant_to = this.refund_grant_to;
1236
1237 let grant = this
1238 .resource_controller
1239 .policy()
1240 .total_price(&message.grant)?;
1241 if grant.is_zero() {
1242 refund_grant_to = None;
1243 } else {
1244 this.resource_controller.track_grant(grant)?;
1245 }
1246 let kind = if message.is_tracked {
1247 MessageKind::Tracked
1248 } else {
1249 MessageKind::Simple
1250 };
1251
1252 this.execution_state_sender
1253 .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1254 message: OutgoingMessage {
1255 destination: message.destination,
1256 authenticated_signer,
1257 refund_grant_to,
1258 grant,
1259 kind,
1260 message: Message::User {
1261 application_id,
1262 bytes: message.message,
1263 },
1264 },
1265 callback,
1266 })?
1267 .recv_response()?;
1268
1269 Ok(())
1270 }
1271
1272 fn transfer(
1273 &mut self,
1274 source: AccountOwner,
1275 destination: Account,
1276 amount: Amount,
1277 ) -> Result<(), ExecutionError> {
1278 let this = self.inner();
1279 let current_application = this.current_application();
1280 let application_id = current_application.id;
1281 let signer = current_application.signer;
1282
1283 this.execution_state_sender
1284 .send_request(|callback| ExecutionRequest::Transfer {
1285 source,
1286 destination,
1287 amount,
1288 signer,
1289 application_id,
1290 callback,
1291 })?
1292 .recv_response()?;
1293 Ok(())
1294 }
1295
1296 fn claim(
1297 &mut self,
1298 source: Account,
1299 destination: Account,
1300 amount: Amount,
1301 ) -> Result<(), ExecutionError> {
1302 let this = self.inner();
1303 let current_application = this.current_application();
1304 let application_id = current_application.id;
1305 let signer = current_application.signer;
1306
1307 this.execution_state_sender
1308 .send_request(|callback| ExecutionRequest::Claim {
1309 source,
1310 destination,
1311 amount,
1312 signer,
1313 application_id,
1314 callback,
1315 })?
1316 .recv_response()?;
1317 Ok(())
1318 }
1319
1320 fn try_call_application(
1321 &mut self,
1322 authenticated: bool,
1323 callee_id: ApplicationId,
1324 argument: Vec<u8>,
1325 ) -> Result<Vec<u8>, ExecutionError> {
1326 let contract = self
1327 .inner()
1328 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1329
1330 let value = contract
1331 .try_lock()
1332 .expect("Applications should not have reentrant calls")
1333 .execute_operation(argument)?;
1334
1335 self.inner().finish_call()?;
1336
1337 Ok(value)
1338 }
1339
1340 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1341 let mut this = self.inner();
1342 ensure!(
1343 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1344 ExecutionError::StreamNameTooLong
1345 );
1346 let application_id = GenericApplicationId::User(this.current_application().id);
1347 let stream_id = StreamId {
1348 stream_name,
1349 application_id,
1350 };
1351 let value_len = value.len() as u64;
1352 let index = this
1353 .execution_state_sender
1354 .send_request(|callback| ExecutionRequest::Emit {
1355 stream_id,
1356 value,
1357 callback,
1358 })?
1359 .recv_response()?;
1360 this.resource_controller.track_bytes_written(value_len)?;
1362 Ok(index)
1363 }
1364
1365 fn read_event(
1366 &mut self,
1367 chain_id: ChainId,
1368 stream_name: StreamName,
1369 index: u32,
1370 ) -> Result<Vec<u8>, ExecutionError> {
1371 let mut this = self.inner();
1372 ensure!(
1373 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1374 ExecutionError::StreamNameTooLong
1375 );
1376 let application_id = GenericApplicationId::User(this.current_application().id);
1377 let stream_id = StreamId {
1378 stream_name,
1379 application_id,
1380 };
1381 let event_id = EventId {
1382 stream_id,
1383 index,
1384 chain_id,
1385 };
1386 let event = this
1387 .execution_state_sender
1388 .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1389 .recv_response()?;
1390 this.resource_controller
1392 .track_bytes_read(event.len() as u64)?;
1393 Ok(event)
1394 }
1395
1396 fn subscribe_to_events(
1397 &mut self,
1398 chain_id: ChainId,
1399 application_id: ApplicationId,
1400 stream_name: StreamName,
1401 ) -> Result<(), ExecutionError> {
1402 let this = self.inner();
1403 ensure!(
1404 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1405 ExecutionError::StreamNameTooLong
1406 );
1407 let stream_id = StreamId {
1408 stream_name,
1409 application_id: application_id.into(),
1410 };
1411 let subscriber_app_id = this.current_application().id;
1412 this.execution_state_sender
1413 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1414 chain_id,
1415 stream_id,
1416 subscriber_app_id,
1417 callback,
1418 })?
1419 .recv_response()?;
1420 Ok(())
1421 }
1422
1423 fn unsubscribe_from_events(
1424 &mut self,
1425 chain_id: ChainId,
1426 application_id: ApplicationId,
1427 stream_name: StreamName,
1428 ) -> Result<(), ExecutionError> {
1429 let this = self.inner();
1430 ensure!(
1431 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1432 ExecutionError::StreamNameTooLong
1433 );
1434 let stream_id = StreamId {
1435 stream_name,
1436 application_id: application_id.into(),
1437 };
1438 let subscriber_app_id = this.current_application().id;
1439 this.execution_state_sender
1440 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1441 chain_id,
1442 stream_id,
1443 subscriber_app_id,
1444 callback,
1445 })?
1446 .recv_response()?;
1447 Ok(())
1448 }
1449
1450 fn query_service(
1451 &mut self,
1452 application_id: ApplicationId,
1453 query: Vec<u8>,
1454 ) -> Result<Vec<u8>, ExecutionError> {
1455 let mut this = self.inner();
1456
1457 let app_permissions = this
1458 .execution_state_sender
1459 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1460 .recv_response()?;
1461
1462 let app_id = this.current_application().id;
1463 ensure!(
1464 app_permissions.can_call_services(&app_id),
1465 ExecutionError::UnauthorizedApplication(app_id)
1466 );
1467
1468 this.resource_controller.track_service_oracle_call()?;
1469
1470 this.run_service_oracle_query(application_id, query)
1471 }
1472
1473 fn open_chain(
1474 &mut self,
1475 ownership: ChainOwnership,
1476 application_permissions: ApplicationPermissions,
1477 balance: Amount,
1478 ) -> Result<ChainId, ExecutionError> {
1479 let parent_id = self.inner().chain_id;
1480 let block_height = self.block_height()?;
1481
1482 let timestamp = self.inner().user_context;
1483
1484 let chain_id = self
1485 .inner()
1486 .execution_state_sender
1487 .send_request(|callback| ExecutionRequest::OpenChain {
1488 ownership,
1489 balance,
1490 parent_id,
1491 block_height,
1492 timestamp,
1493 application_permissions,
1494 callback,
1495 })?
1496 .recv_response()?;
1497
1498 Ok(chain_id)
1499 }
1500
1501 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1502 let this = self.inner();
1503 let application_id = this.current_application().id;
1504 this.execution_state_sender
1505 .send_request(|callback| ExecutionRequest::CloseChain {
1506 application_id,
1507 callback,
1508 })?
1509 .recv_response()?
1510 }
1511
1512 fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1513 let this = self.inner();
1514 let application_id = this.current_application().id;
1515 this.execution_state_sender
1516 .send_request(|callback| ExecutionRequest::ChangeOwnership {
1517 application_id,
1518 ownership,
1519 callback,
1520 })?
1521 .recv_response()?
1522 }
1523
1524 fn change_application_permissions(
1525 &mut self,
1526 application_permissions: ApplicationPermissions,
1527 ) -> Result<(), ExecutionError> {
1528 let this = self.inner();
1529 let application_id = this.current_application().id;
1530 this.execution_state_sender
1531 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1532 application_id,
1533 application_permissions,
1534 callback,
1535 })?
1536 .recv_response()?
1537 }
1538
1539 fn create_application(
1540 &mut self,
1541 module_id: ModuleId,
1542 parameters: Vec<u8>,
1543 argument: Vec<u8>,
1544 required_application_ids: Vec<ApplicationId>,
1545 ) -> Result<ApplicationId, ExecutionError> {
1546 let chain_id = self.inner().chain_id;
1547 let block_height = self.block_height()?;
1548
1549 let CreateApplicationResult { app_id } = self
1550 .inner()
1551 .execution_state_sender
1552 .send_request(move |callback| ExecutionRequest::CreateApplication {
1553 chain_id,
1554 block_height,
1555 module_id,
1556 parameters,
1557 required_application_ids,
1558 callback,
1559 })?
1560 .recv_response()??;
1561
1562 let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1563
1564 contract
1565 .try_lock()
1566 .expect("Applications should not have reentrant calls")
1567 .instantiate(argument)?;
1568
1569 self.inner().finish_call()?;
1570
1571 Ok(app_id)
1572 }
1573
1574 fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1575 let blob = Blob::new_data(bytes);
1576 let blob_id = blob.id();
1577 let this = self.inner();
1578 this.execution_state_sender
1579 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1580 .recv_response()?;
1581 Ok(DataBlobHash(blob_id.hash))
1582 }
1583
1584 fn publish_module(
1585 &mut self,
1586 contract: Bytecode,
1587 service: Bytecode,
1588 vm_runtime: VmRuntime,
1589 ) -> Result<ModuleId, ExecutionError> {
1590 let (blobs, module_id) =
1591 crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1592 let this = self.inner();
1593 for blob in blobs {
1594 this.execution_state_sender
1595 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1596 .recv_response()?;
1597 }
1598 Ok(module_id)
1599 }
1600
1601 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1602 let this = self.inner();
1603 let round = this.round;
1604 this.execution_state_sender
1605 .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1606 .recv_response()
1607 }
1608
1609 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1610 let mut this = self.inner();
1611 let id = this.current_application().id;
1612 let state = this.view_user_states.entry(id).or_default();
1613 state.force_all_pending_queries()?;
1614 this.resource_controller.track_write_operations(
1615 batch
1616 .num_operations()
1617 .try_into()
1618 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1619 )?;
1620 this.resource_controller
1621 .track_bytes_written(batch.size() as u64)?;
1622 this.execution_state_sender
1623 .send_request(|callback| ExecutionRequest::WriteBatch {
1624 id,
1625 batch,
1626 callback,
1627 })?
1628 .recv_response()?;
1629 Ok(())
1630 }
1631}
1632
1633impl ServiceSyncRuntime {
1634 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1636 Self::new_with_deadline(execution_state_sender, context, None)
1637 }
1638
1639 pub fn new_with_deadline(
1641 execution_state_sender: ExecutionStateSender,
1642 context: QueryContext,
1643 deadline: Option<Instant>,
1644 ) -> Self {
1645 let allow_application_logs = execution_state_sender
1647 .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1648 .ok()
1649 .and_then(|receiver| receiver.recv_response().ok())
1650 .unwrap_or(false);
1651
1652 let runtime = SyncRuntime(Some(
1653 SyncRuntimeInternal::new(
1654 context.chain_id,
1655 context.next_block_height,
1656 None,
1657 None,
1658 execution_state_sender,
1659 deadline,
1660 None,
1661 ResourceController::default(),
1662 (),
1663 allow_application_logs,
1664 )
1665 .into(),
1666 ));
1667
1668 ServiceSyncRuntime {
1669 runtime,
1670 current_context: context,
1671 }
1672 }
1673
1674 pub(crate) fn preload_service(
1676 &self,
1677 id: ApplicationId,
1678 code: UserServiceCode,
1679 description: ApplicationDescription,
1680 ) -> Result<(), ExecutionError> {
1681 let this = self
1682 .runtime
1683 .0
1684 .as_ref()
1685 .expect("services shouldn't be preloaded while the runtime is being dropped");
1686 let mut this_guard = this.inner();
1687
1688 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1689 entry.insert((code, description));
1690 }
1691
1692 Ok(())
1693 }
1694
1695 pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1697 while let Ok(request) = incoming_requests.recv() {
1698 let ServiceRuntimeRequest::Query {
1699 application_id,
1700 context,
1701 query,
1702 callback,
1703 } = request;
1704
1705 let result = self
1706 .prepare_for_query(context)
1707 .and_then(|()| self.run_query(application_id, query));
1708
1709 if let Err(err) = callback.send(result) {
1710 tracing::debug!(%err, "Receiver for query result has been dropped");
1711 }
1712 }
1713 }
1714
1715 pub(crate) fn prepare_for_query(
1717 &mut self,
1718 new_context: QueryContext,
1719 ) -> Result<(), ExecutionError> {
1720 let expected_context = QueryContext {
1721 local_time: new_context.local_time,
1722 ..self.current_context
1723 };
1724
1725 if new_context != expected_context {
1726 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1727 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1728 } else {
1729 self.handle_mut()
1730 .inner()
1731 .execution_state_sender
1732 .send_request(|callback| ExecutionRequest::SetLocalTime {
1733 local_time: new_context.local_time,
1734 callback,
1735 })?
1736 .recv_response()?;
1737 }
1738 Ok(())
1739 }
1740
1741 pub(crate) fn run_query(
1743 &mut self,
1744 application_id: ApplicationId,
1745 query: Vec<u8>,
1746 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1747 let this = self.handle_mut();
1748 let response = this.try_query_application(application_id, query)?;
1749 let operations = mem::take(&mut this.inner().scheduled_operations);
1750
1751 Ok(QueryOutcome {
1752 response,
1753 operations,
1754 })
1755 }
1756
1757 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1759 self.runtime.0.as_mut().expect(
1760 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1761 )
1762 }
1763}
1764
1765impl ServiceRuntime for ServiceSyncRuntimeHandle {
1766 fn try_query_application(
1768 &mut self,
1769 queried_id: ApplicationId,
1770 argument: Vec<u8>,
1771 ) -> Result<Vec<u8>, ExecutionError> {
1772 let service = {
1773 let mut this = self.inner();
1774
1775 let application = this.load_service_instance(self.clone(), queried_id)?;
1777 this.push_application(ApplicationStatus {
1779 caller_id: None,
1780 id: queried_id,
1781 description: application.description,
1782 signer: None,
1783 });
1784 application.instance
1785 };
1786 let response = service
1787 .try_lock()
1788 .expect("Applications should not have reentrant calls")
1789 .handle_query(argument)?;
1790 self.inner().pop_application();
1791 Ok(response)
1792 }
1793
1794 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1795 let mut this = self.inner();
1796 let application_id = this.current_application().id;
1797
1798 this.scheduled_operations.push(Operation::User {
1799 application_id,
1800 bytes: operation,
1801 });
1802
1803 Ok(())
1804 }
1805
1806 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1807 if let Some(deadline) = self.inner().deadline {
1808 if Instant::now() >= deadline {
1809 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1810 }
1811 }
1812 Ok(())
1813 }
1814}
1815
/// A request received by `ServiceSyncRuntime::run` over its incoming channel.
pub enum ServiceRuntimeRequest {
    /// Run a query against an application's service.
    Query {
        /// The application whose service should handle the query.
        application_id: ApplicationId,
        /// The context the runtime is prepared for before the query runs.
        context: QueryContext,
        /// The query bytes handed to the service.
        query: Vec<u8>,
        /// Channel on which the query's outcome, or its error, is sent back.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1825
/// The parts of a `MessageContext` that are kept for the message being executed.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// Copied from `MessageContext::is_bouncing`.
    is_bouncing: bool,
    /// Copied from `MessageContext::origin`.
    origin: ChainId,
}
1832
1833impl From<&MessageContext> for ExecutingMessage {
1834 fn from(context: &MessageContext) -> Self {
1835 ExecutingMessage {
1836 is_bouncing: context.is_bouncing,
1837 origin: context.origin,
1838 }
1839 }
1840}
1841
1842pub fn create_bytecode_blobs_sync(
1844 contract: Bytecode,
1845 service: Bytecode,
1846 vm_runtime: VmRuntime,
1847) -> (Vec<Blob>, ModuleId) {
1848 match vm_runtime {
1849 VmRuntime::Wasm => {
1850 let compressed_contract = contract.compress();
1851 let compressed_service = service.compress();
1852 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1853 let service_blob = Blob::new_service_bytecode(compressed_service);
1854 let module_id =
1855 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1856 (vec![contract_blob, service_blob], module_id)
1857 }
1858 VmRuntime::Evm => {
1859 let compressed_contract = contract.compress();
1860 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1861 let module_id = ModuleId::new(
1862 evm_contract_blob.id().hash,
1863 evm_contract_blob.id().hash,
1864 vm_runtime,
1865 );
1866 (vec![evm_contract_blob], module_id)
1867 }
1868 }
1869}