use crate::{
committee::{Committee, Epoch},
ApplicationRegistryView, Bytecode, BytecodeLocation, ChainOwnership, ChannelName,
ChannelSubscription, Destination, MessageContext, OperationContext, QueryContext,
RawExecutionResult, RawOutgoingMessage, UserApplicationDescription, UserApplicationId,
};
use async_graphql::Enum;
use custom_debug_derive::Debug;
use linera_base::{
crypto::{CryptoHash, PublicKey},
data_types::{Amount, ArithmeticError, RoundNumber, Timestamp},
ensure, hex_debug,
identifiers::{BytecodeId, ChainDescription, ChainId, MessageId, Owner},
};
use linera_views::{
common::Context,
map_view::MapView,
register_view::RegisterView,
set_view::SetView,
views::{HashableView, View, ViewError},
};
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
fmt::{self, Display, Formatter},
iter,
str::FromStr,
};
use thiserror::Error;
#[cfg(any(test, feature = "test"))]
use {crate::applications::ApplicationRegistry, std::collections::BTreeSet};
#[derive(Debug, HashableView)]
pub struct SystemExecutionStateView<C> {
pub description: RegisterView<C, Option<ChainDescription>>,
pub epoch: RegisterView<C, Option<Epoch>>,
pub admin_id: RegisterView<C, Option<ChainId>>,
pub subscriptions: SetView<C, ChannelSubscription>,
pub committees: RegisterView<C, BTreeMap<Epoch, Committee>>,
pub ownership: RegisterView<C, ChainOwnership>,
pub balance: RegisterView<C, Amount>,
pub balances: MapView<C, Owner, Amount>,
pub timestamp: RegisterView<C, Timestamp>,
pub registry: ApplicationRegistryView<C>,
}
#[cfg(any(test, feature = "test"))]
#[derive(Default, Debug, PartialEq, Eq, Clone)]
pub struct SystemExecutionState {
pub description: Option<ChainDescription>,
pub epoch: Option<Epoch>,
pub admin_id: Option<ChainId>,
pub subscriptions: BTreeSet<ChannelSubscription>,
pub committees: BTreeMap<Epoch, Committee>,
pub ownership: ChainOwnership,
pub balance: Amount,
pub balances: BTreeMap<Owner, Amount>,
pub timestamp: Timestamp,
pub registry: ApplicationRegistry,
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemOperation {
Transfer {
owner: Option<Owner>,
recipient: Recipient,
amount: Amount,
user_data: UserData,
},
Claim {
owner: Owner,
target: ChainId,
recipient: Recipient,
amount: Amount,
user_data: UserData,
},
OpenChain {
ownership: ChainOwnership,
admin_id: ChainId,
epoch: Epoch,
committees: BTreeMap<Epoch, Committee>,
},
CloseChain,
ChangeOwner { new_public_key: PublicKey },
ChangeMultipleOwners {
new_public_keys: Vec<(PublicKey, u64)>,
multi_leader_rounds: RoundNumber,
},
Subscribe {
chain_id: ChainId,
channel: SystemChannel,
},
Unsubscribe {
chain_id: ChainId,
channel: SystemChannel,
},
PublishBytecode {
contract: Bytecode,
service: Bytecode,
},
CreateApplication {
bytecode_id: BytecodeId,
#[serde(with = "serde_bytes")]
#[debug(with = "hex_debug")]
parameters: Vec<u8>,
#[serde(with = "serde_bytes")]
#[debug(with = "hex_debug")]
initialization_argument: Vec<u8>,
required_application_ids: Vec<UserApplicationId>,
},
RequestApplication {
chain_id: ChainId,
application_id: UserApplicationId,
},
Admin(AdminOperation),
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum AdminOperation {
CreateCommittee { epoch: Epoch, committee: Committee },
RemoveCommittee { epoch: Epoch },
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
Credit { account: Account, amount: Amount },
Withdraw {
account: Account,
amount: Amount,
recipient: Recipient,
user_data: UserData,
},
OpenChain {
ownership: ChainOwnership,
admin_id: ChainId,
epoch: Epoch,
committees: BTreeMap<Epoch, Committee>,
},
SetCommittees {
epoch: Epoch,
committees: BTreeMap<Epoch, Committee>,
},
Subscribe {
id: ChainId,
subscription: ChannelSubscription,
},
Unsubscribe {
id: ChainId,
subscription: ChannelSubscription,
},
BytecodePublished { operation_index: u32 },
ApplicationCreated,
BytecodeLocations {
locations: Vec<(BytecodeId, BytecodeLocation)>,
},
RegisterApplications {
applications: Vec<UserApplicationDescription>,
},
Notify { id: ChainId },
RequestApplication(UserApplicationId),
}
impl SystemMessage {
pub fn bytecode_locations(
&self,
certificate_hash: CryptoHash,
) -> Box<dyn Iterator<Item = BytecodeLocation> + '_> {
match self {
SystemMessage::BytecodePublished { operation_index } => {
Box::new(iter::once(BytecodeLocation {
certificate_hash,
operation_index: *operation_index,
}))
}
SystemMessage::BytecodeLocations {
locations: new_locations,
} => Box::new(new_locations.iter().map(|(_id, location)| *location)),
SystemMessage::RegisterApplications { applications } => {
Box::new(applications.iter().map(|app| app.bytecode_location))
}
SystemMessage::Credit { .. }
| SystemMessage::Withdraw { .. }
| SystemMessage::OpenChain { .. }
| SystemMessage::SetCommittees { .. }
| SystemMessage::Subscribe { .. }
| SystemMessage::Unsubscribe { .. }
| SystemMessage::ApplicationCreated { .. }
| SystemMessage::Notify { .. }
| SystemMessage::RequestApplication(_) => Box::new(iter::empty()),
}
}
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
pub chain_id: ChainId,
pub balance: Amount,
}
#[derive(Enum, Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub enum SystemChannel {
Admin,
PublishedBytecodes,
}
impl SystemChannel {
pub fn name(&self) -> ChannelName {
bcs::to_bytes(self)
.expect("`SystemChannel` can be serialized")
.into()
}
}
impl Display for SystemChannel {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
let display_name = match self {
SystemChannel::Admin => "Admin",
SystemChannel::PublishedBytecodes => "PublishedBytecodes",
};
write!(formatter, "{display_name}")
}
}
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
pub enum Recipient {
Burn,
Account(Account),
}
impl Recipient {
pub fn chain(chain_id: ChainId) -> Recipient {
Recipient::Account(Account::chain(chain_id))
}
#[cfg(any(test, feature = "test"))]
pub fn root(index: u32) -> Recipient {
Recipient::chain(ChainId::root(index))
}
}
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
pub struct Account {
pub chain_id: ChainId,
pub owner: Option<Owner>,
}
impl Account {
pub fn chain(chain_id: ChainId) -> Self {
Account {
chain_id,
owner: None,
}
}
pub fn owner(chain_id: ChainId, owner: Owner) -> Self {
Account {
chain_id,
owner: Some(owner),
}
}
}
impl std::fmt::Display for Account {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.owner {
Some(owner) => write!(f, "{}:{}", self.chain_id, owner),
None => write!(f, "{}", self.chain_id),
}
}
}
impl FromStr for Account {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let parts: Vec<&str> = s.split(':').collect();
anyhow::ensure!(
parts.len() <= 2,
"Expecting format `chain-id:address` or `chain-id`"
);
if parts.len() == 1 {
Ok(Account::chain(s.parse()?))
} else {
let chain_id = parts[0].parse()?;
let owner = parts[1].parse()?;
Ok(Account::owner(chain_id, owner))
}
}
}
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
#[derive(Error, Debug)]
pub enum SystemExecutionError {
#[error(transparent)]
ArithmeticError(#[from] ArithmeticError),
#[error(transparent)]
ViewError(#[from] ViewError),
#[error("Invalid new chain id: {0}")]
InvalidNewChainId(ChainId),
#[error("Invalid admin id in new chain: {0}")]
InvalidNewChainAdminId(ChainId),
#[error("Invalid committees")]
InvalidCommittees,
#[error("{epoch:?} is not recognized by chain {chain_id:}")]
InvalidEpoch { chain_id: ChainId, epoch: Epoch },
#[error("Transfer must have positive amount")]
IncorrectTransferAmount,
#[error("Transfer from owned account must be authenticated by the right signer")]
UnauthenticatedTransferOwner,
#[error(
"The transferred amount must be not exceed the current chain balance: {current_balance}"
)]
InsufficientFunding { current_balance: Amount },
#[error("Claim must have positive amount")]
IncorrectClaimAmount,
#[error("Claim must be authenticated by the right signer")]
UnauthenticatedClaimOwner,
#[error("Admin operations are only allowed on the admin chain.")]
AdminOperationOnNonAdminChain,
#[error("Failed to create new committee")]
InvalidCommitteeCreation,
#[error("Failed to remove committee")]
InvalidCommitteeRemoval,
#[error(
"Attempted to subscribe to the admin channel ({1}) of this chain's ({0}) admin chain {1}"
)]
InvalidAdminSubscription(ChainId, SystemChannel),
#[error("Attempted to subscribe to self on channel {1} on chain {0}")]
SelfSubscription(ChainId, SystemChannel),
#[error("Attempted to subscribe to a channel which does not exist ({1}) on chain {0}")]
NoSuchChannel(ChainId, SystemChannel),
#[error("Invalid unsubscription request to channel {1} on chain {0}")]
InvalidUnsubscription(ChainId, SystemChannel),
#[error("Amount overflow")]
AmountOverflow,
#[error("Amount underflow")]
AmountUnderflow,
#[error("Chain balance overflow")]
BalanceOverflow,
#[error("Chain balance underflow")]
BalanceUnderflow,
#[error("Cannot set epoch to a lower value")]
CannotRewindEpoch,
#[error("Cannot decrease the chain's timestamp")]
TicksOutOfOrder,
#[error("Attempt to create an application using unregistered bytecode identifier {0:?}")]
UnknownBytecodeId(BytecodeId),
#[error("Application {0:?} is not registered by the chain")]
UnknownApplicationId(Box<UserApplicationId>),
}
impl<C> SystemExecutionStateView<C>
where
C: Context + Clone + Send + Sync + 'static,
ViewError: From<C::Error>,
{
pub fn is_active(&self) -> bool {
self.description.get().is_some()
&& self.ownership.get().is_active()
&& self.current_committee().is_some()
&& self.admin_id.get().is_some()
}
pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
let epoch = self.epoch.get().as_ref()?;
let committee = self.committees.get().get(epoch)?;
Some((*epoch, committee))
}
pub async fn execute_operation(
&mut self,
context: &OperationContext,
operation: &SystemOperation,
) -> Result<
(
RawExecutionResult<SystemMessage>,
Option<(UserApplicationId, Vec<u8>)>,
),
SystemExecutionError,
> {
use SystemOperation::*;
let mut result = RawExecutionResult::default();
let mut new_application = None;
match operation {
OpenChain {
ownership,
committees,
admin_id,
epoch,
} => {
let child_id = ChainId::child(context.next_message_id());
ensure!(
self.admin_id.get().as_ref() == Some(admin_id),
SystemExecutionError::InvalidNewChainAdminId(child_id)
);
ensure!(
self.committees.get() == committees,
SystemExecutionError::InvalidCommittees
);
ensure!(
self.epoch.get().as_ref() == Some(epoch),
SystemExecutionError::InvalidEpoch {
chain_id: child_id,
epoch: *epoch
}
);
let e1 = RawOutgoingMessage {
destination: Destination::Recipient(child_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::OpenChain {
ownership: ownership.clone(),
committees: committees.clone(),
admin_id: *admin_id,
epoch: *epoch,
},
};
let subscription = ChannelSubscription {
chain_id: *admin_id,
name: SystemChannel::Admin.name(),
};
let e2 = RawOutgoingMessage {
destination: Destination::Recipient(*admin_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::Subscribe {
id: child_id,
subscription,
},
};
result.messages.extend([e1, e2]);
}
ChangeOwner { new_public_key } => {
self.ownership.set(ChainOwnership::single(*new_public_key));
}
ChangeMultipleOwners {
new_public_keys,
multi_leader_rounds,
} => {
self.ownership.set(ChainOwnership::multiple(
new_public_keys.iter().map(|(key, weight)| (*key, *weight)),
*multi_leader_rounds,
));
}
CloseChain => {
self.ownership.set(ChainOwnership::default());
self.subscriptions
.for_each_index(|subscription| {
let message = RawOutgoingMessage {
destination: Destination::Recipient(subscription.chain_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::Unsubscribe {
id: context.chain_id,
subscription,
},
};
result.messages.push(message);
Ok(())
})
.await?;
self.subscriptions.clear();
}
Transfer {
owner,
amount,
recipient,
..
} => {
if owner.is_some() {
ensure!(
&context.authenticated_signer == owner,
SystemExecutionError::UnauthenticatedTransferOwner
);
}
ensure!(
*amount > Amount::ZERO,
SystemExecutionError::IncorrectTransferAmount
);
let balance = match &owner {
Some(owner) => self.balances.get_mut_or_default(owner).await?,
None => self.balance.get_mut(),
};
ensure!(
*balance >= *amount,
SystemExecutionError::InsufficientFunding {
current_balance: *balance
}
);
balance.try_sub_assign(*amount)?;
if let Recipient::Account(account) = recipient {
let message = RawOutgoingMessage {
destination: Destination::Recipient(account.chain_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::Credit {
amount: *amount,
account: *account,
},
};
result.messages.push(message);
}
}
Claim {
owner,
target,
recipient,
amount,
user_data,
} => {
ensure!(
context.authenticated_signer.as_ref() == Some(owner),
SystemExecutionError::UnauthenticatedClaimOwner
);
ensure!(
*amount > Amount::ZERO,
SystemExecutionError::IncorrectClaimAmount
);
let message = RawOutgoingMessage {
destination: Destination::Recipient(*target),
authenticated: true,
is_skippable: false,
message: SystemMessage::Withdraw {
amount: *amount,
account: Account {
chain_id: *target,
owner: Some(*owner),
},
user_data: user_data.clone(),
recipient: *recipient,
},
};
result.messages.push(message);
}
Admin(admin_operation) => {
ensure!(
*self.admin_id.get() == Some(context.chain_id),
SystemExecutionError::AdminOperationOnNonAdminChain
);
match admin_operation {
AdminOperation::CreateCommittee { epoch, committee } => {
ensure!(
*epoch == self.epoch.get().expect("chain is active").try_add_one()?,
SystemExecutionError::InvalidCommitteeCreation
);
self.committees.get_mut().insert(*epoch, committee.clone());
self.epoch.set(Some(*epoch));
let message = RawOutgoingMessage {
destination: Destination::Subscribers(SystemChannel::Admin.name()),
authenticated: false,
is_skippable: false,
message: SystemMessage::SetCommittees {
epoch: *epoch,
committees: self.committees.get().clone(),
},
};
result.messages.push(message);
}
AdminOperation::RemoveCommittee { epoch } => {
ensure!(
self.committees.get_mut().remove(epoch).is_some(),
SystemExecutionError::InvalidCommitteeRemoval
);
let message = RawOutgoingMessage {
destination: Destination::Subscribers(SystemChannel::Admin.name()),
authenticated: false,
is_skippable: false,
message: SystemMessage::SetCommittees {
epoch: self.epoch.get().expect("chain is active"),
committees: self.committees.get().clone(),
},
};
result.messages.push(message);
}
}
}
Subscribe { chain_id, channel } => {
ensure!(
context.chain_id != *chain_id,
SystemExecutionError::SelfSubscription(context.chain_id, *channel)
);
if *channel == SystemChannel::Admin {
ensure!(
self.admin_id.get().as_ref() == Some(chain_id),
SystemExecutionError::InvalidAdminSubscription(context.chain_id, *channel)
);
}
let subscription = ChannelSubscription {
chain_id: *chain_id,
name: channel.name(),
};
ensure!(
!self.subscriptions.contains(&subscription).await?,
SystemExecutionError::NoSuchChannel(context.chain_id, *channel)
);
self.subscriptions.insert(&subscription)?;
let message = RawOutgoingMessage {
destination: Destination::Recipient(*chain_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::Subscribe {
id: context.chain_id,
subscription,
},
};
result.messages.push(message);
}
Unsubscribe { chain_id, channel } => {
let subscription = ChannelSubscription {
chain_id: *chain_id,
name: channel.name(),
};
ensure!(
self.subscriptions.contains(&subscription).await?,
SystemExecutionError::InvalidUnsubscription(context.chain_id, *channel)
);
self.subscriptions.remove(&subscription)?;
let message = RawOutgoingMessage {
destination: Destination::Recipient(*chain_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::Unsubscribe {
id: context.chain_id,
subscription,
},
};
result.messages.push(message);
}
PublishBytecode { .. } => {
let message = RawOutgoingMessage {
destination: Destination::Recipient(context.chain_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::BytecodePublished {
operation_index: context.index,
},
};
result.messages.push(message);
}
CreateApplication {
bytecode_id,
parameters,
initialization_argument,
required_application_ids,
} => {
let id = UserApplicationId {
bytecode_id: *bytecode_id,
creation: context.next_message_id(),
};
self.registry
.register_new_application(
id,
parameters.clone(),
required_application_ids.clone(),
)
.await?;
let message = RawOutgoingMessage {
destination: Destination::Recipient(context.chain_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::ApplicationCreated,
};
result.messages.push(message);
new_application = Some((id, initialization_argument.clone()));
}
RequestApplication {
chain_id,
application_id,
} => {
let message = RawOutgoingMessage {
destination: Destination::Recipient(*chain_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::RequestApplication(*application_id),
};
result.messages.push(message);
}
}
Ok((result, new_application))
}
pub async fn execute_message(
&mut self,
context: &MessageContext,
message: &SystemMessage,
) -> Result<RawExecutionResult<SystemMessage>, SystemExecutionError> {
let mut result = RawExecutionResult::default();
use SystemMessage::*;
match message {
Credit { amount, account } if context.chain_id == account.chain_id => {
match &account.owner {
None => {
let new_balance = self.balance.get().saturating_add(*amount);
self.balance.set(new_balance);
}
Some(owner) => {
let balance = self.balances.get_mut_or_default(owner).await?;
*balance = balance.saturating_add(*amount);
}
}
}
Withdraw {
amount,
account:
Account {
owner: Some(owner),
chain_id,
},
user_data: _,
recipient,
} if chain_id == &context.chain_id
&& context.authenticated_signer.as_ref() == Some(owner) =>
{
let balance = self.balances.get_mut_or_default(owner).await?;
if balance.try_sub_assign(*amount).is_ok() {
if let Recipient::Account(account) = recipient {
let message = RawOutgoingMessage {
destination: Destination::Recipient(account.chain_id),
authenticated: false,
is_skippable: false,
message: SystemMessage::Credit {
amount: *amount,
account: *account,
},
};
result.messages.push(message);
}
} else {
tracing::info!("Withdrawal request was skipped due to lack of funds.");
}
}
SetCommittees { epoch, committees } => {
ensure!(
*epoch >= self.epoch.get().expect("chain is active"),
SystemExecutionError::CannotRewindEpoch
);
self.epoch.set(Some(*epoch));
self.committees.set(committees.clone());
}
Subscribe { id, subscription } if subscription.chain_id == context.chain_id => {
let message = RawOutgoingMessage {
destination: Destination::Recipient(*id),
authenticated: false,
is_skippable: false,
message: SystemMessage::Notify { id: *id },
};
result.messages.push(message);
result.subscribe.push((subscription.name.clone(), *id));
}
Unsubscribe { id, subscription } if subscription.chain_id == context.chain_id => {
let message = RawOutgoingMessage {
destination: Destination::Recipient(*id),
authenticated: false,
is_skippable: false,
message: SystemMessage::Notify { id: *id },
};
result.messages.push(message);
result.unsubscribe.push((subscription.name.clone(), *id));
}
BytecodePublished { operation_index } => {
let bytecode_id = BytecodeId::new(context.message_id);
let bytecode_location = BytecodeLocation {
certificate_hash: context.certificate_hash,
operation_index: *operation_index,
};
self.registry
.register_published_bytecode(bytecode_id, bytecode_location)?;
let locations = self.registry.bytecode_locations().await?;
let message = RawOutgoingMessage {
destination: Destination::Subscribers(SystemChannel::PublishedBytecodes.name()),
authenticated: false,
is_skippable: false,
message: SystemMessage::BytecodeLocations { locations },
};
result.messages.push(message);
}
BytecodeLocations { locations } => {
for (id, location) in locations {
self.registry.register_published_bytecode(*id, *location)?;
}
}
ApplicationCreated { .. } | Notify { .. } => (),
OpenChain { .. } => {
}
RegisterApplications { applications } => {
for application in applications {
self.registry
.register_application(application.clone())
.await?;
}
}
RequestApplication(application_id) => {
match self
.registry
.describe_applications_with_dependencies(
vec![*application_id],
&Default::default(),
)
.await
{
Err(SystemExecutionError::UnknownApplicationId(id)) => {
tracing::info!(
%id,
"Application request was skipped: application not known."
);
}
Err(err) => return Err(err),
Ok(applications) => {
let message = RawOutgoingMessage {
destination: Destination::Recipient(context.message_id.chain_id),
authenticated: false,
is_skippable: true,
message: SystemMessage::RegisterApplications { applications },
};
result.messages.push(message);
}
}
}
_ => {
tracing::error!(
"Skipping unexpected received message: {message:?} with context: {context:?}"
);
}
}
Ok(result)
}
#[allow(clippy::too_many_arguments)]
pub fn open_chain(
&mut self,
message_id: MessageId,
ownership: ChainOwnership,
epoch: Epoch,
committees: BTreeMap<Epoch, Committee>,
admin_id: ChainId,
timestamp: Timestamp,
) {
assert!(self.description.get().is_none());
assert!(!self.ownership.get().is_active());
assert!(self.committees.get().is_empty());
let description = ChainDescription::Child(message_id);
self.description.set(Some(description));
self.epoch.set(Some(epoch));
self.committees.set(committees);
self.admin_id.set(Some(admin_id));
self.subscriptions
.insert(&ChannelSubscription {
chain_id: admin_id,
name: SystemChannel::Admin.name(),
})
.expect("serialization failed");
self.ownership.set(ownership);
self.timestamp.set(timestamp);
}
pub async fn query_application(
&mut self,
context: &QueryContext,
_query: &SystemQuery,
) -> Result<SystemResponse, SystemExecutionError> {
let response = SystemResponse {
chain_id: context.chain_id,
balance: *self.balance.get(),
};
Ok(response)
}
}