// Unit tests for this module live in a separate file and are only compiled in test builds.
#[cfg(test)]
#[path = "./unit_tests/system_tests.rs"]
mod tests;
8
9use std::{
10 collections::{BTreeMap, BTreeSet, HashSet},
11 sync::Arc,
12};
13
14use allocative::Allocative;
15use custom_debug_derive::Debug;
16use linera_base::{
17 crypto::CryptoHash,
18 data_types::{
19 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
20 ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
21 },
22 ensure, hex_debug,
23 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
24 ownership::{ChainOwnership, TimeoutConfig},
25};
26use linera_views::{
27 context::Context,
28 lazy_register_view::HashedLazyRegisterView,
29 map_view::HashedMapView,
30 register_view::HashedRegisterView,
31 set_view::HashedSetView,
32 views::{ClonableView, HashableView, ReplaceContext, View},
33 ViewError,
34};
35use serde::{Deserialize, Serialize};
36
37#[cfg(test)]
38use crate::test_utils::SystemExecutionState;
39use crate::{
40 committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
41 ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
42 OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
43};
44
/// Name of the system event stream on which new epochs are announced.
///
/// `AdminOperation::CreateCommittee` adds an event to this stream, indexed by the epoch
/// number, whose payload is the BCS-serialized hash of the committee blob.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// Name of the system event stream on which removed epochs are announced.
///
/// `AdminOperation::RemoveCommittee` adds an event with an empty payload to this stream,
/// indexed by the epoch number.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
49
/// Prometheus metrics for system operations; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Counter of executed `OpenChain` operations. It is registered lazily on first use
    /// and carries no labels.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
66
/// A view accessing the execution state of the system of a chain.
#[derive(Debug, ClonableView, HashableView, Allocative)]
#[allocative(bound = "C")]
pub struct SystemExecutionStateView<C> {
    /// The description of the chain; `None` until `initialize_chain` has run.
    pub description: HashedLazyRegisterView<C, Option<ChainDescription>>,
    /// The epoch whose committee `current_committee` looks up.
    pub epoch: HashedRegisterView<C, Epoch>,
    /// The ID of the admin chain; `None` until `initialize_chain` has run.
    pub admin_chain_id: HashedRegisterView<C, Option<ChainId>>,
    /// The committees known to this chain, keyed by epoch.
    pub committees: HashedLazyRegisterView<C, BTreeMap<Epoch, Committee>>,
    /// The ownership configuration of the chain.
    pub ownership: HashedLazyRegisterView<C, ChainOwnership>,
    /// The balance of the chain itself, i.e. of the `AccountOwner::CHAIN` account.
    pub balance: HashedRegisterView<C, Amount>,
    /// The balances of individual account owners. `debit` removes entries that reach zero.
    pub balances: HashedMapView<C, AccountOwner, Amount>,
    /// A timestamp for the chain; `initialize_chain` sets it from the chain description.
    pub timestamp: HashedRegisterView<C, Timestamp>,
    /// Whether the chain has been closed (set by `close_chain`).
    pub closed: HashedRegisterView<C, bool>,
    /// The application permissions configured for this chain.
    pub application_permissions: HashedLazyRegisterView<C, ApplicationPermissions>,
    /// The IDs of the blobs that were already recorded as used or published on this chain.
    pub used_blobs: HashedSetView<C, BlobId>,
    /// The event stream subscriptions, keyed by publishing chain and stream.
    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
}
105
106impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
107 type Target = SystemExecutionStateView<C2>;
108
109 async fn with_context(
110 &mut self,
111 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
112 ) -> Self::Target {
113 SystemExecutionStateView {
114 description: self.description.with_context(ctx.clone()).await,
115 epoch: self.epoch.with_context(ctx.clone()).await,
116 admin_chain_id: self.admin_chain_id.with_context(ctx.clone()).await,
117 committees: self.committees.with_context(ctx.clone()).await,
118 ownership: self.ownership.with_context(ctx.clone()).await,
119 balance: self.balance.with_context(ctx.clone()).await,
120 balances: self.balances.with_context(ctx.clone()).await,
121 timestamp: self.timestamp.with_context(ctx.clone()).await,
122 closed: self.closed.with_context(ctx.clone()).await,
123 application_permissions: self.application_permissions.with_context(ctx.clone()).await,
124 used_blobs: self.used_blobs.with_context(ctx.clone()).await,
125 event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
126 }
127 }
128}
129
/// The applications subscribing to a particular stream, and the next event index
/// to be processed.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Allocative)]
pub struct EventSubscriptions {
    /// The next event index to process; `UpdateStreams` only accepts strictly larger
    /// values and then stores the new value here.
    pub next_index: u32,
    /// The applications that are notified when the stream is updated.
    pub applications: BTreeSet<ApplicationId>,
}
139
/// The initial configuration for a new chain, as supplied to the `OpenChain` operation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial chain balance; `open_chain` debits it from the parent chain's balance.
    pub balance: Amount,
    /// The initial application permissions of the new chain.
    pub application_permissions: ApplicationPermissions,
}
150
151impl OpenChainConfig {
152 pub fn init_chain_config(
155 &self,
156 epoch: Epoch,
157 min_active_epoch: Epoch,
158 max_active_epoch: Epoch,
159 ) -> InitialChainConfig {
160 InitialChainConfig {
161 application_permissions: self.application_permissions.clone(),
162 balance: self.balance,
163 epoch,
164 min_active_epoch,
165 max_active_epoch,
166 ownership: self.ownership.clone(),
167 }
168 }
169}
170
/// A system operation, executed by `SystemExecutionStateView::execute_operation`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemOperation {
    /// Transfers `amount` from the account of `owner` on this chain to `recipient`.
    /// Spending from `AccountOwner::CHAIN` requires the signer to be a chain owner.
    Transfer {
        owner: AccountOwner,
        recipient: Account,
        amount: Amount,
    },
    /// Claims `amount` from the account of `owner` on chain `target_id`, to be paid to
    /// `recipient`. If the target is a different chain, this sends a `Withdraw` message
    /// to it.
    Claim {
        owner: AccountOwner,
        target_id: ChainId,
        recipient: Account,
        amount: Amount,
    },
    /// Creates (or activates) a new chain; its description is added as a created blob.
    OpenChain(OpenChainConfig),
    /// Marks the chain as closed.
    CloseChain,
    /// Replaces the ownership configuration of the chain.
    ChangeOwnership {
        /// The new super owners.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The new regular owners, each paired with a `u64` (presumably a weight — confirm
        /// against `ChainOwnership`).
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// Stored as `ChainOwnership::multi_leader_rounds`.
        multi_leader_rounds: u32,
        /// Stored as `ChainOwnership::open_multi_leader_rounds`.
        open_multi_leader_rounds: bool,
        /// Stored as `ChainOwnership::timeout_config`.
        timeout_config: TimeoutConfig,
    },
    /// Replaces the application permissions configuration of the chain.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Publishes a new application module; its bytecode blobs are recorded as published.
    PublishModule { module_id: ModuleId },
    /// Publishes a new data blob.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Verifies that the given blob exists and records it as used by this chain.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application.
    CreateApplication {
        /// The module providing the application's bytecode.
        module_id: ModuleId,
        /// The application parameters, as raw bytes.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        /// The raw instantiation argument, returned to the caller of `execute_operation`
        /// together with the new application ID.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        /// The applications this one depends on; their descriptions must be readable.
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// Operations that are only allowed on the admin chain.
    Admin(AdminOperation),
    /// Processes an event about a new epoch and committee, read from the admin chain.
    ProcessNewEpoch(Epoch),
    /// Processes an event about a removed epoch and committee, read from the admin chain.
    ProcessRemovedEpoch(Epoch),
    /// Updates the next event index of the given streams, as `(chain, stream, next_index)`.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
241
/// Operations that are rejected with `AdminOperationOnNonAdminChain` unless they are
/// executed on the admin chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum AdminOperation {
    /// Publishes a new committee as a blob, so that a later `CreateCommittee` can refer
    /// to it by hash.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers a new committee, read from the blob with the given hash, and makes
    /// `epoch` the current epoch. `epoch` must be the current epoch plus one.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes the committee of the given epoch; fails if that committee is not known.
    RemoveCommittee { epoch: Epoch },
}
256
/// A system message meant to be executed on a remote chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemMessage {
    /// Credits `amount` units of value to the account `target`. If the message is
    /// bouncing, the amount is credited to `source` instead.
    Credit {
        target: AccountOwner,
        amount: Amount,
        source: AccountOwner,
    },
    /// Withdraws `amount` units of value from the account of `owner` on the receiving
    /// chain and pays them to `recipient`.
    Withdraw {
        owner: AccountOwner,
        amount: Amount,
        recipient: Account,
    },
}
276
/// A query to the system state. It carries no parameters.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
280
/// The response to a system query.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// The ID of the chain that was queried.
    pub chain_id: ChainId,
    /// The balance of the chain account itself (not of individual owners).
    pub balance: Amount,
}
287
/// Optional user data of exactly 32 bytes.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
291
292impl UserData {
293 pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
294 let option_array = match opt_str {
296 Some(s) => {
297 let vec = s.into_bytes();
299 if vec.len() <= 32 {
300 let mut array = [b' '; 32];
302
303 let len = vec.len().min(32);
305 array[..len].copy_from_slice(&vec[..len]);
306
307 Some(array)
308 } else {
309 return Err(vec.len());
310 }
311 }
312 None => None,
313 };
314
315 Ok(UserData(option_array))
317 }
318}
319
/// The result of `SystemExecutionStateView::create_application`.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
}
324
325impl<C> SystemExecutionStateView<C>
326where
327 C: Context + Clone + 'static,
328 C::Extra: ExecutionRuntimeContext,
329{
330 pub async fn is_active(&self) -> Result<bool, ViewError> {
332 Ok(self.description.get().await?.is_some()
333 && self.ownership.get().await?.is_active()
334 && self.current_committee().await?.is_some()
335 && self.admin_chain_id.get().is_some())
336 }
337
338 pub async fn current_committee(&self) -> Result<Option<(Epoch, Arc<Committee>)>, ViewError> {
349 let epoch = *self.epoch.get();
350 if let Some(committee) = self.context().extra().get_or_load_committee(epoch).await? {
351 return Ok(Some((epoch, committee)));
352 }
353 let Some(committee) = self.committees.get().await?.get(&epoch).cloned() else {
354 return Ok(None);
355 };
356 Ok(Some((epoch, Arc::new(committee))))
357 }
358
359 async fn get_event(&self, event_id: EventId) -> Result<Arc<Vec<u8>>, ExecutionError> {
360 match self.context().extra().get_event(event_id.clone()).await? {
361 None => Err(ExecutionError::EventsNotFound(vec![event_id])),
362 Some(vec) => Ok(vec),
363 }
364 }
365
/// Executes the sender's side of a system operation.
///
/// Outgoing messages, events, blobs and oracle responses produced along the way are
/// recorded in `txn_tracker`.
///
/// Returns `Some((application_id, instantiation_argument))` if the operation created
/// an application, and `None` for every other operation.
pub async fn execute_operation(
    &mut self,
    context: OperationContext,
    operation: SystemOperation,
    txn_tracker: &mut TransactionTracker,
    resource_controller: &mut ResourceController<Option<AccountOwner>>,
) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
    use SystemOperation::*;
    // Only `CreateApplication` fills this in.
    let mut new_application = None;
    match operation {
        OpenChain(config) => {
            let _chain_id = self
                .open_chain(
                    config,
                    context.chain_id,
                    context.height,
                    context.timestamp,
                    txn_tracker,
                )
                .await?;
            #[cfg(with_metrics)]
            metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
        }
        ChangeOwnership {
            super_owners,
            owners,
            multi_leader_rounds,
            open_multi_leader_rounds,
            timeout_config,
        } => {
            // The previous ownership is replaced wholesale.
            self.ownership.set(ChainOwnership {
                super_owners: super_owners.into_iter().collect(),
                owners: owners.into_iter().collect(),
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            });
        }
        ChangeApplicationPermissions(application_permissions) => {
            self.application_permissions.set(application_permissions);
        }
        CloseChain => self.close_chain()?,
        Transfer {
            owner,
            amount,
            recipient,
        } => {
            // No application is involved when a transfer comes from a system operation.
            let maybe_message = self
                .transfer(context.authenticated_signer, None, owner, recipient, amount)
                .await?;
            txn_tracker.add_outgoing_messages(maybe_message);
        }
        Claim {
            owner,
            target_id,
            recipient,
            amount,
        } => {
            let maybe_message = self
                .claim(
                    context.authenticated_signer,
                    None,
                    owner,
                    target_id,
                    recipient,
                    amount,
                )
                .await?;
            txn_tracker.add_outgoing_messages(maybe_message);
        }
        Admin(admin_operation) => {
            // Admin operations are rejected unless this chain is the admin chain.
            ensure!(
                *self.admin_chain_id.get() == Some(context.chain_id),
                ExecutionError::AdminOperationOnNonAdminChain
            );
            match admin_operation {
                AdminOperation::PublishCommitteeBlob { blob_hash } => {
                    self.blob_published(
                        &BlobId::new(blob_hash, BlobType::Committee),
                        txn_tracker,
                    )?;
                }
                AdminOperation::CreateCommittee { epoch, blob_hash } => {
                    self.check_next_epoch(epoch)?;
                    let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                    let committee =
                        bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                    self.blob_used(txn_tracker, blob_id).await?;
                    self.committees.get_mut().await?.insert(epoch, committee);
                    self.epoch.set(epoch);
                    // Announce the new epoch; the payload is the committee blob hash.
                    txn_tracker.add_event(
                        StreamId::system(EPOCH_STREAM_NAME),
                        epoch.0,
                        bcs::to_bytes(&blob_hash)?,
                    );
                }
                AdminOperation::RemoveCommittee { epoch } => {
                    ensure!(
                        self.committees.get_mut().await?.remove(&epoch).is_some(),
                        ExecutionError::InvalidCommitteeRemoval
                    );
                    // Announce the removal; the event carries no payload.
                    txn_tracker.add_event(
                        StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                        epoch.0,
                        vec![],
                    );
                }
            }
        }
        PublishModule { module_id } => {
            for blob_id in module_id.bytecode_blob_ids() {
                self.blob_published(&blob_id, txn_tracker)?;
            }
        }
        CreateApplication {
            module_id,
            parameters,
            instantiation_argument,
            required_application_ids,
        } => {
            let CreateApplicationResult { app_id } = self
                .create_application(
                    context.chain_id,
                    context.height,
                    module_id,
                    parameters,
                    required_application_ids,
                    txn_tracker,
                )
                .await?;
            new_application = Some((app_id, instantiation_argument));
        }
        PublishDataBlob { blob_hash } => {
            self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
        }
        VerifyBlob { blob_id } => {
            self.assert_blob_exists(blob_id).await?;
            // The blob content is not read here, so a read of zero bytes is tracked.
            resource_controller
                .with_state(self)
                .await?
                .track_blob_read(0)?;
            self.blob_used(txn_tracker, blob_id).await?;
        }
        ProcessNewEpoch(epoch) => {
            self.check_next_epoch(epoch)?;
            let admin_chain_id = self
                .admin_chain_id
                .get()
                .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
            // The admin chain announced this epoch on its epoch stream.
            let event_id = EventId {
                chain_id: admin_chain_id,
                stream_id: StreamId::system(EPOCH_STREAM_NAME),
                index: epoch.0,
            };
            let bytes = txn_tracker
                .oracle(|| async {
                    let bytes = self.get_event(event_id.clone()).await?;
                    Ok(OracleResponse::Event(
                        event_id.clone(),
                        Arc::unwrap_or_clone(bytes),
                    ))
                })
                .await?
                .to_event(&event_id)?;
            // The event payload is the hash of the committee blob.
            let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
            let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
            self.blob_used(txn_tracker, blob_id).await?;
            self.committees.get_mut().await?.insert(epoch, committee);
            self.epoch.set(epoch);
        }
        ProcessRemovedEpoch(epoch) => {
            ensure!(
                self.committees.get_mut().await?.remove(&epoch).is_some(),
                ExecutionError::InvalidCommitteeRemoval
            );
            let admin_chain_id = self
                .admin_chain_id
                .get()
                .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
            let event_id = EventId {
                chain_id: admin_chain_id,
                stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                index: epoch.0,
            };
            // The event content is not used; fetching it fails if the removal event
            // is not available.
            txn_tracker
                .oracle(|| async {
                    let bytes = self.get_event(event_id.clone()).await?;
                    Ok(OracleResponse::Event(event_id, Arc::unwrap_or_clone(bytes)))
                })
                .await?;
        }
        UpdateStreams(streams) => {
            let mut missing_events = Vec::new();
            for (chain_id, stream_id, next_index) in streams {
                let subscriptions = self
                    .event_subscriptions
                    .get_mut_or_default(&(chain_id, stream_id.clone()))
                    .await?;
                // The index must strictly increase.
                ensure!(
                    subscriptions.next_index < next_index,
                    ExecutionError::OutdatedUpdateStreams
                );
                // Every subscriber gets the range from the old to the new index to process.
                for application_id in &subscriptions.applications {
                    txn_tracker.add_stream_to_process(
                        *application_id,
                        chain_id,
                        stream_id.clone(),
                        subscriptions.next_index,
                        next_index,
                    );
                }
                subscriptions.next_index = next_index;
                // Only the last event below the new index is checked for existence.
                let index = next_index
                    .checked_sub(1)
                    .ok_or(ArithmeticError::Underflow)?;
                let event_id = EventId {
                    chain_id,
                    stream_id,
                    index,
                };
                let extra = self.context().extra();
                txn_tracker
                    .oracle(|| async {
                        if !extra.contains_event(event_id.clone()).await? {
                            missing_events.push(event_id.clone());
                        }
                        Ok(OracleResponse::EventExists(event_id))
                    })
                    .await?;
            }
            // All missing events are reported together, after every stream was visited.
            ensure!(
                missing_events.is_empty(),
                ExecutionError::EventsNotFound(missing_events)
            );
        }
    }

    Ok(new_application)
}
607
608 fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
611 let expected = self.epoch.get().try_add_one()?;
612 ensure!(
613 provided == expected,
614 ExecutionError::InvalidCommitteeEpoch { provided, expected }
615 );
616 Ok(())
617 }
618
619 async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
620 if owner == &AccountOwner::CHAIN {
621 let new_balance = self.balance.get().saturating_add(amount);
622 self.balance.set(new_balance);
623 } else {
624 let balance = self.balances.get_mut_or_default(owner).await?;
625 *balance = balance.saturating_add(amount);
626 }
627 Ok(())
628 }
629
630 async fn credit_or_send_message(
631 &mut self,
632 source: AccountOwner,
633 recipient: Account,
634 amount: Amount,
635 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
636 let source_chain_id = self.context().extra().chain_id();
637 if recipient.chain_id == source_chain_id {
638 let target = recipient.owner;
640 self.credit(&target, amount).await?;
641 Ok(None)
642 } else {
643 let message = SystemMessage::Credit {
645 amount,
646 source,
647 target: recipient.owner,
648 };
649 Ok(Some(
650 OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
651 ))
652 }
653 }
654
655 pub async fn transfer(
656 &mut self,
657 authenticated_signer: Option<AccountOwner>,
658 authenticated_application_id: Option<ApplicationId>,
659 source: AccountOwner,
660 recipient: Account,
661 amount: Amount,
662 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
663 if source == AccountOwner::CHAIN {
664 ensure!(
665 authenticated_signer.is_some()
666 && self
667 .ownership
668 .get()
669 .await?
670 .verify_owner(&authenticated_signer.unwrap()),
671 ExecutionError::UnauthenticatedTransferOwner
672 );
673 } else {
674 ensure!(
675 authenticated_signer == Some(source)
676 || authenticated_application_id.map(AccountOwner::from) == Some(source),
677 ExecutionError::UnauthenticatedTransferOwner
678 );
679 }
680 ensure!(
681 amount > Amount::ZERO,
682 ExecutionError::IncorrectTransferAmount
683 );
684 self.debit(&source, amount).await?;
685 self.credit_or_send_message(source, recipient, amount).await
686 }
687
688 pub async fn claim(
689 &mut self,
690 authenticated_signer: Option<AccountOwner>,
691 authenticated_application_id: Option<ApplicationId>,
692 source: AccountOwner,
693 target_id: ChainId,
694 recipient: Account,
695 amount: Amount,
696 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
697 ensure!(
698 authenticated_signer == Some(source)
699 || authenticated_application_id.map(AccountOwner::from) == Some(source),
700 ExecutionError::UnauthenticatedClaimOwner
701 );
702 ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
703
704 let current_chain_id = self.context().extra().chain_id();
705 if target_id == current_chain_id {
706 self.debit(&source, amount).await?;
708 self.credit_or_send_message(source, recipient, amount).await
709 } else {
710 let message = SystemMessage::Withdraw {
712 amount,
713 owner: source,
714 recipient,
715 };
716 Ok(Some(
717 OutgoingMessage::new(target_id, message)
718 .with_authenticated_signer(authenticated_signer),
719 ))
720 }
721 }
722
723 async fn debit(
725 &mut self,
726 account: &AccountOwner,
727 amount: Amount,
728 ) -> Result<(), ExecutionError> {
729 let balance = if account == &AccountOwner::CHAIN {
730 self.balance.get_mut()
731 } else {
732 self.balances.get_mut(account).await?.ok_or_else(|| {
733 ExecutionError::InsufficientBalance {
734 balance: Amount::ZERO,
735 account: *account,
736 }
737 })?
738 };
739
740 balance
741 .try_sub_assign(amount)
742 .map_err(|_| ExecutionError::InsufficientBalance {
743 balance: *balance,
744 account: *account,
745 })?;
746
747 if account != &AccountOwner::CHAIN && balance.is_zero() {
748 self.balances.remove(account)?;
749 }
750
751 Ok(())
752 }
753
754 pub async fn execute_message(
756 &mut self,
757 context: MessageContext,
758 message: SystemMessage,
759 ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
760 let mut outcome = Vec::new();
761 use SystemMessage::*;
762 match message {
763 Credit {
764 amount,
765 source,
766 target,
767 } => {
768 let receiver = if context.is_bouncing { source } else { target };
769 self.credit(&receiver, amount).await?;
770 }
771 Withdraw {
772 amount,
773 owner,
774 recipient,
775 } => {
776 self.debit(&owner, amount).await?;
777 if let Some(message) = self
778 .credit_or_send_message(owner, recipient, amount)
779 .await?
780 {
781 outcome.push(message);
782 }
783 }
784 }
785 Ok(outcome)
786 }
787
/// Initializes the system state of the chain from its description blob, unless a
/// description is already present.
///
/// Returns `Ok(true)` if the chain was already initialized and `Ok(false)` if this
/// call performed the initialization.
///
/// # Errors
///
/// Fails if the description blob cannot be read or deserialized, if the committees
/// cannot be fetched, or with `ExecutionError::NoNetworkDescriptionFound` if the
/// context has no network description.
pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
    if self.description.get().await?.is_some() {
        // Already initialized.
        return Ok(true);
    }
    // The description blob is addressed by the hash inside the chain ID.
    let description_blob = self
        .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
        .await?;
    let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
    let InitialChainConfig {
        ownership,
        epoch,
        balance,
        min_active_epoch,
        max_active_epoch,
        application_permissions,
    } = description.config().clone();
    self.timestamp.set(description.timestamp());
    self.description.set(Some(description));
    self.epoch.set(epoch);
    // Load the committees for the whole range of epochs named in the configuration.
    let committees = self
        .context()
        .extra()
        .get_committees(min_active_epoch..=max_active_epoch)
        .await?;
    let admin_chain_id = self
        .context()
        .extra()
        .get_network_description()
        .await?
        .ok_or(ExecutionError::NoNetworkDescriptionFound)?
        .admin_chain_id;

    self.committees.set(committees);
    self.admin_chain_id.set(Some(admin_chain_id));
    self.ownership.set(ownership);
    self.balance.set(balance);
    self.application_permissions.set(application_permissions);
    Ok(false)
}
830
831 pub fn handle_query(
832 &mut self,
833 context: QueryContext,
834 _query: SystemQuery,
835 ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
836 let response = SystemResponse {
837 chain_id: context.chain_id,
838 balance: *self.balance.get(),
839 };
840 Ok(QueryOutcome {
841 response,
842 operations: vec![],
843 })
844 }
845
846 pub async fn open_chain(
849 &mut self,
850 config: OpenChainConfig,
851 parent: ChainId,
852 block_height: BlockHeight,
853 timestamp: Timestamp,
854 txn_tracker: &mut TransactionTracker,
855 ) -> Result<ChainId, ExecutionError> {
856 let chain_index = txn_tracker.next_chain_index();
857 let chain_origin = ChainOrigin::Child {
858 parent,
859 block_height,
860 chain_index,
861 };
862 let committees = self.committees.get().await?;
863 let init_chain_config = config.init_chain_config(
864 *self.epoch.get(),
865 committees.keys().min().copied().unwrap_or(Epoch::ZERO),
866 committees.keys().max().copied().unwrap_or(Epoch::ZERO),
867 );
868 let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
869 let child_id = chain_description.id();
870 self.debit(&AccountOwner::CHAIN, config.balance).await?;
871 let blob = Blob::new_chain_description(&chain_description);
872 txn_tracker.add_created_blob(blob);
873 Ok(child_id)
874 }
875
/// Marks the chain as closed by setting the `closed` flag. As written, this never
/// returns an error.
pub fn close_chain(&mut self) -> Result<(), ExecutionError> {
    self.closed.set(true);
    Ok(())
}
880
881 pub async fn create_application(
882 &mut self,
883 chain_id: ChainId,
884 block_height: BlockHeight,
885 module_id: ModuleId,
886 parameters: Vec<u8>,
887 required_application_ids: Vec<ApplicationId>,
888 txn_tracker: &mut TransactionTracker,
889 ) -> Result<CreateApplicationResult, ExecutionError> {
890 let application_index = txn_tracker.next_application_index();
891
892 let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
893 for blob_id in blob_ids {
896 self.blob_used(txn_tracker, blob_id).await?;
897 }
898
899 let application_description = ApplicationDescription {
900 module_id,
901 creator_chain_id: chain_id,
902 block_height,
903 application_index,
904 parameters,
905 required_application_ids,
906 };
907 self.check_required_applications(&application_description, txn_tracker)
908 .await?;
909
910 let blob = Blob::new_application_description(&application_description);
911 self.used_blobs.insert(&blob.id())?;
912 txn_tracker.add_created_blob(blob);
913
914 Ok(CreateApplicationResult {
915 app_id: ApplicationId::from(&application_description),
916 })
917 }
918
919 async fn check_required_applications(
920 &mut self,
921 application_description: &ApplicationDescription,
922 txn_tracker: &mut TransactionTracker,
923 ) -> Result<(), ExecutionError> {
924 for required_id in &application_description.required_application_ids {
926 Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
927 }
928 Ok(())
929 }
930
931 pub async fn describe_application(
933 &mut self,
934 id: ApplicationId,
935 txn_tracker: &mut TransactionTracker,
936 ) -> Result<ApplicationDescription, ExecutionError> {
937 let blob_id = id.description_blob_id();
938 let content = match txn_tracker.created_blobs().get(&blob_id) {
939 Some(content) => content.clone(),
940 None => self.read_blob_content(blob_id).await?,
941 };
942 self.blob_used(txn_tracker, blob_id).await?;
943 let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
944
945 let blob_ids = self
946 .check_bytecode_blobs(&description.module_id, txn_tracker)
947 .await?;
948 for blob_id in blob_ids {
951 self.blob_used(txn_tracker, blob_id).await?;
952 }
953
954 self.check_required_applications(&description, txn_tracker)
955 .await?;
956
957 Ok(description)
958 }
959
/// Retrieves the recursive dependencies of the applications in `stack` and returns
/// them ordered so that each application comes after the applications it requires.
/// The applications initially in `stack` are part of the result.
pub async fn find_dependencies(
    &mut self,
    mut stack: Vec<ApplicationId>,
    txn_tracker: &mut TransactionTracker,
) -> Result<Vec<ApplicationId>, ExecutionError> {
    // What we return at the end.
    let mut result = Vec::new();
    // The entries already inserted in `result`.
    let mut sorted = HashSet::new();
    // The entries whose dependencies have already been pushed to the stack once.
    let mut seen = HashSet::new();

    while let Some(id) = stack.pop() {
        if sorted.contains(&id) {
            continue;
        }
        if seen.contains(&id) {
            // Second visit: everything pushed above this entry has been handled, so
            // the entry itself can be emitted.
            sorted.insert(id);
            result.push(id);
            continue;
        }
        // First visit: put the entry back so that it is revisited after the
        // dependencies pushed on top of it.
        seen.insert(id);
        stack.push(id);
        let app = self.describe_application(id, txn_tracker).await?;
        // Pushed in reverse so that the first requirement is popped first.
        for child in app.required_application_ids.iter().rev() {
            if !seen.contains(child) {
                stack.push(*child);
            }
        }
    }
    Ok(result)
}
998
999 pub(crate) async fn blob_used(
1002 &mut self,
1003 txn_tracker: &mut TransactionTracker,
1004 blob_id: BlobId,
1005 ) -> Result<bool, ExecutionError> {
1006 if self.used_blobs.contains(&blob_id).await? {
1007 return Ok(false); }
1009 self.used_blobs.insert(&blob_id)?;
1010 txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
1011 Ok(true)
1012 }
1013
/// Records a blob that is published in the current transaction: the blob ID is
/// inserted into `used_blobs` and registered with `txn_tracker` as a published blob.
fn blob_published(
    &mut self,
    blob_id: &BlobId,
    txn_tracker: &mut TransactionTracker,
) -> Result<(), ExecutionError> {
    self.used_blobs.insert(blob_id)?;
    txn_tracker.add_published_blob(*blob_id);
    Ok(())
}
1025
1026 pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1027 match self.context().extra().get_blob(blob_id).await {
1028 Ok(Some(blob)) => Ok(Arc::unwrap_or_clone(blob).into()),
1029 Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1030 Err(error) => Err(error.into()),
1031 }
1032 }
1033
1034 pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1035 if self.context().extra().contains_blob(blob_id).await? {
1036 Ok(())
1037 } else {
1038 Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1039 }
1040 }
1041
1042 async fn check_bytecode_blobs(
1043 &mut self,
1044 module_id: &ModuleId,
1045 txn_tracker: &TransactionTracker,
1046 ) -> Result<Vec<BlobId>, ExecutionError> {
1047 let blob_ids = module_id.bytecode_blob_ids();
1048
1049 let mut missing_blobs = Vec::new();
1050 for blob_id in &blob_ids {
1051 if txn_tracker.created_blobs().contains_key(blob_id) {
1053 continue; }
1055 if !self.context().extra().contains_blob(*blob_id).await? {
1057 missing_blobs.push(*blob_id);
1058 }
1059 }
1060 ensure!(
1061 missing_blobs.is_empty(),
1062 ExecutionError::BlobsNotFound(missing_blobs)
1063 );
1064
1065 Ok(blob_ids)
1066 }
1067}