use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet, HashSet},
slice,
sync::{Arc, RwLock},
};
use custom_debug_derive::Debug;
use futures::{
future::Future,
stream::{self, AbortHandle, FuturesUnordered, StreamExt},
};
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use linera_base::{
crypto::{CryptoHash, Signer as _, ValidatorPublicKey},
data_types::{
ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch, TimeDelta, Timestamp,
},
ensure,
identifiers::{AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
time::Duration,
};
#[cfg(not(target_arch = "wasm32"))]
use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
use linera_chain::{
data_types::{BlockProposal, BundleExecutionPolicy, ChainAndHeight, LiteVote, ProposedBlock},
manager::LockingBlock,
types::{
Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
},
ChainError,
};
use linera_execution::committee::Committee;
use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
use rand::prelude::SliceRandom as _;
use received_log::ReceivedLogs;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::{debug, error, info, instrument, trace, warn};
use crate::{
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
environment::{wallet::Wallet as _, Environment},
local_node::{LocalNodeClient, LocalNodeError},
node::{CrossChainMessageDelivery, NodeError, ValidatorNode, ValidatorNodeProvider as _},
notifier::{ChannelNotifier, Notifier as _},
remote_node::RemoteNode,
updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
ChainWorkerConfig, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
};
pub mod chain_client;
pub use chain_client::ChainClient;
pub use crate::data_types::ClientOutcome;
#[cfg(test)]
#[path = "../unit_tests/client_tests.rs"]
mod client_tests;
pub mod requests_scheduler;
pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
mod received_log;
mod validator_trackers;
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus latency histograms for the client's high-level operations.
    use std::sync::LazyLock;
    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;
    /// Latency of `process_inbox` when called without a preceding `prepare_chain`.
    // NOTE(review): the static is specific to the "without prepare" path but the
    // metric is registered under the generic name "process_inbox_latency" —
    // confirm this naming is intentional.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "process_inbox_latency",
                "process_inbox latency",
                &[],
                exponential_bucket_latencies(10_000.0),
            )
        });
    /// Latency of `prepare_chain`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "prepare_chain_latency",
            "prepare_chain latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });
    /// Latency of `synchronize_chain_state`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });
    /// Latency of `execute_block`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "execute_block_latency",
            "execute_block latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });
    /// Latency of `find_received_certificates`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "find_received_certificates_latency",
            "find_received_certificates latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });
}
/// Default number of certificates fetched per request when downloading a chain.
pub const DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default number of certificates sent per request when uploading a chain.
pub const DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE: u64 = 500;
/// Default number of sender certificates downloaded in one batch.
pub const DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
/// Default maximum number of event stream queries.
pub const DEFAULT_MAX_EVENT_STREAM_QUERIES: usize = 1000;
/// Identifies which client operation a timing measurement refers to; sent over
/// the optional timing channel accepted by `create_chain_client`.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    ExecuteOperations,
    ExecuteBlock,
    SubmitBlockProposal,
    UpdateValidators,
}
/// How closely a client listens to a given chain. The modes form a partial
/// order (see the `PartialOrd` impl): `FullChain` covers the most,
/// `EventsOnly` the least.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Fully track the chain.
    FullChain,
    /// Follow the chain's new blocks without fully tracking it.
    FollowChain,
    /// Only listen for events in the given streams.
    EventsOnly(BTreeSet<StreamId>),
}
impl PartialOrd for ListeningMode {
    /// Orders modes by coverage: `FullChain` > `FollowChain` > `EventsOnly`,
    /// and two `EventsOnly` modes compare by inclusion of their stream sets
    /// (unrelated sets are incomparable, yielding `None`).
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Coverage rank used for every comparison not between two
        // `EventsOnly` values.
        fn rank(mode: &ListeningMode) -> u8 {
            match mode {
                ListeningMode::FullChain => 2,
                ListeningMode::FollowChain => 1,
                ListeningMode::EventsOnly(_) => 0,
            }
        }
        if let (ListeningMode::EventsOnly(ours), ListeningMode::EventsOnly(theirs)) =
            (self, other)
        {
            if ours == theirs {
                Some(Ordering::Equal)
            } else if ours.is_superset(theirs) {
                Some(Ordering::Greater)
            } else if theirs.is_superset(ours) {
                Some(Ordering::Less)
            } else {
                None
            }
        } else {
            Some(rank(self).cmp(&rank(other)))
        }
    }
}
impl ListeningMode {
    /// Returns whether a notification with the given reason is of interest to
    /// a client listening in this mode.
    pub fn is_relevant(&self, reason: &Reason) -> bool {
        // The arm order matters: the first arm filters `NewEvents` out of the
        // two "whole chain" modes before their catch-all arms.
        match (reason, self) {
            // NOTE(review): new-event notifications are explicitly irrelevant
            // for `FullChain`/`FollowChain` — presumably because those modes
            // learn about events through the blocks themselves; confirm.
            (Reason::NewEvents { .. }, ListeningMode::FollowChain | ListeningMode::FullChain) => {
                false
            }
            // A fully tracked chain cares about every other notification.
            (_, ListeningMode::FullChain) => true,
            // A followed chain only cares about new blocks.
            (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
            (_, ListeningMode::FollowChain) => false,
            // Event-only listeners care about any notification mentioning at
            // least one of their subscribed streams.
            (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant))
            | (Reason::NewBlock { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
                relevant.intersection(event_streams).next().is_some()
            }
            (_, ListeningMode::EventsOnly(_)) => false,
        }
    }
    /// Widens `self` so it also covers everything `other` covers (the least
    /// upper bound in the mode order). `None` leaves `self` unchanged.
    pub fn extend(&mut self, other: Option<ListeningMode>) {
        // Arm order is significant: `FullChain` upgrades are handled before
        // the `FollowChain` arms, which are handled before the merge arm.
        match (self, other) {
            (_, None) => (),
            // `FullChain` already covers everything.
            (ListeningMode::FullChain, _) => (),
            // Anything extended with `FullChain` becomes `FullChain`.
            (mode, Some(ListeningMode::FullChain)) => {
                *mode = ListeningMode::FullChain;
            }
            // `FollowChain` covers `FollowChain` and `EventsOnly`.
            (ListeningMode::FollowChain, _) => (),
            (mode, Some(ListeningMode::FollowChain)) => {
                *mode = ListeningMode::FollowChain;
            }
            // Two event-only modes merge their stream sets.
            (
                ListeningMode::EventsOnly(self_events),
                Some(ListeningMode::EventsOnly(other_events)),
            ) => {
                self_events.extend(other_events);
            }
        }
    }
    /// Returns whether this mode falls short of fully tracking the chain.
    pub fn is_follow_only(&self) -> bool {
        !matches!(self, ListeningMode::FullChain)
    }
    /// Returns whether this mode fully tracks the chain.
    pub fn is_full(&self) -> bool {
        matches!(self, ListeningMode::FullChain)
    }
}
/// Shared client state for all chains handled by this process.
pub struct Client<Env: Environment> {
    /// The execution environment (storage, network, signer, wallet).
    environment: Env,
    /// Client to the local node that validates and stores certificates.
    pub local_node: LocalNodeClient<Env::Storage>,
    /// Scheduler that distributes download requests across validators.
    requests_scheduler: RequestsScheduler<Env>,
    /// The ID of the admin chain.
    admin_chain_id: ChainId,
    /// Listening mode per chain this client knows about.
    chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
    /// Notifier broadcasting chain notifications to subscribers.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Per-chain client state, keyed by chain ID.
    chains: papaya::HashMap<ChainId, chain_client::State>,
    /// Options shared by all chain clients created from this client.
    options: chain_client::Options,
}
impl<Env: Environment> Client<Env> {
/// Creates a new `Client` with the given environment and configuration,
/// building the local node's `WorkerState` from the supplied chain-worker
/// settings.
#[instrument(level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)]
pub fn new(
    environment: Env,
    admin_chain_id: ChainId,
    long_lived_services: bool,
    chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
    name: impl Into<String>,
    chain_worker_ttl: Option<Duration>,
    sender_chain_worker_ttl: Option<Duration>,
    priority_bundle_origins: HashSet<ChainId>,
    ignored_bundle_origins: HashSet<ChainId>,
    options: chain_client::Options,
    requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
    block_cache_size: usize,
    execution_state_cache_size: usize,
) -> Self {
    // The mode map is shared with the worker state created below.
    let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect()));
    let config = ChainWorkerConfig {
        nickname: name.into(),
        long_lived_services,
        // Permissive worker settings used for client-side processing.
        allow_inactive_chains: true,
        allow_messages_from_deprecated_epochs: true,
        ttl: chain_worker_ttl,
        sender_chain_ttl: sender_chain_worker_ttl,
        priority_bundle_origins,
        block_cache_size,
        execution_state_cache_size,
        ignored_bundle_origins,
        ..ChainWorkerConfig::default()
    };
    let state = WorkerState::new(
        environment.storage().clone(),
        config,
        Some(chain_modes.clone()),
    );
    let local_node = LocalNodeClient::new(state);
    // The scheduler starts without validators; they are passed per request.
    let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
    Self {
        environment,
        local_node,
        requests_scheduler,
        chains: papaya::HashMap::new(),
        admin_chain_id,
        chain_modes,
        notifier: Arc::new(ChannelNotifier::default()),
        options,
    }
}
/// Returns the ID of the admin chain.
pub fn admin_chain_id(&self) -> ChainId {
    self.admin_chain_id
}
/// Subscribes to notifications for the given chains, returning the receiving
/// end of a newly created unbounded channel.
pub fn subscribe(
    &self,
    chain_ids: Vec<ChainId>,
) -> tokio::sync::mpsc::UnboundedReceiver<Notification> {
    self.notifier.subscribe(chain_ids)
}
/// Registers an existing channel sender to also receive notifications for the
/// given chains.
pub fn subscribe_extra(
    &self,
    chain_ids: Vec<ChainId>,
    sender: &tokio::sync::mpsc::UnboundedSender<Notification>,
) {
    self.notifier.add_sender(chain_ids, sender);
}
/// Returns the storage client from the environment.
pub fn storage_client(&self) -> &Env::Storage {
    self.environment.storage()
}
/// Looks up a confirmed certificate in local storage: directly by `hash` when
/// one is given, otherwise by `(chain_id, height)`. Returns `None` if the
/// certificate is not stored locally.
async fn try_read_local_certificate(
    &self,
    chain_id: ChainId,
    height: BlockHeight,
    hash: Option<CryptoHash>,
) -> Result<Option<Arc<ConfirmedBlockCertificate>>, chain_client::Error> {
    match hash {
        // A known hash allows a direct lookup.
        Some(hash) => Ok(self.storage_client().read_certificate(hash).await?),
        // Otherwise query by height; only one height is requested, so only
        // the first result matters.
        None => {
            let results = self
                .storage_client()
                .read_certificates_by_heights(chain_id, &[height])
                .await?;
            Ok(results.into_iter().next().flatten())
        }
    }
}
/// Returns the network provider used to create validator node handles.
pub fn validator_node_provider(&self) -> &Env::Network {
    self.environment.network()
}
/// Returns the options shared by all chain clients created from this client.
pub(crate) fn options(&self) -> &chain_client::Options {
    &self.options
}
/// Asks the local node to retry any cross-chain requests from `sender_chain`
/// that are still pending.
pub async fn retry_pending_cross_chain_requests(
    &self,
    sender_chain: ChainId,
) -> Result<(), LocalNodeError> {
    self.local_node
        .retry_pending_cross_chain_requests(sender_chain, &self.notifier)
        .await
}
/// Returns the signer from the environment.
#[instrument(level = "trace", skip(self))]
pub fn signer(&self) -> &Env::Signer {
    self.environment.signer()
}
/// Returns whether the signer holds a key for the given account owner.
pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, chain_client::Error> {
    self.signer()
        .contains_key(owner)
        .await
        .map_err(chain_client::Error::signer_failure)
}
/// Returns the wallet from the environment.
pub fn wallet(&self) -> &Env::Wallet {
    self.environment.wallet()
}
/// Returns whether this client can only follow the chain: true when the
/// wallet has no entry for it, the wallet lookup fails, or the entry has no
/// owner.
async fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
    self.wallet()
        .get(chain_id)
        .await
        .ok()
        .flatten()
        .is_none_or(|chain| chain.owner.is_none())
}
#[instrument(level = "trace", skip(self))]
pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
let mut chain_modes = self
.chain_modes
.write()
.expect("Panics should not happen while holding a lock to `chain_modes`");
let entry = chain_modes.entry(chain_id).or_insert_with(|| mode.clone());
entry.extend(Some(mode));
entry.clone()
}
/// Returns the current listening mode for the given chain, if any.
pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
    self.chain_modes
        .read()
        .expect("Panics should not happen while holding a lock to `chain_modes`")
        .get(&chain_id)
        .cloned()
}
/// Returns whether the given chain is fully tracked, i.e. its listening mode
/// is `ListeningMode::FullChain`.
pub fn is_tracked(&self, chain_id: ChainId) -> bool {
    self.chain_modes
        .read()
        .expect("Panics should not happen while holding a lock to `chain_modes`")
        .get(&chain_id)
        .is_some_and(ListeningMode::is_full)
}
/// Creates a `ChainClient` for the given chain, registering per-chain state
/// if this is the first client for that chain.
///
/// `pending_proposal` only seeds the state on first creation; for an already
/// known chain the existing state is kept unchanged.
#[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
pub fn create_chain_client(
    self: &Arc<Self>,
    chain_id: ChainId,
    block_hash: Option<CryptoHash>,
    next_block_height: BlockHeight,
    pending_proposal: Option<PendingProposal>,
    preferred_owner: Option<AccountOwner>,
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
) -> ChainClient<Env> {
    // Insert the per-chain state only if absent, so concurrent callers share
    // a single state for the same chain.
    self.chains.pin().get_or_insert_with(chain_id, || {
        chain_client::State::new(pending_proposal.clone())
    });
    ChainClient::new(
        self.clone(),
        chain_id,
        self.options.clone(),
        block_hash,
        next_block_height,
        preferred_owner,
        timing_sender,
    )
}
/// Queries the local node for the chain's info; if blobs are missing locally,
/// synchronizes the admin chain, downloads the blobs from `validators` and
/// retries once.
async fn fetch_chain_info(
    &self,
    chain_id: ChainId,
    validators: &[RemoteNode<Env::ValidatorNode>],
) -> Result<Box<ChainInfo>, chain_client::Error> {
    match self.local_node.chain_info(chain_id).await {
        Ok(info) => Ok(info),
        Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
            // Catch up with the admin chain, then fetch the missing blobs
            // and retry the query once.
            Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
            self.update_local_node_with_blobs_from(blob_ids, validators)
                .await?;
            Ok(self.local_node.chain_info(chain_id).await?)
        }
        Err(err) => Err(err.into()),
    }
}
/// Downloads and processes confirmed blocks for `chain_id` until the local
/// node's next block height reaches `target_next_block_height`, returning the
/// final chain info.
///
/// Certificates already in local storage are replayed first; the remainder is
/// fetched from the validators in batches.
///
/// # Errors
/// Returns `CannotDownloadCertificates` if the validators could not supply
/// enough certificates to reach the target height.
#[instrument(level = "trace", skip(self))]
async fn download_certificates(
    &self,
    chain_id: ChainId,
    target_next_block_height: BlockHeight,
) -> Result<Box<ChainInfo>, chain_client::Error> {
    let validators = self.validator_nodes().await?;
    let mut info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
    // Nothing to do if we are already at or past the target height.
    if target_next_block_height <= info.next_block_height {
        return Ok(info);
    }
    // Replay certificates that are already available locally.
    info = self
        .load_local_certificates(chain_id, target_next_block_height, None)
        .await?;
    let mut next_height = info.next_block_height;
    while next_height < target_next_block_height {
        // Fetch at most `certificate_download_batch_size` certificates at a
        // time.
        let limit = u64::from(target_next_block_height)
            .checked_sub(u64::from(next_height))
            .ok_or(ArithmeticError::Overflow)?
            .min(self.options.certificate_download_batch_size);
        let certificates = self
            .requests_scheduler
            .download_certificates_from_validators(
                &validators,
                chain_id,
                next_height,
                limit,
                self.options.certificate_batch_download_timeout,
            )
            .await?;
        // `None` means nothing was processed; stop and let the check below
        // report the failure.
        let Some(new_info) = self
            .process_certificates(&validators, certificates, None)
            .await?
        else {
            break;
        };
        assert!(new_info.next_block_height > next_height);
        next_height = new_info.next_block_height;
        info = new_info;
    }
    ensure!(
        target_next_block_height <= info.next_block_height,
        chain_client::Error::CannotDownloadCertificates {
            chain_id,
            target_next_block_height,
        }
    );
    Ok(info)
}
/// Replays certificates from local storage for `chain_id`, from the local
/// node's next block height up to (but excluding) `end`.
///
/// If `until_block_time` is set, stops before the first block whose timestamp
/// is at or after it. Returns the chain info after the last processed
/// certificate.
async fn load_local_certificates(
    &self,
    chain_id: ChainId,
    end: BlockHeight,
    until_block_time: Option<Timestamp>,
) -> Result<Box<ChainInfo>, chain_client::Error> {
    let mut last_info = self.local_node.chain_info(chain_id).await?;
    let next_height = last_info.next_block_height;
    // Hashes of locally known blocks in the height range, as reported by the
    // local node.
    let hashes = self
        .local_node
        .get_preprocessed_block_hashes(chain_id, next_height, end)
        .await?;
    let certificates = self.storage_client().read_certificates(&hashes).await?;
    let certificates = match ResultReadCertificates::new(certificates, hashes) {
        ResultReadCertificates::Certificates(certificates) => certificates,
        ResultReadCertificates::InvalidHashes(hashes) => {
            return Err(chain_client::Error::ReadCertificatesError(hashes))
        }
    };
    for certificate in certificates {
        if let Some(until) = until_block_time {
            if certificate.value().block().header.timestamp >= until {
                break;
            }
        }
        last_info = self.handle_certificate(certificate).await?.info;
    }
    Ok(last_info)
}
/// Downloads and processes confirmed blocks for `chain_id` from a single
/// remote node until the local node's next height reaches `stop`, a block
/// with timestamp at or after `until_block_time` is reached, or the node has
/// nothing more to offer. Returns the chain info reached.
///
/// Unlike `download_certificates`, falling short of `stop` is not an error.
#[instrument(level = "trace", skip_all)]
async fn download_certificates_from(
    &self,
    remote_node: &RemoteNode<Env::ValidatorNode>,
    chain_id: ChainId,
    stop: BlockHeight,
    until_block_time: Option<Timestamp>,
) -> Result<Box<ChainInfo>, chain_client::Error> {
    // Replay whatever is already available locally first.
    let mut last_info = self
        .load_local_certificates(chain_id, stop, until_block_time)
        .await?;
    let mut next_height = last_info.next_block_height;
    while next_height < stop {
        // Fetch at most `certificate_download_batch_size` certificates.
        let limit = u64::from(stop)
            .checked_sub(u64::from(next_height))
            .ok_or(ArithmeticError::Overflow)?
            .min(self.options.certificate_download_batch_size);
        let certificates = self
            .requests_scheduler
            .download_certificates(remote_node, chain_id, next_height, limit)
            .await?;
        let Some(info) = self
            .process_certificates(slice::from_ref(remote_node), certificates, until_block_time)
            .await?
        else {
            break;
        };
        assert!(info.next_block_height > next_height);
        next_height = info.next_block_height;
        last_info = info;
    }
    Ok(last_info)
}
/// Downloads the given blobs from the remote nodes and stores them in the
/// local node.
///
/// # Errors
/// Returns `BlobsNotFound` if the scheduler could not obtain all of the
/// requested blobs.
async fn download_blobs(
    &self,
    remote_nodes: &[RemoteNode<Env::ValidatorNode>],
    blob_ids: &[BlobId],
) -> Result<(), chain_client::Error> {
    let blobs = &self
        .requests_scheduler
        .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
        .await?
        .ok_or_else(|| {
            chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
        })?;
    self.local_node.store_blobs(blobs).await.map_err(Into::into)
}
/// Makes sure all the given events are available locally.
///
/// Groups the events by publishing chain and stream, then synchronizes events
/// from the validators until, for every stream, the locally next expected
/// index is beyond the highest requested one.
///
/// # Errors
/// Returns `EventsNotFound` with the still-missing event IDs if no validator
/// could provide them within the timeout.
#[instrument(level = "trace", skip_all)]
async fn download_certificates_for_events(
    &self,
    event_ids: &[EventId],
) -> Result<(), chain_client::Error> {
    let validators = self.validator_nodes().await?;
    let timeout = self.options.certificate_batch_download_timeout;
    // For each chain, the highest requested index per stream.
    let mut required_by_chain = BTreeMap::<_, BTreeMap<StreamId, u32>>::new();
    for event_id in event_ids {
        required_by_chain
            .entry(event_id.chain_id)
            .or_default()
            .entry(event_id.stream_id.clone())
            .and_modify(|max| *max = (*max).max(event_id.index))
            .or_insert(event_id.index);
    }
    for (chain_id, required_streams) in required_by_chain {
        let stream_ids = required_streams.keys().cloned().collect::<BTreeSet<_>>();
        let stream_ids_ref = &stream_ids;
        let required_ref = &required_streams;
        // Try validators concurrently; an attempt only succeeds if it makes
        // every required event available locally.
        let result = communicate_concurrently(
            &validators,
            move |remote_node| {
                Box::pin(async move {
                    self.sync_events_from_node(chain_id, stream_ids_ref, &remote_node)
                        .await?;
                    let next_expected = self
                        .local_node
                        .next_expected_events(
                            chain_id,
                            stream_ids_ref.iter().cloned().collect(),
                        )
                        .await?;
                    if required_ref.iter().all(|(stream_id, &max_index)| {
                        next_expected
                            .get(stream_id)
                            .is_some_and(|index| *index > max_index)
                    }) {
                        Ok::<(), chain_client::Error>(())
                    } else {
                        Err(chain_client::Error::InternalError("missing events"))
                    }
                })
            },
            |_| chain_client::Error::InternalError("missing events"),
            timeout,
        )
        .await;
        if result.is_err() {
            // Work out exactly which events are still missing, for the error.
            let next_expected = self
                .local_node
                .next_expected_events(chain_id, stream_ids.into_iter().collect())
                .await?;
            let missing = event_ids
                .iter()
                .filter(|id| {
                    id.chain_id == chain_id
                        && next_expected
                            .get(&id.stream_id)
                            .is_none_or(|index| *index <= id.index)
                })
                .cloned()
                .collect();
            return Err(NodeError::EventsNotFound(missing).into());
        }
    }
    Ok(())
}
/// Processes a batch of downloaded certificates in the local node, fetching
/// any missing blobs from `remote_nodes` first.
///
/// If `until_block_time` is set, stops before the first block whose timestamp
/// is at or after it. Returns the chain info after the last processed
/// certificate, or `None` if none was processed.
#[instrument(level = "trace", skip_all)]
async fn process_certificates(
    &self,
    remote_nodes: &[RemoteNode<Env::ValidatorNode>],
    certificates: Vec<ConfirmedBlockCertificate>,
    until_block_time: Option<Timestamp>,
) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
    let mut info = None;
    // Pre-fetch all blobs the certificates depend on in one pass.
    let required_blob_ids: Vec<_> = certificates
        .iter()
        .flat_map(|certificate| certificate.value().required_blob_ids())
        .collect();
    match self
        .local_node
        .read_blob_states_from_storage(&required_blob_ids)
        .await
    {
        Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
            self.download_blobs(remote_nodes, &blob_ids).await?;
        }
        x => {
            x?;
        }
    }
    for certificate in certificates {
        if let Some(until) = until_block_time {
            if certificate.value().block().header.timestamp >= until {
                break;
            }
        }
        info = Some(
            // Retry once after downloading blobs reported missing.
            match self.handle_certificate(certificate.clone()).await {
                Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                    self.download_blobs(remote_nodes, &blob_ids).await?;
                    self.handle_certificate(certificate).await?
                }
                x => x?,
            }
            .info,
        );
    }
    Ok(info)
}
/// Submits a certificate to the local node and, for fully tracked chains,
/// refreshes the listening modes of the chains publishing events to it.
async fn handle_certificate<T: ProcessableCertificate>(
    &self,
    certificate: GenericCertificate<T>,
) -> Result<ChainInfoResponse, LocalNodeError> {
    let chain_id = certificate.inner().chain_id();
    let response = self
        .local_node
        .handle_certificate(certificate, &self.notifier)
        .await?;
    if self.is_tracked(chain_id) {
        self.update_publisher_chain_modes(chain_id).await?;
    }
    Ok(response)
}
/// Makes sure we listen for events from every chain that publishes to
/// `chain_id`'s subscribed streams, and (for non-admin chains) from the admin
/// chain itself.
async fn update_publisher_chain_modes(&self, chain_id: ChainId) -> Result<(), LocalNodeError> {
    // Group the chain's event subscriptions by publishing chain.
    let mut publishers: BTreeMap<ChainId, BTreeSet<StreamId>> = BTreeMap::new();
    for ((publisher_id, stream_name), _) in
        self.local_node.get_event_subscriptions(chain_id).await?
    {
        publishers
            .entry(publisher_id)
            .or_default()
            .insert(stream_name);
    }
    // Always include the admin chain, even without explicit subscriptions.
    if chain_id != self.admin_chain_id {
        publishers.entry(self.admin_chain_id).or_default();
    }
    // Widen the listening mode of every publisher other than the chain
    // itself to cover the relevant streams.
    publishers
        .into_iter()
        .filter(|(publisher_id, _)| *publisher_id != chain_id)
        .for_each(|(publisher_id, streams)| {
            self.extend_chain_mode(publisher_id, ListeningMode::EventsOnly(streams));
        });
    Ok(())
}
/// Queries the local node for the chain's info, including its committees.
async fn chain_info_with_committees(
    &self,
    chain_id: ChainId,
) -> Result<Box<ChainInfo>, LocalNodeError> {
    let query = ChainInfoQuery::new(chain_id).with_committees();
    let response = self.local_node.handle_chain_info_query(query).await?;
    Ok(response.info)
}
/// Returns the admin chain's current epoch together with all committees from
/// epoch 0 up to and including it, as found in storage.
///
/// Epochs whose committee is not in storage are silently skipped.
#[instrument(level = "trace", skip_all)]
async fn admin_committees(
    &self,
) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
    let query = ChainInfoQuery::new(self.admin_chain_id);
    let info = self.local_node.handle_chain_info_query(query).await?.info;
    let max_epoch = info.epoch;
    let mut committees = BTreeMap::new();
    for index in 0..=max_epoch.0 {
        let epoch = Epoch(index);
        if let Some(committee) = self.storage_client().get_or_load_committee(epoch).await? {
            committees.insert(epoch, (*committee).clone());
        }
    }
    Ok((max_epoch, committees))
}
/// Returns the admin chain's current epoch and its committee.
///
/// # Errors
/// Returns `InactiveChain` if no committee is stored for the current epoch.
pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
    let query = ChainInfoQuery::new(self.admin_chain_id);
    let info = self.local_node.handle_chain_info_query(query).await?.info;
    let committee = self
        .storage_client()
        .get_or_load_committee(info.epoch)
        .await?
        .ok_or(LocalNodeError::InactiveChain(self.admin_chain_id))?;
    Ok((info.epoch, committee))
}
/// Returns remote node handles for all validators of the current admin
/// committee.
async fn validator_nodes(
    &self,
) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
    let (_, committee) = self.admin_committee().await?;
    Ok(self.make_nodes(&committee)?)
}
/// Creates `RemoteNode` handles for every validator in the given committee.
fn make_nodes(
    &self,
    committee: &Committee,
) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
    let mut remote_nodes = Vec::new();
    for (public_key, node) in self.validator_node_provider().make_nodes(committee)? {
        remote_nodes.push(RemoteNode { public_key, node });
    }
    Ok(remote_nodes)
}
/// Returns the blob containing the `ChainDescription` for the given chain.
///
/// Reads it from local storage if present; otherwise synchronizes the admin
/// chain and downloads the blob from the validators, storing it locally.
pub async fn get_chain_description_blob(
    &self,
    chain_id: ChainId,
) -> Result<Blob, chain_client::Error> {
    let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
    // Fast path: the blob is already in local storage.
    if let Some(blob) = self
        .local_node
        .storage_client()
        .read_blob(chain_desc_id)
        .await?
    {
        return Ok(Arc::unwrap_or_clone(blob));
    }
    // Slow path: catch up with the admin chain, then fetch the blob from the
    // validators.
    Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
    let nodes = self.validator_nodes().await?;
    let mut blobs = self
        .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
        .await?;
    // The helper either returns the requested blobs or errors, so an empty
    // result here would be a bug; say so in the panic message.
    Ok(blobs
        .pop()
        .expect("update_local_node_with_blobs_from should return the requested blob"))
}
/// Reads the `ChainDescription` for the given chain by deserializing its
/// description blob.
pub async fn get_chain_description(
    &self,
    chain_id: ChainId,
) -> Result<ChainDescription, chain_client::Error> {
    let blob = self.get_chain_description_blob(chain_id).await?;
    let description = bcs::from_bytes(blob.bytes())?;
    Ok(description)
}
/// Finalizes a validated block: collects a quorum of confirmation votes from
/// the committee and processes the resulting confirmed certificate locally.
#[instrument(level = "trace", skip_all)]
async fn finalize_block(
    self: &Arc<Self>,
    committee: &Committee,
    certificate: ValidatedBlockCertificate,
) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
    debug!(round = %certificate.round, "Submitting block for confirmation");
    let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
    let finalize_action = CommunicateAction::FinalizeBlock {
        certificate: Box::new(certificate),
        delivery: self.options.cross_chain_message_delivery,
    };
    let certificate = self
        .communicate_chain_action(committee, finalize_action, hashed_value)
        .await?;
    // Signatures were just collected from a quorum, so no re-check is needed.
    self.receive_certificate_with_checked_signatures(certificate.clone())
        .await?;
    Ok(certificate)
}
/// Submits a block proposal to the validators, collects a quorum of votes
/// into a certificate, and processes the certificate locally.
///
/// While communicating, a background task aggregates clock-skew reports from
/// validators and warns once if validators holding at least a validity
/// threshold of the weight report skew.
#[instrument(level = "trace", skip_all)]
async fn submit_block_proposal<T: ProcessableCertificate>(
    self: &Arc<Self>,
    committee: &Committee,
    proposal: Box<BlockProposal>,
    value: T,
) -> Result<GenericCertificate<T>, chain_client::Error> {
    debug!(
        round = %proposal.content.round,
        "Submitting block proposal to validators"
    );
    let block_timestamp = proposal.content.block.timestamp;
    let local_time = self.local_node.storage_client().clock().current_time();
    if block_timestamp > local_time {
        // NOTE(review): this branch only logs; the actual waiting presumably
        // happens in the updater sending the proposal — confirm.
        info!(
            chain_id = %proposal.content.block.chain_id,
            %block_timestamp,
            %local_time,
            "Block timestamp is in the future; waiting until it can be proposed",
        );
    }
    let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
    let submit_action = CommunicateAction::SubmitBlock {
        proposal,
        blob_ids: value.required_blob_ids().into_iter().collect(),
        clock_skew_sender,
    };
    let validity_threshold = committee.validity_threshold();
    let committee_clone = committee.clone();
    // Weigh each validator's skew report; warn once when the reporting
    // validators reach the validity threshold.
    let clock_skew_check_handle = linera_base::Task::spawn(async move {
        let mut skew_weight = 0u64;
        let mut min_skew = TimeDelta::MAX;
        let mut max_skew = TimeDelta::ZERO;
        while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
            if clock_skew.as_micros() > 0 {
                skew_weight += committee_clone.weight(&public_key);
                min_skew = min_skew.min(clock_skew);
                max_skew = max_skew.max(clock_skew);
                if skew_weight >= validity_threshold {
                    warn!(
                        skew_weight,
                        validity_threshold,
                        min_skew_ms = min_skew.as_micros() / 1000,
                        max_skew_ms = max_skew.as_micros() / 1000,
                        "A validity threshold of validators reported clock skew; \
                        consider checking your system clock",
                    );
                    return;
                }
            }
        }
    });
    let certificate = self
        .communicate_chain_action(committee, submit_action, value)
        .await?;
    // The action is done, so the skew channel's senders are dropped and the
    // task finishes.
    clock_skew_check_handle.await;
    self.handle_certificate(certificate.clone()).await?;
    Ok(certificate)
}
/// Sends the chain's information up to `height` to all validators of the
/// committee, waiting for a quorum (by weight) to acknowledge.
#[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
async fn communicate_chain_updates(
    self: &Arc<Self>,
    committee: &Committee,
    chain_id: ChainId,
    height: BlockHeight,
    delivery: CrossChainMessageDelivery,
    latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
) -> Result<(), chain_client::Error> {
    let nodes = self.make_nodes(committee)?;
    communicate_with_quorum(
        &nodes,
        committee,
        // No per-validator result needs to be aggregated here.
        |_: &()| (),
        |remote_node| {
            let mut updater = ValidatorUpdater {
                remote_node,
                client: self.clone(),
                admin_chain_id: self.admin_chain_id,
            };
            let certificate = latest_certificate.clone();
            Box::pin(async move {
                updater
                    .send_chain_information(chain_id, height, delivery, certificate)
                    .await
            })
        },
        self.options.quorum_grace_period,
    )
    .await?;
    Ok(())
}
/// Sends a `CommunicateAction` to the committee's validators and assembles
/// their votes into a certificate for `value`.
///
/// # Errors
/// Returns `UnexpectedQuorum` if the quorum voted for a different hash or
/// round than expected, and `InternalError`/`ProtocolError` if the votes
/// cannot be combined into a valid certificate for `value`.
#[instrument(level = "trace", skip_all)]
async fn communicate_chain_action<T: CertificateValue>(
    self: &Arc<Self>,
    committee: &Committee,
    action: CommunicateAction,
    value: T,
) -> Result<GenericCertificate<T>, chain_client::Error> {
    let nodes = self.make_nodes(committee)?;
    // Votes are grouped by (hash, round); a quorum must agree on one pair.
    let ((votes_hash, votes_round), votes) = communicate_with_quorum(
        &nodes,
        committee,
        |vote: &LiteVote| (vote.value.value_hash, vote.round),
        |remote_node| {
            let mut updater = ValidatorUpdater {
                remote_node,
                client: self.clone(),
                admin_chain_id: self.admin_chain_id,
            };
            let action = action.clone();
            Box::pin(async move { updater.send_chain_update(action).await })
        },
        self.options.quorum_grace_period,
    )
    .await?;
    ensure!(
        (votes_hash, votes_round) == (value.hash(), action.round()),
        chain_client::Error::UnexpectedQuorum {
            hash: votes_hash,
            round: votes_round,
            expected_hash: value.hash(),
            expected_round: action.round(),
        }
    );
    let certificate = LiteCertificate::try_from_votes(votes)
        .ok_or_else(|| {
            chain_client::Error::InternalError(
                "Vote values or rounds don't match; this is a bug",
            )
        })?
        .with_value(value)
        .ok_or_else(|| {
            chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
        })?;
    Ok(certificate)
}
/// Processes a confirmed certificate whose signatures have already been
/// verified, first catching up on the chain's earlier blocks and retrying
/// once after fetching blobs reported as missing.
#[instrument(level = "trace", skip_all)]
async fn receive_certificate_with_checked_signatures(
    &self,
    certificate: ConfirmedBlockCertificate,
) -> Result<(), chain_client::Error> {
    let block = certificate.block();
    // Make sure all earlier blocks of the chain are processed first.
    self.download_certificates(block.header.chain_id, block.header.height)
        .await?;
    if let Err(err) = self.handle_certificate(certificate.clone()).await {
        match &err {
            LocalNodeError::BlobsNotFound(blob_ids) => {
                // A failed blob download is reported as the original
                // `BlobsNotFound` error, not the download error.
                self.download_blobs(&self.validator_nodes().await?, blob_ids)
                    .await
                    .map_err(|_| err)?;
                self.handle_certificate(certificate).await?;
            }
            _ => {
                warn!("Failed to process network hashed certificate value");
                return Err(err.into());
            }
        }
    }
    Ok(())
}
/// Processes a confirmed certificate received from a sender chain, fetching
/// missing blobs and retrying once if needed.
///
/// With `ReceiveCertificateMode::NeedsCheck` the certificate is first checked
/// against the admin chain's committees. If `nodes` is `None`, the current
/// committee's validators are used for blob downloads.
#[instrument(level = "trace", skip_all)]
async fn receive_sender_certificate(
    &self,
    certificate: ConfirmedBlockCertificate,
    mode: ReceiveCertificateMode,
    nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
) -> Result<(), chain_client::Error> {
    let (max_epoch, committees) = self.admin_committees().await?;
    if let ReceiveCertificateMode::NeedsCheck = mode {
        Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
    }
    let nodes = match nodes {
        Some(nodes) => nodes,
        None => self.validator_nodes().await?,
    };
    if let Err(err) = self.handle_certificate(certificate.clone()).await {
        match &err {
            LocalNodeError::BlobsNotFound(blob_ids) => {
                // Fetch the missing blobs, then retry. The certificate is not
                // needed afterwards, so it can be moved into the retry.
                self.download_blobs(&nodes, blob_ids).await?;
                self.handle_certificate(certificate).await?;
            }
            _ => {
                warn!("Failed to process network hashed certificate value");
                return Err(err.into());
            }
        }
    }
    Ok(())
}
/// Downloads the blocks of `sender_chain_id` at `remote_heights` and
/// processes them, reporting each processed (chain, height) over `sender`.
///
/// Heights already in local storage are reported without downloading. Remote
/// downloads only target validators whose received log claims the block;
/// validators that fail are dropped and the rest retried. Errors are logged
/// rather than returned.
#[instrument(level = "debug", skip_all, fields(chain_id = %sender_chain_id))]
async fn download_and_process_sender_chain(
    &self,
    sender_chain_id: ChainId,
    nodes: &[RemoteNode<Env::ValidatorNode>],
    received_log: &ReceivedLogs,
    mut remote_heights: Vec<BlockHeight>,
    sender: mpsc::UnboundedSender<ChainAndHeight>,
) {
    let (max_epoch, committees) = match self.admin_committees().await {
        Ok(result) => result,
        Err(error) => {
            error!(%error, %sender_chain_id, "could not read admin committees");
            return;
        }
    };
    let committees_ref = &committees;
    let mut nodes = nodes.to_vec();
    while !remote_heights.is_empty() {
        // First satisfy as many heights as possible from local storage.
        if let Ok(local_certs) = self
            .storage_client()
            .read_certificates_by_heights(sender_chain_id, &remote_heights)
            .await
        {
            let mut still_needed = Vec::new();
            for (height, maybe_cert) in remote_heights.iter().copied().zip(local_certs) {
                if let Some(certificate) = maybe_cert {
                    let chain_id = certificate.block().header.chain_id;
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id, %height, %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                } else {
                    still_needed.push(height);
                }
            }
            remote_heights = still_needed;
            if remote_heights.is_empty() {
                break;
            }
        }
        let remote_heights_ref = &remote_heights;
        // Race the remaining validators: each attempt downloads the subset of
        // heights that validator's received log claims to have and checks the
        // certificates against the known committees.
        let certificates = match communicate_concurrently(
            &nodes,
            async move |remote_node| {
                let mut remote_heights = remote_heights_ref.clone();
                remote_heights.retain(|height| {
                    received_log.validator_has_block(
                        &remote_node.public_key,
                        sender_chain_id,
                        *height,
                    )
                });
                if remote_heights.is_empty() {
                    return Err(NodeError::MissingCertificateValue);
                }
                let certificates = self
                    .requests_scheduler
                    .download_certificates_by_heights(
                        &remote_node,
                        sender_chain_id,
                        remote_heights,
                    )
                    .await?;
                let mut certificates_with_check_results = vec![];
                for cert in certificates {
                    let check_result =
                        Self::check_certificate(max_epoch, committees_ref, &cert)?;
                    certificates_with_check_results
                        .push((cert, check_result.into_result().is_ok()));
                }
                Ok(certificates_with_check_results)
            },
            // On overall failure, collect the validators that errored.
            |errors| {
                errors
                    .into_iter()
                    .map(|(validator, _error)| validator)
                    .collect::<BTreeSet<_>>()
            },
            self.options.certificate_batch_download_timeout,
        )
        .await
        {
            Ok(certificates_with_check_results) => certificates_with_check_results,
            Err(faulty_validators) => {
                // Drop the faulty validators and retry with the remainder.
                nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                if nodes.is_empty() {
                    info!(
                        chain_id = %sender_chain_id,
                        "could not download certificates for chain - no more correct validators left"
                    );
                    return;
                }
                continue;
            }
        };
        trace!(
            num_certificates = %certificates.len(),
            "received certificates",
        );
        let mut to_remove_from_queue = BTreeSet::new();
        for (certificate, check_result) in certificates {
            let hash = certificate.hash();
            let chain_id = certificate.block().header.chain_id;
            let height = certificate.block().header.height;
            if !check_result {
                // The committee check failed: give up on this height rather
                // than retrying forever.
                to_remove_from_queue.insert(height);
                continue;
            }
            let mode = ReceiveCertificateMode::AlreadyChecked;
            if let Err(error) = self
                .receive_sender_certificate(certificate, mode, None)
                .await
            {
                warn!(%error, %hash, "Received invalid certificate");
            } else {
                to_remove_from_queue.insert(height);
                if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                    error!(
                        %chain_id,
                        %height,
                        %error,
                        "failed to send chain and height over the channel",
                    );
                }
            }
        }
        remote_heights.retain(|height| !to_remove_from_queue.contains(height));
    }
    trace!("find_received_certificates: finished processing chain");
}
/// Downloads the portion of the chain's received log after the first
/// `tracker` entries from one validator, paging through it in batches of up
/// to `CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES` entries.
#[instrument(level = "trace", skip(self))]
async fn get_received_log_from_validator(
    &self,
    chain_id: ChainId,
    remote_node: &RemoteNode<Env::ValidatorNode>,
    tracker: u64,
) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
    let mut offset = tracker;
    let mut remote_log = Vec::new();
    loop {
        trace!("get_received_log_from_validator: looping");
        let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
        let info = remote_node.handle_chain_info_query(query).await?;
        let received_entries = info.requested_received_log.len();
        offset += received_entries as u64;
        remote_log.extend(info.requested_received_log);
        trace!(
            remote_node = remote_node.address(),
            %received_entries,
            "get_received_log_from_validator: received log batch",
        );
        // A short batch means the validator has no more entries.
        if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
            break;
        }
    }
    trace!(
        remote_node = remote_node.address(),
        num_entries = remote_log.len(),
        "get_received_log_from_validator: returning downloaded log",
    );
    Ok(remote_log)
}
/// Downloads the sender-chain block at `height` together with all earlier
/// blocks that sent messages to `receiver_chain_id`, and processes them in
/// ascending height order.
///
/// Walks backwards along `previous_message_blocks`, stopping below the next
/// outbox height already known for the receiver. Certificates found in local
/// storage are preferred over downloads from `remote_node`.
async fn download_sender_block_with_sending_ancestors(
    &self,
    receiver_chain_id: ChainId,
    sender_chain_id: ChainId,
    height: BlockHeight,
    remote_node: &RemoteNode<Env::ValidatorNode>,
) -> Result<(), chain_client::Error> {
    let next_outbox_height = self
        .local_node
        .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
        .await?
        .get(&sender_chain_id)
        .copied()
        .unwrap_or(BlockHeight::ZERO);
    let (max_epoch, committees) = self.admin_committees().await?;
    // Certificates to process, keyed (and later iterated) by height.
    let mut certificates = BTreeMap::new();
    let mut current_height = height;
    let mut current_hash: Option<CryptoHash> = None;
    while current_height >= next_outbox_height {
        let certificate = if let Some(local) = self
            .try_read_local_certificate(sender_chain_id, current_height, current_hash)
            .await?
        {
            Arc::unwrap_or_clone(local)
        } else {
            let downloaded = self
                .requests_scheduler
                .download_certificates_by_heights(
                    remote_node,
                    sender_chain_id,
                    vec![current_height],
                )
                .await?;
            let Some(certificate) = downloaded.into_iter().next() else {
                return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
                    chain_id: sender_chain_id,
                    height: current_height,
                });
            };
            certificate
        };
        Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
            .into_result()?;
        // Follow the link to the previous block that sent messages to this
        // receiver, if any.
        let block = certificate.block();
        let next = block
            .body
            .previous_message_blocks
            .get(&receiver_chain_id)
            .map(|(prev_hash, prev_height)| (*prev_hash, *prev_height));
        certificates.insert(current_height, certificate);
        if let Some((prev_hash, prev_height)) = next {
            current_height = prev_height;
            current_hash = Some(prev_hash);
        } else {
            break;
        }
    }
    if certificates.is_empty() {
        // Nothing new to process: the messages may already be available, so
        // just retry any pending cross-chain requests from the sender.
        self.retry_pending_cross_chain_requests(sender_chain_id)
            .await?;
    }
    // `BTreeMap::into_values` yields the certificates in ascending height
    // order.
    for certificate in certificates.into_values() {
        self.receive_sender_certificate(
            certificate,
            ReceiveCertificateMode::AlreadyChecked,
            Some(vec![remote_node.clone()]),
        )
        .await?;
    }
    Ok(())
}
/// Downloads from `remote_node` the blocks of `publisher_chain_id` that carry events
/// for `subscribed_streams`, following `previous_event_blocks` links backwards from
/// `initial_blocks`, and delivers them to the local node in ascending height order.
///
/// Fix: the duplicate-height guard read `certificates.contains_key(¤t_height)` —
/// a mis-encoded `&current_height` (the `&curren` was swallowed as an HTML entity) —
/// which does not compile. Restored the intended reference.
async fn download_event_bearing_blocks(
    &self,
    publisher_chain_id: ChainId,
    initial_blocks: BTreeSet<(BlockHeight, CryptoHash)>,
    local_next_block_height: BlockHeight,
    subscribed_streams: &BTreeSet<StreamId>,
    remote_node: &RemoteNode<Env::ValidatorNode>,
) -> Result<(), chain_client::Error> {
    if initial_blocks.is_empty() {
        return Ok(());
    }
    let (max_epoch, committees) = self.admin_committees().await?;
    // Keyed by height so that `into_values` below yields ascending order.
    let mut certificates = BTreeMap::new();
    let mut blocks_to_fetch = initial_blocks;
    // For each subscribed stream, the next event index the local node expects.
    let next_expected_events = self
        .local_node
        .next_expected_events(
            publisher_chain_id,
            subscribed_streams.iter().cloned().collect(),
        )
        .await?;
    // Process the highest pending block first; its `previous_event_blocks` links may
    // enqueue further (lower) heights.
    while let Some((current_height, current_hash)) = blocks_to_fetch.pop_last() {
        // Blocks below the locally executed height are already known.
        if current_height < local_next_block_height {
            continue;
        }
        if certificates.contains_key(&current_height) {
            continue;
        }
        let certificate = if let Some(certificate) =
            self.storage_client().read_certificate(current_hash).await?
        {
            Arc::unwrap_or_clone(certificate)
        } else {
            let downloaded = self
                .requests_scheduler
                .download_certificates(remote_node, publisher_chain_id, current_height, 1)
                .await?;
            let Some(certificate) = downloaded.into_iter().next() else {
                // Best effort: skip blocks the validator cannot provide.
                tracing::debug!(
                    validator = remote_node.address(),
                    %publisher_chain_id,
                    height = %current_height,
                    "failed to download event publisher block"
                );
                continue;
            };
            // Only freshly downloaded certificates need verification here.
            Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
                .into_result()?;
            certificate
        };
        let block = certificate.block();
        for stream_id in subscribed_streams {
            if let Some((prev_hash, prev_height)) =
                block.body.previous_event_blocks.get(stream_id)
            {
                // If this block already contains the next event the local node
                // expects for this stream, earlier blocks are not needed for it.
                if next_expected_events.get(stream_id).is_some_and(|index| {
                    block
                        .body
                        .events
                        .iter()
                        .flatten()
                        .find(|event| event.stream_id == *stream_id)
                        .is_some_and(|event| event.index == *index)
                }) {
                    continue;
                }
                if !certificates.contains_key(prev_height) {
                    blocks_to_fetch.insert((*prev_height, *prev_hash));
                }
            }
        }
        certificates.insert(current_height, certificate);
    }
    // Deliver in ascending height order; all certificates were checked above.
    for certificate in certificates.into_values() {
        self.receive_sender_certificate(
            certificate,
            ReceiveCertificateMode::AlreadyChecked,
            Some(vec![remote_node.clone()]),
        )
        .await?;
    }
    Ok(())
}
/// Synchronizes the events of the given streams on `chain_id` from `remote_node`.
async fn sync_events_from_node(
    &self,
    chain_id: ChainId,
    stream_ids: &BTreeSet<StreamId>,
    remote_node: &RemoteNode<Env::ValidatorNode>,
) -> Result<(), chain_client::Error> {
    // Ask the validator, in bounded-size batches, which blocks last added events
    // to each subscribed stream.
    let all_streams: Vec<_> = stream_ids.iter().cloned().collect();
    let mut starting_blocks = BTreeSet::new();
    for batch in all_streams.chunks(self.options.max_event_stream_queries) {
        let response = remote_node
            .node
            .previous_event_blocks(chain_id, batch.to_vec())
            .await?;
        starting_blocks.extend(response.values().copied());
    }
    // A chain the local node has never seen starts at height zero.
    let local_height = match self.local_node.chain_info(chain_id).await {
        Ok(info) => info.next_block_height,
        Err(LocalNodeError::InactiveChain(_) | LocalNodeError::BlobsNotFound(_)) => {
            BlockHeight::ZERO
        }
        Err(error) => return Err(error.into()),
    };
    self.download_event_bearing_blocks(
        chain_id,
        starting_blocks,
        local_height,
        stream_ids,
        remote_node,
    )
    .await
}
/// Checks `incoming_certificate` against the committee of its epoch, reporting
/// whether the epoch is known, already pruned, or still in the future.
#[instrument(
    level = "trace", skip_all,
    fields(certificate_hash = ?incoming_certificate.hash()),
)]
fn check_certificate(
    highest_known_epoch: Epoch,
    committees: &BTreeMap<Epoch, Committee>,
    incoming_certificate: &ConfirmedBlockCertificate,
) -> Result<CheckCertificateResult, NodeError> {
    let epoch = incoming_certificate.block().header.epoch;
    // Epochs beyond anything we know cannot be verified until we catch up.
    if epoch > highest_known_epoch {
        return Ok(CheckCertificateResult::FutureEpoch);
    }
    match committees.get(&epoch) {
        // Known epoch: verify the certificate against its committee.
        Some(committee) => {
            incoming_certificate.check(committee)?;
            Ok(CheckCertificateResult::New)
        }
        // No committee on record for this (old) epoch.
        None => Ok(CheckCertificateResult::OldEpoch),
    }
}
#[instrument(level = "trace", skip_all)]
/// Synchronizes the local state of `chain_id` using the current admin committee.
pub(crate) async fn synchronize_chain_state(
    &self,
    chain_id: ChainId,
) -> Result<Box<ChainInfo>, chain_client::Error> {
    let (_epoch, committee) = self.admin_committee().await?;
    // Box the recursive-ish future to keep this function's size bounded.
    let sync = self.synchronize_chain_state_from_committee(chain_id, committee);
    Box::pin(sync).await
}
#[instrument(level = "trace", skip_all)]
/// Synchronizes the local state of `chain_id` from the validators of `committee`,
/// returning the resulting local chain info once a quorum has been reached.
pub async fn synchronize_chain_state_from_committee(
    &self,
    chain_id: ChainId,
    committee: Arc<Committee>,
) -> Result<Box<ChainInfo>, chain_client::Error> {
    // Only record latency for chains we actively operate, not follow-only ones.
    #[cfg(with_metrics)]
    let _latency = if self.is_chain_follow_only(chain_id).await {
        None
    } else {
        Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
    };
    let validators = self.make_nodes(&committee)?;
    Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
    // Sync from individual validators until a quorum (by weight) succeeds.
    communicate_with_quorum(
        &validators,
        &committee,
        |_: &()| (),
        |remote_node| async move {
            self.synchronize_chain_state_from(&remote_node, chain_id)
                .await
        },
        self.options.quorum_grace_period,
    )
    .await?;
    // Return whatever the local node now knows about the chain.
    self.local_node
        .chain_info(chain_id)
        .await
        .map_err(Into::into)
}
/// Synchronizes the local state of `chain_id` from a single validator: downloads
/// missing certificates and, for chains we operate (not follow-only), replays the
/// validator's pending timeout, proposals, and locking block locally.
#[instrument(level = "trace", skip(self, remote_node, chain_id))]
pub(crate) async fn synchronize_chain_state_from(
    &self,
    remote_node: &RemoteNode<Env::ValidatorNode>,
    chain_id: ChainId,
) -> Result<(), chain_client::Error> {
    // Manager values (proposals, locking block, timeout) only matter if we take
    // part in consensus on this chain.
    let with_manager_values = !self.is_chain_follow_only(chain_id).await;
    let query = if with_manager_values {
        ChainInfoQuery::new(chain_id).with_manager_values()
    } else {
        ChainInfoQuery::new(chain_id)
    };
    let remote_info = remote_node.handle_chain_info_query(query).await?;
    // Catch up on certificates up to the validator's reported height.
    let local_info = self
        .download_certificates_from(remote_node, chain_id, remote_info.next_block_height, None)
        .await?;
    if !with_manager_values {
        return Ok(());
    }
    let local_height = local_info.next_block_height;
    if local_height != remote_info.next_block_height {
        // We could not fully catch up with this validator; its manager values refer
        // to a height we don't have, so stop here without error.
        debug!(
            remote_node = remote_node.address(),
            remote_height = %remote_info.next_block_height,
            local_height = %local_height,
            "synced from validator, but remote height and local height are different",
        );
        return Ok(());
    };
    // Replay the validator's timeout certificate, if any.
    if let Some(timeout) = remote_info.manager.timeout {
        self.handle_certificate(*timeout).await?;
    }
    // Collect the proposals the validator has seen, to replay locally below.
    let mut proposals = Vec::new();
    if let Some(proposal) = remote_info.manager.requested_signed_proposal {
        proposals.push(*proposal);
    }
    if let Some(proposal) = remote_info.manager.requested_proposed {
        proposals.push(*proposal);
    }
    if let Some(locking) = remote_info.manager.requested_locking {
        match *locking {
            // A fast-round locking block is a proposal; handle it with the others.
            LockingBlock::Fast(proposal) => {
                proposals.push(proposal);
            }
            // A regular locking block is a validated certificate; best-effort
            // process it (downloading missing blobs), logging on failure.
            LockingBlock::Regular(cert) => {
                let hash = cert.hash();
                if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                {
                    debug!(
                        remote_node = remote_node.address(),
                        %hash,
                        height = %local_height,
                        %error,
                        "skipping locked block from validator",
                    );
                }
            }
        }
    }
    // Replay each proposal locally, retrying after repairing whatever the local
    // node reports missing (blobs, then cross-chain updates). Failures skip the
    // proposal rather than abort the whole synchronization.
    'proposal_loop: for proposal in proposals {
        let owner: AccountOwner = proposal.owner();
        if let Err(mut err) =
            Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
        {
            if let LocalNodeError::BlobsNotFound(_) = &err {
                // First attempt: fetch the blobs the proposal itself declares as
                // required, as pending blobs from this validator.
                let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                if !required_blob_ids.is_empty() {
                    let mut blobs = Vec::new();
                    for blob_id in required_blob_ids {
                        let blob_content = match self
                            .requests_scheduler
                            .download_pending_blob(remote_node, chain_id, blob_id)
                            .await
                        {
                            Ok(content) => content,
                            Err(error) => {
                                info!(
                                    remote_node = remote_node.address(),
                                    height = %local_height,
                                    proposer = %owner,
                                    %blob_id,
                                    %error,
                                    "skipping proposal from validator; failed to download blob",
                                );
                                continue 'proposal_loop;
                            }
                        };
                        blobs.push(Blob::new(blob_content));
                    }
                    self.local_node
                        .handle_pending_blobs(chain_id, blobs)
                        .await?;
                    // Retry the proposal with the new blobs in place.
                    if let Err(new_err) =
                        Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                    {
                        err = new_err;
                    } else {
                        continue;
                    }
                }
                // Second attempt: if blobs are still missing, fetch them via the
                // certificates that created them.
                if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                    self.update_local_node_with_blobs_from(
                        blob_ids.clone(),
                        slice::from_ref(remote_node),
                    )
                    .await?;
                    if let Err(new_err) =
                        Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                    {
                        err = new_err;
                    } else {
                        continue;
                    }
                }
            }
            // Repeatedly repair missing cross-chain updates: download the sender
            // blocks and retry until the proposal succeeds or a different error
            // appears.
            while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                if let ChainError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                } = &**chain_err
                {
                    self.download_sender_block_with_sending_ancestors(
                        *chain_id,
                        *origin,
                        *height,
                        remote_node,
                    )
                    .await?;
                    if let Err(new_err) =
                        Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                    {
                        err = new_err;
                    } else {
                        continue 'proposal_loop;
                    }
                } else {
                    break;
                }
            }
            // All repair attempts failed; log and move on to the next proposal.
            debug!(
                remote_node = remote_node.address(),
                proposer = %owner,
                height = %local_height,
                error = %err,
                "skipping proposal from validator",
            );
        }
    }
    Ok(())
}
/// Processes a validated-block certificate obtained from `remote_node`. If the local
/// node reports missing blobs, downloads them as pending blobs from the validator and
/// retries the certificate once.
async fn try_process_locking_block_from(
    &self,
    remote_node: &RemoteNode<Env::ValidatorNode>,
    certificate: GenericCertificate<ValidatedBlock>,
) -> Result<(), chain_client::Error> {
    let chain_id = certificate.inner().chain_id();
    let first_attempt = self.handle_certificate(certificate.clone()).await;
    match first_attempt {
        Ok(_) => Ok(()),
        Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
            // Fetch each missing blob from the validator, hand the batch to the
            // local node, then retry the certificate.
            let mut blobs = Vec::new();
            for blob_id in blob_ids {
                let content = self
                    .requests_scheduler
                    .download_pending_blob(remote_node, chain_id, blob_id)
                    .await?;
                blobs.push(Blob::new(content));
            }
            self.local_node
                .handle_pending_blobs(chain_id, blobs)
                .await?;
            self.handle_certificate(certificate).await?;
            Ok(())
        }
        Err(err) => Err(err.into()),
    }
}
/// Downloads the given blobs from the validators — via the certificates that created
/// them — stores them in the local node, and returns them. Fails if any single blob
/// cannot be obtained from any validator.
async fn update_local_node_with_blobs_from(
    &self,
    blob_ids: Vec<BlobId>,
    remote_nodes: &[RemoteNode<Env::ValidatorNode>],
) -> Result<Vec<Blob>, chain_client::Error> {
    let timeout = self.options.blob_download_timeout;
    // Deduplicate the requested IDs before spawning download tasks.
    let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
    stream::iter(blob_ids.into_iter().map(|blob_id| {
        // Race the validators (with staggered starts) until one of them yields the
        // certificate for this blob.
        communicate_concurrently(
            remote_nodes,
            async move |remote_node| {
                let certificate = self
                    .requests_scheduler
                    .download_certificate_for_blob(&remote_node, blob_id)
                    .await?;
                // Receiving the certificate locally is expected to store the blob;
                // we read it back from storage below.
                self.receive_sender_certificate(
                    certificate,
                    ReceiveCertificateMode::NeedsCheck,
                    Some(vec![remote_node.clone()]),
                )
                .await?;
                let blob = self
                    .local_node
                    .storage_client()
                    .read_blob(blob_id)
                    .await?
                    .map(Arc::unwrap_or_clone)
                    .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
                Result::<_, chain_client::Error>::Ok(blob)
            },
            // All validators failed for this blob: report it as not found.
            move |_| chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id])),
            timeout,
        )
    }))
    // Bound the number of blobs being fetched concurrently.
    .buffer_unordered(self.options.max_joined_tasks)
    .collect::<Vec<_>>()
    .await
    .into_iter()
    // Collecting into `Result<Vec<_>, _>` fails on the first per-blob error.
    .collect()
}
/// Executes `block` locally (without certifying it), retrying after downloading any
/// blobs or events the local node reports as missing. Returns the executed block and
/// the local node's response.
#[instrument(level = "trace", skip(self, block))]
async fn stage_block_execution(
    &self,
    block: ProposedBlock,
    round: Option<u32>,
    published_blobs: Vec<Blob>,
    policy: BundleExecutionPolicy,
) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
    // Events we already attempted to download — guards against retrying the same
    // missing events forever.
    let mut downloaded_events = HashSet::<EventId>::new();
    loop {
        let result = self
            .local_node
            .stage_block_execution(block.clone(), round, published_blobs.clone(), policy)
            .await;
        if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
            // Fetch the missing blobs from the validators and retry.
            let validators = self.validator_nodes().await?;
            self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
                .await?;
            continue; }
        if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
            // Only retry for events we have not attempted yet; otherwise fall
            // through and surface the error via `result?` below.
            let new_events = event_ids
                .iter()
                .filter(|id| !downloaded_events.contains(id))
                .cloned()
                .collect::<Vec<_>>();
            if !new_events.is_empty() {
                Box::pin(self.download_certificates_for_events(&new_events)).await?;
                downloaded_events.extend(new_events);
                continue; }
        }
        if let Ok((_, executed_block, _, _)) = &result {
            // Announce the executed block to subscribers before returning.
            let hash = CryptoHash::new(executed_block);
            let notification = Notification {
                chain_id: executed_block.header.chain_id,
                reason: Reason::BlockExecuted {
                    height: executed_block.header.height,
                    hash,
                },
            };
            self.notifier.notify(&[notification]);
        }
        // Propagate any remaining error; otherwise unpack and return.
        let (_modified_block, executed_block, response, _resource_tracker) = result?;
        return Ok((executed_block, response));
    }
}
}
/// Runs `f` against the given nodes concurrently, with quadratically staggered start
/// times, returning the first successful result. If every node fails, `err` converts
/// the collected per-node errors into a single error.
async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
    nodes: &[RemoteNode<A>],
    f: F,
    err: G,
    timeout: Duration,
) -> Result<V, E2>
where
    F: Clone + FnOnce(RemoteNode<A>) -> R,
    RemoteNode<A>: Clone,
    G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
    R: Future<Output = Result<V, E1>> + 'a,
{
    // Randomize the order so load spreads evenly across validators.
    let mut shuffled = nodes.to_vec();
    shuffled.shuffle(&mut rand::thread_rng());
    let mut attempts = shuffled
        .iter()
        .zip(0u32..)
        .map(|(remote_node, position)| {
            let attempt = f.clone();
            let node = remote_node.clone();
            async move {
                // Stagger starts quadratically: 0, t, 4t, 9t, ...
                linera_base::time::timer::sleep(timeout * position * position).await;
                attempt(node)
                    .await
                    .map_err(|failure| (remote_node.public_key, failure))
            }
        })
        .collect::<FuturesUnordered<_>>();
    let mut failures = vec![];
    while let Some(outcome) = attempts.next().await {
        match outcome {
            // First success wins; the remaining futures are dropped with the stream.
            Ok(value) => return Ok(value),
            Err(failure) => failures.push(failure),
        };
    }
    Err(err(failures))
}
/// Guard that aborts the wrapped handle when dropped.
#[must_use]
pub struct AbortOnDrop(pub AbortHandle);
impl Drop for AbortOnDrop {
    #[instrument(level = "trace", skip(self))]
    fn drop(&mut self) {
        // Abort whatever is tied to the handle when the guard goes out of scope.
        self.0.abort();
    }
}
/// A block proposal the client has prepared, together with its associated blobs.
#[derive(Clone, Serialize, Deserialize)]
pub struct PendingProposal {
    /// The proposed block.
    pub block: ProposedBlock,
    /// Blobs associated with the proposal — presumably those it publishes; confirm
    /// against callers.
    pub blobs: Vec<Blob>,
}
/// Whether a certificate's signatures still need to be verified before processing.
enum ReceiveCertificateMode {
    /// The certificate has not been verified yet.
    NeedsCheck,
    /// The caller already verified the certificate.
    AlreadyChecked,
}
/// Outcome of `check_certificate`: how a certificate's epoch relates to the
/// committees known locally.
enum CheckCertificateResult {
    /// The epoch has no committee on record locally; the certificate cannot be verified.
    OldEpoch,
    /// The certificate was verified against a known committee.
    New,
    /// The epoch is newer than the highest known one; committees must be synchronized first.
    FutureEpoch,
}
impl CheckCertificateResult {
    /// Converts the check outcome into a `Result`, mapping each epoch mismatch to
    /// the corresponding client error.
    fn into_result(self) -> Result<(), chain_client::Error> {
        match self {
            Self::New => Ok(()),
            Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
            Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
        }
    }
}
/// Compresses the contract and service bytecode into blobs and derives the module ID.
///
/// For Wasm, both bytecodes are compressed on a blocking thread and yield two blobs;
/// for EVM, only the contract bytecode is used, and its hash serves as both halves of
/// the module ID.
#[cfg(not(target_arch = "wasm32"))]
pub async fn create_bytecode_blobs(
    contract: Bytecode,
    service: Bytecode,
    vm_runtime: VmRuntime,
) -> (Vec<Blob>, ModuleId) {
    match vm_runtime {
        VmRuntime::Wasm => {
            // Compression is CPU-heavy; run it off the async executor.
            let compress_task =
                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()));
            let (contract_compressed, service_compressed) =
                compress_task.await.expect("Compression should not panic");
            let contract_blob = Blob::new_contract_bytecode(contract_compressed);
            let service_blob = Blob::new_service_bytecode(service_compressed);
            let module_id =
                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
            (vec![contract_blob, service_blob], module_id)
        }
        VmRuntime::Evm => {
            let contract_blob = Blob::new_evm_bytecode(contract.compress());
            let hash = contract_blob.id().hash;
            (vec![contract_blob], ModuleId::new(hash, hash, vm_runtime))
        }
    }
}