use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap},
sync::{self, Arc},
};
use futures::future::Either;
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use linera_base::{
crypto::{CryptoHash, ValidatorPublicKey},
data_types::{
ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp,
},
ensure,
hashed::Hashed,
identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, StreamId},
};
use linera_cache::{UniqueValueCache, ValueCache};
use linera_chain::{
data_types::{
BlockProposal, BundleExecutionPolicy, IncomingBundle, MessageAction, MessageBundle,
OriginalProposal, ProposalContent, ProposedBlock,
},
manager::{self, ManagerSafetySnapshot},
types::{
Block, ConfirmedBlock, ConfirmedBlockCertificate, TimeoutCertificate,
ValidatedBlockCertificate,
},
ChainError, ChainExecutionContext, ChainStateView, ExecutionResultExt as _,
};
use linera_execution::{
system::EventSubscriptions, ExecutionRuntimeContext as _, ExecutionStateView, Query,
QueryContext, QueryOutcome, ResourceTracker, ServiceRuntimeEndpoint,
};
use linera_storage::{Clock as _, Storage};
use linera_views::{
context::{Context, InactiveContext},
views::{ReplaceContext as _, RootView as _, View as _},
};
use tokio::sync::oneshot;
use tracing::{debug, instrument, trace, warn};
use crate::{
chain_worker::{handle::AtomicTimestamp, ChainWorkerConfig, DeliveryNotifier},
client::ListeningMode,
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
worker::{NetworkActions, Notification, Reason, WorkerError},
};
/// Flat listing of event subscriptions: each entry pairs the subscribed
/// `(ChainId, StreamId)` key with its [`EventSubscriptions`] record.
pub(crate) type EventSubscriptionsResult = Vec<((ChainId, StreamId), EventSubscriptions)>;
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics recorded by the chain worker.
    use std::sync::LazyLock;
    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram,
        register_histogram_vec,
    };
    use prometheus::{Histogram, HistogramVec};
    /// Latency (ms) of `create_network_actions`, with exponential buckets.
    pub static CREATE_NETWORK_ACTIONS_LATENCY: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "create_network_actions_latency",
            "Time (ms) to create network actions",
            exponential_bucket_latencies(10_000.0),
        )
    });
    /// Histogram of the number of inboxes observed.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });
}
/// The in-memory state a worker maintains for a single chain.
///
/// Wraps the chain's storage-backed state view together with the caches,
/// configuration and notification plumbing used while processing blocks and
/// cross-chain messages.
pub(crate) struct ChainWorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Worker configuration (keys, limits, recovery flags).
    config: ChainWorkerConfig,
    /// Client used to read from and persist to storage.
    storage: StorageClient,
    /// The materialized view of the chain's state.
    chain: ChainStateView<StorageClient::Context>,
    /// Endpoint for running service (query) code, if enabled.
    service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
    /// Background task driving the service runtime, if one is running.
    service_runtime_task: Option<web_thread_pool::Task<()>>,
    /// Timestamp of the last access; shared so callers can check idleness.
    last_access: Arc<AtomicTimestamp>,
    /// Shared cache of hashed block values.
    block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Optional cache of execution states keyed by block hash, letting the
    /// worker skip re-executing a block it has already executed.
    execution_state_cache:
        Option<Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>>,
    /// Listening modes for other chains; used to restrict which outbox
    /// targets receive pushed cross-chain messages.
    chain_modes: Option<Arc<sync::RwLock<BTreeMap<ChainId, ListeningMode>>>>,
    /// Notifies waiting callers once outgoing messages are delivered.
    delivery_notifier: DeliveryNotifier,
    /// Memoizes that the chain was already found to be active.
    knows_chain_is_active: bool,
    /// Set when a failure may have left in-memory state inconsistent.
    poisoned: bool,
}
/// Outcome of applying a batch of incoming message bundles from one sender.
pub(crate) enum CrossChainUpdateResult {
    /// Bundles were applied; the inbox now covers up to this height.
    Updated(BlockHeight),
    /// No new bundle was applicable.
    NothingToDo,
    /// The sender references heights we never received; a resend is needed.
    GapDetected {
        /// The sending chain.
        origin: ChainId,
        /// First height the sender should retransmit.
        retransmit_from: BlockHeight,
    },
}
/// How the worker handled a block certificate.
pub enum BlockOutcome {
    /// The block was executed and committed at the chain tip.
    Processed,
    /// The block was ahead of the tip and only preprocessed.
    Preprocessed,
    /// The certificate was already handled; nothing was done.
    Skipped,
}
impl<StorageClient> ChainWorkerState<StorageClient>
where
StorageClient: Storage + Clone + 'static,
{
/// Loads the state of `chain_id` from storage and assembles a new worker
/// state around it with the given configuration, caches and notifiers.
#[instrument(skip_all, fields(
    chain_id = %chain_id
))]
#[expect(clippy::too_many_arguments)]
pub(crate) async fn load(
    config: ChainWorkerConfig,
    storage: StorageClient,
    block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    execution_state_cache: Option<
        Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
    >,
    chain_modes: Option<Arc<sync::RwLock<BTreeMap<ChainId, ListeningMode>>>>,
    delivery_notifier: DeliveryNotifier,
    chain_id: ChainId,
    service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
    service_runtime_task: Option<web_thread_pool::Task<()>>,
) -> Result<Self, WorkerError> {
    let chain = storage.load_chain(chain_id).await?;
    Ok(ChainWorkerState {
        config,
        storage,
        chain,
        service_runtime_endpoint,
        service_runtime_task,
        // Construction counts as an access.
        last_access: Arc::new(AtomicTimestamp::now()),
        block_values,
        execution_state_cache,
        chain_modes,
        delivery_notifier,
        // Chain activity is only verified lazily, on first use.
        knows_chain_is_active: false,
        poisoned: false,
    })
}
/// Returns the ID of the chain this worker manages.
fn chain_id(&self) -> ChainId {
    self.chain.chain_id()
}
/// Returns a read-only reference to the chain's state view.
pub(crate) fn chain(&self) -> &ChainStateView<StorageClient::Context> {
    &self.chain
}
/// Returns whether this worker has already verified that the chain is active.
pub(crate) fn knows_chain_is_active(&self) -> bool {
    self.knows_chain_is_active
}
/// Discards any staged (unsaved) changes in the chain state view.
pub(crate) fn rollback(&mut self) {
    self.chain.rollback();
}
/// Fails with [`WorkerError::PoisonedWorker`] if an earlier failure may have
/// left this worker's in-memory state inconsistent.
pub(crate) fn check_not_poisoned(&self) -> Result<(), WorkerError> {
    ensure!(!self.poisoned, WorkerError::PoisonedWorker);
    Ok(())
}
/// Records the current time as this worker's last access.
pub(crate) fn touch(&self) {
    self.last_access.store_now();
}
/// Returns a shared handle to the last-access timestamp.
pub(crate) fn last_access_arc(&self) -> Arc<AtomicTimestamp> {
    Arc::clone(&self.last_access)
}
/// Drops the service runtime endpoint and hands back its background task, if
/// any, so the caller can shut it down.
pub(crate) fn clear_service_runtime(&mut self) -> Option<web_thread_pool::Task<()>> {
    self.service_runtime_endpoint.take();
    self.service_runtime_task.take()
}
/// Computes the cross-chain requests for all currently nonempty outboxes.
#[instrument(skip_all, fields(chain_id = %self.chain_id()))]
pub(crate) async fn cross_chain_network_actions(&self) -> Result<NetworkActions, WorkerError> {
    self.create_network_actions(None).await
}
/// Answers a [`ChainInfoQuery`], first casting any requested leader-timeout
/// or fallback votes, and optionally computing outgoing network actions.
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn handle_chain_info_query(
    &mut self,
    query: ChainInfoQuery,
) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
    // Copy the flag first: `query` is consumed when building the response.
    let create_network_actions = query.create_network_actions;
    if let Some((height, round)) = query.request_leader_timeout {
        self.vote_for_leader_timeout(height, round).await?;
    }
    if query.request_fallback {
        self.vote_for_fallback().await?;
    }
    let response = self.prepare_chain_info_response(query).await?;
    let actions = if create_network_actions {
        self.create_network_actions(None).await?
    } else {
        NetworkActions::default()
    };
    Ok((response, actions))
}
/// Returns the requested blob, looking first at the chain manager's pending
/// blobs and falling back to blob storage.
///
/// Fails with [`WorkerError::BlobsNotFound`] if the blob is in neither place.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    blob_id = %blob_id
))]
pub(crate) async fn download_pending_blob(&self, blob_id: BlobId) -> Result<Blob, WorkerError> {
    match self.chain.manager.pending_blob(&blob_id).await? {
        Some(pending) => Ok(pending),
        None => {
            // Not pending in the manager — try persistent storage.
            let stored = self.storage.read_blob(blob_id).await?;
            stored
                .map(Arc::unwrap_or_clone)
                .ok_or(WorkerError::BlobsNotFound(vec![blob_id]))
        }
    }
}
#[instrument(skip_all, fields(
chain_id = %self.chain_id()
))]
async fn get_required_blobs(
&self,
required_blob_ids: impl IntoIterator<Item = BlobId>,
created_blobs: BTreeMap<BlobId, Blob>,
) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
let maybe_blobs = self
.maybe_get_required_blobs(required_blob_ids, Some(created_blobs))
.await?;
let not_found_blob_ids = missing_blob_ids(&maybe_blobs);
ensure!(
not_found_blob_ids.is_empty(),
WorkerError::BlobsNotFound(not_found_blob_ids)
);
Ok(maybe_blobs
.into_iter()
.filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
.collect())
}
/// Tries to locate each requested blob, checking in order: the block's own
/// created blobs, the chain manager's pending blobs, the chain's pending
/// validated blobs, any proposer's pending proposed blobs, and finally
/// storage. Returns a map with `None` for blobs found nowhere.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id()
))]
async fn maybe_get_required_blobs(
    &self,
    blob_ids: impl IntoIterator<Item = BlobId>,
    created_blobs: Option<BTreeMap<BlobId, Blob>>,
) -> Result<BTreeMap<BlobId, Option<Blob>>, WorkerError> {
    // Deduplicate and sort the IDs; track lookups positionally so later
    // passes only query the still-missing entries.
    let maybe_blobs = blob_ids.into_iter().collect::<BTreeSet<_>>();
    let mut maybe_blobs = maybe_blobs
        .into_iter()
        .map(|x| (x, None))
        .collect::<Vec<(BlobId, Option<Blob>)>>();
    // Pass 1: blobs created by the block itself.
    if let Some(mut blob_map) = created_blobs {
        for (blob_id, value) in &mut maybe_blobs {
            if let Some(blob) = blob_map.remove(blob_id) {
                *value = Some(blob);
            }
        }
    }
    // Pass 2: blobs pending in the chain manager.
    let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
    let second_block_blobs = self.chain.manager.pending_blobs(&missing_blob_ids).await?;
    for (index, blob) in missing_indices.into_iter().zip(second_block_blobs) {
        maybe_blobs[index].1 = blob;
    }
    // Pass 3: blobs attached to a pending validated block.
    let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
    let third_block_blobs = self
        .chain
        .pending_validated_blobs
        .multi_get(&missing_blob_ids)
        .await?;
    for (index, blob) in missing_indices.into_iter().zip(third_block_blobs) {
        maybe_blobs[index].1 = blob;
    }
    // Pass 4: blobs stored under any proposer's pending proposed blobs.
    let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
    if !missing_indices.is_empty() {
        let all_entries_pending_blobs = self
            .chain
            .pending_proposed_blobs
            .try_load_all_entries()
            .await?;
        for (index, blob_id) in missing_indices.into_iter().zip(missing_blob_ids) {
            // Scan every proposer's collection; stop at the first hit.
            for (_, pending_blobs) in &all_entries_pending_blobs {
                if let Some(blob) = pending_blobs.get(&blob_id).await? {
                    maybe_blobs[index].1 = Some(blob);
                    break;
                }
            }
        }
    }
    // Pass 5: persistent blob storage.
    let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
    let fourth_block_blobs = self.storage.read_blobs(&missing_blob_ids).await?;
    for (index, blob) in missing_indices.into_iter().zip(fourth_block_blobs) {
        maybe_blobs[index].1 = blob.map(Arc::unwrap_or_clone);
    }
    Ok(maybe_blobs.into_iter().collect())
}
/// Builds the cross-chain requests destined for a single `recipient`, based
/// on the pending heights in that recipient's outbox.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id()
))]
async fn create_cross_chain_actions_for_recipient(
    &self,
    recipient: ChainId,
) -> Result<NetworkActions, WorkerError> {
    // No outbox for this recipient means there is nothing to send.
    let Some(outbox) = self.chain.outboxes.try_load_entry(&recipient).await? else {
        return Ok(NetworkActions::default());
    };
    let pending_heights = outbox.queue.elements().await?;
    if pending_heights.is_empty() {
        return Ok(NetworkActions::default());
    }
    let heights_by_recipient = std::iter::once((recipient, pending_heights)).collect();
    let cross_chain_requests = self
        .create_cross_chain_requests(heights_by_recipient)
        .await?;
    Ok(NetworkActions {
        cross_chain_requests,
        notifications: Vec::new(),
    })
}
/// Computes the cross-chain requests for every (tracked) nonempty outbox,
/// plus a `NewRound` notification if the manager has advanced past
/// `old_round`.
async fn create_network_actions(
    &self,
    old_round: Option<Round>,
) -> Result<NetworkActions, WorkerError> {
    #[cfg(with_metrics)]
    let _latency = metrics::CREATE_NETWORK_ACTIONS_LATENCY.measure_latency();
    let mut heights_by_recipient = BTreeMap::<_, Vec<_>>::new();
    let mut targets = self.chain.nonempty_outbox_chain_ids();
    if let Some(chain_modes) = self.chain_modes.as_ref() {
        // Only chains we listen to in full mode get messages pushed. The
        // read guard is released before the awaits below.
        let chain_modes = chain_modes
            .read()
            .expect("Panics should not happen while holding a lock to `chain_modes`");
        targets.retain(|target| chain_modes.get(target).is_some_and(ListeningMode::is_full));
    }
    let outboxes = self.chain.load_outboxes(&targets).await?;
    for (target, outbox) in targets.into_iter().zip(outboxes) {
        let heights = outbox.queue.elements().await?;
        heights_by_recipient.insert(target, heights);
    }
    let cross_chain_requests = self
        .create_cross_chain_requests(heights_by_recipient)
        .await?;
    let mut notifications = Vec::new();
    if let Some(old_round) = old_round {
        let round = self.chain.manager.current_round();
        // Notify only on an actual round advance.
        if round > old_round {
            let height = self.chain.tip_state.get().next_block_height;
            notifications.push(Notification {
                chain_id: self.chain_id(),
                reason: Reason::NewRound { height, round },
            });
        }
    }
    Ok(NetworkActions {
        cross_chain_requests,
        notifications,
    })
}
/// Reads confirmed blocks by hash, serving from the shared block-value cache
/// when possible and fetching (and caching) the rest from storage.
///
/// The result preserves the order of `hashes`; entries are `None` for blocks
/// found neither in the cache nor in storage.
async fn read_confirmed_blocks(
    &self,
    hashes: Vec<CryptoHash>,
) -> Result<Vec<Option<Arc<ConfirmedBlock>>>, WorkerError> {
    let mut results: Vec<Option<Arc<ConfirmedBlock>>> = Vec::with_capacity(hashes.len());
    // Positions and hashes of entries the cache could not serve.
    let mut misses: Vec<(usize, CryptoHash)> = Vec::new();
    for (position, hash) in hashes.iter().enumerate() {
        match self.block_values.get(hash) {
            Some(cached) => results.push(Some(Arc::new(ConfirmedBlock::from_hashed(
                Arc::unwrap_or_clone(cached),
            )))),
            None => {
                results.push(None);
                misses.push((position, *hash));
            }
        }
    }
    if !misses.is_empty() {
        let miss_hashes: Vec<_> = misses.iter().map(|(_, hash)| *hash).collect();
        let fetched = self.storage.read_confirmed_blocks(miss_hashes).await?;
        for ((position, _), maybe_block) in misses.into_iter().zip(fetched) {
            if let Some(block) = &maybe_block {
                // Populate the cache so subsequent reads are served locally.
                self.block_values
                    .insert_hashed(Cow::Borrowed(block.inner()));
            }
            results[position] = maybe_block;
        }
    }
    Ok(results)
}
/// Turns per-recipient outbox heights into `UpdateRecipient` cross-chain
/// requests, batching message bundles up to the configured chunk size limit.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    num_recipients = %heights_by_recipient.len()
))]
async fn create_cross_chain_requests(
    &self,
    heights_by_recipient: BTreeMap<ChainId, Vec<BlockHeight>>,
) -> Result<Vec<CrossChainRequest>, WorkerError> {
    // Load all relevant confirmed blocks once, indexed by height.
    let heights = BTreeSet::from_iter(heights_by_recipient.values().flatten().copied());
    let hashes = self.chain.block_hashes(heights.iter().copied()).await?;
    let blocks = self.read_confirmed_blocks(hashes.clone()).await?;
    let mut height_to_blocks = HashMap::new();
    for (block, hash) in blocks.into_iter().zip(hashes) {
        let block = block.ok_or_else(|| WorkerError::ReadCertificatesError(vec![hash]))?;
        let hashed_block = Arc::unwrap_or_clone(block).into_inner();
        height_to_blocks.insert(hashed_block.inner().header.height, hashed_block);
    }
    let sender = self.chain.chain_id();
    let mut cross_chain_requests = Vec::new();
    for (recipient, heights) in heights_by_recipient {
        // Height of the previous block that messaged this recipient, if the
        // first batched block records one.
        let previous_height = heights.first().and_then(|first_height| {
            let block = height_to_blocks.get(first_height)?;
            let (_, prev_height) =
                block.inner().body.previous_message_blocks.get(&recipient)?;
            Some(*prev_height)
        });
        let mut bundles = Vec::new();
        let mut bundles_size = 0;
        for height in heights {
            let Some(hashed_block) = height_to_blocks.get(&height) else {
                tracing::warn!(
                    %height,
                    %recipient,
                    "spurious entry in outbox; skipping this and higher sender blocks"
                );
                break;
            };
            let new_bundles = hashed_block
                .inner()
                .message_bundles_for(recipient, hashed_block.hash())
                .collect::<Vec<_>>();
            let new_size = new_bundles
                .iter()
                .map(|(_epoch, bundle)| bundle.estimated_size())
                .sum::<usize>();
            if bundles_size + new_size > self.config.cross_chain_message_chunk_limit {
                if bundles.is_empty() {
                    // A single oversize block is still sent (we fall through
                    // below); otherwise nothing could ever be delivered.
                    warn!(
                        "Single block at height {height} produces an UpdateRecipient \
                        of ~{new_size} bytes, exceeding the chunk limit of {}",
                        self.config.cross_chain_message_chunk_limit
                    );
                } else {
                    // Stop the batch here; remaining heights stay queued.
                    debug!(
                        "Stopping cross-chain batch for {recipient} at height {height}: \
                        adding ~{new_size} bytes would exceed chunk limit of {} \
                        (current batch ~{bundles_size} bytes)",
                        self.config.cross_chain_message_chunk_limit
                    );
                    break;
                }
            }
            bundles.extend(new_bundles);
            bundles_size += new_size;
        }
        if !bundles.is_empty() {
            cross_chain_requests.push(CrossChainRequest::UpdateRecipient {
                sender,
                recipient,
                bundles,
                previous_height,
            });
        }
    }
    Ok(cross_chain_requests)
}
/// Returns whether every message up to `height` addressed to a chain we
/// track in full mode has already been delivered.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    height = %height
))]
async fn all_messages_to_tracked_chains_delivered_up_to(
    &self,
    height: BlockHeight,
) -> Result<bool, WorkerError> {
    // Fast path: everything is delivered regardless of tracking.
    if self.chain.all_messages_delivered_up_to(height) {
        return Ok(true);
    }
    // Without a tracking table we cannot restrict to tracked chains.
    let Some(chain_modes) = self.chain_modes.as_ref() else {
        return Ok(false);
    };
    let mut tracked_targets = self.chain.nonempty_outbox_chain_ids();
    {
        // Scope the read guard so it is dropped before the await below.
        let modes = chain_modes.read().unwrap();
        tracked_targets.retain(|target| modes.get(target).is_some_and(ListeningMode::is_full));
    }
    let outboxes = self.chain.load_outboxes(&tracked_targets).await?;
    for outbox in &outboxes {
        // A queued height at or below `height` means delivery is pending.
        if outbox.queue.front().is_some_and(|key| *key <= height) {
            return Ok(false);
        }
    }
    Ok(true)
}
/// Processes a leader-timeout certificate: verifies it against the current
/// committee and epoch, hands it to the chain manager, and reports any
/// resulting round change.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    height = %certificate.inner().height()
))]
pub(crate) async fn process_timeout(
    &mut self,
    certificate: TimeoutCertificate,
) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
    self.initialize_and_save_if_needed().await?;
    let (chain_epoch, committee) = self.chain.current_committee().await?;
    certificate.check(&committee)?;
    // A timeout for an already-validated height is stale; just report state.
    if self
        .chain
        .tip_state
        .get()
        .already_validated_block(certificate.inner().height())?
    {
        return Ok((self.chain_info_response().await?, NetworkActions::default()));
    }
    ensure!(
        certificate.inner().epoch() == chain_epoch,
        WorkerError::InvalidEpoch {
            chain_id: certificate.inner().chain_id(),
            chain_epoch,
            epoch: certificate.inner().epoch()
        }
    );
    // Remember the round before the manager handles the certificate so we
    // can emit a `NewRound` notification if it advanced.
    let old_round = self.chain.manager.current_round();
    self.chain
        .manager
        .handle_timeout_certificate(certificate, self.storage.clock().current_time());
    self.save().await?;
    let actions = self.create_network_actions(Some(old_round)).await?;
    Ok((self.chain_info_response().await?, actions))
}
/// Gathers the blobs required by `proposal`.
///
/// If some blobs are missing, the ones found are cached under the proposer
/// (so a re-submitted proposal can reuse them) and `BlobsNotFound` is
/// returned. On success, returns only the blobs the block newly publishes.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    block_height = %proposal.content.block.height
))]
async fn load_proposal_blobs(
    &mut self,
    proposal: &BlockProposal,
) -> Result<Vec<Blob>, WorkerError> {
    let owner = proposal.owner();
    let BlockProposal {
        content:
            ProposalContent {
                block,
                round,
                outcome: _,
            },
        original_proposal,
        signature: _,
    } = proposal;
    let mut maybe_blobs = self
        .maybe_get_required_blobs(proposal.required_blob_ids(), None)
        .await?;
    let missing_blob_ids = missing_blob_ids(&maybe_blobs);
    if !missing_blob_ids.is_empty() {
        let chain = &mut self.chain;
        // NOTE(review): with open multi-leader rounds, previously cached
        // proposer blobs are discarded first — presumably to bound storage
        // when anyone may propose; confirm the intent.
        if chain.ownership().await?.open_multi_leader_rounds {
            chain.pending_proposed_blobs.clear();
        }
        let validated = matches!(original_proposal, Some(OriginalProposal::Regular { .. }));
        chain
            .pending_proposed_blobs
            .try_load_entry_mut(&owner)
            .await?
            .update(*round, validated, maybe_blobs)?;
        self.save().await?;
        return Err(WorkerError::BlobsNotFound(missing_blob_ids));
    }
    // Only blobs the block itself publishes are returned to the caller.
    let published_blobs = block
        .published_blob_ids()
        .iter()
        .filter_map(|blob_id| maybe_blobs.remove(blob_id).flatten())
        .collect::<Vec<_>>();
    Ok(published_blobs)
}
/// Processes a validated-block certificate: verifies it, collects its
/// required blobs, and records a final (confirmation) vote in the manager.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    block_height = %certificate.block().header.height
))]
pub(crate) async fn process_validated_block(
    &mut self,
    certificate: ValidatedBlockCertificate,
) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
    let block = certificate.block();
    let header = &block.header;
    let height = header.height;
    self.initialize_and_save_if_needed().await?;
    let tip_state = self.chain.tip_state.get();
    // A validated block must be exactly at the chain tip.
    ensure!(
        header.height == tip_state.next_block_height,
        ChainError::UnexpectedBlockHeight {
            expected_block_height: tip_state.next_block_height,
            found_block_height: header.height,
        }
    );
    let (epoch, committee) = self.chain.current_committee().await?;
    check_block_epoch(epoch, header.chain_id, header.epoch)?;
    certificate.check(&committee)?;
    let already_committed_block = self.chain.tip_state.get().already_validated_block(height)?;
    // Evaluated lazily: only consulted when the block isn't committed yet.
    let should_skip_validated_block = || {
        self.chain
            .manager
            .check_validated_block(&certificate)
            .map(|outcome| outcome == manager::Outcome::Skip)
    };
    if already_committed_block || should_skip_validated_block()? {
        return Ok((
            self.chain_info_response().await?,
            NetworkActions::default(),
            BlockOutcome::Skipped,
        ));
    }
    self.block_values
        .insert_hashed(Cow::Borrowed(certificate.inner().inner()));
    let required_blob_ids = block.required_blob_ids();
    let maybe_blobs = self
        .maybe_get_required_blobs(required_blob_ids, Some(block.created_blobs()))
        .await?;
    let missing_blob_ids = missing_blob_ids(&maybe_blobs);
    if !missing_blob_ids.is_empty() {
        // Keep the blobs we did find so a retry can complete the set.
        self.chain
            .pending_validated_blobs
            .update(certificate.round, true, maybe_blobs)?;
        self.save().await?;
        return Err(WorkerError::BlobsNotFound(missing_blob_ids));
    }
    let blobs = maybe_blobs
        .into_iter()
        .filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
        .collect();
    // Track the round so a round advance can be notified.
    let old_round = self.chain.manager.current_round();
    self.chain.manager.create_final_vote(
        certificate,
        self.config.key_pair(),
        self.storage.clock().current_time(),
        blobs,
    )?;
    self.save().await?;
    let actions = self.create_network_actions(Some(old_round)).await?;
    Ok((
        self.chain_info_response().await?,
        actions,
        BlockOutcome::Processed,
    ))
}
/// Initializes the `next_expected_events` collection, unless it already has
/// entries: seeds it from the execution state's per-stream event counts and
/// then re-applies every preprocessed block.
async fn initialize_next_expected_events(&mut self) -> Result<(), WorkerError> {
    // Already initialized — nothing to do.
    if self.chain.next_expected_events.count().await? > 0 {
        return Ok(());
    }
    // Seed expected indices from the per-stream event counts.
    let stream_counts = self
        .chain
        .execution_state
        .stream_event_counts
        .index_values()
        .await?;
    for (stream_id, next_index) in stream_counts {
        self.chain.next_expected_events.insert(&stream_id, next_index)?;
    }
    // Re-apply every preprocessed block so its events are accounted for.
    let chain_id = self.chain_id();
    let entries = self.chain.preprocessed_blocks.index_values().await?;
    let block_hashes: Vec<_> = entries.iter().map(|(_, hash)| *hash).collect();
    let confirmed_blocks = self.read_confirmed_blocks(block_hashes).await?;
    for ((height, _), maybe_block) in entries.into_iter().zip(confirmed_blocks) {
        let block =
            maybe_block.ok_or(WorkerError::LocalBlockNotFound { height, chain_id })?;
        self.chain.preprocess_block(&block).await?;
    }
    Ok(())
}
/// Processes a confirmed-block certificate.
///
/// Depending on the certificate's height relative to the chain tip, the
/// block is skipped (already committed), preprocessed (ahead of the tip), or
/// fully executed and committed. Blobs, the certificate and events are
/// persisted before execution; delivery notification is registered last.
#[instrument(skip_all, fields(
    chain_id = %certificate.block().header.chain_id,
    height = %certificate.block().header.height,
    block_hash = %certificate.hash(),
))]
pub(crate) async fn process_confirmed_block(
    &mut self,
    certificate: ConfirmedBlockCertificate,
    notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
    let block = certificate.block();
    let block_hash = certificate.hash();
    let height = block.header.height;
    let chain_id = block.header.chain_id;
    let tip = self.chain.tip_state.get().clone();
    // Case 1: the block is below the tip — it was already committed.
    if tip.next_block_height > height {
        let actions = self.create_network_actions(None).await?;
        self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
            .await;
        return Ok((
            self.chain_info_response().await?,
            actions,
            BlockOutcome::Skipped,
        ));
    }
    // Verify the certificate against the committee of the block's own epoch.
    let epoch = block.header.epoch;
    let committee = self
        .chain
        .execution_state
        .context()
        .extra()
        .get_committees(epoch..=epoch)
        .await
        .with_execution_context(ChainExecutionContext::Block)?
        .remove(&epoch)
        .ok_or_else(|| {
            ChainError::InternalError(format!(
                "missing committee for epoch {epoch}; this is a bug"
            ))
        })?;
    certificate.check(&committee)?;
    let required_blob_ids = block.required_blob_ids();
    let blobs_result = self
        .get_required_blobs(required_blob_ids.iter().copied(), block.created_blobs())
        .await
        .map(|blobs| blobs.into_values().collect::<Vec<_>>());
    // Persist blobs, the certificate and events before touching chain state.
    if let Ok(blobs) = &blobs_result {
        self.storage
            .write_blobs_and_certificate(blobs, &certificate)
            .await?;
        let events = block
            .body
            .events
            .iter()
            .flatten()
            .map(|event| (event.id(chain_id), event.value.clone()));
        self.storage.write_events(events).await?;
    }
    // Record blob states even when blob lookup failed.
    let blob_state = certificate.value().to_blob_state(blobs_result.is_ok());
    let blob_ids = required_blob_ids.into_iter().collect::<Vec<_>>();
    self.storage
        .maybe_write_blob_states(&blob_ids, blob_state)
        .await?;
    let mut blobs = blobs_result?
        .into_iter()
        .map(|blob| (blob.id(), blob))
        .collect::<BTreeMap<_, _>>();
    // Case 2: the block is ahead of the tip — only preprocess it.
    if tip.next_block_height < height {
        if block.body.events.iter().any(|events| !events.is_empty()) {
            self.initialize_next_expected_events().await?;
        }
        let event_streams = self.chain.preprocess_block(certificate.value()).await?;
        self.save().await?;
        let mut actions = self.create_network_actions(None).await?;
        if !event_streams.is_empty() {
            actions.notifications.push(Notification {
                chain_id,
                reason: Reason::NewEvents {
                    height,
                    hash: block_hash,
                    event_streams,
                },
            });
        }
        trace!("Preprocessed confirmed block {height}");
        self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
            .await;
        return Ok((
            self.chain_info_response().await?,
            actions,
            BlockOutcome::Preprocessed,
        ));
    }
    // Case 3: the block is exactly at the tip — execute and commit it.
    ensure!(
        tip.block_hash == block.header.previous_block_hash,
        WorkerError::InvalidBlockChaining
    );
    self.initialize_and_save_if_needed().await?;
    let (epoch, _) = self.chain.current_committee().await?;
    check_block_epoch(epoch, chain_id, block.header.epoch)?;
    let published_blobs = block
        .published_blob_ids()
        .iter()
        .filter_map(|blob_id| blobs.remove(blob_id))
        .collect::<Vec<_>>();
    if block.body.events.iter().any(|events| !events.is_empty()) {
        self.initialize_next_expected_events().await?;
    }
    let local_time = self.storage.clock().current_time();
    // A far-future timestamp is tolerated but logged.
    if block.header.timestamp.duration_since(local_time) > self.config.block_time_grace_period {
        warn!(
            block_timestamp = %block.header.timestamp,
            %local_time,
            "Confirmed block has a timestamp in the future beyond the block time grace period"
        );
    }
    let chain = &mut self.chain;
    chain
        .remove_bundles_from_inboxes(
            block.header.timestamp,
            false,
            block.body.incoming_bundles(),
        )
        .await?;
    // Reuse a cached execution state if this worker already executed the
    // block; otherwise re-execute and compare against the certified outcome.
    let confirmed_block = if let Some(mut execution_state) = self
        .execution_state_cache
        .as_ref()
        .and_then(|cache| cache.remove(&block_hash))
    {
        chain.execution_state = execution_state
            .with_context(|ctx| {
                chain
                    .execution_state
                    .context()
                    .clone_with_base_key(ctx.base_key().bytes.clone())
            })
            .await;
        certificate.into_value()
    } else {
        let oracle_responses = Some(block.body.oracle_responses.clone());
        let (proposed_block, outcome) = block.clone().into_proposal();
        let (proposed_block, verified, _resource_tracker) = chain
            .execute_block(
                proposed_block,
                local_time,
                None,
                &published_blobs,
                oracle_responses,
                BundleExecutionPolicy::committed(),
            )
            .await?;
        // Any divergence from the certified outcome means corruption.
        if outcome != verified {
            return Err(ChainError::CorruptedChainState(format!(
                "computed block outcome differs from the certificate.\n\
                Computed: {verified:#?}\n\
                Submitted: {outcome:#?}"
            ))
            .into());
        }
        ConfirmedBlock::new(Block::new(proposed_block, verified))
    };
    let event_streams = chain
        .apply_confirmed_block(&confirmed_block, local_time)
        .await?;
    let mut actions = self.create_network_actions(None).await?;
    trace!("Processed confirmed block {height}");
    let hash = confirmed_block.inner().hash();
    actions.notifications.push(Notification {
        chain_id,
        reason: Reason::NewBlock {
            height,
            hash,
            event_streams: event_streams.clone(),
        },
    });
    if !event_streams.is_empty() {
        actions.notifications.push(Notification {
            chain_id,
            reason: Reason::NewEvents {
                height,
                hash,
                event_streams,
            },
        });
    }
    self.save().await?;
    self.block_values
        .insert_hashed(Cow::Owned(confirmed_block.into_inner()));
    self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
        .await;
    Ok((
        self.chain_info_response().await?,
        actions,
        BlockOutcome::Processed,
    ))
}
/// Arranges for `notifier` to fire once all messages up to `height` have
/// been delivered.
///
/// If the computed `actions` still carry messages at or below `height`, the
/// notifier is registered with the delivery notifier to be fired later;
/// otherwise everything is already delivered and it fires immediately.
#[instrument(level = "trace", skip(self, notify_when_messages_are_delivered))]
async fn register_delivery_notifier(
    &mut self,
    height: BlockHeight,
    actions: &NetworkActions,
    notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
) {
    let Some(notifier) = notify_when_messages_are_delivered else {
        return;
    };
    let has_pending_messages = actions
        .cross_chain_requests
        .iter()
        .any(|request| request.has_messages_lower_or_equal_than(height));
    if has_pending_messages {
        self.delivery_notifier.register(height, notifier);
    } else if notifier.send(()).is_err() {
        // The receiver was dropped; nothing to deliver, so just log.
        warn!("Failed to notify message delivery to caller");
    }
}
/// Applies message bundles received from `origin`'s blocks to this chain's
/// inbox.
///
/// Returns `Updated` with the highest height now covered, `NothingToDo` if
/// no bundle was applicable, or `GapDetected` when the sender's declared
/// previous height shows we missed earlier blocks and recovery is enabled.
#[instrument(level = "debug", skip(self, bundles), fields(chain_id = %self.chain_id()))]
pub(crate) async fn process_cross_chain_update(
    &mut self,
    origin: ChainId,
    bundles: Vec<(Epoch, MessageBundle)>,
    previous_height: Option<BlockHeight>,
) -> Result<CrossChainUpdateResult, WorkerError> {
    let mut inbox = self.chain.inboxes.try_load_entry_mut(&origin).await?;
    let next_height_to_receive = inbox.next_block_height_to_receive()?;
    // `Option::map` replaces the manual `match` (clippy::manual_map).
    let last_anticipated_block_height =
        inbox.removed_bundles.back().await?.map(|bundle| bundle.height);
    if let Some(prev) = previous_height {
        if prev >= next_height_to_receive {
            // The sender claims it already messaged us at `prev`, but our
            // inbox has not seen that height: there is a gap.
            let chain_id = self.chain_id();
            if self.config.allow_revert_confirm && self.config.recovery_allowed_for(&chain_id) {
                warn!(
                    %chain_id,
                    "Inbox gap detected from {origin}: \
                    sender declares previous height {prev} but we only have up to \
                    {next_height_to_receive}; requesting resend",
                );
                return Ok(CrossChainUpdateResult::GapDetected {
                    origin,
                    retransmit_from: next_height_to_receive,
                });
            }
            return Err(ChainError::InboxGapDetected {
                chain_id,
                origin,
                expected_height: prev,
                actual_height: bundles.first().map(|(_, b)| b.height).unwrap_or_default(),
            }
            .into());
        }
    }
    let helper = CrossChainUpdateHelper::new(&self.config, &self.chain);
    let recipient = self.chain_id();
    // Drop bundles that were already received or fail the helper's checks.
    let bundles = helper
        .select_message_bundles(
            &origin,
            recipient,
            next_height_to_receive,
            last_anticipated_block_height,
            bundles,
            &self.storage,
        )
        .await?;
    let Some(last_updated_height) = bundles.last().map(|bundle| bundle.height) else {
        return Ok(CrossChainUpdateResult::NothingToDo);
    };
    let local_time = self.storage.clock().current_time();
    let mut previous_height = None;
    for bundle in bundles {
        // Record each sender block in the received log only once, even if
        // it contributes several bundles.
        let add_to_received_log = previous_height != Some(bundle.height);
        previous_height = Some(bundle.height);
        self.chain
            .receive_message_bundle_with_inbox(
                &mut inbox,
                &origin,
                bundle,
                local_time,
                add_to_received_log,
            )
            .await?;
    }
    inbox.observe_size_metric();
    drop(inbox);
    // Do not persist deliveries to a chain that is not active yet.
    if !self.config.allow_inactive_chains && !self.chain.is_active().await? {
        warn!(
            chain_id = %self.chain_id(),
            "Refusing to deliver messages from {origin} \
            at height {last_updated_height} because the recipient is still inactive",
        );
        return Ok(CrossChainUpdateResult::NothingToDo);
    }
    self.save().await?;
    Ok(CrossChainUpdateResult::Updated(last_updated_height))
}
/// Handles a recipient's acknowledgment that it received messages up to
/// `latest_height`: clears the corresponding outbox entries, rebuilds the
/// remaining requests for that recipient, and fires delivery notifications
/// if everything tracked is now delivered.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    recipient = %recipient,
    latest_height = %latest_height
))]
pub(crate) async fn confirm_updated_recipient(
    &mut self,
    recipient: ChainId,
    latest_height: BlockHeight,
) -> Result<NetworkActions, WorkerError> {
    let fully_delivered = self
        .chain
        .mark_messages_as_received(&recipient, latest_height)
        .await?
        && self
            .all_messages_to_tracked_chains_delivered_up_to(latest_height)
            .await?;
    let actions = self
        .create_cross_chain_actions_for_recipient(recipient)
        .await?;
    self.save().await?;
    if fully_delivered {
        self.delivery_notifier.notify(latest_height);
    }
    Ok(actions)
}
/// Handles a `RevertConfirm` request: the `recipient` asks us to resend our
/// messages starting from `retransmit_from`.
///
/// Walks the chain of sender blocks backwards via `previous_message_blocks`,
/// re-adds the corresponding heights to the recipient's outbox, and rebuilds
/// the cross-chain requests for that recipient.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    %recipient,
    %retransmit_from,
))]
pub(crate) async fn handle_revert_confirm(
    &mut self,
    recipient: ChainId,
    retransmit_from: BlockHeight,
) -> Result<NetworkActions, WorkerError> {
    let Some(latest_height) = self.chain.previous_message_blocks.get(&recipient).await? else {
        warn!("RevertConfirm: no record of sending to {recipient}");
        return Ok(NetworkActions::default());
    };
    let mut heights_to_re_add = Vec::new();
    let mut current_height = latest_height;
    // Follow the previous-message-block links backwards until we pass
    // `retransmit_from` or the links run out.
    while current_height >= retransmit_from {
        heights_to_re_add.push(current_height);
        let hash = match &*self.chain.block_hashes([current_height]).await? {
            [hash] => *hash,
            _ => {
                return Err(WorkerError::ConfirmedBlockHashNotFound {
                    height: current_height,
                    chain_id: self.chain_id(),
                })
            }
        };
        let block = self
            .read_confirmed_blocks(vec![hash])
            .await?
            .pop()
            .flatten()
            .ok_or_else(|| WorkerError::LocalBlockNotFound {
                height: current_height,
                chain_id: self.chain_id(),
            })?;
        match block.block().body.previous_message_blocks.get(&recipient) {
            Some((_, prev_height)) if *prev_height >= retransmit_from => {
                current_height = *prev_height;
            }
            _ => break,
        }
    }
    let new_heights = self
        .chain
        .outboxes
        .try_load_entry_mut(&recipient)
        .await?
        .revert(&heights_to_re_add)
        .await?;
    if new_heights.is_empty() {
        debug!("RevertConfirm: all heights already in outbox for {recipient}");
        return Ok(NetworkActions::default());
    }
    // Keep the outbox bookkeeping consistent with the re-added heights.
    let new_heights_len = new_heights.len();
    for h in new_heights {
        *self.chain.outbox_counters.get_mut().entry(h).or_default() += 1;
    }
    self.chain.nonempty_outboxes.get_mut().insert(recipient);
    let actions = self
        .create_cross_chain_actions_for_recipient(recipient)
        .await?;
    self.save().await?;
    warn!(
        "RevertConfirm: re-added {new_heights_len} heights to outbox for {recipient}, \
        starting from height {retransmit_from}"
    );
    Ok(actions)
}
/// Resets and re-executes the chain if corruption recovery is configured,
/// allowed for this chain, and enough time has passed since the last
/// block-zero execution; otherwise returns `None`.
pub(crate) async fn maybe_reset_corrupted_chain_state(
    &mut self,
) -> Result<Option<Vec<CrossChainRequest>>, WorkerError> {
    // Recovery must be enabled in the configuration at all.
    let min_duration = match self.config.reset_on_corrupted_chain_state {
        Some(duration) => duration,
        None => return Ok(None),
    };
    // ... and permitted for this particular chain.
    let chain_id = self.chain_id();
    if !self.config.recovery_allowed_for(&chain_id) {
        return Ok(None);
    }
    // Rate-limit resets relative to the last block-zero execution.
    let now = self.storage.clock().current_time();
    let block_zero_time = *self.chain.block_zero_executed_at.get();
    let elapsed = now.duration_since(block_zero_time);
    if elapsed < min_duration {
        warn!(
            %chain_id, ?elapsed, ?min_duration,
            "Not resetting corrupted chain state; not enough time elapsed \
            since last block 0 execution"
        );
        return Ok(None);
    }
    warn!(%chain_id, "Corrupted chain state detected; resetting and re-executing");
    let requests = self.reset_and_reexecute_chain().await?;
    Ok(Some(requests))
}
/// Clears the chain state and re-executes every confirmed and preprocessed
/// block from storage, then asks all known senders to resend their messages.
///
/// The manager state is snapshotted beforehand and restored only if the
/// re-execution reaches the same tip height as before.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
))]
pub(crate) async fn reset_and_reexecute_chain(
    &mut self,
) -> Result<Vec<CrossChainRequest>, WorkerError> {
    let chain_id = self.chain_id();
    // Capture everything we need before wiping the state.
    let tip_height = self.chain.tip_state.get().next_block_height;
    let sender_ids = self.chain.inboxes.indices().await?;
    let hashes = self.chain.confirmed_log.read(..).await?;
    let preprocessed = self.chain.preprocessed_blocks.index_values().await?;
    let manager_snapshot = ManagerSafetySnapshot::capture(&self.chain.manager).await?;
    self.chain.clear();
    self.knows_chain_is_active = false;
    self.save().await?;
    warn!(
        %chain_id,
        "Cleared chain state up to height {tip_height}; \
        re-executing all blocks"
    );
    // Replay the confirmed log in order; its index is the block height.
    for (index, hash) in hashes.into_iter().enumerate() {
        let height = BlockHeight(index as u64);
        let cert = self
            .storage
            .read_certificate(hash)
            .await?
            .map(Arc::unwrap_or_clone)
            .ok_or_else(|| WorkerError::LocalBlockNotFound { height, chain_id })?;
        // Box::pin: `process_confirmed_block` is a large recursive-ish future.
        Box::pin(self.process_confirmed_block(cert, None)).await?;
    }
    // Then replay the blocks that had only been preprocessed.
    for (height, hash) in preprocessed {
        let cert = self
            .storage
            .read_certificate(hash)
            .await?
            .map(Arc::unwrap_or_clone)
            .ok_or_else(|| WorkerError::LocalBlockNotFound { height, chain_id })?;
        Box::pin(self.process_confirmed_block(cert, None)).await?;
    }
    let new_tip_height = self.chain.tip_state.get().next_block_height;
    if new_tip_height == tip_height {
        // Safe to restore pending votes: the tip is the same as before.
        manager_snapshot.restore(&mut self.chain.manager)?;
        self.save().await?;
    } else {
        warn!(
            %tip_height, %new_tip_height,
            "Dropping manager snapshot: pre-reset tip differs from post-reset tip"
        );
    }
    // Ask every known sender to retransmit from the beginning.
    let revert_requests = sender_ids
        .into_iter()
        .map(|sender| CrossChainRequest::RevertConfirm {
            sender,
            recipient: chain_id,
            retransmit_from: BlockHeight::ZERO,
        })
        .collect::<Vec<_>>();
    warn!(
        tip_height = %self.chain.tip_state.get().next_block_height,
        num_revert_confirms = revert_requests.len(),
        "Chain reset and re-executed; sending RevertConfirm to senders"
    );
    Ok(revert_requests)
}
/// Replaces the per-validator received-certificate trackers and persists the
/// chain state.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    num_trackers = %new_trackers.len()
))]
pub(crate) async fn update_received_certificate_trackers(
    &mut self,
    new_trackers: BTreeMap<ValidatorPublicKey, u64>,
) -> Result<(), WorkerError> {
    let chain = &mut self.chain;
    chain.update_received_certificate_trackers(new_trackers);
    self.save().await
}
/// Returns the hashes of preprocessed blocks for consecutive heights in
/// `[start, end)`, stopping at the first height with no preprocessed block.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    start = %start,
    end = %end
))]
pub(crate) async fn get_preprocessed_block_hashes(
    &self,
    start: BlockHeight,
    end: BlockHeight,
) -> Result<Vec<CryptoHash>, WorkerError> {
    let mut collected = Vec::new();
    let mut cursor = start;
    loop {
        if cursor >= end {
            break;
        }
        // Stop at the first gap: only a contiguous run is returned.
        let Some(hash) = self.chain.preprocessed_blocks.get(&cursor).await? else {
            break;
        };
        collected.push(hash);
        cursor = cursor.try_add_one()?;
    }
    Ok(collected)
}
/// Returns the next block height expected from `origin`'s inbox, or height
/// zero when no inbox exists for that origin yet.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    origin = %origin
))]
pub(crate) async fn get_inbox_next_height(
    &self,
    origin: ChainId,
) -> Result<BlockHeight, WorkerError> {
    let maybe_inbox = self.chain.inboxes.try_load_entry(&origin).await?;
    if let Some(inbox) = maybe_inbox {
        Ok(inbox.next_block_height_to_receive()?)
    } else {
        Ok(BlockHeight::ZERO)
    }
}
/// Looks up the given blobs among the manager's locking blobs.
///
/// Returns `Some` with all blobs only if every requested id is present;
/// `None` if any is missing.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    num_blob_ids = %blob_ids.len()
))]
pub(crate) async fn get_locking_blobs(
    &self,
    blob_ids: Vec<BlobId>,
) -> Result<Option<Vec<Blob>>, WorkerError> {
    let maybe_blobs = self
        .chain
        .manager
        .locking_blobs
        .multi_get(&blob_ids)
        .await?;
    // Collecting `Option`s yields `None` as soon as any entry is missing.
    let all_or_nothing: Option<Vec<Blob>> = maybe_blobs.into_iter().collect();
    Ok(all_or_nothing)
}
/// Returns the certificate hashes of this chain's blocks at the given heights.
pub(crate) async fn get_block_hashes(
    &self,
    heights: Vec<BlockHeight>,
) -> Result<Vec<CryptoHash>, WorkerError> {
    let hashes = self.chain.block_hashes(heights).await?;
    Ok(hashes)
}
pub(crate) async fn get_proposed_blobs(
&self,
blob_ids: Vec<BlobId>,
) -> Result<Vec<Blob>, WorkerError> {
let results = self
.chain
.manager
.proposed_blobs
.multi_get(&blob_ids)
.await?;
let mut blobs = Vec::with_capacity(blob_ids.len());
let mut missing = Vec::new();
for (blob_id, maybe_blob) in blob_ids.into_iter().zip(results) {
match maybe_blob {
Some(blob) => blobs.push(blob),
None => missing.push(blob_id),
}
}
if !missing.is_empty() {
return Err(WorkerError::BlobsNotFound(missing));
}
Ok(blobs)
}
/// For each stream id, returns the height and hash of the latest block that
/// emitted an event on that stream, when such a block is recorded.
pub(crate) async fn get_previous_event_blocks(
    &self,
    stream_ids: Vec<StreamId>,
) -> Result<BTreeMap<StreamId, (BlockHeight, CryptoHash)>, WorkerError> {
    let heights = self
        .chain
        .previous_event_blocks
        .multi_get(&stream_ids)
        .await?;
    // Keep only streams with a recorded height, remembering the confirmed-log
    // index each height corresponds to.
    let mut log_indices = Vec::new();
    let mut known = Vec::new();
    for (stream_id, maybe_height) in stream_ids.into_iter().zip(heights) {
        let Some(height) = maybe_height else { continue };
        let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
        log_indices.push(index);
        known.push((stream_id, height));
    }
    let hashes = self.chain.confirmed_log.multi_get(log_indices).await?;
    // Pair each stream back up with its block hash, dropping missing entries.
    let result = known
        .into_iter()
        .zip(hashes)
        .filter_map(|((stream_id, height), maybe_hash)| {
            maybe_hash.map(|hash| (stream_id, (height, hash)))
        })
        .collect();
    Ok(result)
}
pub(crate) async fn get_next_expected_events(
&self,
stream_ids: Vec<StreamId>,
) -> Result<BTreeMap<StreamId, u32>, WorkerError> {
let values = self
.chain
.next_expected_events
.multi_get(&stream_ids)
.await?;
Ok(stream_ids
.into_iter()
.zip(values)
.filter_map(|(id, val)| Some((id, val?)))
.collect())
}
/// Returns all event subscriptions recorded in the system execution state.
pub(crate) async fn get_event_subscriptions(
    &self,
) -> Result<EventSubscriptionsResult, WorkerError> {
    let subscriptions = self
        .chain
        .execution_state
        .system
        .event_subscriptions
        .index_values()
        .await?;
    Ok(subscriptions)
}
/// Returns the event count for `stream_id`, preferring the next-expected-event
/// counter and falling back to the execution state's stream event count.
pub(crate) async fn get_stream_event_count(
    &self,
    stream_id: StreamId,
) -> Result<Option<u32>, WorkerError> {
    match self.chain.next_expected_events.get(&stream_id).await? {
        Some(count) => Ok(Some(count)),
        None => Ok(self
            .chain
            .execution_state
            .stream_event_counts
            .get(&stream_id)
            .await?),
    }
}
/// Returns a copy of the per-validator received-certificate trackers.
pub(crate) async fn get_received_certificate_trackers(
    &self,
) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
    let trackers = self.chain.received_certificate_trackers.get();
    Ok(trackers.clone())
}
/// Returns the chain's next block height together with the next height to
/// schedule in the outbox for `receiver_id`, if such an outbox exists.
pub(crate) async fn get_tip_state_and_outbox_info(
    &self,
    receiver_id: ChainId,
) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
    let next_block_height = self.chain.tip_state.get().next_block_height;
    let maybe_outbox = self.chain.outboxes.try_load_entry(&receiver_id).await?;
    let next_height_to_schedule = match maybe_outbox {
        Some(outbox) => Some(*outbox.next_height_to_schedule.get()),
        None => None,
    };
    Ok((next_block_height, next_height_to_schedule))
}
/// Returns the next block height that still needs to be preprocessed.
pub(crate) async fn get_next_height_to_preprocess(&self) -> Result<BlockHeight, WorkerError> {
    let height = self.chain.next_height_to_preprocess().await?;
    Ok(height)
}
/// Returns the chain manager's current seed value.
pub(crate) async fn get_manager_seed(&self) -> Result<u64, WorkerError> {
    let seed = self.chain.manager.seed.get();
    Ok(*seed)
}
#[instrument(skip_all, fields(
chain_id = %self.chain_id(),
height = %height,
round = %round
))]
async fn vote_for_leader_timeout(
&mut self,
height: BlockHeight,
round: Round,
) -> Result<(), WorkerError> {
let chain = &mut self.chain;
ensure!(
height == chain.tip_state.get().next_block_height,
WorkerError::UnexpectedBlockHeight {
expected_block_height: chain.tip_state.get().next_block_height,
found_block_height: height
}
);
let epoch = chain.execution_state.system.epoch.get();
let chain_id = chain.chain_id();
let key_pair = self.config.key_pair();
let local_time = self.storage.clock().current_time();
if chain
.manager
.create_timeout_vote(chain_id, height, round, *epoch, key_pair, local_time)?
{
self.save().await?;
}
Ok(())
}
/// Fallback voting is not supported by this worker; this always fails with
/// `WorkerError::NoFallbackMode`.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id()
))]
async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> {
    Err(WorkerError::NoFallbackMode)
}
/// Records a blob that a pending validated block or a pending proposal is
/// waiting for.
///
/// The blob is only accepted if some pending block actually expects it;
/// otherwise `WorkerError::UnexpectedBlob` is returned. Returns the updated
/// chain info on success.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    blob_id = %blob.id()
))]
pub(crate) async fn handle_pending_blob(
    &mut self,
    blob: Blob,
) -> Result<ChainInfoResponse, WorkerError> {
    // First, offer the blob to the set awaited by the validated block, if any.
    let mut was_expected = self
        .chain
        .pending_validated_blobs
        .maybe_insert(&blob)
        .await?;
    // Then offer it to every per-owner set of pending proposed blobs.
    for (_, mut pending_blobs) in self
        .chain
        .pending_proposed_blobs
        .try_load_all_entries_mut()
        .await?
    {
        if !pending_blobs.validated.get() {
            // For proposals that are not yet validated, enforce the current
            // committee's blob-size and published-blob-count limits.
            let (_, committee) = self.chain.current_committee().await?;
            let policy = committee.policy();
            policy
                .check_blob_size(blob.content())
                .with_execution_context(ChainExecutionContext::Block)?;
            ensure!(
                u64::try_from(pending_blobs.pending_blobs.count().await?)
                    .is_ok_and(|count| count < policy.maximum_published_blobs),
                WorkerError::TooManyPublishedBlobs(policy.maximum_published_blobs)
            );
        }
        was_expected = was_expected || pending_blobs.maybe_insert(&blob).await?;
    }
    // Reject blobs nobody asked for.
    ensure!(was_expected, WorkerError::UnexpectedBlob);
    self.save().await?;
    self.chain_info_response().await
}
/// Test helper: reads the confirmed block certificate at `height` from
/// storage, or `None` when that height has no confirmed block yet.
#[cfg(with_testing)]
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    height = %height
))]
pub(crate) async fn read_certificate(
    &self,
    height: BlockHeight,
) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
    // No log entry means the height has not been confirmed on this chain.
    let Some(certificate_hash) = self.chain.confirmed_log.get(height.try_into()?).await? else {
        return Ok(None);
    };
    // The log references the certificate, so a missing read is an error.
    let certificate = self
        .storage
        .read_certificate(certificate_hash)
        .await?
        .map(Arc::unwrap_or_clone)
        .ok_or_else(|| WorkerError::ReadCertificatesError(vec![certificate_hash]))?;
    Ok(Some(certificate))
}
#[instrument(skip_all, fields(
chain_id = %self.chain_id(),
query_application_id = %query.application_id()
))]
pub(crate) async fn query_application(
&mut self,
query: Query,
block_hash: Option<CryptoHash>,
) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
self.initialize_and_save_if_needed().await?;
let next_block_height = self.chain.tip_state.get().next_block_height;
let local_time = self.storage.clock().current_time();
if let Some(requested_block) = block_hash {
if let Some(mut state) = self
.execution_state_cache
.as_ref()
.and_then(|cache| cache.remove(&requested_block))
{
let next_block_height = next_block_height
.try_add_one()
.expect("block height to not overflow");
let context = QueryContext {
chain_id: self.chain_id(),
next_block_height,
local_time,
};
let outcome = state
.with_context(|ctx| {
self.chain
.execution_state
.context()
.clone_with_base_key(ctx.base_key().bytes.clone())
})
.await
.query_application(context, query, self.service_runtime_endpoint.as_mut())
.await
.with_execution_context(ChainExecutionContext::Query)?;
if let Some(cache) = &self.execution_state_cache {
cache.insert(&requested_block, state);
}
Ok((outcome, next_block_height))
} else {
tracing::debug!(requested_block = %requested_block, "requested block hash not found in cache, querying committed state");
let outcome = self
.chain
.query_application(local_time, query, self.service_runtime_endpoint.as_mut())
.await?;
Ok((outcome, next_block_height))
}
} else {
let outcome = self
.chain
.query_application(local_time, query, self.service_runtime_endpoint.as_mut())
.await?;
Ok((outcome, next_block_height))
}
}
/// Reads the description of `application_id` from its description blob in
/// storage, without touching the chain state.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    application_id = %application_id
))]
pub(crate) async fn describe_application_readonly(
    &self,
    application_id: ApplicationId,
) -> Result<ApplicationDescription, WorkerError> {
    let description_blob_id = application_id.description_blob_id();
    let blob = self
        .storage
        .read_blob(description_blob_id)
        .await?
        .ok_or_else(|| WorkerError::BlobsNotFound(vec![description_blob_id]))?;
    // The blob content is the BCS-serialized application description.
    let description = bcs::from_bytes(blob.bytes())?;
    Ok(description)
}
/// Executes `block` speculatively and returns the resulting proposed block,
/// the executed block, an (unsigned) chain info response, and the resources
/// consumed.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    block_height = %block.height
))]
pub(crate) async fn stage_block_execution(
    &mut self,
    block: ProposedBlock,
    round: Option<u32>,
    published_blobs: &[Blob],
    policy: BundleExecutionPolicy,
) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> {
    self.initialize_and_save_if_needed().await?;
    let local_time = self.storage.clock().current_time();
    let signer = block.authenticated_signer;
    // Enforce the committee's proposal-size limit before executing anything.
    let (_, committee) = self.chain.current_committee().await?;
    block.check_proposal_size(committee.policy().maximum_block_proposal_size)?;
    self.chain
        .remove_bundles_from_inboxes(block.timestamp, true, block.incoming_bundles())
        .await?;
    let (executed_block, resource_tracker) =
        Box::pin(self.execute_block(block, local_time, round, published_blobs, policy)).await?;
    let info = ChainInfo::from_chain_view(&self.chain).await?;
    // The response is built without a key pair: it reports state, not a vote.
    let mut response = ChainInfoResponse::new(info, None);
    if let Some(signer) = signer {
        // Include the signer's balance as seen after the staged execution.
        response.info.requested_owner_balance = self
            .chain
            .execution_state
            .system
            .balances
            .get(&signer)
            .await?;
    }
    let (proposed_block, _) = executed_block.clone().into_proposal();
    Ok((proposed_block, executed_block, response, resource_tracker))
}
/// Handles a signed block proposal: verifies it, executes the block when no
/// pre-computed outcome is supplied, records a vote for it, and returns the
/// resulting chain info and network actions.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    block_height = %proposal.content.block.height
))]
pub(crate) async fn handle_block_proposal(
    &mut self,
    proposal: BlockProposal,
) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
    self.initialize_and_save_if_needed().await?;
    // Structural and cryptographic checks on the proposal itself.
    proposal
        .check_invariants()
        .map_err(|msg| WorkerError::InvalidBlockProposal(msg.to_string()))?;
    proposal.check_signature()?;
    let owner = proposal.owner();
    let BlockProposal {
        content,
        original_proposal,
        signature: _,
    } = &proposal;
    let block = &content.block;
    let chain = &self.chain;
    // The block must extend the current tip and use the chain's current epoch.
    chain.tip_state.get().verify_block_chaining(block)?;
    let (epoch, committee) = chain.current_committee().await?;
    check_block_epoch(epoch, block.chain_id, block.epoch)?;
    let policy = committee.policy().clone();
    block.check_proposal_size(policy.maximum_block_proposal_size)?;
    // The proposer must be allowed to propose in this round.
    ensure!(
        chain.manager.verify_owner(&owner, proposal.content.round)?,
        WorkerError::InvalidOwner
    );
    // Remember the round before this proposal possibly advances it.
    let old_round = self.chain.manager.current_round();
    match original_proposal {
        None => {
            // A fresh proposal: the authenticated signer, if any, must be the
            // proposer itself.
            if let Some(signer) = block.authenticated_signer {
                ensure!(signer == owner, WorkerError::InvalidSigner(owner));
            }
        }
        Some(OriginalProposal::Regular { certificate }) => {
            // Re-proposal backed by a validation certificate: the certificate
            // must be valid for the current committee.
            certificate.check(&committee)?;
        }
        Some(OriginalProposal::Fast(signature)) => {
            // Re-proposal of a fast-round proposal: reconstruct the original
            // fast proposal and verify it came from a super owner.
            let original_proposal = BlockProposal {
                content: ProposalContent {
                    block: content.block.clone(),
                    round: Round::Fast,
                    outcome: None,
                },
                signature: *signature,
                original_proposal: None,
            };
            let super_owner = original_proposal.owner();
            ensure!(
                chain
                    .manager
                    .ownership
                    .get()
                    .super_owners
                    .contains(&super_owner),
                WorkerError::InvalidOwner
            );
            if let Some(signer) = block.authenticated_signer {
                ensure!(signer == super_owner, WorkerError::InvalidSigner(signer));
            }
            original_proposal.check_signature()?;
        }
    }
    // Let the manager decide whether this proposal is redundant or outdated.
    if chain.manager.check_proposed_block(&proposal)? == manager::Outcome::Skip {
        return Ok((self.chain_info_response().await?, NetworkActions::default()));
    }
    let local_time = self.storage.clock().current_time();
    // Persist the signed proposal before the heavy work, if it is new.
    if self
        .chain
        .manager
        .update_signed_proposal(&proposal, local_time)
    {
        self.save().await?;
    }
    let published_blobs = self.load_proposal_blobs(&proposal).await?;
    let ProposalContent {
        block,
        round,
        outcome,
    } = content;
    // Validators (i.e. workers holding a key pair) reject blocks whose
    // timestamp is too far in the future.
    if self.config.key_pair().is_some()
        && block.timestamp.duration_since(local_time) > self.config.block_time_grace_period
    {
        return Err(WorkerError::InvalidTimestamp {
            local_time,
            block_timestamp: block.timestamp,
            block_time_grace_period: self.config.block_time_grace_period,
        });
    }
    self.chain
        .remove_bundles_from_inboxes(block.timestamp, true, block.incoming_bundles())
        .await?;
    // Use the proposer's execution outcome when provided; otherwise execute
    // the block locally.
    let block = if let Some(outcome) = outcome {
        outcome.clone().with(proposal.content.block.clone())
    } else {
        let (executed_block, _resource_tracker) = Box::pin(self.execute_block(
            block.clone(),
            local_time,
            round.multi_leader(),
            &published_blobs,
            BundleExecutionPolicy::committed(),
        ))
        .await?;
        executed_block
    };
    // Fast rounds must not depend on oracle responses.
    ensure!(
        !round.is_fast() || !block.has_oracle_responses(),
        WorkerError::FastBlockUsingOracles
    );
    let chain = &mut self.chain;
    // Update the tip counters, then roll back the speculative execution state:
    // the block is only voted on here, not committed.
    chain
        .tip_state
        .get_mut()
        .update_counters(&block.body.transactions, &block.body.messages)?;
    chain.rollback();
    let blobs = self
        .get_required_blobs(proposal.expected_blob_ids(), block.created_blobs())
        .await?;
    let key_pair = self.config.key_pair();
    let manager = &mut self.chain.manager;
    // Create a vote (either of the two vote kinds) and cache the voted value.
    match manager.create_vote(proposal, block, key_pair, local_time, blobs)? {
        Some(Either::Left(vote)) => {
            self.block_values
                .insert_hashed(Cow::Borrowed(vote.value.inner()));
        }
        Some(Either::Right(vote)) => {
            self.block_values
                .insert_hashed(Cow::Borrowed(vote.value.inner()));
        }
        None => (),
    }
    self.save().await?;
    let actions = self.create_network_actions(Some(old_round)).await?;
    Ok((self.chain_info_response().await?, actions))
}
/// Builds a `ChainInfoResponse` answering every field requested by `query`.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id()
))]
async fn prepare_chain_info_response(
    &mut self,
    query: ChainInfoQuery,
) -> Result<ChainInfoResponse, WorkerError> {
    self.initialize_and_save_if_needed().await?;
    let mut info = ChainInfo::from_chain_view(&self.chain).await?;
    if query.request_committees {
        info.requested_committees = Some(
            self.chain
                .execution_state
                .system
                .committees
                .get()
                .await?
                .clone(),
        );
    }
    // `AccountOwner::CHAIN` designates the chain balance itself; any other
    // owner is looked up in the per-owner balance map.
    if query.request_owner_balance == AccountOwner::CHAIN {
        info.requested_owner_balance = Some(*self.chain.execution_state.system.balance.get());
    } else {
        info.requested_owner_balance = self
            .chain
            .execution_state
            .system
            .balances
            .get(&query.request_owner_balance)
            .await?;
    }
    if let Some(next_block_height) = query.test_next_block_height {
        // Fail fast when the caller's view of the tip is stale.
        ensure!(
            self.chain.tip_state.get().next_block_height == next_block_height,
            WorkerError::UnexpectedBlockHeight {
                expected_block_height: self.chain.tip_state.get().next_block_height,
                found_block_height: next_block_height,
            }
        );
    }
    if query.request_pending_message_bundles {
        // Use the cached set of non-empty inboxes when available; otherwise
        // compute it by scanning all inboxes and persist it for next time.
        let origins = if let Some(nonempty_origins) = self.chain.nonempty_inboxes.get().clone()
        {
            nonempty_origins.into_iter().collect::<Vec<_>>()
        } else {
            let pairs = self.chain.inboxes.try_load_all_entries().await?;
            let nonempty_origins = pairs
                .into_iter()
                .filter(|(_, inbox)| inbox.added_bundles.count() > 0)
                .map(|(origin, _)| origin)
                .collect::<BTreeSet<ChainId>>();
            let origins = nonempty_origins.iter().copied().collect::<Vec<_>>();
            *self.chain.nonempty_inboxes.get_mut() = Some(nonempty_origins);
            self.save().await?;
            origins
        };
        let mut bundles = Vec::new();
        let inboxes = self.chain.inboxes.try_load_entries(&origins).await?;
        // Pair each origin with its inbox, dropping origins without one.
        let origins_and_inboxes = origins
            .into_iter()
            .zip(inboxes)
            .filter_map(|(origin, inbox)| Some((origin, inbox?)))
            .collect::<Vec<_>>();
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(origins_and_inboxes.len() as f64);
        // On a closed chain, pending messages can only be rejected.
        let action = if *self.chain.execution_state.system.closed.get() {
            MessageAction::Reject
        } else {
            MessageAction::Accept
        };
        for (origin, inbox) in origins_and_inboxes {
            for bundle in inbox.added_bundles.elements().await? {
                bundles.push(IncomingBundle {
                    origin,
                    bundle,
                    action,
                });
            }
        }
        // Apply operator-configured filtering and ordering: drop ignored
        // origins, put priority origins first, then order by bundle timestamp.
        let ignored_origins = &self.config.ignored_bundle_origins;
        if !ignored_origins.is_empty() {
            bundles.retain(|b| !ignored_origins.contains(&b.origin));
        }
        let priority_origins = &self.config.priority_bundle_origins;
        bundles.sort_by(|a, b| {
            let a_priority = priority_origins.contains(&a.origin);
            let b_priority = priority_origins.contains(&b.origin);
            b_priority
                .cmp(&a_priority)
                .then(a.bundle.timestamp.cmp(&b.bundle.timestamp))
        });
        info.requested_pending_message_bundles = bundles;
    }
    let hashes = self
        .chain
        .block_hashes(query.request_sent_certificate_hashes_by_heights)
        .await?;
    info.requested_sent_certificate_hashes = hashes;
    if let Some(start) = query.request_received_log_excluding_first_n {
        let start = usize::try_from(start).map_err(|_| ArithmeticError::Overflow)?;
        // Cap the response at the configured maximum number of log entries.
        let max_received_log_entries = self.config.chain_info_max_received_log_entries;
        let end = start
            .saturating_add(max_received_log_entries)
            .min(self.chain.received_log.count());
        info.requested_received_log = self.chain.received_log.read(start..end).await?;
    }
    if query.request_manager_values {
        info.manager.add_values(&self.chain.manager);
    }
    Ok(ChainInfoResponse::new(info, self.config.key_pair()))
}
/// Executes `block` on the in-memory chain state and returns the executed
/// `Block` together with the resources it consumed.
///
/// This mutates `self.chain.execution_state`; callers decide whether to keep
/// or roll back the result (see `handle_block_proposal`).
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    block_height = %block.height
))]
async fn execute_block(
    &mut self,
    block: ProposedBlock,
    local_time: Timestamp,
    round: Option<u32>,
    published_blobs: &[Blob],
    policy: BundleExecutionPolicy,
) -> Result<(Block, ResourceTracker), WorkerError> {
    // The execution future is boxed to keep the enclosing future small.
    let (proposed_block, outcome, resource_tracker) = Box::pin(self.chain.execute_block(
        block,
        local_time,
        round,
        published_blobs,
        None,
        policy,
    ))
    .await?;
    let executed_block = Block::new(proposed_block, outcome);
    let block_hash = CryptoHash::new(&executed_block);
    // Cache a detached copy of the post-execution state, keyed by the block
    // hash, so `query_application` can later answer queries against it.
    if let Some(cache) = &self.execution_state_cache {
        cache.insert(
            &block_hash,
            Box::pin(
                self.chain
                    .execution_state
                    .with_context(|ctx| InactiveContext(ctx.base_key().clone())),
            )
            .await,
        );
    }
    Ok((executed_block, resource_tracker))
}
/// Ensures the chain is initialized and persisted, doing the work at most
/// once per worker (tracked by `knows_chain_is_active`).
#[instrument(skip_all, fields(
    chain_id = %self.chain_id()
))]
pub(crate) async fn initialize_and_save_if_needed(&mut self) -> Result<(), WorkerError> {
    if self.knows_chain_is_active {
        return Ok(());
    }
    let local_time = self.storage.clock().current_time();
    self.chain.initialize_if_needed(local_time).await?;
    self.save().await?;
    // Remember the result so later calls are a no-op.
    self.knows_chain_is_active = true;
    Ok(())
}
/// Builds a `ChainInfoResponse` from the current chain view, signed with the
/// worker's key pair when one is configured.
pub(crate) async fn chain_info_response(&self) -> Result<ChainInfoResponse, WorkerError> {
    let key_pair = self.config.key_pair();
    let info = ChainInfo::from_chain_view(&self.chain).await?;
    Ok(ChainInfoResponse::new(info, key_pair))
}
/// Persists the chain state.
///
/// On failure the worker is marked as poisoned and `WorkerError::PoisonedWorker`
/// is returned; on success the committees cache entry is evicted.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id()
))]
async fn save(&mut self) -> Result<(), WorkerError> {
    match self.chain.save().await {
        Ok(()) => {
            self.chain.execution_state.system.committees.evict();
            Ok(())
        }
        Err(error) => {
            tracing::error!(
                ?error,
                chain_id = %self.chain_id(),
                "Chain save failed; marking worker as poisoned"
            );
            // A failed save leaves the in-memory view untrusted.
            self.poisoned = true;
            Err(WorkerError::PoisonedWorker)
        }
    }
}
}
/// Returns the positions and ids of all entries in `maybe_blobs` whose blob is
/// missing (`None`), in their original order.
fn missing_indices_blob_ids(maybe_blobs: &[(BlobId, Option<Blob>)]) -> (Vec<usize>, Vec<BlobId>) {
    maybe_blobs
        .iter()
        .enumerate()
        .filter(|(_, (_, maybe_blob))| maybe_blob.is_none())
        .map(|(index, (blob_id, _))| (index, *blob_id))
        .unzip()
}
/// Collects the ids of all entries whose blob is missing (`None`), preserving
/// iteration order.
fn missing_blob_ids<'a>(
    maybe_blobs: impl IntoIterator<Item = (&'a BlobId, &'a Option<Blob>)>,
) -> Vec<BlobId> {
    let mut missing = Vec::new();
    for (blob_id, maybe_blob) in maybe_blobs {
        if maybe_blob.is_none() {
            missing.push(*blob_id);
        }
    }
    missing
}
fn check_block_epoch(
chain_epoch: Epoch,
block_chain: ChainId,
block_epoch: Epoch,
) -> Result<(), WorkerError> {
ensure!(
block_epoch == chain_epoch,
WorkerError::InvalidEpoch {
chain_id: block_chain,
epoch: block_epoch,
chain_epoch
}
);
Ok(())
}
/// Helper for deciding which incoming cross-chain message bundles should be
/// accepted, based on epoch trust.
pub(crate) struct CrossChainUpdateHelper {
    /// Whether bundles from epochs older than `current_epoch` are accepted
    /// even when their committee is not known locally.
    pub(crate) allow_messages_from_deprecated_epochs: bool,
    /// The epoch of the chain this helper was built from, at construction time.
    pub(crate) current_epoch: Epoch,
}
impl CrossChainUpdateHelper {
    /// Builds a helper from the worker configuration and the given chain's
    /// current epoch.
    fn new<C>(config: &ChainWorkerConfig, chain: &ChainStateView<C>) -> Self
    where
        C: Context + Clone + 'static,
    {
        CrossChainUpdateHelper {
            allow_messages_from_deprecated_epochs: config.allow_messages_from_deprecated_epochs,
            current_epoch: *chain.execution_state.system.epoch.get(),
        }
    }
    /// Selects the sub-range of `bundles` that should actually be received.
    ///
    /// - Bundle heights must be non-decreasing; otherwise the whole request is
    ///   rejected as `InvalidCrossChainRequest`.
    /// - Bundles below `next_height_to_receive` are skipped as duplicates.
    /// - A bundle's epoch is trusted when deprecated epochs are allowed, its
    ///   height was already anticipated, its epoch is at least the current
    ///   one, or its committee can be loaded from `storage`.
    /// - The returned range runs from the last duplicate to the last bundle
    ///   with a trusted epoch.
    pub(crate) async fn select_message_bundles<S: Storage>(
        &self,
        origin: &ChainId,
        recipient: ChainId,
        next_height_to_receive: BlockHeight,
        last_anticipated_block_height: Option<BlockHeight>,
        mut bundles: Vec<(Epoch, MessageBundle)>,
        storage: &S,
    ) -> Result<Vec<MessageBundle>, WorkerError> {
        let mut latest_height = None;
        // One past the last bundle that was already received (a duplicate).
        let mut skipped_len = 0;
        // One past the last bundle whose epoch we can vouch for.
        let mut trusted_len = 0;
        for (i, (epoch, bundle)) in bundles.iter().enumerate() {
            // Heights must be non-decreasing across the request.
            ensure!(
                latest_height <= Some(bundle.height),
                WorkerError::InvalidCrossChainRequest
            );
            latest_height = Some(bundle.height);
            if bundle.height < next_height_to_receive {
                skipped_len = i + 1;
            }
            let epoch_is_known = self.allow_messages_from_deprecated_epochs
                || Some(bundle.height) <= last_anticipated_block_height
                || *epoch >= self.current_epoch
                || storage.get_or_load_committee(*epoch).await?.is_some();
            if epoch_is_known {
                trusted_len = i + 1;
            }
        }
        if skipped_len > 0 {
            let (_, sample_bundle) = &bundles[skipped_len - 1];
            // `{recipient:.8}` truncates the displayed chain id to 8 characters.
            debug!(
                "Ignoring repeated messages to {recipient:.8} from {origin:} at height {}",
                sample_bundle.height,
            );
        }
        if skipped_len < bundles.len() && trusted_len < bundles.len() {
            let (sample_epoch, sample_bundle) = &bundles[trusted_len];
            warn!(
                "Refusing messages to {recipient:.8} from {origin:} at height {} \
                because the epoch {} is not known locally",
                sample_bundle.height, sample_epoch,
            );
        }
        // Empty when every new bundle is untrusted (trusted_len <= skipped_len).
        let bundles = if skipped_len < trusted_len {
            bundles
                .drain(skipped_len..trusted_len)
                .map(|(_, bundle)| bundle)
                .collect()
        } else {
            vec![]
        };
        Ok(bundles)
    }
}