1#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10 collections::{BTreeMap, BTreeSet, HashSet},
11 mem,
12};
13
14use custom_debug_derive::Debug;
15use linera_base::{
16 crypto::CryptoHash,
17 data_types::{
18 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
19 ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
20 },
21 ensure, hex_debug,
22 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
23 ownership::{ChainOwnership, TimeoutConfig},
24};
25use linera_views::{
26 context::Context,
27 map_view::HashedMapView,
28 register_view::HashedRegisterView,
29 set_view::HashedSetView,
30 views::{ClonableView, HashableView, ReplaceContext, View},
31};
32use serde::{Deserialize, Serialize};
33
34#[cfg(test)]
35use crate::test_utils::SystemExecutionState;
36use crate::{
37 committee::Committee, ApplicationDescription, ApplicationId, ExecutionError,
38 ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext, OutgoingMessage,
39 QueryContext, QueryOutcome, ResourceController, TransactionTracker,
40};
41
/// Name of the system event stream on which new committees are announced.
///
/// `AdminOperation::CreateCommittee` emits an event on this stream, indexed by the epoch
/// number, whose value is the BCS-serialized hash of the committee blob.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// Name of the system event stream on which committee removals are announced.
///
/// `AdminOperation::RemoveCommittee` emits an event with an empty payload on this stream,
/// indexed by the number of the removed epoch.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
46
/// Prometheus metrics of the system application; only compiled with the `with_metrics` cfg.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Counter incremented by `execute_operation` after each successful `OpenChain`
    /// operation. The counter has no labels and is registered lazily on first access.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
63
/// A view accessing the execution state of the system application of a chain.
#[derive(Debug, ClonableView, HashableView)]
pub struct SystemExecutionStateView<C> {
    /// The description of the chain. `None` until `initialize_chain` has loaded it from the
    /// chain description blob.
    pub description: HashedRegisterView<C, Option<ChainDescription>>,
    /// The current epoch of the chain; it selects the current committee in `committees`.
    pub epoch: HashedRegisterView<C, Epoch>,
    /// The ID of the admin chain. `None` until `initialize_chain` has read it from the
    /// network description.
    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
    /// The committees known to this chain, indexed by epoch.
    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
    /// The ownership configuration of the chain.
    pub ownership: HashedRegisterView<C, ChainOwnership>,
    /// The balance of the chain itself, i.e. of the `AccountOwner::CHAIN` account.
    pub balance: HashedRegisterView<C, Amount>,
    /// The balances of individual account owners. `debit` removes an entry once its balance
    /// reaches zero.
    pub balances: HashedMapView<C, AccountOwner, Amount>,
    /// A timestamp for the chain. In this file it is only set from the chain description's
    /// timestamp during `initialize_chain`.
    // NOTE(review): presumably also advanced when blocks are executed — confirm against callers.
    pub timestamp: HashedRegisterView<C, Timestamp>,
    /// Whether the chain has been closed (set by `close_chain`).
    pub closed: HashedRegisterView<C, bool>,
    /// The application permissions configured for this chain.
    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
    /// The IDs of blobs that were used, published or created as application descriptions on
    /// this chain.
    pub used_blobs: HashedSetView<C, BlobId>,
    /// The event stream subscriptions, keyed by publishing chain and stream.
    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
}
95
96impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
97 type Target = SystemExecutionStateView<C2>;
98
99 async fn with_context(
100 &mut self,
101 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
102 ) -> Self::Target {
103 SystemExecutionStateView {
104 description: self.description.with_context(ctx.clone()).await,
105 epoch: self.epoch.with_context(ctx.clone()).await,
106 admin_id: self.admin_id.with_context(ctx.clone()).await,
107 committees: self.committees.with_context(ctx.clone()).await,
108 ownership: self.ownership.with_context(ctx.clone()).await,
109 balance: self.balance.with_context(ctx.clone()).await,
110 balances: self.balances.with_context(ctx.clone()).await,
111 timestamp: self.timestamp.with_context(ctx.clone()).await,
112 closed: self.closed.with_context(ctx.clone()).await,
113 application_permissions: self.application_permissions.with_context(ctx.clone()).await,
114 used_blobs: self.used_blobs.with_context(ctx.clone()).await,
115 event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
116 }
117 }
118}
119
/// The applications subscribing to a particular stream, and the next event index.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct EventSubscriptions {
    /// The index up to which (exclusive) the stream has been handed to the subscribed
    /// applications. An `UpdateStreams` operation must provide a strictly larger value,
    /// which then replaces this one.
    pub next_index: u32,
    /// The applications that are subscribed to this stream.
    pub applications: BTreeSet<ApplicationId>,
}
129
/// The initial configuration for a new chain, as provided to the `OpenChain` operation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial balance of the new chain; `open_chain` debits it from the balance of the
    /// chain executing the operation.
    pub balance: Amount,
    /// The application permissions of the new chain.
    pub application_permissions: ApplicationPermissions,
}
140
141impl OpenChainConfig {
142 pub fn init_chain_config(
145 &self,
146 epoch: Epoch,
147 min_active_epoch: Epoch,
148 max_active_epoch: Epoch,
149 ) -> InitialChainConfig {
150 InitialChainConfig {
151 application_permissions: self.application_permissions.clone(),
152 balance: self.balance,
153 epoch,
154 min_active_epoch,
155 max_active_epoch,
156 ownership: self.ownership.clone(),
157 }
158 }
159}
160
/// A system operation, executed by `SystemExecutionStateView::execute_operation`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemOperation {
    /// Transfers `amount` units of value from the given owner's account to the recipient.
    /// If the owner is `AccountOwner::CHAIN`, the chain balance is debited and the
    /// authenticated signer must be verified as an owner of the chain.
    Transfer {
        owner: AccountOwner,
        recipient: Account,
        amount: Amount,
    },
    /// Claims `amount` units of value from the given owner's account on the chain
    /// `target_id`, in favor of `recipient`. If `target_id` is another chain, a `Withdraw`
    /// message is sent to it.
    Claim {
        owner: AccountOwner,
        target_id: ChainId,
        recipient: Account,
        amount: Amount,
    },
    /// Creates (or activates) a new chain with the given configuration; the configured
    /// balance is debited from the current chain's balance.
    OpenChain(OpenChainConfig),
    /// Closes the chain.
    CloseChain,
    /// Replaces the ownership of the chain. Each field is stored into the field of the same
    /// name in `ChainOwnership`.
    ChangeOwnership {
        /// The new super owners.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The new regular owners, each paired with a `u64`.
        // NOTE(review): the number is presumably the owner's weight — confirm in `ChainOwnership`.
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// The new value of `ChainOwnership::multi_leader_rounds`.
        multi_leader_rounds: u32,
        /// The new value of `ChainOwnership::open_multi_leader_rounds`.
        open_multi_leader_rounds: bool,
        /// The new timeout configuration.
        timeout_config: TimeoutConfig,
    },
    /// Replaces the application permissions of the chain.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Publishes a new application module: its bytecode blobs are recorded as published.
    PublishModule { module_id: ModuleId },
    /// Publishes a new data blob with the given hash.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Verifies that the given blob exists and records it as used on this chain.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application.
    CreateApplication {
        /// The module the application is created from.
        module_id: ModuleId,
        /// The application parameters, stored in the application description.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        /// The argument for instantiation; returned to the caller of `execute_operation`
        /// together with the new application ID.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        /// The applications that the new application depends on.
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// Operations that are only allowed on the admin chain.
    Admin(AdminOperation),
    /// Processes the event announcing a new epoch (which must be the current epoch plus
    /// one) and installs the corresponding committee.
    ProcessNewEpoch(Epoch),
    /// Processes the event announcing the removal of an epoch and removes its committee.
    ProcessRemovedEpoch(Epoch),
    /// Updates the event stream trackers. Each entry is a publishing chain, a stream and the
    /// new next index, which must be larger than the one currently recorded.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
231
/// Operations that are only allowed on the admin chain: `execute_operation` rejects them
/// with `AdminOperationOnNonAdminChain` elsewhere.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum AdminOperation {
    /// Publishes a committee blob with the given hash.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers the committee stored in the blob `blob_hash` for `epoch`, which must be
    /// the current epoch plus one, and makes it the current epoch. Also emits an event on
    /// the `EPOCH_STREAM_NAME` stream.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes the committee of the given epoch, failing with `InvalidCommitteeRemoval` if
    /// there is none. Also emits an event on the `REMOVED_EPOCH_STREAM_NAME` stream.
    RemoveCommittee { epoch: Epoch },
}
246
/// A system message meant to be executed on a remote chain, see
/// `SystemExecutionStateView::execute_message`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
    /// Credits `amount` units of value to the account `target`. If the message is bouncing,
    /// the amount is credited to `source` instead.
    Credit {
        target: AccountOwner,
        amount: Amount,
        source: AccountOwner,
    },
    /// Withdraws `amount` units of value from the account `owner` on the receiving chain and
    /// credits them to `recipient`, sending a `Credit` message if the recipient is on
    /// another chain.
    Withdraw {
        owner: AccountOwner,
        amount: Amount,
        recipient: Account,
    },
}
266
/// A query to the system state. It carries no data; `handle_query` answers it with a
/// [`SystemResponse`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
270
/// The response to a system query.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// The ID of the queried chain, taken from the query context.
    pub chain_id: ChainId,
    /// The balance of the chain itself (not of individual account owners).
    pub balance: Amount,
}
277
/// Optional user data: either nothing or exactly 32 bytes.
///
/// `UserData::from_option_string` builds it from a string of at most 32 bytes, padded with
/// ASCII spaces on the right.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
281
282impl UserData {
283 pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
284 let option_array = match opt_str {
286 Some(s) => {
287 let vec = s.into_bytes();
289 if vec.len() <= 32 {
290 let mut array = [b' '; 32];
292
293 let len = vec.len().min(32);
295 array[..len].copy_from_slice(&vec[..len]);
296
297 Some(array)
298 } else {
299 return Err(vec.len());
300 }
301 }
302 None => None,
303 };
304
305 Ok(UserData(option_array))
307 }
308}
309
/// The result of `SystemExecutionStateView::create_application`.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
    /// The transaction tracker that was passed in by value, handed back after being updated.
    pub txn_tracker: TransactionTracker,
}
315
316impl<C> SystemExecutionStateView<C>
317where
318 C: Context + Clone + Send + Sync + 'static,
319 C::Extra: ExecutionRuntimeContext,
320{
321 pub fn is_active(&self) -> bool {
323 self.description.get().is_some()
324 && self.ownership.get().is_active()
325 && self.current_committee().is_some()
326 && self.admin_id.get().is_some()
327 }
328
329 pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
331 let epoch = self.epoch.get();
332 let committee = self.committees.get().get(epoch)?;
333 Some((*epoch, committee))
334 }
335
336 async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
337 match self.context().extra().get_event(event_id.clone()).await? {
338 None => Err(ExecutionError::EventsNotFound(vec![event_id])),
339 Some(vec) => Ok(vec),
340 }
341 }
342
    /// Executes a system operation on this chain.
    ///
    /// Effects other than state changes (outgoing messages, events, oracle responses,
    /// created or published blobs, streams to process) are recorded in `txn_tracker`.
    ///
    /// Returns `Some((application_id, instantiation_argument))` if the operation was a
    /// `CreateApplication`, and `None` for every other operation.
    ///
    /// # Errors
    ///
    /// Returns an `ExecutionError` if the operation is invalid in the current state or if
    /// an underlying storage or context access fails.
    pub async fn execute_operation(
        &mut self,
        context: OperationContext,
        operation: SystemOperation,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
        use SystemOperation::*;
        // Only set by `CreateApplication`.
        let mut new_application = None;
        match operation {
            OpenChain(config) => {
                // The ID of the new chain is not needed here.
                let _chain_id = self
                    .open_chain(
                        config,
                        context.chain_id,
                        context.height,
                        context.timestamp,
                        txn_tracker,
                    )
                    .await?;
                #[cfg(with_metrics)]
                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
            }
            ChangeOwnership {
                super_owners,
                owners,
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            } => {
                // The previous ownership is replaced as a whole.
                self.ownership.set(ChainOwnership {
                    super_owners: super_owners.into_iter().collect(),
                    owners: owners.into_iter().collect(),
                    multi_leader_rounds,
                    open_multi_leader_rounds,
                    timeout_config,
                });
            }
            ChangeApplicationPermissions(application_permissions) => {
                self.application_permissions.set(application_permissions);
            }
            CloseChain => self.close_chain().await?,
            Transfer {
                owner,
                amount,
                recipient,
            } => {
                // There is no authenticated application for a system operation.
                let maybe_message = self
                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Claim {
                owner,
                target_id,
                recipient,
                amount,
            } => {
                let maybe_message = self
                    .claim(
                        context.authenticated_signer,
                        None,
                        owner,
                        target_id,
                        recipient,
                        amount,
                    )
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Admin(admin_operation) => {
                // Admin operations are only accepted on the admin chain itself.
                ensure!(
                    *self.admin_id.get() == Some(context.chain_id),
                    ExecutionError::AdminOperationOnNonAdminChain
                );
                match admin_operation {
                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
                        self.blob_published(
                            &BlobId::new(blob_hash, BlobType::Committee),
                            txn_tracker,
                        )?;
                    }
                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
                        self.check_next_epoch(epoch)?;
                        // The committee is read from the committee blob with the given hash.
                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                        let committee =
                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                        self.blob_used(txn_tracker, blob_id).await?;
                        self.committees.get_mut().insert(epoch, committee);
                        self.epoch.set(epoch);
                        // Announce the new epoch; `ProcessNewEpoch` reads this event.
                        txn_tracker.add_event(
                            StreamId::system(EPOCH_STREAM_NAME),
                            epoch.0,
                            bcs::to_bytes(&blob_hash)?,
                        );
                    }
                    AdminOperation::RemoveCommittee { epoch } => {
                        ensure!(
                            self.committees.get_mut().remove(&epoch).is_some(),
                            ExecutionError::InvalidCommitteeRemoval
                        );
                        // Announce the removal; `ProcessRemovedEpoch` reads this event.
                        txn_tracker.add_event(
                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                            epoch.0,
                            vec![],
                        );
                    }
                }
            }
            PublishModule { module_id } => {
                for blob_id in module_id.bytecode_blob_ids() {
                    self.blob_published(&blob_id, txn_tracker)?;
                }
            }
            CreateApplication {
                module_id,
                parameters,
                instantiation_argument,
                required_application_ids,
            } => {
                // `create_application` takes the tracker by value, so it is moved out here
                // (leaving a default value behind) and put back below. If the call fails,
                // the early return leaves `txn_tracker` holding the default value.
                let txn_tracker_moved = mem::take(txn_tracker);
                let CreateApplicationResult {
                    app_id,
                    txn_tracker: txn_tracker_moved,
                } = self
                    .create_application(
                        context.chain_id,
                        context.height,
                        module_id,
                        parameters,
                        required_application_ids,
                        txn_tracker_moved,
                    )
                    .await?;
                *txn_tracker = txn_tracker_moved;
                new_application = Some((app_id, instantiation_argument));
            }
            PublishDataBlob { blob_hash } => {
                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
            }
            VerifyBlob { blob_id } => {
                self.assert_blob_exists(blob_id).await?;
                // NOTE(review): the blob read is tracked with a size of 0, presumably
                // because only the existence of the blob is checked — confirm.
                resource_controller
                    .with_state(self)
                    .await?
                    .track_blob_read(0)?;
                self.blob_used(txn_tracker, blob_id).await?;
            }
            ProcessNewEpoch(epoch) => {
                self.check_next_epoch(epoch)?;
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The event emitted by `CreateCommittee` on the admin chain.
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // Use the replayed oracle response if there is one; it must be for exactly
                // this event. Otherwise read the event from the context.
                let bytes = match txn_tracker.next_replayed_oracle_response()? {
                    None => self.get_event(event_id.clone()).await?,
                    Some(OracleResponse::Event(recorded_event_id, bytes))
                        if recorded_event_id == event_id =>
                    {
                        bytes
                    }
                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
                };
                // The event value is the BCS-serialized hash of the committee blob.
                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                self.blob_used(txn_tracker, blob_id).await?;
                self.committees.get_mut().insert(epoch, committee);
                self.epoch.set(epoch);
            }
            ProcessRemovedEpoch(epoch) => {
                ensure!(
                    self.committees.get_mut().remove(&epoch).is_some(),
                    ExecutionError::InvalidCommitteeRemoval
                );
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The event emitted by `RemoveCommittee` on the admin chain.
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // Same replay-or-read logic as for `ProcessNewEpoch`. The event value itself
                // is only recorded in the oracle response, not interpreted.
                let bytes = match txn_tracker.next_replayed_oracle_response()? {
                    None => self.get_event(event_id.clone()).await?,
                    Some(OracleResponse::Event(recorded_event_id, bytes))
                        if recorded_event_id == event_id =>
                    {
                        bytes
                    }
                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
                };
                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
            }
            UpdateStreams(streams) => {
                // Collect all missing events so that they are reported in a single error.
                let mut missing_events = Vec::new();
                for (chain_id, stream_id, next_index) in streams {
                    let subscriptions = self
                        .event_subscriptions
                        .get_mut_or_default(&(chain_id, stream_id.clone()))
                        .await?;
                    // The tracker may only move forward.
                    ensure!(
                        subscriptions.next_index < next_index,
                        ExecutionError::OutdatedUpdateStreams
                    );
                    // Every subscribed application gets the range of new events to process.
                    for application_id in &subscriptions.applications {
                        txn_tracker.add_stream_to_process(
                            *application_id,
                            chain_id,
                            stream_id.clone(),
                            subscriptions.next_index,
                            next_index,
                        );
                    }
                    subscriptions.next_index = next_index;
                    // Only the last event covered by the update is checked for existence.
                    let index = next_index
                        .checked_sub(1)
                        .ok_or(ArithmeticError::Underflow)?;
                    let event_id = EventId {
                        chain_id,
                        stream_id,
                        index,
                    };
                    match txn_tracker.next_replayed_oracle_response()? {
                        None => {
                            if !self
                                .context()
                                .extra()
                                .contains_event(event_id.clone())
                                .await?
                            {
                                // No oracle response is recorded for a missing event; the
                                // operation fails after the loop.
                                missing_events.push(event_id);
                                continue;
                            }
                        }
                        Some(OracleResponse::EventExists(recorded_event_id))
                            if recorded_event_id == event_id => {}
                        Some(_) => return Err(ExecutionError::OracleResponseMismatch),
                    }
                    txn_tracker.add_oracle_response(OracleResponse::EventExists(event_id));
                }
                ensure!(
                    missing_events.is_empty(),
                    ExecutionError::EventsNotFound(missing_events)
                );
            }
        }

        Ok(new_application)
    }
601
602 fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
605 let expected = self.epoch.get().try_add_one()?;
606 ensure!(
607 provided == expected,
608 ExecutionError::InvalidCommitteeEpoch { provided, expected }
609 );
610 Ok(())
611 }
612
613 async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
614 if owner == &AccountOwner::CHAIN {
615 let new_balance = self.balance.get().saturating_add(amount);
616 self.balance.set(new_balance);
617 } else {
618 let balance = self.balances.get_mut_or_default(owner).await?;
619 *balance = balance.saturating_add(amount);
620 }
621 Ok(())
622 }
623
624 async fn credit_or_send_message(
625 &mut self,
626 source: AccountOwner,
627 recipient: Account,
628 amount: Amount,
629 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
630 let source_chain_id = self.context().extra().chain_id();
631 if recipient.chain_id == source_chain_id {
632 let target = recipient.owner;
634 self.credit(&target, amount).await?;
635 Ok(None)
636 } else {
637 let message = SystemMessage::Credit {
639 amount,
640 source,
641 target: recipient.owner,
642 };
643 Ok(Some(
644 OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
645 ))
646 }
647 }
648
649 pub async fn transfer(
650 &mut self,
651 authenticated_signer: Option<AccountOwner>,
652 authenticated_application_id: Option<ApplicationId>,
653 source: AccountOwner,
654 recipient: Account,
655 amount: Amount,
656 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
657 if source == AccountOwner::CHAIN {
658 ensure!(
659 authenticated_signer.is_some()
660 && self
661 .ownership
662 .get()
663 .verify_owner(&authenticated_signer.unwrap()),
664 ExecutionError::UnauthenticatedTransferOwner
665 );
666 } else {
667 ensure!(
668 authenticated_signer == Some(source)
669 || authenticated_application_id.map(AccountOwner::from) == Some(source),
670 ExecutionError::UnauthenticatedTransferOwner
671 );
672 }
673 ensure!(
674 amount > Amount::ZERO,
675 ExecutionError::IncorrectTransferAmount
676 );
677 self.debit(&source, amount).await?;
678 self.credit_or_send_message(source, recipient, amount).await
679 }
680
681 pub async fn claim(
682 &mut self,
683 authenticated_signer: Option<AccountOwner>,
684 authenticated_application_id: Option<ApplicationId>,
685 source: AccountOwner,
686 target_id: ChainId,
687 recipient: Account,
688 amount: Amount,
689 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
690 ensure!(
691 authenticated_signer == Some(source)
692 || authenticated_application_id.map(AccountOwner::from) == Some(source),
693 ExecutionError::UnauthenticatedClaimOwner
694 );
695 ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
696
697 let current_chain_id = self.context().extra().chain_id();
698 if target_id == current_chain_id {
699 self.debit(&source, amount).await?;
701 self.credit_or_send_message(source, recipient, amount).await
702 } else {
703 let message = SystemMessage::Withdraw {
705 amount,
706 owner: source,
707 recipient,
708 };
709 Ok(Some(
710 OutgoingMessage::new(target_id, message)
711 .with_authenticated_signer(authenticated_signer),
712 ))
713 }
714 }
715
    /// Subtracts `amount` from the balance of `account`.
    ///
    /// `AccountOwner::CHAIN` designates the chain balance; any other owner is looked up in
    /// `balances`, and its entry is removed once the balance reaches zero.
    ///
    /// # Errors
    ///
    /// Fails with `ExecutionError::InsufficientBalance` if the account has no entry (reported
    /// with a zero balance) or if the subtraction fails.
    async fn debit(
        &mut self,
        account: &AccountOwner,
        amount: Amount,
    ) -> Result<(), ExecutionError> {
        let balance = if account == &AccountOwner::CHAIN {
            self.balance.get_mut()
        } else {
            // An owner without an entry is reported as having a zero balance.
            self.balances.get_mut(account).await?.ok_or_else(|| {
                ExecutionError::InsufficientBalance {
                    balance: Amount::ZERO,
                    account: *account,
                }
            })?
        };

        balance
            .try_sub_assign(amount)
            .map_err(|_| ExecutionError::InsufficientBalance {
                balance: *balance,
                account: *account,
            })?;

        // Do not keep zero balances of individual owners in storage.
        if account != &AccountOwner::CHAIN && balance.is_zero() {
            self.balances.remove(account)?;
        }

        Ok(())
    }
746
747 pub async fn execute_message(
749 &mut self,
750 context: MessageContext,
751 message: SystemMessage,
752 ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
753 let mut outcome = Vec::new();
754 use SystemMessage::*;
755 match message {
756 Credit {
757 amount,
758 source,
759 target,
760 } => {
761 let receiver = if context.is_bouncing { source } else { target };
762 self.credit(&receiver, amount).await?;
763 }
764 Withdraw {
765 amount,
766 owner,
767 recipient,
768 } => {
769 self.debit(&owner, amount).await?;
770 if let Some(message) = self
771 .credit_or_send_message(owner, recipient, amount)
772 .await?
773 {
774 outcome.push(message);
775 }
776 }
777 }
778 Ok(outcome)
779 }
780
    /// Initializes the system application state of the chain `chain_id` from its chain
    /// description blob.
    ///
    /// Returns `Ok(true)` if the chain was already initialized (nothing is changed in that
    /// case) and `Ok(false)` if it was initialized by this call.
    ///
    /// # Errors
    ///
    /// Fails if the description blob cannot be read or deserialized, if the committees or
    /// the network description cannot be obtained from the context, or with
    /// `ExecutionError::NoNetworkDescriptionFound` if there is no network description.
    /// Fields set before the failing step remain set on this view.
    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
        if self.description.get().is_some() {
            // Already initialized.
            return Ok(true);
        }
        // The hash inside the chain ID identifies the chain description blob.
        let description_blob = self
            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
            .await?;
        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
        let InitialChainConfig {
            ownership,
            epoch,
            balance,
            min_active_epoch,
            max_active_epoch,
            application_permissions,
        } = description.config().clone();
        self.timestamp.set(description.timestamp());
        self.description.set(Some(description));
        self.epoch.set(epoch);
        // Load the committees of all epochs that were active when the chain was created.
        let committees = self
            .context()
            .extra()
            .committees_for(min_active_epoch..=max_active_epoch)
            .await?;
        self.committees.set(committees);
        let admin_id = self
            .context()
            .extra()
            .get_network_description()
            .await?
            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
            .admin_chain_id;
        self.admin_id.set(Some(admin_id));
        self.ownership.set(ownership);
        self.balance.set(balance);
        self.application_permissions.set(application_permissions);
        Ok(false)
    }
822
823 pub async fn handle_query(
824 &mut self,
825 context: QueryContext,
826 _query: SystemQuery,
827 ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
828 let response = SystemResponse {
829 chain_id: context.chain_id,
830 balance: *self.balance.get(),
831 };
832 Ok(QueryOutcome {
833 response,
834 operations: vec![],
835 })
836 }
837
838 pub async fn open_chain(
841 &mut self,
842 config: OpenChainConfig,
843 parent: ChainId,
844 block_height: BlockHeight,
845 timestamp: Timestamp,
846 txn_tracker: &mut TransactionTracker,
847 ) -> Result<ChainId, ExecutionError> {
848 let chain_index = txn_tracker.next_chain_index();
849 let chain_origin = ChainOrigin::Child {
850 parent,
851 block_height,
852 chain_index,
853 };
854 let init_chain_config = config.init_chain_config(
855 *self.epoch.get(),
856 self.committees
857 .get()
858 .keys()
859 .min()
860 .copied()
861 .unwrap_or(Epoch::ZERO),
862 self.committees
863 .get()
864 .keys()
865 .max()
866 .copied()
867 .unwrap_or(Epoch::ZERO),
868 );
869 let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
870 let child_id = chain_description.id();
871 self.debit(&AccountOwner::CHAIN, config.balance).await?;
872 let blob = Blob::new_chain_description(&chain_description);
873 txn_tracker.add_created_blob(blob);
874 Ok(child_id)
875 }
876
    /// Marks the chain as closed by setting the `closed` flag. This never fails.
    pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
        self.closed.set(true);
        Ok(())
    }
881
    /// Creates a new application from the module `module_id`.
    ///
    /// The module's bytecode blobs must exist and are recorded as used, and the required
    /// applications are checked via `describe_application`. The application description
    /// is then registered as a created blob. The tracker is taken by value and handed back
    /// in the result together with the new application ID.
    ///
    /// # Errors
    ///
    /// Fails with `ExecutionError::BlobsNotFound` if a bytecode blob is missing, or with
    /// the error of describing a required application.
    pub async fn create_application(
        &mut self,
        chain_id: ChainId,
        block_height: BlockHeight,
        module_id: ModuleId,
        parameters: Vec<u8>,
        required_application_ids: Vec<ApplicationId>,
        mut txn_tracker: TransactionTracker,
    ) -> Result<CreateApplicationResult, ExecutionError> {
        let application_index = txn_tracker.next_application_index();

        let blob_ids = self.check_bytecode_blobs(&module_id, &txn_tracker).await?;
        for blob_id in blob_ids {
            // Record the bytecode blobs as used on this chain.
            self.blob_used(&mut txn_tracker, blob_id).await?;
        }

        let application_description = ApplicationDescription {
            module_id,
            creator_chain_id: chain_id,
            block_height,
            application_index,
            parameters,
            required_application_ids,
        };
        self.check_required_applications(&application_description, &mut txn_tracker)
            .await?;

        let blob = Blob::new_application_description(&application_description);
        // The description blob is created by this transaction, so it is inserted into
        // `used_blobs` directly instead of going through `blob_used`.
        self.used_blobs.insert(&blob.id())?;
        txn_tracker.add_created_blob(blob);

        Ok(CreateApplicationResult {
            app_id: ApplicationId::from(&application_description),
            txn_tracker,
        })
    }
920
921 async fn check_required_applications(
922 &mut self,
923 application_description: &ApplicationDescription,
924 txn_tracker: &mut TransactionTracker,
925 ) -> Result<(), ExecutionError> {
926 for required_id in &application_description.required_application_ids {
928 Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
929 }
930 Ok(())
931 }
932
    /// Retrieves the description of the application `id` and checks its dependencies.
    ///
    /// The description is taken from the blobs created in the current transaction if it is
    /// there, and read from the context otherwise. The description blob and the bytecode
    /// blobs of the application's module are recorded as used, and every required
    /// application is described in turn.
    ///
    /// # Errors
    ///
    /// Fails with `ExecutionError::BlobsNotFound` if the description blob or a bytecode blob
    /// is missing, or if the description cannot be deserialized.
    pub async fn describe_application(
        &mut self,
        id: ApplicationId,
        txn_tracker: &mut TransactionTracker,
    ) -> Result<ApplicationDescription, ExecutionError> {
        let blob_id = id.description_blob_id();
        // Prefer a description created earlier in the same transaction.
        let content = match txn_tracker.created_blobs().get(&blob_id) {
            Some(content) => content.clone(),
            None => self.read_blob_content(blob_id).await?,
        };
        self.blob_used(txn_tracker, blob_id).await?;
        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;

        let blob_ids = self
            .check_bytecode_blobs(&description.module_id, txn_tracker)
            .await?;
        for blob_id in blob_ids {
            // Record the bytecode blobs as used on this chain.
            self.blob_used(txn_tracker, blob_id).await?;
        }

        self.check_required_applications(&description, txn_tracker)
            .await?;

        Ok(description)
    }
961
962 pub async fn find_dependencies(
964 &mut self,
965 mut stack: Vec<ApplicationId>,
966 txn_tracker: &mut TransactionTracker,
967 ) -> Result<Vec<ApplicationId>, ExecutionError> {
968 let mut result = Vec::new();
970 let mut sorted = HashSet::new();
972 let mut seen = HashSet::new();
974
975 while let Some(id) = stack.pop() {
976 if sorted.contains(&id) {
977 continue;
978 }
979 if seen.contains(&id) {
980 sorted.insert(id);
983 result.push(id);
984 continue;
985 }
986 seen.insert(id);
989 stack.push(id);
991 let app = self.describe_application(id, txn_tracker).await?;
992 for child in app.required_application_ids.iter().rev() {
993 if !seen.contains(child) {
994 stack.push(*child);
995 }
996 }
997 }
998 Ok(result)
999 }
1000
1001 pub(crate) async fn blob_used(
1004 &mut self,
1005 txn_tracker: &mut TransactionTracker,
1006 blob_id: BlobId,
1007 ) -> Result<bool, ExecutionError> {
1008 if self.used_blobs.contains(&blob_id).await? {
1009 return Ok(false); }
1011 self.used_blobs.insert(&blob_id)?;
1012 txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
1013 Ok(true)
1014 }
1015
    /// Records that the blob `blob_id` is published on this chain: it is added to
    /// `used_blobs` and registered as a published blob in `txn_tracker`. Unlike `blob_used`,
    /// this does not involve an oracle response.
    fn blob_published(
        &mut self,
        blob_id: &BlobId,
        txn_tracker: &mut TransactionTracker,
    ) -> Result<(), ExecutionError> {
        self.used_blobs.insert(blob_id)?;
        txn_tracker.add_published_blob(*blob_id);
        Ok(())
    }
1027
1028 pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1029 match self.context().extra().get_blob(blob_id).await {
1030 Ok(Some(blob)) => Ok(blob.into()),
1031 Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1032 Err(error) => Err(error.into()),
1033 }
1034 }
1035
1036 pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1037 if self.context().extra().contains_blob(blob_id).await? {
1038 Ok(())
1039 } else {
1040 Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1041 }
1042 }
1043
1044 async fn check_bytecode_blobs(
1045 &mut self,
1046 module_id: &ModuleId,
1047 txn_tracker: &TransactionTracker,
1048 ) -> Result<Vec<BlobId>, ExecutionError> {
1049 let blob_ids = module_id.bytecode_blob_ids();
1050
1051 let mut missing_blobs = Vec::new();
1052 for blob_id in &blob_ids {
1053 if txn_tracker.created_blobs().contains_key(blob_id) {
1055 continue; }
1057 if !self.context().extra().contains_blob(*blob_id).await? {
1059 missing_blobs.push(*blob_id);
1060 }
1061 }
1062 ensure!(
1063 missing_blobs.is_empty(),
1064 ExecutionError::BlobsNotFound(missing_blobs)
1065 );
1066
1067 Ok(blob_ids)
1068 }
1069}