1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15 SendMessageRequest, Timestamp,
16 },
17 ensure, http,
18 identifiers::{
19 Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20 },
21 ownership::ChainOwnership,
22 time::Instant,
23 vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27use tracing::instrument;
28
29use crate::{
30 execution::UserAction,
31 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
32 resources::ResourceController,
33 system::CreateApplicationResult,
34 util::{ReceiverExt, UnboundedSenderExt},
35 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
36 ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
37 OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
38 UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
39};
40
41#[cfg(test)]
42#[path = "unit_tests/runtime_tests.rs"]
43mod tests;
44
/// Ties a user application instance type to the companion types the runtime keeps for it.
pub trait WithContext {
    /// Type of the value stored in the runtime's `user_context` field.
    type UserContext;
    /// Type of the application code cached in the runtime's `preloaded_applications` map.
    type Code;
}
49
50impl WithContext for UserContractInstance {
51 type UserContext = Timestamp;
52 type Code = UserContractCode;
53}
54
55impl WithContext for UserServiceInstance {
56 type UserContext = ();
57 type Code = UserServiceCode;
58}
59
/// Test-only implementation for a type-erased instance, with no context and no code.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
    type Code = ();
}
65
66#[derive(Debug)]
67pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
68
69pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
70
71pub struct ServiceSyncRuntime {
72 runtime: SyncRuntime<UserServiceInstance>,
73 current_context: QueryContext,
74}
75
76#[derive(Debug)]
77pub struct SyncRuntimeHandle<UserInstance: WithContext>(
78 Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
79);
80
81pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
82pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
83
84#[derive(Debug)]
86pub struct SyncRuntimeInternal<UserInstance: WithContext> {
87 chain_id: ChainId,
89 height: BlockHeight,
92 round: Option<u32>,
94 #[debug(skip_if = Option::is_none)]
96 executing_message: Option<ExecutingMessage>,
97
98 execution_state_sender: ExecutionStateSender,
100
101 is_finalizing: bool,
105 applications_to_finalize: Vec<ApplicationId>,
107
108 preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
110 loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
112 call_stack: Vec<ApplicationStatus>,
114 active_applications: HashSet<ApplicationId>,
116 scheduled_operations: Vec<Operation>,
118
119 view_user_states: BTreeMap<ApplicationId, ViewUserState>,
121
122 deadline: Option<Instant>,
126
127 #[debug(skip_if = Option::is_none)]
129 refund_grant_to: Option<Account>,
130 resource_controller: ResourceController,
132 user_context: UserInstance::UserContext,
134 allow_application_logs: bool,
136}
137
138#[derive(Debug)]
140struct ApplicationStatus {
141 caller_id: Option<ApplicationId>,
143 id: ApplicationId,
145 description: ApplicationDescription,
147 signer: Option<AccountOwner>,
149}
150
151#[derive(Debug)]
153struct LoadedApplication<Instance> {
154 instance: Arc<Mutex<Instance>>,
155 description: ApplicationDescription,
156}
157
158impl<Instance> LoadedApplication<Instance> {
159 fn new(instance: Instance, description: ApplicationDescription) -> Self {
161 LoadedApplication {
162 instance: Arc::new(Mutex::new(instance)),
163 description,
164 }
165 }
166}
167
168impl<Instance> Clone for LoadedApplication<Instance> {
169 fn clone(&self) -> Self {
172 LoadedApplication {
173 instance: self.instance.clone(),
174 description: self.description.clone(),
175 }
176 }
177}
178
179#[derive(Debug)]
180enum Promise<T> {
181 Ready(T),
182 Pending(Receiver<T>),
183}
184
185impl<T> Promise<T> {
186 fn force(&mut self) -> Result<(), ExecutionError> {
187 if let Promise::Pending(receiver) = self {
188 let value = receiver
189 .recv_ref()
190 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
191 *self = Promise::Ready(value);
192 }
193 Ok(())
194 }
195
196 fn read(self) -> Result<T, ExecutionError> {
197 match self {
198 Promise::Pending(receiver) => {
199 let value = receiver.recv_response()?;
200 Ok(value)
201 }
202 Promise::Ready(value) => Ok(value),
203 }
204 }
205}
206
207#[derive(Debug, Default)]
209struct QueryManager<T> {
210 pending_queries: BTreeMap<u32, Promise<T>>,
212 query_count: u32,
214 active_query_count: u32,
216}
217
218impl<T> QueryManager<T> {
219 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
220 let id = self.query_count;
221 self.pending_queries.insert(id, Promise::Pending(receiver));
222 self.query_count = self
223 .query_count
224 .checked_add(1)
225 .ok_or(ArithmeticError::Overflow)?;
226 self.active_query_count = self
227 .active_query_count
228 .checked_add(1)
229 .ok_or(ArithmeticError::Overflow)?;
230 Ok(id)
231 }
232
233 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
234 let promise = self
235 .pending_queries
236 .remove(&id)
237 .ok_or(ExecutionError::InvalidPromise)?;
238 let value = promise.read()?;
239 self.active_query_count -= 1;
240 Ok(value)
241 }
242
243 fn force_all(&mut self) -> Result<(), ExecutionError> {
244 for promise in self.pending_queries.values_mut() {
245 promise.force()?;
246 }
247 Ok(())
248 }
249}
250
/// A list of raw storage keys.
type Keys = Vec<Vec<u8>>;
/// A raw storage value.
type Value = Vec<u8>;
/// A list of raw `(key, value)` storage pairs.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
254
255#[derive(Debug, Default)]
256struct ViewUserState {
257 contains_key_queries: QueryManager<bool>,
259 contains_keys_queries: QueryManager<Vec<bool>>,
261 read_value_queries: QueryManager<Option<Value>>,
263 read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
265 find_keys_queries: QueryManager<Keys>,
267 find_key_values_queries: QueryManager<KeyValues>,
269}
270
271impl ViewUserState {
272 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
273 self.contains_key_queries.force_all()?;
274 self.contains_keys_queries.force_all()?;
275 self.read_value_queries.force_all()?;
276 self.read_multi_values_queries.force_all()?;
277 self.find_keys_queries.force_all()?;
278 self.find_key_values_queries.force_all()?;
279 Ok(())
280 }
281}
282
283impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
284 type Target = SyncRuntimeHandle<UserInstance>;
285
286 fn deref(&self) -> &Self::Target {
287 self.0.as_ref().expect(
288 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
289 )
290 }
291}
292
293impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
294 fn deref_mut(&mut self) -> &mut Self::Target {
295 self.0.as_mut().expect(
296 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
297 )
298 }
299}
300
301impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
302 fn drop(&mut self) {
303 if let Some(handle) = self.0.take() {
306 handle.inner().loaded_applications.clear();
307 }
308 }
309}
310
311impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
312 #[expect(clippy::too_many_arguments)]
313 fn new(
314 chain_id: ChainId,
315 height: BlockHeight,
316 round: Option<u32>,
317 executing_message: Option<ExecutingMessage>,
318 execution_state_sender: ExecutionStateSender,
319 deadline: Option<Instant>,
320 refund_grant_to: Option<Account>,
321 resource_controller: ResourceController,
322 user_context: UserInstance::UserContext,
323 allow_application_logs: bool,
324 ) -> Self {
325 Self {
326 chain_id,
327 height,
328 round,
329 executing_message,
330 execution_state_sender,
331 is_finalizing: false,
332 applications_to_finalize: Vec::new(),
333 preloaded_applications: HashMap::new(),
334 loaded_applications: HashMap::new(),
335 call_stack: Vec::new(),
336 active_applications: HashSet::new(),
337 view_user_states: BTreeMap::new(),
338 deadline,
339 refund_grant_to,
340 resource_controller,
341 scheduled_operations: Vec::new(),
342 user_context,
343 allow_application_logs,
344 }
345 }
346
347 fn current_application(&self) -> &ApplicationStatus {
355 self.call_stack
356 .last()
357 .expect("Call stack is unexpectedly empty")
358 }
359
360 fn push_application(&mut self, status: ApplicationStatus) {
364 self.active_applications.insert(status.id);
365 self.call_stack.push(status);
366 }
367
368 fn pop_application(&mut self) -> ApplicationStatus {
376 let status = self
377 .call_stack
378 .pop()
379 .expect("Can't remove application from empty call stack");
380 assert!(self.active_applications.remove(&status.id));
381 status
382 }
383
384 fn check_for_reentrancy(&self, application_id: ApplicationId) -> Result<(), ExecutionError> {
388 ensure!(
389 !self.active_applications.contains(&application_id),
390 ExecutionError::ReentrantCall(application_id)
391 );
392 Ok(())
393 }
394}
395
396impl SyncRuntimeInternal<UserContractInstance> {
397 #[instrument(skip_all, fields(application_id = %id))]
399 fn load_contract_instance(
400 &mut self,
401 this: SyncRuntimeHandle<UserContractInstance>,
402 id: ApplicationId,
403 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
404 match self.loaded_applications.entry(id) {
405 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
406
407 hash_map::Entry::Vacant(entry) => {
408 let (code, description) = match self.preloaded_applications.entry(id) {
411 #[cfg(web)]
413 hash_map::Entry::Vacant(_) => {
414 drop(this);
415 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
416 id,
417 )));
418 }
419 #[cfg(not(web))]
420 hash_map::Entry::Vacant(entry) => {
421 let (code, description) = self
422 .execution_state_sender
423 .send_request(move |callback| ExecutionRequest::LoadContract {
424 id,
425 callback,
426 })?
427 .recv_response()?;
428 entry.insert((code, description)).clone()
429 }
430 hash_map::Entry::Occupied(entry) => entry.get().clone(),
431 };
432 let instance = code.instantiate(this)?;
433
434 self.applications_to_finalize.push(id);
435 Ok(entry
436 .insert(LoadedApplication::new(instance, description))
437 .clone())
438 }
439 }
440 }
441
442 fn prepare_for_call(
444 &mut self,
445 this: ContractSyncRuntimeHandle,
446 authenticated: bool,
447 callee_id: ApplicationId,
448 ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
449 self.check_for_reentrancy(callee_id)?;
450
451 ensure!(
452 !self.is_finalizing,
453 ExecutionError::CrossApplicationCallInFinalize {
454 caller_id: Box::new(self.current_application().id),
455 callee_id: Box::new(callee_id),
456 }
457 );
458
459 let application = self.load_contract_instance(this, callee_id)?;
461
462 let caller = self.current_application();
463 let caller_id = caller.id;
464 let caller_signer = caller.signer;
465 let authenticated_signer = match caller_signer {
467 Some(signer) if authenticated => Some(signer),
468 _ => None,
469 };
470 let authenticated_caller_id = authenticated.then_some(caller_id);
471 self.push_application(ApplicationStatus {
472 caller_id: authenticated_caller_id,
473 id: callee_id,
474 description: application.description,
475 signer: authenticated_signer,
477 });
478 Ok(application.instance)
479 }
480
481 fn finish_call(&mut self) {
483 self.pop_application();
484 }
485
486 fn run_service_oracle_query(
488 &mut self,
489 application_id: ApplicationId,
490 query: Vec<u8>,
491 ) -> Result<Vec<u8>, ExecutionError> {
492 let timeout = self
493 .resource_controller
494 .remaining_service_oracle_execution_time()?;
495 let execution_start = Instant::now();
496 let deadline = Some(execution_start + timeout);
497 let response = self
498 .execution_state_sender
499 .send_request(|callback| ExecutionRequest::QueryServiceOracle {
500 deadline,
501 application_id,
502 next_block_height: self.height,
503 query,
504 callback,
505 })?
506 .recv_response()?;
507
508 self.resource_controller
509 .track_service_oracle_execution(execution_start.elapsed())?;
510 self.resource_controller
511 .track_service_oracle_response(response.len())?;
512
513 Ok(response)
514 }
515}
516
517impl SyncRuntimeInternal<UserServiceInstance> {
518 fn load_service_instance(
520 &mut self,
521 this: SyncRuntimeHandle<UserServiceInstance>,
522 id: ApplicationId,
523 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
524 match self.loaded_applications.entry(id) {
525 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
526
527 hash_map::Entry::Vacant(entry) => {
528 let (code, description) = match self.preloaded_applications.entry(id) {
531 #[cfg(web)]
533 hash_map::Entry::Vacant(_) => {
534 drop(this);
535 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
536 id,
537 )));
538 }
539 #[cfg(not(web))]
540 hash_map::Entry::Vacant(entry) => {
541 let (code, description) = self
542 .execution_state_sender
543 .send_request(move |callback| ExecutionRequest::LoadService {
544 id,
545 callback,
546 })?
547 .recv_response()?;
548 entry.insert((code, description)).clone()
549 }
550 hash_map::Entry::Occupied(entry) => entry.get().clone(),
551 };
552 let instance = code.instantiate(this)?;
553
554 self.applications_to_finalize.push(id);
555 Ok(entry
556 .insert(LoadedApplication::new(instance, description))
557 .clone())
558 }
559 }
560 }
561}
562
563impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
564 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
565 let handle = self.0.take().expect(
566 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
567 );
568 let runtime = Arc::into_inner(handle.0)?
569 .into_inner()
570 .expect("`SyncRuntime` should run in a single thread which should not panic");
571 Some(runtime)
572 }
573}
574
575impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
576 for SyncRuntimeHandle<UserInstance>
577{
578 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
579 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
580 }
581}
582
583impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
584 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
585 self.0
586 .try_lock()
587 .expect("Synchronous runtimes run on a single execution thread")
588 }
589}
590
591impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
592where
593 Self: ContractOrServiceRuntime,
594{
595 type Read = ();
596 type ReadValueBytes = u32;
597 type ContainsKey = u32;
598 type ContainsKeys = u32;
599 type ReadMultiValuesBytes = u32;
600 type FindKeysByPrefix = u32;
601 type FindKeyValuesByPrefix = u32;
602
603 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
604 let mut this = self.inner();
605 let chain_id = this.chain_id;
606 this.resource_controller.track_runtime_chain_id()?;
607 Ok(chain_id)
608 }
609
610 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
611 let mut this = self.inner();
612 let height = this.height;
613 this.resource_controller.track_runtime_block_height()?;
614 Ok(height)
615 }
616
617 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
618 let mut this = self.inner();
619 let application_id = this.current_application().id;
620 this.resource_controller.track_runtime_application_id()?;
621 Ok(application_id)
622 }
623
624 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
625 let mut this = self.inner();
626 let application_creator_chain_id = this.current_application().description.creator_chain_id;
627 this.resource_controller.track_runtime_application_id()?;
628 Ok(application_creator_chain_id)
629 }
630
631 fn read_application_description(
632 &mut self,
633 application_id: ApplicationId,
634 ) -> Result<ApplicationDescription, ExecutionError> {
635 let mut this = self.inner();
636 let description = this
637 .execution_state_sender
638 .send_request(|callback| ExecutionRequest::ReadApplicationDescription {
639 application_id,
640 callback,
641 })?
642 .recv_response()?;
643 this.resource_controller
644 .track_runtime_application_description(&description)?;
645 Ok(description)
646 }
647
648 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
649 let mut this = self.inner();
650 let parameters = this.current_application().description.parameters.clone();
651 this.resource_controller
652 .track_runtime_application_parameters(¶meters)?;
653 Ok(parameters)
654 }
655
656 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
657 let mut this = self.inner();
658 let timestamp = this
659 .execution_state_sender
660 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
661 .recv_response()?;
662 this.resource_controller.track_runtime_timestamp()?;
663 Ok(timestamp)
664 }
665
666 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
667 let mut this = self.inner();
668 let balance = this
669 .execution_state_sender
670 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
671 .recv_response()?;
672 this.resource_controller.track_runtime_balance()?;
673 Ok(balance)
674 }
675
676 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
677 let mut this = self.inner();
678 let balance = this
679 .execution_state_sender
680 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
681 .recv_response()?;
682 this.resource_controller.track_runtime_balance()?;
683 Ok(balance)
684 }
685
686 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
687 let mut this = self.inner();
688 let owner_balances = this
689 .execution_state_sender
690 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
691 .recv_response()?;
692 this.resource_controller
693 .track_runtime_owner_balances(&owner_balances)?;
694 Ok(owner_balances)
695 }
696
697 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
698 let mut this = self.inner();
699 let owners = this
700 .execution_state_sender
701 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
702 .recv_response()?;
703 this.resource_controller.track_runtime_owners(&owners)?;
704 Ok(owners)
705 }
706
707 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
708 let mut this = self.inner();
709 let chain_ownership = this
710 .execution_state_sender
711 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
712 .recv_response()?;
713 this.resource_controller
714 .track_runtime_chain_ownership(&chain_ownership)?;
715 Ok(chain_ownership)
716 }
717
718 fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
719 let this = self.inner();
720 let application_permissions = this
721 .execution_state_sender
722 .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
723 .recv_response()?;
724 Ok(application_permissions)
725 }
726
727 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
728 let mut this = self.inner();
729 let id = this.current_application().id;
730 this.resource_controller.track_read_operation()?;
731 let receiver = this
732 .execution_state_sender
733 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
734 let state = this.view_user_states.entry(id).or_default();
735 state.contains_key_queries.register(receiver)
736 }
737
738 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
739 let mut this = self.inner();
740 let id = this.current_application().id;
741 let state = this.view_user_states.entry(id).or_default();
742 let value = state.contains_key_queries.wait(*promise)?;
743 Ok(value)
744 }
745
746 fn contains_keys_new(
747 &mut self,
748 keys: Vec<Vec<u8>>,
749 ) -> Result<Self::ContainsKeys, ExecutionError> {
750 let mut this = self.inner();
751 let id = this.current_application().id;
752 this.resource_controller.track_read_operation()?;
753 let receiver = this
754 .execution_state_sender
755 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
756 let state = this.view_user_states.entry(id).or_default();
757 state.contains_keys_queries.register(receiver)
758 }
759
760 fn contains_keys_wait(
761 &mut self,
762 promise: &Self::ContainsKeys,
763 ) -> Result<Vec<bool>, ExecutionError> {
764 let mut this = self.inner();
765 let id = this.current_application().id;
766 let state = this.view_user_states.entry(id).or_default();
767 let value = state.contains_keys_queries.wait(*promise)?;
768 Ok(value)
769 }
770
771 fn read_multi_values_bytes_new(
772 &mut self,
773 keys: Vec<Vec<u8>>,
774 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
775 let mut this = self.inner();
776 let id = this.current_application().id;
777 this.resource_controller.track_read_operation()?;
778 let receiver = this.execution_state_sender.send_request(move |callback| {
779 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
780 })?;
781 let state = this.view_user_states.entry(id).or_default();
782 state.read_multi_values_queries.register(receiver)
783 }
784
785 fn read_multi_values_bytes_wait(
786 &mut self,
787 promise: &Self::ReadMultiValuesBytes,
788 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
789 let mut this = self.inner();
790 let id = this.current_application().id;
791 let state = this.view_user_states.entry(id).or_default();
792 let values = state.read_multi_values_queries.wait(*promise)?;
793 for value in &values {
794 if let Some(value) = &value {
795 this.resource_controller
796 .track_bytes_read(value.len() as u64)?;
797 }
798 }
799 Ok(values)
800 }
801
802 fn read_value_bytes_new(
803 &mut self,
804 key: Vec<u8>,
805 ) -> Result<Self::ReadValueBytes, ExecutionError> {
806 let mut this = self.inner();
807 let id = this.current_application().id;
808 this.resource_controller.track_read_operation()?;
809 let receiver = this
810 .execution_state_sender
811 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
812 let state = this.view_user_states.entry(id).or_default();
813 state.read_value_queries.register(receiver)
814 }
815
816 fn read_value_bytes_wait(
817 &mut self,
818 promise: &Self::ReadValueBytes,
819 ) -> Result<Option<Vec<u8>>, ExecutionError> {
820 let mut this = self.inner();
821 let id = this.current_application().id;
822 let value = {
823 let state = this.view_user_states.entry(id).or_default();
824 state.read_value_queries.wait(*promise)?
825 };
826 if let Some(value) = &value {
827 this.resource_controller
828 .track_bytes_read(value.len() as u64)?;
829 }
830 Ok(value)
831 }
832
833 fn find_keys_by_prefix_new(
834 &mut self,
835 key_prefix: Vec<u8>,
836 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
837 let mut this = self.inner();
838 let id = this.current_application().id;
839 this.resource_controller.track_read_operation()?;
840 let receiver = this.execution_state_sender.send_request(move |callback| {
841 ExecutionRequest::FindKeysByPrefix {
842 id,
843 key_prefix,
844 callback,
845 }
846 })?;
847 let state = this.view_user_states.entry(id).or_default();
848 state.find_keys_queries.register(receiver)
849 }
850
851 fn find_keys_by_prefix_wait(
852 &mut self,
853 promise: &Self::FindKeysByPrefix,
854 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
855 let mut this = self.inner();
856 let id = this.current_application().id;
857 let keys = {
858 let state = this.view_user_states.entry(id).or_default();
859 state.find_keys_queries.wait(*promise)?
860 };
861 let mut read_size = 0;
862 for key in &keys {
863 read_size += key.len();
864 }
865 this.resource_controller
866 .track_bytes_read(read_size as u64)?;
867 Ok(keys)
868 }
869
870 fn find_key_values_by_prefix_new(
871 &mut self,
872 key_prefix: Vec<u8>,
873 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
874 let mut this = self.inner();
875 let id = this.current_application().id;
876 this.resource_controller.track_read_operation()?;
877 let receiver = this.execution_state_sender.send_request(move |callback| {
878 ExecutionRequest::FindKeyValuesByPrefix {
879 id,
880 key_prefix,
881 callback,
882 }
883 })?;
884 let state = this.view_user_states.entry(id).or_default();
885 state.find_key_values_queries.register(receiver)
886 }
887
888 fn find_key_values_by_prefix_wait(
889 &mut self,
890 promise: &Self::FindKeyValuesByPrefix,
891 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
892 let mut this = self.inner();
893 let id = this.current_application().id;
894 let state = this.view_user_states.entry(id).or_default();
895 let key_values = state.find_key_values_queries.wait(*promise)?;
896 let mut read_size = 0;
897 for (key, value) in &key_values {
898 read_size += key.len() + value.len();
899 }
900 this.resource_controller
901 .track_bytes_read(read_size as u64)?;
902 Ok(key_values)
903 }
904
905 fn perform_http_request(
906 &mut self,
907 request: http::Request,
908 ) -> Result<http::Response, ExecutionError> {
909 let mut this = self.inner();
910 let app_permissions = this
911 .execution_state_sender
912 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
913 .recv_response()?;
914
915 let app_id = this.current_application().id;
916 ensure!(
917 app_permissions.can_make_http_requests(&app_id),
918 ExecutionError::UnauthorizedApplication(app_id)
919 );
920
921 this.resource_controller.track_http_request()?;
922
923 let response = this
924 .execution_state_sender
925 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
926 request,
927 http_responses_are_oracle_responses:
928 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
929 callback,
930 })?
931 .recv_response()?;
932 Ok(response)
933 }
934
935 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
936 let this = self.inner();
937 this.execution_state_sender
938 .send_request(|callback| ExecutionRequest::AssertBefore {
939 timestamp,
940 callback,
941 })?
942 .recv_response()?
943 }
944
945 fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
946 let this = self.inner();
947 let blob_id = hash.into();
948 let content = this
949 .execution_state_sender
950 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
951 .recv_response()?;
952 Ok(content.into_vec_or_clone())
953 }
954
955 fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
956 let this = self.inner();
957 let blob_id = hash.into();
958 this.execution_state_sender
959 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
960 .recv_response()?;
961 Ok(())
962 }
963
964 fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
965 Ok(self.inner().allow_application_logs)
966 }
967
968 #[cfg(web)]
969 fn send_log(&mut self, message: String, level: tracing::log::Level) {
970 let this = self.inner();
971 this.execution_state_sender
973 .unbounded_send(ExecutionRequest::Log { message, level })
974 .ok();
975 }
976}
977
/// Settings that differ between the contract runtime handle and the service runtime handle.
trait ContractOrServiceRuntime {
    /// Value sent as `http_responses_are_oracle_responses` when performing an HTTP request.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
988
989impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
990 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
991}
992
993impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
994 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
995}
996
997impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
998 fn clone(&self) -> Self {
999 SyncRuntimeHandle(self.0.clone())
1000 }
1001}
1002
1003impl ContractSyncRuntime {
1004 pub(crate) fn new(
1005 execution_state_sender: ExecutionStateSender,
1006 chain_id: ChainId,
1007 refund_grant_to: Option<Account>,
1008 resource_controller: ResourceController,
1009 action: &UserAction,
1010 allow_application_logs: bool,
1011 ) -> Self {
1012 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1013 SyncRuntimeInternal::new(
1014 chain_id,
1015 action.height(),
1016 action.round(),
1017 if let UserAction::Message(context, _) = action {
1018 Some(context.into())
1019 } else {
1020 None
1021 },
1022 execution_state_sender,
1023 None,
1024 refund_grant_to,
1025 resource_controller,
1026 action.timestamp(),
1027 allow_application_logs,
1028 ),
1029 )))
1030 }
1031
1032 pub(crate) fn preload_contract(
1034 &self,
1035 id: ApplicationId,
1036 code: UserContractCode,
1037 description: ApplicationDescription,
1038 ) {
1039 let this = self
1040 .0
1041 .as_ref()
1042 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1043 let mut this_guard = this.inner();
1044
1045 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1046 entry.insert((code, description));
1047 }
1048 }
1049
1050 pub(crate) fn run_action(
1052 mut self,
1053 application_id: ApplicationId,
1054 chain_id: ChainId,
1055 action: UserAction,
1056 ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1057 let result = self
1058 .deref_mut()
1059 .run_action(application_id, chain_id, action)?;
1060 let runtime = self
1061 .into_inner()
1062 .expect("Runtime clones should have been freed by now");
1063
1064 Ok((result, runtime.resource_controller))
1065 }
1066}
1067
1068impl ContractSyncRuntimeHandle {
1069 #[instrument(skip_all, fields(application_id = %application_id))]
1070 fn run_action(
1071 &self,
1072 application_id: ApplicationId,
1073 chain_id: ChainId,
1074 action: UserAction,
1075 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1076 let finalize_context = FinalizeContext {
1077 authenticated_signer: action.signer(),
1078 chain_id,
1079 height: action.height(),
1080 round: action.round(),
1081 };
1082
1083 {
1084 let runtime = self.inner();
1085 assert_eq!(runtime.chain_id, chain_id);
1086 assert_eq!(runtime.height, action.height());
1087 }
1088
1089 let signer = action.signer();
1090 let closure = move |code: &mut UserContractInstance| match action {
1091 UserAction::Instantiate(_context, argument) => {
1092 code.instantiate(argument).map(|()| None)
1093 }
1094 UserAction::Operation(_context, operation) => {
1095 code.execute_operation(operation).map(Option::Some)
1096 }
1097 UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1098 UserAction::ProcessStreams(_context, updates) => {
1099 code.process_streams(updates).map(|()| None)
1100 }
1101 };
1102
1103 let result = self.execute(application_id, signer, closure)?;
1104 self.finalize(finalize_context)?;
1105 Ok(result)
1106 }
1107
1108 #[instrument(skip_all)]
1110 fn finalize(&self, context: FinalizeContext) -> Result<(), ExecutionError> {
1111 let applications = mem::take(&mut self.inner().applications_to_finalize)
1112 .into_iter()
1113 .rev();
1114
1115 self.inner().is_finalizing = true;
1116
1117 for application in applications {
1118 self.execute(application, context.authenticated_signer, |contract| {
1119 contract.finalize().map(|_| None)
1120 })?;
1121 self.inner().loaded_applications.remove(&application);
1122 }
1123
1124 Ok(())
1125 }
1126
1127 #[instrument(skip_all, fields(application_id = %application_id))]
1129 fn execute(
1130 &self,
1131 application_id: ApplicationId,
1132 signer: Option<AccountOwner>,
1133 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1134 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1135 let contract = {
1136 let mut runtime = self.inner();
1137 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1138
1139 let status = ApplicationStatus {
1140 caller_id: None,
1141 id: application_id,
1142 description: application.description.clone(),
1143 signer,
1144 };
1145
1146 runtime.push_application(status);
1147
1148 application
1149 };
1150
1151 let result = closure(
1152 &mut contract
1153 .instance
1154 .try_lock()
1155 .expect("Application should not be already executing"),
1156 )?;
1157
1158 let mut runtime = self.inner();
1159 let application_status = runtime.pop_application();
1160 assert_eq!(application_status.caller_id, None);
1161 assert_eq!(application_status.id, application_id);
1162 assert_eq!(application_status.description, contract.description);
1163 assert_eq!(application_status.signer, signer);
1164 assert!(runtime.call_stack.is_empty());
1165
1166 Ok(result)
1167 }
1168}
1169
1170impl ContractRuntime for ContractSyncRuntimeHandle {
1171 fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1172 let this = self.inner();
1173 Ok(this.current_application().signer)
1174 }
1175
1176 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1177 Ok(self
1178 .inner()
1179 .executing_message
1180 .map(|metadata| metadata.is_bouncing))
1181 }
1182
1183 fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1184 Ok(self
1185 .inner()
1186 .executing_message
1187 .map(|metadata| metadata.origin))
1188 }
1189
1190 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1191 let this = self.inner();
1192 if this.call_stack.len() <= 1 {
1193 return Ok(None);
1194 }
1195 Ok(this.current_application().caller_id)
1196 }
1197
1198 fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1199 Ok(match vm_runtime {
1200 VmRuntime::Wasm => {
1201 self.inner()
1202 .resource_controller
1203 .policy()
1204 .maximum_wasm_fuel_per_block
1205 }
1206 VmRuntime::Evm => {
1207 self.inner()
1208 .resource_controller
1209 .policy()
1210 .maximum_evm_fuel_per_block
1211 }
1212 })
1213 }
1214
1215 fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1216 Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1217 }
1218
1219 fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1220 let mut this = self.inner();
1221 this.resource_controller.track_fuel(fuel, vm_runtime)
1222 }
1223
1224 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1225 let mut this = self.inner();
1226 let application = this.current_application();
1227 let application_id = application.id;
1228 let authenticated_signer = application.signer;
1229 let mut refund_grant_to = this.refund_grant_to;
1230
1231 let grant = this
1232 .resource_controller
1233 .policy()
1234 .total_price(&message.grant)?;
1235 if grant.is_zero() {
1236 refund_grant_to = None;
1237 } else {
1238 this.resource_controller.track_grant(grant)?;
1239 }
1240 let kind = if message.is_tracked {
1241 MessageKind::Tracked
1242 } else {
1243 MessageKind::Simple
1244 };
1245
1246 this.execution_state_sender
1247 .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1248 message: OutgoingMessage {
1249 destination: message.destination,
1250 authenticated_signer,
1251 refund_grant_to,
1252 grant,
1253 kind,
1254 message: Message::User {
1255 application_id,
1256 bytes: message.message,
1257 },
1258 },
1259 callback,
1260 })?
1261 .recv_response()?;
1262
1263 Ok(())
1264 }
1265
1266 fn transfer(
1267 &mut self,
1268 source: AccountOwner,
1269 destination: Account,
1270 amount: Amount,
1271 ) -> Result<(), ExecutionError> {
1272 let this = self.inner();
1273 let current_application = this.current_application();
1274 let application_id = current_application.id;
1275 let signer = current_application.signer;
1276
1277 this.execution_state_sender
1278 .send_request(|callback| ExecutionRequest::Transfer {
1279 source,
1280 destination,
1281 amount,
1282 signer,
1283 application_id,
1284 callback,
1285 })?
1286 .recv_response()?;
1287 Ok(())
1288 }
1289
1290 fn claim(
1291 &mut self,
1292 source: Account,
1293 destination: Account,
1294 amount: Amount,
1295 ) -> Result<(), ExecutionError> {
1296 let this = self.inner();
1297 let current_application = this.current_application();
1298 let application_id = current_application.id;
1299 let signer = current_application.signer;
1300
1301 this.execution_state_sender
1302 .send_request(|callback| ExecutionRequest::Claim {
1303 source,
1304 destination,
1305 amount,
1306 signer,
1307 application_id,
1308 callback,
1309 })?
1310 .recv_response()?;
1311 Ok(())
1312 }
1313
1314 fn try_call_application(
1315 &mut self,
1316 authenticated: bool,
1317 callee_id: ApplicationId,
1318 argument: Vec<u8>,
1319 ) -> Result<Vec<u8>, ExecutionError> {
1320 let contract = self
1321 .inner()
1322 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1323
1324 let value = contract
1325 .try_lock()
1326 .expect("Applications should not have reentrant calls")
1327 .execute_operation(argument)?;
1328
1329 self.inner().finish_call();
1330
1331 Ok(value)
1332 }
1333
1334 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1335 let mut this = self.inner();
1336 ensure!(
1337 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1338 ExecutionError::StreamNameTooLong
1339 );
1340 let application_id = GenericApplicationId::User(this.current_application().id);
1341 let stream_id = StreamId {
1342 stream_name,
1343 application_id,
1344 };
1345 let value_len = value.len() as u64;
1346 let index = this
1347 .execution_state_sender
1348 .send_request(|callback| ExecutionRequest::Emit {
1349 stream_id,
1350 value,
1351 callback,
1352 })?
1353 .recv_response()?;
1354 this.resource_controller.track_bytes_written(value_len)?;
1356 Ok(index)
1357 }
1358
1359 fn read_event(
1360 &mut self,
1361 chain_id: ChainId,
1362 stream_name: StreamName,
1363 index: u32,
1364 ) -> Result<Vec<u8>, ExecutionError> {
1365 let mut this = self.inner();
1366 ensure!(
1367 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1368 ExecutionError::StreamNameTooLong
1369 );
1370 let application_id = GenericApplicationId::User(this.current_application().id);
1371 let stream_id = StreamId {
1372 stream_name,
1373 application_id,
1374 };
1375 let event_id = EventId {
1376 stream_id,
1377 index,
1378 chain_id,
1379 };
1380 let event = this
1381 .execution_state_sender
1382 .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1383 .recv_response()?;
1384 this.resource_controller
1386 .track_bytes_read(event.len() as u64)?;
1387 Ok(event)
1388 }
1389
1390 fn subscribe_to_events(
1391 &mut self,
1392 chain_id: ChainId,
1393 application_id: ApplicationId,
1394 stream_name: StreamName,
1395 ) -> Result<(), ExecutionError> {
1396 let this = self.inner();
1397 ensure!(
1398 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1399 ExecutionError::StreamNameTooLong
1400 );
1401 let stream_id = StreamId {
1402 stream_name,
1403 application_id: application_id.into(),
1404 };
1405 let subscriber_app_id = this.current_application().id;
1406 this.execution_state_sender
1407 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1408 chain_id,
1409 stream_id,
1410 subscriber_app_id,
1411 callback,
1412 })?
1413 .recv_response()?;
1414 Ok(())
1415 }
1416
1417 fn unsubscribe_from_events(
1418 &mut self,
1419 chain_id: ChainId,
1420 application_id: ApplicationId,
1421 stream_name: StreamName,
1422 ) -> Result<(), ExecutionError> {
1423 let this = self.inner();
1424 ensure!(
1425 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1426 ExecutionError::StreamNameTooLong
1427 );
1428 let stream_id = StreamId {
1429 stream_name,
1430 application_id: application_id.into(),
1431 };
1432 let subscriber_app_id = this.current_application().id;
1433 this.execution_state_sender
1434 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1435 chain_id,
1436 stream_id,
1437 subscriber_app_id,
1438 callback,
1439 })?
1440 .recv_response()?;
1441 Ok(())
1442 }
1443
1444 fn query_service(
1445 &mut self,
1446 application_id: ApplicationId,
1447 query: Vec<u8>,
1448 ) -> Result<Vec<u8>, ExecutionError> {
1449 let mut this = self.inner();
1450
1451 let app_permissions = this
1452 .execution_state_sender
1453 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1454 .recv_response()?;
1455
1456 let app_id = this.current_application().id;
1457 ensure!(
1458 app_permissions.can_call_services(&app_id),
1459 ExecutionError::UnauthorizedApplication(app_id)
1460 );
1461
1462 this.resource_controller.track_service_oracle_call()?;
1463
1464 this.run_service_oracle_query(application_id, query)
1465 }
1466
1467 fn open_chain(
1468 &mut self,
1469 ownership: ChainOwnership,
1470 application_permissions: ApplicationPermissions,
1471 balance: Amount,
1472 ) -> Result<ChainId, ExecutionError> {
1473 let parent_id = self.inner().chain_id;
1474 let block_height = self.block_height()?;
1475
1476 let timestamp = self.inner().user_context;
1477
1478 let chain_id = self
1479 .inner()
1480 .execution_state_sender
1481 .send_request(|callback| ExecutionRequest::OpenChain {
1482 ownership,
1483 balance,
1484 parent_id,
1485 block_height,
1486 timestamp,
1487 application_permissions,
1488 callback,
1489 })?
1490 .recv_response()?;
1491
1492 Ok(chain_id)
1493 }
1494
1495 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1496 let this = self.inner();
1497 let application_id = this.current_application().id;
1498 this.execution_state_sender
1499 .send_request(|callback| ExecutionRequest::CloseChain {
1500 application_id,
1501 callback,
1502 })?
1503 .recv_response()?
1504 }
1505
1506 fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1507 let this = self.inner();
1508 let application_id = this.current_application().id;
1509 this.execution_state_sender
1510 .send_request(|callback| ExecutionRequest::ChangeOwnership {
1511 application_id,
1512 ownership,
1513 callback,
1514 })?
1515 .recv_response()?
1516 }
1517
1518 fn change_application_permissions(
1519 &mut self,
1520 application_permissions: ApplicationPermissions,
1521 ) -> Result<(), ExecutionError> {
1522 let this = self.inner();
1523 let application_id = this.current_application().id;
1524 this.execution_state_sender
1525 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1526 application_id,
1527 application_permissions,
1528 callback,
1529 })?
1530 .recv_response()?
1531 }
1532
1533 fn create_application(
1534 &mut self,
1535 module_id: ModuleId,
1536 parameters: Vec<u8>,
1537 argument: Vec<u8>,
1538 required_application_ids: Vec<ApplicationId>,
1539 ) -> Result<ApplicationId, ExecutionError> {
1540 let chain_id = self.inner().chain_id;
1541 let block_height = self.block_height()?;
1542
1543 let CreateApplicationResult { app_id } = self
1544 .inner()
1545 .execution_state_sender
1546 .send_request(move |callback| ExecutionRequest::CreateApplication {
1547 chain_id,
1548 block_height,
1549 module_id,
1550 parameters,
1551 required_application_ids,
1552 callback,
1553 })?
1554 .recv_response()??;
1555
1556 let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1557
1558 contract
1559 .try_lock()
1560 .expect("Applications should not have reentrant calls")
1561 .instantiate(argument)?;
1562
1563 self.inner().finish_call();
1564
1565 Ok(app_id)
1566 }
1567
1568 fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1569 let blob = Blob::new_data(bytes);
1570 let blob_id = blob.id();
1571 let this = self.inner();
1572 this.execution_state_sender
1573 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1574 .recv_response()?;
1575 Ok(DataBlobHash(blob_id.hash))
1576 }
1577
1578 fn publish_module(
1579 &mut self,
1580 contract: Bytecode,
1581 service: Bytecode,
1582 vm_runtime: VmRuntime,
1583 ) -> Result<ModuleId, ExecutionError> {
1584 let (blobs, module_id) =
1585 crate::runtime::create_bytecode_blobs_sync(&contract, &service, vm_runtime);
1586 let this = self.inner();
1587 for blob in blobs {
1588 this.execution_state_sender
1589 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1590 .recv_response()?;
1591 }
1592 Ok(module_id)
1593 }
1594
1595 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1596 let this = self.inner();
1597 let round = this.round;
1598 this.execution_state_sender
1599 .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1600 .recv_response()
1601 }
1602
1603 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1604 let mut this = self.inner();
1605 let id = this.current_application().id;
1606 let state = this.view_user_states.entry(id).or_default();
1607 state.force_all_pending_queries()?;
1608 this.resource_controller.track_write_operations(
1609 batch
1610 .num_operations()
1611 .try_into()
1612 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1613 )?;
1614 this.resource_controller
1615 .track_bytes_written(batch.size() as u64)?;
1616 this.execution_state_sender
1617 .send_request(|callback| ExecutionRequest::WriteBatch {
1618 id,
1619 batch,
1620 callback,
1621 })?
1622 .recv_response()?;
1623 Ok(())
1624 }
1625}
1626
1627impl ServiceSyncRuntime {
1628 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1630 Self::new_with_deadline(execution_state_sender, context, None)
1631 }
1632
1633 pub fn new_with_deadline(
1635 execution_state_sender: ExecutionStateSender,
1636 context: QueryContext,
1637 deadline: Option<Instant>,
1638 ) -> Self {
1639 let allow_application_logs = execution_state_sender
1641 .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1642 .ok()
1643 .and_then(|receiver| receiver.recv_response().ok())
1644 .unwrap_or(false);
1645
1646 let runtime = SyncRuntime(Some(
1647 SyncRuntimeInternal::new(
1648 context.chain_id,
1649 context.next_block_height,
1650 None,
1651 None,
1652 execution_state_sender,
1653 deadline,
1654 None,
1655 ResourceController::default(),
1656 (),
1657 allow_application_logs,
1658 )
1659 .into(),
1660 ));
1661
1662 ServiceSyncRuntime {
1663 runtime,
1664 current_context: context,
1665 }
1666 }
1667
1668 pub(crate) fn preload_service(
1670 &self,
1671 id: ApplicationId,
1672 code: UserServiceCode,
1673 description: ApplicationDescription,
1674 ) {
1675 let this = self
1676 .runtime
1677 .0
1678 .as_ref()
1679 .expect("services shouldn't be preloaded while the runtime is being dropped");
1680 let mut this_guard = this.inner();
1681
1682 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1683 entry.insert((code, description));
1684 }
1685 }
1686
1687 pub fn run(&mut self, incoming_requests: &std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1689 while let Ok(request) = incoming_requests.recv() {
1690 let ServiceRuntimeRequest::Query {
1691 application_id,
1692 context,
1693 query,
1694 callback,
1695 } = request;
1696
1697 let result = self
1698 .prepare_for_query(context)
1699 .and_then(|()| self.run_query(application_id, query));
1700
1701 if let Err(err) = callback.send(result) {
1702 tracing::debug!(%err, "Receiver for query result has been dropped");
1703 }
1704 }
1705 }
1706
1707 pub(crate) fn prepare_for_query(
1709 &mut self,
1710 new_context: QueryContext,
1711 ) -> Result<(), ExecutionError> {
1712 let expected_context = QueryContext {
1713 local_time: new_context.local_time,
1714 ..self.current_context
1715 };
1716
1717 if new_context != expected_context {
1718 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1719 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1720 } else {
1721 self.handle_mut()
1722 .inner()
1723 .execution_state_sender
1724 .send_request(|callback| ExecutionRequest::SetLocalTime {
1725 local_time: new_context.local_time,
1726 callback,
1727 })?
1728 .recv_response()?;
1729 }
1730 Ok(())
1731 }
1732
1733 pub(crate) fn run_query(
1735 &mut self,
1736 application_id: ApplicationId,
1737 query: Vec<u8>,
1738 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1739 let this = self.handle_mut();
1740 let response = this.try_query_application(application_id, query)?;
1741 let operations = mem::take(&mut this.inner().scheduled_operations);
1742
1743 Ok(QueryOutcome {
1744 response,
1745 operations,
1746 })
1747 }
1748
1749 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1751 self.runtime.0.as_mut().expect(
1752 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1753 )
1754 }
1755}
1756
1757impl ServiceRuntime for ServiceSyncRuntimeHandle {
1758 fn try_query_application(
1760 &mut self,
1761 queried_id: ApplicationId,
1762 argument: Vec<u8>,
1763 ) -> Result<Vec<u8>, ExecutionError> {
1764 let service = {
1765 let mut this = self.inner();
1766
1767 let application = this.load_service_instance(self.clone(), queried_id)?;
1769 this.push_application(ApplicationStatus {
1771 caller_id: None,
1772 id: queried_id,
1773 description: application.description,
1774 signer: None,
1775 });
1776 application.instance
1777 };
1778 let response = service
1779 .try_lock()
1780 .expect("Applications should not have reentrant calls")
1781 .handle_query(argument)?;
1782 self.inner().pop_application();
1783 Ok(response)
1784 }
1785
1786 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1787 let mut this = self.inner();
1788 let application_id = this.current_application().id;
1789
1790 this.scheduled_operations.push(Operation::User {
1791 application_id,
1792 bytes: operation,
1793 });
1794
1795 Ok(())
1796 }
1797
1798 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1799 if let Some(deadline) = self.inner().deadline {
1800 if Instant::now() >= deadline {
1801 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1802 }
1803 }
1804 Ok(())
1805 }
1806}
1807
/// A request handled by [`ServiceSyncRuntime::run`].
pub enum ServiceRuntimeRequest {
    /// Run a query against an application and report the outcome through `callback`.
    Query {
        /// The application that should handle the query.
        application_id: ApplicationId,
        /// The context the runtime is prepared for before the query is run.
        context: QueryContext,
        /// The query bytes passed to the application.
        query: Vec<u8>,
        /// Where the query outcome, or the error that prevented it, is sent.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1817
/// The parts of a [`MessageContext`] that the runtime keeps for the message being
/// executed.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// Copied from `MessageContext::is_bouncing`.
    is_bouncing: bool,
    /// Copied from `MessageContext::origin`.
    origin: ChainId,
}
1824
1825impl From<&MessageContext> for ExecutingMessage {
1826 fn from(context: &MessageContext) -> Self {
1827 ExecutingMessage {
1828 is_bouncing: context.is_bouncing,
1829 origin: context.origin,
1830 }
1831 }
1832}
1833
1834pub fn create_bytecode_blobs_sync(
1836 contract: &Bytecode,
1837 service: &Bytecode,
1838 vm_runtime: VmRuntime,
1839) -> (Vec<Blob>, ModuleId) {
1840 match vm_runtime {
1841 VmRuntime::Wasm => {
1842 let compressed_contract = contract.compress();
1843 let compressed_service = service.compress();
1844 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1845 let service_blob = Blob::new_service_bytecode(compressed_service);
1846 let module_id =
1847 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1848 (vec![contract_blob, service_blob], module_id)
1849 }
1850 VmRuntime::Evm => {
1851 let compressed_contract = contract.compress();
1852 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1853 let module_id = ModuleId::new(
1854 evm_contract_blob.id().hash,
1855 evm_contract_blob.id().hash,
1856 vm_runtime,
1857 );
1858 (vec![evm_contract_blob], module_id)
1859 }
1860 }
1861}