// linera_core/client/mod.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7    convert::Infallible,
8    iter,
9    sync::{Arc, RwLock},
10};
11
12use chain_client_state::ChainClientState;
13use custom_debug_derive::Debug;
14use futures::{
15    future::{self, Either, FusedFuture, Future, FutureExt},
16    select,
17    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
18};
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{
22    abi::Abi,
23    crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
24    data_types::{
25        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
26        ChainDescription, Epoch, MessagePolicy, Round, TimeDelta, Timestamp,
27    },
28    ensure,
29    identifiers::{
30        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
31        ModuleId, StreamId,
32    },
33    ownership::{ChainOwnership, TimeoutConfig},
34    time::{Duration, Instant},
35};
36#[cfg(not(target_arch = "wasm32"))]
37use linera_base::{data_types::Bytecode, vm::VmRuntime};
38use linera_chain::{
39    data_types::{
40        BlockProposal, BundleExecutionPolicy, ChainAndHeight, IncomingBundle, LiteVote,
41        ProposedBlock, Transaction,
42    },
43    manager::LockingBlock,
44    types::{
45        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
46        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
47    },
48    ChainError, ChainExecutionContext, ChainStateView,
49};
50use linera_execution::{
51    committee::Committee,
52    system::{
53        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
54        REMOVED_EPOCH_STREAM_NAME,
55    },
56    ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
57};
58use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
59use linera_views::ViewError;
60use rand::prelude::SliceRandom as _;
61use received_log::ReceivedLogs;
62use serde::{Deserialize, Serialize};
63use thiserror::Error;
64use tokio::sync::{mpsc, OwnedRwLockReadGuard};
65use tokio_stream::wrappers::UnboundedReceiverStream;
66use tokio_util::sync::CancellationToken;
67use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
68use validator_trackers::ValidatorTrackers;
69
70use crate::{
71    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout},
72    environment::{wallet::Wallet as _, Environment},
73    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
74    node::{
75        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
76        ValidatorNodeProvider as _,
77    },
78    notifier::{ChannelNotifier, Notifier as _},
79    remote_node::RemoteNode,
80    updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
81    worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
82    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
83};
84
85mod chain_client_state;
86#[cfg(test)]
87#[path = "../unit_tests/client_tests.rs"]
88mod client_tests;
89pub mod requests_scheduler;
90
91pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
92mod received_log;
93mod validator_trackers;
94
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a label-free latency histogram with the standard exponential buckets
    /// (upper parameter 500.0, as used by all client latency metrics).
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(500.0))
    }

    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("process_inbox_latency", "process_inbox latency"));

    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("prepare_chain_latency", "prepare_chain latency"));

    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("execute_block_latency", "execute_block latency"));

    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
148
/// Defines what type of notifications we should process for a chain:
/// - do we fully participate in consensus and download sender chains?
/// - or do we only follow the chain's blocks without participating?
///
/// `FullChain` compares greater than `FollowChain` (see the `PartialOrd` impl), i.e. it
/// listens to strictly more.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Listen to everything: all blocks for the chain and all blocks from sender chains,
    /// and participate in rounds.
    FullChain,
    /// Listen to all blocks for the chain, but don't download sender chain blocks or participate
    /// in rounds. Use this when interested in the chain's state but not intending to propose
    /// blocks (e.g., because we're not a chain owner).
    FollowChain,
}
162
163impl PartialOrd for ListeningMode {
164    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
165        use std::cmp::Ordering;
166        match (self, other) {
167            (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
168            (ListeningMode::FullChain, _) => Some(Ordering::Greater),
169            (_, ListeningMode::FullChain) => Some(Ordering::Less),
170            (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
171        }
172    }
173}
174
175impl ListeningMode {
176    /// Returns whether a notification with this reason should be processed under this listening
177    /// mode.
178    pub fn is_relevant(&self, reason: &Reason) -> bool {
179        match (reason, self) {
180            // FullChain processes everything.
181            (_, ListeningMode::FullChain) => true,
182            // FollowChain processes new blocks on the chain itself (including events embedded in
183            // NewBlock).
184            (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
185            (_, ListeningMode::FollowChain) => false,
186        }
187    }
188
189    pub fn extend(&mut self, other: Option<ListeningMode>) {
190        match (self, other) {
191            (_, None) => (),
192            (ListeningMode::FullChain, _) => (),
193            (mode, Some(ListeningMode::FullChain)) => {
194                *mode = ListeningMode::FullChain;
195            }
196            (ListeningMode::FollowChain, _) => (),
197        }
198    }
199
200    /// Returns whether this mode implies follow-only behavior (i.e., not participating in
201    /// consensus rounds).
202    pub fn is_follow_only(&self) -> bool {
203        !matches!(self, ListeningMode::FullChain)
204    }
205
206    /// Returns whether this is a full chain mode (synchronizing sender chains and updating
207    /// inboxes).
208    pub fn is_full(&self) -> bool {
209        matches!(self, ListeningMode::FullChain)
210    }
211}
212
/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
pub struct Client<Env: Environment> {
    /// The environment providing access to storage, network, signer and wallet.
    environment: Env,
    /// Local node to manage the execution state and the local storage of the chains that we are
    /// tracking.
    pub local_node: LocalNodeClient<Env::Storage>,
    /// Manages the requests sent to validator nodes.
    requests_scheduler: RequestsScheduler<Env>,
    /// The admin chain ID.
    admin_chain_id: ChainId,
    /// Chains that should be tracked by the client, along with their listening mode.
    /// The presence of a chain in this map means it is tracked by the local node.
    /// Shared with the worker state created in [`Client::new`].
    chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
    /// References to clients waiting for chain notifications.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Chain state for the managed chains (lock-free concurrent map).
    chains: papaya::HashMap<ChainId, ChainClientState>,
    /// Configuration options.
    options: ChainClientOptions,
}
233
234impl<Env: Environment> Client<Env> {
235    /// Creates a new `Client` with a new cache and notifiers.
236    #[expect(clippy::too_many_arguments)]
237    #[instrument(level = "trace", skip_all)]
238    pub fn new(
239        environment: Env,
240        admin_chain_id: ChainId,
241        long_lived_services: bool,
242        chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
243        name: impl Into<String>,
244        chain_worker_ttl: Duration,
245        sender_chain_worker_ttl: Duration,
246        options: ChainClientOptions,
247        requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
248    ) -> Self {
249        let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect()));
250        let state = WorkerState::new_for_client(
251            name.into(),
252            environment.storage().clone(),
253            chain_modes.clone(),
254        )
255        .with_long_lived_services(long_lived_services)
256        .with_allow_inactive_chains(true)
257        .with_allow_messages_from_deprecated_epochs(true)
258        .with_chain_worker_ttl(chain_worker_ttl)
259        .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
260        let local_node = LocalNodeClient::new(state);
261        let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
262
263        Self {
264            environment,
265            local_node,
266            requests_scheduler,
267            chains: papaya::HashMap::new(),
268            admin_chain_id,
269            chain_modes,
270            notifier: Arc::new(ChannelNotifier::default()),
271            options,
272        }
273    }
274
    /// Returns the chain ID of the admin chain.
    pub fn admin_chain_id(&self) -> ChainId {
        self.admin_chain_id
    }

    /// Returns the storage client used by this client's local node.
    pub fn storage_client(&self) -> &Env::Storage {
        self.environment.storage()
    }

    /// Returns the provider used to create nodes for the validators of a committee.
    pub fn validator_node_provider(&self) -> &Env::Network {
        self.environment.network()
    }

    /// Returns a reference to the client's [`Signer`][crate::environment::Signer].
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &Env::Signer {
        self.environment.signer()
    }

    /// Returns whether the signer has a key for the given owner.
    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, ChainClientError> {
        self.signer()
            .contains_key(owner)
            .await
            .map_err(ChainClientError::signer_failure)
    }

    /// Returns a reference to the client's [`Wallet`][crate::environment::Wallet].
    pub fn wallet(&self) -> &Env::Wallet {
        self.environment.wallet()
    }
307
308    /// Returns whether the given chain is in follow-only mode (no owner key in the wallet).
309    ///
310    /// If the chain is not in the wallet, returns `true` since we don't have an owner key
311    /// for it.
312    async fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
313        match self.wallet().get(chain_id).await {
314            Ok(Some(chain)) => chain.owner.is_none(),
315            // Chain not in wallet or error: treat as follow-only.
316            Ok(None) | Err(_) => true,
317        }
318    }
319
320    /// Extends the listening mode for a chain, combining with the existing mode if present.
321    /// Returns the resulting mode.
322    #[instrument(level = "trace", skip(self))]
323    pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
324        let mut chain_modes = self
325            .chain_modes
326            .write()
327            .expect("Panics should not happen while holding a lock to `chain_modes`");
328        let entry = chain_modes.entry(chain_id).or_insert(mode.clone());
329        entry.extend(Some(mode));
330        entry.clone()
331    }
332
333    /// Returns the listening mode for a chain, if it is tracked.
334    pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
335        self.chain_modes
336            .read()
337            .expect("Panics should not happen while holding a lock to `chain_modes`")
338            .get(&chain_id)
339            .cloned()
340    }
341
342    /// Returns whether a chain is fully tracked by the local node.
343    pub fn is_tracked(&self, chain_id: ChainId) -> bool {
344        self.chain_modes
345            .read()
346            .expect("Panics should not happen while holding a lock to `chain_modes`")
347            .get(&chain_id)
348            .is_some_and(ListeningMode::is_full)
349    }
350
351    /// Creates a new `ChainClient`.
352    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
353    pub fn create_chain_client(
354        self: &Arc<Self>,
355        chain_id: ChainId,
356        block_hash: Option<CryptoHash>,
357        next_block_height: BlockHeight,
358        pending_proposal: Option<PendingProposal>,
359        preferred_owner: Option<AccountOwner>,
360        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
361    ) -> ChainClient<Env> {
362        // If the entry already exists we assume that the entry is more up to date than
363        // the arguments: If they were read from the wallet file, they might be stale.
364        self.chains
365            .pin()
366            .get_or_insert_with(chain_id, || ChainClientState::new(pending_proposal.clone()));
367
368        ChainClient {
369            client: self.clone(),
370            chain_id,
371            options: self.options.clone(),
372            preferred_owner,
373            initial_block_hash: block_hash,
374            initial_next_block_height: next_block_height,
375            timing_sender,
376        }
377    }
378
379    /// Fetches the chain description blob if needed, and returns the chain info.
380    async fn fetch_chain_info(
381        &self,
382        chain_id: ChainId,
383        validators: &[RemoteNode<Env::ValidatorNode>],
384    ) -> Result<Box<ChainInfo>, ChainClientError> {
385        match self.local_node.chain_info(chain_id).await {
386            Ok(info) => Ok(info),
387            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
388                // Make sure the admin chain is up to date.
389                Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
390                // If the chain is missing then the error is a WorkerError
391                // and so a BlobsNotFound
392                self.update_local_node_with_blobs_from(blob_ids, validators)
393                    .await?;
394                Ok(self.local_node.chain_info(chain_id).await?)
395            }
396            Err(err) => Err(err.into()),
397        }
398    }
399
    /// Downloads and processes all certificates up to (excluding) the specified height.
    ///
    /// Tries validators one at a time until the local chain info reaches
    /// `target_next_block_height`; fails if no validator can provide the missing blocks.
    #[instrument(level = "trace", skip(self))]
    async fn download_certificates(
        &self,
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    ) -> Result<Box<ChainInfo>, ChainClientError> {
        let mut validators = self.validator_nodes().await?;
        // Sequentially try each validator in random order.
        validators.shuffle(&mut rand::thread_rng());
        let mut info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
        for remote_node in validators {
            // Stop as soon as the local node has caught up with the target height.
            if target_next_block_height <= info.next_block_height {
                return Ok(info);
            }
            match self
                .download_certificates_from(&remote_node, chain_id, target_next_block_height)
                .await
            {
                // A failure against one validator is only logged; we fall through to
                // the next one.
                Err(error) => info!(
                    remote_node = remote_node.address(),
                    %error,
                    "failed to download certificates from validator",
                ),
                Ok(Some(new_info)) => info = new_info,
                // `None` means this validator had nothing new for us.
                Ok(None) => {}
            }
        }
        // All validators were tried; fail if we still haven't reached the target.
        ensure!(
            target_next_block_height <= info.next_block_height,
            ChainClientError::CannotDownloadCertificates {
                chain_id,
                target_next_block_height,
            }
        );
        Ok(info)
    }
437
    /// Downloads and processes all certificates up to (excluding) the specified height from the
    /// given validator.
    ///
    /// Returns the chain info after the last processed certificate, or `None` if nothing
    /// was processed.
    #[instrument(level = "trace", skip_all)]
    async fn download_certificates_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        stop: BlockHeight,
    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
        let mut last_info = None;
        // First load any blocks from local storage, if available.
        let chain_info = self.local_node.chain_info(chain_id).await?;
        let mut next_height = chain_info.next_block_height;
        let hashes = self
            .local_node
            .get_preprocessed_block_hashes(chain_id, next_height, stop)
            .await?;
        let certificates = self.storage_client().read_certificates(&hashes).await?;
        let certificates = match ResultReadCertificates::new(certificates, hashes) {
            ResultReadCertificates::Certificates(certificates) => certificates,
            // Storage returned certificates whose hashes don't match what we asked for.
            ResultReadCertificates::InvalidHashes(hashes) => {
                return Err(ChainClientError::ReadCertificatesError(hashes))
            }
        };
        for certificate in certificates {
            last_info = Some(self.handle_certificate(certificate).await?.info);
        }
        // NOTE(review): `next_height` is not advanced by the local pass above, so the
        // first remote batch may overlap heights just processed locally — presumably
        // preprocessed blocks can be sparse; confirm whether this is intentional.
        // Now download the rest in batches from the remote node.
        while next_height < stop {
            // TODO(#2045): Analyze network errors instead of using a fixed batch size.
            let limit = u64::from(stop)
                .checked_sub(u64::from(next_height))
                .ok_or(ArithmeticError::Overflow)?
                .min(self.options.certificate_download_batch_size);

            let certificates = self
                .requests_scheduler
                .download_certificates(remote_node, chain_id, next_height, limit)
                .await?;
            // `None` means the batch made no progress; give up on this validator.
            let Some(info) = self.process_certificates(remote_node, certificates).await? else {
                break;
            };
            // Each successful batch must move the chain forward.
            assert!(info.next_block_height > next_height);
            next_height = info.next_block_height;
            last_info = Some(info);
        }
        Ok(last_info)
    }
486
487    async fn download_blobs(
488        &self,
489        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
490        blob_ids: &[BlobId],
491    ) -> Result<(), ChainClientError> {
492        let blobs = &self
493            .requests_scheduler
494            .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
495            .await?
496            .ok_or_else(|| {
497                ChainClientError::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
498            })?;
499        self.local_node.store_blobs(blobs).await.map_err(Into::into)
500    }
501
    /// Tries to process all the certificates, requesting any missing blobs from the given node.
    /// Returns the chain info of the last successfully processed certificate.
    #[instrument(level = "trace", skip_all)]
    async fn process_certificates(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        certificates: Vec<ConfirmedBlockCertificate>,
    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
        let mut info = None;
        // Collect the blobs required by all certificates up front so we can
        // pre-fetch the missing ones in one batch.
        let required_blob_ids: Vec<_> = certificates
            .iter()
            .flat_map(|certificate| certificate.value().required_blob_ids())
            .collect();

        match self
            .local_node
            .read_blob_states_from_storage(&required_blob_ids)
            .await
        {
            // Some blobs are missing locally: fetch them from the remote node.
            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                self.download_blobs(&[remote_node.clone()], &blob_ids)
                    .await?;
            }
            // Propagate any other error; `Ok` means all blobs are present.
            x => {
                x?;
            }
        }

        for certificate in certificates {
            info = Some(
                match self.handle_certificate(certificate.clone()).await {
                    // Blobs can still be reported missing here (e.g. ones not covered
                    // by the pre-fetch); download them and retry once.
                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                        self.download_blobs(&[remote_node.clone()], &blob_ids)
                            .await?;
                        self.handle_certificate(certificate).await?
                    }
                    x => x?,
                }
                .info,
            );
        }

        // Done with all certificates.
        Ok(info)
    }
547
    /// Processes a certificate in the local node, forwarding the resulting
    /// notifications to this client's subscribers via `self.notifier`.
    async fn handle_certificate<T: ProcessableCertificate>(
        &self,
        certificate: GenericCertificate<T>,
    ) -> Result<ChainInfoResponse, LocalNodeError> {
        self.local_node
            .handle_certificate(certificate, &self.notifier)
            .await
    }
556
557    async fn chain_info_with_committees(
558        &self,
559        chain_id: ChainId,
560    ) -> Result<Box<ChainInfo>, LocalNodeError> {
561        let query = ChainInfoQuery::new(chain_id).with_committees();
562        let info = self.local_node.handle_chain_info_query(query).await?.info;
563        Ok(info)
564    }
565
566    /// Obtains all the committees trusted by any of the given chains. Also returns the highest
567    /// of their epochs.
568    #[instrument(level = "trace", skip_all)]
569    async fn admin_committees(
570        &self,
571    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
572        let info = self.chain_info_with_committees(self.admin_chain_id).await?;
573        Ok((info.epoch, info.into_committees()?))
574    }
575
576    /// Obtains the committee for the latest epoch on the admin chain.
577    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
578        let info = self.chain_info_with_committees(self.admin_chain_id).await?;
579        Ok((info.epoch, info.into_current_committee()?))
580    }
581
582    /// Obtains the validators for the latest epoch.
583    async fn validator_nodes(
584        &self,
585    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, ChainClientError> {
586        let (_, committee) = self.admin_committee().await?;
587        Ok(self.make_nodes(&committee)?)
588    }
589
590    /// Creates a [`RemoteNode`] for each validator in the committee.
591    fn make_nodes(
592        &self,
593        committee: &Committee,
594    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
595        Ok(self
596            .validator_node_provider()
597            .make_nodes(committee)?
598            .map(|(public_key, node)| RemoteNode { public_key, node })
599            .collect())
600    }
601
602    /// Ensures that the client has the `ChainDescription` blob corresponding to this
603    /// client's `ChainId`, and returns the chain description blob.
604    pub async fn get_chain_description_blob(
605        &self,
606        chain_id: ChainId,
607    ) -> Result<Blob, ChainClientError> {
608        let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
609        let blob = self
610            .local_node
611            .storage_client()
612            .read_blob(chain_desc_id)
613            .await?;
614        if let Some(blob) = blob {
615            // We have the blob - return it.
616            return Ok(blob);
617        }
618        // Recover history from the current validators, according to the admin chain.
619        Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
620        let nodes = self.validator_nodes().await?;
621        Ok(self
622            .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
623            .await?
624            .pop()
625            .unwrap()) // Returns exactly as many blobs as passed-in IDs.
626    }
627
628    /// Ensures that the client has the `ChainDescription` blob corresponding to this
629    /// client's `ChainId`, and returns the chain description.
630    pub async fn get_chain_description(
631        &self,
632        chain_id: ChainId,
633    ) -> Result<ChainDescription, ChainClientError> {
634        let blob = self.get_chain_description_blob(chain_id).await?;
635        Ok(bcs::from_bytes(blob.bytes())?)
636    }
637
    /// Updates the latest block and next block height and round information from the chain info.
    #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
    fn update_from_info(&self, info: &ChainInfo) {
        // Update the cached entry in place; `update` is a no-op if there is no entry
        // for this chain yet.
        self.chains.pin().update(info.chain_id, |state| {
            // NOTE(review): `clone_for_update_unchecked` presumably clones the state while
            // skipping some invariant check — confirm its contract in `chain_client_state`.
            let mut state = state.clone_for_update_unchecked();
            state.update_from_info(info);
            state
        });
    }
647
648    /// Handles the certificate in the local node and the resulting notifications.
649    #[instrument(level = "trace", skip_all)]
650    async fn process_certificate<T: ProcessableCertificate>(
651        &self,
652        certificate: Box<GenericCertificate<T>>,
653    ) -> Result<(), LocalNodeError> {
654        let info = self.handle_certificate(*certificate).await?.info;
655        self.update_from_info(&info);
656        Ok(())
657    }
658
659    /// Submits a validated block for finalization and returns the confirmed block certificate.
660    #[instrument(level = "trace", skip_all)]
661    async fn finalize_block(
662        self: &Arc<Self>,
663        committee: &Committee,
664        certificate: ValidatedBlockCertificate,
665    ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
666        debug!(round = %certificate.round, "Submitting block for confirmation");
667        let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
668        let finalize_action = CommunicateAction::FinalizeBlock {
669            certificate: Box::new(certificate),
670            delivery: self.options.cross_chain_message_delivery,
671        };
672        let certificate = self
673            .communicate_chain_action(committee, finalize_action, hashed_value)
674            .await?;
675        self.receive_certificate_with_checked_signatures(certificate.clone())
676            .await?;
677        Ok(certificate)
678    }
679
    /// Submits a block proposal to the validators.
    ///
    /// Also spawns a monitoring task that collects clock-skew reports from validators
    /// and warns if a validity threshold of them reports a positive skew.
    #[instrument(level = "trace", skip_all)]
    async fn submit_block_proposal<T: ProcessableCertificate>(
        self: &Arc<Self>,
        committee: &Committee,
        proposal: Box<BlockProposal>,
        value: T,
    ) -> Result<GenericCertificate<T>, ChainClientError> {
        debug!(
            round = %proposal.content.round,
            "Submitting block proposal to validators"
        );

        // Check if the block timestamp is in the future and log INFO.
        let block_timestamp = proposal.content.block.timestamp;
        let local_time = self.local_node.storage_client().clock().current_time();
        if block_timestamp > local_time {
            info!(
                chain_id = %proposal.content.block.chain_id,
                %block_timestamp,
                %local_time,
                "Block timestamp is in the future; waiting until it can be proposed",
            );
        }

        // Create channel for clock skew reports from validators.
        let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
        let submit_action = CommunicateAction::SubmitBlock {
            proposal,
            blob_ids: value.required_blob_ids().into_iter().collect(),
            clock_skew_sender,
        };

        // Spawn a task to monitor clock skew reports and warn if threshold is reached.
        let validity_threshold = committee.validity_threshold();
        let committee_clone = committee.clone();
        let clock_skew_check_handle = linera_base::task::spawn(async move {
            // Accumulated voting weight of validators that reported a positive skew,
            // plus the smallest and largest skew seen so far.
            let mut skew_weight = 0u64;
            let mut min_skew = TimeDelta::MAX;
            let mut max_skew = TimeDelta::ZERO;
            while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
                if clock_skew.as_micros() > 0 {
                    skew_weight += committee_clone.weight(&public_key);
                    min_skew = min_skew.min(clock_skew);
                    max_skew = max_skew.max(clock_skew);
                    // A validity threshold of reporters suggests our own clock is off,
                    // so warn once and stop monitoring.
                    if skew_weight >= validity_threshold {
                        warn!(
                            skew_weight,
                            validity_threshold,
                            min_skew_ms = min_skew.as_micros() / 1000,
                            max_skew_ms = max_skew.as_micros() / 1000,
                            "A validity threshold of validators reported clock skew; \
                             consider checking your system clock",
                        );
                        return;
                    }
                }
            }
        });

        let certificate = self
            .communicate_chain_action(committee, submit_action, value)
            .await?;

        // The sender was moved into the action; once communication is done the channel
        // closes and the monitoring task terminates, so this await is bounded.
        clock_skew_check_handle.await;

        // Record the certificate locally before returning it to the caller.
        self.process_certificate(Box::new(certificate.clone()))
            .await?;
        Ok(certificate)
    }
750
    /// Broadcasts certified blocks to validators.
    ///
    /// Sends the chain information for `chain_id` up to `height` to a quorum of the
    /// committee's validators.
    #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
    async fn communicate_chain_updates(
        self: &Arc<Self>,
        committee: &Committee,
        chain_id: ChainId,
        height: BlockHeight,
        delivery: CrossChainMessageDelivery,
        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
    ) -> Result<(), ChainClientError> {
        let nodes = self.make_nodes(committee)?;
        communicate_with_quorum(
            &nodes,
            committee,
            // No vote value to aggregate here; we only need a quorum of successes.
            |_: &()| (),
            |remote_node| {
                let mut updater = ValidatorUpdater {
                    remote_node,
                    client: self.clone(),
                    admin_chain_id: self.admin_chain_id,
                };
                // Each validator task gets its own copy of the optional certificate.
                let certificate = latest_certificate.clone();
                Box::pin(async move {
                    updater
                        .send_chain_information(chain_id, height, delivery, certificate)
                        .await
                })
            },
            self.options.quorum_grace_period,
        )
        .await?;
        Ok(())
    }
784
785    /// Broadcasts certified blocks and optionally a block proposal, certificate or
786    /// leader timeout request.
787    ///
788    /// In that case, it verifies that the validator votes are for the provided value,
789    /// and returns a certificate.
790    #[instrument(level = "trace", skip_all)]
791    async fn communicate_chain_action<T: CertificateValue>(
792        self: &Arc<Self>,
793        committee: &Committee,
794        action: CommunicateAction,
795        value: T,
796    ) -> Result<GenericCertificate<T>, ChainClientError> {
797        let nodes = self.make_nodes(committee)?;
798        let ((votes_hash, votes_round), votes) = communicate_with_quorum(
799            &nodes,
800            committee,
801            |vote: &LiteVote| (vote.value.value_hash, vote.round),
802            |remote_node| {
803                let mut updater = ValidatorUpdater {
804                    remote_node,
805                    client: self.clone(),
806                    admin_chain_id: self.admin_chain_id,
807                };
808                let action = action.clone();
809                Box::pin(async move { updater.send_chain_update(action).await })
810            },
811            self.options.quorum_grace_period,
812        )
813        .await?;
814        ensure!(
815            (votes_hash, votes_round) == (value.hash(), action.round()),
816            ChainClientError::UnexpectedQuorum {
817                hash: votes_hash,
818                round: votes_round,
819                expected_hash: value.hash(),
820                expected_round: action.round(),
821            }
822        );
823        // Certificate is valid because
824        // * `communicate_with_quorum` ensured a sufficient "weight" of
825        // (non-error) answers were returned by validators.
826        // * each answer is a vote signed by the expected validator.
827        let certificate = LiteCertificate::try_from_votes(votes)
828            .ok_or_else(|| {
829                ChainClientError::InternalError("Vote values or rounds don't match; this is a bug")
830            })?
831            .with_value(value)
832            .ok_or_else(|| {
833                ChainClientError::ProtocolError("A quorum voted for an unexpected value")
834            })?;
835        Ok(certificate)
836    }
837
838    /// Processes the confirmed block certificate in the local node without checking signatures.
839    /// Also downloads and processes all ancestors that are still missing.
840    #[instrument(level = "trace", skip_all)]
841    async fn receive_certificate_with_checked_signatures(
842        &self,
843        certificate: ConfirmedBlockCertificate,
844    ) -> Result<(), ChainClientError> {
845        let certificate = Box::new(certificate);
846        let block = certificate.block();
847        // Recover history from the network.
848        self.download_certificates(block.header.chain_id, block.header.height)
849            .await?;
850        // Process the received operations. Download required hashed certificate values if
851        // necessary.
852        if let Err(err) = self.process_certificate(certificate.clone()).await {
853            match &err {
854                LocalNodeError::BlobsNotFound(blob_ids) => {
855                    self.download_blobs(&self.validator_nodes().await?, blob_ids)
856                        .await
857                        .map_err(|_| err)?;
858                    self.process_certificate(certificate).await?;
859                }
860                _ => {
861                    // The certificate is not as expected. Give up.
862                    warn!("Failed to process network hashed certificate value");
863                    return Err(err.into());
864                }
865            }
866        }
867
868        Ok(())
869    }
870
871    /// Processes the confirmed block in the local node, possibly without executing it.
872    #[instrument(level = "trace", skip_all)]
873    #[allow(dead_code)] // Otherwise CI fails when built for docker.
874    async fn receive_sender_certificate(
875        &self,
876        certificate: ConfirmedBlockCertificate,
877        mode: ReceiveCertificateMode,
878        nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
879    ) -> Result<(), ChainClientError> {
880        // Verify the certificate before doing any expensive networking.
881        let (max_epoch, committees) = self.admin_committees().await?;
882        if let ReceiveCertificateMode::NeedsCheck = mode {
883            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
884        }
885        // Recover history from the network.
886        let nodes = if let Some(nodes) = nodes {
887            nodes
888        } else {
889            self.validator_nodes().await?
890        };
891        if let Err(err) = self.handle_certificate(certificate.clone()).await {
892            match &err {
893                LocalNodeError::BlobsNotFound(blob_ids) => {
894                    self.download_blobs(&nodes, blob_ids).await?;
895                    self.handle_certificate(certificate.clone()).await?;
896                }
897                _ => {
898                    // The certificate is not as expected. Give up.
899                    warn!("Failed to process network hashed certificate value");
900                    return Err(err.into());
901                }
902            }
903        }
904
905        Ok(())
906    }
907
    /// Downloads and processes certificates for sender chain blocks.
    ///
    /// Repeatedly asks the validators (in random order) for the certificates at
    /// `remote_heights` on `sender_chain_id`, verifies them against the admin
    /// committees, processes them locally, and reports each processed block over
    /// `sender`. Validators that return bad data are dropped from the node list;
    /// the function returns when every height has been handled (or permanently
    /// skipped) or when no correct validator remains.
    #[instrument(level = "trace", skip_all)]
    async fn download_and_process_sender_chain(
        &self,
        sender_chain_id: ChainId,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        received_log: &ReceivedLogs,
        mut remote_heights: Vec<BlockHeight>,
        sender: mpsc::UnboundedSender<ChainAndHeight>,
    ) {
        // The committees are needed below to verify certificate signatures.
        let (max_epoch, committees) = match self.admin_committees().await {
            Ok(result) => result,
            Err(error) => {
                error!(%error, %sender_chain_id, "could not read admin committees");
                return;
            }
        };
        let committees_ref = &committees;
        let mut nodes = nodes.to_vec();
        // Loop until all requested heights have been processed or skipped.
        while !remote_heights.is_empty() {
            let remote_heights_ref = &remote_heights;
            // Shuffle so we don't always query the same validator first.
            nodes.shuffle(&mut rand::thread_rng());
            let certificates = match communicate_concurrently(
                &nodes,
                async move |remote_node| {
                    let mut remote_heights = remote_heights_ref.clone();
                    // No need trying to download certificates the validator didn't have in their
                    // log - we'll retry downloading the remaining ones next time we loop.
                    remote_heights.retain(|height| {
                        received_log.validator_has_block(
                            &remote_node.public_key,
                            sender_chain_id,
                            *height,
                        )
                    });
                    if remote_heights.is_empty() {
                        // It makes no sense to return `Ok(_)` if we aren't going to try downloading
                        // anything from the validator - let the function try the other validators
                        return Err(());
                    }
                    let certificates = self
                        .requests_scheduler
                        .download_certificates_by_heights(
                            &remote_node,
                            sender_chain_id,
                            remote_heights,
                        )
                        .await
                        .map_err(|_| ())?;
                    // Verify signatures right away; alongside each certificate, record
                    // whether its epoch is one we can fully validate.
                    let mut certificates_with_check_results = vec![];
                    for cert in certificates {
                        if let Ok(check_result) =
                            Self::check_certificate(max_epoch, committees_ref, &cert)
                        {
                            certificates_with_check_results
                                .push((cert, check_result.into_result().is_ok()));
                        } else {
                            // Invalid signature - the validator is faulty
                            return Err(());
                        }
                    }
                    Ok(certificates_with_check_results)
                },
                // On overall failure, collect the set of validators that errored.
                |errors| {
                    errors
                        .into_iter()
                        .map(|(validator, _error)| validator)
                        .collect::<BTreeSet<_>>()
                },
                self.options.certificate_batch_download_timeout,
            )
            .await
            {
                Ok(certificates_with_check_results) => certificates_with_check_results,
                Err(faulty_validators) => {
                    // filter out faulty validators and retry if any are left
                    nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                    if nodes.is_empty() {
                        info!(
                            chain_id = %sender_chain_id,
                            "could not download certificates for chain - no more correct validators left"
                        );
                        return;
                    }
                    continue;
                }
            };

            trace!(
                chain_id = %sender_chain_id,
                num_certificates = %certificates.len(),
                "received certificates",
            );

            // Heights that should not be requested again on the next loop iteration.
            let mut to_remove_from_queue = BTreeSet::new();

            for (certificate, check_result) in certificates {
                let hash = certificate.hash();
                let chain_id = certificate.block().header.chain_id;
                let height = certificate.block().header.height;
                if !check_result {
                    // The certificate was correctly signed, but we were missing a committee to
                    // validate it properly - do not receive it, but also do not attempt to
                    // re-download it.
                    to_remove_from_queue.insert(height);
                    continue;
                }
                // We checked the certificates right after downloading them.
                let mode = ReceiveCertificateMode::AlreadyChecked;
                if let Err(error) = self
                    .receive_sender_certificate(certificate, mode, None)
                    .await
                {
                    warn!(%error, %hash, "Received invalid certificate");
                } else {
                    to_remove_from_queue.insert(height);
                    // Notify the caller that this (chain, height) was processed.
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id,
                            %height,
                            %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                }
            }

            // Keep only the heights that still need to be downloaded/processed.
            remote_heights.retain(|height| !to_remove_from_queue.contains(height));
        }
        trace!(
            chain_id = %sender_chain_id,
            "find_received_certificates: finished processing chain",
        );
    }
1042
1043    /// Downloads the log of received messages for a chain from a validator.
1044    #[instrument(level = "trace", skip(self))]
1045    async fn get_received_log_from_validator(
1046        &self,
1047        chain_id: ChainId,
1048        remote_node: &RemoteNode<Env::ValidatorNode>,
1049        tracker: u64,
1050    ) -> Result<Vec<ChainAndHeight>, ChainClientError> {
1051        let mut offset = tracker;
1052
1053        // Retrieve the list of newly received certificates from this validator.
1054        let mut remote_log = Vec::new();
1055        loop {
1056            trace!("get_received_log_from_validator: looping");
1057            let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1058            let info = remote_node.handle_chain_info_query(query).await?;
1059            let received_entries = info.requested_received_log.len();
1060            offset += received_entries as u64;
1061            remote_log.extend(info.requested_received_log);
1062            trace!(
1063                remote_node = remote_node.address(),
1064                %received_entries,
1065                "get_received_log_from_validator: received log batch",
1066            );
1067            if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1068                break;
1069            }
1070        }
1071
1072        trace!(
1073            remote_node = remote_node.address(),
1074            num_entries = remote_log.len(),
1075            "get_received_log_from_validator: returning downloaded log",
1076        );
1077
1078        Ok(remote_log)
1079    }
1080
    /// Downloads a specific sender block and recursively downloads any earlier blocks
    /// that also sent a message to our chain, based on `previous_message_blocks`.
    ///
    /// This ensures that we have all the sender blocks needed to preprocess the target block
    /// and put the messages to our chain into the outbox.
    async fn download_sender_block_with_sending_ancestors(
        &self,
        receiver_chain_id: ChainId,
        sender_chain_id: ChainId,
        height: BlockHeight,
        remote_node: &RemoteNode<Env::ValidatorNode>,
    ) -> Result<(), ChainClientError> {
        // The first sender height not yet in our outbox for this receiver; heights
        // below it are already handled, so the walk can stop there.
        let next_outbox_height = self
            .local_node
            .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
            .await?
            .get(&sender_chain_id)
            .copied()
            .unwrap_or(BlockHeight::ZERO);
        // Needed to verify each downloaded certificate.
        let (max_epoch, committees) = self.admin_committees().await?;

        // Recursively collect all certificates we need, following
        // the chain of previous_message_blocks back to next_outbox_height.
        let mut certificates = BTreeMap::new();
        let mut current_height = height;

        // Stop if we've reached the height we've already processed.
        while current_height >= next_outbox_height {
            // Download the certificate for this height.
            let downloaded = self
                .requests_scheduler
                .download_certificates_by_heights(
                    remote_node,
                    sender_chain_id,
                    vec![current_height],
                )
                .await?;
            let Some(certificate) = downloaded.into_iter().next() else {
                return Err(ChainClientError::CannotDownloadMissingSenderBlock {
                    chain_id: sender_chain_id,
                    height: current_height,
                });
            };

            // Validate the certificate.
            Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
                .into_result()?;

            // Check if there's a previous message block to our chain.
            let block = certificate.block();
            let next_height = block
                .body
                .previous_message_blocks
                .get(&receiver_chain_id)
                .map(|(_prev_hash, prev_height)| *prev_height);

            // Store this certificate.
            certificates.insert(current_height, certificate);

            if let Some(prev_height) = next_height {
                // Continue with the previous block.
                current_height = prev_height;
            } else {
                // No more dependencies.
                break;
            }
        }

        if certificates.is_empty() {
            // Nothing new was collected — presumably the sender blocks are already
            // local, so just retry the pending cross-chain requests.
            // NOTE(review): confirm this is the intended recovery path.
            self.local_node
                .retry_pending_cross_chain_requests(sender_chain_id)
                .await?;
        }

        // Process certificates in ascending block height order (BTreeMap keeps them sorted).
        for certificate in certificates.into_values() {
            self.receive_sender_certificate(
                certificate,
                ReceiveCertificateMode::AlreadyChecked,
                Some(vec![remote_node.clone()]),
            )
            .await?;
        }

        Ok(())
    }
1167
1168    #[instrument(
1169        level = "trace", skip_all,
1170        fields(certificate_hash = ?incoming_certificate.hash()),
1171    )]
1172    fn check_certificate(
1173        highest_known_epoch: Epoch,
1174        committees: &BTreeMap<Epoch, Committee>,
1175        incoming_certificate: &ConfirmedBlockCertificate,
1176    ) -> Result<CheckCertificateResult, NodeError> {
1177        let block = incoming_certificate.block();
1178        // Check that certificates are valid w.r.t one of our trusted committees.
1179        if block.header.epoch > highest_known_epoch {
1180            return Ok(CheckCertificateResult::FutureEpoch);
1181        }
1182        if let Some(known_committee) = committees.get(&block.header.epoch) {
1183            // This epoch is recognized by our chain. Let's verify the
1184            // certificate.
1185            incoming_certificate.check(known_committee)?;
1186            Ok(CheckCertificateResult::New)
1187        } else {
1188            // We don't accept a certificate from a committee that was retired.
1189            Ok(CheckCertificateResult::OldEpoch)
1190        }
1191    }
1192
1193    /// Downloads and processes any certificates we are missing for the given chain.
1194    ///
1195    /// If we are an owner of the chain, also synchronizes the consensus state.
1196    #[instrument(level = "trace", skip_all)]
1197    pub(crate) async fn synchronize_chain_state(
1198        &self,
1199        chain_id: ChainId,
1200    ) -> Result<Box<ChainInfo>, ChainClientError> {
1201        let (_, committee) = self.admin_committee().await?;
1202        Box::pin(self.synchronize_chain_state_from_committee(chain_id, committee)).await
1203    }
1204
1205    /// Downloads certificates for the given chain from the given committee.
1206    ///
1207    /// If the chain is not in follow-only mode, also fetches and processes manager values
1208    /// (timeout certificates, proposals, locking blocks) for consensus participation.
1209    #[instrument(level = "trace", skip_all)]
1210    pub async fn synchronize_chain_state_from_committee(
1211        &self,
1212        chain_id: ChainId,
1213        committee: Committee,
1214    ) -> Result<Box<ChainInfo>, ChainClientError> {
1215        #[cfg(with_metrics)]
1216        let _latency = if !self.is_chain_follow_only(chain_id).await {
1217            Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1218        } else {
1219            None
1220        };
1221
1222        let validators = self.make_nodes(&committee)?;
1223        Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1224        communicate_with_quorum(
1225            &validators,
1226            &committee,
1227            |_: &()| (),
1228            |remote_node| async move {
1229                self.synchronize_chain_state_from(&remote_node, chain_id)
1230                    .await
1231            },
1232            self.options.quorum_grace_period,
1233        )
1234        .await?;
1235
1236        self.local_node
1237            .chain_info(chain_id)
1238            .await
1239            .map_err(Into::into)
1240    }
1241
    /// Downloads any certificates from the specified validator that we are missing for the given
    /// chain.
    ///
    /// If the chain is owned, also fetches and processes manager values
    /// (timeout certificates, proposals, locking blocks) for consensus participation.
    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
    pub(crate) async fn synchronize_chain_state_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
    ) -> Result<(), ChainClientError> {
        // Manager values only matter if we participate in consensus on this chain.
        let with_manager_values = !self.is_chain_follow_only(chain_id).await;
        let query = if with_manager_values {
            ChainInfoQuery::new(chain_id).with_manager_values()
        } else {
            ChainInfoQuery::new(chain_id)
        };
        let remote_info = remote_node.handle_chain_info_query(query).await?;
        let local_info = self
            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
            .await?;

        if !with_manager_values {
            return Ok(());
        }

        // If we are at the same height as the remote node, we also update our chain manager.
        let local_height = match local_info {
            Some(info) => info.next_block_height,
            None => {
                self.local_node
                    .chain_info(chain_id)
                    .await?
                    .next_block_height
            }
        };
        if local_height != remote_info.next_block_height {
            debug!(
                remote_node = remote_node.address(),
                remote_height = %remote_info.next_block_height,
                local_height = %local_height,
                "synced from validator, but remote height and local height are different",
            );
            return Ok(());
        };

        // Apply the validator's timeout certificate, if any.
        if let Some(timeout) = remote_info.manager.timeout {
            self.handle_certificate(*timeout).await?;
        }
        // Collect the proposals the validator knows about (signed, proposed, and the
        // fast locking block) so we can replay them locally below.
        let mut proposals = Vec::new();
        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
            proposals.push(*proposal);
        }
        if let Some(proposal) = remote_info.manager.requested_proposed {
            proposals.push(*proposal);
        }
        if let Some(locking) = remote_info.manager.requested_locking {
            match *locking {
                LockingBlock::Fast(proposal) => {
                    proposals.push(proposal);
                }
                LockingBlock::Regular(cert) => {
                    let hash = cert.hash();
                    // A failure here is non-fatal: just skip the locking block.
                    if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                    {
                        debug!(
                            remote_node = remote_node.address(),
                            %hash,
                            height = %local_height,
                            %error,
                            "skipping locked block from validator",
                        );
                    }
                }
            }
        }
        // Replay each proposal locally; on missing blobs or missing cross-chain
        // updates, fetch the missing data from the validator and retry.
        'proposal_loop: for proposal in proposals {
            let owner: AccountOwner = proposal.owner();
            if let Err(mut err) =
                Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
            {
                if let LocalNodeError::BlobsNotFound(_) = &err {
                    // First try to fetch the blobs the proposal itself declares as required.
                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                    if !required_blob_ids.is_empty() {
                        let mut blobs = Vec::new();
                        for blob_id in required_blob_ids {
                            let blob_content = match self
                                .requests_scheduler
                                .download_pending_blob(remote_node, chain_id, blob_id)
                                .await
                            {
                                Ok(content) => content,
                                Err(error) => {
                                    info!(
                                        remote_node = remote_node.address(),
                                        height = %local_height,
                                        proposer = %owner,
                                        %blob_id,
                                        %error,
                                        "skipping proposal from validator; failed to download blob",
                                    );
                                    continue 'proposal_loop;
                                }
                            };
                            blobs.push(Blob::new(blob_content));
                        }
                        self.local_node
                            .handle_pending_blobs(chain_id, blobs)
                            .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) =
                            Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                        // Still missing blobs: fall back to downloading the
                        // certificates that produced them.
                        self.update_local_node_with_blobs_from(
                            blob_ids.clone(),
                            &[remote_node.clone()],
                        )
                        .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) =
                            Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                }
                // The proposal may depend on sender blocks we haven't preprocessed yet;
                // download them (with their sending ancestors) and retry until resolved.
                while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                    if let ChainError::MissingCrossChainUpdate {
                        chain_id,
                        origin,
                        height,
                    } = &**chain_err
                    {
                        self.download_sender_block_with_sending_ancestors(
                            *chain_id,
                            *origin,
                            *height,
                            remote_node,
                        )
                        .await?;
                        // Retry
                        if let Err(new_err) =
                            Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                        {
                            err = new_err;
                        } else {
                            continue 'proposal_loop;
                        }
                    } else {
                        break;
                    }
                }

                // All recovery attempts failed; skip this proposal.
                debug!(
                    remote_node = remote_node.address(),
                    proposer = %owner,
                    height = %local_height,
                    error = %err,
                    "skipping proposal from validator",
                );
            }
        }
        Ok(())
    }
1414
1415    async fn try_process_locking_block_from(
1416        &self,
1417        remote_node: &RemoteNode<Env::ValidatorNode>,
1418        certificate: GenericCertificate<ValidatedBlock>,
1419    ) -> Result<(), ChainClientError> {
1420        let chain_id = certificate.inner().chain_id();
1421        let certificate = Box::new(certificate);
1422        match self.process_certificate(certificate.clone()).await {
1423            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1424                let mut blobs = Vec::new();
1425                for blob_id in blob_ids {
1426                    let blob_content = self
1427                        .requests_scheduler
1428                        .download_pending_blob(remote_node, chain_id, blob_id)
1429                        .await?;
1430                    blobs.push(Blob::new(blob_content));
1431                }
1432                self.local_node
1433                    .handle_pending_blobs(chain_id, blobs)
1434                    .await?;
1435                self.process_certificate(certificate).await?;
1436                Ok(())
1437            }
1438            Err(err) => Err(err.into()),
1439            Ok(()) => Ok(()),
1440        }
1441    }
1442
    /// Downloads and processes from the specified validators the confirmed block certificates
    /// that use the given blobs. If this succeeds, the blobs will be in our storage.
    async fn update_local_node_with_blobs_from(
        &self,
        blob_ids: Vec<BlobId>,
        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<Vec<Blob>, ChainClientError> {
        let timeout = self.options.blob_download_timeout;
        // Deduplicate IDs.
        let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
        // For each blob, try the validators concurrently: the first one that serves a
        // certificate producing the blob wins. Blobs themselves are fetched with
        // bounded concurrency (`max_joined_tasks`).
        stream::iter(blob_ids.into_iter().map(|blob_id| {
            communicate_concurrently(
                remote_nodes,
                async move |remote_node| {
                    // Fetch and process the certificate that created the blob...
                    let certificate = self
                        .requests_scheduler
                        .download_certificate_for_blob(&remote_node, blob_id)
                        .await?;
                    self.receive_sender_certificate(
                        certificate,
                        ReceiveCertificateMode::NeedsCheck,
                        Some(vec![remote_node.clone()]),
                    )
                    .await?;
                    // ...after which the blob must be readable from local storage.
                    let blob = self
                        .local_node
                        .storage_client()
                        .read_blob(blob_id)
                        .await?
                        .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
                    Result::<_, ChainClientError>::Ok(blob)
                },
                // If every validator fails, report the blob as not found.
                move |_| ChainClientError::from(NodeError::BlobsNotFound(vec![blob_id])),
                timeout,
            )
        }))
        .buffer_unordered(self.options.max_joined_tasks)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        // Short-circuits on the first error; otherwise collects all blobs.
        .collect()
    }
1485
1486    /// Attempts to execute the block locally. If any incoming message execution fails, that
1487    /// message is rejected and execution is retried, until the block accepts only messages
1488    /// that succeed.
1489    ///
1490    /// Attempts to execute the block locally with a specified policy for handling bundle failures.
1491    /// If any attempt to read a blob fails, the blob is downloaded and execution is retried.
1492    ///
1493    /// Returns the modified block (bundles may be rejected/removed based on the policy)
1494    /// and the execution result.
1495    #[instrument(level = "trace", skip(self, block))]
1496    async fn stage_block_execution_with_policy(
1497        &self,
1498        block: ProposedBlock,
1499        round: Option<u32>,
1500        published_blobs: Vec<Blob>,
1501        policy: BundleExecutionPolicy,
1502    ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1503        loop {
1504            let result = self
1505                .local_node
1506                .stage_block_execution_with_policy(
1507                    block.clone(),
1508                    round,
1509                    published_blobs.clone(),
1510                    policy,
1511                )
1512                .await;
1513            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1514                let validators = self.validator_nodes().await?;
1515                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1516                    .await?;
1517                continue; // We found the missing blob: retry.
1518            }
1519            if let Ok((_, executed_block, _, _)) = &result {
1520                let hash = CryptoHash::new(executed_block);
1521                let notification = Notification {
1522                    chain_id: executed_block.header.chain_id,
1523                    reason: Reason::BlockExecuted {
1524                        height: executed_block.header.height,
1525                        hash,
1526                    },
1527                };
1528                self.notifier.notify(&[notification]);
1529            }
1530            let (_modified_block, executed_block, response, _resource_tracker) = result?;
1531            return Ok((executed_block, response));
1532        }
1533    }
1534
1535    /// Attempts to execute the block locally. If any attempt to read a blob fails, the blob is
1536    /// downloaded and execution is retried.
1537    #[instrument(level = "trace", skip(self, block))]
1538    async fn stage_block_execution(
1539        &self,
1540        block: ProposedBlock,
1541        round: Option<u32>,
1542        published_blobs: Vec<Blob>,
1543    ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1544        loop {
1545            let result = self
1546                .local_node
1547                .stage_block_execution(block.clone(), round, published_blobs.clone())
1548                .await;
1549            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1550                let validators = self.validator_nodes().await?;
1551                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1552                    .await?;
1553                continue; // We found the missing blob: retry.
1554            }
1555            if let Ok((block, _, _)) = &result {
1556                let hash = CryptoHash::new(block);
1557                let notification = Notification {
1558                    chain_id: block.header.chain_id,
1559                    reason: Reason::BlockExecuted {
1560                        height: block.header.height,
1561                        hash,
1562                    },
1563                };
1564                self.notifier.notify(&[notification]);
1565            }
1566            let (block, response, _resource_tracker) = result?;
1567            return Ok((block, response));
1568        }
1569    }
1570}
1571
/// Identifies which phase of block submission a timing measurement refers to.
///
/// Sent as the second element of the `(u64, TimingType)` pairs delivered through the
/// chain client's optional timing channel for benchmarking.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    /// Executing a block's operations.
    ExecuteOperations,
    /// Executing a block.
    ExecuteBlock,
    /// Submitting a block proposal.
    SubmitBlockProposal,
    /// Updating the validators about the local chain.
    UpdateValidators,
}
1579
/// Configuration options controlling the behavior of a [`ChainClient`].
///
/// Options are cloned per client, so changing them on one client handle does not
/// affect others.
#[derive(Debug, Clone)]
pub struct ChainClientOptions {
    /// Maximum number of pending message bundles processed at a time in a block.
    pub max_pending_message_bundles: usize,
    /// Maximum number of message bundles to discard from a block proposal due to block limit
    /// errors before discarding all remaining bundles.
    ///
    /// Discarded bundles can be retried in the next block.
    pub max_block_limit_errors: u32,
    /// Maximum number of new stream events processed at a time in a block.
    pub max_new_events_per_block: usize,
    /// The policy for automatically handling incoming messages.
    pub message_policy: MessagePolicy,
    /// Whether to block on cross-chain message delivery.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// An additional delay, after reaching a quorum, to wait for additional validator signatures,
    /// as a fraction of time taken to reach quorum.
    pub quorum_grace_period: f64,
    /// The delay when downloading a blob, after which we try a second validator.
    pub blob_download_timeout: Duration,
    /// The delay when downloading a batch of certificates, after which we try a second validator.
    pub certificate_batch_download_timeout: Duration,
    /// Maximum number of certificates that we download at a time from one validator when
    /// synchronizing one of our chains.
    pub certificate_download_batch_size: u64,
    /// Maximum number of sender certificates we try to download and receive in one go
    /// when syncing sender chains.
    pub sender_certificate_download_batch_size: usize,
    /// Maximum number of tasks that can be joined concurrently using buffer_unordered.
    pub max_joined_tasks: usize,
    /// Whether to allow creating blocks in the fast round. Fast blocks have lower latency but
    /// must be used carefully so that there are never any conflicting fast block proposals.
    pub allow_fast_blocks: bool,
}
1614
/// Default value for [`ChainClientOptions::certificate_download_batch_size`].
// `const` rather than `static`: these are plain `Copy` integers with no identity or interior
// mutability, so a constant is the idiomatic item kind (references still work via promotion).
pub const DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default value for [`ChainClientOptions::sender_certificate_download_batch_size`].
pub const DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
1617
#[cfg(with_testing)]
impl ChainClientOptions {
    /// Returns options suitable for tests: small limits, one-second download timeouts,
    /// an accept-all message policy, and non-blocking cross-chain delivery.
    pub fn test_default() -> Self {
        use crate::DEFAULT_QUORUM_GRACE_PERIOD;

        let short_timeout = Duration::from_secs(1);
        ChainClientOptions {
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            max_new_events_per_block: 10,
            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout: short_timeout,
            certificate_batch_download_timeout: short_timeout,
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_joined_tasks: 100,
            allow_fast_blocks: false,
        }
    }
}
1639
/// Client to operate a chain by interacting with validators and the given local storage
/// implementation.
/// * The chain being operated is called the "local chain" or just the "chain".
/// * As a rule, operations are considered successful (and communication may stop) when
///   they succeeded in gathering a quorum of responses.
///
/// Cloning a `ChainClient` is cheap: all clones share the same underlying [`Client`]
/// via an [`Arc`].
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    /// The Linera [`Client`] that manages operations for this chain client.
    #[debug(skip)]
    client: Arc<Client<Env>>,
    /// The off-chain chain ID.
    chain_id: ChainId,
    /// The client options.
    #[debug(skip)]
    options: ChainClientOptions,
    /// The preferred owner of the chain used to sign proposals.
    /// `None` if we cannot propose on this chain.
    preferred_owner: Option<AccountOwner>,
    /// The next block height as read from the wallet.
    initial_next_block_height: BlockHeight,
    /// The last block hash as read from the wallet.
    initial_block_hash: Option<CryptoHash>,
    /// Optional timing sender for benchmarking.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
}
1665
1666impl<Env: Environment> Clone for ChainClient<Env> {
1667    fn clone(&self) -> Self {
1668        Self {
1669            client: self.client.clone(),
1670            chain_id: self.chain_id,
1671            options: self.options.clone(),
1672            preferred_owner: self.preferred_owner,
1673            initial_next_block_height: self.initial_next_block_height,
1674            initial_block_hash: self.initial_block_hash,
1675            timing_sender: self.timing_sender.clone(),
1676        }
1677    }
1678}
1679
/// Error type for [`ChainClient`].
///
/// Wraps errors bubbling up from the local node, remote validators, and chain
/// execution, plus failure modes specific to the client itself.
#[derive(Debug, Error)]
pub enum ChainClientError {
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),

    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),

    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),

    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),

    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),

    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),

    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
         Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,

    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
    WalletSynchronizationError,

    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),

    #[error(
        "Cannot accept a certificate from a committee that was retired. \
         Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,

    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),

    #[error("Signer doesn't have key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),

    #[error("client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),

    #[error("The chain client isn't owner on chain {0}")]
    NotAnOwner(ChainId),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    #[error(
        "Failed to download certificates and update local node to the next height \
         {target_next_block_height} of chain {chain_id}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },

    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    #[error(
        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
         expected block hash {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },

    /// Use [`ChainClientError::signer_failure`] to construct this variant.
    #[error("signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),

    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),

    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,

    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
    CannotDownloadMissingSenderBlock {
        chain_id: ChainId,
        height: BlockHeight,
    },

    /// A different block was already certified at the height we tried to commit at;
    /// the payload is the hash of the committed certificate.
    #[error(
        "A different block was already committed at this height. \
         The committed certificate hash is {0}"
    )]
    Conflict(CryptoHash),
}
1787
1788impl From<Infallible> for ChainClientError {
1789    fn from(infallible: Infallible) -> Self {
1790        match infallible {}
1791    }
1792}
1793
1794impl ChainClientError {
1795    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
1796        Self::Signer(Box::new(err))
1797    }
1798}
1799
1800impl<Env: Environment> ChainClient<Env> {
1801    /// Gets the client mutex from the chain's state.
1802    #[instrument(level = "trace", skip(self))]
1803    fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
1804        self.client
1805            .chains
1806            .pin()
1807            .get(&self.chain_id)
1808            .expect("Chain client constructed for invalid chain")
1809            .client_mutex()
1810    }
1811
1812    /// Gets the next pending block.
1813    #[instrument(level = "trace", skip(self))]
1814    pub fn pending_proposal(&self) -> Option<PendingProposal> {
1815        self.client
1816            .chains
1817            .pin()
1818            .get(&self.chain_id)
1819            .expect("Chain client constructed for invalid chain")
1820            .pending_proposal()
1821            .clone()
1822    }
1823
    /// Updates the chain's state using a closure.
    ///
    /// NOTE(review): `f` is bound as `Fn` rather than `FnOnce`, presumably because the
    /// map's `update` may invoke the closure more than once (e.g. on contention) —
    /// confirm against the map implementation; `f` should be side-effect free.
    #[instrument(level = "trace", skip(self, f))]
    fn update_state<F>(&self, f: F)
    where
        F: Fn(&mut ChainClientState),
    {
        let chains = self.client.chains.pin();
        chains
            .update(self.chain_id, |state| {
                // Clone the current state, apply `f`, and store the result back.
                let mut state = state.clone_for_update_unchecked();
                f(&mut state);
                state
            })
            .expect("Chain client constructed for invalid chain");
    }
1839
    /// Gets a reference to the client's signer instance.
    ///
    /// The signer is shared by all chain clients of the same underlying [`Client`].
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &impl Signer {
        self.client.signer()
    }
1845
    /// Gets a mutable reference to the per-`ChainClient` options.
    ///
    /// Changes affect only this client handle: options are cloned per `ChainClient`.
    #[instrument(level = "trace", skip(self))]
    pub fn options_mut(&mut self) -> &mut ChainClientOptions {
        &mut self.options
    }
1851
    /// Gets a reference to the per-`ChainClient` options.
    ///
    /// See [`Self::options_mut`] to modify them.
    #[instrument(level = "trace", skip(self))]
    pub fn options(&self) -> &ChainClientOptions {
        &self.options
    }
1857
    /// Gets the ID of the associated chain, i.e. the chain all operations of this
    /// client act on.
    #[instrument(level = "trace", skip(self))]
    pub fn chain_id(&self) -> ChainId {
        self.chain_id
    }
1863
    /// Gets a clone of the timing sender for benchmarking.
    ///
    /// Returns `None` if no timing channel was configured for this client.
    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
        self.timing_sender.clone()
    }
1868
    /// Gets the ID of the admin chain.
    ///
    /// This is shared by all chain clients created from the same underlying [`Client`].
    #[instrument(level = "trace", skip(self))]
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id
    }
1874
    /// Gets the currently preferred owner for signing the blocks.
    ///
    /// `None` means this client cannot propose blocks on the chain.
    #[instrument(level = "trace", skip(self))]
    pub fn preferred_owner(&self) -> Option<AccountOwner> {
        self.preferred_owner
    }
1880
    /// Sets the new, preferred owner for signing the blocks.
    ///
    /// No validation is performed here; whether the owner may actually propose is
    /// checked later, e.g. by [`Self::identity`].
    #[instrument(level = "trace", skip(self))]
    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
        self.preferred_owner = Some(preferred_owner);
    }
1886
    /// Unsets the preferred owner for signing the blocks, undoing
    /// [`Self::set_preferred_owner`].
    #[instrument(level = "trace", skip(self))]
    pub fn unset_preferred_owner(&mut self) {
        self.preferred_owner = None;
    }
1892
    /// Obtains a `ChainStateView` for this client's chain.
    ///
    /// The returned value is a read guard; it should not be held longer than necessary.
    #[instrument(level = "trace")]
    pub async fn chain_state_view(
        &self,
    ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
        self.client.local_node.chain_state_view(self.chain_id).await
    }
1900
1901    /// Returns chain IDs that this chain subscribes to.
1902    #[instrument(level = "trace", skip(self))]
1903    pub async fn event_stream_publishers(&self) -> Result<BTreeSet<ChainId>, LocalNodeError> {
1904        let subscriptions = self
1905            .client
1906            .local_node
1907            .get_event_subscriptions(self.chain_id)
1908            .await?;
1909        let mut publishers = subscriptions
1910            .into_iter()
1911            .map(|((chain_id, _), _)| chain_id)
1912            .collect::<BTreeSet<_>>();
1913        if self.chain_id != self.client.admin_chain_id {
1914            publishers.insert(self.client.admin_chain_id);
1915        }
1916        Ok(publishers)
1917    }
1918
    /// Subscribes to notifications from this client's chain.
    ///
    /// Equivalent to [`Self::subscribe_to`] with this client's own chain ID.
    #[instrument(level = "trace")]
    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
        self.subscribe_to(self.chain_id)
    }
1924
1925    /// Subscribes to notifications from the specified chain.
1926    #[instrument(level = "trace")]
1927    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
1928        Ok(Box::pin(UnboundedReceiverStream::new(
1929            self.client.notifier.subscribe(vec![chain_id]),
1930        )))
1931    }
1932
    /// Returns the storage client used by this client's local node.
    ///
    /// Delegates to the shared underlying [`Client`].
    #[instrument(level = "trace")]
    pub fn storage_client(&self) -> &Env::Storage {
        self.client.storage_client()
    }
1938
1939    /// Obtains the basic `ChainInfo` data for the local chain.
1940    #[instrument(level = "trace")]
1941    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1942        let query = ChainInfoQuery::new(self.chain_id);
1943        let response = self
1944            .client
1945            .local_node
1946            .handle_chain_info_query(query)
1947            .await?;
1948        self.client.update_from_info(&response.info);
1949        Ok(response.info)
1950    }
1951
1952    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
1953    #[instrument(level = "trace")]
1954    pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1955        let query = ChainInfoQuery::new(self.chain_id)
1956            .with_manager_values()
1957            .with_committees();
1958        let response = self
1959            .client
1960            .local_node
1961            .handle_chain_info_query(query)
1962            .await?;
1963        self.client.update_from_info(&response.info);
1964        Ok(response.info)
1965    }
1966
    /// Returns the chain's description. Fetches it from the validators if necessary.
    ///
    /// Delegates to the shared underlying [`Client`].
    pub async fn get_chain_description(&self) -> Result<ChainDescription, ChainClientError> {
        self.client.get_chain_description(self.chain_id).await
    }
1971
    /// Prepares the chain for the specified owner.
    ///
    /// Ensures we have the chain description blob, gets chain info, and validates
    /// that the owner can propose on this chain (either by being an owner or via
    /// `open_multi_leader_rounds`).
    ///
    /// # Errors
    /// * [`ChainClientError::CannotFindKeyForChain`] if we don't hold the owner's key;
    /// * [`ChainClientError::NotAnOwner`] if the owner may not propose on this chain.
    pub async fn prepare_for_owner(
        &self,
        owner: AccountOwner,
    ) -> Result<Box<ChainInfo>, ChainClientError> {
        ensure!(
            self.client.has_key_for(&owner).await?,
            ChainClientError::CannotFindKeyForChain(self.chain_id)
        );
        // Ensure we have the chain description blob.
        self.client
            .get_chain_description_blob(self.chain_id)
            .await?;

        // Get chain info.
        let info = self.chain_info().await?;

        // Validate that the owner can propose on this chain.
        ensure!(
            info.manager
                .ownership
                .can_propose_in_multi_leader_round(&owner),
            ChainClientError::NotAnOwner(self.chain_id)
        );

        Ok(info)
    }
2003
    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
    /// local chain.
    ///
    /// Returns an empty list if the message policy is set to ignore all messages.
    #[instrument(level = "trace")]
    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
        if self.options.message_policy.is_ignore() {
            // Ignore all messages.
            return Ok(Vec::new());
        }

        // Ask the local node for the chain's pending message bundles.
        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
        let info = self
            .client
            .local_node
            .handle_chain_info_query(query)
            .await?
            .info;
        if self.preferred_owner.is_some_and(|owner| {
            info.manager
                .ownership
                .is_super_owner_no_regular_owners(&owner)
        }) {
            // There are only super owners; they are expected to sync manually.
            ensure!(
                info.next_block_height >= self.initial_next_block_height,
                ChainClientError::WalletSynchronizationError
            );
        }

        // Apply the message policy to each bundle and cap the number of bundles.
        Ok(info
            .requested_pending_message_bundles
            .into_iter()
            .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
            .take(self.options.max_pending_message_bundles)
            .collect())
    }
2039
    /// Returns an `UpdateStreams` operation that updates this client's chain about new events
    /// in any of the streams its applications are subscribing to. Returns `None` if there are no
    /// new events.
    ///
    /// The total number of newly acknowledged events is capped by
    /// `self.options.max_new_events_per_block`.
    #[instrument(level = "trace")]
    async fn collect_stream_updates(&self) -> Result<Option<Operation>, ChainClientError> {
        // Load all our subscriptions.
        let subscription_map = self
            .client
            .local_node
            .get_event_subscriptions(self.chain_id)
            .await?;
        // Collect the indices of all new events.
        let futures = subscription_map
            .into_iter()
            .filter(|((chain_id, _), _)| {
                // Honor the message policy's chain ID restriction, if any.
                self.options
                    .message_policy
                    .restrict_chain_ids_to
                    .as_ref()
                    .is_none_or(|chain_set| chain_set.contains(chain_id))
            })
            .map(|((chain_id, stream_id), subscriptions)| {
                let client = self.client.clone();
                let previous_index = subscriptions.next_index;
                async move {
                    let next_index = client
                        .local_node
                        .get_stream_event_count(chain_id, stream_id.clone())
                        .await?;
                    // Only report streams that actually have new events.
                    if let Some(next_index) =
                        next_index.filter(|next_index| *next_index > previous_index)
                    {
                        Ok(Some((chain_id, stream_id, previous_index, next_index)))
                    } else {
                        Ok::<_, ChainClientError>(None)
                    }
                }
            });
        let all_updates = futures::stream::iter(futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        // Apply the max_new_events_per_block limit.
        let max_events = self.options.max_new_events_per_block;
        let mut total_events: usize = 0;
        let mut updates = Vec::new();
        for (chain_id, stream_id, previous_index, next_index) in all_updates {
            let new_events = (next_index - previous_index) as usize;
            if total_events + new_events <= max_events {
                total_events += new_events;
                updates.push((chain_id, stream_id, next_index));
            } else {
                // This stream would exceed the limit: take only as many of its events as
                // still fit, then stop considering further streams.
                let remaining = max_events.saturating_sub(total_events);
                if remaining > 0 {
                    // NOTE(review): `remaining` is truncated from `usize` to `u32` here;
                    // assumes `max_new_events_per_block` fits in `u32` — confirm.
                    updates.push((chain_id, stream_id, previous_index + remaining as u32));
                }
                break;
            }
        }
        if updates.is_empty() {
            return Ok(None);
        }
        Ok(Some(SystemOperation::UpdateStreams(updates).into()))
    }
2107
    /// Obtains the `ChainInfo` for the local chain, including its trusted committees.
    #[instrument(level = "trace")]
    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
        self.client.chain_info_with_committees(self.chain_id).await
    }
2112
2113    /// Obtains the current epoch of the local chain as well as its set of trusted committees.
2114    #[instrument(level = "trace")]
2115    async fn epoch_and_committees(
2116        &self,
2117    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
2118        let info = self.chain_info_with_committees().await?;
2119        let epoch = info.epoch;
2120        let committees = info.into_committees()?;
2121        Ok((epoch, committees))
2122    }
2123
2124    /// Obtains the committee for the current epoch of the local chain.
2125    #[instrument(level = "trace")]
2126    pub async fn local_committee(&self) -> Result<Committee, ChainClientError> {
2127        let info = match self.chain_info_with_committees().await {
2128            Ok(info) => info,
2129            Err(LocalNodeError::BlobsNotFound(_)) => {
2130                self.synchronize_chain_state(self.chain_id).await?;
2131                self.chain_info_with_committees().await?
2132            }
2133            Err(err) => return Err(err.into()),
2134        };
2135        Ok(info.into_current_committee()?)
2136    }
2137
    /// Obtains the committee for the latest epoch on the admin chain.
    ///
    /// Delegates to the shared underlying [`Client`].
    #[instrument(level = "trace")]
    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
        self.client.admin_committee().await
    }
2143
    /// Obtains the identity of the current owner of the chain.
    ///
    /// Returns the preferred owner, after checking that it is allowed to propose on
    /// this chain and that our signer holds its key.
    ///
    /// # Errors
    /// * [`ChainClientError::NoAccountKeyConfigured`] if no preferred owner is set;
    /// * [`ChainClientError::NotAnOwner`] if the preferred owner may not propose;
    /// * [`ChainClientError::CannotFindKeyForChain`] if the signer lacks the key.
    #[instrument(level = "trace")]
    pub async fn identity(&self) -> Result<AccountOwner, ChainClientError> {
        let Some(preferred_owner) = self.preferred_owner else {
            return Err(ChainClientError::NoAccountKeyConfigured(self.chain_id));
        };
        let manager = self.chain_info().await?.manager;
        ensure!(
            manager.ownership.is_active(),
            LocalNodeError::InactiveChain(self.chain_id)
        );

        // Check if the preferred owner can propose on this chain: either they are an owner,
        // the current leader, or open_multi_leader_rounds is enabled.
        let is_owner = manager
            .ownership
            .can_propose_in_multi_leader_round(&preferred_owner);

        if !is_owner {
            let accepted_owners = manager
                .ownership
                .all_owners()
                .chain(&manager.leader)
                .collect::<Vec<_>>();
            warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
                "The preferred owner is not configured as an owner of this chain",
            );
            return Err(ChainClientError::NotAnOwner(self.chain_id));
        }

        // Being an owner is not enough: we also need the private key to sign with.
        let has_signer = self
            .signer()
            .contains_key(&preferred_owner)
            .await
            .map_err(ChainClientError::signer_failure)?;

        if !has_signer {
            warn!(%self.chain_id, ?preferred_owner,
                "Chain is one of the owners but its Signer instance doesn't contain the key",
            );
            return Err(ChainClientError::CannotFindKeyForChain(self.chain_id));
        }

        Ok(preferred_owner)
    }
2191
    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
    /// its current height.
    #[instrument(level = "trace")]
    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
        #[cfg(with_metrics)]
        let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();

        // First catch up to the height the wallet already knows about.
        let mut info = self.synchronize_to_known_height().await?;

        if self.preferred_owner.is_none_or(|owner| {
            !info
                .manager
                .ownership
                .is_super_owner_no_regular_owners(&owner)
        }) {
            // If we are not a super owner or there are regular owners, we could be missing recent
            // certificates created by other clients. Further synchronize blocks from the network.
            // This is a best-effort that depends on network conditions.
            info = self.client.synchronize_chain_state(self.chain_id).await?;
        }

        // If the chain is in a newer epoch than the latest one we know from the admin
        // chain, catch up on the admin chain as well.
        if info.epoch > self.client.admin_committees().await?.0 {
            self.client
                .synchronize_chain_state(self.client.admin_chain_id)
                .await?;
        }

        self.client.update_from_info(&info);
        Ok(info)
    }
2222
    /// Verifies that our local storage contains enough history compared to the
    /// known block height. Otherwise, downloads the missing history from the
    /// network.
    ///
    /// The known height only differs if the wallet is ahead of storage.
    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, ChainClientError> {
        let info = self
            .client
            .download_certificates(self.chain_id, self.initial_next_block_height)
            .await?;
        if info.next_block_height == self.initial_next_block_height {
            // Check that our local node has the expected block hash.
            ensure!(
                self.initial_block_hash == info.block_hash,
                ChainClientError::InternalError("Invalid chain of blocks in local node")
            );
        }
        Ok(info)
    }
2241
    /// Attempts to update all validators about the local chain.
    ///
    /// Communicates `latest_certificate` (if any) to the old committee when given, and
    /// additionally to the current committee if it differs. Records timing information
    /// through the benchmarking channel, if one is configured.
    #[instrument(level = "trace", skip(old_committee, latest_certificate))]
    pub async fn update_validators(
        &self,
        old_committee: Option<&Committee>,
        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
    ) -> Result<(), ChainClientError> {
        let update_validators_start = linera_base::time::Instant::now();
        // Communicate the new certificate now.
        if let Some(old_committee) = old_committee {
            let old_committee_start = linera_base::time::Instant::now();
            self.communicate_chain_updates(old_committee, latest_certificate.clone())
                .await?;
            tracing::debug!(
                old_committee_ms = old_committee_start.elapsed().as_millis(),
                "communicated chain updates to old committee"
            );
        };
        if let Ok(new_committee) = self.local_committee().await {
            if Some(&new_committee) != old_committee {
                // If the configuration just changed, communicate to the new committee as well.
                // (This is actually more important than updating the previous committee.)
                let new_committee_start = linera_base::time::Instant::now();
                self.communicate_chain_updates(&new_committee, latest_certificate)
                    .await?;
                tracing::debug!(
                    new_committee_ms = new_committee_start.elapsed().as_millis(),
                    "communicated chain updates to new committee"
                );
            }
        }
        self.send_timing(update_validators_start, TimingType::UpdateValidators);
        Ok(())
    }
2276
2277    /// Broadcasts certified blocks to validators.
2278    #[instrument(level = "trace", skip(committee, latest_certificate))]
2279    pub async fn communicate_chain_updates(
2280        &self,
2281        committee: &Committee,
2282        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
2283    ) -> Result<(), ChainClientError> {
2284        let delivery = self.options.cross_chain_message_delivery;
2285        let height = self.chain_info().await?.next_block_height;
2286        self.client
2287            .communicate_chain_updates(
2288                committee,
2289                self.chain_id,
2290                height,
2291                delivery,
2292                latest_certificate,
2293            )
2294            .await
2295    }
2296
2297    /// Synchronizes all chains that any application on this chain subscribes to.
2298    /// We always consider the admin chain a relevant publishing chain, for new epochs.
2299    async fn synchronize_publisher_chains(&self) -> Result<(), ChainClientError> {
2300        let subscriptions = self
2301            .client
2302            .local_node
2303            .get_event_subscriptions(self.chain_id)
2304            .await?;
2305        let chain_ids = subscriptions
2306            .iter()
2307            .map(|((chain_id, _), _)| *chain_id)
2308            .chain(iter::once(self.client.admin_chain_id))
2309            .filter(|chain_id| *chain_id != self.chain_id)
2310            .collect::<BTreeSet<_>>();
2311        stream::iter(
2312            chain_ids
2313                .into_iter()
2314                .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
2315        )
2316        .buffer_unordered(self.options.max_joined_tasks)
2317        .collect::<Vec<_>>()
2318        .await
2319        .into_iter()
2320        .collect::<Result<Vec<_>, _>>()?;
2321        Ok(())
2322    }
2323
    /// Attempts to download new received certificates.
    ///
    /// This is a best effort: it will only find certificates that have been confirmed
    /// amongst sufficiently many validators of the current committee of the target
    /// chain.
    ///
    /// However, this should be the case whenever a sender's chain is still in use and
    /// is regularly upgraded to new committees.
    #[instrument(level = "trace")]
    pub async fn find_received_certificates(
        &self,
        cancellation_token: Option<CancellationToken>,
    ) -> Result<(), ChainClientError> {
        debug!(chain_id = %self.chain_id, "starting find_received_certificates");
        #[cfg(with_metrics)]
        let _latency = metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
        // Use network information from the local chain.
        let chain_id = self.chain_id;
        let (_, committee) = self.admin_committee().await?;
        let nodes = self.client.make_nodes(&committee)?;

        // Per-validator cursors into their received logs: positions below a tracker
        // were already handled in a previous run.
        let trackers = self
            .client
            .local_node
            .get_received_certificate_trackers(chain_id)
            .await?;

        trace!("find_received_certificates: read trackers");

        // `(validator public key, log batch)` pairs, pushed concurrently by the
        // per-validator download tasks below.
        let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
        // Proceed to downloading received logs.
        let result = communicate_with_quorum(
            &nodes,
            &committee,
            |_| (),
            |remote_node| {
                let client = &self.client;
                // A validator we have no tracker for starts from the beginning (0).
                let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
                let received_log_batches = Arc::clone(&received_log_batches);
                Box::pin(async move {
                    let batch = client
                        .get_received_log_from_validator(chain_id, &remote_node, tracker)
                        .await?;
                    let mut batches = received_log_batches.lock().unwrap();
                    batches.push((remote_node.public_key, batch));
                    Ok(())
                })
            },
            self.options.quorum_grace_period,
        )
        .await;

        // Best effort: even without a quorum, process whatever batches were collected.
        if let Err(error) = result {
            error!(
                %error,
                "Failed to synchronize received_logs from at least a quorum of validators",
            );
        }

        // Move the collected batches out of the shared mutex.
        let received_logs: Vec<_> = {
            let mut received_log_batches = received_log_batches.lock().unwrap();
            std::mem::take(received_log_batches.as_mut())
        };

        debug!(
            received_logs_len = %received_logs.len(),
            received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
            "collected received logs"
        );

        // Build the merged view of certificates to fetch, plus the tracker
        // bookkeeping used to advance each validator's cursor as downloads complete.
        let (received_logs, mut validator_trackers) = {
            (
                ReceivedLogs::from_received_result(received_logs.clone()),
                ValidatorTrackers::new(received_logs, &trackers),
            )
        };

        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "find_received_certificates: total number of chains and certificates to sync",
        );

        // NOTE(review): this divides by `max_joined_tasks` — presumably the options
        // guarantee it is nonzero; confirm, otherwise this panics.
        let max_blocks_per_chain =
            self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
        for received_log in received_logs.into_batches(
            self.options.sender_certificate_download_batch_size,
            max_blocks_per_chain,
        ) {
            validator_trackers = self
                .receive_sender_certificates(
                    received_log,
                    validator_trackers,
                    &nodes,
                    cancellation_token.clone(),
                )
                .await?;

            // Persist progress after every batch so an interrupted sync resumes here.
            self.update_received_certificate_trackers(&validator_trackers)
                .await;
        }

        info!("find_received_certificates finished");

        Ok(())
    }
2430
2431    async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
2432        let updated_trackers = trackers.to_map();
2433        trace!(?updated_trackers, "updated tracker values");
2434
2435        // Update the trackers.
2436        if let Err(error) = self
2437            .client
2438            .local_node
2439            .update_received_certificate_trackers(self.chain_id, updated_trackers)
2440            .await
2441        {
2442            error!(
2443                chain_id = %self.chain_id,
2444                %error,
2445                "Failed to update the certificate trackers for chain",
2446            );
2447        }
2448    }
2449
    /// Downloads and processes or preprocesses the certificates for blocks sending messages to
    /// this chain that we are still missing.
    async fn receive_sender_certificates(
        &self,
        mut received_logs: ReceivedLogs,
        mut validator_trackers: ValidatorTrackers,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        cancellation_token: Option<CancellationToken>,
    ) -> Result<ValidatorTrackers, ChainClientError> {
        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "receive_sender_certificates: number of chains and certificates to sync",
        );

        // Obtain the next block height we need in the local node, for each chain.
        let local_next_heights = self
            .client
            .local_node
            .next_outbox_heights(received_logs.chains(), self.chain_id)
            .await?;

        // Remove entries we already have locally; see `filter_out_already_known`.
        validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);

        debug!(
            remaining_total_certificates = %received_logs.num_certs(),
            "receive_sender_certificates: computed remote_heights"
        );

        let mut other_sender_chains = Vec::new();
        // Download tasks report each processed certificate on this channel so the
        // spawned task below can advance the validator cursors concurrently.
        let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

        // One future per sender chain, each downloading that chain's missing blocks.
        let cert_futures = received_logs.heights_per_chain().into_iter().filter_map(
            |(sender_chain_id, remote_heights)| {
                if remote_heights.is_empty() {
                    // Our highest, locally executed block is higher than any block height
                    // from the current batch. Skip this batch, but remember to wait for
                    // the messages to be delivered to the inboxes.
                    other_sender_chains.push(sender_chain_id);
                    return None;
                };
                let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
                let sender = sender.clone();
                let client = self.client.clone();
                // Randomize the validator order per chain to spread the load.
                let mut nodes = nodes.to_vec();
                nodes.shuffle(&mut rand::thread_rng());
                let received_logs_ref = &received_logs;
                Some(async move {
                    client
                        .download_and_process_sender_chain(
                            sender_chain_id,
                            &nodes,
                            received_logs_ref,
                            remote_heights,
                            sender,
                        )
                        .await
                })
            },
        );

        // Consumes notifications until every `sender` clone is dropped, then returns
        // the updated trackers.
        let update_trackers = linera_base::task::spawn(async move {
            while let Some(chain_and_height) = receiver.recv().await {
                validator_trackers.downloaded_cert(chain_and_height);
            }
            validator_trackers
        });

        // Resolves when the token fires; never resolves if there is no token.
        let mut cancellation_future = Box::pin(
            async move {
                if let Some(token) = cancellation_token {
                    token.cancelled().await
                } else {
                    future::pending().await
                }
            }
            .fuse(),
        );

        // Run the downloads with bounded concurrency, stopping early on cancellation.
        select! {
            _ = stream::iter(cert_futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .for_each(future::ready)
            => (),
            _ = cancellation_future => ()
        };

        // Drop the last sender handle so the tracker task's `recv` loop terminates.
        drop(sender);

        let validator_trackers = update_trackers.await;

        debug!(
            num_other_chains = %other_sender_chains.len(),
            "receive_sender_certificates: processing certificates finished"
        );

        // Certificates for these chains were omitted from `certificates` because they were
        // already processed locally. If they were processed in a concurrent task, it is not
        // guaranteed that their cross-chain messages were already handled.
        self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
            .await;

        debug!("receive_sender_certificates: finished processing other_sender_chains");

        Ok(validator_trackers)
    }
2556
2557    /// Retries cross chain requests on the chains which may have been processed on
2558    /// another task without the messages being correctly handled.
2559    async fn retry_pending_cross_chain_requests(
2560        &self,
2561        nodes: &[RemoteNode<Env::ValidatorNode>],
2562        other_sender_chains: Vec<ChainId>,
2563    ) {
2564        let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
2565            let local_node = self.client.local_node.clone();
2566            async move {
2567                if let Err(error) = match local_node
2568                    .retry_pending_cross_chain_requests(chain_id)
2569                    .await
2570                {
2571                    Ok(()) => Ok(()),
2572                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
2573                        if let Err(error) = self
2574                            .client
2575                            .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
2576                            .await
2577                        {
2578                            error!(
2579                                ?blob_ids,
2580                                %error,
2581                                "Error while attempting to download blobs during retrying outgoing \
2582                                messages"
2583                            );
2584                        }
2585                        local_node
2586                            .retry_pending_cross_chain_requests(chain_id)
2587                            .await
2588                    }
2589                    err => err,
2590                } {
2591                    error!(
2592                        %chain_id,
2593                        %error,
2594                        "Failed to retry outgoing messages from chain"
2595                    );
2596                }
2597            }
2598        }));
2599        stream.for_each(future::ready).await;
2600    }
2601
2602    /// Sends money.
2603    #[instrument(level = "trace")]
2604    pub async fn transfer(
2605        &self,
2606        owner: AccountOwner,
2607        amount: Amount,
2608        recipient: Account,
2609    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2610        // TODO(#467): check the balance of `owner` before signing any block proposal.
2611        Box::pin(self.execute_operation(SystemOperation::Transfer {
2612            owner,
2613            recipient,
2614            amount,
2615        }))
2616        .await
2617    }
2618
2619    /// Verify if a data blob is readable from storage.
2620    // TODO(#2490): Consider removing or renaming this.
2621    #[instrument(level = "trace")]
2622    pub async fn read_data_blob(
2623        &self,
2624        hash: CryptoHash,
2625    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2626        let blob_id = BlobId {
2627            hash,
2628            blob_type: BlobType::Data,
2629        };
2630        Box::pin(self.execute_operation(SystemOperation::VerifyBlob { blob_id })).await
2631    }
2632
2633    /// Claims money in a remote chain.
2634    #[instrument(level = "trace")]
2635    pub async fn claim(
2636        &self,
2637        owner: AccountOwner,
2638        target_id: ChainId,
2639        recipient: Account,
2640        amount: Amount,
2641    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2642        Box::pin(self.execute_operation(SystemOperation::Claim {
2643            owner,
2644            target_id,
2645            recipient,
2646            amount,
2647        }))
2648        .await
2649    }
2650
2651    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
2652    /// certificate and sends it to all validators, to make them enter the next round.
2653    #[instrument(level = "trace")]
2654    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, ChainClientError> {
2655        let chain_id = self.chain_id;
2656        let info = self.chain_info_with_committees().await?;
2657        let committee = info.current_committee()?;
2658        let height = info.next_block_height;
2659        let round = info.manager.current_round;
2660        let action = CommunicateAction::RequestTimeout {
2661            height,
2662            round,
2663            chain_id,
2664        };
2665        let value = Timeout::new(chain_id, height, info.epoch);
2666        let certificate = Box::new(
2667            self.client
2668                .communicate_chain_action(committee, action, value)
2669                .await?,
2670        );
2671        self.client.process_certificate(certificate.clone()).await?;
2672        // The block height didn't increase, but this will communicate the timeout as well.
2673        self.client
2674            .communicate_chain_updates(
2675                committee,
2676                chain_id,
2677                height,
2678                CrossChainMessageDelivery::NonBlocking,
2679                None,
2680            )
2681            .await?;
2682        Ok(*certificate)
2683    }
2684
2685    /// Downloads and processes any certificates we are missing for the given chain.
2686    #[instrument(level = "trace", skip_all)]
2687    pub async fn synchronize_chain_state(
2688        &self,
2689        chain_id: ChainId,
2690    ) -> Result<Box<ChainInfo>, ChainClientError> {
2691        self.client.synchronize_chain_state(chain_id).await
2692    }
2693
2694    /// Downloads and processes any certificates we are missing for this chain, from the given
2695    /// committee.
2696    #[instrument(level = "trace", skip_all)]
2697    pub async fn synchronize_chain_state_from_committee(
2698        &self,
2699        committee: Committee,
2700    ) -> Result<Box<ChainInfo>, ChainClientError> {
2701        Box::pin(
2702            self.client
2703                .synchronize_chain_state_from_committee(self.chain_id, committee),
2704        )
2705        .await
2706    }
2707
2708    /// Executes a list of operations.
2709    #[instrument(level = "trace", skip(operations, blobs))]
2710    pub async fn execute_operations(
2711        &self,
2712        operations: Vec<Operation>,
2713        blobs: Vec<Blob>,
2714    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2715        let timing_start = linera_base::time::Instant::now();
2716        tracing::debug!("execute_operations started");
2717
2718        let result = loop {
2719            let execute_block_start = linera_base::time::Instant::now();
2720            // TODO(#2066): Remove boxing once the call-stack is shallower
2721            tracing::debug!("calling execute_block");
2722            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
2723                Ok(ClientOutcome::Committed(certificate)) => {
2724                    tracing::debug!(
2725                        execute_block_ms = execute_block_start.elapsed().as_millis(),
2726                        "execute_block succeeded"
2727                    );
2728                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
2729                    break Ok(ClientOutcome::Committed(certificate));
2730                }
2731                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2732                    break Ok(ClientOutcome::WaitForTimeout(timeout));
2733                }
2734                Ok(ClientOutcome::Conflict(certificate)) => {
2735                    info!(
2736                        height = %certificate.block().header.height,
2737                        "Another block was committed."
2738                    );
2739                    break Ok(ClientOutcome::Conflict(certificate));
2740                }
2741                Err(ChainClientError::CommunicationError(CommunicationError::Trusted(
2742                    NodeError::UnexpectedBlockHeight {
2743                        expected_block_height,
2744                        found_block_height,
2745                    },
2746                ))) if expected_block_height > found_block_height => {
2747                    tracing::info!(
2748                        "Local state is outdated; synchronizing chain {:.8}",
2749                        self.chain_id
2750                    );
2751                    self.synchronize_chain_state(self.chain_id).await?;
2752                }
2753                Err(err) => return Err(err),
2754            };
2755        };
2756
2757        self.send_timing(timing_start, TimingType::ExecuteOperations);
2758        tracing::debug!(
2759            total_execute_operations_ms = timing_start.elapsed().as_millis(),
2760            "execute_operations returning"
2761        );
2762
2763        result
2764    }
2765
2766    /// Executes an operation.
2767    pub async fn execute_operation(
2768        &self,
2769        operation: impl Into<Operation>,
2770    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2771        self.execute_operations(vec![operation.into()], vec![])
2772            .await
2773    }
2774
    /// Executes a new block.
    ///
    /// This must be preceded by a call to `prepare_chain()`.
    #[instrument(level = "trace", skip(operations, blobs))]
    async fn execute_block(
        &self,
        operations: Vec<Operation>,
        blobs: Vec<Blob>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
        #[cfg(with_metrics)]
        let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();

        // Hold the client mutex for the rest of this function, so only one block is
        // being staged and proposed by this client at a time.
        let mutex = self.client_mutex();
        let lock_start = linera_base::time::Instant::now();
        let _guard = mutex.lock_owned().await;
        tracing::debug!(
            lock_wait_ms = lock_start.elapsed().as_millis(),
            "acquired client_mutex in execute_block"
        );
        // TODO(#5092): We shouldn't need to call this explicitly.
        // Flush any previously pending block first. A block committed here predates
        // this call's operations, so it is reported as a conflict.
        match self.process_pending_block_without_prepare().await? {
            ClientOutcome::Committed(Some(certificate)) => {
                return Ok(ClientOutcome::Conflict(Box::new(certificate)))
            }
            ClientOutcome::WaitForTimeout(timeout) => {
                return Ok(ClientOutcome::WaitForTimeout(timeout))
            }
            ClientOutcome::Conflict(certificate) => {
                return Ok(ClientOutcome::Conflict(certificate))
            }
            ClientOutcome::Committed(None) => {}
        }

        // Collect pending messages and epoch changes after acquiring the lock to avoid
        // race conditions where messages valid for one block height are proposed at a
        // different height.
        let transactions = self.prepend_epochs_messages_and_events(operations).await?;

        // No operations, no pending messages or events: nothing to propose.
        if transactions.is_empty() {
            return Err(ChainClientError::LocalNodeError(
                LocalNodeError::WorkerError(WorkerError::ChainError(Box::new(
                    ChainError::EmptyBlock,
                ))),
            ));
        }

        // Stage the block locally as the pending proposal ...
        let block = self.new_pending_block(transactions, blobs).await?;

        // ... then drive the proposal to a certificate. Only a certificate for exactly
        // the block we just staged counts as `Committed`; any other commit is a conflict.
        match self.process_pending_block_without_prepare().await? {
            ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
                Ok(ClientOutcome::Committed(certificate))
            }
            ClientOutcome::Committed(Some(certificate)) => {
                Ok(ClientOutcome::Conflict(Box::new(certificate)))
            }
            // Should be unreachable: We did set a pending block.
            ClientOutcome::Committed(None) => Err(ChainClientError::BlockProposalError(
                "Unexpected block proposal error",
            )),
            ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
            ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
        }
    }
2838
2839    /// Creates a vector of transactions which, in addition to the provided operations,
2840    /// also contains epoch changes, receiving message bundles and event stream updates
2841    /// (if there are any to be processed).
2842    /// This should be called when executing a block, in order to make sure that any pending
2843    /// messages or events are included in it.
2844    #[instrument(level = "trace", skip(operations))]
2845    async fn prepend_epochs_messages_and_events(
2846        &self,
2847        operations: Vec<Operation>,
2848    ) -> Result<Vec<Transaction>, ChainClientError> {
2849        let incoming_bundles = self.pending_message_bundles().await?;
2850        let stream_updates = self.collect_stream_updates().await?;
2851        Ok(self
2852            .collect_epoch_changes()
2853            .await?
2854            .into_iter()
2855            .map(Transaction::ExecuteOperation)
2856            .chain(
2857                incoming_bundles
2858                    .into_iter()
2859                    .map(Transaction::ReceiveMessages),
2860            )
2861            .chain(
2862                stream_updates
2863                    .into_iter()
2864                    .map(Transaction::ExecuteOperation),
2865            )
2866            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
2867            .collect::<Vec<_>>())
2868    }
2869
    /// Creates a new pending block and handles the proposal in the local node.
    /// Next time `process_pending_block_without_prepare` is called, this block will be proposed
    /// to the validators.
    #[instrument(level = "trace", skip(transactions, blobs))]
    async fn new_pending_block(
        &self,
        transactions: Vec<Transaction>,
        blobs: Vec<Blob>,
    ) -> Result<Block, ChainClientError> {
        let identity = self.identity().await?;

        // Refuse to overwrite an existing pending proposal; it must be committed or
        // cleared first.
        ensure!(
            self.pending_proposal().is_none(),
            ChainClientError::BlockProposalError(
                "Client state already has a pending block; \
                use the `linera retry-pending-block` command to commit that first"
            )
        );
        let info = self.chain_info_with_committees().await?;
        // The timestamp must not be earlier than the incoming messages' timestamps or
        // the previous block's; see `next_timestamp`.
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let proposed_block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            authenticated_signer: Some(identity),
            timestamp,
        };

        let round = self.round_for_oracle(&info, &identity).await?;
        // Make sure every incoming message succeeds and otherwise remove them.
        // Also, compute the final certified hash while we're at it.
        let (block, _) = Box::pin(self.client.stage_block_execution_with_policy(
            proposed_block,
            round,
            blobs.clone(),
            BundleExecutionPolicy::AutoRetry {
                max_failures: self.options.max_block_limit_errors,
            },
        ))
        .await?;
        // Record the executed block (whose proposal may differ from the original if
        // failing bundles were removed) as the pending proposal.
        let (proposed_block, _) = block.clone().into_proposal();
        self.update_state(|state| {
            state.set_pending_proposal(proposed_block.clone(), blobs.clone())
        });
        Ok(block)
    }
2918
2919    /// Returns a suitable timestamp for the next block.
2920    ///
2921    /// This will usually be the current time according to the local clock, but may be slightly
2922    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
2923    #[instrument(level = "trace", skip(transactions))]
2924    fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
2925        let local_time = self.storage_client().clock().current_time();
2926        transactions
2927            .iter()
2928            .filter_map(Transaction::incoming_bundle)
2929            .map(|msg| msg.bundle.timestamp)
2930            .max()
2931            .map_or(local_time, |timestamp| timestamp.max(local_time))
2932            .max(block_time)
2933    }
2934
2935    /// Queries an application.
2936    #[instrument(level = "trace", skip(query))]
2937    pub async fn query_application(
2938        &self,
2939        query: Query,
2940        block_hash: Option<CryptoHash>,
2941    ) -> Result<QueryOutcome, ChainClientError> {
2942        loop {
2943            let result = self
2944                .client
2945                .local_node
2946                .query_application(self.chain_id, query.clone(), block_hash)
2947                .await;
2948            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2949                let validators = self.client.validator_nodes().await?;
2950                self.client
2951                    .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
2952                    .await?;
2953                continue; // We found the missing blob: retry.
2954            }
2955            return Ok(result?);
2956        }
2957    }
2958
2959    /// Queries a system application.
2960    #[instrument(level = "trace", skip(query))]
2961    pub async fn query_system_application(
2962        &self,
2963        query: SystemQuery,
2964    ) -> Result<QueryOutcome<SystemResponse>, ChainClientError> {
2965        let QueryOutcome {
2966            response,
2967            operations,
2968        } = self.query_application(Query::System(query), None).await?;
2969        match response {
2970            QueryResponse::System(response) => Ok(QueryOutcome {
2971                response,
2972                operations,
2973            }),
2974            _ => Err(ChainClientError::InternalError(
2975                "Unexpected response for system query",
2976            )),
2977        }
2978    }
2979
2980    /// Queries a user application.
2981    #[instrument(level = "trace", skip(application_id, query))]
2982    #[cfg(with_testing)]
2983    pub async fn query_user_application<A: Abi>(
2984        &self,
2985        application_id: ApplicationId<A>,
2986        query: &A::Query,
2987    ) -> Result<QueryOutcome<A::QueryResponse>, ChainClientError> {
2988        let query = Query::user(application_id, query)?;
2989        let QueryOutcome {
2990            response,
2991            operations,
2992        } = self.query_application(query, None).await?;
2993        match response {
2994            QueryResponse::User(response_bytes) => {
2995                let response = serde_json::from_slice(&response_bytes)?;
2996                Ok(QueryOutcome {
2997                    response,
2998                    operations,
2999                })
3000            }
3001            _ => Err(ChainClientError::InternalError(
3002                "Unexpected response for user query",
3003            )),
3004        }
3005    }
3006
3007    /// Obtains the local balance of the chain account after staging the execution of
3008    /// incoming messages in a new block.
3009    ///
3010    /// Does not attempt to synchronize with validators. The result will reflect up to
3011    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
3012    /// block.
3013    #[instrument(level = "trace")]
3014    pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
3015        let (balance, _) = Box::pin(self.query_balances_with_owner(AccountOwner::CHAIN)).await?;
3016        Ok(balance)
3017    }
3018
3019    /// Obtains the local balance of an account after staging the execution of incoming messages in
3020    /// a new block.
3021    ///
3022    /// Does not attempt to synchronize with validators. The result will reflect up to
3023    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
3024    /// block.
3025    #[instrument(level = "trace", skip(owner))]
3026    pub async fn query_owner_balance(
3027        &self,
3028        owner: AccountOwner,
3029    ) -> Result<Amount, ChainClientError> {
3030        if owner.is_chain() {
3031            Box::pin(self.query_balance()).await
3032        } else {
3033            Ok(Box::pin(self.query_balances_with_owner(owner))
3034                .await?
3035                .1
3036                .unwrap_or(Amount::ZERO))
3037        }
3038    }
3039
    /// Obtains the local balance of an account and optionally another user after staging the
    /// execution of incoming messages in a new block.
    ///
    /// Does not attempt to synchronize with validators. The result will reflect up to
    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
    /// block.
    ///
    /// Returns `(chain_balance, owner_balance)`; the owner balance is `None` when the staged
    /// execution reports no balance for `owner`.
    #[instrument(level = "trace", skip(owner))]
    async fn query_balances_with_owner(
        &self,
        owner: AccountOwner,
    ) -> Result<(Amount, Option<Amount>), ChainClientError> {
        let incoming_bundles = self.pending_message_bundles().await?;
        // Empty blocks are disallowed, so with no incoming messages nothing could change
        // the balances: read them from the local state directly.
        if incoming_bundles.is_empty() {
            let chain_balance = self.local_balance().await?;
            let owner_balance = self.local_owner_balance(owner).await?;
            return Ok((chain_balance, Some(owner_balance)));
        }
        let info = self.chain_info().await?;
        // Stage (but never propose) a block that receives all pending bundles, so the
        // reported balances reflect their execution and one block's fees.
        let transactions = incoming_bundles
            .into_iter()
            .map(Transaction::ReceiveMessages)
            .collect::<Vec<_>>();
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            // Only authenticate the block when asking about a specific owner's funds.
            authenticated_signer: if owner == AccountOwner::CHAIN {
                None
            } else {
                Some(owner)
            },
            timestamp,
        };
        match Box::pin(self.client.stage_block_execution_with_policy(
            block,
            None,
            Vec::new(),
            // Retry while dropping failing bundles, up to the configured limit.
            BundleExecutionPolicy::AutoRetry {
                max_failures: self.options.max_block_limit_errors,
            },
        ))
        .await
        {
            Ok((_, response)) => Ok((
                response.info.chain_balance,
                response.info.requested_owner_balance,
            )),
            // Special-case the "cannot even pay the block's fees" failure below.
            Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
                WorkerError::ChainError(error),
            ))) if matches!(
                &*error,
                ChainError::ExecutionError(
                    execution_error,
                    ChainExecutionContext::Block
                ) if matches!(
                    **execution_error,
                    ExecutionError::FeesExceedFunding { .. }
                )
            ) =>
            {
                // We can't even pay for the execution of one empty block. Let's return zero.
                Ok((Amount::ZERO, Some(Amount::ZERO)))
            }
            Err(error) => Err(error),
        }
    }
3111
3112    /// Reads the local balance of the chain account.
3113    ///
3114    /// Does not process the inbox or attempt to synchronize with validators.
3115    #[instrument(level = "trace")]
3116    pub async fn local_balance(&self) -> Result<Amount, ChainClientError> {
3117        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
3118        Ok(balance)
3119    }
3120
3121    /// Reads the local balance of a user account.
3122    ///
3123    /// Does not process the inbox or attempt to synchronize with validators.
3124    #[instrument(level = "trace", skip(owner))]
3125    pub async fn local_owner_balance(
3126        &self,
3127        owner: AccountOwner,
3128    ) -> Result<Amount, ChainClientError> {
3129        if owner.is_chain() {
3130            self.local_balance().await
3131        } else {
3132            Ok(self
3133                .local_balances_with_owner(owner)
3134                .await?
3135                .1
3136                .unwrap_or(Amount::ZERO))
3137        }
3138    }
3139
3140    /// Reads the local balance of the chain account and optionally another user.
3141    ///
3142    /// Does not process the inbox or attempt to synchronize with validators.
3143    #[instrument(level = "trace", skip(owner))]
3144    async fn local_balances_with_owner(
3145        &self,
3146        owner: AccountOwner,
3147    ) -> Result<(Amount, Option<Amount>), ChainClientError> {
3148        ensure!(
3149            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
3150            ChainClientError::WalletSynchronizationError
3151        );
3152        let mut query = ChainInfoQuery::new(self.chain_id);
3153        query.request_owner_balance = owner;
3154        let response = self
3155            .client
3156            .local_node
3157            .handle_chain_info_query(query)
3158            .await?;
3159        Ok((
3160            response.info.chain_balance,
3161            response.info.requested_owner_balance,
3162        ))
3163    }
3164
3165    /// Sends tokens to a chain.
3166    #[instrument(level = "trace")]
3167    pub async fn transfer_to_account(
3168        &self,
3169        from: AccountOwner,
3170        amount: Amount,
3171        account: Account,
3172    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3173        self.transfer(from, amount, account).await
3174    }
3175
3176    /// Burns tokens (transfer to a special address).
3177    #[cfg(with_testing)]
3178    #[instrument(level = "trace")]
3179    pub async fn burn(
3180        &self,
3181        owner: AccountOwner,
3182        amount: Amount,
3183    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3184        let recipient = Account::burn_address(self.chain_id);
3185        self.transfer(owner, amount, recipient).await
3186    }
3187
3188    #[instrument(level = "trace")]
3189    pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3190        let validators = self.client.validator_nodes().await?;
3191        self.client
3192            .fetch_chain_info(self.chain_id, &validators)
3193            .await
3194    }
3195
3196    /// Attempts to synchronize chains that have sent us messages and populate our local
3197    /// inbox.
3198    ///
3199    /// To create a block that actually executes the messages in the inbox,
3200    /// `process_inbox` must be called separately.
3201    ///
3202    /// If the chain is in follow-only mode, this only downloads blocks for this chain without
3203    /// fetching manager values or sender/publisher chains.
3204    #[instrument(level = "trace")]
3205    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3206        if self.preferred_owner.is_none() {
3207            return self.client.synchronize_chain_state(self.chain_id).await;
3208        }
3209        let info = self.prepare_chain().await?;
3210        self.synchronize_publisher_chains().await?;
3211        self.find_received_certificates(None).await?;
3212        Ok(info)
3213    }
3214
3215    /// Processes the last pending block
3216    #[instrument(level = "trace")]
3217    pub async fn process_pending_block(
3218        &self,
3219    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3220        self.prepare_chain().await?;
3221        self.process_pending_block_without_prepare().await
3222    }
3223
    /// Processes the last pending block. Assumes that the local chain is up to date.
    ///
    /// Depending on the chain manager's state this either finalizes the locking block of
    /// the current round, re-proposes an existing locking block, or proposes our own
    /// pending block. Returns the resulting confirmed certificate, `Committed(None)` if
    /// there was nothing to do, or a timeout to wait for when we cannot propose yet.
    #[instrument(level = "trace")]
    async fn process_pending_block_without_prepare(
        &self,
    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
        let process_start = linera_base::time::Instant::now();
        tracing::debug!("process_pending_block_without_prepare started");
        let info = self.request_leader_timeout_if_needed().await?;

        // If there is a validated block in the current round, finalize it.
        if info.manager.has_locking_block_in_current_round()
            && !info.manager.current_round.is_fast()
        {
            return Box::pin(self.finalize_locking_block(info)).await;
        }
        let owner = self.identity().await?;

        let local_node = &self.client.local_node;
        // Otherwise we have to re-propose the highest validated block, if there is one.
        let pending_proposal = self.pending_proposal();
        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
            match &**locking {
                LockingBlock::Regular(certificate) => {
                    // The validated certificate already contains the executed block.
                    let blob_ids = certificate.block().required_blob_ids();
                    let blobs = local_node
                        .get_locking_blobs(&blob_ids, self.chain_id)
                        .await?
                        .ok_or_else(|| {
                            ChainClientError::InternalError("Missing local locking blobs")
                        })?;
                    debug!("Retrying locking block from round {}", certificate.round);
                    (certificate.block().clone(), blobs)
                }
                LockingBlock::Fast(proposal) => {
                    // A fast-round proposal carries no execution outcome yet, so
                    // re-execute it locally to obtain the full block.
                    let proposed_block = proposal.content.block.clone();
                    let blob_ids = proposed_block.published_blob_ids();
                    let blobs = local_node
                        .get_locking_blobs(&blob_ids, self.chain_id)
                        .await?
                        .ok_or_else(|| {
                            ChainClientError::InternalError("Missing local locking blobs")
                        })?;
                    let block = self
                        .client
                        .stage_block_execution(proposed_block, None, blobs.clone())
                        .await?
                        .0;
                    debug!("Retrying locking block from fast round.");
                    (block, blobs)
                }
            }
        } else if let Some(pending_proposal) = pending_proposal {
            // Otherwise we are free to propose our own pending block.
            let proposed_block = pending_proposal.block;
            let round = self.round_for_oracle(&info, &owner).await?;
            let (block, _) = self
                .client
                .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
                .await?;
            debug!("Proposing the local pending block.");
            (block, pending_proposal.blobs)
        } else {
            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
        };

        let has_oracle_responses = block.has_oracle_responses();
        let (proposed_block, outcome) = block.into_proposal();
        // Pick a round in which we may propose; if there is none, ask the caller to wait.
        let round = match self
            .round_for_new_proposal(&info, &owner, has_oracle_responses)
            .await?
        {
            Either::Left(round) => round,
            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
        };
        debug!("Proposing block for round {}", round);

        let already_handled_locally = info
            .manager
            .already_handled_proposal(round, &proposed_block);
        // Create the final block proposal.
        let proposal = if let Some(locking) = info.manager.requested_locking {
            Box::new(match *locking {
                LockingBlock::Regular(cert) => {
                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
                        .await
                        .map_err(ChainClientError::signer_failure)?
                }
                LockingBlock::Fast(proposal) => {
                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
                        .await
                        .map_err(ChainClientError::signer_failure)?
                }
            })
        } else {
            Box::new(
                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
                    .await
                    .map_err(ChainClientError::signer_failure)?,
            )
        };
        if !already_handled_locally {
            // Check the final block proposal. This will be cheaper after #1401.
            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
                match err {
                    LocalNodeError::BlobsNotFound(_) => {
                        // The local node lacks the proposal's blobs; supply them and retry.
                        local_node
                            .handle_pending_blobs(self.chain_id, blobs)
                            .await?;
                        local_node.handle_block_proposal(*proposal.clone()).await?;
                    }
                    err => return Err(err.into()),
                }
            }
        }
        let committee = self.local_committee().await?;
        let block = Block::new(proposed_block, outcome);
        // Send the query to validators.
        let submit_block_proposal_start = linera_base::time::Instant::now();
        let certificate = if round.is_fast() {
            // Fast round: the proposal is submitted as a confirmed block directly.
            let hashed_value = ConfirmedBlock::new(block);
            Box::pin(
                self.client
                    .submit_block_proposal(&committee, proposal, hashed_value),
            )
            .await?
        } else {
            // Other rounds: the block is first validated, then finalized.
            let hashed_value = ValidatedBlock::new(block);
            let certificate = Box::pin(self.client.submit_block_proposal(
                &committee,
                proposal,
                hashed_value.clone(),
            ))
            .await?;
            Box::pin(self.client.finalize_block(&committee, certificate)).await?
        };
        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
        debug!(round = %certificate.round, "Sending confirmed block to validators");
        let update_start = linera_base::time::Instant::now();
        Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
        tracing::debug!(
            update_validators_ms = update_start.elapsed().as_millis(),
            total_process_ms = process_start.elapsed().as_millis(),
            "process_pending_block_without_prepare completing"
        );
        Ok(ClientOutcome::Committed(Some(certificate)))
    }
3370
3371    fn send_timing(&self, start: Instant, timing_type: TimingType) {
3372        let Some(sender) = &self.timing_sender else {
3373            return;
3374        };
3375        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
3376            tracing::warn!(%err, "Failed to send timing info");
3377        }
3378    }
3379
3380    /// Requests a leader timeout certificate if the current round has timed out. Returns the
3381    /// chain info for the (possibly new) current round.
3382    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3383        let mut info = self.chain_info_with_manager_values().await?;
3384        // If the current round has timed out, we request a timeout certificate and retry in
3385        // the next round.
3386        if let Some(round_timeout) = info.manager.round_timeout {
3387            if round_timeout <= self.storage_client().clock().current_time() {
3388                if let Err(e) = self.request_leader_timeout().await {
3389                    debug!("Failed to obtain a timeout certificate: {}", e);
3390                } else {
3391                    info = self.chain_info_with_manager_values().await?;
3392                }
3393            }
3394        }
3395        Ok(info)
3396    }
3397
3398    /// Finalizes the locking block.
3399    ///
3400    /// Panics if there is no locking block; fails if the locking block is not in the current round.
3401    async fn finalize_locking_block(
3402        &self,
3403        info: Box<ChainInfo>,
3404    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3405        let locking = info
3406            .manager
3407            .requested_locking
3408            .expect("Should have a locking block");
3409        let LockingBlock::Regular(certificate) = *locking else {
3410            panic!("Should have a locking validated block");
3411        };
3412        debug!(
3413            round = %certificate.round,
3414            "Finalizing locking block"
3415        );
3416        let committee = self.local_committee().await?;
3417        let certificate =
3418            Box::pin(self.client.finalize_block(&committee, certificate.clone())).await?;
3419        Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
3420        Ok(ClientOutcome::Committed(Some(certificate)))
3421    }
3422
3423    /// Returns the number for the round number oracle to use when staging a block proposal.
3424    async fn round_for_oracle(
3425        &self,
3426        info: &ChainInfo,
3427        identity: &AccountOwner,
3428    ) -> Result<Option<u32>, ChainClientError> {
3429        // Pretend we do use oracles: If we don't, the round number is never read anyway.
3430        match self.round_for_new_proposal(info, identity, true).await {
3431            // If it is a multi-leader round, use its number for the oracle.
3432            Ok(Either::Left(round)) => Ok(round.multi_leader()),
3433            // If there is no suitable round with oracles, use None: If it works without oracles,
3434            // the block won't read the value. If it returns a timeout, it will be a single-leader
3435            // round, in which the oracle returns None.
3436            Err(ChainClientError::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
3437            Err(err) => Err(err),
3438        }
3439    }
3440
    /// Returns a round in which we can propose a new block or the given one, if possible.
    ///
    /// Returns `Either::Left(round)` if `identity` may propose in `round`, and
    /// `Either::Right(timeout)` if the current round must time out first. Fails with a
    /// `BlockProposalError` when we can neither propose nor wait for a timeout.
    async fn round_for_new_proposal(
        &self,
        info: &ChainInfo,
        identity: &AccountOwner,
        has_oracle_responses: bool,
    ) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
        let manager = &info.manager;
        // Seed used by `should_propose` for leader selection below.
        let seed = self
            .client
            .local_node
            .get_manager_seed(self.chain_id)
            .await?;
        // If there is a conflicting proposal in the current round, we can only propose if the
        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
        // Similarly, we cannot propose a block that uses oracles in the fast round, and also
        // skip the fast round if fast blocks are not allowed.
        let skip_fast = manager.current_round.is_fast()
            && (has_oracle_responses || !self.options.allow_fast_blocks);
        let conflict = manager
            .requested_signed_proposal
            .as_ref()
            .into_iter()
            .chain(&manager.requested_proposed)
            .any(|proposal| proposal.content.round == manager.current_round)
            || skip_fast;
        let round = if !conflict {
            manager.current_round
        } else if let Some(round) = manager
            .ownership
            .next_round(manager.current_round)
            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
        {
            // Conflict, but the next round is reachable without a timeout certificate.
            round
        } else if let Some(timeout) = info.round_timeout() {
            return Ok(Either::Right(timeout));
        } else {
            return Err(ChainClientError::BlockProposalError(
                "Conflicting proposal in the current round",
            ));
        };
        // Weighted validator set passed to `should_propose` for leader selection.
        let current_committee = info
            .current_committee()?
            .validators
            .values()
            .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
            .collect();
        if manager.should_propose(identity, round, seed, &current_committee) {
            return Ok(Either::Left(round));
        }
        // We are not allowed to propose in `round`: wait for it to time out, if possible.
        if let Some(timeout) = info.round_timeout() {
            return Ok(Either::Right(timeout));
        }
        Err(ChainClientError::BlockProposalError(
            "Not a leader in the current round",
        ))
    }
3498
3499    /// Clears the information on any operation that previously failed.
3500    #[cfg(with_testing)]
3501    #[instrument(level = "trace")]
3502    pub fn clear_pending_proposal(&self) {
3503        self.update_state(|state| state.clear_pending_proposal());
3504    }
3505
3506    /// Rotates the key of the chain.
3507    ///
3508    /// Replaces current owners of the chain with the new key pair.
3509    #[instrument(level = "trace")]
3510    pub async fn rotate_key_pair(
3511        &self,
3512        public_key: AccountPublicKey,
3513    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3514        Box::pin(self.transfer_ownership(public_key.into())).await
3515    }
3516
3517    /// Transfers ownership of the chain to a single super owner.
3518    #[instrument(level = "trace")]
3519    pub async fn transfer_ownership(
3520        &self,
3521        new_owner: AccountOwner,
3522    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3523        Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
3524            super_owners: vec![new_owner],
3525            owners: Vec::new(),
3526            multi_leader_rounds: 2,
3527            open_multi_leader_rounds: false,
3528            timeout_config: TimeoutConfig::default(),
3529        }))
3530        .await
3531    }
3532
3533    /// Adds another owner to the chain, and turns existing super owners into regular owners.
3534    #[instrument(level = "trace")]
3535    pub async fn share_ownership(
3536        &self,
3537        new_owner: AccountOwner,
3538        new_weight: u64,
3539    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3540        let ownership = self.prepare_chain().await?.manager.ownership;
3541        ensure!(
3542            ownership.is_active(),
3543            ChainError::InactiveChain(self.chain_id)
3544        );
3545        let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
3546        owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
3547        owners.push((new_owner, new_weight));
3548        let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
3549            super_owners: Vec::new(),
3550            owners,
3551            multi_leader_rounds: ownership.multi_leader_rounds,
3552            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3553            timeout_config: ownership.timeout_config,
3554        })];
3555        match self.execute_block(operations, vec![]).await? {
3556            ClientOutcome::Committed(certificate) => Ok(ClientOutcome::Committed(certificate)),
3557            ClientOutcome::Conflict(certificate) => {
3558                info!(
3559                    height = %certificate.block().header.height,
3560                    "Another block was committed."
3561                );
3562                Ok(ClientOutcome::Conflict(certificate))
3563            }
3564            ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
3565        }
3566    }
3567
3568    /// Returns the current ownership settings on this chain.
3569    #[instrument(level = "trace")]
3570    pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, ChainClientError> {
3571        Ok(self
3572            .client
3573            .local_node
3574            .chain_state_view(self.chain_id)
3575            .await?
3576            .execution_state
3577            .system
3578            .ownership
3579            .get()
3580            .clone())
3581    }
3582
3583    /// Changes the ownership of this chain. Fails if it would remove existing owners, unless
3584    /// `remove_owners` is `true`.
3585    #[instrument(level = "trace")]
3586    pub async fn change_ownership(
3587        &self,
3588        ownership: ChainOwnership,
3589    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3590        Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
3591            super_owners: ownership.super_owners.into_iter().collect(),
3592            owners: ownership.owners.into_iter().collect(),
3593            multi_leader_rounds: ownership.multi_leader_rounds,
3594            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3595            timeout_config: ownership.timeout_config.clone(),
3596        }))
3597        .await
3598    }
3599
3600    /// Returns the current application permissions on this chain.
3601    #[instrument(level = "trace")]
3602    pub async fn query_application_permissions(
3603        &self,
3604    ) -> Result<ApplicationPermissions, ChainClientError> {
3605        Ok(self
3606            .client
3607            .local_node
3608            .chain_state_view(self.chain_id)
3609            .await?
3610            .execution_state
3611            .system
3612            .application_permissions
3613            .get()
3614            .clone())
3615    }
3616
3617    /// Changes the application permissions configuration on this chain.
3618    #[instrument(level = "trace", skip(application_permissions))]
3619    pub async fn change_application_permissions(
3620        &self,
3621        application_permissions: ApplicationPermissions,
3622    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3623        Box::pin(
3624            self.execute_operation(SystemOperation::ChangeApplicationPermissions(
3625                application_permissions,
3626            )),
3627        )
3628        .await
3629    }
3630
    /// Opens a new chain with a derived UID.
    ///
    /// Executes an `OpenChain` operation on this chain and, on success, returns the new
    /// chain's description together with the certificate of the block that created it.
    #[instrument(level = "trace", skip(self))]
    pub async fn open_chain(
        &self,
        ownership: ChainOwnership,
        application_permissions: ApplicationPermissions,
        balance: Amount,
    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, ChainClientError>
    {
        let config = OpenChainConfig {
            ownership: ownership.clone(),
            balance,
            application_permissions: application_permissions.clone(),
        };
        let operation = Operation::system(SystemOperation::OpenChain(config));
        let certificate = match self.execute_block(vec![operation], vec![]).await? {
            ClientOutcome::Committed(certificate) => certificate,
            ClientOutcome::Conflict(certificate) => {
                return Ok(ClientOutcome::Conflict(certificate));
            }
            ClientOutcome::WaitForTimeout(timeout) => {
                return Ok(ClientOutcome::WaitForTimeout(timeout));
            }
        };
        // The only operation, i.e. the last transaction, created the new chain, so the new
        // chain's description blob is the last blob of the last transaction.
        let chain_blob = certificate
            .block()
            .body
            .blobs
            .last()
            .and_then(|blobs| blobs.last())
            .ok_or_else(|| ChainClientError::InternalError("Failed to create a new chain"))?;
        let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
        // If we have a key for any owner, add it to the list of tracked chains.
        for owner in ownership.all_owners() {
            if self.client.has_key_for(owner).await? {
                self.client
                    .extend_chain_mode(description.id(), ListeningMode::FullChain);
                break;
            }
        }
        // Retry this chain's pending cross-chain requests — presumably so the new chain
        // receives its creation message promptly; confirm against the local node API.
        self.client
            .local_node
            .retry_pending_cross_chain_requests(self.chain_id)
            .await?;
        Ok(ClientOutcome::Committed((description, certificate)))
    }
3678
3679    /// Closes the chain (and loses everything in it!!).
3680    /// Returns `None` if the chain was already closed.
3681    #[instrument(level = "trace")]
3682    pub async fn close_chain(
3683        &self,
3684    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3685        match Box::pin(self.execute_operation(SystemOperation::CloseChain)).await {
3686            Ok(outcome) => Ok(outcome.map(Some)),
3687            Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
3688                WorkerError::ChainError(chain_error),
3689            ))) if matches!(*chain_error, ChainError::ClosedChain) => {
3690                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
3691            }
3692            Err(error) => Err(error),
3693        }
3694    }
3695
3696    /// Publishes some module.
3697    #[cfg(not(target_arch = "wasm32"))]
3698    #[instrument(level = "trace", skip(contract, service))]
3699    pub async fn publish_module(
3700        &self,
3701        contract: Bytecode,
3702        service: Bytecode,
3703        vm_runtime: VmRuntime,
3704    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3705        let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
3706        Box::pin(self.publish_module_blobs(blobs, module_id)).await
3707    }
3708
3709    /// Publishes some module.
3710    #[cfg(not(target_arch = "wasm32"))]
3711    #[instrument(level = "trace", skip(blobs, module_id))]
3712    pub async fn publish_module_blobs(
3713        &self,
3714        blobs: Vec<Blob>,
3715        module_id: ModuleId,
3716    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3717        self.execute_operations(
3718            vec![Operation::system(SystemOperation::PublishModule {
3719                module_id,
3720            })],
3721            blobs,
3722        )
3723        .await?
3724        .try_map(|certificate| Ok((module_id, certificate)))
3725    }
3726
3727    /// Publishes some data blobs.
3728    #[instrument(level = "trace", skip(bytes))]
3729    pub async fn publish_data_blobs(
3730        &self,
3731        bytes: Vec<Vec<u8>>,
3732    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3733        let blobs = bytes.into_iter().map(Blob::new_data);
3734        let publish_blob_operations = blobs
3735            .clone()
3736            .map(|blob| {
3737                Operation::system(SystemOperation::PublishDataBlob {
3738                    blob_hash: blob.id().hash,
3739                })
3740            })
3741            .collect();
3742        self.execute_operations(publish_blob_operations, blobs.collect())
3743            .await
3744    }
3745
3746    /// Publishes some data blob.
3747    #[instrument(level = "trace", skip(bytes))]
3748    pub async fn publish_data_blob(
3749        &self,
3750        bytes: Vec<u8>,
3751    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3752        Box::pin(self.publish_data_blobs(vec![bytes])).await
3753    }
3754
3755    /// Creates an application by instantiating some bytecode.
3756    #[instrument(
3757        level = "trace",
3758        skip(self, parameters, instantiation_argument, required_application_ids)
3759    )]
3760    pub async fn create_application<
3761        A: Abi,
3762        Parameters: Serialize,
3763        InstantiationArgument: Serialize,
3764    >(
3765        &self,
3766        module_id: ModuleId<A, Parameters, InstantiationArgument>,
3767        parameters: &Parameters,
3768        instantiation_argument: &InstantiationArgument,
3769        required_application_ids: Vec<ApplicationId>,
3770    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, ChainClientError>
3771    {
3772        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
3773        let parameters = serde_json::to_vec(parameters)?;
3774        Ok(Box::pin(self.create_application_untyped(
3775            module_id.forget_abi(),
3776            parameters,
3777            instantiation_argument,
3778            required_application_ids,
3779        ))
3780        .await?
3781        .map(|(app_id, cert)| (app_id.with_abi(), cert)))
3782    }
3783
3784    /// Creates an application by instantiating some bytecode.
3785    #[instrument(
3786        level = "trace",
3787        skip(
3788            self,
3789            module_id,
3790            parameters,
3791            instantiation_argument,
3792            required_application_ids
3793        )
3794    )]
3795    pub async fn create_application_untyped(
3796        &self,
3797        module_id: ModuleId,
3798        parameters: Vec<u8>,
3799        instantiation_argument: Vec<u8>,
3800        required_application_ids: Vec<ApplicationId>,
3801    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, ChainClientError> {
3802        Box::pin(self.execute_operation(SystemOperation::CreateApplication {
3803            module_id,
3804            parameters,
3805            instantiation_argument,
3806            required_application_ids,
3807        }))
3808        .await?
3809        .try_map(|certificate| {
3810            // The first message of the only operation created the application.
3811            let mut creation: Vec<_> = certificate
3812                .block()
3813                .created_blob_ids()
3814                .into_iter()
3815                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
3816                .collect();
3817            if creation.len() > 1 {
3818                return Err(ChainClientError::InternalError(
3819                    "Unexpected number of application descriptions published",
3820                ));
3821            }
3822            let blob_id = creation.pop().ok_or(ChainClientError::InternalError(
3823                "ApplicationDescription blob not found.",
3824            ))?;
3825            let id = ApplicationId::new(blob_id.hash);
3826            Ok((id, certificate))
3827        })
3828    }
3829
3830    /// Creates a new committee and starts using it (admin chains only).
3831    #[instrument(level = "trace", skip(committee))]
3832    pub async fn stage_new_committee(
3833        &self,
3834        committee: Committee,
3835    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3836        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
3837        let blob_hash = blob.id().hash;
3838        match self
3839            .execute_operations(
3840                vec![Operation::system(SystemOperation::Admin(
3841                    AdminOperation::PublishCommitteeBlob { blob_hash },
3842                ))],
3843                vec![blob],
3844            )
3845            .await?
3846        {
3847            ClientOutcome::Committed(_) => {}
3848            outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
3849            outcome @ ClientOutcome::Conflict(_) => return Ok(outcome),
3850        }
3851        let epoch = Box::pin(self.chain_info()).await?.epoch.try_add_one()?;
3852        Box::pin(
3853            self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
3854                epoch,
3855                blob_hash,
3856            })),
3857        )
3858        .await
3859    }
3860
3861    /// Synchronizes the chain with the validators and creates blocks without any operations to
3862    /// process all incoming messages. This may require several blocks.
3863    ///
3864    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
3865    /// is returned, too.
3866    #[instrument(level = "trace")]
3867    pub async fn process_inbox(
3868        &self,
3869    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3870        self.prepare_chain().await?;
3871        self.process_inbox_without_prepare().await
3872    }
3873
3874    /// Creates blocks without any operations to process all incoming messages. This may require
3875    /// several blocks.
3876    ///
3877    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
3878    /// is returned, too.
3879    #[instrument(level = "trace")]
3880    pub async fn process_inbox_without_prepare(
3881        &self,
3882    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3883        #[cfg(with_metrics)]
3884        let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
3885
3886        let mut certificates = Vec::new();
3887        loop {
3888            // We provide no operations - this means that the only operations executed
3889            // will be epoch changes, receiving messages and processing event stream
3890            // updates, if any are pending.
3891            match self.execute_block(vec![], vec![]).await {
3892                Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
3893                Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
3894                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
3895                    return Ok((certificates, Some(timeout)));
3896                }
3897                // Nothing in the inbox and no stream updates to be processed.
3898                Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
3899                    WorkerError::ChainError(chain_error),
3900                ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
3901                    return Ok((certificates, None));
3902                }
3903                Err(error) => return Err(error),
3904            };
3905        }
3906    }
3907
3908    /// Returns operations to process all pending epoch changes: first the new epochs, in order,
3909    /// then the removed epochs, in order.
3910    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, ChainClientError> {
3911        let (mut min_epoch, mut next_epoch) = {
3912            let (epoch, committees) = self.epoch_and_committees().await?;
3913            let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
3914            (min_epoch, epoch.try_add_one()?)
3915        };
3916        let mut epoch_change_ops = Vec::new();
3917        while self
3918            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
3919            .await?
3920        {
3921            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
3922                next_epoch,
3923            )));
3924            next_epoch.try_add_assign_one()?;
3925        }
3926        while self
3927            .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
3928            .await?
3929        {
3930            epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
3931                min_epoch,
3932            )));
3933            min_epoch.try_add_assign_one()?;
3934        }
3935        Ok(epoch_change_ops)
3936    }
3937
3938    /// Returns whether the system event on the admin chain with the given stream name and key
3939    /// exists in storage.
3940    async fn has_admin_event(
3941        &self,
3942        stream_name: &[u8],
3943        index: u32,
3944    ) -> Result<bool, ChainClientError> {
3945        let event_id = EventId {
3946            chain_id: self.client.admin_chain_id,
3947            stream_id: StreamId::system(stream_name),
3948            index,
3949        };
3950        Ok(self
3951            .client
3952            .storage_client()
3953            .read_event(event_id)
3954            .await?
3955            .is_some())
3956    }
3957
3958    /// Returns the indices and events from the storage
3959    pub async fn events_from_index(
3960        &self,
3961        stream_id: StreamId,
3962        start_index: u32,
3963    ) -> Result<Vec<IndexAndEvent>, ChainClientError> {
3964        Ok(self
3965            .client
3966            .storage_client()
3967            .read_events_from_index(&self.chain_id, &stream_id, start_index)
3968            .await?)
3969    }
3970
3971    /// Deprecates all the configurations of voting rights up to the given one (admin chains
3972    /// only). Currently, each individual chain is still entitled to wait before accepting
3973    /// this command. However, it is expected that deprecated validators stop functioning
3974    /// shortly after such command is issued.
3975    #[instrument(level = "trace")]
3976    pub async fn revoke_epochs(
3977        &self,
3978        revoked_epoch: Epoch,
3979    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3980        self.prepare_chain().await?;
3981        let (current_epoch, committees) = self.epoch_and_committees().await?;
3982        ensure!(
3983            revoked_epoch < current_epoch,
3984            ChainClientError::CannotRevokeCurrentEpoch(current_epoch)
3985        );
3986        ensure!(
3987            committees.contains_key(&revoked_epoch),
3988            ChainClientError::EpochAlreadyRevoked
3989        );
3990        let operations = committees
3991            .keys()
3992            .filter_map(|epoch| {
3993                if *epoch <= revoked_epoch {
3994                    Some(Operation::system(SystemOperation::Admin(
3995                        AdminOperation::RemoveCommittee { epoch: *epoch },
3996                    )))
3997                } else {
3998                    None
3999                }
4000            })
4001            .collect();
4002        self.execute_operations(operations, vec![]).await
4003    }
4004
4005    /// Sends money to a chain.
4006    /// Do not check balance. (This may block the client)
4007    /// Do not confirm the transaction.
4008    #[instrument(level = "trace")]
4009    pub async fn transfer_to_account_unsafe_unconfirmed(
4010        &self,
4011        owner: AccountOwner,
4012        amount: Amount,
4013        recipient: Account,
4014    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4015        Box::pin(self.execute_operation(SystemOperation::Transfer {
4016            owner,
4017            recipient,
4018            amount,
4019        }))
4020        .await
4021    }
4022
4023    #[instrument(level = "trace", skip(hash))]
4024    pub async fn read_confirmed_block(
4025        &self,
4026        hash: CryptoHash,
4027    ) -> Result<ConfirmedBlock, ChainClientError> {
4028        let block = self
4029            .client
4030            .storage_client()
4031            .read_confirmed_block(hash)
4032            .await?;
4033        block.ok_or(ChainClientError::MissingConfirmedBlock(hash))
4034    }
4035
4036    #[instrument(level = "trace", skip(hash))]
4037    pub async fn read_certificate(
4038        &self,
4039        hash: CryptoHash,
4040    ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
4041        let certificate = self.client.storage_client().read_certificate(hash).await?;
4042        certificate.ok_or(ChainClientError::ReadCertificatesError(vec![hash]))
4043    }
4044
4045    /// Handles any cross-chain requests for any pending outgoing messages.
4046    #[instrument(level = "trace")]
4047    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), ChainClientError> {
4048        self.client
4049            .local_node
4050            .retry_pending_cross_chain_requests(self.chain_id)
4051            .await?;
4052        Ok(())
4053    }
4054
4055    #[instrument(level = "trace", skip(local_node))]
4056    async fn local_chain_info(
4057        &self,
4058        chain_id: ChainId,
4059        local_node: &mut LocalNodeClient<Env::Storage>,
4060    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
4061        match local_node.chain_info(chain_id).await {
4062            Ok(info) => {
4063                // Useful in case `chain_id` is the same as a local chain.
4064                self.client.update_from_info(&info);
4065                Ok(Some(info))
4066            }
4067            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
4068            Err(err) => Err(err.into()),
4069        }
4070    }
4071
4072    #[instrument(level = "trace", skip(chain_id, local_node))]
4073    async fn local_next_block_height(
4074        &self,
4075        chain_id: ChainId,
4076        local_node: &mut LocalNodeClient<Env::Storage>,
4077    ) -> Result<BlockHeight, ChainClientError> {
4078        Ok(self
4079            .local_chain_info(chain_id, local_node)
4080            .await?
4081            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
4082    }
4083
4084    /// Returns the next height we expect to receive from the given sender chain, according to the
4085    /// local inbox.
4086    #[instrument(level = "trace")]
4087    async fn local_next_height_to_receive(
4088        &self,
4089        origin: ChainId,
4090    ) -> Result<BlockHeight, ChainClientError> {
4091        Ok(self
4092            .client
4093            .local_node
4094            .get_inbox_next_height(self.chain_id, origin)
4095            .await?)
4096    }
4097
    /// Handles a single `notification` received from `remote_node`, downloading into
    /// the local node any data the notification implies we are missing.
    ///
    /// Notifications that this chain's `ListeningMode` declares irrelevant are dropped.
    /// Otherwise, depending on the reason, this downloads missing sender blocks
    /// (`NewIncomingBundle`) or synchronizes chain state from the remote validator
    /// (`NewBlock`, `NewRound`). `BlockExecuted` notifications are ignored.
    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
    async fn process_notification(
        &self,
        remote_node: RemoteNode<Env::ValidatorNode>,
        mut local_node: LocalNodeClient<Env::Storage>,
        notification: Notification,
    ) -> Result<(), ChainClientError> {
        // Skip the notification if the chain is untracked or the listening mode
        // considers this notification reason irrelevant.
        let dominated = self
            .listening_mode()
            .is_none_or(|mode| !mode.is_relevant(&notification.reason));
        if dominated {
            debug!(
                chain_id = %self.chain_id,
                reason = ?notification.reason,
                listening_mode = ?self.listening_mode(),
                "Ignoring notification due to listening mode"
            );
            return Ok(());
        }
        match notification.reason {
            Reason::NewIncomingBundle { origin, height } => {
                // If the inbox is already past `height`, the notification is stale.
                if self.local_next_height_to_receive(origin).await? > height {
                    debug!(
                        chain_id = %self.chain_id,
                        "Accepting redundant notification for new message"
                    );
                    return Ok(());
                }
                // Fetch the sender block at `height` (plus sending ancestors) from
                // the notifying validator.
                self.client
                    .download_sender_block_with_sending_ancestors(
                        self.chain_id,
                        origin,
                        height,
                        &remote_node,
                    )
                    .await?;
                // If the inbox still hasn't advanced past `height`, the download did
                // not deliver what the notification promised; log and move on.
                if self.local_next_height_to_receive(origin).await? <= height {
                    info!(
                        chain_id = %self.chain_id,
                        "NewIncomingBundle: Fail to synchronize new message after notification"
                    );
                }
            }
            Reason::NewBlock { height, .. } => {
                let chain_id = notification.chain_id;
                // If we already have a block at or past `height`, nothing to do.
                if self
                    .local_next_block_height(chain_id, &mut local_node)
                    .await?
                    > height
                {
                    debug!(
                        chain_id = %self.chain_id,
                        "Accepting redundant notification for new block"
                    );
                    return Ok(());
                }
                // Pull the chain state up to date from the notifying validator.
                self.client
                    .synchronize_chain_state_from(&remote_node, chain_id)
                    .await?;
                // Verify the synchronization actually reached the advertised height.
                if self
                    .local_next_block_height(chain_id, &mut local_node)
                    .await?
                    <= height
                {
                    error!("NewBlock: Fail to synchronize new block after notification");
                }
            }
            Reason::NewRound { height, round } => {
                let chain_id = notification.chain_id;
                // Redundant if we are already at (or past) the advertised
                // height/round pair.
                if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
                        debug!(
                            chain_id = %self.chain_id,
                            "Accepting redundant notification for new round"
                        );
                        return Ok(());
                    }
                }
                self.client
                    .synchronize_chain_state_from(&remote_node, chain_id)
                    .await?;
                // After synchronizing, re-read the local info to confirm progress.
                let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
                    error!(
                        chain_id = %self.chain_id,
                        "NewRound: Fail to read local chain info for {chain_id}"
                    );
                    return Ok(());
                };
                if (info.next_block_height, info.manager.current_round) < (height, round) {
                    info!(
                        chain_id = %self.chain_id,
                        "NewRound: Fail to synchronize new block after notification"
                    );
                }
            }
            Reason::BlockExecuted { .. } => {
                // Ignored.
            }
        }
        Ok(())
    }
4199
4200    /// Returns whether this chain is tracked by the client, i.e. we are updating its inbox.
4201    pub fn is_tracked(&self) -> bool {
4202        self.client.is_tracked(self.chain_id)
4203    }
4204
4205    /// Returns the listening mode for this chain, if it is tracked.
4206    pub fn listening_mode(&self) -> Option<ListeningMode> {
4207        self.client.chain_mode(self.chain_id)
4208    }
4209
    /// Spawns a task that listens to notifications about the current chain from all validators,
    /// and synchronizes the local state accordingly.
    ///
    /// The listening mode must be set in `Client::chain_modes` before calling this method.
    ///
    /// Returns the listener future (which must be polled to drive notification
    /// processing), an [`AbortOnDrop`] guard that stops the listener's own
    /// notification subscription, and a [`NotificationStream`] for the caller.
    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
    pub async fn listen(
        &self,
    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), ChainClientError> {
        use future::FutureExt as _;

        /// Awaits `future` while concurrently draining `background_work`, so that
        /// background tasks keep making progress until `future` resolves.
        async fn await_while_polling<F: FusedFuture>(
            future: F,
            background_work: impl FusedStream<Item = ()>,
        ) -> F::Output {
            tokio::pin!(future);
            tokio::pin!(background_work);
            loop {
                futures::select! {
                    _ = background_work.next() => (),
                    result = future => return result,
                }
            }
        }

        let mut senders = HashMap::new(); // Senders to cancel notification streams.
        let notifications = self.subscribe()?;
        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);

        // Beware: if this future ceases to make progress, notification processing will
        // deadlock, because of the issue described in
        // https://github.com/linera-io/linera-protocol/pull/1173.

        // TODO(#2013): replace this lock with an asynchronous communication channel

        let mut process_notifications = FuturesUnordered::new();

        // Establish the initial per-validator notification tasks.
        match self.update_notification_streams(&mut senders).await {
            Ok(handler) => process_notifications.push(handler),
            Err(error) => error!("Failed to update committee: {error}"),
        };

        let this = self.clone();
        let update_streams = async move {
            let mut abortable_notifications = abortable_notifications.fuse();

            // Keep processing notifications; every new block may have changed the
            // committee, so refresh the per-validator streams after each one.
            while let Some(notification) =
                await_while_polling(abortable_notifications.next(), &mut process_notifications)
                    .await
            {
                if let Reason::NewBlock { .. } = notification.reason {
                    match Box::pin(await_while_polling(
                        this.update_notification_streams(&mut senders).fuse(),
                        &mut process_notifications,
                    ))
                    .await
                    {
                        Ok(handler) => process_notifications.push(handler),
                        Err(error) => error!("Failed to update committee: {error}"),
                    }
                }
            }

            // Shut down all remaining per-validator streams before finishing.
            for abort in senders.into_values() {
                abort.abort();
            }

            // Drain the remaining background work to completion.
            let () = process_notifications.collect().await;
        }
        .in_current_span();

        Ok((update_streams, AbortOnDrop(abort), notifications))
    }
4282
    /// Reconciles the per-validator notification tasks in `senders` with the current
    /// local committee: aborts tasks for validators that left, and starts a
    /// subscription task for each new validator.
    ///
    /// Returns a future that drives all newly created validator tasks to completion.
    #[instrument(level = "trace", skip(senders))]
    async fn update_notification_streams(
        &self,
        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
    ) -> Result<impl Future<Output = ()>, ChainClientError> {
        // Resolve the current committee into concrete validator nodes.
        let (nodes, local_node) = {
            let committee = self.local_committee().await?;
            let nodes: HashMap<_, _> = self
                .client
                .validator_node_provider()
                .make_nodes(&committee)?
                .collect();
            (nodes, self.client.local_node.clone())
        };
        // Drop removed validators.
        senders.retain(|validator, abort| {
            if !nodes.contains_key(validator) {
                abort.abort();
            }
            !abort.is_aborted()
        });
        // Add tasks for new validators.
        let validator_tasks = FuturesUnordered::new();
        for (public_key, node) in nodes {
            let address = node.address();
            // Skip validators we already have a running task for.
            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
                continue;
            };
            let this = self.clone();
            // Subscribe to the validator, then catch up on anything missed before
            // the subscription was in place.
            let stream = stream::once({
                let node = node.clone();
                async move {
                    let stream = node.subscribe(vec![this.chain_id]).await?;
                    // Only now the notification stream is established. We may have missed
                    // notifications since the last time we synchronized.
                    let remote_node = RemoteNode { public_key, node };
                    this.client
                        .synchronize_chain_state_from(&remote_node, this.chain_id)
                        .await?;
                    Ok::<_, ChainClientError>(stream)
                }
            })
            .filter_map(move |result| {
                let address = address.clone();
                async move {
                    // Log connection failures but keep the overall task alive; a
                    // failed subscription simply yields an empty stream.
                    if let Err(error) = &result {
                        info!(?error, address, "could not connect to validator");
                    } else {
                        debug!(address, "connected to validator");
                    }
                    result.ok()
                }
            })
            .flatten();
            let (stream, abort) = stream::abortable(stream);
            let mut stream = Box::pin(stream);
            let this = self.clone();
            let local_node = local_node.clone();
            let remote_node = RemoteNode { public_key, node };
            // Process each notification from this validator until its stream ends
            // or is aborted; individual failures are logged, not fatal.
            validator_tasks.push(async move {
                while let Some(notification) = stream.next().await {
                    if let Err(error) = this
                        .process_notification(
                            remote_node.clone(),
                            local_node.clone(),
                            notification.clone(),
                        )
                        .await
                    {
                        tracing::info!(
                            chain_id = %this.chain_id,
                            address = remote_node.address(),
                            ?notification,
                            %error,
                            "failed to process notification",
                        );
                    }
                }
            });
            entry.insert(abort);
        }
        Ok(validator_tasks.collect())
    }
4366
    /// Attempts to update a validator with the local information.
    ///
    /// Sends the validator every locally stored certificate between its next block
    /// height and ours, uploading any blobs the validator reports missing along the
    /// way. Does nothing if the validator is already at or ahead of our height.
    #[instrument(level = "trace", skip(remote_node))]
    pub async fn sync_validator(
        &self,
        remote_node: Env::ValidatorNode,
    ) -> Result<(), ChainClientError> {
        // A validator that doesn't know the chain at all starts from height zero.
        let validator_next_block_height = match remote_node
            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
            .await
        {
            Ok(info) => info.info.next_block_height,
            Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
            Err(err) => return Err(err.into()),
        };
        let local_next_block_height = self.chain_info().await?.next_block_height;

        if validator_next_block_height >= local_next_block_height {
            debug!("Validator is up-to-date with local state");
            return Ok(());
        }

        // The heights the validator is missing, in ascending order.
        let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
            .map(BlockHeight)
            .collect();

        let certificates = self
            .client
            .storage_client()
            .read_certificates_by_heights(self.chain_id, &heights)
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        // Send the certificates in order; each one must be accepted before the next.
        for certificate in certificates {
            match remote_node
                .handle_confirmed_certificate(
                    certificate.clone(),
                    CrossChainMessageDelivery::NonBlocking,
                )
                .await
            {
                Ok(_) => (),
                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
                    // Upload the missing blobs we have and retry.
                    let missing_blobs: Vec<_> = self
                        .client
                        .storage_client()
                        .read_blobs(&missing_blob_ids)
                        .await?
                        .into_iter()
                        .flatten()
                        .collect();
                    remote_node.upload_blobs(missing_blobs).await?;
                    remote_node
                        .handle_confirmed_certificate(
                            certificate,
                            CrossChainMessageDelivery::NonBlocking,
                        )
                        .await?;
                }
                Err(err) => return Err(err.into()),
            }
        }

        Ok(())
    }
4434}
4435
4436/// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on
4437/// each subsequent node. Returns error `err` if all of the nodes fail.
4438async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
4439    nodes: &[RemoteNode<A>],
4440    f: F,
4441    err: G,
4442    timeout: Duration,
4443) -> Result<V, E2>
4444where
4445    F: Clone + FnOnce(RemoteNode<A>) -> R,
4446    RemoteNode<A>: Clone,
4447    G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
4448    R: Future<Output = Result<V, E1>> + 'a,
4449{
4450    let mut stream = nodes
4451        .iter()
4452        .zip(0..)
4453        .map(|(remote_node, i)| {
4454            let fun = f.clone();
4455            let node = remote_node.clone();
4456            async move {
4457                linera_base::time::timer::sleep(timeout * i * i).await;
4458                fun(node).await.map_err(|err| (remote_node.public_key, err))
4459            }
4460        })
4461        .collect::<FuturesUnordered<_>>();
4462    let mut errors = vec![];
4463    while let Some(maybe_result) = stream.next().await {
4464        match maybe_result {
4465            Ok(result) => return Ok(result),
4466            Err(error) => errors.push(error),
4467        };
4468    }
4469    Err(err(errors))
4470}
4471
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Processes `notification` as if it had been received from the given validator.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let remote_node = RemoteNode { node, public_key };
        let local_node = self.client.local_node.clone();
        self.process_notification(remote_node, local_node, notification)
            .await
            .unwrap();
    }
}
4492
/// Wrapper for `AbortHandle` that aborts when it's dropped.
#[must_use]
pub struct AbortOnDrop(pub AbortHandle);

impl Drop for AbortOnDrop {
    #[instrument(level = "trace", skip(self))]
    fn drop(&mut self) {
        // Aborting the handle when this guard goes out of scope cancels whatever
        // work the handle controls; tying it to `Drop` makes the cancellation
        // automatic even on early returns or panics.
        self.0.abort();
    }
}
4503
/// A pending proposed block, together with its published blobs.
#[derive(Clone, Serialize, Deserialize)]
pub struct PendingProposal {
    /// The proposed block that is still pending.
    pub block: ProposedBlock,
    /// The blobs published together with the proposal.
    pub blobs: Vec<Blob>,
}
4510
/// Whether a received certificate still needs to be checked before processing.
enum ReceiveCertificateMode {
    /// The certificate has not been verified yet and must be checked.
    NeedsCheck,
    /// The certificate was already verified; no further check is needed.
    AlreadyChecked,
}
4515
/// Outcome of checking a certificate's epoch against the locally known ones.
/// See `into_result` for the mapping to `ChainClientError` values.
enum CheckCertificateResult {
    /// The certificate's epoch is outdated; mapped to `CommitteeDeprecationError`.
    OldEpoch,
    /// The certificate is acceptable; mapped to `Ok(())`.
    New,
    /// The certificate's epoch is ahead of ours; mapped to
    /// `CommitteeSynchronizationError`.
    FutureEpoch,
}
4521
4522impl CheckCertificateResult {
4523    fn into_result(self) -> Result<(), ChainClientError> {
4524        match self {
4525            Self::OldEpoch => Err(ChainClientError::CommitteeDeprecationError),
4526            Self::New => Ok(()),
4527            Self::FutureEpoch => Err(ChainClientError::CommitteeSynchronizationError),
4528        }
4529    }
4530}
4531
4532/// Creates a compressed Contract, Service and bytecode.
4533#[cfg(not(target_arch = "wasm32"))]
4534pub async fn create_bytecode_blobs(
4535    contract: Bytecode,
4536    service: Bytecode,
4537    vm_runtime: VmRuntime,
4538) -> (Vec<Blob>, ModuleId) {
4539    match vm_runtime {
4540        VmRuntime::Wasm => {
4541            let (compressed_contract, compressed_service) =
4542                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
4543                    .await
4544                    .expect("Compression should not panic");
4545            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
4546            let service_blob = Blob::new_service_bytecode(compressed_service);
4547            let module_id =
4548                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
4549            (vec![contract_blob, service_blob], module_id)
4550        }
4551        VmRuntime::Evm => {
4552            let compressed_contract = contract.compress();
4553            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
4554            let module_id = ModuleId::new(
4555                evm_contract_blob.id().hash,
4556                evm_contract_blob.id().hash,
4557                vm_runtime,
4558            );
4559            (vec![evm_contract_blob], module_id)
4560        }
4561    }
4562}