1#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9#[cfg(with_metrics)]
10use std::sync::LazyLock;
11use std::{
12 collections::{BTreeMap, BTreeSet, HashSet},
13 mem,
14};
15
16use custom_debug_derive::Debug;
17use linera_base::{
18 crypto::CryptoHash,
19 data_types::{
20 Amount, ApplicationPermissions, Blob, BlobContent, BlockHeight, OracleResponse, Timestamp,
21 },
22 ensure, hex_debug,
23 identifiers::{
24 Account, AccountOwner, BlobId, BlobType, ChainDescription, ChainId, EventId, MessageId,
25 ModuleId, StreamId,
26 },
27 ownership::{ChainOwnership, TimeoutConfig},
28};
29use linera_views::{
30 context::Context,
31 map_view::{HashedMapView, MapView},
32 register_view::HashedRegisterView,
33 set_view::HashedSetView,
34 views::{ClonableView, HashableView, View, ViewError},
35};
36use serde::{Deserialize, Serialize};
37#[cfg(with_metrics)]
38use {linera_base::prometheus_util::register_int_counter_vec, prometheus::IntCounterVec};
39
40#[cfg(test)]
41use crate::test_utils::SystemExecutionState;
42use crate::{
43 committee::{Committee, Epoch},
44 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext, MessageContext,
45 MessageKind, OperationContext, OutgoingMessage, QueryContext, QueryOutcome, ResourceController,
46 TransactionTracker,
47};
48
/// Message index associated with opening a chain.
// NOTE(review): presumably the position of the `OpenChain` message within the operation that
// creates the chain; this file never reads the value, so confirm against callers.
pub static OPEN_CHAIN_MESSAGE_INDEX: u32 = 0;
/// Name of the system event stream on which the admin chain announces new epochs.
///
/// `AdminOperation::CreateCommittee` publishes an event on this stream whose index is the epoch
/// number and whose payload is the BCS-serialized hash of the committee blob.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// Name of the system event stream on which the admin chain announces removed epochs.
///
/// `AdminOperation::RemoveCommittee` publishes an event on this stream whose index is the epoch
/// number and whose payload is empty.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
55
/// Lazily registered counter (with no labels) that is incremented each time an `OpenChain`
/// operation is executed. Only compiled when metrics are enabled.
#[cfg(with_metrics)]
static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
    register_int_counter_vec(
        "open_chain_count",
        "The number of times the `OpenChain` operation was executed",
        &[],
    )
});
65
/// A view accessing the execution state of the system of a chain.
#[derive(Debug, ClonableView, HashableView)]
pub struct SystemExecutionStateView<C> {
    /// How the chain was created. `None` until the chain is initialized.
    pub description: HashedRegisterView<C, Option<ChainDescription>>,
    /// The current epoch; selects the current committee out of `committees`.
    pub epoch: HashedRegisterView<C, Option<Epoch>>,
    /// The ID of the admin chain. Admin operations are only accepted when this equals the
    /// ID of the chain executing them.
    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
    /// The committees known to this chain, indexed by epoch.
    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
    /// Ownership of the chain; used to authorize transfers from the chain account.
    pub ownership: HashedRegisterView<C, ChainOwnership>,
    /// Balance of the chain account (`AccountOwner::CHAIN`).
    pub balance: HashedRegisterView<C, Amount>,
    /// Balances of the individual owner accounts. Entries that reach zero on a debit are
    /// removed.
    pub balances: HashedMapView<C, AccountOwner, Amount>,
    /// Timestamp recorded for the chain; set when the chain is initialized.
    // NOTE(review): presumably also updated per block elsewhere — not visible in this file.
    pub timestamp: HashedRegisterView<C, Timestamp>,
    /// Whether the chain has been closed (set by `close_chain`).
    pub closed: HashedRegisterView<C, bool>,
    /// The application permissions configured for this chain.
    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
    /// IDs of the blobs that have already been published or used on this chain.
    pub used_blobs: HashedSetView<C, BlobId>,
    /// Event subscription state, keyed by publishing chain and stream.
    pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
}
97
/// The subscription state for one `(chain, stream)` pair.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct EventSubscriptions {
    /// The next event index to process; `SystemOperation::UpdateStreams` only ever raises it.
    pub next_index: u32,
    /// A set of application IDs attached to this stream.
    // NOTE(review): presumably the applications subscribed to the stream — confirm with users.
    pub applications: BTreeSet<ApplicationId>,
}
107
/// The configuration for a new chain.
///
/// When a chain is opened, `admin_id`, `committees` and `epoch` must match the values of the
/// parent chain, and `balance` is debited from the parent's chain account.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The ID of the admin chain.
    pub admin_id: ChainId,
    /// The epoch in which the new chain starts.
    pub epoch: Epoch,
    /// The committees the new chain knows about, indexed by epoch.
    pub committees: BTreeMap<Epoch, Committee>,
    /// The initial balance of the new chain's chain account.
    pub balance: Amount,
    /// The application permissions of the new chain.
    pub application_permissions: ApplicationPermissions,
}
118
/// A system operation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemOperation {
    /// Debits `amount` from the account of `owner` on this chain and, unless the recipient is
    /// `Recipient::Burn`, sends a credit message to the recipient account.
    Transfer {
        /// The account to debit.
        owner: AccountOwner,
        /// Where the tokens go.
        recipient: Recipient,
        /// The amount to transfer; must be greater than zero.
        amount: Amount,
    },
    /// Asks the chain `target_id` to withdraw `amount` from the account of `owner` there and
    /// forward it to `recipient`.
    Claim {
        /// The account to withdraw from on the target chain.
        owner: AccountOwner,
        /// The chain holding the account.
        target_id: ChainId,
        /// Where the tokens go.
        recipient: Recipient,
        /// The amount to claim; must be greater than zero.
        amount: Amount,
    },
    /// Creates a child chain with the given configuration.
    OpenChain(OpenChainConfig),
    /// Marks this chain as closed.
    CloseChain,
    /// Replaces the ownership of this chain.
    ChangeOwnership {
        /// The new super owners.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The new regular owners, each paired with a `u64`.
        // NOTE(review): the `u64` is presumably the owner's weight — confirm in `ChainOwnership`.
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// Copied verbatim into `ChainOwnership::multi_leader_rounds`.
        multi_leader_rounds: u32,
        /// Copied verbatim into `ChainOwnership::open_multi_leader_rounds`.
        open_multi_leader_rounds: bool,
        /// Copied verbatim into `ChainOwnership::timeout_config`.
        timeout_config: TimeoutConfig,
    },
    /// Replaces the application permissions of this chain.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Records the bytecode blobs of a module as published on this chain.
    PublishModule { module_id: ModuleId },
    /// Records a data blob with the given hash as published on this chain.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Reads a blob and records it as used on this chain.
    ReadBlob { blob_id: BlobId },
    /// Creates a new application from a published module.
    CreateApplication {
        /// The module the application is created from.
        module_id: ModuleId,
        /// The application parameters, as raw bytes.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        /// The instantiation argument, as raw bytes; handed back to the caller of
        /// `execute_operation` together with the new application ID.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        /// The applications the new application depends on.
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// An operation that is only accepted on the admin chain.
    Admin(AdminOperation),
    /// Installs the committee of the given epoch, as announced on the admin chain's epoch
    /// stream, and makes it the current epoch.
    ProcessNewEpoch(Epoch),
    /// Removes the committee of the given epoch, as announced on the admin chain's
    /// removed-epoch stream.
    ProcessRemovedEpoch(Epoch),
    /// Raises the next expected event index of the given `(chain, stream)` subscriptions.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
190
/// Operations that are only allowed on the admin chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum AdminOperation {
    /// Records a committee blob with the given hash as published.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers the committee stored in the given blob for `epoch`, which must be the epoch
    /// right after the current one, and switches the chain to that epoch.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes the committee of `epoch`; fails if no such committee is known.
    RemoveCommittee { epoch: Epoch },
}
205
/// A system message meant to be executed on a remote chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
    /// Credits `amount` to the `target` account, or back to `source` if the message is
    /// bouncing.
    Credit {
        /// The account to credit on the receiving chain.
        target: AccountOwner,
        /// The amount to credit.
        amount: Amount,
        /// The account the tokens were debited from; refunded when the message bounces.
        source: AccountOwner,
    },
    /// Debits `amount` from the account of `owner` on the receiving chain and forwards it to
    /// `recipient`.
    Withdraw {
        /// The account to debit.
        owner: AccountOwner,
        /// The amount to withdraw.
        amount: Amount,
        /// Where the tokens go.
        recipient: Recipient,
    },
    /// Carries the configuration of a newly opened chain. Executing it is a no-op in
    /// `execute_message`.
    OpenChain(Box<OpenChainConfig>),
    /// Notification without payload. Executing it is a no-op in `execute_message`.
    ApplicationCreated,
}
229
/// A query to the system state. It carries no parameters.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
233
/// The response to a system query.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// The ID of the chain that was queried.
    pub chain_id: ChainId,
    /// The balance of the chain account.
    pub balance: Amount,
}
240
/// The recipient of a transfer.
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
pub enum Recipient {
    /// The tokens are debited from the source and no credit message is sent.
    Burn,
    /// The tokens are credited to the given account.
    Account(Account),
}
249
250impl Recipient {
251 pub fn chain(chain_id: ChainId) -> Recipient {
253 Recipient::Account(Account::chain(chain_id))
254 }
255
256 #[cfg(with_testing)]
258 pub fn root(index: u32) -> Recipient {
259 Recipient::chain(ChainId::root(index))
260 }
261}
262
/// Optional user data: at most 32 bytes, stored in a fixed-size array.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
266
267impl UserData {
268 pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
269 let option_array = match opt_str {
271 Some(s) => {
272 let vec = s.into_bytes();
274 if vec.len() <= 32 {
275 let mut array = [b' '; 32];
277
278 let len = vec.len().min(32);
280 array[..len].copy_from_slice(&vec[..len]);
281
282 Some(array)
283 } else {
284 return Err(vec.len());
285 }
286 }
287 None => None,
288 };
289
290 Ok(UserData(option_array))
292 }
293}
294
/// The result of `SystemExecutionStateView::create_application`.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
    /// The transaction tracker that was passed in by value, handed back to the caller.
    pub txn_tracker: TransactionTracker,
}
300
301impl<C> SystemExecutionStateView<C>
302where
303 C: Context + Clone + Send + Sync + 'static,
304 C::Extra: ExecutionRuntimeContext,
305{
306 pub fn is_active(&self) -> bool {
308 self.description.get().is_some()
309 && self.ownership.get().is_active()
310 && self.current_committee().is_some()
311 && self.admin_id.get().is_some()
312 }
313
314 pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
316 let epoch = self.epoch.get().as_ref()?;
317 let committee = self.committees.get().get(epoch)?;
318 Some((*epoch, committee))
319 }
320
321 pub async fn execute_operation(
324 &mut self,
325 context: OperationContext,
326 operation: SystemOperation,
327 txn_tracker: &mut TransactionTracker,
328 resource_controller: &mut ResourceController<Option<AccountOwner>>,
329 ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
330 use SystemOperation::*;
331 let mut new_application = None;
332 match operation {
333 OpenChain(config) => {
334 let next_message_id = context.next_message_id(txn_tracker.next_message_index());
335 let message = self.open_chain(config, next_message_id).await?;
336 txn_tracker.add_outgoing_message(message)?;
337 #[cfg(with_metrics)]
338 OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
339 }
340 ChangeOwnership {
341 super_owners,
342 owners,
343 multi_leader_rounds,
344 open_multi_leader_rounds,
345 timeout_config,
346 } => {
347 self.ownership.set(ChainOwnership {
348 super_owners: super_owners.into_iter().collect(),
349 owners: owners.into_iter().collect(),
350 multi_leader_rounds,
351 open_multi_leader_rounds,
352 timeout_config,
353 });
354 }
355 ChangeApplicationPermissions(application_permissions) => {
356 self.application_permissions.set(application_permissions);
357 }
358 CloseChain => self.close_chain().await?,
359 Transfer {
360 owner,
361 amount,
362 recipient,
363 } => {
364 let maybe_message = self
365 .transfer(context.authenticated_signer, None, owner, recipient, amount)
366 .await?;
367 txn_tracker.add_outgoing_messages(maybe_message)?;
368 }
369 Claim {
370 owner,
371 target_id,
372 recipient,
373 amount,
374 } => {
375 let message = self
376 .claim(
377 context.authenticated_signer,
378 None,
379 owner,
380 target_id,
381 recipient,
382 amount,
383 )
384 .await?;
385 txn_tracker.add_outgoing_message(message)?;
386 }
387 Admin(admin_operation) => {
388 ensure!(
389 *self.admin_id.get() == Some(context.chain_id),
390 ExecutionError::AdminOperationOnNonAdminChain
391 );
392 match admin_operation {
393 AdminOperation::PublishCommitteeBlob { blob_hash } => {
394 self.blob_published(&BlobId::new(blob_hash, BlobType::Committee))?;
395 }
396 AdminOperation::CreateCommittee { epoch, blob_hash } => {
397 self.check_next_epoch(epoch)?;
398 let blob_id = BlobId::new(blob_hash, BlobType::Committee);
399 let committee =
400 bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
401 self.blob_used(Some(txn_tracker), blob_id).await?;
402 self.committees.get_mut().insert(epoch, committee);
403 self.epoch.set(Some(epoch));
404 txn_tracker.add_event(
405 StreamId::system(EPOCH_STREAM_NAME),
406 epoch.0,
407 bcs::to_bytes(&blob_hash)?,
408 );
409 }
410 AdminOperation::RemoveCommittee { epoch } => {
411 ensure!(
412 self.committees.get_mut().remove(&epoch).is_some(),
413 ExecutionError::InvalidCommitteeRemoval
414 );
415 txn_tracker.add_event(
416 StreamId::system(REMOVED_EPOCH_STREAM_NAME),
417 epoch.0,
418 vec![],
419 );
420 }
421 }
422 }
423 PublishModule { module_id } => {
424 for blob_id in module_id.bytecode_blob_ids() {
425 self.blob_published(&blob_id)?;
426 }
427 }
428 CreateApplication {
429 module_id,
430 parameters,
431 instantiation_argument,
432 required_application_ids,
433 } => {
434 let txn_tracker_moved = mem::take(txn_tracker);
435 let CreateApplicationResult {
436 app_id,
437 txn_tracker: txn_tracker_moved,
438 } = self
439 .create_application(
440 context.chain_id,
441 context.height,
442 module_id,
443 parameters,
444 required_application_ids,
445 txn_tracker_moved,
446 )
447 .await?;
448 *txn_tracker = txn_tracker_moved;
449 new_application = Some((app_id, instantiation_argument));
450 }
451 PublishDataBlob { blob_hash } => {
452 self.blob_published(&BlobId::new(blob_hash, BlobType::Data))?;
453 }
454 ReadBlob { blob_id } => {
455 let content = self.read_blob_content(blob_id).await?;
456 if blob_id.blob_type == BlobType::Data {
457 resource_controller
458 .with_state(self)
459 .await?
460 .track_blob_read(content.bytes().len() as u64)?;
461 }
462 self.blob_used(Some(txn_tracker), blob_id).await?;
463 }
464 ProcessNewEpoch(epoch) => {
465 self.check_next_epoch(epoch)?;
466 let admin_id = self
467 .admin_id
468 .get()
469 .ok_or_else(|| ExecutionError::InactiveChain)?;
470 let event_id = EventId {
471 chain_id: admin_id,
472 stream_id: StreamId::system(EPOCH_STREAM_NAME),
473 index: epoch.0,
474 };
475 let bytes = match txn_tracker.next_replayed_oracle_response()? {
476 None => self.context().extra().get_event(event_id.clone()).await?,
477 Some(OracleResponse::Event(recorded_event_id, bytes))
478 if recorded_event_id == event_id =>
479 {
480 bytes
481 }
482 Some(_) => return Err(ExecutionError::OracleResponseMismatch),
483 };
484 let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
485 txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
486 let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
487 self.blob_used(Some(txn_tracker), blob_id).await?;
488 self.committees.get_mut().insert(epoch, committee);
489 self.epoch.set(Some(epoch));
490 }
491 ProcessRemovedEpoch(epoch) => {
492 ensure!(
493 self.committees.get_mut().remove(&epoch).is_some(),
494 ExecutionError::InvalidCommitteeRemoval
495 );
496 let admin_id = self
497 .admin_id
498 .get()
499 .ok_or_else(|| ExecutionError::InactiveChain)?;
500 let event_id = EventId {
501 chain_id: admin_id,
502 stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
503 index: epoch.0,
504 };
505 let bytes = match txn_tracker.next_replayed_oracle_response()? {
506 None => self.context().extra().get_event(event_id.clone()).await?,
507 Some(OracleResponse::Event(recorded_event_id, bytes))
508 if recorded_event_id == event_id =>
509 {
510 bytes
511 }
512 Some(_) => return Err(ExecutionError::OracleResponseMismatch),
513 };
514 txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
515 }
516 UpdateStreams(streams) => {
517 for (chain_id, stream_id, next_index) in streams {
518 let subscriptions = self
519 .event_subscriptions
520 .get_mut_or_default(&(chain_id, stream_id))
521 .await?;
522 subscriptions.next_index = subscriptions.next_index.max(next_index);
523 }
524 }
525 }
526
527 Ok(new_application)
528 }
529
530 fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
533 let expected = self.epoch.get().expect("chain is active").try_add_one()?;
534 ensure!(
535 provided == expected,
536 ExecutionError::InvalidCommitteeEpoch { provided, expected }
537 );
538 Ok(())
539 }
540
541 pub async fn transfer(
542 &mut self,
543 authenticated_signer: Option<AccountOwner>,
544 authenticated_application_id: Option<ApplicationId>,
545 source: AccountOwner,
546 recipient: Recipient,
547 amount: Amount,
548 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
549 if source == AccountOwner::CHAIN {
550 ensure!(
551 authenticated_signer.is_some()
552 && self
553 .ownership
554 .get()
555 .verify_owner(&authenticated_signer.unwrap()),
556 ExecutionError::UnauthenticatedTransferOwner
557 );
558 } else {
559 ensure!(
560 authenticated_signer == Some(source)
561 || authenticated_application_id.map(AccountOwner::from) == Some(source),
562 ExecutionError::UnauthenticatedTransferOwner
563 );
564 }
565 ensure!(
566 amount > Amount::ZERO,
567 ExecutionError::IncorrectTransferAmount
568 );
569 self.debit(&source, amount).await?;
570 match recipient {
571 Recipient::Account(account) => {
572 let message = SystemMessage::Credit {
573 amount,
574 source,
575 target: account.owner,
576 };
577 Ok(Some(
578 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
579 ))
580 }
581 Recipient::Burn => Ok(None),
582 }
583 }
584
585 pub async fn claim(
586 &self,
587 authenticated_signer: Option<AccountOwner>,
588 authenticated_application_id: Option<ApplicationId>,
589 source: AccountOwner,
590 target_id: ChainId,
591 recipient: Recipient,
592 amount: Amount,
593 ) -> Result<OutgoingMessage, ExecutionError> {
594 ensure!(
595 authenticated_signer == Some(source)
596 || authenticated_application_id.map(AccountOwner::from) == Some(source),
597 ExecutionError::UnauthenticatedClaimOwner
598 );
599 ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
600
601 let message = SystemMessage::Withdraw {
602 amount,
603 owner: source,
604 recipient,
605 };
606 Ok(
607 OutgoingMessage::new(target_id, message)
608 .with_authenticated_signer(authenticated_signer),
609 )
610 }
611
612 async fn debit(
614 &mut self,
615 account: &AccountOwner,
616 amount: Amount,
617 ) -> Result<(), ExecutionError> {
618 let balance = if account == &AccountOwner::CHAIN {
619 self.balance.get_mut()
620 } else {
621 self.balances.get_mut(account).await?.ok_or_else(|| {
622 ExecutionError::InsufficientFunding {
623 balance: Amount::ZERO,
624 account: *account,
625 }
626 })?
627 };
628
629 balance
630 .try_sub_assign(amount)
631 .map_err(|_| ExecutionError::InsufficientFunding {
632 balance: *balance,
633 account: *account,
634 })?;
635
636 if account != &AccountOwner::CHAIN && balance.is_zero() {
637 self.balances.remove(account)?;
638 }
639
640 Ok(())
641 }
642
643 pub async fn execute_message(
645 &mut self,
646 context: MessageContext,
647 message: SystemMessage,
648 ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
649 let mut outcome = Vec::new();
650 use SystemMessage::*;
651 match message {
652 Credit {
653 amount,
654 source,
655 target,
656 } => {
657 let receiver = if context.is_bouncing { source } else { target };
658 if receiver == AccountOwner::CHAIN {
659 let new_balance = self.balance.get().saturating_add(amount);
660 self.balance.set(new_balance);
661 } else {
662 let balance = self.balances.get_mut_or_default(&receiver).await?;
663 *balance = balance.saturating_add(amount);
664 }
665 }
666 Withdraw {
667 amount,
668 owner,
669 recipient,
670 } => {
671 self.debit(&owner, amount).await?;
672 match recipient {
673 Recipient::Account(account) => {
674 let message = SystemMessage::Credit {
675 amount,
676 source: owner,
677 target: account.owner,
678 };
679 outcome.push(
680 OutgoingMessage::new(account.chain_id, message)
681 .with_kind(MessageKind::Tracked),
682 );
683 }
684 Recipient::Burn => (),
685 }
686 }
687 OpenChain(_) => {}
689 ApplicationCreated => {}
691 }
692 Ok(outcome)
693 }
694
695 pub fn initialize_chain(
697 &mut self,
698 message_id: MessageId,
699 timestamp: Timestamp,
700 config: OpenChainConfig,
701 ) {
702 assert!(self.description.get().is_none());
704 assert!(!self.ownership.get().is_active());
705 assert!(self.committees.get().is_empty());
706 let OpenChainConfig {
707 ownership,
708 admin_id,
709 epoch,
710 committees,
711 balance,
712 application_permissions,
713 } = config;
714 let description = ChainDescription::Child(message_id);
715 self.description.set(Some(description));
716 self.epoch.set(Some(epoch));
717 self.committees.set(committees);
718 self.admin_id.set(Some(admin_id));
719 self.ownership.set(ownership);
720 self.timestamp.set(timestamp);
721 self.balance.set(balance);
722 self.application_permissions.set(application_permissions);
723 }
724
725 pub async fn handle_query(
726 &mut self,
727 context: QueryContext,
728 _query: SystemQuery,
729 ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
730 let response = SystemResponse {
731 chain_id: context.chain_id,
732 balance: *self.balance.get(),
733 };
734 Ok(QueryOutcome {
735 response,
736 operations: vec![],
737 })
738 }
739
740 pub async fn open_chain(
743 &mut self,
744 config: OpenChainConfig,
745 next_message_id: MessageId,
746 ) -> Result<OutgoingMessage, ExecutionError> {
747 let child_id = ChainId::child(next_message_id);
748 ensure!(
749 self.admin_id.get().as_ref() == Some(&config.admin_id),
750 ExecutionError::InvalidNewChainAdminId(child_id)
751 );
752 ensure!(
753 self.committees.get() == &config.committees,
754 ExecutionError::InvalidCommittees
755 );
756 ensure!(
757 self.epoch.get().as_ref() == Some(&config.epoch),
758 ExecutionError::InvalidEpoch {
759 chain_id: child_id,
760 epoch: config.epoch,
761 }
762 );
763 self.debit(&AccountOwner::CHAIN, config.balance).await?;
764 let message = SystemMessage::OpenChain(Box::new(config));
765 Ok(OutgoingMessage::new(child_id, message).with_kind(MessageKind::Protected))
766 }
767
768 pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
769 self.closed.set(true);
770 Ok(())
771 }
772
773 pub async fn create_application(
774 &mut self,
775 chain_id: ChainId,
776 block_height: BlockHeight,
777 module_id: ModuleId,
778 parameters: Vec<u8>,
779 required_application_ids: Vec<ApplicationId>,
780 mut txn_tracker: TransactionTracker,
781 ) -> Result<CreateApplicationResult, ExecutionError> {
782 let application_index = txn_tracker.next_application_index();
783
784 let blob_ids = self.check_bytecode_blobs(&module_id).await?;
785 for blob_id in blob_ids {
788 self.blob_used(Some(&mut txn_tracker), blob_id).await?;
789 }
790
791 let application_description = ApplicationDescription {
792 module_id,
793 creator_chain_id: chain_id,
794 block_height,
795 application_index,
796 parameters,
797 required_application_ids,
798 };
799 self.check_required_applications(&application_description, Some(&mut txn_tracker))
800 .await?;
801
802 txn_tracker.add_created_blob(Blob::new_application_description(&application_description));
803
804 Ok(CreateApplicationResult {
805 app_id: ApplicationId::from(&application_description),
806 txn_tracker,
807 })
808 }
809
810 async fn check_required_applications(
811 &mut self,
812 application_description: &ApplicationDescription,
813 mut txn_tracker: Option<&mut TransactionTracker>,
814 ) -> Result<(), ExecutionError> {
815 for required_id in &application_description.required_application_ids {
817 Box::pin(self.describe_application(*required_id, txn_tracker.as_deref_mut())).await?;
818 }
819 Ok(())
820 }
821
822 pub async fn describe_application(
824 &mut self,
825 id: ApplicationId,
826 mut txn_tracker: Option<&mut TransactionTracker>,
827 ) -> Result<ApplicationDescription, ExecutionError> {
828 let blob_id = id.description_blob_id();
829 let blob_content = match txn_tracker
830 .as_ref()
831 .and_then(|tracker| tracker.created_blobs().get(&blob_id))
832 {
833 Some(blob) => blob.content().clone(),
834 None => self.read_blob_content(blob_id).await?,
835 };
836 self.blob_used(txn_tracker.as_deref_mut(), blob_id).await?;
837 let description: ApplicationDescription = bcs::from_bytes(blob_content.bytes())?;
838
839 let blob_ids = self.check_bytecode_blobs(&description.module_id).await?;
840 for blob_id in blob_ids {
843 self.blob_used(txn_tracker.as_deref_mut(), blob_id).await?;
844 }
845
846 self.check_required_applications(&description, txn_tracker)
847 .await?;
848
849 Ok(description)
850 }
851
852 pub async fn find_dependencies(
854 &mut self,
855 mut stack: Vec<ApplicationId>,
856 txn_tracker: &mut TransactionTracker,
857 ) -> Result<Vec<ApplicationId>, ExecutionError> {
858 let mut result = Vec::new();
860 let mut sorted = HashSet::new();
862 let mut seen = HashSet::new();
864
865 while let Some(id) = stack.pop() {
866 if sorted.contains(&id) {
867 continue;
868 }
869 if seen.contains(&id) {
870 sorted.insert(id);
873 result.push(id);
874 continue;
875 }
876 seen.insert(id);
879 stack.push(id);
881 let app = self.describe_application(id, Some(txn_tracker)).await?;
882 for child in app.required_application_ids.iter().rev() {
883 if !seen.contains(child) {
884 stack.push(*child);
885 }
886 }
887 }
888 Ok(result)
889 }
890
891 pub(crate) async fn blob_used(
894 &mut self,
895 maybe_txn_tracker: Option<&mut TransactionTracker>,
896 blob_id: BlobId,
897 ) -> Result<bool, ExecutionError> {
898 if self.used_blobs.contains(&blob_id).await? {
899 return Ok(false); }
901 self.used_blobs.insert(&blob_id)?;
902 if let Some(txn_tracker) = maybe_txn_tracker {
903 txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
904 }
905 Ok(true)
906 }
907
908 fn blob_published(&mut self, blob_id: &BlobId) -> Result<(), ExecutionError> {
911 self.used_blobs.insert(blob_id)?;
912 Ok(())
913 }
914
915 pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
916 match self.context().extra().get_blob(blob_id).await {
917 Ok(blob) => Ok(blob.into()),
918 Err(ViewError::BlobsNotFound(_)) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
919 Err(error) => Err(error.into()),
920 }
921 }
922
923 pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
924 if self.context().extra().contains_blob(blob_id).await? {
925 Ok(())
926 } else {
927 Err(ExecutionError::BlobsNotFound(vec![blob_id]))
928 }
929 }
930
931 async fn check_bytecode_blobs(
932 &mut self,
933 module_id: &ModuleId,
934 ) -> Result<Vec<BlobId>, ExecutionError> {
935 let blob_ids = module_id.bytecode_blob_ids();
936
937 let mut missing_blobs = Vec::new();
938 for blob_id in &blob_ids {
939 if !self.context().extra().contains_blob(*blob_id).await? {
940 missing_blobs.push(*blob_id);
941 }
942 }
943 ensure!(
944 missing_blobs.is_empty(),
945 ExecutionError::BlobsNotFound(missing_blobs)
946 );
947
948 Ok(blob_ids)
949 }
950}