mod state;
use std::{
collections::{hash_map, BTreeMap, BTreeSet, HashMap},
convert::Infallible,
iter,
sync::Arc,
};
use custom_debug_derive::Debug;
use futures::{
future::{self, Either, FusedFuture, Future},
stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
};
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use linera_base::{
abi::Abi,
crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
data_types::{
Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
ChainDescription, Epoch, MessagePolicy, Round, Timestamp,
},
ensure,
identifiers::{
Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
ModuleId, StreamId,
},
ownership::{ChainOwnership, TimeoutConfig},
time::{Duration, Instant},
};
#[cfg(not(target_arch = "wasm32"))]
use linera_base::{data_types::Bytecode, vm::VmRuntime};
use linera_chain::{
data_types::{
BlockProposal, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight, IncomingBundle,
ProposedBlock, Transaction,
},
manager::LockingBlock,
types::{
Block, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, Timeout,
TimeoutCertificate, ValidatedBlock,
},
ChainError, ChainExecutionContext,
};
use linera_execution::{
committee::Committee,
system::{
AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
REMOVED_EPOCH_STREAM_NAME,
},
ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
};
use linera_storage::{Clock as _, Storage as _};
use linera_views::ViewError;
use serde::Serialize;
pub(crate) use state::State;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
#[cfg(not(target_arch = "wasm32"))]
use super::create_bytecode_blobs;
use super::{
received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
ListeningMode, PendingProposal, TimingType,
};
use crate::{
data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
environment::Environment,
local_node::{LocalNodeClient, LocalNodeError},
node::{
CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
ValidatorNodeProvider as _,
},
remote_node::RemoteNode,
updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
worker::{Notification, Reason, WorkerError},
};
/// Tunable parameters controlling how a [`ChainClient`] builds blocks and
/// communicates with validators.
#[derive(Debug, Clone)]
pub struct Options {
    /// Maximum number of incoming message bundles included in a single block.
    pub max_pending_message_bundles: usize,
    /// Maximum automatic retries when staging a bundle fails (see
    /// [`Options::bundle_execution_policy`]).
    pub max_block_limit_errors: u32,
    /// Cap on how many new stream events a single block may acknowledge.
    pub max_new_events_per_block: usize,
    /// Optional time budget for staging bundles; `None` means unbounded.
    pub staging_bundles_time_budget: Option<Duration>,
    /// Policy deciding which incoming messages and events to accept.
    pub message_policy: MessagePolicy,
    /// Delivery mode used when pushing cross-chain messages to validators.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// Grace period passed to `communicate_with_quorum` after a quorum is
    /// reached. Unit/semantics defined by the updater module — see
    /// `DEFAULT_QUORUM_GRACE_PERIOD`.
    pub quorum_grace_period: f64,
    /// Timeout for downloading a blob from a validator.
    pub blob_download_timeout: Duration,
    /// Timeout for downloading one batch of certificates.
    pub certificate_batch_download_timeout: Duration,
    /// Number of certificates per download batch.
    pub certificate_download_batch_size: u64,
    /// Number of certificates per upload batch.
    pub certificate_upload_batch_size: u64,
    /// Batch size when downloading certificates from sender chains
    /// (see `find_received_certificates`).
    pub sender_certificate_download_batch_size: usize,
    /// Concurrency limit used for `buffer_unordered` across this module.
    /// NOTE(review): several computations divide by this value — presumably
    /// it must be non-zero; confirm at construction sites.
    pub max_joined_tasks: usize,
    /// Whether fast (single-leader) blocks are permitted.
    pub allow_fast_blocks: bool,
    /// Initial probe interval for the notification circuit breaker.
    pub notification_circuit_breaker_initial_probe_interval: Duration,
    /// Upper bound on the circuit-breaker probe interval.
    pub notification_circuit_breaker_max_probe_interval: Duration,
    /// Maximum number of concurrent event-stream queries.
    pub max_event_stream_queries: usize,
}
/// State of the notification circuit breaker: when the next probe is due and
/// the current probe interval (presumably grown between the configured
/// initial and max intervals — see the corresponding `Options` fields).
struct CircuitBreakerState {
    // Earliest instant at which the next probe should be attempted.
    next_probe_at: Instant,
    // Current spacing between probes.
    probe_interval: Duration,
}
#[cfg(with_testing)]
impl Options {
    /// Returns an `Options` value with small, fast settings suitable for
    /// tests: short 1-second timeouts, small bundle/event caps, and the
    /// crate-level defaults for batch sizes and the quorum grace period.
    pub fn test_default() -> Self {
        use super::{
            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            DEFAULT_MAX_EVENT_STREAM_QUERIES, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        };
        use crate::DEFAULT_QUORUM_GRACE_PERIOD;
        Options {
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            max_new_events_per_block: 10,
            staging_bundles_time_budget: None,
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout: Duration::from_secs(1),
            certificate_batch_download_timeout: Duration::from_secs(1),
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            certificate_upload_batch_size: DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_joined_tasks: 100,
            allow_fast_blocks: false,
            notification_circuit_breaker_initial_probe_interval: Duration::from_secs(300),
            notification_circuit_breaker_max_probe_interval: Duration::from_secs(3600),
            max_event_stream_queries: DEFAULT_MAX_EVENT_STREAM_QUERIES,
        }
    }
}
impl Options {
    /// Derives the [`BundleExecutionPolicy`] from these options: failed
    /// bundles are automatically retried up to `max_block_limit_errors`
    /// times, within the optional `staging_bundles_time_budget`.
    pub fn bundle_execution_policy(&self) -> BundleExecutionPolicy {
        let on_failure = BundleFailurePolicy::AutoRetry {
            max_failures: self.max_block_limit_errors,
        };
        let time_budget = self.staging_bundles_time_budget;
        BundleExecutionPolicy {
            on_failure,
            time_budget,
        }
    }
}
/// A client tracking and driving a single chain identified by `chain_id`.
/// Cheap to clone: the heavy state lives behind the shared `client` Arc.
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    // Shared node/client state; skipped from `Debug` output.
    #[debug(skip)]
    pub(crate) client: Arc<Client<Env>>,
    // The chain this client operates on.
    chain_id: ChainId,
    #[debug(skip)]
    options: Options,
    // Owner key preferred for signing proposals, if any.
    preferred_owner: Option<AccountOwner>,
    // Trusted next block height at construction time, used as a
    // synchronization target (see `synchronize_to_known_height`).
    initial_next_block_height: BlockHeight,
    // Hash of the block preceding `initial_next_block_height`, if known.
    initial_block_hash: Option<CryptoHash>,
    // Optional channel for reporting timing measurements.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
}
impl<Env: Environment> Clone for ChainClient<Env> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
chain_id: self.chain_id,
options: self.options.clone(),
preferred_owner: self.preferred_owner,
initial_next_block_height: self.initial_next_block_height,
initial_block_hash: self.initial_block_hash,
timing_sender: self.timing_sender.clone(),
}
}
}
/// Errors returned by [`ChainClient`] operations.
#[derive(Debug, Error)]
pub enum Error {
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),
    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),
    /// Certificates with these hashes could not be read from storage.
    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),
    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),
    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),
    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),
    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),
    /// Invariant violation inside the chain client itself.
    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),
    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
        Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,
    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
    WalletSynchronizationError,
    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),
    #[error(
        "Cannot accept a certificate from a committee that was retired. \
        Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,
    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),
    /// The configured `Signer` has no key for any owner of this chain.
    #[error("Signer doesn't have key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),
    /// No preferred owner was configured for this chain client.
    #[error("client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),
    #[error("The chain client isn't owner on chain {0}")]
    NotAnOwner(ChainId),
    #[error(transparent)]
    ViewError(#[from] ViewError),
    #[error(
        "Failed to download certificates and update local node to the next height \
        {target_next_block_height} of chain {chain_id}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
    /// Validators reached a quorum on a different block/round than expected.
    #[error(
        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
        expected block hash {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },
    /// Wrapped error from the `Signer` implementation
    /// (see [`Error::signer_failure`]).
    #[error("signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),
    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),
    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,
    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
    CannotDownloadMissingSenderBlock {
        chain_id: ChainId,
        height: BlockHeight,
    },
    /// Another block was already committed at this height; carries the hash
    /// of the committed certificate.
    #[error(
        "A different block was already committed at this height. \
        The committed certificate hash is {0}"
    )]
    Conflict(CryptoHash),
}
/// `Infallible` has no values, so this conversion can never actually be
/// invoked; it exists only to satisfy trait bounds (e.g. `?` on results
/// whose error type is `Infallible`).
impl From<Infallible> for Error {
    fn from(infallible: Infallible) -> Self {
        // Exhaustive match on an empty type: statically unreachable.
        match infallible {}
    }
}
impl Error {
pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
Self::Signer(Box::new(err))
}
}
impl<Env: Environment> ChainClient<Env> {
/// Creates a new client for `chain_id`, sharing the node state in `client`.
/// `initial_block_hash`/`initial_next_block_height` record the trusted chain
/// tip (e.g. from the wallet) used later as a synchronization target;
/// `preferred_owner` selects the key used for proposing, if any.
#[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
pub(crate) fn new(
    client: Arc<Client<Env>>,
    chain_id: ChainId,
    options: Options,
    initial_block_hash: Option<CryptoHash>,
    initial_next_block_height: BlockHeight,
    preferred_owner: Option<AccountOwner>,
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
) -> Self {
    ChainClient {
        client,
        chain_id,
        options,
        preferred_owner,
        initial_block_hash,
        initial_next_block_height,
        timing_sender,
    }
}
/// Returns `true` if this client only follows the chain (cannot propose).
/// An unknown chain (no registered mode) is treated as follow-only.
#[instrument(level = "trace", skip(self))]
pub fn is_follow_only(&self) -> bool {
    match self.client.chain_mode(self.chain_id) {
        Some(mode) => mode.is_follow_only(),
        None => true,
    }
}
/// Returns the mutex guarding this chain's pending proposal.
///
/// Panics if the chain is not registered in `client.chains` — a
/// `ChainClient` must only be constructed for known chains.
#[instrument(level = "trace", skip(self))]
fn proposal_mutex(&self) -> Arc<tokio::sync::Mutex<Option<PendingProposal>>> {
    self.client
        .chains
        .pin()
        .get(&self.chain_id)
        .expect("Chain client constructed for invalid chain")
        .proposal_mutex()
}
/// Returns a snapshot of the currently pending proposal, if any.
#[instrument(level = "trace", skip(self))]
pub async fn pending_proposal(&self) -> Option<PendingProposal> {
    let mutex = self.proposal_mutex();
    let guard = mutex.lock().await;
    guard.clone()
}
/// Returns the shared `Signer` instance of the underlying client.
#[instrument(level = "trace", skip(self))]
pub fn signer(&self) -> &impl Signer {
    self.client.signer()
}
pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, Error> {
self.signer()
.contains_key(owner)
.await
.map_err(Error::signer_failure)
}
/// Mutable access to the client options.
#[instrument(level = "trace", skip(self))]
pub fn options_mut(&mut self) -> &mut Options {
    &mut self.options
}
/// Read-only access to the client options.
#[instrument(level = "trace", skip(self))]
pub fn options(&self) -> &Options {
    &self.options
}
/// The chain this client operates on.
#[instrument(level = "trace", skip(self))]
pub fn chain_id(&self) -> ChainId {
    self.chain_id
}
/// A clone of the optional timing-report channel, if configured.
pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
    self.timing_sender.clone()
}
/// The ID of the admin chain known to the underlying client.
#[instrument(level = "trace", skip(self))]
pub fn admin_chain_id(&self) -> ChainId {
    self.client.admin_chain_id
}
/// The owner key preferred for signing proposals, if any.
#[instrument(level = "trace", skip(self))]
pub fn preferred_owner(&self) -> Option<AccountOwner> {
    self.preferred_owner
}
/// Sets the owner key used for future proposals.
#[instrument(level = "trace", skip(self))]
pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
    self.preferred_owner = Some(preferred_owner);
}
/// Clears the preferred owner, making the client unable to propose.
#[instrument(level = "trace", skip(self))]
pub fn unset_preferred_owner(&mut self) {
    self.preferred_owner = None;
}
/// Obtains a read guard on this chain's state in the local node.
#[instrument(level = "trace")]
pub async fn chain_state_view(
    &self,
) -> Result<crate::worker::ChainStateViewReadGuard<Env::Storage>, LocalNodeError> {
    self.client.local_node.chain_state_view(self.chain_id).await
}
/// Groups this chain's event subscriptions by publisher chain, returning for
/// each publisher the set of subscribed streams. The admin chain always
/// appears as a publisher (possibly with an empty stream set) unless this
/// *is* the admin chain.
#[instrument(level = "trace", skip(self))]
pub async fn event_stream_publishers(
    &self,
) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
    let subscriptions = self
        .client
        .local_node
        .get_event_subscriptions(self.chain_id)
        .await?;
    let mut publishers: BTreeMap<ChainId, BTreeSet<StreamId>> = BTreeMap::new();
    for ((publisher_id, stream_name), _subscription) in subscriptions {
        let streams = publishers.entry(publisher_id).or_insert_with(BTreeSet::new);
        streams.insert(stream_name);
    }
    let admin_chain_id = self.client.admin_chain_id;
    if self.chain_id != admin_chain_id {
        // Ensure the admin chain is always listed, even without streams.
        publishers.entry(admin_chain_id).or_default();
    }
    Ok(publishers)
}
/// Subscribes to notifications about this client's own chain.
#[instrument(level = "trace")]
pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
    self.subscribe_to(self.chain_id)
}
/// Subscribes to notifications about an arbitrary chain via the shared
/// notifier, returning the receiving end as a pinned stream.
#[instrument(level = "trace")]
pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
    let receiver = self.client.notifier.subscribe(vec![chain_id]);
    let stream = UnboundedReceiverStream::new(receiver);
    Ok(Box::pin(stream))
}
/// The storage backend shared by the underlying client.
#[instrument(level = "trace")]
pub fn storage_client(&self) -> &Env::Storage {
    self.client.storage_client()
}
/// Queries the local node for basic information about this chain.
#[instrument(level = "trace")]
pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
    let response = self
        .client
        .local_node
        .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
        .await?;
    Ok(response.info)
}
/// Like [`Self::chain_info`], but additionally requests the chain manager's
/// values in the response.
#[instrument(level = "trace")]
pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
    let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
    let response = self.client.local_node.handle_chain_info_query(query).await?;
    Ok(response.info)
}
/// Fetches the description of this chain via the underlying client.
pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
    self.client.get_chain_description(self.chain_id).await
}
/// Collects the incoming message bundles to include in the next block,
/// filtered by the configured message policy and capped at
/// `max_pending_message_bundles`. Returns an empty list when the policy
/// ignores all messages.
#[instrument(level = "trace")]
async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
    if self.options.message_policy.is_ignore() {
        return Ok(Vec::new());
    }
    let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
    let info = self
        .client
        .local_node
        .handle_chain_info_query(query)
        .await?
        .info;
    // When we are the sole super owner (no regular owners), the local node
    // must be at least at the wallet's trusted height before proposing.
    if self.preferred_owner.is_some_and(|owner| {
        info.manager
            .ownership
            .is_super_owner_no_regular_owners(&owner)
    }) {
        ensure!(
            info.next_block_height >= self.initial_next_block_height,
            Error::WalletSynchronizationError
        );
    }
    Ok(info
        .requested_pending_message_bundles
        .into_iter()
        // `apply_policy` drops or rewrites bundles per the message policy.
        .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
        .take(self.options.max_pending_message_bundles)
        .collect())
}
/// Builds an `UpdateStreams` operation acknowledging new events on
/// subscribed streams, or `None` if there is nothing new. The total number
/// of newly acknowledged events is capped at `max_new_events_per_block`;
/// the last stream that overflows the budget is truncated to fit.
#[instrument(level = "trace")]
async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
    let subscription_map = self
        .client
        .local_node
        .get_event_subscriptions(self.chain_id)
        .await?;
    let futures = subscription_map
        .into_iter()
        // Respect the policy's chain allow-list, if any.
        .filter(|((chain_id, _), _)| {
            self.options
                .message_policy
                .restrict_chain_ids_to
                .as_ref()
                .is_none_or(|chain_set| chain_set.contains(chain_id))
        })
        // Respect the policy's application allow-list, if any.
        .filter(|((_, stream_id), _)| {
            self.options
                .message_policy
                .process_events_from_application_ids
                .as_ref()
                .is_none_or(|app_set| app_set.contains(&stream_id.application_id))
        })
        .map(|((chain_id, stream_id), subscriptions)| {
            let client = self.client.clone();
            let previous_index = subscriptions.next_index;
            async move {
                // Only report streams that have events past our last index.
                let next_index = client
                    .local_node
                    .get_stream_event_count(chain_id, stream_id.clone())
                    .await?;
                if let Some(next_index) =
                    next_index.filter(|next_index| *next_index > previous_index)
                {
                    Ok(Some((chain_id, stream_id, previous_index, next_index)))
                } else {
                    Ok::<_, Error>(None)
                }
            }
        });
    let all_updates = futures::stream::iter(futures)
        .buffer_unordered(self.options.max_joined_tasks)
        .try_collect::<Vec<_>>()
        .await?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let max_events = self.options.max_new_events_per_block;
    let mut total_events: usize = 0;
    let mut updates = Vec::new();
    for (chain_id, stream_id, previous_index, next_index) in all_updates {
        let new_events = (next_index - previous_index) as usize;
        if total_events + new_events <= max_events {
            // Stream fits entirely within the remaining budget.
            total_events += new_events;
            updates.push((chain_id, stream_id, next_index));
        } else {
            // Budget exceeded: take a partial update up to the budget, then
            // stop considering further streams.
            let remaining = max_events.saturating_sub(total_events);
            if remaining > 0 {
                updates.push((chain_id, stream_id, previous_index + remaining as u32));
            }
            break;
        }
    }
    if updates.is_empty() {
        return Ok(None);
    }
    Ok(Some(SystemOperation::UpdateStreams(updates).into()))
}
/// Queries chain info including the known committees, via the client.
#[instrument(level = "trace")]
async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
    self.client.chain_info_with_committees(self.chain_id).await
}
/// Returns the chain's current epoch together with all committees it knows,
/// failing with `InvalidChainInfoResponse` if the committees were not
/// included in the response.
#[instrument(level = "trace")]
async fn epoch_and_committees(
    &self,
) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
    let info = self.chain_info_with_committees().await?;
    match info.requested_committees {
        Some(committees) => Ok((info.epoch, committees)),
        None => Err(LocalNodeError::InvalidChainInfoResponse),
    }
}
/// Returns the committee for this chain's current epoch, synchronizing
/// first when local data is missing: missing blobs trigger a sync of this
/// chain; missing epoch-stream events trigger a sync of the admin chain.
#[instrument(level = "trace")]
pub async fn local_committee(&self) -> Result<Arc<Committee>, Error> {
    let info = match self.chain_info().await {
        Ok(info) => info,
        Err(LocalNodeError::BlobsNotFound(_)) => {
            // Blobs missing locally: fetch this chain's state first.
            self.synchronize_chain_state(self.chain_id).await?;
            self.chain_info().await?
        }
        Err(LocalNodeError::EventsNotFound(event_ids))
            if event_ids
                .iter()
                .all(|event_id| event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)) =>
        {
            // Only epoch events are missing: the admin chain publishes
            // those, so synchronize it and retry.
            self.synchronize_chain_state(self.client.admin_chain_id)
                .await?;
            self.chain_info().await?
        }
        Err(err) => return Err(err.into()),
    };
    let committee = self
        .client
        .storage_client()
        .get_or_load_committee(info.epoch)
        .await?
        .ok_or_else(|| LocalNodeError::InactiveChain(self.chain_id))?;
    Ok(committee)
}
/// The admin chain's current epoch and committee.
#[instrument(level = "trace")]
pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
    self.client.admin_committee().await
}
/// Validates and returns the owner this client proposes as.
///
/// Fails with `NoAccountKeyConfigured` when no preferred owner is set,
/// `InactiveChain` when ownership is not active, `NotAnOwner` when the
/// preferred owner cannot propose in multi-leader rounds, and
/// `CannotFindKeyForChain` when the signer lacks the owner's key.
#[instrument(level = "trace")]
pub async fn identity(&self) -> Result<AccountOwner, Error> {
    let Some(preferred_owner) = self.preferred_owner else {
        return Err(Error::NoAccountKeyConfigured(self.chain_id));
    };
    let manager = self.chain_info().await?.manager;
    ensure!(
        manager.ownership.is_active(),
        LocalNodeError::InactiveChain(self.chain_id)
    );
    let is_owner = manager
        .ownership
        .can_propose_in_multi_leader_round(&preferred_owner);
    if !is_owner {
        // Log the set of accepted owners to ease debugging misconfiguration.
        let accepted_owners = manager
            .ownership
            .all_owners()
            .chain(&manager.leader)
            .collect::<Vec<_>>();
        warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
            "The preferred owner is not configured as an owner of this chain",
        );
        return Err(Error::NotAnOwner(self.chain_id));
    }
    let has_signer = self
        .signer()
        .contains_key(&preferred_owner)
        .await
        .map_err(Error::signer_failure)?;
    if !has_signer {
        warn!(%self.chain_id, ?preferred_owner,
            "Chain is one of the owners but its Signer instance doesn't contain the key",
        );
        return Err(Error::CannotFindKeyForChain(self.chain_id));
    }
    Ok(preferred_owner)
}
/// Checks that `owner` can be used to propose on this chain: the signer
/// must hold its key and it must be allowed to propose in multi-leader
/// rounds. Also ensures the chain description blob is available locally.
/// Returns the current chain info on success.
#[instrument(level = "trace")]
pub async fn prepare_for_owner(&self, owner: AccountOwner) -> Result<Box<ChainInfo>, Error> {
    ensure!(
        self.client.has_key_for(&owner).await?,
        Error::CannotFindKeyForChain(self.chain_id)
    );
    self.client
        .get_chain_description_blob(self.chain_id)
        .await?;
    let info = self.chain_info().await?;
    ensure!(
        info.manager
            .ownership
            .can_propose_in_multi_leader_round(&owner),
        Error::NotAnOwner(self.chain_id)
    );
    Ok(info)
}
/// Brings the local view of this chain up to date before acting on it.
///
/// Always synchronizes up to the wallet's trusted height; additionally
/// synchronizes from validators unless the preferred owner is the sole
/// super owner (in which case no one else can have produced blocks). If the
/// chain's epoch is ahead of the locally known admin epoch, the admin chain
/// is synchronized too.
#[instrument(level = "trace")]
pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
    #[cfg(with_metrics)]
    let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
    let mut info = self.synchronize_to_known_height().await?;
    if self.preferred_owner.is_none_or(|owner| {
        !info
            .manager
            .ownership
            .is_super_owner_no_regular_owners(&owner)
    }) {
        info = self.client.synchronize_chain_state(self.chain_id).await?;
    }
    if info.epoch > self.client.admin_committees().await?.0 {
        self.client
            .synchronize_chain_state(self.client.admin_chain_id)
            .await?;
    }
    Ok(info)
}
/// Downloads certificates until the local node reaches the trusted height
/// recorded at construction, and sanity-checks that the block hash at that
/// height matches the trusted hash.
async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
    let info = self
        .client
        .download_certificates(self.chain_id, self.initial_next_block_height)
        .await?;
    if info.next_block_height == self.initial_next_block_height {
        // Only comparable when we stopped exactly at the trusted height;
        // the local node may legitimately be further ahead.
        ensure!(
            self.initial_block_hash == info.block_hash,
            Error::InternalError("Invalid chain of blocks in local node")
        );
    }
    Ok(info)
}
/// Pushes this chain's latest state to validators: first to the (optional)
/// outgoing committee, then to the current committee if it differs.
/// Timing is reported via the optional timing channel.
#[instrument(level = "trace", skip(old_committee, latest_certificate))]
pub async fn update_validators(
    &self,
    old_committee: Option<&Committee>,
    latest_certificate: Option<ConfirmedBlockCertificate>,
) -> Result<(), Error> {
    let update_validators_start = linera_base::time::Instant::now();
    if let Some(old_committee) = old_committee {
        let old_committee_start = linera_base::time::Instant::now();
        self.communicate_chain_updates(old_committee, latest_certificate.clone())
            .await?;
        tracing::debug!(
            old_committee_ms = old_committee_start.elapsed().as_millis(),
            "communicated chain updates to old committee"
        );
    };
    // Best-effort: if the current committee can't be determined locally we
    // silently skip it rather than failing the whole update.
    if let Ok(new_committee) = self.local_committee().await {
        if Some(&*new_committee) != old_committee {
            let new_committee_start = linera_base::time::Instant::now();
            self.communicate_chain_updates(&new_committee, latest_certificate)
                .await?;
            tracing::debug!(
                new_committee_ms = new_committee_start.elapsed().as_millis(),
                "communicated chain updates to new committee"
            );
        }
    }
    self.send_timing(update_validators_start, TimingType::UpdateValidators);
    Ok(())
}
/// Sends this chain's updates up to the current local height to the given
/// committee, using the configured cross-chain delivery mode.
#[instrument(level = "trace", skip(committee, latest_certificate))]
pub async fn communicate_chain_updates(
    &self,
    committee: &Committee,
    latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
) -> Result<(), Error> {
    let delivery = self.options.cross_chain_message_delivery;
    let height = self.chain_info().await?.next_block_height;
    self.client
        .communicate_chain_updates(
            committee,
            self.chain_id,
            height,
            delivery,
            latest_certificate,
        )
        .await
}
/// Synchronizes events from all chains this chain subscribes to: the admin
/// chain gets a full state sync; every other publisher chain has only its
/// subscribed event streams synced from a quorum of validators, with
/// bounded concurrency.
async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
    let subscriptions = self
        .client
        .local_node
        .get_event_subscriptions(self.chain_id)
        .await?;
    // Group subscribed streams by publisher, skipping self-subscriptions.
    let mut streams_by_chain = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
    for ((chain_id, stream_id), _) in &subscriptions {
        if *chain_id != self.chain_id {
            streams_by_chain
                .entry(*chain_id)
                .or_default()
                .insert(stream_id.clone());
        }
    }
    let admin_chain_id = self.client.admin_chain_id;
    if admin_chain_id != self.chain_id {
        // The admin chain is fully synchronized rather than per-stream.
        self.client.synchronize_chain_state(admin_chain_id).await?;
    }
    let (_, committee) = self.admin_committee().await?;
    let nodes = self.client.make_nodes(&committee)?;
    let tasks = streams_by_chain
        .into_iter()
        .filter(|(chain_id, _)| *chain_id != admin_chain_id)
        .map(|(chain_id, stream_ids)| {
            self.sync_publisher_chain_events(chain_id, stream_ids, &nodes, &committee)
        })
        .collect::<Vec<_>>();
    stream::iter(tasks)
        .buffer_unordered(self.options.max_joined_tasks)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;
    Ok(())
}
/// Fetches the given event streams of one publisher chain from a quorum of
/// the committee's validators.
async fn sync_publisher_chain_events(
    &self,
    publisher_chain_id: ChainId,
    stream_ids: BTreeSet<StreamId>,
    nodes: &[RemoteNode<Env::ValidatorNode>],
    committee: &Committee,
) -> Result<(), Error> {
    // Borrow once so the closure below can be `move` without cloning the set.
    let stream_ids_ref = &stream_ids;
    communicate_with_quorum(
        nodes,
        committee,
        |_: &()| (),
        |remote_node| async move {
            self.client
                .sync_events_from_node(publisher_chain_id, stream_ids_ref, &remote_node)
                .await
        },
        self.options.quorum_grace_period,
    )
    .await?;
    Ok(())
}
/// Discovers and processes certificates other chains have sent to this one.
///
/// Collects `received_log` entries (past each validator's stored tracker)
/// from a quorum of validators, merges them, then downloads and processes
/// the sender certificates in batches, persisting updated tracker values
/// after each batch. A failure to reach a quorum is logged but does not
/// abort: whatever logs were collected are still processed.
#[instrument(level = "debug", skip(self), fields(chain_id = %self.chain_id))]
pub async fn find_received_certificates(&self) -> Result<(), Error> {
    debug!("starting find_received_certificates");
    #[cfg(with_metrics)]
    let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
    let chain_id = self.chain_id;
    let (_, committee) = self.admin_committee().await?;
    let nodes = self.client.make_nodes(&committee)?;
    let trackers = self
        .client
        .local_node
        .get_received_certificate_trackers(chain_id)
        .await?;
    trace!("find_received_certificates: read trackers");
    // Shared accumulator filled concurrently by the per-validator tasks.
    let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
    let result = communicate_with_quorum(
        &nodes,
        &committee,
        |_| (),
        |remote_node| {
            let client = &self.client;
            // Resume from this validator's stored tracker, or 0 if unknown.
            let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
            let received_log_batches = Arc::clone(&received_log_batches);
            Box::pin(async move {
                let batch = client
                    .get_received_log_from_validator(chain_id, &remote_node, tracker)
                    .await?;
                let mut batches = received_log_batches.lock().unwrap();
                batches.push((remote_node.public_key, batch));
                Ok(())
            })
        },
        self.options.quorum_grace_period,
    )
    .await;
    if let Err(error) = result {
        // Non-fatal: proceed with whatever logs we managed to collect.
        error!(
            %error,
            "Failed to synchronize received_logs from at least a quorum of validators",
        );
    }
    let received_logs: Vec<_> = {
        let mut received_log_batches = received_log_batches.lock().unwrap();
        std::mem::take(received_log_batches.as_mut())
    };
    debug!(
        received_logs_len = %received_logs.len(),
        received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
        "collected received logs"
    );
    let (received_logs, mut validator_trackers) = {
        (
            ReceivedLogs::from_received_result(received_logs.clone()),
            ValidatorTrackers::new(received_logs, &trackers),
        )
    };
    debug!(
        num_chains = %received_logs.num_chains(),
        num_certs = %received_logs.num_certs(),
        "find_received_certificates: total number of chains and certificates to sync",
    );
    // NOTE(review): integer division — panics if `max_joined_tasks` is 0,
    // and rounds down before doubling; confirm `max_joined_tasks >= 1`.
    let max_blocks_per_chain =
        self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
    for received_log in received_logs.into_batches(
        self.options.sender_certificate_download_batch_size,
        max_blocks_per_chain,
    ) {
        validator_trackers = self
            .receive_sender_certificates(received_log, validator_trackers, &nodes)
            .await?;
        // Persist progress after each batch so work isn't repeated on retry.
        self.update_received_certificate_trackers(&validator_trackers)
            .await;
    }
    info!("find_received_certificates finished");
    Ok(())
}
/// Persists the per-validator received-certificate trackers in the local
/// node. Failures are logged and swallowed — tracker updates are an
/// optimization, not a correctness requirement.
async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
    let updated_trackers = trackers.to_map();
    trace!(?updated_trackers, "updated tracker values");
    let outcome = self
        .client
        .local_node
        .update_received_certificate_trackers(self.chain_id, updated_trackers)
        .await;
    match outcome {
        Ok(()) => {}
        Err(error) => {
            error!(
                chain_id = %self.chain_id,
                %error,
                "Failed to update the certificate trackers",
            );
        }
    }
}
/// Downloads and processes one batch of sender-chain certificates.
///
/// Filters out heights the local node already has, downloads the remaining
/// certificates per sender chain with bounded concurrency, and records each
/// processed certificate in `validator_trackers` via an mpsc channel.
/// Sender chains with nothing to download still get their pending
/// cross-chain requests retried. Returns the updated trackers.
async fn receive_sender_certificates(
    &self,
    mut received_logs: ReceivedLogs,
    mut validator_trackers: ValidatorTrackers,
    nodes: &[RemoteNode<Env::ValidatorNode>],
) -> Result<ValidatorTrackers, Error> {
    debug!(
        num_chains = %received_logs.num_chains(),
        num_certs = %received_logs.num_certs(),
        "receive_sender_certificates: number of chains and certificates to sync",
    );
    let local_next_heights = self
        .client
        .local_node
        .next_outbox_heights(received_logs.chains(), self.chain_id)
        .await?;
    // Drop entries the local node already knows about.
    validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);
    debug!(
        remaining_total_certificates = %received_logs.num_certs(),
        "receive_sender_certificates: computed remote_heights"
    );
    let mut other_sender_chains = Vec::new();
    // Download tasks report processed (chain, height) pairs on this channel;
    // the receiver side updates the trackers concurrently.
    let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();
    let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
        let received_logs = &received_logs;
        let other_sender_chains = &mut other_sender_chains;
        move |(sender_chain_id, remote_heights)| {
            if remote_heights.is_empty() {
                // Nothing new to download, but the chain may still have
                // pending cross-chain requests — handle it separately below.
                other_sender_chains.push(sender_chain_id);
                return None;
            };
            let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
            let sender = sender.clone();
            let client = self.client.clone();
            let nodes = nodes.to_vec();
            Some(async move {
                client
                    .download_and_process_sender_chain(
                        sender_chain_id,
                        &nodes,
                        received_logs,
                        remote_heights,
                        sender,
                    )
                    .await
            })
        }
    });
    // Run downloads and tracker updates concurrently; the receive loop ends
    // once all task-held `sender` clones are dropped.
    future::join(
        stream::iter(cert_futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .collect::<()>(),
        async {
            while let Some(chain_and_height) = receiver.recv().await {
                validator_trackers.downloaded_cert(chain_and_height);
            }
        },
    )
    .await;
    debug!(
        num_other_chains = %other_sender_chains.len(),
        "receive_sender_certificates: processing certificates finished"
    );
    self.retry_pending_cross_chain_requests_from_sender_chains(nodes, other_sender_chains)
        .await;
    debug!("receive_sender_certificates: finished processing other_sender_chains");
    Ok(validator_trackers)
}
/// Retries pending cross-chain requests from each given sender chain,
/// concurrently. When a retry fails due to missing blobs, the blobs are
/// fetched from `nodes` and the retry is attempted once more. All failures
/// are logged; none are propagated (best-effort).
async fn retry_pending_cross_chain_requests_from_sender_chains(
    &self,
    nodes: &[RemoteNode<Env::ValidatorNode>],
    other_sender_chains: Vec<ChainId>,
) {
    let stream = other_sender_chains
        .into_iter()
        .map(|chain_id| async move {
            if let Err(error) = match self
                .client
                .retry_pending_cross_chain_requests(chain_id)
                .await
            {
                Ok(()) => Ok(()),
                Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                    // Fetch the missing blobs, then retry once. A blob
                    // download failure is logged but we still retry.
                    if let Err(error) = self
                        .client
                        .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
                        .await
                    {
                        error!(
                            ?blob_ids,
                            %error,
                            "Error while attempting to download blobs during retrying outgoing \
                            messages"
                        );
                    }
                    self.client
                        .retry_pending_cross_chain_requests(chain_id)
                        .await
                }
                err => err,
            } {
                error!(
                    %chain_id,
                    %error,
                    "Failed to retry outgoing messages from chain"
                );
            }
        })
        .collect::<FuturesUnordered<_>>();
    stream.for_each(future::ready).await;
}
/// Proposes a block transferring `amount` from `owner`'s account on this
/// chain to `recipient`.
#[instrument(level = "trace")]
pub async fn transfer(
    &self,
    owner: AccountOwner,
    amount: Amount,
    recipient: Account,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    // Boxed to keep this future's size small (deep async call chain).
    Box::pin(self.execute_operation(SystemOperation::Transfer {
        owner,
        recipient,
        amount,
    }))
    .await
}
/// Proposes a block verifying that the data blob with the given hash exists.
#[instrument(level = "trace")]
pub async fn read_data_blob(
    &self,
    hash: CryptoHash,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let operation = SystemOperation::VerifyBlob {
        blob_id: BlobId {
            hash,
            blob_type: BlobType::Data,
        },
    };
    // Boxed to keep this future's size small (deep async call chain).
    Box::pin(self.execute_operation(operation)).await
}
/// Proposes a block claiming `amount` from `owner`'s account on chain
/// `target_id`, to be credited to `recipient`.
#[instrument(level = "trace")]
pub async fn claim(
    &self,
    owner: AccountOwner,
    target_id: ChainId,
    recipient: Account,
    amount: Amount,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    // Boxed to keep this future's size small (deep async call chain).
    Box::pin(self.execute_operation(SystemOperation::Claim {
        owner,
        target_id,
        recipient,
        amount,
    }))
    .await
}
/// Requests a leader-timeout certificate for the current round from the
/// committee, processes it locally, and broadcasts the resulting state to
/// the validators.
#[instrument(level = "trace")]
pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
    let chain_id = self.chain_id;
    let committee = self.local_committee().await?;
    let info = self.chain_info().await?;
    let committee = &committee;
    let height = info.next_block_height;
    let round = info.manager.current_round;
    let action = CommunicateAction::RequestTimeout {
        height,
        round,
        chain_id,
    };
    let value = Timeout::new(chain_id, height, info.epoch);
    let certificate = Box::new(
        self.client
            .communicate_chain_action(committee, action, value)
            .await?,
    );
    // Apply the timeout certificate to the local node before broadcasting.
    self.client.handle_certificate(*certificate.clone()).await?;
    self.client
        .communicate_chain_updates(
            committee,
            chain_id,
            height,
            CrossChainMessageDelivery::NonBlocking,
            None,
        )
        .await?;
    Ok(*certificate)
}
/// Synchronizes the local view of `chain_id` from validators.
#[instrument(level = "trace", skip_all)]
pub async fn synchronize_chain_state(
    &self,
    chain_id: ChainId,
) -> Result<Box<ChainInfo>, Error> {
    self.client.synchronize_chain_state(chain_id).await
}
/// Synchronizes this chain's state using the validators of the given
/// committee.
#[instrument(level = "trace", skip_all)]
pub async fn synchronize_chain_state_from_committee(
    &self,
    committee: Arc<Committee>,
) -> Result<Box<ChainInfo>, Error> {
    // Boxed to keep this future's size small (deep async call chain).
    Box::pin(
        self.client
            .synchronize_chain_state_from_committee(self.chain_id, committee),
    )
    .await
}
/// Executes the given operations (plus `blobs`) in a new block, retrying
/// after a resynchronization when validators report that our local state
/// is behind. Timeouts and conflicts are returned to the caller as the
/// corresponding [`ClientOutcome`] variants.
#[instrument(level = "trace", skip(operations, blobs))]
pub async fn execute_operations(
    &self,
    operations: Vec<Operation>,
    blobs: Vec<Blob>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let timing_start = linera_base::time::Instant::now();
    tracing::debug!("execute_operations started");
    let result = loop {
        let execute_block_start = linera_base::time::Instant::now();
        tracing::debug!("calling execute_block");
        match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
            Ok(ClientOutcome::Committed(certificate)) => {
                tracing::debug!(
                    execute_block_ms = execute_block_start.elapsed().as_millis(),
                    "execute_block succeeded"
                );
                self.send_timing(execute_block_start, TimingType::ExecuteBlock);
                break Ok(ClientOutcome::Committed(certificate));
            }
            Ok(ClientOutcome::WaitForTimeout(timeout)) => {
                break Ok(ClientOutcome::WaitForTimeout(timeout));
            }
            Ok(ClientOutcome::Conflict(certificate)) => {
                info!(
                    height = %certificate.block().header.height,
                    "Another block was committed."
                );
                break Ok(ClientOutcome::Conflict(certificate));
            }
            // Validators are ahead of us: synchronize and retry the loop.
            Err(Error::CommunicationError(CommunicationError::Trusted(
                NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                },
            ))) if expected_block_height > found_block_height => {
                tracing::info!(
                    chain_id = %self.chain_id,
                    "Local state is outdated; synchronizing chain"
                );
                self.synchronize_chain_state(self.chain_id).await?;
            }
            Err(err) => return Err(err),
        };
    };
    self.send_timing(timing_start, TimingType::ExecuteOperations);
    tracing::debug!(
        total_execute_operations_ms = timing_start.elapsed().as_millis(),
        "execute_operations returning"
    );
    result
}
/// Convenience wrapper around [`Self::execute_operations`] for a single
/// operation and no blobs.
pub async fn execute_operation(
    &self,
    operation: impl Into<Operation>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let operations = vec![operation.into()];
    self.execute_operations(operations, Vec::new()).await
}
/// Attempts to commit a single new block containing `operations`.
///
/// Holds the chain's proposal mutex for the whole attempt. First finalizes
/// any previously pending block — if one commits, that is reported as a
/// `Conflict`. Then assembles a fresh block (prepending epoch changes,
/// incoming messages and stream updates), stages it as the pending
/// proposal, and drives it to commitment. A committed certificate for a
/// *different* block than ours is likewise reported as a `Conflict`.
#[instrument(level = "trace", skip(operations, blobs))]
async fn execute_block(
    &self,
    operations: Vec<Operation>,
    blobs: Vec<Blob>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    #[cfg(with_metrics)]
    let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
    let mutex = self.proposal_mutex();
    let lock_start = linera_base::time::Instant::now();
    let mut proposal_guard = mutex.lock_owned().await;
    tracing::debug!(
        chain_id = %self.chain_id,
        lock_wait_ms = lock_start.elapsed().as_millis(),
        "acquired proposal_mutex in execute_block"
    );
    // Step 1: flush any previously pending block before proposing ours.
    match self
        .process_pending_block_without_prepare(&mut proposal_guard)
        .await?
    {
        ClientOutcome::Committed(Some(certificate)) => {
            // An older pending block committed — our operations did not run.
            return Ok(ClientOutcome::Conflict(Box::new(certificate)))
        }
        ClientOutcome::WaitForTimeout(timeout) => {
            return Ok(ClientOutcome::WaitForTimeout(timeout))
        }
        ClientOutcome::Conflict(certificate) => {
            return Ok(ClientOutcome::Conflict(certificate))
        }
        ClientOutcome::Committed(None) => {}
    }
    // Step 2: build the transaction list for the new block.
    let transactions = self.prepend_epochs_messages_and_events(operations).await?;
    if transactions.is_empty() {
        return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
            WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
        )));
    }
    let block = self
        .new_pending_block(transactions, blobs, &mut proposal_guard)
        .await?;
    // Step 3: drive the newly staged block to commitment.
    match self
        .process_pending_block_without_prepare(&mut proposal_guard)
        .await?
    {
        ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
            Ok(ClientOutcome::Committed(certificate))
        }
        // A certificate for a different block means someone else won.
        ClientOutcome::Committed(Some(certificate)) => {
            Ok(ClientOutcome::Conflict(Box::new(certificate)))
        }
        ClientOutcome::Committed(None) => {
            Err(Error::BlockProposalError("Unexpected block proposal error"))
        }
        ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
        ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
    }
}
/// Builds the transaction list for a new block: epoch-change operations first, then
/// incoming message bundles, then stream-update operations, then the caller's
/// operations, in that order.
#[instrument(level = "trace", skip(operations))]
async fn prepend_epochs_messages_and_events(
    &self,
    operations: Vec<Operation>,
) -> Result<Vec<Transaction>, Error> {
    let incoming_bundles = self.pending_message_bundles().await?;
    let stream_updates = self.collect_stream_updates().await?;
    let epoch_changes = self.collect_epoch_changes().await?;
    let mut transactions = Vec::new();
    transactions.extend(epoch_changes.into_iter().map(Transaction::ExecuteOperation));
    transactions.extend(
        incoming_bundles
            .into_iter()
            .map(Transaction::ReceiveMessages),
    );
    transactions.extend(stream_updates.into_iter().map(Transaction::ExecuteOperation));
    transactions.extend(operations.into_iter().map(Transaction::ExecuteOperation));
    Ok(transactions)
}
/// Stages execution of the given transactions as a new block at the chain tip and
/// records it in `proposal_guard` as the client's pending proposal.
///
/// # Errors
/// Fails with [`Error::BlockProposalError`] if another proposal is already pending
/// (it must be committed or cleared first), or if staging the block fails.
#[instrument(level = "trace", skip(transactions, blobs, proposal_guard))]
async fn new_pending_block(
    &self,
    transactions: Vec<Transaction>,
    blobs: Vec<Blob>,
    proposal_guard: &mut Option<PendingProposal>,
) -> Result<Block, Error> {
    let identity = self.identity().await?;
    ensure!(
        proposal_guard.is_none(),
        Error::BlockProposalError(
            "Client state already has a pending block; \
            use the `linera retry-pending-block` command to commit that first"
        )
    );
    let info = self.chain_info_with_manager_values().await?;
    // The block timestamp must not precede the previous block or any received bundle.
    let timestamp = self.next_timestamp(&transactions, info.timestamp);
    let proposed_block = ProposedBlock {
        epoch: info.epoch,
        chain_id: self.chain_id,
        transactions,
        previous_block_hash: info.block_hash,
        height: info.next_block_height,
        authenticated_signer: Some(identity),
        timestamp,
    };
    // Choose a round compatible with oracle use so staged execution matches what we
    // will later propose.
    let round = self.round_for_oracle(&info, &identity).await?;
    let (block, _) = Box::pin(self.client.stage_block_execution(
        proposed_block,
        round,
        blobs.clone(),
        self.options.bundle_execution_policy(),
    ))
    .await?;
    // Store the executed block as the pending proposal for later submission.
    let (proposed_block, _) = block.clone().into_proposal();
    *proposal_guard = Some(PendingProposal {
        block: proposed_block,
        blobs,
    });
    Ok(block)
}
/// Picks the timestamp for a new block: the maximum of the local clock, the
/// timestamps of all incoming bundles in `transactions`, and the previous block's
/// timestamp `block_time`.
#[instrument(level = "trace", skip(transactions))]
fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
    let mut candidate = self.storage_client().clock().current_time();
    for incoming in transactions.iter().filter_map(Transaction::incoming_bundle) {
        candidate = candidate.max(incoming.bundle.timestamp);
    }
    candidate.max(block_time)
}
/// Queries an application on this chain via the local node, optionally pinned to the
/// state at `block_hash`.
///
/// If the local node is missing blobs required to answer the query, they are
/// downloaded from the validators and the query is retried.
#[instrument(level = "trace", skip(query))]
pub async fn query_application(
    &self,
    query: Query,
    block_hash: Option<CryptoHash>,
) -> Result<(QueryOutcome, BlockHeight), Error> {
    loop {
        let result = self
            .client
            .local_node
            .query_application(self.chain_id, query.clone(), block_hash)
            .await;
        // Missing blobs: fetch them from the validators and retry. If fetching
        // fails, the error propagates and ends the loop.
        if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
            let validators = self.client.validator_nodes().await?;
            self.client
                .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
                .await?;
            continue;
        }
        return Ok(result?);
    }
}
/// Queries the system application on this chain and returns its typed response.
///
/// Fails with [`Error::InternalError`] if the response is not a system response.
#[instrument(level = "trace", skip(query))]
pub async fn query_system_application(
    &self,
    query: SystemQuery,
) -> Result<QueryOutcome<SystemResponse>, Error> {
    let (outcome, _) = self.query_application(Query::System(query), None).await?;
    match outcome.response {
        QueryResponse::System(response) => Ok(QueryOutcome {
            response,
            operations: outcome.operations,
        }),
        _ => Err(Error::InternalError("Unexpected response for system query")),
    }
}
/// Queries a user application on this chain, deserializing the JSON response into
/// the application's ABI response type. Only available in test builds.
///
/// Fails with [`Error::InternalError`] if the response is not a user response.
#[instrument(level = "trace", skip(application_id, query))]
#[cfg(with_testing)]
pub async fn query_user_application<A: Abi>(
    &self,
    application_id: ApplicationId<A>,
    query: &A::Query,
) -> Result<QueryOutcome<A::QueryResponse>, Error> {
    let query = Query::user(application_id, query)?;
    let (outcome, _) = self.query_application(query, None).await?;
    let QueryResponse::User(response_bytes) = outcome.response else {
        return Err(Error::InternalError("Unexpected response for user query"));
    };
    let response = serde_json::from_slice(&response_bytes)?;
    Ok(QueryOutcome {
        response,
        operations: outcome.operations,
    })
}
/// Returns the chain balance, accounting for pending incoming messages.
#[instrument(level = "trace")]
pub async fn query_balance(&self) -> Result<Amount, Error> {
    let balances = Box::pin(self.query_balances_with_owner(AccountOwner::CHAIN)).await?;
    Ok(balances.0)
}
/// Returns `owner`'s balance, accounting for pending incoming messages. The chain
/// owner resolves to the chain balance; a missing owner balance is reported as zero.
#[instrument(level = "trace", skip(owner))]
pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
    if owner.is_chain() {
        return Box::pin(self.query_balance()).await;
    }
    let (_, owner_balance) = Box::pin(self.query_balances_with_owner(owner)).await?;
    Ok(owner_balance.unwrap_or(Amount::ZERO))
}
/// Returns the chain balance and, if available, `owner`'s balance, as they would be
/// after executing all pending incoming message bundles.
///
/// With no pending bundles, the locally stored balances are returned directly.
/// Otherwise, a block receiving the bundles is staged (not proposed) and the
/// post-execution balances are read from the response. If staging fails only because
/// fees exceed funding, both balances are reported as zero instead of erroring.
#[instrument(level = "trace", skip(owner))]
pub(crate) async fn query_balances_with_owner(
    &self,
    owner: AccountOwner,
) -> Result<(Amount, Option<Amount>), Error> {
    let incoming_bundles = self.pending_message_bundles().await?;
    if incoming_bundles.is_empty() {
        let chain_balance = self.local_balance().await?;
        let owner_balance = self.local_owner_balance(owner).await?;
        return Ok((chain_balance, Some(owner_balance)));
    }
    let info = self.chain_info().await?;
    let transactions = incoming_bundles
        .into_iter()
        .map(Transaction::ReceiveMessages)
        .collect::<Vec<_>>();
    let timestamp = self.next_timestamp(&transactions, info.timestamp);
    let block = ProposedBlock {
        epoch: info.epoch,
        chain_id: self.chain_id,
        transactions,
        previous_block_hash: info.block_hash,
        height: info.next_block_height,
        // Chain-balance queries are unauthenticated; owner queries authenticate as
        // that owner so the owner balance is reported.
        authenticated_signer: if owner == AccountOwner::CHAIN {
            None
        } else {
            Some(owner)
        },
        timestamp,
    };
    match Box::pin(self.client.stage_block_execution(
        block,
        None,
        Vec::new(),
        self.options.bundle_execution_policy(),
    ))
    .await
    {
        Ok((_, response)) => Ok((
            response.info.chain_balance,
            response.info.requested_owner_balance,
        )),
        // Insufficient funds to even pay the fees: report zero balances.
        Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
            error,
        )))) if matches!(
            &*error,
            ChainError::ExecutionError(
                execution_error,
                ChainExecutionContext::Block
            ) if matches!(
                **execution_error,
                ExecutionError::FeesExceedFunding { .. }
            )
        ) =>
        {
            Ok((Amount::ZERO, Some(Amount::ZERO)))
        }
        Err(error) => Err(error),
    }
}
/// Returns the chain balance as stored by the local node, without considering
/// pending incoming messages.
#[instrument(level = "trace")]
pub async fn local_balance(&self) -> Result<Amount, Error> {
    let balances = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
    Ok(balances.0)
}
/// Returns `owner`'s locally stored balance. The chain owner resolves to the chain
/// balance; a missing owner balance is reported as zero.
#[instrument(level = "trace", skip(owner))]
pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
    if owner.is_chain() {
        return self.local_balance().await;
    }
    let (_, owner_balance) = self.local_balances_with_owner(owner).await?;
    Ok(owner_balance.unwrap_or(Amount::ZERO))
}
/// Reads the chain balance and `owner`'s balance from the local node.
///
/// Fails with [`Error::WalletSynchronizationError`] if the local chain is behind the
/// height recorded when the client was created.
#[instrument(level = "trace", skip(owner))]
pub(crate) async fn local_balances_with_owner(
    &self,
    owner: AccountOwner,
) -> Result<(Amount, Option<Amount>), Error> {
    ensure!(
        self.chain_info().await?.next_block_height >= self.initial_next_block_height,
        Error::WalletSynchronizationError
    );
    let mut balance_query = ChainInfoQuery::new(self.chain_id);
    balance_query.request_owner_balance = owner;
    let info = self
        .client
        .local_node
        .handle_chain_info_query(balance_query)
        .await?
        .info;
    Ok((info.chain_balance, info.requested_owner_balance))
}
/// Transfers `amount` from the `from` account on this chain to `account`.
///
/// Thin alias for `transfer`.
#[instrument(level = "trace")]
pub async fn transfer_to_account(
    &self,
    from: AccountOwner,
    amount: Amount,
    account: Account,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    self.transfer(from, amount, account).await
}
/// Burns `amount` from `owner` by transferring it to this chain's burn address.
/// Only available in test builds.
#[cfg(with_testing)]
#[instrument(level = "trace")]
pub async fn burn(
    &self,
    owner: AccountOwner,
    amount: Amount,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    self.transfer(owner, amount, Account::burn_address(self.chain_id))
        .await
}
/// Fetches this chain's info from the validators.
#[instrument(level = "trace")]
pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
    let validators = self.client.validator_nodes().await?;
    self.client
        .fetch_chain_info(self.chain_id, &validators)
        .await
}
/// Downloads this chain's certificates from a quorum of the admin committee's
/// validators, bounded by `next_height` and/or `until_block_time`, then returns the
/// resulting local chain info.
///
/// A `None` height bound defaults to `BlockHeight::MAX`, i.e. no limit.
#[instrument(level = "trace")]
pub async fn synchronize_up_to(
    &self,
    next_height: Option<BlockHeight>,
    until_block_time: Option<Timestamp>,
) -> Result<Box<ChainInfo>, Error> {
    let (_, committee) = self.client.admin_committee().await?;
    let validators = self.client.make_nodes(&committee)?;
    Box::pin(self.client.fetch_chain_info(self.chain_id, &validators)).await?;
    // Download from enough validators to cover a quorum of the committee.
    communicate_with_quorum(
        &validators,
        &committee,
        |_: &()| (),
        |remote_node| async move {
            self.client
                .download_certificates_from(
                    &remote_node,
                    self.chain_id,
                    next_height.unwrap_or(BlockHeight::MAX),
                    until_block_time,
                )
                .await?;
            Ok(())
        },
        self.client.options.quorum_grace_period,
    )
    .await?;
    self.client
        .local_node
        .chain_info(self.chain_id)
        .await
        .map_err(Into::into)
}
/// Synchronizes this chain from the validators and returns the chain info.
///
/// If the client has a preferred owner, the chain is fully prepared and publisher
/// chains plus received certificates are synchronized too; otherwise only the chain
/// state itself is synchronized.
pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
    if self.preferred_owner.is_some() {
        let info = self.prepare_chain().await?;
        self.synchronize_publisher_chains().await?;
        self.find_received_certificates().await?;
        Ok(info)
    } else {
        self.client.synchronize_chain_state(self.chain_id).await
    }
}
/// Synchronizes the chain with the validators, then processes the client's pending
/// block, if any.
///
/// Acquires the proposal mutex, so concurrent block-producing calls are serialized.
#[instrument(level = "trace")]
pub async fn process_pending_block(
    &self,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
    self.prepare_chain().await?;
    let mutex = self.proposal_mutex();
    let mut proposal_guard = mutex.lock_owned().await;
    self.process_pending_block_without_prepare(&mut proposal_guard)
        .await
}
/// Proposes the client's pending block (or retries the chain's locking block) and
/// drives it to a confirmed certificate, without synchronizing the chain first.
///
/// `proposal_guard` is the mutex-protected slot holding the client's pending
/// proposal; it is cleared once a block is committed. Returns `Committed(None)` when
/// there is nothing to propose.
#[instrument(level = "debug", skip(self, proposal_guard), fields(chain_id = %self.chain_id))]
async fn process_pending_block_without_prepare(
    &self,
    proposal_guard: &mut Option<PendingProposal>,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
    let process_start = linera_base::time::Instant::now();
    tracing::debug!("process_pending_block_without_prepare started");
    // If the current round already timed out, request a timeout certificate first.
    let info = self.request_leader_timeout_if_needed().await?;
    // Discard the pending proposal if a block was already committed at its height.
    if let Some(pending) = &*proposal_guard {
        if pending.block.height < info.next_block_height {
            tracing::debug!(
                "Clearing pending proposal: a block was committed at height {}",
                pending.block.height
            );
            *proposal_guard = None;
        }
    }
    // In non-fast rounds, a locking block in the current round must be finalized
    // before anything else can happen.
    if info.manager.has_locking_block_in_current_round()
        && !info.manager.current_round.is_fast()
    {
        return Box::pin(self.finalize_locking_block(info)).await;
    }
    let owner = self.identity().await?;
    let local_node = &self.client.local_node;
    // Decide what to propose: a locking block takes precedence over our own pending
    // proposal.
    let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
        match &**locking {
            LockingBlock::Regular(certificate) => {
                let blob_ids = certificate.block().required_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                debug!("Retrying locking block from round {}", certificate.round);
                (certificate.block().clone(), blobs)
            }
            LockingBlock::Fast(proposal) => {
                // A fast proposal carries no executed block, so re-execute locally.
                let proposed_block = proposal.content.block.clone();
                let blob_ids = proposed_block.published_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                let block = self
                    .client
                    .stage_block_execution(
                        proposed_block,
                        None,
                        blobs.clone(),
                        BundleExecutionPolicy::committed(),
                    )
                    .await?
                    .0;
                debug!("Retrying locking block from fast round.");
                (block, blobs)
            }
        }
    } else if let Some(pending) = proposal_guard.as_ref() {
        let proposed_block = pending.block.clone();
        let blobs = pending.blobs.clone();
        let round = self.round_for_oracle(&info, &owner).await?;
        let (block, _) = self
            .client
            .stage_block_execution(
                proposed_block,
                round,
                blobs.clone(),
                BundleExecutionPolicy::committed(),
            )
            .await?;
        debug!("Proposing the local pending block.");
        (block, blobs)
    } else {
        // No locking block and no pending proposal: nothing to do.
        return Ok(ClientOutcome::Committed(None));
    };
    let has_oracle_responses = block.has_oracle_responses();
    let (proposed_block, outcome) = block.into_proposal();
    let round = match self
        .round_for_new_proposal(&info, &owner, has_oracle_responses)
        .await?
    {
        Either::Left(round) => round,
        Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
    };
    debug!("Proposing block for round {}", round);
    let already_handled_locally = info
        .manager
        .already_handled_proposal(round, &proposed_block);
    // Build the proposal: retry the locking block if there is one; otherwise make an
    // initial proposal for our own block.
    let proposal = if let Some(locking) = info.manager.requested_locking {
        Box::new(match *locking {
            LockingBlock::Regular(cert) => {
                BlockProposal::new_retry_regular(owner, round, cert, self.signer())
                    .await
                    .map_err(Error::signer_failure)?
            }
            LockingBlock::Fast(proposal) => {
                BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
                    .await
                    .map_err(Error::signer_failure)?
            }
        })
    } else {
        Box::new(
            BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
                .await
                .map_err(Error::signer_failure)?,
        )
    };
    if !already_handled_locally {
        if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
            match err {
                LocalNodeError::BlobsNotFound(_) => {
                    // Supply the missing blobs and retry once.
                    local_node
                        .handle_pending_blobs(self.chain_id, blobs)
                        .await?;
                    local_node.handle_block_proposal(*proposal.clone()).await?;
                }
                err => return Err(err.into()),
            }
        }
    }
    let committee = self.local_committee().await?;
    let block = Block::new(proposed_block, outcome);
    let submit_block_proposal_start = linera_base::time::Instant::now();
    // Fast rounds confirm directly; other rounds validate first, then finalize.
    let certificate = if round.is_fast() {
        let hashed_value = ConfirmedBlock::new(block);
        Box::pin(
            self.client
                .submit_block_proposal(&committee, proposal, hashed_value),
        )
        .await?
    } else {
        let hashed_value = ValidatedBlock::new(block);
        let certificate = Box::pin(self.client.submit_block_proposal(
            &committee,
            proposal,
            hashed_value.clone(),
        ))
        .await?;
        Box::pin(self.client.finalize_block(&committee, certificate)).await?
    };
    self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
    debug!(round = %certificate.round, "Sending confirmed block to validators");
    let update_start = linera_base::time::Instant::now();
    Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
    tracing::debug!(
        update_validators_ms = update_start.elapsed().as_millis(),
        total_process_ms = process_start.elapsed().as_millis(),
        "process_pending_block_without_prepare completing"
    );
    // The proposal was committed; free the pending slot.
    *proposal_guard = None;
    Ok(ClientOutcome::Committed(Some(certificate)))
}
/// Reports the elapsed time since `start` (in milliseconds) on the timing channel,
/// if one is configured; a failed send is logged but not fatal.
fn send_timing(&self, start: Instant, timing_type: TimingType) {
    if let Some(sender) = &self.timing_sender {
        let elapsed_ms = start.elapsed().as_millis() as u64;
        if let Err(err) = sender.send((elapsed_ms, timing_type)) {
            tracing::warn!(%err, "Failed to send timing info");
        }
    }
}
/// Returns the current chain info, first requesting a leader-timeout certificate if
/// the manager's round timeout has already passed.
///
/// A failure to obtain the timeout certificate is only logged; the stale info is
/// returned in that case.
async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
    let mut info = self.chain_info_with_manager_values().await?;
    if let Some(round_timeout) = info.manager.round_timeout {
        if round_timeout <= self.storage_client().clock().current_time() {
            match self.request_leader_timeout().await {
                // Refresh the info so it reflects the new round.
                Ok(_) => info = self.chain_info_with_manager_values().await?,
                Err(e) => debug!("Failed to obtain a timeout certificate: {}", e),
            }
        }
    }
    Ok(info)
}
/// Finalizes the chain's locking validated block into a confirmed certificate and
/// broadcasts it to the validators.
///
/// # Panics
/// Panics if `info` has no locking block, or if the locking block is a fast proposal
/// rather than a regular validated certificate. Callers must first check
/// `has_locking_block_in_current_round()` with a non-fast current round.
async fn finalize_locking_block(
    &self,
    info: Box<ChainInfo>,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
    let locking = info
        .manager
        .requested_locking
        .expect("Should have a locking block");
    let LockingBlock::Regular(certificate) = *locking else {
        panic!("Should have a locking validated block");
    };
    debug!(
        round = %certificate.round,
        "Finalizing locking block"
    );
    let committee = self.local_committee().await?;
    let certificate =
        Box::pin(self.client.finalize_block(&committee, certificate.clone())).await?;
    // Broadcast the confirmed block so validators catch up.
    Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
    Ok(ClientOutcome::Committed(Some(certificate)))
}
async fn round_for_oracle(
&self,
info: &ChainInfo,
identity: &AccountOwner,
) -> Result<Option<u32>, Error> {
match self.round_for_new_proposal(info, identity, true).await {
Ok(Either::Left(round)) => Ok(round.multi_leader()),
Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
Err(err) => Err(err),
}
}
/// Chooses the round for a new block proposal, or returns the round timeout to wait
/// for if we cannot currently propose.
///
/// Starts from the manager's current round. Moves to the next round if the current
/// one already contains a conflicting proposal, or if the fast round cannot be used
/// (the block has oracle responses, or fast blocks are disabled); skipping ahead is
/// only allowed from fast or multi-leader rounds.
///
/// # Errors
/// Returns [`Error::BlockProposalError`] when no proposal is possible and no round
/// timeout is available.
async fn round_for_new_proposal(
    &self,
    info: &ChainInfo,
    identity: &AccountOwner,
    has_oracle_responses: bool,
) -> Result<Either<Round, RoundTimeout>, Error> {
    let manager = &info.manager;
    let seed = self
        .client
        .local_node
        .get_manager_seed(self.chain_id)
        .await?;
    // The fast round cannot carry oracle responses and may be disabled entirely.
    let skip_fast = manager.current_round.is_fast()
        && (has_oracle_responses || !self.options.allow_fast_blocks);
    let conflict = manager
        .requested_signed_proposal
        .as_ref()
        .into_iter()
        .chain(&manager.requested_proposed)
        .any(|proposal| proposal.content.round == manager.current_round)
        || skip_fast;
    let round = if !conflict {
        manager.current_round
    } else if let Some(round) = manager
        .ownership
        .next_round(manager.current_round)
        .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
    {
        round
    } else if let Some(timeout) = info.round_timeout() {
        return Ok(Either::Right(timeout));
    } else {
        return Err(Error::BlockProposalError(
            "Conflicting proposal in the current round",
        ));
    };
    let current_committee = self
        .local_committee()
        .await?
        .validators
        .values()
        .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
        .collect();
    if manager.should_propose(identity, round, seed, &current_committee) {
        return Ok(Either::Left(round));
    }
    if let Some(timeout) = info.round_timeout() {
        return Ok(Either::Right(timeout));
    }
    Err(Error::BlockProposalError(
        "Not a leader in the current round",
    ))
}
/// Drops the client's pending block proposal, if any. Only available in test builds.
#[cfg(with_testing)]
#[instrument(level = "trace")]
pub async fn clear_pending_proposal(&self) {
    let mut pending = self.proposal_mutex().lock().await;
    *pending = None;
}
#[instrument(level = "trace")]
pub async fn rotate_key_pair(
&self,
public_key: AccountPublicKey,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
Box::pin(self.transfer_ownership(public_key.into())).await
}
/// Makes `new_owner` the sole super owner of this chain, clearing all regular
/// owners and resetting the round configuration to defaults.
#[instrument(level = "trace")]
pub async fn transfer_ownership(
    &self,
    new_owner: AccountOwner,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let operation = SystemOperation::ChangeOwnership {
        super_owners: vec![new_owner],
        owners: Vec::new(),
        multi_leader_rounds: 2,
        open_multi_leader_rounds: false,
        timeout_config: TimeoutConfig::default(),
    };
    Box::pin(self.execute_operation(operation)).await
}
/// Adds `new_owner` as a regular owner with `new_weight`, keeping all existing
/// owners. Existing super owners are demoted to regular owners with weight 100.
#[instrument(level = "trace")]
pub async fn share_ownership(
    &self,
    new_owner: AccountOwner,
    new_weight: u64,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let ownership = self.prepare_chain().await?.manager.ownership;
    ensure!(
        ownership.is_active(),
        ChainError::InactiveChain(self.chain_id)
    );
    let mut owners: Vec<_> = ownership.owners.into_iter().collect();
    owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
    owners.push((new_owner, new_weight));
    let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
        super_owners: Vec::new(),
        owners,
        multi_leader_rounds: ownership.multi_leader_rounds,
        open_multi_leader_rounds: ownership.open_multi_leader_rounds,
        timeout_config: ownership.timeout_config,
    })];
    let outcome = self.execute_block(operations, vec![]).await?;
    if let ClientOutcome::Conflict(certificate) = &outcome {
        info!(
            height = %certificate.block().header.height,
            "Another block was committed."
        );
    }
    Ok(outcome)
}
/// Reads this chain's current ownership configuration from the local node.
#[instrument(level = "trace")]
pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, Error> {
    let chain_state = self
        .client
        .local_node
        .chain_state_view(self.chain_id)
        .await?;
    let ownership = chain_state
        .execution_state
        .system
        .ownership
        .get()
        .await?;
    Ok(ownership.clone())
}
/// Replaces this chain's ownership configuration with the given one.
#[instrument(level = "trace")]
pub async fn change_ownership(
    &self,
    ownership: ChainOwnership,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let operation = SystemOperation::ChangeOwnership {
        super_owners: ownership.super_owners.into_iter().collect(),
        owners: ownership.owners.into_iter().collect(),
        multi_leader_rounds: ownership.multi_leader_rounds,
        open_multi_leader_rounds: ownership.open_multi_leader_rounds,
        timeout_config: ownership.timeout_config.clone(),
    };
    Box::pin(self.execute_operation(operation)).await
}
/// Reads this chain's current application permissions from the local node.
#[instrument(level = "trace")]
pub async fn query_application_permissions(&self) -> Result<ApplicationPermissions, Error> {
    let chain_state = self
        .client
        .local_node
        .chain_state_view(self.chain_id)
        .await?;
    let permissions = chain_state
        .execution_state
        .system
        .application_permissions
        .get()
        .await?;
    Ok(permissions.clone())
}
/// Replaces this chain's application permissions with the given configuration.
#[instrument(level = "trace", skip(application_permissions))]
pub async fn change_application_permissions(
    &self,
    application_permissions: ApplicationPermissions,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let operation = SystemOperation::ChangeApplicationPermissions(application_permissions);
    Box::pin(self.execute_operation(operation)).await
}
/// Opens a new chain with the given ownership, application permissions and initial
/// balance, funded by this chain.
///
/// On success, returns the new chain's description together with the certificate of
/// the creating block. If this wallet holds a key for one of the new chain's owners,
/// the client starts listening to the new chain in full-chain mode.
#[instrument(level = "trace", skip(self))]
pub async fn open_chain(
    &self,
    ownership: ChainOwnership,
    application_permissions: ApplicationPermissions,
    balance: Amount,
) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
    let config = OpenChainConfig {
        ownership: ownership.clone(),
        balance,
        application_permissions: application_permissions.clone(),
    };
    let operation = Operation::system(SystemOperation::OpenChain(config));
    let certificate = match self.execute_block(vec![operation], vec![]).await? {
        ClientOutcome::Committed(certificate) => certificate,
        ClientOutcome::Conflict(certificate) => {
            return Ok(ClientOutcome::Conflict(certificate));
        }
        ClientOutcome::WaitForTimeout(timeout) => {
            return Ok(ClientOutcome::WaitForTimeout(timeout));
        }
    };
    // The new chain's description is expected to be the last blob created by the
    // block's last transaction.
    let chain_blob = certificate
        .block()
        .body
        .blobs
        .last()
        .and_then(|blobs| blobs.last())
        .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
    let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
    // If we own the new chain, listen to it fully so we can operate it.
    for owner in ownership.all_owners() {
        if self.client.has_key_for(owner).await? {
            self.client
                .extend_chain_mode(description.id(), ListeningMode::FullChain);
            break;
        }
    }
    // Retry outstanding cross-chain requests so the new chain gets its messages.
    self.client
        .retry_pending_cross_chain_requests(self.chain_id)
        .await?;
    Ok(ClientOutcome::Committed((description, certificate)))
}
/// Closes this chain by executing a `CloseChain` operation.
///
/// Returns `Committed(None)` when the chain was already closed, instead of failing.
#[instrument(level = "trace")]
pub async fn close_chain(
    &self,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
    match Box::pin(self.execute_operation(SystemOperation::CloseChain)).await {
        Ok(outcome) => Ok(outcome.map(Some)),
        // Already closed: treat as success without a certificate.
        Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
            chain_error,
        )))) if matches!(*chain_error, ChainError::ClosedChain) => {
            Ok(ClientOutcome::Committed(None))
        }
        Err(error) => Err(error),
    }
}
/// Turns the given contract and service bytecode into blobs and publishes them as a
/// module on this chain. Not available on Wasm targets.
#[cfg(not(target_arch = "wasm32"))]
#[instrument(level = "trace", skip(contract, service))]
pub async fn publish_module(
    &self,
    contract: Bytecode,
    service: Bytecode,
    vm_runtime: VmRuntime,
) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
    let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
    Box::pin(self.publish_module_blobs(blobs, module_id)).await
}
/// Publishes the given bytecode blobs as the module `module_id` on this chain and
/// returns the module ID along with the certificate. Not available on Wasm targets.
#[cfg(not(target_arch = "wasm32"))]
#[instrument(level = "trace", skip(blobs, module_id))]
pub async fn publish_module_blobs(
    &self,
    blobs: Vec<Blob>,
    module_id: ModuleId,
) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
    let operation = Operation::system(SystemOperation::PublishModule { module_id });
    let outcome = self.execute_operations(vec![operation], blobs).await?;
    outcome.try_map(|certificate| Ok((module_id, certificate)))
}
#[instrument(level = "trace", skip(bytes))]
pub async fn publish_data_blobs(
&self,
bytes: Vec<Vec<u8>>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
let blobs = bytes.into_iter().map(Blob::new_data);
let publish_blob_operations = blobs
.clone()
.map(|blob| {
Operation::system(SystemOperation::PublishDataBlob {
blob_hash: blob.id().hash,
})
})
.collect();
self.execute_operations(publish_blob_operations, blobs.collect())
.await
}
/// Publishes a single byte payload as a data blob. Convenience wrapper around
/// [`Self::publish_data_blobs`].
#[instrument(level = "trace", skip(bytes))]
pub async fn publish_data_blob(
    &self,
    bytes: Vec<u8>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    Box::pin(self.publish_data_blobs(vec![bytes])).await
}
/// Creates a new application from `module_id`, typed by the module's ABI.
///
/// `parameters` and `instantiation_argument` are JSON-serialized and forwarded to
/// [`Self::create_application_untyped`]; the resulting application ID is re-typed
/// with the ABI.
#[instrument(
    level = "trace",
    skip(self, parameters, instantiation_argument, required_application_ids)
)]
pub async fn create_application<
    A: Abi,
    Parameters: Serialize,
    InstantiationArgument: Serialize,
>(
    &self,
    module_id: ModuleId<A, Parameters, InstantiationArgument>,
    parameters: &Parameters,
    instantiation_argument: &InstantiationArgument,
    required_application_ids: Vec<ApplicationId>,
) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
    let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
    let parameters = serde_json::to_vec(parameters)?;
    Ok(Box::pin(self.create_application_untyped(
        module_id.forget_abi(),
        parameters,
        instantiation_argument,
        required_application_ids,
    ))
    .await?
    .map(|(app_id, cert)| (app_id.with_abi(), cert)))
}
#[instrument(
level = "trace",
skip(
self,
module_id,
parameters,
instantiation_argument,
required_application_ids
)
)]
pub async fn create_application_untyped(
&self,
module_id: ModuleId,
parameters: Vec<u8>,
instantiation_argument: Vec<u8>,
required_application_ids: Vec<ApplicationId>,
) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
Box::pin(self.execute_operation(SystemOperation::CreateApplication {
module_id,
parameters,
instantiation_argument,
required_application_ids,
}))
.await?
.try_map(|certificate| {
let mut creation: Vec<_> = certificate
.block()
.created_blob_ids()
.into_iter()
.filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
.collect();
if creation.len() > 1 {
return Err(Error::InternalError(
"Unexpected number of application descriptions published",
));
}
let blob_id = creation.pop().ok_or(Error::InternalError(
"ApplicationDescription blob not found.",
))?;
let id = ApplicationId::new(blob_id.hash);
Ok((id, certificate))
})
}
/// Stages a new committee in two steps: first publishes the serialized committee as
/// a blob, then creates the committee for the next epoch referencing that blob.
///
/// Returns early (without creating the committee) if the blob-publishing block ends
/// in a timeout or conflict.
#[instrument(level = "trace", skip(committee))]
pub async fn stage_new_committee(
    &self,
    committee: Committee,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
    let blob_hash = blob.id().hash;
    match self
        .execute_operations(
            vec![Operation::system(SystemOperation::Admin(
                AdminOperation::PublishCommitteeBlob { blob_hash },
            ))],
            vec![blob],
        )
        .await?
    {
        ClientOutcome::Committed(_) => {}
        outcome @ ClientOutcome::WaitForTimeout(_) | outcome @ ClientOutcome::Conflict(_) => {
            return Ok(outcome)
        }
    }
    // The new committee takes effect at the epoch after the current one.
    let epoch = Box::pin(self.chain_info()).await?.epoch.try_add_one()?;
    Box::pin(
        self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
            epoch,
            blob_hash,
        })),
    )
    .await
}
/// Synchronizes the chain with the validators, then processes all pending incoming
/// messages. See [`Self::process_inbox_without_prepare`].
#[instrument(level = "trace")]
pub async fn process_inbox(
    &self,
) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
    self.prepare_chain().await?;
    self.process_inbox_without_prepare().await
}
/// Repeatedly executes blocks with no explicit operations until the inbox is
/// drained, without synchronizing the chain first.
///
/// Returns the certificates of the blocks created, plus a round timeout if one was
/// hit before the inbox was emptied. Normal termination is signaled by the
/// `EmptyBlock` error: nothing was left to include.
#[instrument(level = "trace")]
pub async fn process_inbox_without_prepare(
    &self,
) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
    #[cfg(with_metrics)]
    let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
    let mut certificates = Vec::new();
    loop {
        match self.execute_block(vec![], vec![]).await {
            Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
            // A conflicting block still advanced the chain; record it and continue.
            Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
            Ok(ClientOutcome::WaitForTimeout(timeout)) => {
                return Ok((certificates, Some(timeout)));
            }
            // An empty block means the inbox is drained; we are done.
            Err(Error::LocalNodeError(LocalNodeError::WorkerError(
                WorkerError::ChainError(chain_error),
            ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
                return Ok((certificates, None));
            }
            Err(error) => return Err(error),
        };
    }
}
/// Collects the epoch-change operations this chain still needs to process: one
/// `ProcessNewEpoch` per new epoch announced on the admin chain's epoch stream, and
/// one `ProcessRemovedEpoch` per epoch announced on the removed-epoch stream.
async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
    let (current_epoch, committees) = self.epoch_and_committees().await?;
    // Oldest epoch we still track, and the first epoch we have not yet processed.
    let mut min_epoch = committees.keys().next().copied().unwrap_or(Epoch::ZERO);
    let mut next_epoch = current_epoch.try_add_one()?;
    let mut epoch_change_ops = Vec::new();
    while self
        .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
        .await?
    {
        epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
            next_epoch,
        )));
        next_epoch.try_add_assign_one()?;
    }
    while self
        .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
        .await?
    {
        epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
            min_epoch,
        )));
        min_epoch.try_add_assign_one()?;
    }
    Ok(epoch_change_ops)
}
/// Returns whether the admin chain has published an event at `index` in the given
/// system stream, according to local storage.
async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
    let event_id = EventId {
        chain_id: self.client.admin_chain_id,
        stream_id: StreamId::system(stream_name),
        index,
    };
    let event = self.client.storage_client().read_event(event_id).await?;
    Ok(event.is_some())
}
/// Reads all events of `stream_id` on this chain starting at `start_index` from
/// local storage.
pub async fn events_from_index(
    &self,
    stream_id: StreamId,
    start_index: u32,
) -> Result<Vec<IndexAndEvent>, Error> {
    let events = self
        .client
        .storage_client()
        .read_events_from_index(&self.chain_id, &stream_id, start_index)
        .await?;
    Ok(events)
}
/// Revokes all epochs up to and including `revoked_epoch` by removing their
/// committees in a single block.
///
/// # Errors
/// Fails if `revoked_epoch` is the current epoch (or later), or if it was already
/// revoked.
#[instrument(level = "trace")]
pub async fn revoke_epochs(
    &self,
    revoked_epoch: Epoch,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    self.prepare_chain().await?;
    let (current_epoch, committees) = self.epoch_and_committees().await?;
    ensure!(
        revoked_epoch < current_epoch,
        Error::CannotRevokeCurrentEpoch(current_epoch)
    );
    ensure!(
        committees.contains_key(&revoked_epoch),
        Error::EpochAlreadyRevoked
    );
    let operations: Vec<_> = committees
        .keys()
        .filter(|epoch| **epoch <= revoked_epoch)
        .map(|epoch| {
            Operation::system(SystemOperation::Admin(AdminOperation::RemoveCommittee {
                epoch: *epoch,
            }))
        })
        .collect();
    self.execute_operations(operations, vec![]).await
}
/// Transfers `amount` from `owner` to `recipient` via a direct `Transfer` operation.
#[instrument(level = "trace")]
pub async fn transfer_to_account_unsafe_unconfirmed(
    &self,
    owner: AccountOwner,
    amount: Amount,
    recipient: Account,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let operation = SystemOperation::Transfer {
        owner,
        recipient,
        amount,
    };
    Box::pin(self.execute_operation(operation)).await
}
#[instrument(level = "trace", skip(hash))]
pub async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, Error> {
let block = self
.client
.storage_client()
.read_confirmed_block(hash)
.await?;
block
.map(Arc::unwrap_or_clone)
.ok_or(Error::MissingConfirmedBlock(hash))
}
#[instrument(level = "trace", skip(hash))]
pub async fn read_certificate(
&self,
hash: CryptoHash,
) -> Result<ConfirmedBlockCertificate, Error> {
let certificate = self.client.storage_client().read_certificate(hash).await?;
certificate
.map(Arc::unwrap_or_clone)
.ok_or(Error::ReadCertificatesError(vec![hash]))
}
/// Retries any cross-chain requests from this chain that are still pending.
#[instrument(level = "trace")]
pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
    self.client
        .retry_pending_cross_chain_requests(self.chain_id)
        .await?;
    Ok(())
}
/// Queries the local node for info about `chain_id`, mapping "blobs not found" and
/// "inactive chain" to `None` instead of an error.
#[instrument(level = "trace", skip(local_node))]
async fn local_chain_info(
    &self,
    chain_id: ChainId,
    local_node: &mut LocalNodeClient<Env::Storage>,
) -> Result<Option<Box<ChainInfo>>, Error> {
    let result = local_node.chain_info(chain_id).await;
    match result {
        Ok(info) => Ok(Some(info)),
        Err(LocalNodeError::BlobsNotFound(_)) | Err(LocalNodeError::InactiveChain(_)) => {
            Ok(None)
        }
        Err(err) => Err(err.into()),
    }
}
/// Returns the next block height of `chain_id` according to the local node, or zero
/// when the chain is unknown or inactive locally.
#[instrument(level = "trace", skip(chain_id, local_node))]
async fn local_next_block_height(
    &self,
    chain_id: ChainId,
    local_node: &mut LocalNodeClient<Env::Storage>,
) -> Result<BlockHeight, Error> {
    let maybe_info = self.local_chain_info(chain_id, local_node).await?;
    match maybe_info {
        Some(info) => Ok(info.next_block_height),
        None => Ok(BlockHeight::ZERO),
    }
}
#[instrument(level = "trace")]
async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
Ok(self
.client
.local_node
.get_inbox_next_height(self.chain_id, origin)
.await?)
}
/// Handles a single `notification` received from `remote_node`, synchronizing
/// whatever local state the notification indicates is out of date.
///
/// Notifications that the chain's listening mode declares irrelevant are
/// silently dropped. Redundant notifications (local state already caught up)
/// are logged at debug level and ignored.
#[instrument(level = "trace", skip(remote_node, local_node, notification))]
async fn process_notification(
    &self,
    remote_node: RemoteNode<Env::ValidatorNode>,
    mut local_node: LocalNodeClient<Env::Storage>,
    notification: Notification,
) -> Result<(), Error> {
    let listening_mode = self.client.chain_mode(notification.chain_id);
    // Fixed mojibake: the source contained `¬ification` (a decoded `&not`
    // HTML entity) instead of `&notification`.
    let is_relevant = listening_mode
        .as_ref()
        .is_some_and(|mode| mode.is_relevant(&notification.reason));
    if !is_relevant {
        tracing::trace!(
            chain_id = %notification.chain_id,
            reason = ?notification.reason,
            ?listening_mode,
            "Ignoring notification due to listening mode"
        );
        return Ok(());
    }
    match notification.reason {
        Reason::NewIncomingBundle { origin, height } => {
            // If the inbox is already past `height`, the bundle was received.
            if self.local_next_height_to_receive(origin).await? > height {
                debug!(
                    chain_id = %self.chain_id,
                    "Accepting redundant notification for new message"
                );
                return Ok(());
            }
            self.client
                .download_sender_block_with_sending_ancestors(
                    self.chain_id,
                    origin,
                    height,
                    &remote_node,
                )
                .await?;
            // If the inbox still hasn't advanced past `height`, the download
            // did not bring in the expected message.
            if self.local_next_height_to_receive(origin).await? <= height {
                info!(
                    chain_id = %self.chain_id,
                    "NewIncomingBundle: Fail to synchronize new message after notification"
                );
            }
        }
        Reason::NewBlock {
            height,
            hash,
            event_streams,
            ..
        } => {
            let chain_id = notification.chain_id;
            let local_height = self
                .local_next_block_height(chain_id, &mut local_node)
                .await?;
            // Already have this block locally.
            if local_height > height {
                debug!(
                    chain_id = %self.chain_id,
                    "Accepting redundant notification for new block"
                );
                return Ok(());
            }
            if let Some(ListeningMode::EventsOnly(subscribed)) =
                self.client.chain_mode(chain_id)
            {
                // Events-only chains: fetch just the blocks carrying events of
                // subscribed streams, not the whole chain.
                if !event_streams.is_empty() {
                    self.client
                        .download_event_bearing_blocks(
                            chain_id,
                            BTreeSet::from([(height, hash)]),
                            local_height,
                            &subscribed,
                            &remote_node,
                        )
                        .await?;
                }
            } else {
                // Full mode: synchronize the chain state from the notifying
                // validator.
                self.client
                    .synchronize_chain_state_from(&remote_node, chain_id)
                    .await?;
                if self
                    .local_next_block_height(chain_id, &mut local_node)
                    .await?
                    <= height
                {
                    error!("NewBlock: Fail to synchronize new block after notification");
                }
            }
        }
        Reason::NewEvents { height, hash, .. } => {
            let chain_id = notification.chain_id;
            let local_height = self
                .local_next_block_height(chain_id, &mut local_node)
                .await?;
            if local_height > height {
                debug!(
                    chain_id = %self.chain_id,
                    "Accepting redundant notification for new events"
                );
                return Ok(());
            }
            // Only events-only chains act on event notifications.
            let subscribed = match self.client.chain_mode(chain_id) {
                Some(ListeningMode::EventsOnly(streams)) => streams,
                _ => return Ok(()),
            };
            self.client
                .download_event_bearing_blocks(
                    chain_id,
                    BTreeSet::from([(height, hash)]),
                    local_height,
                    &subscribed,
                    &remote_node,
                )
                .await?;
        }
        Reason::NewRound { height, round } => {
            let chain_id = notification.chain_id;
            // Compare (height, round) lexicographically: local state is up to
            // date if it is at least at the notified height and round.
            if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
                if (info.next_block_height, info.manager.current_round) >= (height, round) {
                    debug!(
                        chain_id = %self.chain_id,
                        "Accepting redundant notification for new round"
                    );
                    return Ok(());
                }
            }
            self.client
                .synchronize_chain_state_from(&remote_node, chain_id)
                .await?;
            let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
                error!(
                    chain_id = %self.chain_id,
                    "NewRound: Fail to read local chain info for {chain_id}"
                );
                return Ok(());
            };
            if (info.next_block_height, info.manager.current_round) < (height, round) {
                info!(
                    chain_id = %self.chain_id,
                    "NewRound: Fail to synchronize new block after notification"
                );
            }
        }
        Reason::BlockExecuted { .. } => {
            // Nothing to synchronize for this notification kind.
        }
    }
    Ok(())
}
/// Returns whether the underlying client tracks this chain.
pub fn is_tracked(&self) -> bool {
    let chain_id = self.chain_id;
    self.client.is_tracked(chain_id)
}
/// Returns the listening mode configured for this chain, if any.
pub fn listening_mode(&self) -> Option<ListeningMode> {
    let chain_id = self.chain_id;
    self.client.chain_mode(chain_id)
}
/// Subscribes to notifications for this chain and spawns per-validator
/// notification listeners.
///
/// Returns:
/// - a future that drives the per-validator listener tasks (the caller must
///   poll or spawn it),
/// - an [`AbortOnDrop`] handle that stops the internal subscription when
///   dropped,
/// - a [`NotificationStream`] for the caller's own consumption.
#[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
pub async fn listen(
    &self,
) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
    use future::FutureExt as _;

    /// Polls `future` to completion while continuously draining
    /// `background_work` so the background tasks keep making progress.
    async fn await_while_polling<F: FusedFuture>(
        future: F,
        background_work: impl FusedStream<Item = ()>,
    ) -> F::Output {
        tokio::pin!(future);
        tokio::pin!(background_work);
        loop {
            futures::select! {
                _ = background_work.next() => (),
                result = future => return result,
            }
        }
    }

    // Per-validator abort handles, plus back-off state for validators whose
    // notification streams keep failing.
    let mut senders = HashMap::new();
    let mut circuit_breakers: HashMap<ValidatorPublicKey, CircuitBreakerState> = HashMap::new();
    // Two subscriptions: one is returned to the caller, the other is consumed
    // by the update loop below.
    let notifications = self.subscribe()?;
    let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
    let mut process_notifications = FuturesUnordered::new();
    // Create the initial set of validator streams for the current committee.
    match self
        .update_notification_streams(&mut senders, &mut circuit_breakers)
        .await
    {
        Ok(handler) => process_notifications.push(handler),
        Err(error) => error!("Failed to update committee: {error}"),
    };
    let this = self.clone();
    let update_streams = async move {
        let mut abortable_notifications = abortable_notifications.fuse();
        // On every new block — which may have changed the committee — refresh
        // the validator streams, except for events-only chains.
        while let Some(notification) =
            await_while_polling(abortable_notifications.next(), &mut process_notifications)
                .await
        {
            if let Reason::NewBlock { .. } = notification.reason {
                let is_events_only = this
                    .listening_mode()
                    .is_some_and(|m| matches!(m, ListeningMode::EventsOnly(_)));
                if !is_events_only {
                    match Box::pin(await_while_polling(
                        this.update_notification_streams(&mut senders, &mut circuit_breakers)
                            .fuse(),
                        &mut process_notifications,
                    ))
                    .await
                    {
                        Ok(handler) => process_notifications.push(handler),
                        Err(error) => error!("Failed to update committee: {error}"),
                    }
                }
            }
        }
        // The subscription ended: tear down all validator streams and wait for
        // their processing tasks to finish.
        for abort in senders.into_values() {
            abort.abort();
        }
        let () = process_notifications.collect().await;
    }
    .in_current_span();
    Ok((update_streams, AbortOnDrop(abort), notifications))
}
/// (Re)creates one notification-listening task per validator of the relevant
/// committee.
///
/// Updates `senders` (abort handles keyed by validator public key) and
/// `circuit_breakers` (exponential back-off state for validators whose streams
/// keep ending). Returns a future that drives all newly created tasks.
#[instrument(level = "trace", skip(senders, circuit_breakers))]
async fn update_notification_streams(
    &self,
    senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
    circuit_breakers: &mut HashMap<ValidatorPublicKey, CircuitBreakerState>,
) -> Result<impl Future<Output = ()>, Error> {
    let initial_probe_interval = self
        .options
        .notification_circuit_breaker_initial_probe_interval;
    let max_probe_interval = self.options.notification_circuit_breaker_max_probe_interval;
    // NOTE(review): `is_none_or` treats a missing listening mode as
    // events-only, whereas `listen` uses `is_some_and` for a similar check —
    // confirm this asymmetry is intentional.
    let events_only = self
        .listening_mode()
        .is_none_or(|m| matches!(m, ListeningMode::EventsOnly(_)));
    let (nodes, local_node) = {
        // Events-only chains follow the admin committee; otherwise use this
        // chain's own local committee.
        let committee = if events_only {
            let (_, committee) = self.admin_committee().await?;
            committee
        } else {
            self.local_committee().await?
        };
        let nodes: HashMap<_, _> = self
            .client
            .validator_node_provider()
            .make_nodes(&committee)?
            .collect();
        (nodes, self.client.local_node.clone())
    };
    // Update circuit-breaker state. A handle that is already aborted means the
    // validator's stream ended on its own (see the task below): back off
    // exponentially before probing it again. A live handle for a validator
    // that was in the breaker means it recovered.
    for (validator, abort) in senders.iter() {
        if abort.is_aborted() && nodes.contains_key(validator) {
            if let Some(state) = circuit_breakers.get_mut(validator) {
                state.probe_interval = (state.probe_interval * 2).min(max_probe_interval);
                state.next_probe_at = Instant::now() + state.probe_interval;
                warn!(
                    %validator,
                    chain_id = %self.chain_id,
                    next_probe_in = ?state.probe_interval,
                    "Validator still unhealthy after probe; increasing probe interval"
                );
            } else {
                circuit_breakers.insert(
                    *validator,
                    CircuitBreakerState {
                        next_probe_at: Instant::now() + initial_probe_interval,
                        probe_interval: initial_probe_interval,
                    },
                );
                error!(
                    %validator,
                    chain_id = %self.chain_id,
                    next_probe_in = ?initial_probe_interval,
                    "Validator notification stream ended; entering circuit breaker"
                );
            }
        } else if !abort.is_aborted() && circuit_breakers.contains_key(validator) {
            info!(
                %validator,
                chain_id = %self.chain_id,
                "Validator recovered from circuit breaker"
            );
            circuit_breakers.remove(validator);
        }
    }
    // Abort streams of validators no longer in the committee, and drop all
    // aborted entries so eligible validators are re-created below.
    senders.retain(|validator, abort| {
        if !nodes.contains_key(validator) {
            abort.abort();
        }
        !abort.is_aborted()
    });
    circuit_breakers.retain(|validator, _| nodes.contains_key(validator));
    let validator_tasks = FuturesUnordered::new();
    for (public_key, node) in nodes {
        // Skip validators that already have a live stream.
        let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
            continue;
        };
        // Validators in the circuit breaker are only probed once their
        // back-off interval has elapsed.
        if let Some(state) = circuit_breakers.get(&public_key) {
            if Instant::now() < state.next_probe_at {
                continue;
            }
            debug!(
                validator = %public_key,
                chain_id = %self.chain_id,
                "Probing unhealthy validator"
            );
        }
        let address = node.address();
        let this = self.clone();
        // Build the notification stream: subscribe first, then do an initial
        // sync (full chain state, or a sparse event sync for events-only).
        let stream = stream::once({
            let node = node.clone();
            async move {
                let stream = node.subscribe(vec![this.chain_id]).await?;
                if !events_only {
                    let remote_node = RemoteNode { public_key, node };
                    this.client
                        .synchronize_chain_state_from(&remote_node, this.chain_id)
                        .await?;
                } else {
                    let remote_node = RemoteNode { public_key, node };
                    if let Some(ListeningMode::EventsOnly(subscribed)) = this.listening_mode() {
                        // Best effort: a failed sparse sync is only logged.
                        if let Err(error) = this
                            .client
                            .sync_events_from_node(this.chain_id, &subscribed, &remote_node)
                            .await
                        {
                            debug!(
                                chain_id = %this.chain_id,
                                %error,
                                "Failed initial sparse sync for EventsOnly chain"
                            );
                        }
                    }
                }
                Ok::<_, Error>(stream)
            }
        })
        .filter_map(move |result| {
            let address = address.clone();
            async move {
                if let Err(error) = &result {
                    info!(?error, address, "could not connect to validator");
                } else {
                    debug!(address, "connected to validator");
                }
                // Connection errors drop the stream entirely.
                result.ok()
            }
        })
        .flatten();
        let (stream, abort) = stream::abortable(stream);
        let abort_on_exit = abort.clone();
        let mut stream = Box::pin(stream);
        let this = self.clone();
        let local_node = local_node.clone();
        let remote_node = RemoteNode { public_key, node };
        validator_tasks.push(async move {
            while let Some(notification) = stream.next().await {
                // Processing failures are logged but don't end the stream.
                if let Err(error) = this
                    .process_notification(
                        remote_node.clone(),
                        local_node.clone(),
                        notification.clone(),
                    )
                    .await
                {
                    tracing::info!(
                        chain_id = %this.chain_id,
                        address = remote_node.address(),
                        ?notification,
                        %error,
                        "failed to process notification",
                    );
                }
            }
            warn!(
                chain_id = %this.chain_id,
                address = remote_node.address(),
                "Validator notification stream ended"
            );
            // Self-abort marks this validator for the circuit breaker on the
            // next call to this method (see above).
            abort_on_exit.abort();
        });
        entry.insert(abort);
    }
    Ok(validator_tasks.collect())
}
#[instrument(level = "trace", skip(remote_node))]
pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
let validator_next_block_height = match remote_node
.handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
.await
{
Ok(info) => info.info.next_block_height,
Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
Err(err) => return Err(err.into()),
};
let local_next_block_height = self.chain_info().await?.next_block_height;
if validator_next_block_height >= local_next_block_height {
debug!("Validator is up-to-date with local state");
return Ok(());
}
let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
.map(BlockHeight)
.collect();
let certificates = self
.client
.storage_client()
.read_certificates_by_heights(self.chain_id, &heights)
.await?
.into_iter()
.flatten()
.map(Arc::unwrap_or_clone)
.collect::<Vec<_>>();
for certificate in certificates {
match remote_node
.handle_confirmed_certificate(
certificate.clone(),
CrossChainMessageDelivery::NonBlocking,
)
.await
{
Ok(_) => (),
Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
let missing_blobs: Vec<_> = self
.client
.storage_client()
.read_blobs(&missing_blob_ids)
.await?
.into_iter()
.flatten()
.map(Arc::unwrap_or_clone)
.collect();
remote_node.upload_blobs(missing_blobs).await?;
remote_node
.handle_confirmed_certificate(
certificate,
CrossChainMessageDelivery::NonBlocking,
)
.await?;
}
Err(err) => return Err(err.into()),
}
}
Ok(())
}
}
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Test helper: feeds a single `notification` into the client as if it
    /// had been received from `validator` (public key and address).
    ///
    /// Panics on any failure; for tests only.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let remote_node = RemoteNode { node, public_key };
        let local_node = self.client.local_node.clone();
        self.process_notification(remote_node, local_node, notification)
            .await
            .unwrap();
    }
}