1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13 crypto::CryptoHash,
14 data_types::{
15 Amount, ApplicationPermissions, ArithmeticError, BlockHeight, OracleResponse, Resources,
16 SendMessageRequest, Timestamp,
17 },
18 ensure,
19 identifiers::{
20 Account, ApplicationId, BlobId, BlobType, ChainId, ChannelName, MessageId, Owner,
21 StreamName,
22 },
23 ownership::ChainOwnership,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29 execution::UserAction,
30 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31 resources::ResourceController,
32 util::{ReceiverExt, UnboundedSenderExt},
33 BaseRuntime, ContractRuntime, ExecutionError, FinalizeContext, MessageContext,
34 OperationContext, QueryContext, RawExecutionOutcome, ServiceRuntime, TransactionTracker,
35 UserApplicationDescription, UserApplicationId, UserContractInstance, UserServiceInstance,
36 MAX_EVENT_KEY_LEN, MAX_STREAM_NAME_LEN,
37};
38
// Unit tests for this module: only compiled in test builds and loaded from the path given in
// the `#[path]` attribute below.
#[cfg(test)]
#[path = "unit_tests/runtime_tests.rs"]
mod tests;
42
/// Owning wrapper around an optional [`SyncRuntimeHandle`].
///
/// The handle lives in an `Option` so that it can be taken out (see `into_inner` and the `Drop`
/// implementation). Dereferencing the wrapper after the handle has been taken panics.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance>(Option<SyncRuntimeHandle<UserInstance>>);
45
/// A [`SyncRuntime`] whose loaded applications are contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
47
/// Runtime used to run service queries: a [`SyncRuntime`] over service instances paired with a
/// [`QueryContext`].
pub struct ServiceSyncRuntime {
    /// The underlying runtime holding the service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// The query context stored next to the runtime; presumably the context of the query that
    /// is currently being handled (confirm against the methods that read it).
    current_context: QueryContext,
}
52
/// Cloneable handle to a [`SyncRuntimeInternal`] that is shared through an `Arc<Mutex<_>>`.
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance>(Arc<Mutex<SyncRuntimeInternal<UserInstance>>>);
55
/// A [`SyncRuntimeHandle`] for a runtime that loads contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] for a runtime that loads service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
58
/// The state behind a [`SyncRuntimeHandle`]: everything the runtime needs to answer calls made
/// by user applications.
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance> {
    /// The chain ID reported by `chain_id()`.
    chain_id: ChainId,
    /// The block height reported by `block_height()`; also used as `next_block_height` when a
    /// service query is started from this runtime.
    height: BlockHeight,
    /// The local time checked by `assert_before` and passed on to service queries.
    local_time: Timestamp,
    /// The signer reported by `authenticated_signer()`, if any.
    authenticated_signer: Option<Owner>,
    /// Metadata about the message being executed; `None` when the action is not a message.
    executing_message: Option<ExecutingMessage>,

    /// Channel through which `ExecutionRequest`s are sent and their responses received.
    execution_state_sender: ExecutionStateSender,

    /// Set to `true` when finalization starts; cross-application calls are rejected while set.
    is_finalizing: bool,
    /// IDs of the contracts loaded so far; `finalize` takes this list and walks it in reverse.
    applications_to_finalize: Vec<UserApplicationId>,

    /// Cache of the application instances that have already been loaded, keyed by ID.
    loaded_applications: HashMap<UserApplicationId, LoadedApplication<UserInstance>>,
    /// Stack of the applications being executed; the last entry is the current application.
    call_stack: Vec<ApplicationStatus>,
    /// IDs of the applications currently on `call_stack`, used to detect reentrant calls.
    active_applications: HashSet<UserApplicationId>,
    /// Tracker that receives oracle responses and the outcomes of user applications.
    transaction_tracker: TransactionTracker,

    /// Pending key-value store queries, grouped by the application that issued them.
    view_user_states: BTreeMap<UserApplicationId, ViewUserState>,

    /// Account attached to every outcome through `with_refund_grant_to`.
    refund_grant_to: Option<Account>,
    /// Controller used to track read operations, bytes read, fuel and message grants.
    resource_controller: ResourceController,
}
101
/// One entry of the runtime's call stack: the state kept for an application while it executes.
#[derive(Debug)]
struct ApplicationStatus {
    /// The calling application when the call was authenticated; `None` for top-level calls and
    /// for unauthenticated cross-application calls.
    caller_id: Option<UserApplicationId>,
    /// The ID of the application being executed.
    id: UserApplicationId,
    /// The application's parameters, as returned by `application_parameters()`.
    parameters: Vec<u8>,
    /// The signer attached to this call, if any.
    signer: Option<Owner>,
    /// The outcome accumulated while the application runs; handed to `handle_outcome` once the
    /// call finishes.
    outcome: RawExecutionOutcome<Vec<u8>>,
}
116
/// An application instance that has been loaded into the runtime, with its parameters.
#[derive(Debug)]
struct LoadedApplication<Instance> {
    /// The instance, shared behind an `Arc<Mutex<_>>` so that clones refer to the same object.
    instance: Arc<Mutex<Instance>>,
    /// The parameters taken from the application's description.
    parameters: Vec<u8>,
}
123
124impl<Instance> LoadedApplication<Instance> {
125 fn new(instance: Instance, description: UserApplicationDescription) -> Self {
127 LoadedApplication {
128 instance: Arc::new(Mutex::new(instance)),
129 parameters: description.parameters,
130 }
131 }
132}
133
134impl<Instance> Clone for LoadedApplication<Instance> {
135 fn clone(&self) -> Self {
138 LoadedApplication {
139 instance: self.instance.clone(),
140 parameters: self.parameters.clone(),
141 }
142 }
143}
144
/// A value that is either already available or still expected on a oneshot channel.
#[derive(Debug)]
enum Promise<T> {
    /// The value has been received.
    Ready(T),
    /// The value has not been received yet; it will arrive on this receiver.
    Pending(Receiver<T>),
}
150
151impl<T> Promise<T> {
152 fn force(&mut self) -> Result<(), ExecutionError> {
153 if let Promise::Pending(receiver) = self {
154 let value = receiver
155 .recv_ref()
156 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
157 *self = Promise::Ready(value);
158 }
159 Ok(())
160 }
161
162 fn read(self) -> Result<T, ExecutionError> {
163 match self {
164 Promise::Pending(receiver) => {
165 let value = receiver.recv_response()?;
166 Ok(value)
167 }
168 Promise::Ready(value) => Ok(value),
169 }
170 }
171}
172
/// Book-keeping for the queries of one kind issued by an application.
#[derive(Debug, Default)]
struct QueryManager<T> {
    /// The promises that were registered and not yet waited on, keyed by query ID.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// The number of queries registered so far; also the ID given to the next query.
    query_count: u32,
    /// Counter incremented by `register` and decremented by a successful `wait`.
    active_query_count: u32,
}
183
184impl<T> QueryManager<T> {
185 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
186 let id = self.query_count;
187 self.pending_queries.insert(id, Promise::Pending(receiver));
188 self.query_count = self
189 .query_count
190 .checked_add(1)
191 .ok_or(ArithmeticError::Overflow)?;
192 self.active_query_count = self
193 .active_query_count
194 .checked_add(1)
195 .ok_or(ArithmeticError::Overflow)?;
196 Ok(id)
197 }
198
199 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
200 let promise = self
201 .pending_queries
202 .remove(&id)
203 .ok_or(ExecutionError::InvalidPromise)?;
204 let value = promise.read()?;
205 self.active_query_count -= 1;
206 Ok(value)
207 }
208
209 fn force_all(&mut self) -> Result<(), ExecutionError> {
210 for promise in self.pending_queries.values_mut() {
211 promise.force()?;
212 }
213 Ok(())
214 }
215}
216
/// A list of keys, each one a byte string.
type Keys = Vec<Vec<u8>>;
/// A single value, as a byte string.
type Value = Vec<u8>;
/// A list of `(key, value)` pairs of byte strings.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
220
/// The pending key-value store queries of one application, one manager per kind of query.
#[derive(Debug, Default)]
struct ViewUserState {
    /// Queries created by `contains_key_new`.
    contains_key_queries: QueryManager<bool>,
    /// Queries created by `contains_keys_new`.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// Queries created by `read_value_bytes_new`.
    read_value_queries: QueryManager<Option<Value>>,
    /// Queries created by `read_multi_values_bytes_new`.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// Queries created by `find_keys_by_prefix_new`.
    find_keys_queries: QueryManager<Keys>,
    /// Queries created by `find_key_values_by_prefix_new`.
    find_key_values_queries: QueryManager<KeyValues>,
}
236
237impl ViewUserState {
238 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
239 self.contains_key_queries.force_all()?;
240 self.contains_keys_queries.force_all()?;
241 self.read_value_queries.force_all()?;
242 self.read_multi_values_queries.force_all()?;
243 self.find_keys_queries.force_all()?;
244 self.find_key_values_queries.force_all()?;
245 Ok(())
246 }
247}
248
impl<UserInstance> Deref for SyncRuntime<UserInstance> {
    type Target = SyncRuntimeHandle<UserInstance>;

    /// Gives shared access to the wrapped handle.
    ///
    /// # Panics
    ///
    /// Panics if the handle has already been taken out of the wrapper.
    fn deref(&self) -> &Self::Target {
        self.0.as_ref().expect(
            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
        )
    }
}
258
impl<UserInstance> DerefMut for SyncRuntime<UserInstance> {
    /// Gives exclusive access to the wrapped handle.
    ///
    /// # Panics
    ///
    /// Panics if the handle has already been taken out of the wrapper.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.0.as_mut().expect(
            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
        )
    }
}
266
267impl<UserInstance> Drop for SyncRuntime<UserInstance> {
268 fn drop(&mut self) {
269 if let Some(mut handle) = self.0.take() {
272 handle.inner().loaded_applications.clear();
273 }
274 }
275}
276
277impl<UserInstance> SyncRuntimeInternal<UserInstance> {
278 #[expect(clippy::too_many_arguments)]
279 fn new(
280 chain_id: ChainId,
281 height: BlockHeight,
282 local_time: Timestamp,
283 authenticated_signer: Option<Owner>,
284 executing_message: Option<ExecutingMessage>,
285 execution_state_sender: ExecutionStateSender,
286 refund_grant_to: Option<Account>,
287 resource_controller: ResourceController,
288 transaction_tracker: TransactionTracker,
289 ) -> Self {
290 Self {
291 chain_id,
292 height,
293 local_time,
294 authenticated_signer,
295 executing_message,
296 execution_state_sender,
297 is_finalizing: false,
298 applications_to_finalize: Vec::new(),
299 loaded_applications: HashMap::new(),
300 call_stack: Vec::new(),
301 active_applications: HashSet::new(),
302 view_user_states: BTreeMap::new(),
303 refund_grant_to,
304 resource_controller,
305 transaction_tracker,
306 }
307 }
308
309 fn current_application(&mut self) -> &ApplicationStatus {
317 self.call_stack
318 .last()
319 .expect("Call stack is unexpectedly empty")
320 }
321
322 fn current_application_mut(&mut self) -> &mut ApplicationStatus {
330 self.call_stack
331 .last_mut()
332 .expect("Call stack is unexpectedly empty")
333 }
334
335 fn push_application(&mut self, status: ApplicationStatus) {
339 self.active_applications.insert(status.id);
340 self.call_stack.push(status);
341 }
342
343 fn pop_application(&mut self) -> ApplicationStatus {
351 let status = self
352 .call_stack
353 .pop()
354 .expect("Can't remove application from empty call stack");
355 assert!(self.active_applications.remove(&status.id));
356 status
357 }
358
359 fn check_for_reentrancy(
363 &mut self,
364 application_id: UserApplicationId,
365 ) -> Result<(), ExecutionError> {
366 ensure!(
367 !self.active_applications.contains(&application_id),
368 ExecutionError::ReentrantCall(application_id)
369 );
370 Ok(())
371 }
372}
373
374impl SyncRuntimeInternal<UserContractInstance> {
375 fn load_contract_instance(
377 &mut self,
378 this: Arc<Mutex<Self>>,
379 id: UserApplicationId,
380 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
381 match self.loaded_applications.entry(id) {
382 hash_map::Entry::Vacant(entry) => {
383 let (code, description) = self
384 .execution_state_sender
385 .send_request(|callback| ExecutionRequest::LoadContract { id, callback })?
386 .recv_response()?;
387
388 let instance = code.instantiate(SyncRuntimeHandle(this))?;
389
390 self.applications_to_finalize.push(id);
391 Ok(entry
392 .insert(LoadedApplication::new(instance, description))
393 .clone())
394 }
395 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
396 }
397 }
398
399 fn prepare_for_call(
401 &mut self,
402 this: Arc<Mutex<Self>>,
403 authenticated: bool,
404 callee_id: UserApplicationId,
405 ) -> Result<(Arc<Mutex<UserContractInstance>>, OperationContext), ExecutionError> {
406 self.check_for_reentrancy(callee_id)?;
407
408 ensure!(
409 !self.is_finalizing,
410 ExecutionError::CrossApplicationCallInFinalize {
411 caller_id: Box::new(self.current_application().id),
412 callee_id: Box::new(callee_id),
413 }
414 );
415
416 let application = self.load_contract_instance(this, callee_id)?;
418
419 let caller = self.current_application();
420 let caller_id = caller.id;
421 let caller_signer = caller.signer;
422 let authenticated_signer = match caller_signer {
424 Some(signer) if authenticated => Some(signer),
425 _ => None,
426 };
427 let authenticated_caller_id = authenticated.then_some(caller_id);
428 let callee_context = OperationContext {
429 chain_id: self.chain_id,
430 authenticated_signer,
431 authenticated_caller_id,
432 height: self.height,
433 index: None,
434 };
435 self.push_application(ApplicationStatus {
436 caller_id: authenticated_caller_id,
437 id: callee_id,
438 parameters: application.parameters,
439 signer: authenticated_signer,
441 outcome: RawExecutionOutcome::default(),
442 });
443 Ok((application.instance, callee_context))
444 }
445
446 fn finish_call(&mut self) -> Result<(), ExecutionError> {
448 let ApplicationStatus {
449 id: callee_id,
450 signer,
451 outcome,
452 ..
453 } = self.pop_application();
454
455 self.handle_outcome(outcome, signer, callee_id)?;
456
457 Ok(())
458 }
459
460 fn handle_outcome(
466 &mut self,
467 raw_outcome: RawExecutionOutcome<Vec<u8>, Resources>,
468 signer: Option<Owner>,
469 application_id: UserApplicationId,
470 ) -> Result<(), ExecutionError> {
471 let outcome = raw_outcome
472 .with_refund_grant_to(self.refund_grant_to)
473 .with_authenticated_signer(signer)
474 .into_priced(&self.resource_controller.policy)?;
475
476 for message in &outcome.messages {
477 self.resource_controller.track_grant(message.grant)?;
478 }
479
480 self.transaction_tracker
481 .add_user_outcome(application_id, outcome)?;
482 Ok(())
483 }
484}
485
486impl SyncRuntimeInternal<UserServiceInstance> {
487 fn load_service_instance(
489 &mut self,
490 this: Arc<Mutex<Self>>,
491 id: UserApplicationId,
492 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
493 match self.loaded_applications.entry(id) {
494 hash_map::Entry::Vacant(entry) => {
495 let (code, description) = self
496 .execution_state_sender
497 .send_request(|callback| ExecutionRequest::LoadService { id, callback })?
498 .recv_response()?;
499
500 let instance = code.instantiate(SyncRuntimeHandle(this))?;
501 Ok(entry
502 .insert(LoadedApplication::new(instance, description))
503 .clone())
504 }
505 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
506 }
507 }
508}
509
510impl<UserInstance> SyncRuntime<UserInstance> {
511 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
512 let handle = self.0.take().expect(
513 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
514 );
515 let runtime = Arc::into_inner(handle.0)?
516 .into_inner()
517 .expect("`SyncRuntime` should run in a single thread which should not panic");
518 Some(runtime)
519 }
520}
521
522impl<UserInstance> From<SyncRuntimeInternal<UserInstance>> for SyncRuntimeHandle<UserInstance> {
523 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
524 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
525 }
526}
527
impl<UserInstance> SyncRuntimeHandle<UserInstance> {
    /// Locks the shared runtime without blocking and returns the guard.
    ///
    /// # Panics
    ///
    /// Panics if the lock cannot be acquired immediately, i.e. if it is already held or the
    /// mutex is poisoned.
    fn inner(&mut self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
        self.0
            .try_lock()
            .expect("Synchronous runtimes run on a single execution thread")
    }
}
535
// `BaseRuntime` for the shared handle: every method locks the internal runtime through
// `inner()` and forwards the call, unchanged, to the implementation on `SyncRuntimeInternal`.
// Because `inner()` panics when the lock is unavailable, so does every method here.
impl<UserInstance> BaseRuntime for SyncRuntimeHandle<UserInstance> {
    // The promise types are exactly the ones chosen by the internal runtime.
    type Read = <SyncRuntimeInternal<UserInstance> as BaseRuntime>::Read;
    type ReadValueBytes = <SyncRuntimeInternal<UserInstance> as BaseRuntime>::ReadValueBytes;
    type ContainsKey = <SyncRuntimeInternal<UserInstance> as BaseRuntime>::ContainsKey;
    type ContainsKeys = <SyncRuntimeInternal<UserInstance> as BaseRuntime>::ContainsKeys;
    type ReadMultiValuesBytes =
        <SyncRuntimeInternal<UserInstance> as BaseRuntime>::ReadMultiValuesBytes;
    type FindKeysByPrefix = <SyncRuntimeInternal<UserInstance> as BaseRuntime>::FindKeysByPrefix;
    type FindKeyValuesByPrefix =
        <SyncRuntimeInternal<UserInstance> as BaseRuntime>::FindKeyValuesByPrefix;

    // Context of the current execution.

    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
        self.inner().chain_id()
    }

    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
        self.inner().block_height()
    }

    fn application_id(&mut self) -> Result<UserApplicationId, ExecutionError> {
        self.inner().application_id()
    }

    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
        self.inner().application_creator_chain_id()
    }

    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
        self.inner().application_parameters()
    }

    // System state.

    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
        self.inner().read_system_timestamp()
    }

    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
        self.inner().read_chain_balance()
    }

    fn read_owner_balance(&mut self, owner: Owner) -> Result<Amount, ExecutionError> {
        self.inner().read_owner_balance(owner)
    }

    fn read_owner_balances(&mut self) -> Result<Vec<(Owner, Amount)>, ExecutionError> {
        self.inner().read_owner_balances()
    }

    fn read_balance_owners(&mut self) -> Result<Vec<Owner>, ExecutionError> {
        self.inner().read_balance_owners()
    }

    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
        self.inner().chain_ownership()
    }

    // Key-value store queries: each `*_new` method returns a promise that the matching
    // `*_wait` method resolves.

    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
        self.inner().contains_key_new(key)
    }

    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
        self.inner().contains_key_wait(promise)
    }

    fn contains_keys_new(
        &mut self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Self::ContainsKeys, ExecutionError> {
        self.inner().contains_keys_new(keys)
    }

    fn contains_keys_wait(
        &mut self,
        promise: &Self::ContainsKeys,
    ) -> Result<Vec<bool>, ExecutionError> {
        self.inner().contains_keys_wait(promise)
    }

    fn read_multi_values_bytes_new(
        &mut self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
        self.inner().read_multi_values_bytes_new(keys)
    }

    fn read_multi_values_bytes_wait(
        &mut self,
        promise: &Self::ReadMultiValuesBytes,
    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
        self.inner().read_multi_values_bytes_wait(promise)
    }

    fn read_value_bytes_new(
        &mut self,
        key: Vec<u8>,
    ) -> Result<Self::ReadValueBytes, ExecutionError> {
        self.inner().read_value_bytes_new(key)
    }

    fn read_value_bytes_wait(
        &mut self,
        promise: &Self::ReadValueBytes,
    ) -> Result<Option<Vec<u8>>, ExecutionError> {
        self.inner().read_value_bytes_wait(promise)
    }

    fn find_keys_by_prefix_new(
        &mut self,
        key_prefix: Vec<u8>,
    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
        self.inner().find_keys_by_prefix_new(key_prefix)
    }

    fn find_keys_by_prefix_wait(
        &mut self,
        promise: &Self::FindKeysByPrefix,
    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
        self.inner().find_keys_by_prefix_wait(promise)
    }

    fn find_key_values_by_prefix_new(
        &mut self,
        key_prefix: Vec<u8>,
    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
        self.inner().find_key_values_by_prefix_new(key_prefix)
    }

    fn find_key_values_by_prefix_wait(
        &mut self,
        promise: &Self::FindKeyValuesByPrefix,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
        self.inner().find_key_values_by_prefix_wait(promise)
    }

    // Oracles and blobs.

    fn query_service(
        &mut self,
        application_id: ApplicationId,
        query: Vec<u8>,
    ) -> Result<Vec<u8>, ExecutionError> {
        self.inner().query_service(application_id, query)
    }

    fn http_post(
        &mut self,
        url: &str,
        content_type: String,
        payload: Vec<u8>,
    ) -> Result<Vec<u8>, ExecutionError> {
        self.inner().http_post(url, content_type, payload)
    }

    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
        self.inner().assert_before(timestamp)
    }

    fn read_data_blob(&mut self, hash: &CryptoHash) -> Result<Vec<u8>, ExecutionError> {
        self.inner().read_data_blob(hash)
    }

    fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError> {
        self.inner().assert_data_blob_exists(hash)
    }
}
698
699impl<UserInstance> BaseRuntime for SyncRuntimeInternal<UserInstance> {
700 type Read = ();
701 type ReadValueBytes = u32;
702 type ContainsKey = u32;
703 type ContainsKeys = u32;
704 type ReadMultiValuesBytes = u32;
705 type FindKeysByPrefix = u32;
706 type FindKeyValuesByPrefix = u32;
707
708 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
709 Ok(self.chain_id)
710 }
711
712 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
713 Ok(self.height)
714 }
715
716 fn application_id(&mut self) -> Result<UserApplicationId, ExecutionError> {
717 Ok(self.current_application().id)
718 }
719
720 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
721 Ok(self.current_application().id.creation.chain_id)
722 }
723
724 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
725 Ok(self.current_application().parameters.clone())
726 }
727
728 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
729 self.execution_state_sender
730 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
731 .recv_response()
732 }
733
734 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
735 self.execution_state_sender
736 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
737 .recv_response()
738 }
739
740 fn read_owner_balance(&mut self, owner: Owner) -> Result<Amount, ExecutionError> {
741 self.execution_state_sender
742 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
743 .recv_response()
744 }
745
746 fn read_owner_balances(&mut self) -> Result<Vec<(Owner, Amount)>, ExecutionError> {
747 self.execution_state_sender
748 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
749 .recv_response()
750 }
751
752 fn read_balance_owners(&mut self) -> Result<Vec<Owner>, ExecutionError> {
753 self.execution_state_sender
754 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
755 .recv_response()
756 }
757
758 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
759 self.execution_state_sender
760 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
761 .recv_response()
762 }
763
764 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
765 let id = self.application_id()?;
766 let state = self.view_user_states.entry(id).or_default();
767 self.resource_controller.track_read_operations(1)?;
768 let receiver = self
769 .execution_state_sender
770 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
771 state.contains_key_queries.register(receiver)
772 }
773
774 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
775 let id = self.application_id()?;
776 let state = self.view_user_states.entry(id).or_default();
777 let value = state.contains_key_queries.wait(*promise)?;
778 Ok(value)
779 }
780
781 fn contains_keys_new(
782 &mut self,
783 keys: Vec<Vec<u8>>,
784 ) -> Result<Self::ContainsKeys, ExecutionError> {
785 let id = self.application_id()?;
786 let state = self.view_user_states.entry(id).or_default();
787 self.resource_controller.track_read_operations(1)?;
788 let receiver = self
789 .execution_state_sender
790 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
791 state.contains_keys_queries.register(receiver)
792 }
793
794 fn contains_keys_wait(
795 &mut self,
796 promise: &Self::ContainsKeys,
797 ) -> Result<Vec<bool>, ExecutionError> {
798 let id = self.application_id()?;
799 let state = self.view_user_states.entry(id).or_default();
800 let value = state.contains_keys_queries.wait(*promise)?;
801 Ok(value)
802 }
803
804 fn read_multi_values_bytes_new(
805 &mut self,
806 keys: Vec<Vec<u8>>,
807 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
808 let id = self.application_id()?;
809 let state = self.view_user_states.entry(id).or_default();
810 self.resource_controller.track_read_operations(1)?;
811 let receiver = self.execution_state_sender.send_request(move |callback| {
812 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
813 })?;
814 state.read_multi_values_queries.register(receiver)
815 }
816
817 fn read_multi_values_bytes_wait(
818 &mut self,
819 promise: &Self::ReadMultiValuesBytes,
820 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
821 let id = self.application_id()?;
822 let state = self.view_user_states.entry(id).or_default();
823 let values = state.read_multi_values_queries.wait(*promise)?;
824 for value in &values {
825 if let Some(value) = &value {
826 self.resource_controller
827 .track_bytes_read(value.len() as u64)?;
828 }
829 }
830 Ok(values)
831 }
832
833 fn read_value_bytes_new(
834 &mut self,
835 key: Vec<u8>,
836 ) -> Result<Self::ReadValueBytes, ExecutionError> {
837 let id = self.application_id()?;
838 let state = self.view_user_states.entry(id).or_default();
839 self.resource_controller.track_read_operations(1)?;
840 let receiver = self
841 .execution_state_sender
842 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
843 state.read_value_queries.register(receiver)
844 }
845
846 fn read_value_bytes_wait(
847 &mut self,
848 promise: &Self::ReadValueBytes,
849 ) -> Result<Option<Vec<u8>>, ExecutionError> {
850 let id = self.application_id()?;
851 let state = self.view_user_states.entry(id).or_default();
852 let value = state.read_value_queries.wait(*promise)?;
853 if let Some(value) = &value {
854 self.resource_controller
855 .track_bytes_read(value.len() as u64)?;
856 }
857 Ok(value)
858 }
859
860 fn find_keys_by_prefix_new(
861 &mut self,
862 key_prefix: Vec<u8>,
863 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
864 let id = self.application_id()?;
865 let state = self.view_user_states.entry(id).or_default();
866 self.resource_controller.track_read_operations(1)?;
867 let receiver = self.execution_state_sender.send_request(move |callback| {
868 ExecutionRequest::FindKeysByPrefix {
869 id,
870 key_prefix,
871 callback,
872 }
873 })?;
874 state.find_keys_queries.register(receiver)
875 }
876
877 fn find_keys_by_prefix_wait(
878 &mut self,
879 promise: &Self::FindKeysByPrefix,
880 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
881 let id = self.application_id()?;
882 let state = self.view_user_states.entry(id).or_default();
883 let keys = state.find_keys_queries.wait(*promise)?;
884 let mut read_size = 0;
885 for key in &keys {
886 read_size += key.len();
887 }
888 self.resource_controller
889 .track_bytes_read(read_size as u64)?;
890 Ok(keys)
891 }
892
893 fn find_key_values_by_prefix_new(
894 &mut self,
895 key_prefix: Vec<u8>,
896 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
897 let id = self.application_id()?;
898 let state = self.view_user_states.entry(id).or_default();
899 self.resource_controller.track_read_operations(1)?;
900 let receiver = self.execution_state_sender.send_request(move |callback| {
901 ExecutionRequest::FindKeyValuesByPrefix {
902 id,
903 key_prefix,
904 callback,
905 }
906 })?;
907 state.find_key_values_queries.register(receiver)
908 }
909
910 fn find_key_values_by_prefix_wait(
911 &mut self,
912 promise: &Self::FindKeyValuesByPrefix,
913 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
914 let id = self.application_id()?;
915 let state = self.view_user_states.entry(id).or_default();
916 let key_values = state.find_key_values_queries.wait(*promise)?;
917 let mut read_size = 0;
918 for (key, value) in &key_values {
919 read_size += key.len() + value.len();
920 }
921 self.resource_controller
922 .track_bytes_read(read_size as u64)?;
923 Ok(key_values)
924 }
925
926 fn query_service(
927 &mut self,
928 application_id: ApplicationId,
929 query: Vec<u8>,
930 ) -> Result<Vec<u8>, ExecutionError> {
931 ensure!(
932 cfg!(feature = "unstable-oracles"),
933 ExecutionError::UnstableOracle
934 );
935 let response =
936 if let Some(response) = self.transaction_tracker.next_replayed_oracle_response()? {
937 match response {
938 OracleResponse::Service(bytes) => bytes,
939 _ => return Err(ExecutionError::OracleResponseMismatch),
940 }
941 } else {
942 let context = QueryContext {
943 chain_id: self.chain_id,
944 next_block_height: self.height,
945 local_time: self.local_time,
946 };
947 let sender = self.execution_state_sender.clone();
948 ServiceSyncRuntime::new(sender, context).run_query(application_id, query)?
949 };
950 self.transaction_tracker
951 .add_oracle_response(OracleResponse::Service(response.clone()));
952 Ok(response)
953 }
954
955 fn http_post(
956 &mut self,
957 url: &str,
958 content_type: String,
959 payload: Vec<u8>,
960 ) -> Result<Vec<u8>, ExecutionError> {
961 ensure!(
962 cfg!(feature = "unstable-oracles"),
963 ExecutionError::UnstableOracle
964 );
965 let bytes =
966 if let Some(response) = self.transaction_tracker.next_replayed_oracle_response()? {
967 match response {
968 OracleResponse::Post(bytes) => bytes,
969 _ => return Err(ExecutionError::OracleResponseMismatch),
970 }
971 } else {
972 let url = url.to_string();
973 self.execution_state_sender
974 .send_request(|callback| ExecutionRequest::HttpPost {
975 url,
976 content_type,
977 payload,
978 callback,
979 })?
980 .recv_response()?
981 };
982 self.transaction_tracker
983 .add_oracle_response(OracleResponse::Post(bytes.clone()));
984 Ok(bytes)
985 }
986
987 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
988 if !self
989 .transaction_tracker
990 .replay_oracle_response(OracleResponse::Assert)?
991 {
992 ensure!(
994 self.local_time < timestamp,
995 ExecutionError::AssertBefore {
996 timestamp,
997 local_time: self.local_time,
998 }
999 );
1000 }
1001 Ok(())
1002 }
1003
1004 fn read_data_blob(&mut self, hash: &CryptoHash) -> Result<Vec<u8>, ExecutionError> {
1005 let blob_id = BlobId::new(*hash, BlobType::Data);
1006 self.transaction_tracker
1007 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
1008 let blob_content = self
1009 .execution_state_sender
1010 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
1011 .recv_response()?;
1012 Ok(blob_content.inner_bytes())
1013 }
1014
1015 fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError> {
1016 let blob_id = BlobId::new(*hash, BlobType::Data);
1017 self.transaction_tracker
1018 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
1019 self.execution_state_sender
1020 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
1021 .recv_response()?;
1022 Ok(())
1023 }
1024}
1025
1026impl<UserInstance> Clone for SyncRuntimeHandle<UserInstance> {
1027 fn clone(&self) -> Self {
1028 SyncRuntimeHandle(self.0.clone())
1029 }
1030}
1031
1032impl ContractSyncRuntime {
1033 #[expect(clippy::too_many_arguments)]
1035 pub(crate) fn run_action(
1036 execution_state_sender: ExecutionStateSender,
1037 application_id: UserApplicationId,
1038 chain_id: ChainId,
1039 local_time: Timestamp,
1040 refund_grant_to: Option<Account>,
1041 resource_controller: ResourceController,
1042 action: UserAction,
1043 txn_tracker: TransactionTracker,
1044 ) -> Result<(ResourceController, TransactionTracker), ExecutionError> {
1045 let executing_message = match &action {
1046 UserAction::Message(context, _) => Some(context.into()),
1047 _ => None,
1048 };
1049 let signer = action.signer();
1050 let height = action.height();
1051 let mut runtime = SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1052 SyncRuntimeInternal::new(
1053 chain_id,
1054 height,
1055 local_time,
1056 signer,
1057 executing_message,
1058 execution_state_sender,
1059 refund_grant_to,
1060 resource_controller,
1061 txn_tracker,
1062 ),
1063 )));
1064 let finalize_context = FinalizeContext {
1065 authenticated_signer: signer,
1066 chain_id,
1067 height,
1068 };
1069 runtime.execute(application_id, signer, move |code| match action {
1070 UserAction::Instantiate(context, argument) => code.instantiate(context, argument),
1071 UserAction::Operation(context, operation) => {
1072 code.execute_operation(context, operation).map(|_| ())
1073 }
1074 UserAction::Message(context, message) => code.execute_message(context, message),
1075 })?;
1076 runtime.finalize(finalize_context)?;
1077 let runtime = runtime
1078 .into_inner()
1079 .expect("Runtime clones should have been freed by now");
1080 Ok((runtime.resource_controller, runtime.transaction_tracker))
1081 }
1082
1083 fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1085 let applications = mem::take(&mut self.inner().applications_to_finalize)
1086 .into_iter()
1087 .rev();
1088
1089 self.inner().is_finalizing = true;
1090
1091 for application in applications {
1092 self.execute(application, context.authenticated_signer, |contract| {
1093 contract.finalize(context)
1094 })?;
1095 self.inner().loaded_applications.remove(&application);
1096 }
1097
1098 Ok(())
1099 }
1100
1101 fn execute(
1103 &mut self,
1104 application_id: UserApplicationId,
1105 signer: Option<Owner>,
1106 closure: impl FnOnce(&mut UserContractInstance) -> Result<(), ExecutionError>,
1107 ) -> Result<(), ExecutionError> {
1108 let contract = {
1109 let cloned_runtime = self.0.clone().expect(
1110 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
1111 );
1112 let mut runtime = self.inner();
1113 let application = runtime.load_contract_instance(cloned_runtime.0, application_id)?;
1114
1115 let status = ApplicationStatus {
1116 caller_id: None,
1117 id: application_id,
1118 parameters: application.parameters.clone(),
1119 signer,
1120 outcome: RawExecutionOutcome::default(),
1121 };
1122
1123 runtime.push_application(status);
1124
1125 application
1126 };
1127
1128 closure(
1129 &mut contract
1130 .instance
1131 .try_lock()
1132 .expect("Application should not be already executing"),
1133 )?;
1134
1135 let mut runtime = self.inner();
1136 let application_status = runtime.pop_application();
1137 assert_eq!(application_status.caller_id, None);
1138 assert_eq!(application_status.id, application_id);
1139 assert_eq!(application_status.parameters, contract.parameters);
1140 assert_eq!(application_status.signer, signer);
1141 assert!(runtime.call_stack.is_empty());
1142
1143 runtime.handle_outcome(application_status.outcome, signer, application_id)?;
1144
1145 Ok(())
1146 }
1147}
1148
1149impl ContractRuntime for ContractSyncRuntimeHandle {
1150 fn authenticated_signer(&mut self) -> Result<Option<Owner>, ExecutionError> {
1151 Ok(self.inner().authenticated_signer)
1152 }
1153
1154 fn message_id(&mut self) -> Result<Option<MessageId>, ExecutionError> {
1155 Ok(self.inner().executing_message.map(|metadata| metadata.id))
1156 }
1157
1158 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1159 Ok(self
1160 .inner()
1161 .executing_message
1162 .map(|metadata| metadata.is_bouncing))
1163 }
1164
1165 fn authenticated_caller_id(&mut self) -> Result<Option<UserApplicationId>, ExecutionError> {
1166 let mut this = self.inner();
1167 if this.call_stack.len() <= 1 {
1168 return Ok(None);
1169 }
1170 Ok(this.current_application().caller_id)
1171 }
1172
1173 fn remaining_fuel(&mut self) -> Result<u64, ExecutionError> {
1174 Ok(self.inner().resource_controller.remaining_fuel())
1175 }
1176
1177 fn consume_fuel(&mut self, fuel: u64) -> Result<(), ExecutionError> {
1178 let mut this = self.inner();
1179 this.resource_controller.track_fuel(fuel)
1180 }
1181
1182 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1183 let mut this = self.inner();
1184 let application = this.current_application_mut();
1185
1186 application.outcome.messages.push(message.into());
1187
1188 Ok(())
1189 }
1190
1191 fn subscribe(&mut self, chain: ChainId, channel: ChannelName) -> Result<(), ExecutionError> {
1192 let mut this = self.inner();
1193 let application = this.current_application_mut();
1194
1195 application.outcome.subscribe.push((channel, chain));
1196
1197 Ok(())
1198 }
1199
1200 fn unsubscribe(&mut self, chain: ChainId, channel: ChannelName) -> Result<(), ExecutionError> {
1201 let mut this = self.inner();
1202 let application = this.current_application_mut();
1203
1204 application.outcome.unsubscribe.push((channel, chain));
1205
1206 Ok(())
1207 }
1208
1209 fn transfer(
1210 &mut self,
1211 source: Option<Owner>,
1212 destination: Account,
1213 amount: Amount,
1214 ) -> Result<(), ExecutionError> {
1215 let signer = self.inner().current_application().signer;
1216 let execution_outcome = self
1217 .inner()
1218 .execution_state_sender
1219 .send_request(|callback| ExecutionRequest::Transfer {
1220 source,
1221 destination,
1222 amount,
1223 signer,
1224 callback,
1225 })?
1226 .recv_response()?;
1227 self.inner()
1228 .transaction_tracker
1229 .add_system_outcome(execution_outcome)?;
1230 Ok(())
1231 }
1232
1233 fn claim(
1234 &mut self,
1235 source: Account,
1236 destination: Account,
1237 amount: Amount,
1238 ) -> Result<(), ExecutionError> {
1239 let signer = self.inner().current_application().signer;
1240 let execution_outcome = self
1241 .inner()
1242 .execution_state_sender
1243 .send_request(|callback| ExecutionRequest::Claim {
1244 source,
1245 destination,
1246 amount,
1247 signer,
1248 callback,
1249 })?
1250 .recv_response()?
1251 .with_authenticated_signer(signer);
1252 self.inner()
1253 .transaction_tracker
1254 .add_system_outcome(execution_outcome)?;
1255 Ok(())
1256 }
1257
1258 fn try_call_application(
1259 &mut self,
1260 authenticated: bool,
1261 callee_id: UserApplicationId,
1262 argument: Vec<u8>,
1263 ) -> Result<Vec<u8>, ExecutionError> {
1264 let cloned_self = self.clone().0;
1265 let (contract, context) =
1266 self.inner()
1267 .prepare_for_call(cloned_self, authenticated, callee_id)?;
1268
1269 let value = contract
1270 .try_lock()
1271 .expect("Applications should not have reentrant calls")
1272 .execute_operation(context, argument)?;
1273
1274 self.inner().finish_call()?;
1275
1276 Ok(value)
1277 }
1278
1279 fn emit(
1280 &mut self,
1281 name: StreamName,
1282 key: Vec<u8>,
1283 value: Vec<u8>,
1284 ) -> Result<(), ExecutionError> {
1285 let mut this = self.inner();
1286 ensure!(
1287 key.len() <= MAX_EVENT_KEY_LEN,
1288 ExecutionError::EventKeyTooLong
1289 );
1290 ensure!(
1291 name.0.len() <= MAX_STREAM_NAME_LEN,
1292 ExecutionError::StreamNameTooLong
1293 );
1294 let application = this.current_application_mut();
1295 application.outcome.events.push((name, key, value));
1296 Ok(())
1297 }
1298
1299 fn open_chain(
1300 &mut self,
1301 ownership: ChainOwnership,
1302 application_permissions: ApplicationPermissions,
1303 balance: Amount,
1304 ) -> Result<(MessageId, ChainId), ExecutionError> {
1305 let mut this = self.inner();
1306 let message_id = MessageId {
1307 chain_id: this.chain_id,
1308 height: this.height,
1309 index: this.transaction_tracker.next_message_index(),
1310 };
1311 let chain_id = ChainId::child(message_id);
1312 let [open_chain_message, subscribe_message] = this
1313 .execution_state_sender
1314 .send_request(|callback| ExecutionRequest::OpenChain {
1315 ownership,
1316 balance,
1317 next_message_id: message_id,
1318 application_permissions,
1319 callback,
1320 })?
1321 .recv_response()?;
1322 let outcome = RawExecutionOutcome::default()
1323 .with_message(open_chain_message)
1324 .with_message(subscribe_message);
1325 this.transaction_tracker.add_system_outcome(outcome)?;
1326 Ok((message_id, chain_id))
1327 }
1328
1329 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1330 let mut this = self.inner();
1331 let application_id = this.current_application().id;
1332 this.execution_state_sender
1333 .send_request(|callback| ExecutionRequest::CloseChain {
1334 application_id,
1335 callback,
1336 })?
1337 .recv_response()?
1338 }
1339
1340 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1341 let mut this = self.inner();
1342 let id = this.application_id()?;
1343 let state = this.view_user_states.entry(id).or_default();
1344 state.force_all_pending_queries()?;
1345 this.resource_controller.track_write_operations(
1346 batch
1347 .num_operations()
1348 .try_into()
1349 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1350 )?;
1351 this.resource_controller
1352 .track_bytes_written(batch.size() as u64)?;
1353 this.execution_state_sender
1354 .send_request(|callback| ExecutionRequest::WriteBatch {
1355 id,
1356 batch,
1357 callback,
1358 })?
1359 .recv_response()?;
1360 Ok(())
1361 }
1362}
1363
1364impl ServiceSyncRuntime {
1365 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1367 let runtime = SyncRuntime(Some(
1368 SyncRuntimeInternal::new(
1369 context.chain_id,
1370 context.next_block_height,
1371 context.local_time,
1372 None,
1373 None,
1374 execution_state_sender,
1375 None,
1376 ResourceController::default(),
1377 TransactionTracker::default(),
1378 )
1379 .into(),
1380 ));
1381
1382 ServiceSyncRuntime {
1383 runtime,
1384 current_context: context,
1385 }
1386 }
1387
1388 pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1390 while let Ok(request) = incoming_requests.recv() {
1391 let ServiceRuntimeRequest::Query {
1392 application_id,
1393 context,
1394 query,
1395 callback,
1396 } = request;
1397
1398 self.prepare_for_query(context);
1399
1400 let _ = callback.send(self.run_query(application_id, query));
1401 }
1402 }
1403
1404 pub(crate) fn prepare_for_query(&mut self, new_context: QueryContext) {
1406 let expected_context = QueryContext {
1407 local_time: new_context.local_time,
1408 ..self.current_context
1409 };
1410
1411 if new_context != expected_context {
1412 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1413 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1414 } else {
1415 self.handle_mut().inner().local_time = new_context.local_time;
1416 }
1417 }
1418
1419 pub(crate) fn run_query(
1421 &mut self,
1422 application_id: UserApplicationId,
1423 query: Vec<u8>,
1424 ) -> Result<Vec<u8>, ExecutionError> {
1425 self.handle_mut()
1426 .try_query_application(application_id, query)
1427 }
1428
1429 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1431 self.runtime.0.as_mut().expect(
1432 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1433 )
1434 }
1435}
1436
1437impl ServiceRuntime for ServiceSyncRuntimeHandle {
1438 fn try_query_application(
1440 &mut self,
1441 queried_id: UserApplicationId,
1442 argument: Vec<u8>,
1443 ) -> Result<Vec<u8>, ExecutionError> {
1444 let (query_context, service) = {
1445 let cloned_self = self.clone().0;
1446 let mut this = self.inner();
1447
1448 let application = this.load_service_instance(cloned_self, queried_id)?;
1450 let query_context = QueryContext {
1452 chain_id: this.chain_id,
1453 next_block_height: this.height,
1454 local_time: this.local_time,
1455 };
1456 this.push_application(ApplicationStatus {
1457 caller_id: None,
1458 id: queried_id,
1459 parameters: application.parameters,
1460 signer: None,
1461 outcome: RawExecutionOutcome::default(),
1462 });
1463 (query_context, application.instance)
1464 };
1465 let response = service
1466 .try_lock()
1467 .expect("Applications should not have reentrant calls")
1468 .handle_query(query_context, argument)?;
1469 self.inner().pop_application();
1470 Ok(response)
1471 }
1472
1473 fn fetch_url(&mut self, url: &str) -> Result<Vec<u8>, ExecutionError> {
1475 let this = self.inner();
1476 let url = url.to_string();
1477 this.execution_state_sender
1478 .send_request(|callback| ExecutionRequest::FetchUrl { url, callback })?
1479 .recv_response()
1480 }
1481}
1482
1483pub enum ServiceRuntimeRequest {
1485 Query {
1486 application_id: UserApplicationId,
1487 context: QueryContext,
1488 query: Vec<u8>,
1489 callback: oneshot::Sender<Result<Vec<u8>, ExecutionError>>,
1490 },
1491}
1492
1493#[derive(Clone, Copy, Debug)]
1495struct ExecutingMessage {
1496 id: MessageId,
1497 is_bouncing: bool,
1498}
1499
1500impl From<&MessageContext> for ExecutingMessage {
1501 fn from(context: &MessageContext) -> Self {
1502 ExecutingMessage {
1503 id: context.message_id,
1504 is_bouncing: context.is_bouncing,
1505 }
1506 }
1507}