#[cfg(test)]
#[path = "./unit_tests/system_tests.rs"]
mod tests;
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
sync::Arc,
};
use allocative::Allocative;
use custom_debug_derive::Debug;
use linera_base::{
crypto::CryptoHash,
data_types::{
Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
},
ensure, hex_debug,
identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
ownership::{ChainOwnership, TimeoutConfig},
};
use linera_views::{
context::Context,
lazy_register_view::HashedLazyRegisterView,
map_view::HashedMapView,
register_view::HashedRegisterView,
set_view::HashedSetView,
views::{ClonableView, HashableView, ReplaceContext, View},
ViewError,
};
use serde::{Deserialize, Serialize};
#[cfg(test)]
use crate::test_utils::SystemExecutionState;
use crate::{
committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
};
/// Name of the system event stream announcing newly created epochs; each event
/// carries the BCS-serialized hash of the new committee blob.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// Name of the system event stream announcing removed epochs (empty payload).
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Counts how many times the `OpenChain` operation was executed.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
/// The execution state of the system application on a chain.
#[derive(Debug, ClonableView, HashableView, Allocative)]
#[allocative(bound = "C")]
pub struct SystemExecutionStateView<C> {
    /// The chain's description; `None` until the chain has been initialized.
    pub description: HashedLazyRegisterView<C, Option<ChainDescription>>,
    /// The current epoch of the chain.
    pub epoch: HashedRegisterView<C, Epoch>,
    /// The ID of the network's admin chain, if known.
    pub admin_chain_id: HashedRegisterView<C, Option<ChainId>>,
    /// The committees known to this chain, indexed by epoch.
    pub committees: HashedLazyRegisterView<C, BTreeMap<Epoch, Committee>>,
    /// The chain's ownership configuration.
    pub ownership: HashedLazyRegisterView<C, ChainOwnership>,
    /// The balance of the shared chain account (`AccountOwner::CHAIN`).
    pub balance: HashedRegisterView<C, Amount>,
    /// Balances of individual account owners on this chain.
    pub balances: HashedMapView<C, AccountOwner, Amount>,
    /// The chain's timestamp (set from the chain description at initialization).
    pub timestamp: HashedRegisterView<C, Timestamp>,
    /// Whether the chain has been closed via `close_chain`.
    pub closed: HashedRegisterView<C, bool>,
    /// Permissions restricting which applications may act on this chain.
    pub application_permissions: HashedLazyRegisterView<C, ApplicationPermissions>,
    /// IDs of blobs that this chain has used or published.
    pub used_blobs: HashedSetView<C, BlobId>,
    /// Subscription state per (publisher chain, stream) pair.
    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
}
impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
    type Target = SystemExecutionStateView<C2>;

    /// Rebuilds this view over a new context, migrating every sub-view with
    /// `ctx`. The closure is cloned because each sub-view consumes one copy.
    async fn with_context(
        &mut self,
        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
    ) -> Self::Target {
        SystemExecutionStateView {
            description: self.description.with_context(ctx.clone()).await,
            epoch: self.epoch.with_context(ctx.clone()).await,
            admin_chain_id: self.admin_chain_id.with_context(ctx.clone()).await,
            committees: self.committees.with_context(ctx.clone()).await,
            ownership: self.ownership.with_context(ctx.clone()).await,
            balance: self.balance.with_context(ctx.clone()).await,
            balances: self.balances.with_context(ctx.clone()).await,
            timestamp: self.timestamp.with_context(ctx.clone()).await,
            closed: self.closed.with_context(ctx.clone()).await,
            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
        }
    }
}
/// The subscription state of this chain with respect to one event stream.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Allocative)]
pub struct EventSubscriptions {
    /// The index of the next event to be processed in this stream.
    pub next_index: u32,
    /// The applications on this chain subscribed to the stream.
    pub applications: BTreeSet<ApplicationId>,
}
/// The configuration for opening a new chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial balance of the new chain, debited from the parent chain.
    pub balance: Amount,
    /// The application permissions of the new chain.
    pub application_permissions: ApplicationPermissions,
}
impl OpenChainConfig {
pub fn init_chain_config(
&self,
epoch: Epoch,
min_active_epoch: Epoch,
max_active_epoch: Epoch,
) -> InitialChainConfig {
InitialChainConfig {
application_permissions: self.application_permissions.clone(),
balance: self.balance,
epoch,
min_active_epoch,
max_active_epoch,
ownership: self.ownership.clone(),
}
}
}
/// Operations executed by the system application.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemOperation {
    /// Transfers `amount` from the given owner's account to `recipient`.
    Transfer {
        owner: AccountOwner,
        recipient: Account,
        amount: Amount,
    },
    /// Claims `amount` from `owner`'s account on chain `target_id` and sends
    /// it to `recipient`.
    Claim {
        owner: AccountOwner,
        target_id: ChainId,
        recipient: Account,
        amount: Amount,
    },
    /// Opens a new child chain with the given configuration.
    OpenChain(OpenChainConfig),
    /// Closes this chain.
    CloseChain,
    /// Replaces the chain's ownership configuration.
    ChangeOwnership {
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        multi_leader_rounds: u32,
        open_multi_leader_rounds: bool,
        timeout_config: TimeoutConfig,
    },
    /// Replaces the chain's application permissions.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Publishes the bytecode blobs of a module.
    PublishModule { module_id: ModuleId },
    /// Publishes a data blob with the given hash.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Verifies that the given blob exists, charging for the read.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application from a published module.
    CreateApplication {
        module_id: ModuleId,
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// An operation that is only valid on the admin chain.
    Admin(AdminOperation),
    /// Processes the admin chain's event announcing a new epoch's committee.
    ProcessNewEpoch(Epoch),
    /// Processes the admin chain's event announcing a removed epoch.
    ProcessRemovedEpoch(Epoch),
    /// Advances this chain's subscriptions to the given (publisher chain,
    /// stream) pairs up to the given next event index.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
/// Operations that may only be executed on the admin chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum AdminOperation {
    /// Publishes a blob containing a serialized committee.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Creates the committee for `epoch` from the given committee blob.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes the committee of the given epoch.
    RemoveCommittee { epoch: Epoch },
}
/// Messages exchanged between chains by the system application.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemMessage {
    /// Credits `amount` to `target` on the receiving chain; if the message
    /// bounces, the refund goes back to `source`.
    Credit {
        target: AccountOwner,
        amount: Amount,
        source: AccountOwner,
    },
    /// Debits `amount` from `owner` on the receiving chain and forwards the
    /// funds to `recipient`.
    Withdraw {
        owner: AccountOwner,
        amount: Amount,
        recipient: Account,
    },
}
/// A (parameterless) query to the system application.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
/// The response to a `SystemQuery`: the chain's ID and its chain balance.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// The ID of the queried chain.
    pub chain_id: ChainId,
    /// The balance of the shared chain account.
    pub balance: Amount,
}
/// An optional fixed-size (32-byte) user-provided datum.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
impl UserData {
    /// Builds `UserData` from an optional string, right-padding its UTF-8
    /// bytes with spaces to 32 bytes.
    ///
    /// Returns `Err` with the byte length if the string's UTF-8 encoding is
    /// longer than 32 bytes. `None` maps to `UserData(None)`.
    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
        let option_array = match opt_str {
            Some(s) => {
                let bytes = s.into_bytes();
                // Reject strings that do not fit into the 32-byte field.
                if bytes.len() > 32 {
                    return Err(bytes.len());
                }
                // `bytes.len() <= 32` is guaranteed here, so no truncation is
                // needed (the previous `.min(32)` was redundant).
                let mut array = [b' '; 32];
                array[..bytes.len()].copy_from_slice(&bytes);
                Some(array)
            }
            None => None,
        };
        Ok(UserData(option_array))
    }
}
/// The result of a successful `create_application` call.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
}
impl<C> SystemExecutionStateView<C>
where
C: Context + Clone + 'static,
C::Extra: ExecutionRuntimeContext,
{
/// Returns whether the chain is active: it has a description, an active
/// ownership configuration, a committee for the current epoch, and a known
/// admin chain.
pub async fn is_active(&self) -> Result<bool, ViewError> {
    Ok(self.description.get().await?.is_some()
        && self.ownership.get().await?.is_active()
        && self.current_committee().await?.is_some()
        && self.admin_chain_id.get().is_some())
}
/// Returns the current epoch together with its committee, if one is known.
pub async fn current_committee(&self) -> Result<Option<(Epoch, Arc<Committee>)>, ViewError> {
    let epoch = *self.epoch.get();
    // Fast path: the runtime context may already have (or be able to load)
    // the committee for this epoch.
    if let Some(committee) = self.context().extra().get_or_load_committee(epoch).await? {
        return Ok(Some((epoch, committee)));
    }
    // Fall back to the committees stored in this chain's own state.
    let Some(committee) = self.committees.get().await?.get(&epoch).cloned() else {
        return Ok(None);
    };
    Ok(Some((epoch, Arc::new(committee))))
}
/// Reads the event with the given ID, failing with `EventsNotFound` if it is
/// unknown to the runtime.
async fn get_event(&self, event_id: EventId) -> Result<Arc<Vec<u8>>, ExecutionError> {
    let maybe_bytes = self.context().extra().get_event(event_id.clone()).await?;
    maybe_bytes.ok_or_else(|| ExecutionError::EventsNotFound(vec![event_id]))
}
/// Executes a system operation on this chain.
///
/// Returns `Some((application_id, instantiation_argument))` when the
/// operation was `CreateApplication`, so the caller can run the application's
/// instantiation; `None` otherwise.
pub async fn execute_operation(
    &mut self,
    context: OperationContext,
    operation: SystemOperation,
    txn_tracker: &mut TransactionTracker,
    resource_controller: &mut ResourceController<Option<AccountOwner>>,
) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
    use SystemOperation::*;
    let mut new_application = None;
    match operation {
        OpenChain(config) => {
            // The child chain ID is derived inside `open_chain`; it is not
            // needed by this caller.
            let _chain_id = self
                .open_chain(
                    config,
                    context.chain_id,
                    context.height,
                    context.timestamp,
                    txn_tracker,
                )
                .await?;
            #[cfg(with_metrics)]
            metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
        }
        ChangeOwnership {
            super_owners,
            owners,
            multi_leader_rounds,
            open_multi_leader_rounds,
            timeout_config,
        } => {
            // Replaces the previous ownership configuration entirely.
            self.ownership.set(ChainOwnership {
                super_owners: super_owners.into_iter().collect(),
                owners: owners.into_iter().collect(),
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            });
        }
        ChangeApplicationPermissions(application_permissions) => {
            self.application_permissions.set(application_permissions);
        }
        CloseChain => self.close_chain()?,
        Transfer {
            owner,
            amount,
            recipient,
        } => {
            // `transfer` returns a message only for cross-chain transfers.
            let maybe_message = self
                .transfer(context.authenticated_signer, None, owner, recipient, amount)
                .await?;
            txn_tracker.add_outgoing_messages(maybe_message);
        }
        Claim {
            owner,
            target_id,
            recipient,
            amount,
        } => {
            let maybe_message = self
                .claim(
                    context.authenticated_signer,
                    None,
                    owner,
                    target_id,
                    recipient,
                    amount,
                )
                .await?;
            txn_tracker.add_outgoing_messages(maybe_message);
        }
        Admin(admin_operation) => {
            // Committee management is restricted to the admin chain.
            ensure!(
                *self.admin_chain_id.get() == Some(context.chain_id),
                ExecutionError::AdminOperationOnNonAdminChain
            );
            match admin_operation {
                AdminOperation::PublishCommitteeBlob { blob_hash } => {
                    self.blob_published(
                        &BlobId::new(blob_hash, BlobType::Committee),
                        txn_tracker,
                    )?;
                }
                AdminOperation::CreateCommittee { epoch, blob_hash } => {
                    // The new epoch must be the successor of the current one.
                    self.check_next_epoch(epoch)?;
                    let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                    let committee =
                        bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                    self.blob_used(txn_tracker, blob_id).await?;
                    self.committees.get_mut().await?.insert(epoch, committee);
                    self.epoch.set(epoch);
                    // Announce the new epoch so other chains can process it
                    // via `ProcessNewEpoch`.
                    txn_tracker.add_event(
                        StreamId::system(EPOCH_STREAM_NAME),
                        epoch.0,
                        bcs::to_bytes(&blob_hash)?,
                    );
                }
                AdminOperation::RemoveCommittee { epoch } => {
                    ensure!(
                        self.committees.get_mut().await?.remove(&epoch).is_some(),
                        ExecutionError::InvalidCommitteeRemoval
                    );
                    // Announce the removal (empty payload).
                    txn_tracker.add_event(
                        StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                        epoch.0,
                        vec![],
                    );
                }
            }
        }
        PublishModule { module_id } => {
            for blob_id in module_id.bytecode_blob_ids() {
                self.blob_published(&blob_id, txn_tracker)?;
            }
        }
        CreateApplication {
            module_id,
            parameters,
            instantiation_argument,
            required_application_ids,
        } => {
            let CreateApplicationResult { app_id } = self
                .create_application(
                    context.chain_id,
                    context.height,
                    module_id,
                    parameters,
                    required_application_ids,
                    txn_tracker,
                )
                .await?;
            // Instantiation is performed by the caller with this argument.
            new_application = Some((app_id, instantiation_argument));
        }
        PublishDataBlob { blob_hash } => {
            self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
        }
        VerifyBlob { blob_id } => {
            self.assert_blob_exists(blob_id).await?;
            // Charge for a blob read of zero bytes.
            resource_controller
                .with_state(self)
                .await?
                .track_blob_read(0)?;
            self.blob_used(txn_tracker, blob_id).await?;
        }
        ProcessNewEpoch(epoch) => {
            self.check_next_epoch(epoch)?;
            let admin_chain_id = self
                .admin_chain_id
                .get()
                .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
            // The committee blob hash was published as an event on the admin
            // chain's epoch stream.
            let event_id = EventId {
                chain_id: admin_chain_id,
                stream_id: StreamId::system(EPOCH_STREAM_NAME),
                index: epoch.0,
            };
            let bytes = txn_tracker
                .oracle(|| async {
                    let bytes = self.get_event(event_id.clone()).await?;
                    Ok(OracleResponse::Event(
                        event_id.clone(),
                        Arc::unwrap_or_clone(bytes),
                    ))
                })
                .await?
                .to_event(&event_id)?;
            let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
            let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
            self.blob_used(txn_tracker, blob_id).await?;
            self.committees.get_mut().await?.insert(epoch, committee);
            self.epoch.set(epoch);
        }
        ProcessRemovedEpoch(epoch) => {
            ensure!(
                self.committees.get_mut().await?.remove(&epoch).is_some(),
                ExecutionError::InvalidCommitteeRemoval
            );
            let admin_chain_id = self
                .admin_chain_id
                .get()
                .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
            let event_id = EventId {
                chain_id: admin_chain_id,
                stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                index: epoch.0,
            };
            // Record (or replay) the removal event as an oracle response.
            txn_tracker
                .oracle(|| async {
                    let bytes = self.get_event(event_id.clone()).await?;
                    Ok(OracleResponse::Event(event_id, Arc::unwrap_or_clone(bytes)))
                })
                .await?;
        }
        UpdateStreams(streams) => {
            let mut missing_events = Vec::new();
            for (chain_id, stream_id, next_index) in streams {
                let subscriptions = self
                    .event_subscriptions
                    .get_mut_or_default(&(chain_id, stream_id.clone()))
                    .await?;
                // The update must strictly advance the stream position.
                ensure!(
                    subscriptions.next_index < next_index,
                    ExecutionError::OutdatedUpdateStreams
                );
                // Let every subscribed application process the new events.
                for application_id in &subscriptions.applications {
                    txn_tracker.add_stream_to_process(
                        *application_id,
                        chain_id,
                        stream_id.clone(),
                        subscriptions.next_index,
                        next_index,
                    );
                }
                subscriptions.next_index = next_index;
                // Verify that the last claimed event (index `next_index - 1`)
                // actually exists.
                let index = next_index
                    .checked_sub(1)
                    .ok_or(ArithmeticError::Underflow)?;
                let event_id = EventId {
                    chain_id,
                    stream_id,
                    index,
                };
                let extra = self.context().extra();
                txn_tracker
                    .oracle(|| async {
                        if !extra.contains_event(event_id.clone()).await? {
                            missing_events.push(event_id.clone());
                        }
                        Ok(OracleResponse::EventExists(event_id))
                    })
                    .await?;
            }
            ensure!(
                missing_events.is_empty(),
                ExecutionError::EventsNotFound(missing_events)
            );
        }
    }
    Ok(new_application)
}
fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
let expected = self.epoch.get().try_add_one()?;
ensure!(
provided == expected,
ExecutionError::InvalidCommitteeEpoch { provided, expected }
);
Ok(())
}
/// Adds `amount` to the given account, saturating at the maximum balance.
async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
    if *owner == AccountOwner::CHAIN {
        // The shared chain account lives in its own register.
        let updated = self.balance.get().saturating_add(amount);
        self.balance.set(updated);
    } else {
        // Individual owners live in the balances map; missing entries start
        // at zero.
        let entry = self.balances.get_mut_or_default(owner).await?;
        *entry = entry.saturating_add(amount);
    }
    Ok(())
}
/// Credits `recipient` locally when it lives on this chain; otherwise returns
/// a tracked `Credit` message for the recipient's chain.
async fn credit_or_send_message(
    &mut self,
    source: AccountOwner,
    recipient: Account,
    amount: Amount,
) -> Result<Option<OutgoingMessage>, ExecutionError> {
    let local_chain_id = self.context().extra().chain_id();
    if recipient.chain_id != local_chain_id {
        // Cross-chain: emit a tracked message so a failure bounces the funds
        // back to `source`.
        let message = SystemMessage::Credit {
            amount,
            source,
            target: recipient.owner,
        };
        let outgoing =
            OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked);
        return Ok(Some(outgoing));
    }
    // Local recipient: settle immediately, no message needed.
    self.credit(&recipient.owner, amount).await?;
    Ok(None)
}
/// Debits `amount` from `source` on this chain and credits `recipient`,
/// returning an outgoing `Credit` message if the recipient lives on another
/// chain.
///
/// Authentication: spending from the chain account requires a signer that is
/// an owner of the chain; spending from an owner account requires the signer
/// or the authenticated application to be that owner. A zero amount is
/// rejected with `IncorrectTransferAmount`.
pub async fn transfer(
    &mut self,
    authenticated_signer: Option<AccountOwner>,
    authenticated_application_id: Option<ApplicationId>,
    source: AccountOwner,
    recipient: Account,
    amount: Amount,
) -> Result<Option<OutgoingMessage>, ExecutionError> {
    if source == AccountOwner::CHAIN {
        // Replaces the `is_some() && ...unwrap()` pattern with a match; the
        // ownership lookup is still skipped when there is no signer.
        let authorized = match &authenticated_signer {
            Some(signer) => self.ownership.get().await?.verify_owner(signer),
            None => false,
        };
        ensure!(authorized, ExecutionError::UnauthenticatedTransferOwner);
    } else {
        ensure!(
            authenticated_signer == Some(source)
                || authenticated_application_id.map(AccountOwner::from) == Some(source),
            ExecutionError::UnauthenticatedTransferOwner
        );
    }
    ensure!(
        amount > Amount::ZERO,
        ExecutionError::IncorrectTransferAmount
    );
    self.debit(&source, amount).await?;
    self.credit_or_send_message(source, recipient, amount).await
}
/// Claims `amount` from `source`'s account on chain `target_id` and sends it
/// to `recipient`.
///
/// If the target is the current chain the claim is settled locally;
/// otherwise a `Withdraw` message is sent to the target chain.
pub async fn claim(
    &mut self,
    authenticated_signer: Option<AccountOwner>,
    authenticated_application_id: Option<ApplicationId>,
    source: AccountOwner,
    target_id: ChainId,
    recipient: Account,
    amount: Amount,
) -> Result<Option<OutgoingMessage>, ExecutionError> {
    // Only the owner (as signer, or as the calling application) may claim
    // from their account.
    ensure!(
        authenticated_signer == Some(source)
            || authenticated_application_id.map(AccountOwner::from) == Some(source),
        ExecutionError::UnauthenticatedClaimOwner
    );
    ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
    let current_chain_id = self.context().extra().chain_id();
    if target_id == current_chain_id {
        self.debit(&source, amount).await?;
        self.credit_or_send_message(source, recipient, amount).await
    } else {
        // Forward the signer so the target chain can re-check authentication
        // when executing the withdrawal.
        let message = SystemMessage::Withdraw {
            amount,
            owner: source,
            recipient,
        };
        Ok(Some(
            OutgoingMessage::new(target_id, message)
                .with_authenticated_signer(authenticated_signer),
        ))
    }
}
/// Subtracts `amount` from the given account, failing with
/// `InsufficientBalance` if the funds are not available.
async fn debit(
    &mut self,
    account: &AccountOwner,
    amount: Amount,
) -> Result<(), ExecutionError> {
    let balance = if account == &AccountOwner::CHAIN {
        self.balance.get_mut()
    } else {
        // A missing map entry means the owner holds a zero balance.
        self.balances.get_mut(account).await?.ok_or_else(|| {
            ExecutionError::InsufficientBalance {
                balance: Amount::ZERO,
                account: *account,
            }
        })?
    };
    balance
        .try_sub_assign(amount)
        .map_err(|_| ExecutionError::InsufficientBalance {
            balance: *balance,
            account: *account,
        })?;
    // Drop zeroed owner entries to keep the balances map sparse.
    if account != &AccountOwner::CHAIN && balance.is_zero() {
        self.balances.remove(account)?;
    }
    Ok(())
}
/// Executes a system message received from another chain, returning any
/// follow-up outgoing messages.
pub async fn execute_message(
    &mut self,
    context: MessageContext,
    message: SystemMessage,
) -> Result<Vec<OutgoingMessage>, ExecutionError> {
    let mut outcome = Vec::new();
    use SystemMessage::*;
    match message {
        Credit {
            amount,
            source,
            target,
        } => {
            // A bouncing credit refunds the original source instead of the
            // intended target.
            let receiver = if context.is_bouncing { source } else { target };
            self.credit(&receiver, amount).await?;
        }
        Withdraw {
            amount,
            owner,
            recipient,
        } => {
            // Debit locally, then credit the recipient or forward the funds
            // to the recipient's chain.
            self.debit(&owner, amount).await?;
            if let Some(message) = self
                .credit_or_send_message(owner, recipient, amount)
                .await?
            {
                outcome.push(message);
            }
        }
    }
    Ok(outcome)
}
/// Initializes the chain state from its `ChainDescription` blob.
///
/// Returns `true` if the chain was already initialized, `false` if this call
/// performed the initialization.
pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
    if self.description.get().await?.is_some() {
        // Already initialized: nothing to do.
        return Ok(true);
    }
    // The description is stored as a blob keyed by the chain ID's hash.
    let description_blob = self
        .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
        .await?;
    let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
    let InitialChainConfig {
        ownership,
        epoch,
        balance,
        min_active_epoch,
        max_active_epoch,
        application_permissions,
    } = description.config().clone();
    self.timestamp.set(description.timestamp());
    self.description.set(Some(description));
    self.epoch.set(epoch);
    // Load all committees in the chain's active epoch range from the runtime.
    let committees = self
        .context()
        .extra()
        .get_committees(min_active_epoch..=max_active_epoch)
        .await?;
    let admin_chain_id = self
        .context()
        .extra()
        .get_network_description()
        .await?
        .ok_or(ExecutionError::NoNetworkDescriptionFound)?
        .admin_chain_id;
    self.committees.set(committees);
    self.admin_chain_id.set(Some(admin_chain_id));
    self.ownership.set(ownership);
    self.balance.set(balance);
    self.application_permissions.set(application_permissions);
    Ok(false)
}
/// Answers a system query with this chain's ID and its chain balance.
pub fn handle_query(
    &mut self,
    context: QueryContext,
    _query: SystemQuery,
) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
    let chain_id = context.chain_id;
    let balance = *self.balance.get();
    Ok(QueryOutcome {
        response: SystemResponse { chain_id, balance },
        operations: Vec::new(),
    })
}
/// Opens a new child chain of `parent`, debiting its initial balance from
/// this chain's account and recording the child's description as a created
/// blob.
///
/// Returns the ID of the new chain.
pub async fn open_chain(
    &mut self,
    config: OpenChainConfig,
    parent: ChainId,
    block_height: BlockHeight,
    timestamp: Timestamp,
    txn_tracker: &mut TransactionTracker,
) -> Result<ChainId, ExecutionError> {
    // The chain index distinguishes multiple chains opened in one block.
    let chain_index = txn_tracker.next_chain_index();
    let chain_origin = ChainOrigin::Child {
        parent,
        block_height,
        chain_index,
    };
    let committees = self.committees.get().await?;
    // The child starts in this chain's current epoch, with the full range of
    // epochs known here (`Epoch::ZERO` when none are known).
    let init_chain_config = config.init_chain_config(
        *self.epoch.get(),
        committees.keys().min().copied().unwrap_or(Epoch::ZERO),
        committees.keys().max().copied().unwrap_or(Epoch::ZERO),
    );
    let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
    let child_id = chain_description.id();
    // The child's initial balance is paid from this chain's shared account.
    self.debit(&AccountOwner::CHAIN, config.balance).await?;
    let blob = Blob::new_chain_description(&chain_description);
    txn_tracker.add_created_blob(blob);
    Ok(child_id)
}
/// Marks this chain as closed.
pub fn close_chain(&mut self) -> Result<(), ExecutionError> {
    self.closed.set(true);
    Ok(())
}
/// Creates a new application from the given module, recording its description
/// as a created blob and all its dependencies as used.
pub async fn create_application(
    &mut self,
    chain_id: ChainId,
    block_height: BlockHeight,
    module_id: ModuleId,
    parameters: Vec<u8>,
    required_application_ids: Vec<ApplicationId>,
    txn_tracker: &mut TransactionTracker,
) -> Result<CreateApplicationResult, ExecutionError> {
    // The index distinguishes multiple applications created in one block.
    let application_index = txn_tracker.next_application_index();
    // All of the module's bytecode blobs must be available.
    let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
    for blob_id in blob_ids {
        self.blob_used(txn_tracker, blob_id).await?;
    }
    let application_description = ApplicationDescription {
        module_id,
        creator_chain_id: chain_id,
        block_height,
        application_index,
        parameters,
        required_application_ids,
    };
    self.check_required_applications(&application_description, txn_tracker)
        .await?;
    let blob = Blob::new_application_description(&application_description);
    self.used_blobs.insert(&blob.id())?;
    txn_tracker.add_created_blob(blob);
    Ok(CreateApplicationResult {
        app_id: ApplicationId::from(&application_description),
    })
}
/// Verifies that every application required by `application_description`
/// can be described, recording their blobs as used along the way.
async fn check_required_applications(
    &mut self,
    application_description: &ApplicationDescription,
    txn_tracker: &mut TransactionTracker,
) -> Result<(), ExecutionError> {
    for required_id in &application_description.required_application_ids {
        // `Box::pin` breaks the async recursion between this function and
        // `describe_application`, which would otherwise produce an
        // infinitely-sized future.
        Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
    }
    Ok(())
}
/// Loads and validates the description of application `id`, recording the
/// description and bytecode blobs as used, and recursively checking all
/// required applications.
pub async fn describe_application(
    &mut self,
    id: ApplicationId,
    txn_tracker: &mut TransactionTracker,
) -> Result<ApplicationDescription, ExecutionError> {
    let blob_id = id.description_blob_id();
    // Prefer a blob created earlier in this transaction over stored blobs.
    let content = match txn_tracker.created_blobs().get(&blob_id) {
        Some(content) => content.clone(),
        None => self.read_blob_content(blob_id).await?,
    };
    self.blob_used(txn_tracker, blob_id).await?;
    let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
    let blob_ids = self
        .check_bytecode_blobs(&description.module_id, txn_tracker)
        .await?;
    for blob_id in blob_ids {
        self.blob_used(txn_tracker, blob_id).await?;
    }
    self.check_required_applications(&description, txn_tracker)
        .await?;
    Ok(description)
}
/// Returns the applications in `stack` together with their transitive
/// dependencies, ordered so that every application appears after its
/// dependencies (iterative depth-first post-order traversal).
pub async fn find_dependencies(
    &mut self,
    mut stack: Vec<ApplicationId>,
    txn_tracker: &mut TransactionTracker,
) -> Result<Vec<ApplicationId>, ExecutionError> {
    let mut result = Vec::new();
    // `sorted`: applications already emitted into `result`.
    // `seen`: applications whose children have been pushed but which are not
    // yet emitted.
    let mut sorted = HashSet::new();
    let mut seen = HashSet::new();
    while let Some(id) = stack.pop() {
        if sorted.contains(&id) {
            continue;
        }
        if seen.contains(&id) {
            // Second visit: all children have been processed; emit the node.
            sorted.insert(id);
            result.push(id);
            continue;
        }
        // First visit: re-push the node so it is emitted after its children.
        seen.insert(id);
        stack.push(id);
        let app = self.describe_application(id, txn_tracker).await?;
        // `.rev()` preserves the declared dependency order in the output.
        for child in app.required_application_ids.iter().rev() {
            if !seen.contains(child) {
                stack.push(*child);
            }
        }
    }
    Ok(result)
}
/// Records that this chain uses `blob_id`, replaying the corresponding oracle
/// response on first use. Returns `true` if the blob was not already marked
/// as used.
pub(crate) async fn blob_used(
    &mut self,
    txn_tracker: &mut TransactionTracker,
    blob_id: BlobId,
) -> Result<bool, ExecutionError> {
    let first_use = !self.used_blobs.contains(&blob_id).await?;
    if first_use {
        self.used_blobs.insert(&blob_id)?;
        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
    }
    Ok(first_use)
}
/// Records `blob_id` as published by this transaction and as used by this
/// chain.
fn blob_published(
    &mut self,
    blob_id: &BlobId,
    txn_tracker: &mut TransactionTracker,
) -> Result<(), ExecutionError> {
    self.used_blobs.insert(blob_id)?;
    txn_tracker.add_published_blob(*blob_id);
    Ok(())
}
/// Reads the content of `blob_id` from the runtime, failing with
/// `BlobsNotFound` when the blob is unknown.
pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
    let maybe_blob = self.context().extra().get_blob(blob_id).await?;
    let blob = maybe_blob.ok_or_else(|| ExecutionError::BlobsNotFound(vec![blob_id]))?;
    Ok(Arc::unwrap_or_clone(blob).into())
}
pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
if self.context().extra().contains_blob(blob_id).await? {
Ok(())
} else {
Err(ExecutionError::BlobsNotFound(vec![blob_id]))
}
}
/// Returns the bytecode blob IDs of `module_id`, failing with
/// `BlobsNotFound` if any blob is neither created in this transaction nor
/// known to the runtime.
async fn check_bytecode_blobs(
    &mut self,
    module_id: &ModuleId,
    txn_tracker: &TransactionTracker,
) -> Result<Vec<BlobId>, ExecutionError> {
    let blob_ids = module_id.bytecode_blob_ids();
    let mut missing = Vec::new();
    for blob_id in &blob_ids {
        // Blobs created earlier in this transaction count as available.
        if txn_tracker.created_blobs().contains_key(blob_id) {
            continue;
        }
        if !self.context().extra().contains_blob(*blob_id).await? {
            missing.push(*blob_id);
        }
    }
    ensure!(missing.is_empty(), ExecutionError::BlobsNotFound(missing));
    Ok(blob_ids)
}
}