use crate::{
data_types::{
Block, ChainAndHeight, ChannelFullName, Event, IncomingMessage, Medium, Origin,
OutgoingMessage, Target,
},
inbox::{InboxError, InboxStateView},
outbox::OutboxStateView,
ChainError, ChainManager,
};
use async_graphql::SimpleObject;
use futures::stream::{self, StreamExt, TryStreamExt};
use linera_base::{
crypto::CryptoHash,
data_types::{Amount, ArithmeticError, BlockHeight, Timestamp},
ensure,
identifiers::{ChainId, Destination, MessageId},
};
use linera_execution::{
system::{Account, SystemMessage},
ApplicationId, ExecutionResult, ExecutionRuntimeContext, ExecutionStateView, Message,
MessageContext, OperationContext, Query, QueryContext, RawExecutionResult, Response,
UserApplicationDescription, UserApplicationId,
};
use linera_views::{
common::Context,
log_view::LogView,
reentrant_collection_view::ReentrantCollectionView,
register_view::RegisterView,
set_view::SetView,
views::{CryptoHashView, GraphQLView, RootView, View, ViewError},
};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashSet};
#[derive(Debug, RootView, GraphQLView)]
pub struct ChainStateView<C> {
pub execution_state: ExecutionStateView<C>,
pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
pub tip_state: RegisterView<C, ChainTipState>,
pub manager: RegisterView<C, ChainManager>,
pub confirmed_log: LogView<C, CryptoHash>,
pub received_log: LogView<C, ChainAndHeight>,
pub inboxes: ReentrantCollectionView<C, Origin, InboxStateView<C>>,
pub outboxes: ReentrantCollectionView<C, Target, OutboxStateView<C>>,
pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
pub channels: ReentrantCollectionView<C, ChannelFullName, ChannelStateView<C>>,
}
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, SimpleObject)]
pub struct ChainTipState {
pub block_hash: Option<CryptoHash>,
pub next_block_height: BlockHeight,
}
impl ChainTipState {
pub fn verify_block_chaining(&self, new_block: &Block) -> Result<(), ChainError> {
ensure!(
new_block.height == self.next_block_height,
ChainError::UnexpectedBlockHeight {
expected_block_height: self.next_block_height,
found_block_height: new_block.height
}
);
ensure!(
new_block.previous_block_hash == self.block_hash,
ChainError::UnexpectedPreviousBlockHash
);
Ok(())
}
pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
ensure!(
self.next_block_height >= height,
ChainError::MissingEarlierBlocks {
current_block_height: self.next_block_height,
}
);
Ok(self.next_block_height > height)
}
}
#[derive(Debug, View, GraphQLView)]
pub struct ChannelStateView<C> {
pub subscribers: SetView<C, ChainId>,
pub block_height: RegisterView<C, Option<BlockHeight>>,
}
impl<C> ChainStateView<C>
where
C: Context + Clone + Send + Sync + 'static,
ViewError: From<C::Error>,
C::Extra: ExecutionRuntimeContext,
{
pub fn chain_id(&self) -> ChainId {
self.context().extra().chain_id()
}
pub async fn query_application(&mut self, query: &Query) -> Result<Response, ChainError> {
let context = QueryContext {
chain_id: self.chain_id(),
};
let response = self
.execution_state
.query_application(&context, query)
.await?;
Ok(response)
}
pub async fn describe_application(
&mut self,
application_id: UserApplicationId,
) -> Result<UserApplicationDescription, ChainError> {
self.execution_state
.system
.registry
.describe_application(application_id)
.await
.map_err(|err| ChainError::ExecutionError(err.into()))
}
pub async fn mark_messages_as_received(
&mut self,
target: Target,
height: BlockHeight,
) -> Result<bool, ChainError> {
let mut outbox = self.outboxes.try_load_entry_mut(&target).await?;
let updates = outbox.mark_messages_as_received(height).await?;
if updates.is_empty() {
return Ok(false);
}
for update in updates {
let counter = self
.outbox_counters
.get_mut()
.get_mut(&update)
.expect("message counter should be present");
*counter = counter
.checked_sub(1)
.expect("message counter should not underflow");
if *counter == 0 {
self.outbox_counters.get_mut().remove(&update);
}
}
if outbox.queue.count() == 0 {
self.outboxes.remove_entry(&target)?;
}
Ok(true)
}
pub fn all_messages_delivered_up_to(&mut self, height: BlockHeight) -> bool {
tracing::debug!(
"Messages left in {:?}'s outbox: {:?}",
self.chain_id(),
self.outbox_counters.get()
);
if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
key > &height
} else {
true
}
}
pub fn is_active(&self) -> bool {
self.execution_state.system.is_active()
}
pub fn ensure_is_active(&self) -> Result<(), ChainError> {
if self.is_active() {
Ok(())
} else {
Err(ChainError::InactiveChain(self.chain_id()))
}
}
pub async fn validate_incoming_messages(&mut self) -> Result<(), ChainError> {
let chain_id = self.chain_id();
let origins = self.inboxes.indices().await?;
let inboxes = self.inboxes.try_load_entries(&origins).await?;
let stream = origins.into_iter().zip(inboxes);
let stream = stream::iter(stream)
.map(|(origin, inbox)| async move {
if let Some(event) = inbox.removed_events.front().await? {
return Err(ChainError::MissingCrossChainUpdate {
chain_id,
origin: origin.into(),
height: event.height,
});
}
Ok::<(), ChainError>(())
})
.buffer_unordered(C::MAX_CONNECTIONS);
stream.try_collect::<Vec<_>>().await?;
Ok(())
}
pub async fn next_block_height_to_receive(
&mut self,
origin: &Origin,
) -> Result<BlockHeight, ChainError> {
let inbox = self.inboxes.try_load_entry(origin).await?;
inbox.next_block_height_to_receive()
}
pub async fn last_anticipated_block_height(
&mut self,
origin: &Origin,
) -> Result<Option<BlockHeight>, ChainError> {
let inbox = self.inboxes.try_load_entry(origin).await?;
match inbox.removed_events.back().await? {
Some(event) => Ok(Some(event.height)),
None => Ok(None),
}
}
pub async fn receive_block(
&mut self,
origin: &Origin,
height: BlockHeight,
timestamp: Timestamp,
messages: Vec<OutgoingMessage>,
certificate_hash: CryptoHash,
) -> Result<(), ChainError> {
let chain_id = self.chain_id();
ensure!(
height >= self.next_block_height_to_receive(origin).await?,
ChainError::InternalError("Trying to receive blocks in the wrong order".to_string())
);
tracing::trace!(
"Processing new messages to {:?} from {:?} at height {}",
chain_id,
origin,
height
);
let mut events = Vec::new();
for (index, outgoing_message) in messages.into_iter().enumerate() {
let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
let OutgoingMessage {
destination,
authenticated_signer,
message,
} = outgoing_message;
match destination {
Destination::Recipient(id) => {
if origin.medium != Medium::Direct || id != chain_id {
continue;
}
}
Destination::Subscribers(name) => {
let expected_medium = Medium::Channel(ChannelFullName {
application_id: message.application_id(),
name,
});
if origin.medium != expected_medium {
continue;
}
}
}
if let Message::System(_) = message {
let message_id = MessageId {
chain_id: origin.sender,
height,
index,
};
self.execute_immediate_message(message_id, &message, timestamp)
.await?;
}
events.push(Event {
certificate_hash,
height,
index,
authenticated_signer,
timestamp,
message,
});
}
ensure!(
!events.is_empty(),
ChainError::InternalError(format!(
"The block received by {:?} from {:?} at height {:?} was entirely ignored. \
This should not happen",
chain_id, origin, height
))
);
let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
for event in events {
inbox.add_event(event).await.map_err(|error| match error {
InboxError::ViewError(error) => ChainError::ViewError(error),
error => ChainError::InternalError(format!(
"while processing messages in certified block: {error}"
)),
})?;
}
self.received_log.push(ChainAndHeight {
chain_id: origin.sender,
height,
});
Ok(())
}
async fn execute_immediate_message(
&mut self,
message_id: MessageId,
message: &Message,
timestamp: Timestamp,
) -> Result<(), ChainError> {
if let Message::System(SystemMessage::OpenChain {
public_key,
epoch,
committees,
admin_id,
}) = message
{
self.execution_state.system.open_chain(
message_id,
*public_key,
*epoch,
committees.clone(),
*admin_id,
timestamp,
);
let hash = self.execution_state.crypto_hash().await?;
self.execution_state_hash.set(Some(hash));
self.manager
.get_mut()
.reset(self.execution_state.system.ownership.get());
}
Ok(())
}
pub async fn remove_events_from_inboxes(&mut self, block: &Block) -> Result<(), ChainError> {
let chain_id = self.chain_id();
let mut events_by_origin: BTreeMap<_, Vec<&Event>> = Default::default();
for IncomingMessage { event, origin, .. } in &block.incoming_messages {
ensure!(
event.timestamp <= block.timestamp,
ChainError::IncorrectEventTimestamp {
chain_id,
message_timestamp: event.timestamp,
block_timestamp: block.timestamp,
}
);
let events = events_by_origin.entry(origin).or_default();
events.push(event);
}
let origins = events_by_origin.keys().copied();
let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
for ((origin, events), mut inbox) in events_by_origin.into_iter().zip(inboxes) {
tracing::trace!("Updating inbox {:?} in chain {:?}", origin, chain_id);
for event in events {
inbox
.remove_event(event)
.await
.map_err(|error| ChainError::from((chain_id, origin.clone(), error)))?;
}
}
Ok(())
}
pub async fn execute_block(
&mut self,
block: &Block,
) -> Result<(Vec<OutgoingMessage>, CryptoHash), ChainError> {
assert_eq!(block.chain_id, self.chain_id());
let chain_id = self.chain_id();
ensure!(
*self.execution_state.system.timestamp.get() <= block.timestamp,
ChainError::InvalidBlockTimestamp
);
self.execution_state.system.timestamp.set(block.timestamp);
let Some((_, committee)) = self.execution_state.system.current_committee() else {
return Err(ChainError::InactiveChain(chain_id));
};
let pricing = committee.pricing().clone();
let credit: Amount = block
.incoming_messages
.iter()
.filter_map(|msg| match &msg.event.message {
Message::System(SystemMessage::Credit { account, amount })
if *account == Account::chain(chain_id) =>
{
Some(amount)
}
_ => None,
})
.sum();
let balance = self.execution_state.system.balance.get_mut();
balance.try_add_assign(credit)?;
Self::sub_assign_fees(balance, pricing.certificate_price())?;
Self::sub_assign_fees(balance, pricing.storage_price(&block.incoming_messages)?)?;
Self::sub_assign_fees(balance, pricing.storage_price(&block.operations)?)?;
let mut messages = Vec::new();
let available_fuel = pricing.remaining_fuel(*balance);
let mut remaining_fuel = available_fuel;
for message in &block.incoming_messages {
let context = MessageContext {
chain_id,
height: block.height,
certificate_hash: message.event.certificate_hash,
message_id: MessageId {
chain_id: message.origin.sender,
height: message.event.height,
index: message.event.index,
},
authenticated_signer: message.event.authenticated_signer,
};
let results = self
.execution_state
.execute_message(&context, &message.event.message, &mut remaining_fuel)
.await?;
self.process_execution_results(&mut messages, context.height, results)
.await?;
}
for (index, operation) in block.operations.iter().enumerate() {
let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
let next_message_index =
u32::try_from(messages.len()).map_err(|_| ArithmeticError::Overflow)?;
let context = OperationContext {
chain_id,
height: block.height,
index,
authenticated_signer: block.authenticated_signer,
next_message_index,
};
let results = self
.execution_state
.execute_operation(&context, operation, &mut remaining_fuel)
.await?;
self.process_execution_results(&mut messages, context.height, results)
.await?;
}
let used_fuel = available_fuel.saturating_sub(remaining_fuel);
let balance = self.execution_state.system.balance.get_mut();
Self::sub_assign_fees(balance, credit)?;
Self::sub_assign_fees(balance, pricing.fuel_price(used_fuel)?)?;
Self::sub_assign_fees(balance, pricing.messages_price(&messages)?)?;
let state_hash = self.execution_state.crypto_hash().await?;
self.execution_state_hash.set(Some(state_hash));
self.manager
.get_mut()
.reset(self.execution_state.system.ownership.get());
Ok((messages, state_hash))
}
async fn process_execution_results(
&mut self,
messages: &mut Vec<OutgoingMessage>,
height: BlockHeight,
results: Vec<ExecutionResult>,
) -> Result<(), ChainError> {
for result in results {
match result {
ExecutionResult::System(result) => {
self.process_raw_execution_result(
ApplicationId::System,
Message::System,
messages,
height,
result,
)
.await?;
}
ExecutionResult::User(application_id, result) => {
self.process_raw_execution_result(
ApplicationId::User(application_id),
|bytes| Message::User {
application_id,
bytes,
},
messages,
height,
result,
)
.await?;
}
}
}
Ok(())
}
async fn process_raw_execution_result<E, F>(
&mut self,
application_id: ApplicationId,
lift: F,
messages: &mut Vec<OutgoingMessage>,
height: BlockHeight,
raw_result: RawExecutionResult<E>,
) -> Result<(), ChainError>
where
F: Fn(E) -> Message,
{
let mut recipients = HashSet::new();
let mut channel_broadcasts = HashSet::new();
for (destination, authenticated, message) in raw_result.messages {
match &destination {
Destination::Recipient(id) => {
recipients.insert(*id);
}
Destination::Subscribers(name) => {
channel_broadcasts.insert(name.clone());
}
}
let authenticated_signer = if authenticated {
raw_result.authenticated_signer
} else {
None
};
messages.push(OutgoingMessage {
destination,
authenticated_signer,
message: lift(message),
});
}
let outbox_counters = self.outbox_counters.get_mut();
let targets = recipients
.into_iter()
.map(Target::chain)
.collect::<Vec<_>>();
let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
for mut outbox in outboxes {
if outbox.schedule_message(height)? {
*outbox_counters.entry(height).or_default() += 1;
}
}
let full_names = raw_result
.unsubscribe
.clone()
.into_iter()
.map(|(name, _id)| ChannelFullName {
application_id,
name,
})
.collect::<Vec<_>>();
let channels = self.channels.try_load_entries_mut(&full_names).await?;
for ((_name, id), mut channel) in raw_result.unsubscribe.into_iter().zip(channels) {
channel.subscribers.remove(&id)?;
}
let full_names = channel_broadcasts
.into_iter()
.map(|name| ChannelFullName {
application_id,
name,
})
.collect::<Vec<_>>();
let channels = self.channels.try_load_entries_mut(&full_names).await?;
let stream = full_names.into_iter().zip(channels);
let stream = stream::iter(stream)
.map(|(full_name, mut channel)| async move {
let recipients = channel.subscribers.indices().await?;
channel.block_height.set(Some(height));
let targets = recipients
.into_iter()
.map(|recipient| Target::channel(recipient, full_name.clone()))
.collect::<Vec<_>>();
Ok::<_, ChainError>(targets)
})
.buffer_unordered(C::MAX_CONNECTIONS);
let infos = stream.try_collect::<Vec<_>>().await?;
let targets = infos.into_iter().flatten().collect::<Vec<_>>();
let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
for mut outbox in outboxes {
if outbox.schedule_message(height)? {
*outbox_counters.entry(height).or_default() += 1;
}
}
let full_names = raw_result
.subscribe
.clone()
.into_iter()
.map(|(name, _id)| ChannelFullName {
application_id,
name,
})
.collect::<Vec<_>>();
let channels = self.channels.try_load_entries_mut(&full_names).await?;
let stream = raw_result.subscribe.into_iter().zip(channels);
let stream = stream::iter(stream)
.map(|((name, id), mut channel)| async move {
let mut result = None;
let full_name = ChannelFullName {
application_id,
name,
};
if !channel.subscribers.contains(&id).await? {
if let Some(latest_height) = channel.block_height.get() {
let target = Target::channel(id, full_name.clone());
result = Some((target, *latest_height));
}
channel.subscribers.insert(&id)?;
}
Ok::<_, ChainError>(result)
})
.buffer_unordered(C::MAX_CONNECTIONS);
let infos = stream.try_collect::<Vec<_>>().await?;
let (targets, heights): (Vec<_>, Vec<_>) = infos.into_iter().flatten().unzip();
let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
for (height, mut outbox) in heights.into_iter().zip(outboxes) {
if outbox.schedule_message(height)? {
*outbox_counters.entry(height).or_default() += 1;
}
}
Ok(())
}
fn sub_assign_fees(balance: &mut Amount, fees: Amount) -> Result<(), ChainError> {
balance
.try_sub_assign(fees)
.map_err(|_| ChainError::InsufficientBalance)
}
}