Skip to main content

linera_core/client/
mod.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::Ordering,
7    collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
8    convert::Infallible,
9    iter,
10    sync::{Arc, RwLock},
11};
12
13use chain_client_state::ChainClientState;
14use custom_debug_derive::Debug;
15use futures::{
16    future::{self, Either, FusedFuture, Future},
17    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
18};
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{
22    abi::Abi,
23    crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
24    data_types::{
25        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
26        ChainDescription, Epoch, MessagePolicy, Round, TimeDelta, Timestamp,
27    },
28    ensure,
29    identifiers::{
30        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
31        ModuleId, StreamId,
32    },
33    ownership::{ChainOwnership, TimeoutConfig},
34    time::{Duration, Instant},
35};
36#[cfg(not(target_arch = "wasm32"))]
37use linera_base::{data_types::Bytecode, vm::VmRuntime};
38use linera_chain::{
39    data_types::{
40        BlockProposal, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight, IncomingBundle,
41        LiteVote, ProposedBlock, Transaction,
42    },
43    manager::LockingBlock,
44    types::{
45        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
46        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
47    },
48    ChainError, ChainExecutionContext, ChainStateView,
49};
50use linera_execution::{
51    committee::Committee,
52    system::{
53        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
54        REMOVED_EPOCH_STREAM_NAME,
55    },
56    ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
57};
58use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
59use linera_views::ViewError;
60use rand::prelude::SliceRandom as _;
61use received_log::ReceivedLogs;
62use serde::{Deserialize, Serialize};
63use thiserror::Error;
64use tokio::sync::{mpsc, OwnedRwLockReadGuard};
65use tokio_stream::wrappers::UnboundedReceiverStream;
66use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
67use validator_trackers::ValidatorTrackers;
68
69use crate::{
70    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout},
71    environment::{wallet::Wallet as _, Environment},
72    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
73    node::{
74        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
75        ValidatorNodeProvider as _,
76    },
77    notifier::{ChannelNotifier, Notifier as _},
78    remote_node::RemoteNode,
79    updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
80    worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
81    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
82};
83
84mod chain_client_state;
85#[cfg(test)]
86#[path = "../unit_tests/client_tests.rs"]
87mod client_tests;
88pub mod requests_scheduler;
89
90pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
91mod received_log;
92mod validator_trackers;
93
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus histograms measuring the latency of the client's main operations.
    //! All histograms use exponential latency buckets up to 500 time units.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    // NOTE(review): despite the `WITHOUT_PREPARE` suffix, the registered metric name is
    // plain `process_inbox_latency` — confirm this is intentional before renaming, since
    // the registered name is part of the external metrics interface.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "process_inbox_latency",
                "process_inbox latency",
                &[],
                exponential_bucket_latencies(500.0),
            )
        });

    // Latency of `prepare_chain`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "prepare_chain_latency",
            "prepare_chain latency",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    // Latency of `synchronize_chain_state`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    // Latency of `execute_block`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "execute_block_latency",
            "execute_block latency",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    // Latency of `find_received_certificates`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "find_received_certificates_latency",
            "find_received_certificates latency",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });
}
147
/// Defines what type of notifications we should process for a chain:
/// - do we fully participate in consensus and download sender chains?
/// - or do we only follow the chain's blocks without participating?
/// - or do we only care about blocks containing events from some particular streams?
///
/// Modes form a partial order (see the `PartialOrd` impl below): `FullChain` is the
/// strongest, `FollowChain` subsumes any `EventsOnly`, and `EventsOnly` sets are
/// compared by inclusion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Listen to everything: all blocks for the chain and all blocks from sender chains,
    /// and participate in rounds.
    FullChain,
    /// Listen to all blocks for the chain, but don't download sender chain blocks or participate
    /// in rounds. Use this when interested in the chain's state but not intending to propose
    /// blocks (e.g., because we're not a chain owner).
    FollowChain,
    /// Only listen to blocks which contain events from those streams.
    EventsOnly(BTreeSet<StreamId>),
}
164
165impl PartialOrd for ListeningMode {
166    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
167        match (self, other) {
168            (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
169            (ListeningMode::FullChain, _) => Some(Ordering::Greater),
170            (_, ListeningMode::FullChain) => Some(Ordering::Less),
171            (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
172            (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
173            (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
174            (ListeningMode::EventsOnly(a), ListeningMode::EventsOnly(b)) => {
175                if a == b {
176                    Some(Ordering::Equal)
177                } else if a.is_superset(b) {
178                    Some(Ordering::Greater)
179                } else if b.is_superset(a) {
180                    Some(Ordering::Less)
181                } else {
182                    None
183                }
184            }
185        }
186    }
187}
188
impl ListeningMode {
    /// Returns whether a notification with this reason should be processed under this listening
    /// mode.
    ///
    /// The match arms are order-sensitive: the first arm filters `NewEvents` out for
    /// `FullChain` and `FollowChain` *before* the catch-all arms below would accept them.
    pub fn is_relevant(&self, reason: &Reason) -> bool {
        match (reason, self) {
            // `NewEvents` is ignored by modes that already process every new block.
            // NOTE(review): presumably the events are picked up via the corresponding
            // `NewBlock` notification instead — confirm.
            (Reason::NewEvents { .. }, ListeningMode::FollowChain | ListeningMode::FullChain) => {
                false
            }
            // FullChain processes everything (except `NewEvents`, handled above).
            (_, ListeningMode::FullChain) => true,
            // FollowChain processes new blocks on the chain itself, including blocks that
            // produced events.
            (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
            (_, ListeningMode::FollowChain) => false,
            // EventsOnly only processes events from relevant streams.
            // Accept both NewEvents and NewBlock (for old validators that don't emit
            // NewEvents) if they contain relevant event streams.
            (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant))
            | (Reason::NewBlock { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
                relevant.intersection(event_streams).next().is_some()
            }
            (_, ListeningMode::EventsOnly(_)) => false,
        }
    }

    /// Combines `other` into `self`, keeping the stronger mode (the join in the partial
    /// order): `FullChain` beats everything, `FollowChain` beats `EventsOnly`, and two
    /// `EventsOnly` modes merge their stream sets. `None` leaves `self` unchanged.
    ///
    /// Arm order matters: each "already strong enough" arm must precede the
    /// corresponding upgrade arm.
    pub fn extend(&mut self, other: Option<ListeningMode>) {
        match (self, other) {
            (_, None) => (),
            // Already listening to everything; nothing can add to that.
            (ListeningMode::FullChain, _) => (),
            (mode, Some(ListeningMode::FullChain)) => {
                *mode = ListeningMode::FullChain;
            }
            // `FollowChain` subsumes any `EventsOnly` set.
            (ListeningMode::FollowChain, _) => (),
            (mode, Some(ListeningMode::FollowChain)) => {
                *mode = ListeningMode::FollowChain;
            }
            // Two event-only modes: take the union of the stream sets.
            (
                ListeningMode::EventsOnly(self_events),
                Some(ListeningMode::EventsOnly(other_events)),
            ) => {
                self_events.extend(other_events);
            }
        }
    }

    /// Returns whether this mode implies follow-only behavior (i.e., not participating in
    /// consensus rounds).
    pub fn is_follow_only(&self) -> bool {
        !matches!(self, ListeningMode::FullChain)
    }

    /// Returns whether this is a full chain mode (synchronizing sender chains and updating
    /// inboxes).
    pub fn is_full(&self) -> bool {
        matches!(self, ListeningMode::FullChain)
    }
}
246
/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
pub struct Client<Env: Environment> {
    /// The execution environment, providing storage, network, signer and wallet access.
    environment: Env,
    /// Local node to manage the execution state and the local storage of the chains that we are
    /// tracking.
    pub local_node: LocalNodeClient<Env::Storage>,
    /// Manages the requests sent to validator nodes.
    requests_scheduler: RequestsScheduler<Env>,
    /// The admin chain ID.
    admin_chain_id: ChainId,
    /// Chains that should be tracked by the client, along with their listening mode.
    /// The presence of a chain in this map means it is tracked by the local node.
    chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
    /// References to clients waiting for chain notifications.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Chain state for the managed chains. (`papaya` map: lock-free concurrent access.)
    chains: papaya::HashMap<ChainId, ChainClientState>,
    /// Configuration options.
    options: ChainClientOptions,
}
267
268impl<Env: Environment> Client<Env> {
    /// Creates a new `Client` with a new cache and notifiers.
    ///
    /// * `admin_chain_id` — the ID of the network's admin chain.
    /// * `long_lived_services` — forwarded to the worker state.
    /// * `chain_modes` — the initial set of tracked chains and their listening modes.
    /// * `name` — a name for the underlying worker state.
    /// * `chain_worker_ttl` / `sender_chain_worker_ttl` — worker lifetimes, forwarded to
    ///   the worker state.
    /// * `requests_scheduler_config` — configuration for the validator request scheduler.
    #[expect(clippy::too_many_arguments)]
    #[instrument(level = "trace", skip_all)]
    pub fn new(
        environment: Env,
        admin_chain_id: ChainId,
        long_lived_services: bool,
        chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
        name: impl Into<String>,
        chain_worker_ttl: Duration,
        sender_chain_worker_ttl: Duration,
        options: ChainClientOptions,
        requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
    ) -> Self {
        // The map is shared with the worker state so mode changes are visible to it.
        let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect()));
        // NOTE(review): inactive chains and deprecated epochs are explicitly allowed here,
        // presumably so the client can process historical certificates — confirm.
        let state = WorkerState::new_for_client(
            name.into(),
            environment.storage().clone(),
            chain_modes.clone(),
        )
        .with_long_lived_services(long_lived_services)
        .with_allow_inactive_chains(true)
        .with_allow_messages_from_deprecated_epochs(true)
        .with_chain_worker_ttl(chain_worker_ttl)
        .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
        let local_node = LocalNodeClient::new(state);
        // The scheduler starts with no validator nodes; NOTE(review): presumably they are
        // registered later — confirm against the scheduler's API.
        let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);

        Self {
            environment,
            local_node,
            requests_scheduler,
            chains: papaya::HashMap::new(),
            admin_chain_id,
            chain_modes,
            notifier: Arc::new(ChannelNotifier::default()),
            options,
        }
    }
308
    /// Returns the chain ID of the admin chain.
    pub fn admin_chain_id(&self) -> ChainId {
        self.admin_chain_id
    }

    /// Subscribes to notifications for the given chain IDs.
    ///
    /// Returns the receiving end of an unbounded channel on which matching
    /// notifications will be delivered.
    pub fn subscribe(
        &self,
        chain_ids: Vec<ChainId>,
    ) -> tokio::sync::mpsc::UnboundedReceiver<Notification> {
        self.notifier.subscribe(chain_ids)
    }

    /// Adds additional chain IDs to an existing subscription: notifications for
    /// `chain_ids` will also be delivered to `sender`.
    pub fn subscribe_extra(
        &self,
        chain_ids: Vec<ChainId>,
        sender: &tokio::sync::mpsc::UnboundedSender<Notification>,
    ) {
        self.notifier.add_sender(chain_ids, sender);
    }

    /// Returns the storage client used by this client's local node.
    pub fn storage_client(&self) -> &Env::Storage {
        self.environment.storage()
    }

    /// Returns the environment's provider of validator nodes.
    pub fn validator_node_provider(&self) -> &Env::Network {
        self.environment.network()
    }

    /// Returns a reference to the client's [`Signer`][crate::environment::Signer].
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &Env::Signer {
        self.environment.signer()
    }
345
346    /// Returns whether the signer has a key for the given owner.
347    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, ChainClientError> {
348        self.signer()
349            .contains_key(owner)
350            .await
351            .map_err(ChainClientError::signer_failure)
352    }
353
    /// Returns a reference to the client's [`Wallet`][crate::environment::Wallet].
    ///
    /// The wallet maps chain IDs to per-chain entries, including the optional owner key
    /// (see `is_chain_follow_only`).
    pub fn wallet(&self) -> &Env::Wallet {
        self.environment.wallet()
    }
358
359    /// Returns whether the given chain is in follow-only mode (no owner key in the wallet).
360    ///
361    /// If the chain is not in the wallet, returns `true` since we don't have an owner key
362    /// for it.
363    async fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
364        match self.wallet().get(chain_id).await {
365            Ok(Some(chain)) => chain.owner.is_none(),
366            // Chain not in wallet or error: treat as follow-only.
367            Ok(None) | Err(_) => true,
368        }
369    }
370
371    /// Extends the listening mode for a chain, combining with the existing mode if present.
372    /// Returns the resulting mode.
373    #[instrument(level = "trace", skip(self))]
374    pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
375        let mut chain_modes = self
376            .chain_modes
377            .write()
378            .expect("Panics should not happen while holding a lock to `chain_modes`");
379        let entry = chain_modes.entry(chain_id).or_insert(mode.clone());
380        entry.extend(Some(mode));
381        entry.clone()
382    }
383
384    /// Returns the listening mode for a chain, if it is tracked.
385    pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
386        self.chain_modes
387            .read()
388            .expect("Panics should not happen while holding a lock to `chain_modes`")
389            .get(&chain_id)
390            .cloned()
391    }
392
393    /// Returns whether a chain is fully tracked by the local node.
394    pub fn is_tracked(&self, chain_id: ChainId) -> bool {
395        self.chain_modes
396            .read()
397            .expect("Panics should not happen while holding a lock to `chain_modes`")
398            .get(&chain_id)
399            .is_some_and(ListeningMode::is_full)
400    }
401
    /// Creates a new `ChainClient`.
    ///
    /// Registers a fresh client state for `chain_id` unless one already exists; the
    /// returned `ChainClient` shares this `Client` (and thus its cache and notifiers).
    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
    pub fn create_chain_client(
        self: &Arc<Self>,
        chain_id: ChainId,
        block_hash: Option<CryptoHash>,
        next_block_height: BlockHeight,
        pending_proposal: Option<PendingProposal>,
        preferred_owner: Option<AccountOwner>,
        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
    ) -> ChainClient<Env> {
        // If the entry already exists we assume that the entry is more up to date than
        // the arguments: If they were read from the wallet file, they might be stale.
        self.chains
            .pin()
            .get_or_insert_with(chain_id, || ChainClientState::new(pending_proposal.clone()));

        ChainClient {
            client: self.clone(),
            chain_id,
            options: self.options.clone(),
            preferred_owner,
            initial_block_hash: block_hash,
            initial_next_block_height: next_block_height,
            timing_sender,
        }
    }
429
    /// Fetches the chain description blob if needed, and returns the chain info.
    ///
    /// On `BlobsNotFound`, synchronizes the admin chain, fetches the missing blobs from
    /// `validators`, and retries the local query once.
    async fn fetch_chain_info(
        &self,
        chain_id: ChainId,
        validators: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<Box<ChainInfo>, ChainClientError> {
        match self.local_node.chain_info(chain_id).await {
            Ok(info) => Ok(info),
            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                // Make sure the admin chain is up to date.
                // (`Box::pin` avoids an infinitely sized future in this recursive path.)
                Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
                // If the chain is missing then the error is a WorkerError
                // and so a BlobsNotFound
                self.update_local_node_with_blobs_from(blob_ids, validators)
                    .await?;
                // Retry now that the missing blobs should be available locally.
                Ok(self.local_node.chain_info(chain_id).await?)
            }
            Err(err) => Err(err.into()),
        }
    }
450
    /// Downloads and processes all certificates up to (excluding) the specified height.
    ///
    /// Returns the latest chain info; fails with `CannotDownloadCertificates` if the
    /// target height could not be reached.
    #[instrument(level = "trace", skip(self))]
    async fn download_certificates(
        &self,
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    ) -> Result<Box<ChainInfo>, ChainClientError> {
        let validators = self.validator_nodes().await?;
        let mut info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
        // Nothing to do if the local node is already at or beyond the target.
        if target_next_block_height <= info.next_block_height {
            return Ok(info);
        }
        if let Some(new_info) = self
            .download_certificates_using_all(&validators, chain_id, target_next_block_height)
            .await?
        {
            info = new_info;
        }
        // We may still fall short if no validator could provide the certificates.
        ensure!(
            target_next_block_height <= info.next_block_height,
            ChainClientError::CannotDownloadCertificates {
                chain_id,
                target_next_block_height,
            }
        );
        Ok(info)
    }
478
    /// Downloads and processes all certificates up to (excluding) the specified height from the
    /// given validator.
    ///
    /// Returns the chain info after the last processed certificate, or `None` if none
    /// was processed.
    #[instrument(level = "trace", skip_all)]
    async fn download_certificates_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        stop: BlockHeight,
    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
        let mut last_info = None;
        // First load any blocks from local storage, if available.
        let chain_info = self.local_node.chain_info(chain_id).await?;
        let mut next_height = chain_info.next_block_height;
        let hashes = self
            .local_node
            .get_preprocessed_block_hashes(chain_id, next_height, stop)
            .await?;
        let certificates = self.storage_client().read_certificates(&hashes).await?;
        let certificates = match ResultReadCertificates::new(certificates, hashes) {
            ResultReadCertificates::Certificates(certificates) => certificates,
            ResultReadCertificates::InvalidHashes(hashes) => {
                return Err(ChainClientError::ReadCertificatesError(hashes))
            }
        };
        for certificate in certificates {
            last_info = Some(self.handle_certificate(certificate).await?.info);
        }
        // Now download the rest in batches from the remote node.
        // NOTE(review): `next_height` is not advanced past the locally processed
        // certificates above, so the first remote batch may re-request heights we just
        // handled — confirm whether this is intentional.
        while next_height < stop {
            // TODO(#2045): Analyze network errors instead of using a fixed batch size.
            let limit = u64::from(stop)
                .checked_sub(u64::from(next_height))
                .ok_or(ArithmeticError::Overflow)?
                .min(self.options.certificate_download_batch_size);

            let certificates = self
                .requests_scheduler
                .download_certificates(remote_node, chain_id, next_height, limit)
                .await?;
            // `None` means no certificate was processed: the remote node cannot help us
            // make further progress, so stop here.
            let Some(info) = self.process_certificates(remote_node, certificates).await? else {
                break;
            };
            assert!(info.next_block_height > next_height);
            next_height = info.next_block_height;
            last_info = Some(info);
        }
        Ok(last_info)
    }
527
    /// Downloads and processes certificates up to `stop`, using all validators with
    /// staggered concurrent requests for each batch. First loads locally available
    /// certificates from storage, then downloads the rest.
    ///
    /// NOTE(review): this largely duplicates `download_certificates_from`, differing only
    /// in the download call — consider factoring out the shared logic.
    #[instrument(level = "trace", skip_all)]
    async fn download_certificates_using_all(
        &self,
        validators: &[RemoteNode<Env::ValidatorNode>],
        chain_id: ChainId,
        stop: BlockHeight,
    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
        let mut last_info = None;
        // First load any blocks from local storage, if available.
        let chain_info = self.local_node.chain_info(chain_id).await?;
        let mut next_height = chain_info.next_block_height;
        let hashes = self
            .local_node
            .get_preprocessed_block_hashes(chain_id, next_height, stop)
            .await?;
        let certificates = self.storage_client().read_certificates(&hashes).await?;
        let certificates = match ResultReadCertificates::new(certificates, hashes) {
            ResultReadCertificates::Certificates(certificates) => certificates,
            ResultReadCertificates::InvalidHashes(hashes) => {
                return Err(ChainClientError::ReadCertificatesError(hashes))
            }
        };
        for certificate in certificates {
            last_info = Some(self.handle_certificate(certificate).await?.info);
        }
        // Download remaining batches using all validators with staggered fallback.
        // NOTE(review): as in `download_certificates_from`, `next_height` is not advanced
        // past the locally processed certificates — confirm whether intentional.
        while next_height < stop {
            let limit = u64::from(stop)
                .checked_sub(u64::from(next_height))
                .ok_or(ArithmeticError::Overflow)?
                .min(self.options.certificate_download_batch_size);
            let certificates = self
                .requests_scheduler
                .download_certificates_from_validators(
                    validators,
                    chain_id,
                    next_height,
                    limit,
                    self.options.certificate_batch_download_timeout,
                )
                .await?;
            // `None` means no certificate was processed; no further progress is possible.
            let Some(info) = self
                .process_certificates_using_all(validators, certificates)
                .await?
            else {
                break;
            };
            assert!(info.next_block_height > next_height);
            next_height = info.next_block_height;
            last_info = Some(info);
        }
        Ok(last_info)
    }
584
    /// Processes certificates, downloading missing blobs from any of the given validators.
    ///
    /// Returns the chain info after the last processed certificate, or `None` if the
    /// batch was empty.
    #[instrument(level = "trace", skip_all)]
    async fn process_certificates_using_all(
        &self,
        validators: &[RemoteNode<Env::ValidatorNode>],
        certificates: Vec<ConfirmedBlockCertificate>,
    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
        let mut info = None;
        // Collect the blob IDs required by the whole batch so missing ones can be
        // prefetched in a single round.
        let required_blob_ids: Vec<_> = certificates
            .iter()
            .flat_map(|certificate| certificate.value().required_blob_ids())
            .collect();

        match self
            .local_node
            .read_blob_states_from_storage(&required_blob_ids)
            .await
        {
            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                self.download_blobs(validators, &blob_ids).await?;
            }
            // Any other error is propagated; `Ok` means all blobs are present.
            x => {
                x?;
            }
        }

        for certificate in certificates {
            info = Some(
                match self.handle_certificate(certificate.clone()).await {
                    // Blobs may still be reported missing while processing; fetch them
                    // and retry this certificate once.
                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                        self.download_blobs(validators, &blob_ids).await?;
                        self.handle_certificate(certificate).await?
                    }
                    x => x?,
                }
                .info,
            );
        }

        Ok(info)
    }
626
627    async fn download_blobs(
628        &self,
629        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
630        blob_ids: &[BlobId],
631    ) -> Result<(), ChainClientError> {
632        let blobs = &self
633            .requests_scheduler
634            .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
635            .await?
636            .ok_or_else(|| {
637                ChainClientError::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
638            })?;
639        self.local_node.store_blobs(blobs).await.map_err(Into::into)
640    }
641
642    /// Downloads publisher chain certificates starting from height 0 until all the
643    /// given events are locally available. For each unique publisher chain referenced
644    /// in the event IDs, certificates are downloaded in batches and after each batch
645    /// we check whether the missing events have been found, stopping early.
646    #[instrument(level = "trace", skip_all)]
647    async fn download_publisher_chains_for_events(
648        &self,
649        event_ids: &[EventId],
650    ) -> Result<(), ChainClientError> {
651        // Group events by publisher chain.
652        let mut events_by_chain: BTreeMap<ChainId, Vec<EventId>> = BTreeMap::new();
653        for event_id in event_ids {
654            events_by_chain
655                .entry(event_id.chain_id)
656                .or_default()
657                .push(event_id.clone());
658        }
659        for (chain_id, chain_event_ids) in events_by_chain {
660            self.download_chain_until_events_found(chain_id, &chain_event_ids)
661                .await?;
662        }
663        Ok(())
664    }
665
    /// Downloads certificates for `chain_id` one at a time, stopping as soon as all
    /// requested events are locally available or all validators fail to provide the
    /// next height (i.e. the chain is exhausted).
    /// Uses all validators with staggered concurrent requests for each height.
    ///
    /// Returns `Ok(())` even if not all events were found; callers must check storage.
    #[instrument(level = "trace", skip_all, fields(chain_id, num_events = event_ids.len()))]
    async fn download_chain_until_events_found(
        &self,
        chain_id: ChainId,
        event_ids: &[EventId],
    ) -> Result<(), ChainClientError> {
        let validators = self.validator_nodes().await?;
        let info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
        let mut next_height = info.next_block_height;
        // Download one certificate at a time so we never overshoot the chain length.
        loop {
            let result = self
                .requests_scheduler
                .download_certificates_from_validators(
                    &validators,
                    chain_id,
                    next_height,
                    1,
                    self.options.certificate_batch_download_timeout,
                )
                .await;
            let certificates = match result {
                Ok(certificates) => certificates,
                Err(_) => break, // No validator has a certificate at this height.
            };
            // `None` means nothing was processed; no further progress is possible.
            let Some(batch_info) = self
                .process_certificates_using_all(&validators, certificates)
                .await?
            else {
                break;
            };
            assert!(batch_info.next_block_height > next_height);
            next_height = batch_info.next_block_height;
            // Early exit once every requested event is in local storage.
            if self.has_all_events(event_ids).await? {
                return Ok(());
            }
        }
        Ok(())
    }
709
710    /// Returns `true` if all the given events exist in local storage.
711    async fn has_all_events(&self, event_ids: &[EventId]) -> Result<bool, ChainClientError> {
712        for event_id in event_ids {
713            if !self
714                .storage_client()
715                .contains_event(event_id.clone())
716                .await?
717            {
718                return Ok(false);
719            }
720        }
721        Ok(true)
722    }
723
    /// Tries to process all the certificates, requesting any missing blobs from the given node.
    /// Returns the chain info of the last successfully processed certificate.
    #[instrument(level = "trace", skip_all)]
    async fn process_certificates(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        certificates: Vec<ConfirmedBlockCertificate>,
    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
        let mut info = None;
        // Collect the blob IDs required by the whole batch so missing ones can be
        // prefetched in a single round.
        let required_blob_ids: Vec<_> = certificates
            .iter()
            .flat_map(|certificate| certificate.value().required_blob_ids())
            .collect();

        match self
            .local_node
            .read_blob_states_from_storage(&required_blob_ids)
            .await
        {
            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                self.download_blobs(&[remote_node.clone()], &blob_ids)
                    .await?;
            }
            // Any other error is propagated; `Ok` means all blobs are present.
            x => {
                x?;
            }
        }

        for certificate in certificates {
            info = Some(
                match self.handle_certificate(certificate.clone()).await {
                    // Blobs may still be reported missing while processing; fetch them
                    // and retry this certificate once.
                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                        self.download_blobs(&[remote_node.clone()], &blob_ids)
                            .await?;
                        self.handle_certificate(certificate).await?
                    }
                    x => x?,
                }
                .info,
            );
        }

        // Done with all certificates.
        Ok(info)
    }
769
770    async fn handle_certificate<T: ProcessableCertificate>(
771        &self,
772        certificate: GenericCertificate<T>,
773    ) -> Result<ChainInfoResponse, LocalNodeError> {
774        let chain_id = certificate.inner().chain_id();
775        let response = self
776            .local_node
777            .handle_certificate(certificate, &self.notifier)
778            .await?;
779        if self.is_tracked(chain_id) {
780            self.update_publisher_chain_modes(chain_id).await?;
781        }
782        Ok(response)
783    }
784
785    /// Registers publisher chains in `EventsOnly` listening mode based on the event
786    /// subscriptions of the given chain.
787    async fn update_publisher_chain_modes(&self, chain_id: ChainId) -> Result<(), LocalNodeError> {
788        let subscriptions = self.local_node.get_event_subscriptions(chain_id).await?;
789        let mut publishers = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
790        for ((publisher_id, stream_name), _) in subscriptions {
791            publishers
792                .entry(publisher_id)
793                .or_default()
794                .insert(stream_name);
795        }
796        if chain_id != self.admin_chain_id {
797            publishers.entry(self.admin_chain_id).or_default();
798        }
799        for (publisher_id, streams) in publishers {
800            if publisher_id != chain_id {
801                self.extend_chain_mode(publisher_id, ListeningMode::EventsOnly(streams));
802            }
803        }
804        Ok(())
805    }
806
807    async fn chain_info_with_committees(
808        &self,
809        chain_id: ChainId,
810    ) -> Result<Box<ChainInfo>, LocalNodeError> {
811        let query = ChainInfoQuery::new(chain_id).with_committees();
812        let info = self.local_node.handle_chain_info_query(query).await?.info;
813        Ok(info)
814    }
815
816    /// Obtains all the committees trusted by any of the given chains. Also returns the highest
817    /// of their epochs.
818    #[instrument(level = "trace", skip_all)]
819    async fn admin_committees(
820        &self,
821    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
822        let info = self.chain_info_with_committees(self.admin_chain_id).await?;
823        Ok((info.epoch, info.into_committees()?))
824    }
825
826    /// Obtains the committee for the latest epoch on the admin chain.
827    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
828        let info = self.chain_info_with_committees(self.admin_chain_id).await?;
829        Ok((info.epoch, info.into_current_committee()?))
830    }
831
832    /// Obtains the validators for the latest epoch.
833    async fn validator_nodes(
834        &self,
835    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, ChainClientError> {
836        let (_, committee) = self.admin_committee().await?;
837        Ok(self.make_nodes(&committee)?)
838    }
839
840    /// Creates a [`RemoteNode`] for each validator in the committee.
841    fn make_nodes(
842        &self,
843        committee: &Committee,
844    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
845        Ok(self
846            .validator_node_provider()
847            .make_nodes(committee)?
848            .map(|(public_key, node)| RemoteNode { public_key, node })
849            .collect())
850    }
851
852    /// Ensures that the client has the `ChainDescription` blob corresponding to this
853    /// client's `ChainId`, and returns the chain description blob.
854    pub async fn get_chain_description_blob(
855        &self,
856        chain_id: ChainId,
857    ) -> Result<Blob, ChainClientError> {
858        let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
859        let blob = self
860            .local_node
861            .storage_client()
862            .read_blob(chain_desc_id)
863            .await?;
864        if let Some(blob) = blob {
865            // We have the blob - return it.
866            return Ok(blob);
867        }
868        // Recover history from the current validators, according to the admin chain.
869        Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
870        let nodes = self.validator_nodes().await?;
871        Ok(self
872            .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
873            .await?
874            .pop()
875            .unwrap()) // Returns exactly as many blobs as passed-in IDs.
876    }
877
878    /// Ensures that the client has the `ChainDescription` blob corresponding to this
879    /// client's `ChainId`, and returns the chain description.
880    pub async fn get_chain_description(
881        &self,
882        chain_id: ChainId,
883    ) -> Result<ChainDescription, ChainClientError> {
884        let blob = self.get_chain_description_blob(chain_id).await?;
885        Ok(bcs::from_bytes(blob.bytes())?)
886    }
887
888    /// Updates the latest block and next block height and round information from the chain info.
889    #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
890    fn update_from_info(&self, info: &ChainInfo) {
891        self.chains.pin().update(info.chain_id, |state| {
892            let mut state = state.clone_for_update_unchecked();
893            state.update_from_info(info);
894            state
895        });
896    }
897
898    /// Handles the certificate in the local node and the resulting notifications.
899    #[instrument(level = "trace", skip_all)]
900    async fn process_certificate<T: ProcessableCertificate>(
901        &self,
902        certificate: Box<GenericCertificate<T>>,
903    ) -> Result<(), LocalNodeError> {
904        let info = self.handle_certificate(*certificate).await?.info;
905        self.update_from_info(&info);
906        Ok(())
907    }
908
909    /// Submits a validated block for finalization and returns the confirmed block certificate.
910    #[instrument(level = "trace", skip_all)]
911    async fn finalize_block(
912        self: &Arc<Self>,
913        committee: &Committee,
914        certificate: ValidatedBlockCertificate,
915    ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
916        debug!(round = %certificate.round, "Submitting block for confirmation");
917        let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
918        let finalize_action = CommunicateAction::FinalizeBlock {
919            certificate: Box::new(certificate),
920            delivery: self.options.cross_chain_message_delivery,
921        };
922        let certificate = self
923            .communicate_chain_action(committee, finalize_action, hashed_value)
924            .await?;
925        self.receive_certificate_with_checked_signatures(certificate.clone())
926            .await?;
927        Ok(certificate)
928    }
929
    /// Submits a block proposal to the validators.
    ///
    /// Communicates the proposal to a quorum of validators, collects their votes into a
    /// certificate for `value`, waits for the clock-skew monitoring task, and processes
    /// the resulting certificate locally before returning it.
    #[instrument(level = "trace", skip_all)]
    async fn submit_block_proposal<T: ProcessableCertificate>(
        self: &Arc<Self>,
        committee: &Committee,
        proposal: Box<BlockProposal>,
        value: T,
    ) -> Result<GenericCertificate<T>, ChainClientError> {
        debug!(
            round = %proposal.content.round,
            "Submitting block proposal to validators"
        );

        // Check if the block timestamp is in the future and log INFO.
        let block_timestamp = proposal.content.block.timestamp;
        let local_time = self.local_node.storage_client().clock().current_time();
        if block_timestamp > local_time {
            info!(
                chain_id = %proposal.content.block.chain_id,
                %block_timestamp,
                %local_time,
                "Block timestamp is in the future; waiting until it can be proposed",
            );
        }

        // Create channel for clock skew reports from validators.
        let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
        let submit_action = CommunicateAction::SubmitBlock {
            proposal,
            blob_ids: value.required_blob_ids().into_iter().collect(),
            clock_skew_sender,
        };

        // Spawn a task to monitor clock skew reports and warn if threshold is reached.
        let validity_threshold = committee.validity_threshold();
        let committee_clone = committee.clone();
        let clock_skew_check_handle = linera_base::Task::spawn(async move {
            // Accumulated voting weight of validators reporting a positive skew,
            // plus the smallest and largest skew seen so far.
            let mut skew_weight = 0u64;
            let mut min_skew = TimeDelta::MAX;
            let mut max_skew = TimeDelta::ZERO;
            while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
                if clock_skew.as_micros() > 0 {
                    skew_weight += committee_clone.weight(&public_key);
                    min_skew = min_skew.min(clock_skew);
                    max_skew = max_skew.max(clock_skew);
                    // Warn once a validity threshold of weight reports skew, then stop.
                    if skew_weight >= validity_threshold {
                        warn!(
                            skew_weight,
                            validity_threshold,
                            min_skew_ms = min_skew.as_micros() / 1000,
                            max_skew_ms = max_skew.as_micros() / 1000,
                            "A validity threshold of validators reported clock skew; \
                             consider checking your system clock",
                        );
                        return;
                    }
                }
            }
        });

        let certificate = self
            .communicate_chain_action(committee, submit_action, value)
            .await?;

        // The sender half was moved into `submit_action`; the monitoring loop ends
        // when all senders are dropped, so this wait should be short.
        clock_skew_check_handle.await;

        // Record the new certificate locally and update our cached chain state.
        self.process_certificate(Box::new(certificate.clone()))
            .await?;
        Ok(certificate)
    }
1000
    /// Broadcasts certified blocks to validators.
    ///
    /// Sends chain information for `chain_id` up to `height` (optionally including
    /// `latest_certificate`) to the validators of `committee`, returning once a
    /// quorum has acknowledged.
    #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
    async fn communicate_chain_updates(
        self: &Arc<Self>,
        committee: &Committee,
        chain_id: ChainId,
        height: BlockHeight,
        delivery: CrossChainMessageDelivery,
        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
    ) -> Result<(), ChainClientError> {
        let nodes = self.make_nodes(committee)?;
        communicate_with_quorum(
            &nodes,
            committee,
            // There is no per-validator result to aggregate for plain updates.
            |_: &()| (),
            |remote_node| {
                let mut updater = ValidatorUpdater {
                    remote_node,
                    client: self.clone(),
                    admin_chain_id: self.admin_chain_id,
                };
                // Each validator task gets its own copy of the optional certificate.
                let certificate = latest_certificate.clone();
                Box::pin(async move {
                    updater
                        .send_chain_information(chain_id, height, delivery, certificate)
                        .await
                })
            },
            self.options.quorum_grace_period,
        )
        .await?;
        Ok(())
    }
1034
    /// Broadcasts certified blocks and optionally a block proposal, certificate or
    /// leader timeout request.
    ///
    /// In that case, it verifies that the validator votes are for the provided value,
    /// and returns a certificate.
    #[instrument(level = "trace", skip_all)]
    async fn communicate_chain_action<T: CertificateValue>(
        self: &Arc<Self>,
        committee: &Committee,
        action: CommunicateAction,
        value: T,
    ) -> Result<GenericCertificate<T>, ChainClientError> {
        let nodes = self.make_nodes(committee)?;
        // Collect one vote per validator; votes are keyed by (value hash, round) so
        // the quorum returned is over votes for the same value in the same round.
        let ((votes_hash, votes_round), votes) = communicate_with_quorum(
            &nodes,
            committee,
            |vote: &LiteVote| (vote.value.value_hash, vote.round),
            |remote_node| {
                let mut updater = ValidatorUpdater {
                    remote_node,
                    client: self.clone(),
                    admin_chain_id: self.admin_chain_id,
                };
                let action = action.clone();
                Box::pin(async move { updater.send_chain_update(action).await })
            },
            self.options.quorum_grace_period,
        )
        .await?;
        // The quorum must have voted for exactly the value and round we submitted.
        ensure!(
            (votes_hash, votes_round) == (value.hash(), action.round()),
            ChainClientError::UnexpectedQuorum {
                hash: votes_hash,
                round: votes_round,
                expected_hash: value.hash(),
                expected_round: action.round(),
            }
        );
        // Certificate is valid because
        // * `communicate_with_quorum` ensured a sufficient "weight" of
        // (non-error) answers were returned by validators.
        // * each answer is a vote signed by the expected validator.
        let certificate = LiteCertificate::try_from_votes(votes)
            .ok_or_else(|| {
                ChainClientError::InternalError("Vote values or rounds don't match; this is a bug")
            })?
            .with_value(value)
            .ok_or_else(|| {
                ChainClientError::ProtocolError("A quorum voted for an unexpected value")
            })?;
        Ok(certificate)
    }
1087
1088    /// Processes the confirmed block certificate in the local node without checking signatures.
1089    /// Also downloads and processes all ancestors that are still missing.
1090    #[instrument(level = "trace", skip_all)]
1091    async fn receive_certificate_with_checked_signatures(
1092        &self,
1093        certificate: ConfirmedBlockCertificate,
1094    ) -> Result<(), ChainClientError> {
1095        let certificate = Box::new(certificate);
1096        let block = certificate.block();
1097        // Recover history from the network.
1098        self.download_certificates(block.header.chain_id, block.header.height)
1099            .await?;
1100        // Process the received operations. Download required hashed certificate values if
1101        // necessary.
1102        if let Err(err) = self.process_certificate(certificate.clone()).await {
1103            match &err {
1104                LocalNodeError::BlobsNotFound(blob_ids) => {
1105                    self.download_blobs(&self.validator_nodes().await?, blob_ids)
1106                        .await
1107                        .map_err(|_| err)?;
1108                    self.process_certificate(certificate).await?;
1109                }
1110                _ => {
1111                    // The certificate is not as expected. Give up.
1112                    warn!("Failed to process network hashed certificate value");
1113                    return Err(err.into());
1114                }
1115            }
1116        }
1117
1118        Ok(())
1119    }
1120
1121    /// Processes the confirmed block in the local node, possibly without executing it.
1122    #[instrument(level = "trace", skip_all)]
1123    async fn receive_sender_certificate(
1124        &self,
1125        certificate: ConfirmedBlockCertificate,
1126        mode: ReceiveCertificateMode,
1127        nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
1128    ) -> Result<(), ChainClientError> {
1129        // Verify the certificate before doing any expensive networking.
1130        let (max_epoch, committees) = self.admin_committees().await?;
1131        if let ReceiveCertificateMode::NeedsCheck = mode {
1132            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
1133        }
1134        // Recover history from the network.
1135        let nodes = if let Some(nodes) = nodes {
1136            nodes
1137        } else {
1138            self.validator_nodes().await?
1139        };
1140        if let Err(err) = self.handle_certificate(certificate.clone()).await {
1141            match &err {
1142                LocalNodeError::BlobsNotFound(blob_ids) => {
1143                    self.download_blobs(&nodes, blob_ids).await?;
1144                    self.handle_certificate(certificate.clone()).await?;
1145                }
1146                _ => {
1147                    // The certificate is not as expected. Give up.
1148                    warn!("Failed to process network hashed certificate value");
1149                    return Err(err.into());
1150                }
1151            }
1152        }
1153
1154        Ok(())
1155    }
1156
    /// Downloads and processes certificates for sender chain blocks.
    ///
    /// Repeatedly asks the validators in `nodes` (restricted by `received_log` to those
    /// that actually reported each height) for the certificates at `remote_heights`,
    /// verifies them, processes them locally, and reports each processed block over
    /// `sender`. Gives up if no correct validators remain.
    #[instrument(level = "trace", skip_all)]
    async fn download_and_process_sender_chain(
        &self,
        sender_chain_id: ChainId,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        received_log: &ReceivedLogs,
        mut remote_heights: Vec<BlockHeight>,
        sender: mpsc::UnboundedSender<ChainAndHeight>,
    ) {
        let (max_epoch, committees) = match self.admin_committees().await {
            Ok(result) => result,
            Err(error) => {
                error!(%error, %sender_chain_id, "could not read admin committees");
                return;
            }
        };
        let committees_ref = &committees;
        // Work on a local copy of the node list so faulty validators can be dropped.
        let mut nodes = nodes.to_vec();
        while !remote_heights.is_empty() {
            let remote_heights_ref = &remote_heights;
            // Randomize validator order to spread the download load.
            nodes.shuffle(&mut rand::thread_rng());
            let certificates = match communicate_concurrently(
                &nodes,
                async move |remote_node| {
                    let mut remote_heights = remote_heights_ref.clone();
                    // No need trying to download certificates the validator didn't have in their
                    // log - we'll retry downloading the remaining ones next time we loop.
                    remote_heights.retain(|height| {
                        received_log.validator_has_block(
                            &remote_node.public_key,
                            sender_chain_id,
                            *height,
                        )
                    });
                    if remote_heights.is_empty() {
                        // It makes no sense to return `Ok(_)` if we aren't going to try downloading
                        // anything from the validator - let the function try the other validators
                        return Err(());
                    }
                    let certificates = self
                        .requests_scheduler
                        .download_certificates_by_heights(
                            &remote_node,
                            sender_chain_id,
                            remote_heights,
                        )
                        .await
                        .map_err(|_| ())?;
                    // Pair each certificate with whether it passed full validation
                    // (a correctly signed cert from an unknown epoch yields `false`).
                    let mut certificates_with_check_results = vec![];
                    for cert in certificates {
                        if let Ok(check_result) =
                            Self::check_certificate(max_epoch, committees_ref, &cert)
                        {
                            certificates_with_check_results
                                .push((cert, check_result.into_result().is_ok()));
                        } else {
                            // Invalid signature - the validator is faulty
                            return Err(());
                        }
                    }
                    Ok(certificates_with_check_results)
                },
                |errors| {
                    errors
                        .into_iter()
                        .map(|(validator, _error)| validator)
                        .collect::<BTreeSet<_>>()
                },
                self.options.certificate_batch_download_timeout,
            )
            .await
            {
                Ok(certificates_with_check_results) => certificates_with_check_results,
                Err(faulty_validators) => {
                    // filter out faulty validators and retry if any are left
                    nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                    if nodes.is_empty() {
                        info!(
                            chain_id = %sender_chain_id,
                            "could not download certificates for chain - no more correct validators left"
                        );
                        return;
                    }
                    continue;
                }
            };

            trace!(
                chain_id = %sender_chain_id,
                num_certificates = %certificates.len(),
                "received certificates",
            );

            // Heights that should not be re-downloaded on the next loop iteration.
            let mut to_remove_from_queue = BTreeSet::new();

            for (certificate, check_result) in certificates {
                let hash = certificate.hash();
                let chain_id = certificate.block().header.chain_id;
                let height = certificate.block().header.height;
                if !check_result {
                    // The certificate was correctly signed, but we were missing a committee to
                    // validate it properly - do not receive it, but also do not attempt to
                    // re-download it.
                    to_remove_from_queue.insert(height);
                    continue;
                }
                // We checked the certificates right after downloading them.
                let mode = ReceiveCertificateMode::AlreadyChecked;
                if let Err(error) = self
                    .receive_sender_certificate(certificate, mode, None)
                    .await
                {
                    warn!(%error, %hash, "Received invalid certificate");
                } else {
                    to_remove_from_queue.insert(height);
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id,
                            %height,
                            %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                }
            }

            remote_heights.retain(|height| !to_remove_from_queue.contains(height));
        }
        trace!(
            chain_id = %sender_chain_id,
            "find_received_certificates: finished processing chain",
        );
    }
1291
1292    /// Downloads the log of received messages for a chain from a validator.
1293    #[instrument(level = "trace", skip(self))]
1294    async fn get_received_log_from_validator(
1295        &self,
1296        chain_id: ChainId,
1297        remote_node: &RemoteNode<Env::ValidatorNode>,
1298        tracker: u64,
1299    ) -> Result<Vec<ChainAndHeight>, ChainClientError> {
1300        let mut offset = tracker;
1301
1302        // Retrieve the list of newly received certificates from this validator.
1303        let mut remote_log = Vec::new();
1304        loop {
1305            trace!("get_received_log_from_validator: looping");
1306            let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1307            let info = remote_node.handle_chain_info_query(query).await?;
1308            let received_entries = info.requested_received_log.len();
1309            offset += received_entries as u64;
1310            remote_log.extend(info.requested_received_log);
1311            trace!(
1312                remote_node = remote_node.address(),
1313                %received_entries,
1314                "get_received_log_from_validator: received log batch",
1315            );
1316            if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1317                break;
1318            }
1319        }
1320
1321        trace!(
1322            remote_node = remote_node.address(),
1323            num_entries = remote_log.len(),
1324            "get_received_log_from_validator: returning downloaded log",
1325        );
1326
1327        Ok(remote_log)
1328    }
1329
    /// Downloads a specific sender block and recursively downloads any earlier blocks
    /// that also sent a message to our chain, based on `previous_message_blocks`.
    ///
    /// This ensures that we have all the sender blocks needed to preprocess the target block
    /// and put the messages to our chain into the outbox.
    async fn download_sender_block_with_sending_ancestors(
        &self,
        receiver_chain_id: ChainId,
        sender_chain_id: ChainId,
        height: BlockHeight,
        remote_node: &RemoteNode<Env::ValidatorNode>,
    ) -> Result<(), ChainClientError> {
        // The first sender height whose messages are not yet in our outbox; anything
        // below this has already been handled.
        let next_outbox_height = self
            .local_node
            .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
            .await?
            .get(&sender_chain_id)
            .copied()
            .unwrap_or(BlockHeight::ZERO);
        let (max_epoch, committees) = self.admin_committees().await?;

        // Recursively collect all certificates we need, following
        // the chain of previous_message_blocks back to next_outbox_height.
        let mut certificates = BTreeMap::new();
        let mut current_height = height;

        // Stop if we've reached the height we've already processed.
        while current_height >= next_outbox_height {
            // Download the certificate for this height.
            let downloaded = self
                .requests_scheduler
                .download_certificates_by_heights(
                    remote_node,
                    sender_chain_id,
                    vec![current_height],
                )
                .await?;
            let Some(certificate) = downloaded.into_iter().next() else {
                return Err(ChainClientError::CannotDownloadMissingSenderBlock {
                    chain_id: sender_chain_id,
                    height: current_height,
                });
            };

            // Validate the certificate.
            Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
                .into_result()?;

            // Check if there's a previous message block to our chain.
            let block = certificate.block();
            let next_height = block
                .body
                .previous_message_blocks
                .get(&receiver_chain_id)
                .map(|(_prev_hash, prev_height)| *prev_height);

            // Store this certificate.
            certificates.insert(current_height, certificate);

            if let Some(prev_height) = next_height {
                // Continue with the previous block.
                current_height = prev_height;
            } else {
                // No more dependencies.
                break;
            }
        }

        // Nothing new was downloaded: nudge the local node to retry any cross-chain
        // requests still pending for this sender.
        if certificates.is_empty() {
            self.local_node
                .retry_pending_cross_chain_requests(sender_chain_id)
                .await?;
        }

        // Process certificates in ascending block height order (BTreeMap keeps them sorted).
        for certificate in certificates.into_values() {
            self.receive_sender_certificate(
                certificate,
                ReceiveCertificateMode::AlreadyChecked,
                Some(vec![remote_node.clone()]),
            )
            .await?;
        }

        Ok(())
    }
1416
    /// Downloads event-bearing blocks for the given streams by walking the
    /// `previous_event_blocks` linked list backwards from `height`, stopping when we
    /// reach blocks that are already executed locally or whose events we already track.
    async fn download_event_bearing_blocks(
        &self,
        sender_chain_id: ChainId,
        height: BlockHeight,
        hash: CryptoHash,
        local_next_block_height: BlockHeight,
        subscribed_streams: &BTreeSet<StreamId>,
        remote_node: &RemoteNode<Env::ValidatorNode>,
    ) -> Result<(), ChainClientError> {
        let (max_epoch, committees) = self.admin_committees().await?;

        let mut certificates = BTreeMap::new();
        // Work queue of (height, hash) pairs still to fetch, seeded with the target block.
        let mut blocks_to_fetch = BTreeSet::<_>::from([(height, hash)]);
        // For each subscribed stream, the next event index we expect locally
        // (defaulting if we have no state for the stream yet).
        let next_expected_events = subscribed_streams
            .iter()
            .zip(
                self.local_node
                    .chain_state_view(sender_chain_id)
                    .await?
                    .next_expected_events
                    .multi_get(subscribed_streams)
                    .await?
                    .into_iter()
                    .map(|maybe_index| maybe_index.unwrap_or_default()),
            )
            .collect::<BTreeMap<_, _>>();

        // Process the highest queued block first.
        while let Some((current_height, current_hash)) = blocks_to_fetch.pop_last() {
            if current_height < local_next_block_height {
                continue; // Already executed locally.
            }
            if certificates.contains_key(&current_height) {
                continue;
            }

            // Prefer a locally stored certificate; otherwise download and verify one.
            let certificate = if let Some(certificate) =
                self.storage_client().read_certificate(current_hash).await?
            {
                certificate
            } else {
                let downloaded = self
                    .requests_scheduler
                    .download_certificates(remote_node, sender_chain_id, current_height, 1)
                    .await?;
                let Some(certificate) = downloaded.into_iter().next() else {
                    continue;
                };

                Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
                    .into_result()?;

                certificate
            };
            let block = certificate.block();
            // Walk previous_event_blocks for subscribed streams.
            for stream_id in subscribed_streams {
                if let Some((prev_hash, prev_height)) =
                    block.body.previous_event_blocks.get(stream_id)
                {
                    // If this block contains exactly the next event we expect for the
                    // stream, there is no need to go further back for it.
                    if next_expected_events.get(stream_id).is_some_and(|index| {
                        block
                            .body
                            .events
                            .iter()
                            .flatten()
                            .find(|event| event.stream_id == *stream_id)
                            .is_some_and(|event| event.index == *index)
                    }) {
                        continue;
                    }
                    if !certificates.contains_key(prev_height) {
                        blocks_to_fetch.insert((*prev_height, *prev_hash));
                    }
                }
            }

            certificates.insert(current_height, certificate);
        }

        // Process in ascending height order.
        for certificate in certificates.into_values() {
            self.receive_sender_certificate(
                certificate,
                ReceiveCertificateMode::AlreadyChecked,
                Some(vec![remote_node.clone()]),
            )
            .await?;
        }

        Ok(())
    }
1511
1512    #[instrument(
1513        level = "trace", skip_all,
1514        fields(certificate_hash = ?incoming_certificate.hash()),
1515    )]
1516    fn check_certificate(
1517        highest_known_epoch: Epoch,
1518        committees: &BTreeMap<Epoch, Committee>,
1519        incoming_certificate: &ConfirmedBlockCertificate,
1520    ) -> Result<CheckCertificateResult, NodeError> {
1521        let block = incoming_certificate.block();
1522        // Check that certificates are valid w.r.t one of our trusted committees.
1523        if block.header.epoch > highest_known_epoch {
1524            return Ok(CheckCertificateResult::FutureEpoch);
1525        }
1526        if let Some(known_committee) = committees.get(&block.header.epoch) {
1527            // This epoch is recognized by our chain. Let's verify the
1528            // certificate.
1529            incoming_certificate.check(known_committee)?;
1530            Ok(CheckCertificateResult::New)
1531        } else {
1532            // We don't accept a certificate from a committee that was retired.
1533            Ok(CheckCertificateResult::OldEpoch)
1534        }
1535    }
1536
1537    /// Downloads and processes any certificates we are missing for the given chain.
1538    ///
1539    /// If we are an owner of the chain, also synchronizes the consensus state.
1540    #[instrument(level = "trace", skip_all)]
1541    pub(crate) async fn synchronize_chain_state(
1542        &self,
1543        chain_id: ChainId,
1544    ) -> Result<Box<ChainInfo>, ChainClientError> {
1545        let (_, committee) = self.admin_committee().await?;
1546        Box::pin(self.synchronize_chain_state_from_committee(chain_id, committee)).await
1547    }
1548
1549    /// Downloads certificates for the given chain from the given committee.
1550    ///
1551    /// If the chain is not in follow-only mode, also fetches and processes manager values
1552    /// (timeout certificates, proposals, locking blocks) for consensus participation.
1553    #[instrument(level = "trace", skip_all)]
1554    pub async fn synchronize_chain_state_from_committee(
1555        &self,
1556        chain_id: ChainId,
1557        committee: Committee,
1558    ) -> Result<Box<ChainInfo>, ChainClientError> {
1559        #[cfg(with_metrics)]
1560        let _latency = if !self.is_chain_follow_only(chain_id).await {
1561            Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1562        } else {
1563            None
1564        };
1565
1566        let validators = self.make_nodes(&committee)?;
1567        Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1568        communicate_with_quorum(
1569            &validators,
1570            &committee,
1571            |_: &()| (),
1572            |remote_node| async move {
1573                self.synchronize_chain_state_from(&remote_node, chain_id)
1574                    .await
1575            },
1576            self.options.quorum_grace_period,
1577        )
1578        .await?;
1579
1580        self.local_node
1581            .chain_info(chain_id)
1582            .await
1583            .map_err(Into::into)
1584    }
1585
    /// Downloads any certificates from the specified validator that we are missing for the given
    /// chain.
    ///
    /// If the chain is owned, also fetches and processes manager values
    /// (timeout certificates, proposals, locking blocks) for consensus participation.
    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
    pub(crate) async fn synchronize_chain_state_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
    ) -> Result<(), ChainClientError> {
        // Manager values are only relevant if we may participate in consensus,
        // i.e. when the chain is not in follow-only mode.
        let with_manager_values = !self.is_chain_follow_only(chain_id).await;
        let query = if with_manager_values {
            ChainInfoQuery::new(chain_id).with_manager_values()
        } else {
            ChainInfoQuery::new(chain_id)
        };
        let remote_info = remote_node.handle_chain_info_query(query).await?;
        // Catch up to the validator's next block height first.
        let local_info = self
            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
            .await?;

        if !with_manager_values {
            return Ok(());
        }

        // If we are at the same height as the remote node, we also update our chain manager.
        let local_height = match local_info {
            Some(info) => info.next_block_height,
            None => {
                self.local_node
                    .chain_info(chain_id)
                    .await?
                    .next_block_height
            }
        };
        if local_height != remote_info.next_block_height {
            // Heights diverge: the remote manager values would not apply to our state.
            debug!(
                remote_node = remote_node.address(),
                remote_height = %remote_info.next_block_height,
                local_height = %local_height,
                "synced from validator, but remote height and local height are different",
            );
            return Ok(());
        };

        // Apply the validator's timeout certificate, if it has one.
        if let Some(timeout) = remote_info.manager.timeout {
            self.handle_certificate(*timeout).await?;
        }
        // Collect every proposal the validator knows about: signed, proposed, and (for
        // fast rounds) the locking proposal.
        let mut proposals = Vec::new();
        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
            proposals.push(*proposal);
        }
        if let Some(proposal) = remote_info.manager.requested_proposed {
            proposals.push(*proposal);
        }
        if let Some(locking) = remote_info.manager.requested_locking {
            match *locking {
                LockingBlock::Fast(proposal) => {
                    proposals.push(proposal);
                }
                LockingBlock::Regular(cert) => {
                    // A failure to process the locking block is logged but does not
                    // abort synchronization.
                    let hash = cert.hash();
                    if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                    {
                        debug!(
                            remote_node = remote_node.address(),
                            %hash,
                            height = %local_height,
                            %error,
                            "skipping locked block from validator",
                        );
                    }
                }
            }
        }
        'proposal_loop: for proposal in proposals {
            let owner: AccountOwner = proposal.owner();
            if let Err(mut err) =
                Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
            {
                if let LocalNodeError::BlobsNotFound(_) = &err {
                    // First recovery attempt: download the proposal's own pending blobs
                    // from this validator.
                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                    if !required_blob_ids.is_empty() {
                        let mut blobs = Vec::new();
                        for blob_id in required_blob_ids {
                            let blob_content = match self
                                .requests_scheduler
                                .download_pending_blob(remote_node, chain_id, blob_id)
                                .await
                            {
                                Ok(content) => content,
                                Err(error) => {
                                    info!(
                                        remote_node = remote_node.address(),
                                        height = %local_height,
                                        proposer = %owner,
                                        %blob_id,
                                        %error,
                                        "skipping proposal from validator; failed to download blob",
                                    );
                                    continue 'proposal_loop;
                                }
                            };
                            blobs.push(Blob::new(blob_content));
                        }
                        self.local_node
                            .handle_pending_blobs(chain_id, blobs)
                            .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) =
                            Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                    // Second recovery attempt: the retry may still report missing blobs
                    // (e.g. ones not listed by `required_blob_ids`); fetch them via
                    // published certificates.
                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                        self.update_local_node_with_blobs_from(
                            blob_ids.clone(),
                            &[remote_node.clone()],
                        )
                        .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) =
                            Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                }
                // Repeatedly download missing sender blocks while the proposal keeps
                // failing on missing cross-chain updates.
                // NOTE(review): this loop terminates only if the download errors or the
                // retry stops reporting `MissingCrossChainUpdate` — confirm a stale
                // validator cannot keep it spinning.
                while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                    if let ChainError::MissingCrossChainUpdate {
                        chain_id,
                        origin,
                        height,
                    } = &**chain_err
                    {
                        self.download_sender_block_with_sending_ancestors(
                            *chain_id,
                            *origin,
                            *height,
                            remote_node,
                        )
                        .await?;
                        // Retry
                        if let Err(new_err) =
                            Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                        {
                            err = new_err;
                        } else {
                            continue 'proposal_loop;
                        }
                    } else {
                        break;
                    }
                }

                // All recovery attempts failed; log and move on to the next proposal.
                debug!(
                    remote_node = remote_node.address(),
                    proposer = %owner,
                    height = %local_height,
                    error = %err,
                    "skipping proposal from validator",
                );
            }
        }
        Ok(())
    }
1758
1759    async fn try_process_locking_block_from(
1760        &self,
1761        remote_node: &RemoteNode<Env::ValidatorNode>,
1762        certificate: GenericCertificate<ValidatedBlock>,
1763    ) -> Result<(), ChainClientError> {
1764        let chain_id = certificate.inner().chain_id();
1765        let certificate = Box::new(certificate);
1766        match self.process_certificate(certificate.clone()).await {
1767            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1768                let mut blobs = Vec::new();
1769                for blob_id in blob_ids {
1770                    let blob_content = self
1771                        .requests_scheduler
1772                        .download_pending_blob(remote_node, chain_id, blob_id)
1773                        .await?;
1774                    blobs.push(Blob::new(blob_content));
1775                }
1776                self.local_node
1777                    .handle_pending_blobs(chain_id, blobs)
1778                    .await?;
1779                self.process_certificate(certificate).await?;
1780                Ok(())
1781            }
1782            Err(err) => Err(err.into()),
1783            Ok(()) => Ok(()),
1784        }
1785    }
1786
    /// Downloads and processes, from the specified validators, confirmed block certificates
    /// that use the given blobs. If this succeeds, the blobs will be in our storage.
    ///
    /// Fails with `BlobsNotFound` if some blob could not be obtained from any of the
    /// given validators.
    async fn update_local_node_with_blobs_from(
        &self,
        blob_ids: Vec<BlobId>,
        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<Vec<Blob>, ChainClientError> {
        let timeout = self.options.blob_download_timeout;
        // Deduplicate IDs.
        let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
        // For each blob, query the validators concurrently (escalating after `timeout`)
        // until one yields a certificate for a block that used the blob; processing that
        // certificate stores the blob locally.
        stream::iter(blob_ids.into_iter().map(|blob_id| {
            communicate_concurrently(
                remote_nodes,
                async move |remote_node| {
                    let certificate = self
                        .requests_scheduler
                        .download_certificate_for_blob(&remote_node, blob_id)
                        .await?;
                    self.receive_sender_certificate(
                        certificate,
                        ReceiveCertificateMode::NeedsCheck,
                        Some(vec![remote_node.clone()]),
                    )
                    .await?;
                    // Read the blob back to confirm it actually landed in storage.
                    let blob = self
                        .local_node
                        .storage_client()
                        .read_blob(blob_id)
                        .await?
                        .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
                    Result::<_, ChainClientError>::Ok(blob)
                },
                move |_| ChainClientError::from(NodeError::BlobsNotFound(vec![blob_id])),
                timeout,
            )
        }))
        .buffer_unordered(self.options.max_joined_tasks)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        // Collecting into `Result` short-circuits on the first missing blob.
        .collect()
    }
1829
    /// Attempts to execute the block locally. If any incoming message execution fails, that
    /// message is rejected and execution is retried, until the block accepts only messages
    /// that succeed.
    ///
    /// Attempts to execute the block locally with a specified policy for handling bundle failures.
    /// If any attempt to read a blob fails, the blob is downloaded and execution is retried.
    ///
    /// Returns the modified block (bundles may be rejected/removed based on the policy)
    /// and the execution result.
    #[instrument(level = "trace", skip(self, block))]
    async fn stage_block_execution_with_policy(
        &self,
        block: ProposedBlock,
        round: Option<u32>,
        published_blobs: Vec<Blob>,
        policy: BundleExecutionPolicy,
    ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
        // Event IDs we already fetched: a repeated `EventsNotFound` for the same events
        // must not make this loop spin forever.
        let mut downloaded_events = HashSet::<EventId>::new();
        loop {
            let result = self
                .local_node
                .stage_block_execution_with_policy(
                    block.clone(),
                    round,
                    published_blobs.clone(),
                    policy,
                )
                .await;
            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
                // Missing blobs: download them from validators; `?` propagates a
                // download failure and thus terminates the loop.
                let validators = self.validator_nodes().await?;
                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
                    .await?;
                continue; // We found the missing blob: retry.
            }
            if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
                // Only retry for events we haven't already tried to download.
                let new_events: Vec<_> = event_ids
                    .iter()
                    .filter(|id| !downloaded_events.contains(id))
                    .cloned()
                    .collect();
                if !new_events.is_empty() {
                    self.download_publisher_chains_for_events(&new_events)
                        .await?;
                    downloaded_events.extend(new_events);
                    continue; // We downloaded new publisher chain data: retry.
                }
                // All reported events were already downloaded; don't loop forever.
            }
            if let Ok((_, executed_block, _, _)) = &result {
                // Tell subscribers that this block was executed locally.
                let hash = CryptoHash::new(executed_block);
                let notification = Notification {
                    chain_id: executed_block.header.chain_id,
                    reason: Reason::BlockExecuted {
                        height: executed_block.header.height,
                        hash,
                    },
                };
                self.notifier.notify(&[notification]);
            }
            // Success, or an error we don't handle above: exit the retry loop.
            let (_modified_block, executed_block, response, _resource_tracker) = result?;
            return Ok((executed_block, response));
        }
    }
1893
    /// Attempts to execute the block locally. If any attempt to read a blob or event fails,
    /// the missing data is downloaded and execution is retried.
    #[instrument(level = "trace", skip(self, block))]
    async fn stage_block_execution(
        &self,
        block: ProposedBlock,
        round: Option<u32>,
        published_blobs: Vec<Blob>,
    ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
        // Event IDs we already fetched: a repeated `EventsNotFound` for the same events
        // must not make this loop spin forever.
        let mut downloaded_events = HashSet::<EventId>::new();
        loop {
            let result = self
                .local_node
                .stage_block_execution(block.clone(), round, published_blobs.clone())
                .await;
            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
                // Missing blobs: download them from validators; `?` propagates a
                // download failure and thus terminates the loop.
                let validators = self.validator_nodes().await?;
                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
                    .await?;
                continue; // We found the missing blob: retry.
            }
            if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
                // Only retry for events we haven't already tried to download.
                let new_events: Vec<_> = event_ids
                    .iter()
                    .filter(|id| !downloaded_events.contains(id))
                    .cloned()
                    .collect();
                if !new_events.is_empty() {
                    self.download_publisher_chains_for_events(&new_events)
                        .await?;
                    downloaded_events.extend(new_events);
                    continue; // We downloaded new publisher chain data: retry.
                }
                // All reported events were already downloaded; don't loop forever.
            }
            if let Ok((block, _, _)) = &result {
                // Tell subscribers that this block was executed locally.
                let hash = CryptoHash::new(block);
                let notification = Notification {
                    chain_id: block.header.chain_id,
                    reason: Reason::BlockExecuted {
                        height: block.header.height,
                        hash,
                    },
                };
                self.notifier.notify(&[notification]);
            }
            // Success, or an error we don't handle above: exit the retry loop.
            let (block, response, _resource_tracker) = result?;
            return Ok((block, response));
        }
    }
1944}
1945
/// The kind of operation being timed, reported through the chain client's optional
/// timing sender for benchmarking.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    /// Timing of operation execution.
    ExecuteOperations,
    /// Timing of block execution.
    ExecuteBlock,
    /// Timing of block proposal submission.
    SubmitBlockProposal,
    /// Timing of validator updates.
    UpdateValidators,
}
1953
/// Configuration options for a [`ChainClient`].
#[derive(Debug, Clone)]
pub struct ChainClientOptions {
    /// Maximum number of pending message bundles processed at a time in a block.
    pub max_pending_message_bundles: usize,
    /// Maximum number of message bundles to discard from a block proposal due to block limit
    /// errors before discarding all remaining bundles.
    ///
    /// Discarded bundles can be retried in the next block.
    pub max_block_limit_errors: u32,
    /// Maximum number of new stream events processed at a time in a block.
    pub max_new_events_per_block: usize,
    /// Time budget for staging message bundles. When set, limits bundle execution by
    /// wall-clock time, in addition to the count limit from `max_pending_message_bundles`.
    pub staging_bundles_time_budget: Option<Duration>,
    /// The policy for automatically handling incoming messages.
    pub message_policy: MessagePolicy,
    /// Whether to block on cross-chain message delivery.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// An additional delay, after reaching a quorum, to wait for additional validator signatures,
    /// as a fraction of time taken to reach quorum.
    pub quorum_grace_period: f64,
    /// The delay when downloading a blob, after which we try a second validator.
    pub blob_download_timeout: Duration,
    /// The delay when downloading a batch of certificates, after which we try a second validator.
    pub certificate_batch_download_timeout: Duration,
    /// Maximum number of certificates that we download at a time from one validator when
    /// synchronizing one of our chains.
    pub certificate_download_batch_size: u64,
    /// Maximum number of sender certificates we try to download and receive in one go
    /// when syncing sender chains.
    pub sender_certificate_download_batch_size: usize,
    /// Maximum number of tasks that can be joined concurrently using buffer_unordered.
    pub max_joined_tasks: usize,
    /// Whether to allow creating blocks in the fast round. Fast blocks have lower latency but
    /// must be used carefully so that there are never any conflicting fast block proposals.
    pub allow_fast_blocks: bool,
}
1991
/// Default value for [`ChainClientOptions::certificate_download_batch_size`].
pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default value for [`ChainClientOptions::sender_certificate_download_batch_size`].
pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
1994
#[cfg(with_testing)]
impl ChainClientOptions {
    /// Returns a set of options with small limits and short timeouts, suitable for tests.
    pub fn test_default() -> Self {
        use crate::DEFAULT_QUORUM_GRACE_PERIOD;

        ChainClientOptions {
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            max_new_events_per_block: 10,
            staging_bundles_time_budget: None,
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout: Duration::from_secs(1),
            certificate_batch_download_timeout: Duration::from_secs(1),
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_joined_tasks: 100,
            allow_fast_blocks: false,
        }
    }
}
2017
2018impl ChainClientOptions {
2019    /// Builds the [`BundleExecutionPolicy`] based on the client options.
2020    pub fn bundle_execution_policy(&self) -> BundleExecutionPolicy {
2021        BundleExecutionPolicy {
2022            on_failure: BundleFailurePolicy::AutoRetry {
2023                max_failures: self.max_block_limit_errors,
2024            },
2025            time_budget: self.staging_bundles_time_budget,
2026        }
2027    }
2028}
2029
/// Client to operate a chain by interacting with validators and the given local storage
/// implementation.
/// * The chain being operated is called the "local chain" or just the "chain".
/// * As a rule, operations are considered successful (and communication may stop) when
///   they succeeded in gathering a quorum of responses.
///
/// All clones of a `ChainClient` share the same inner [`Client`].
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    /// The Linera [`Client`] that manages operations for this chain client.
    #[debug(skip)]
    client: Arc<Client<Env>>,
    /// The off-chain chain ID.
    chain_id: ChainId,
    /// The client options.
    #[debug(skip)]
    options: ChainClientOptions,
    /// The preferred owner of the chain used to sign proposals.
    /// `None` if we cannot propose on this chain.
    preferred_owner: Option<AccountOwner>,
    /// The next block height as read from the wallet.
    initial_next_block_height: BlockHeight,
    /// The last block hash as read from the wallet.
    initial_block_hash: Option<CryptoHash>,
    /// Optional timing sender for benchmarking.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
}
2055
// Implemented by hand because `derive(Clone)` would add an `Env: Clone` bound that the
// fields don't need: `client` is an `Arc`, and the remaining fields are clonable as-is.
impl<Env: Environment> Clone for ChainClient<Env> {
    fn clone(&self) -> Self {
        Self {
            client: self.client.clone(),
            chain_id: self.chain_id,
            options: self.options.clone(),
            preferred_owner: self.preferred_owner,
            initial_next_block_height: self.initial_next_block_height,
            initial_block_hash: self.initial_block_hash,
            timing_sender: self.timing_sender.clone(),
        }
    }
}
2069
/// Error type for [`ChainClient`] operations.
#[derive(Debug, Error)]
pub enum ChainClientError {
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),

    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// The certificates with the given hashes could not be read.
    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),

    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),

    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),

    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),

    /// A broken invariant inside the chain client itself.
    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),

    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
         Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,

    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
    WalletSynchronizationError,

    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),

    #[error(
        "Cannot accept a certificate from a committee that was retired. \
         Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,

    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),

    #[error("Signer doesn't have key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),

    #[error("client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),

    #[error("The chain client isn't owner on chain {0}")]
    NotAnOwner(ChainId),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    #[error(
        "Failed to download certificates and update local node to the next height \
         {target_next_block_height} of chain {chain_id}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },

    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    #[error(
        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
         expected block hash {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },

    /// An error reported by the configured [`Signer`] implementation.
    #[error("signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),

    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),

    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,

    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
    CannotDownloadMissingSenderBlock {
        chain_id: ChainId,
        height: BlockHeight,
    },

    #[error(
        "A different block was already committed at this height. \
         The committed certificate hash is {0}"
    )]
    Conflict(CryptoHash),
}
2177
impl From<Infallible> for ChainClientError {
    fn from(infallible: Infallible) -> Self {
        // `Infallible` has no values, so this match is vacuously exhaustive: the
        // conversion exists only to satisfy trait bounds and can never run.
        match infallible {}
    }
}
2183
impl ChainClientError {
    /// Boxes the given signer error into the [`ChainClientError::Signer`] variant.
    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
        Self::Signer(Box::new(err))
    }
}
2189
2190impl<Env: Environment> ChainClient<Env> {
2191    /// Gets the client mutex from the chain's state.
2192    #[instrument(level = "trace", skip(self))]
2193    fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
2194        self.client
2195            .chains
2196            .pin()
2197            .get(&self.chain_id)
2198            .expect("Chain client constructed for invalid chain")
2199            .client_mutex()
2200    }
2201
2202    /// Gets the next pending block.
2203    #[instrument(level = "trace", skip(self))]
2204    pub fn pending_proposal(&self) -> Option<PendingProposal> {
2205        self.client
2206            .chains
2207            .pin()
2208            .get(&self.chain_id)
2209            .expect("Chain client constructed for invalid chain")
2210            .pending_proposal()
2211            .clone()
2212    }
2213
2214    /// Updates the chain's state using a closure.
2215    #[instrument(level = "trace", skip(self, f))]
2216    fn update_state<F>(&self, f: F)
2217    where
2218        F: Fn(&mut ChainClientState),
2219    {
2220        let chains = self.client.chains.pin();
2221        chains
2222            .update(self.chain_id, |state| {
2223                let mut state = state.clone_for_update_unchecked();
2224                f(&mut state);
2225                state
2226            })
2227            .expect("Chain client constructed for invalid chain");
2228    }
2229
    /// Gets a reference to the client's signer instance.
    ///
    /// Delegates to the underlying shared client.
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &impl Signer {
        self.client.signer()
    }
2235
    /// Gets a mutable reference to the per-`ChainClient` options.
    ///
    /// Changes made through this reference affect only this client instance.
    #[instrument(level = "trace", skip(self))]
    pub fn options_mut(&mut self) -> &mut ChainClientOptions {
        &mut self.options
    }
2241
    /// Gets a reference to the per-`ChainClient` options.
    #[instrument(level = "trace", skip(self))]
    pub fn options(&self) -> &ChainClientOptions {
        &self.options
    }
2247
    /// Gets the ID of the associated chain, i.e. the chain this client operates on.
    #[instrument(level = "trace", skip(self))]
    pub fn chain_id(&self) -> ChainId {
        self.chain_id
    }
2253
2254    /// Gets a clone of the timing sender for benchmarking.
2255    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
2256        self.timing_sender.clone()
2257    }
2258
    /// Gets the ID of the admin chain.
    #[instrument(level = "trace", skip(self))]
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id
    }
2264
    /// Gets the currently preferred owner for signing the blocks, or `None` if unset.
    #[instrument(level = "trace", skip(self))]
    pub fn preferred_owner(&self) -> Option<AccountOwner> {
        self.preferred_owner
    }
2270
2271    /// Sets the new, preferred owner for signing the blocks.
2272    #[instrument(level = "trace", skip(self))]
2273    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
2274        self.preferred_owner = Some(preferred_owner);
2275    }
2276
2277    /// Unsets the preferred owner for signing the blocks.
2278    #[instrument(level = "trace", skip(self))]
2279    pub fn unset_preferred_owner(&mut self) {
2280        self.preferred_owner = None;
2281    }
2282
    /// Obtains a `ChainStateView` for this client's chain.
    ///
    /// The returned read guard keeps the chain state locked for reading until dropped.
    #[instrument(level = "trace")]
    pub async fn chain_state_view(
        &self,
    ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
        self.client.local_node.chain_state_view(self.chain_id).await
    }
2290
2291    /// Returns chain IDs that this chain subscribes to, along with the subscribed streams.
2292    /// An empty stream set for a chain means follow all event streams.
2293    #[instrument(level = "trace", skip(self))]
2294    pub async fn event_stream_publishers(
2295        &self,
2296    ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
2297        let subscriptions = self
2298            .client
2299            .local_node
2300            .get_event_subscriptions(self.chain_id)
2301            .await?;
2302        let mut publishers = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
2303        for ((chain_id, stream_name), _) in subscriptions {
2304            publishers.entry(chain_id).or_default().insert(stream_name);
2305        }
2306        if self.chain_id != self.client.admin_chain_id {
2307            // Empty streams = follow all for admin chain.
2308            publishers.entry(self.client.admin_chain_id).or_default();
2309        }
2310        Ok(publishers)
2311    }
2312
2313    /// Subscribes to notifications from this client's chain.
2314    #[instrument(level = "trace")]
2315    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
2316        self.subscribe_to(self.chain_id)
2317    }
2318
2319    /// Subscribes to notifications from the specified chain.
2320    #[instrument(level = "trace")]
2321    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
2322        Ok(Box::pin(UnboundedReceiverStream::new(
2323            self.client.notifier.subscribe(vec![chain_id]),
2324        )))
2325    }
2326
    /// Returns the storage client used by this client's local node.
    #[instrument(level = "trace")]
    pub fn storage_client(&self) -> &Env::Storage {
        self.client.storage_client()
    }
2332
2333    /// Obtains the basic `ChainInfo` data for the local chain.
2334    #[instrument(level = "trace")]
2335    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
2336        let query = ChainInfoQuery::new(self.chain_id);
2337        let response = self
2338            .client
2339            .local_node
2340            .handle_chain_info_query(query)
2341            .await?;
2342        self.client.update_from_info(&response.info);
2343        Ok(response.info)
2344    }
2345
2346    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
2347    #[instrument(level = "trace")]
2348    pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
2349        let query = ChainInfoQuery::new(self.chain_id)
2350            .with_manager_values()
2351            .with_committees();
2352        let response = self
2353            .client
2354            .local_node
2355            .handle_chain_info_query(query)
2356            .await?;
2357        self.client.update_from_info(&response.info);
2358        Ok(response.info)
2359    }
2360
    /// Returns the chain's description. Fetches it from the validators if necessary.
    pub async fn get_chain_description(&self) -> Result<ChainDescription, ChainClientError> {
        self.client.get_chain_description(self.chain_id).await
    }
2365
2366    /// Prepares the chain for the specified owner.
2367    ///
2368    /// Ensures we have the chain description blob, gets chain info, and validates
2369    /// that the owner can propose on this chain (either by being an owner or via
2370    /// `open_multi_leader_rounds`).
2371    pub async fn prepare_for_owner(
2372        &self,
2373        owner: AccountOwner,
2374    ) -> Result<Box<ChainInfo>, ChainClientError> {
2375        ensure!(
2376            self.client.has_key_for(&owner).await?,
2377            ChainClientError::CannotFindKeyForChain(self.chain_id)
2378        );
2379        // Ensure we have the chain description blob.
2380        self.client
2381            .get_chain_description_blob(self.chain_id)
2382            .await?;
2383
2384        // Get chain info.
2385        let info = self.chain_info().await?;
2386
2387        // Validate that the owner can propose on this chain.
2388        ensure!(
2389            info.manager
2390                .ownership
2391                .can_propose_in_multi_leader_round(&owner),
2392            ChainClientError::NotAnOwner(self.chain_id)
2393        );
2394
2395        Ok(info)
2396    }
2397
2398    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
2399    /// local chain.
2400    #[instrument(level = "trace")]
2401    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
2402        if self.options.message_policy.is_ignore() {
2403            // Ignore all messages.
2404            return Ok(Vec::new());
2405        }
2406
2407        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
2408        let info = self
2409            .client
2410            .local_node
2411            .handle_chain_info_query(query)
2412            .await?
2413            .info;
2414        if self.preferred_owner.is_some_and(|owner| {
2415            info.manager
2416                .ownership
2417                .is_super_owner_no_regular_owners(&owner)
2418        }) {
2419            // There are only super owners; they are expected to sync manually.
2420            ensure!(
2421                info.next_block_height >= self.initial_next_block_height,
2422                ChainClientError::WalletSynchronizationError
2423            );
2424        }
2425
2426        Ok(info
2427            .requested_pending_message_bundles
2428            .into_iter()
2429            .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
2430            .take(self.options.max_pending_message_bundles)
2431            .collect())
2432    }
2433
    /// Returns an `UpdateStreams` operation that updates this client's chain about new events
    /// in any of the streams its applications are subscribing to. Returns `None` if there are no
    /// new events.
    ///
    /// The number of reported events is capped at `max_new_events_per_block`; a stream
    /// that would exceed the cap is truncated to the remaining budget.
    #[instrument(level = "trace")]
    async fn collect_stream_updates(&self) -> Result<Option<Operation>, ChainClientError> {
        // Load all our subscriptions.
        let subscription_map = self
            .client
            .local_node
            .get_event_subscriptions(self.chain_id)
            .await?;
        // Collect the indices of all new events.
        let futures = subscription_map
            .into_iter()
            // Honor the message policy's chain restriction, if one is configured.
            .filter(|((chain_id, _), _)| {
                self.options
                    .message_policy
                    .restrict_chain_ids_to
                    .as_ref()
                    .is_none_or(|chain_set| chain_set.contains(chain_id))
            })
            .map(|((chain_id, stream_id), subscriptions)| {
                let client = self.client.clone();
                let previous_index = subscriptions.next_index;
                async move {
                    // Only report streams whose event count moved past the
                    // subscription's next index.
                    let next_index = client
                        .local_node
                        .get_stream_event_count(chain_id, stream_id.clone())
                        .await?;
                    if let Some(next_index) =
                        next_index.filter(|next_index| *next_index > previous_index)
                    {
                        Ok(Some((chain_id, stream_id, previous_index, next_index)))
                    } else {
                        Ok::<_, ChainClientError>(None)
                    }
                }
            });
        // Query all streams concurrently, bounded by `max_joined_tasks`.
        let all_updates = futures::stream::iter(futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        // Apply the max_new_events_per_block limit.
        let max_events = self.options.max_new_events_per_block;
        let mut total_events: usize = 0;
        let mut updates = Vec::new();
        for (chain_id, stream_id, previous_index, next_index) in all_updates {
            let new_events = (next_index - previous_index) as usize;
            if total_events + new_events <= max_events {
                // The whole stream update fits within the remaining budget.
                total_events += new_events;
                updates.push((chain_id, stream_id, next_index));
            } else {
                // Take only as many events as still fit, then stop.
                let remaining = max_events.saturating_sub(total_events);
                if remaining > 0 {
                    updates.push((chain_id, stream_id, previous_index + remaining as u32));
                }
                break;
            }
        }
        if updates.is_empty() {
            return Ok(None);
        }
        Ok(Some(SystemOperation::UpdateStreams(updates).into()))
    }
2501
    /// Obtains the `ChainInfo` for this chain, including committee information.
    #[instrument(level = "trace")]
    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
        self.client.chain_info_with_committees(self.chain_id).await
    }
2506
2507    /// Obtains the current epoch of the local chain as well as its set of trusted committees.
2508    #[instrument(level = "trace")]
2509    async fn epoch_and_committees(
2510        &self,
2511    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
2512        let info = self.chain_info_with_committees().await?;
2513        let epoch = info.epoch;
2514        let committees = info.into_committees()?;
2515        Ok((epoch, committees))
2516    }
2517
2518    /// Obtains the committee for the current epoch of the local chain.
2519    #[instrument(level = "trace")]
2520    pub async fn local_committee(&self) -> Result<Committee, ChainClientError> {
2521        let info = match self.chain_info_with_committees().await {
2522            Ok(info) => info,
2523            Err(LocalNodeError::BlobsNotFound(_)) => {
2524                self.synchronize_chain_state(self.chain_id).await?;
2525                self.chain_info_with_committees().await?
2526            }
2527            Err(err) => return Err(err.into()),
2528        };
2529        Ok(info.into_current_committee()?)
2530    }
2531
    /// Obtains the committee for the latest epoch on the admin chain.
    #[instrument(level = "trace")]
    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
        self.client.admin_committee().await
    }
2537
    /// Obtains the identity of the current owner of the chain.
    ///
    /// Returns an error if we don't have the private key for the identity.
    ///
    /// # Errors
    ///
    /// - [`ChainClientError::NoAccountKeyConfigured`] if no preferred owner is set.
    /// - [`ChainClientError::NotAnOwner`] if the preferred owner cannot propose on this chain.
    /// - [`ChainClientError::CannotFindKeyForChain`] if the signer lacks the owner's key.
    #[instrument(level = "trace")]
    pub async fn identity(&self) -> Result<AccountOwner, ChainClientError> {
        let Some(preferred_owner) = self.preferred_owner else {
            return Err(ChainClientError::NoAccountKeyConfigured(self.chain_id));
        };
        let manager = self.chain_info().await?.manager;
        ensure!(
            manager.ownership.is_active(),
            LocalNodeError::InactiveChain(self.chain_id)
        );

        // Check if the preferred owner can propose on this chain: either they are an owner,
        // the current leader, or open_multi_leader_rounds is enabled.
        let is_owner = manager
            .ownership
            .can_propose_in_multi_leader_round(&preferred_owner);

        if !is_owner {
            // List who *would* be accepted, for diagnostics.
            let accepted_owners = manager
                .ownership
                .all_owners()
                .chain(&manager.leader)
                .collect::<Vec<_>>();
            warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
                "The preferred owner is not configured as an owner of this chain",
            );
            return Err(ChainClientError::NotAnOwner(self.chain_id));
        }

        // Being an owner is not enough: our signer must actually hold the key.
        let has_signer = self
            .signer()
            .contains_key(&preferred_owner)
            .await
            .map_err(ChainClientError::signer_failure)?;

        if !has_signer {
            warn!(%self.chain_id, ?preferred_owner,
                "Chain is one of the owners but its Signer instance doesn't contain the key",
            );
            return Err(ChainClientError::CannotFindKeyForChain(self.chain_id));
        }

        Ok(preferred_owner)
    }
2585
    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
    /// its current height.
    #[instrument(level = "trace")]
    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
        #[cfg(with_metrics)]
        let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();

        // First catch local storage up with the height the wallet already knows about.
        let mut info = self.synchronize_to_known_height().await?;

        if self.preferred_owner.is_none_or(|owner| {
            !info
                .manager
                .ownership
                .is_super_owner_no_regular_owners(&owner)
        }) {
            // If we are not a super owner or there are regular owners, we could be missing recent
            // certificates created by other clients. Further synchronize blocks from the network.
            // This is a best-effort that depends on network conditions.
            info = self.client.synchronize_chain_state(self.chain_id).await?;
        }

        // If this chain's epoch is ahead of the latest admin epoch we know locally,
        // sync the admin chain to learn about more recent committees.
        if info.epoch > self.client.admin_committees().await?.0 {
            self.client
                .synchronize_chain_state(self.client.admin_chain_id)
                .await?;
        }

        self.client.update_from_info(&info);
        Ok(info)
    }
2616
2617    // Verifies that our local storage contains enough history compared to the
2618    // known block height. Otherwise, downloads the missing history from the
2619    // network.
2620    // The known height only differs if the wallet is ahead of storage.
2621    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2622        let info = self
2623            .client
2624            .download_certificates(self.chain_id, self.initial_next_block_height)
2625            .await?;
2626        if info.next_block_height == self.initial_next_block_height {
2627            // Check that our local node has the expected block hash.
2628            ensure!(
2629                self.initial_block_hash == info.block_hash,
2630                ChainClientError::InternalError("Invalid chain of blocks in local node")
2631            );
2632        }
2633        Ok(info)
2634    }
2635
    /// Attempts to update all validators about the local chain.
    ///
    /// If `old_committee` is given, updates are sent to it first; if the current local
    /// committee differs, it is updated as well. `latest_certificate` is forwarded
    /// along with the chain updates.
    #[instrument(level = "trace", skip(old_committee, latest_certificate))]
    pub async fn update_validators(
        &self,
        old_committee: Option<&Committee>,
        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
    ) -> Result<(), ChainClientError> {
        let update_validators_start = linera_base::time::Instant::now();
        // Communicate the new certificate now.
        if let Some(old_committee) = old_committee {
            let old_committee_start = linera_base::time::Instant::now();
            self.communicate_chain_updates(old_committee, latest_certificate.clone())
                .await?;
            tracing::debug!(
                old_committee_ms = old_committee_start.elapsed().as_millis(),
                "communicated chain updates to old committee"
            );
        };
        if let Ok(new_committee) = self.local_committee().await {
            if Some(&new_committee) != old_committee {
                // If the configuration just changed, communicate to the new committee as well.
                // (This is actually more important than updating the previous committee.)
                let new_committee_start = linera_base::time::Instant::now();
                self.communicate_chain_updates(&new_committee, latest_certificate)
                    .await?;
                tracing::debug!(
                    new_committee_ms = new_committee_start.elapsed().as_millis(),
                    "communicated chain updates to new committee"
                );
            }
        }
        self.send_timing(update_validators_start, TimingType::UpdateValidators);
        Ok(())
    }
2670
2671    /// Broadcasts certified blocks to validators.
2672    #[instrument(level = "trace", skip(committee, latest_certificate))]
2673    pub async fn communicate_chain_updates(
2674        &self,
2675        committee: &Committee,
2676        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
2677    ) -> Result<(), ChainClientError> {
2678        let delivery = self.options.cross_chain_message_delivery;
2679        let height = self.chain_info().await?.next_block_height;
2680        self.client
2681            .communicate_chain_updates(
2682                committee,
2683                self.chain_id,
2684                height,
2685                delivery,
2686                latest_certificate,
2687            )
2688            .await
2689    }
2690
2691    /// Synchronizes all chains that any application on this chain subscribes to.
2692    /// We always consider the admin chain a relevant publishing chain, for new epochs.
2693    async fn synchronize_publisher_chains(&self) -> Result<(), ChainClientError> {
2694        let subscriptions = self
2695            .client
2696            .local_node
2697            .get_event_subscriptions(self.chain_id)
2698            .await?;
2699        let chain_ids: BTreeSet<_> = subscriptions
2700            .iter()
2701            .map(|((chain_id, _), _)| *chain_id)
2702            .chain(iter::once(self.client.admin_chain_id))
2703            .filter(|chain_id| *chain_id != self.chain_id)
2704            .collect();
2705        stream::iter(
2706            chain_ids
2707                .into_iter()
2708                .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
2709        )
2710        .buffer_unordered(self.options.max_joined_tasks)
2711        .collect::<Vec<_>>()
2712        .await
2713        .into_iter()
2714        .collect::<Result<Vec<_>, _>>()?;
2715        Ok(())
2716    }
2717
    /// Attempts to download new received certificates.
    ///
    /// This is a best effort: it will only find certificates that have been confirmed
    /// amongst sufficiently many validators of the current committee of the target
    /// chain.
    ///
    /// However, this should be the case whenever a sender's chain is still in use and
    /// is regularly upgraded to new committees.
    #[instrument(level = "trace")]
    pub async fn find_received_certificates(&self) -> Result<(), ChainClientError> {
        debug!(chain_id = %self.chain_id, "starting find_received_certificates");
        #[cfg(with_metrics)]
        let _latency = metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
        // Use network information from the local chain.
        let chain_id = self.chain_id;
        let (_, committee) = self.admin_committee().await?;
        let nodes = self.client.make_nodes(&committee)?;

        // Previously stored per-validator trackers; a missing entry defaults to 0 below.
        let trackers = self
            .client
            .local_node
            .get_received_certificate_trackers(chain_id)
            .await?;

        trace!("find_received_certificates: read trackers");

        let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
        // Proceed to downloading received logs.
        let result = communicate_with_quorum(
            &nodes,
            &committee,
            |_| (),
            |remote_node| {
                let client = &self.client;
                let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
                let received_log_batches = Arc::clone(&received_log_batches);
                Box::pin(async move {
                    let batch = client
                        .get_received_log_from_validator(chain_id, &remote_node, tracker)
                        .await?;
                    let mut batches = received_log_batches.lock().unwrap();
                    batches.push((remote_node.public_key, batch));
                    Ok(())
                })
            },
            self.options.quorum_grace_period,
        )
        .await;

        // A quorum failure is logged but not fatal: we still process whatever
        // batches were collected.
        if let Err(error) = result {
            error!(
                %error,
                "Failed to synchronize received_logs from at least a quorum of validators",
            );
        }

        let received_logs: Vec<_> = {
            let mut received_log_batches = received_log_batches.lock().unwrap();
            std::mem::take(received_log_batches.as_mut())
        };

        debug!(
            received_logs_len = %received_logs.len(),
            received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
            "collected received logs"
        );

        let (received_logs, mut validator_trackers) = {
            (
                ReceivedLogs::from_received_result(received_logs.clone()),
                ValidatorTrackers::new(received_logs, &trackers),
            )
        };

        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "find_received_certificates: total number of chains and certificates to sync",
        );

        let max_blocks_per_chain =
            self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
        // Download and process the certificates batch by batch, persisting tracker
        // progress after each batch.
        for received_log in received_logs.into_batches(
            self.options.sender_certificate_download_batch_size,
            max_blocks_per_chain,
        ) {
            validator_trackers = self
                .receive_sender_certificates(received_log, validator_trackers, &nodes)
                .await?;

            self.update_received_certificate_trackers(&validator_trackers)
                .await;
        }

        info!("find_received_certificates finished");

        Ok(())
    }
2816
2817    async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
2818        let updated_trackers = trackers.to_map();
2819        trace!(?updated_trackers, "updated tracker values");
2820
2821        // Update the trackers.
2822        if let Err(error) = self
2823            .client
2824            .local_node
2825            .update_received_certificate_trackers(self.chain_id, updated_trackers)
2826            .await
2827        {
2828            error!(
2829                chain_id = %self.chain_id,
2830                %error,
2831                "Failed to update the certificate trackers for chain",
2832            );
2833        }
2834    }
2835
    /// Downloads and processes or preprocesses the certificates for blocks sending messages to
    /// this chain that we are still missing.
    ///
    /// Returns the updated `ValidatorTrackers`, advanced for every certificate that
    /// was downloaded and processed.
    async fn receive_sender_certificates(
        &self,
        mut received_logs: ReceivedLogs,
        mut validator_trackers: ValidatorTrackers,
        nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<ValidatorTrackers, ChainClientError> {
        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "receive_sender_certificates: number of chains and certificates to sync",
        );

        // Obtain the next block height we need in the local node, for each chain.
        let local_next_heights = self
            .client
            .local_node
            .next_outbox_heights(received_logs.chains(), self.chain_id)
            .await?;

        // Drop log entries for blocks the local node already knows about.
        validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);

        debug!(
            remaining_total_certificates = %received_logs.num_certs(),
            "receive_sender_certificates: computed remote_heights"
        );

        let mut other_sender_chains = Vec::new();
        // Download tasks report each processed certificate through this channel.
        let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

        let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
            let received_logs = &received_logs;
            let other_sender_chains = &mut other_sender_chains;

            move |(sender_chain_id, remote_heights)| {
                if remote_heights.is_empty() {
                    // Our highest, locally executed block is higher than any block height
                    // from the current batch. Skip this batch, but remember to wait for
                    // the messages to be delivered to the inboxes.
                    other_sender_chains.push(sender_chain_id);
                    return None;
                };
                let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
                let sender = sender.clone();
                let client = self.client.clone();
                // Shuffle the validators so download load is spread across nodes.
                let mut nodes = nodes.to_vec();
                nodes.shuffle(&mut rand::thread_rng());
                Some(async move {
                    client
                        .download_and_process_sender_chain(
                            sender_chain_id,
                            &nodes,
                            received_logs,
                            remote_heights,
                            sender,
                        )
                        .await
                })
            }
        });

        // Run the downloads concurrently while draining the progress channel, so
        // the trackers advance as certificates are processed.
        future::join(
            stream::iter(cert_futures)
                .buffer_unordered(self.options.max_joined_tasks)
                .collect::<()>(),
            async {
                while let Some(chain_and_height) = receiver.recv().await {
                    validator_trackers.downloaded_cert(chain_and_height);
                }
            },
        )
        .await;

        debug!(
            num_other_chains = %other_sender_chains.len(),
            "receive_sender_certificates: processing certificates finished"
        );

        // Certificates for these chains were omitted from `certificates` because they were
        // already processed locally. If they were processed in a concurrent task, it is not
        // guaranteed that their cross-chain messages were already handled.
        self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
            .await;

        debug!("receive_sender_certificates: finished processing other_sender_chains");

        Ok(validator_trackers)
    }
2925
    /// Retries cross chain requests on the chains which may have been processed on
    /// another task without the messages being correctly handled.
    ///
    /// Best-effort: all failures are logged, never propagated.
    async fn retry_pending_cross_chain_requests(
        &self,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        other_sender_chains: Vec<ChainId>,
    ) {
        let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
            let local_node = self.client.local_node.clone();
            async move {
                // First attempt; if it fails with missing blobs, download the blobs
                // from the given validators and retry exactly once more.
                if let Err(error) = match local_node
                    .retry_pending_cross_chain_requests(chain_id)
                    .await
                {
                    Ok(()) => Ok(()),
                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                        if let Err(error) = self
                            .client
                            .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
                            .await
                        {
                            error!(
                                ?blob_ids,
                                %error,
                                "Error while attempting to download blobs during retrying outgoing \
                                messages"
                            );
                        }
                        local_node
                            .retry_pending_cross_chain_requests(chain_id)
                            .await
                    }
                    err => err,
                } {
                    error!(
                        %chain_id,
                        %error,
                        "Failed to retry outgoing messages from chain"
                    );
                }
            }
        }));
        // Drive all retries to completion concurrently.
        stream.for_each(future::ready).await;
    }
2970
2971    /// Sends money.
2972    #[instrument(level = "trace")]
2973    pub async fn transfer(
2974        &self,
2975        owner: AccountOwner,
2976        amount: Amount,
2977        recipient: Account,
2978    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2979        // TODO(#467): check the balance of `owner` before signing any block proposal.
2980        Box::pin(self.execute_operation(SystemOperation::Transfer {
2981            owner,
2982            recipient,
2983            amount,
2984        }))
2985        .await
2986    }
2987
2988    /// Verify if a data blob is readable from storage.
2989    // TODO(#2490): Consider removing or renaming this.
2990    #[instrument(level = "trace")]
2991    pub async fn read_data_blob(
2992        &self,
2993        hash: CryptoHash,
2994    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2995        let blob_id = BlobId {
2996            hash,
2997            blob_type: BlobType::Data,
2998        };
2999        Box::pin(self.execute_operation(SystemOperation::VerifyBlob { blob_id })).await
3000    }
3001
3002    /// Claims money in a remote chain.
3003    #[instrument(level = "trace")]
3004    pub async fn claim(
3005        &self,
3006        owner: AccountOwner,
3007        target_id: ChainId,
3008        recipient: Account,
3009        amount: Amount,
3010    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3011        Box::pin(self.execute_operation(SystemOperation::Claim {
3012            owner,
3013            target_id,
3014            recipient,
3015            amount,
3016        }))
3017        .await
3018    }
3019
3020    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
3021    /// certificate and sends it to all validators, to make them enter the next round.
3022    #[instrument(level = "trace")]
3023    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, ChainClientError> {
3024        let chain_id = self.chain_id;
3025        let info = self.chain_info_with_committees().await?;
3026        let committee = info.current_committee()?;
3027        let height = info.next_block_height;
3028        let round = info.manager.current_round;
3029        let action = CommunicateAction::RequestTimeout {
3030            height,
3031            round,
3032            chain_id,
3033        };
3034        let value = Timeout::new(chain_id, height, info.epoch);
3035        let certificate = Box::new(
3036            self.client
3037                .communicate_chain_action(committee, action, value)
3038                .await?,
3039        );
3040        self.client.process_certificate(certificate.clone()).await?;
3041        // The block height didn't increase, but this will communicate the timeout as well.
3042        self.client
3043            .communicate_chain_updates(
3044                committee,
3045                chain_id,
3046                height,
3047                CrossChainMessageDelivery::NonBlocking,
3048                None,
3049            )
3050            .await?;
3051        Ok(*certificate)
3052    }
3053
3054    /// Downloads and processes any certificates we are missing for the given chain.
3055    #[instrument(level = "trace", skip_all)]
3056    pub async fn synchronize_chain_state(
3057        &self,
3058        chain_id: ChainId,
3059    ) -> Result<Box<ChainInfo>, ChainClientError> {
3060        self.client.synchronize_chain_state(chain_id).await
3061    }
3062
3063    /// Downloads and processes any certificates we are missing for this chain, from the given
3064    /// committee.
3065    #[instrument(level = "trace", skip_all)]
3066    pub async fn synchronize_chain_state_from_committee(
3067        &self,
3068        committee: Committee,
3069    ) -> Result<Box<ChainInfo>, ChainClientError> {
3070        Box::pin(
3071            self.client
3072                .synchronize_chain_state_from_committee(self.chain_id, committee),
3073        )
3074        .await
3075    }
3076
3077    /// Executes a list of operations.
3078    #[instrument(level = "trace", skip(operations, blobs))]
3079    pub async fn execute_operations(
3080        &self,
3081        operations: Vec<Operation>,
3082        blobs: Vec<Blob>,
3083    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3084        let timing_start = linera_base::time::Instant::now();
3085        tracing::debug!("execute_operations started");
3086
3087        let result = loop {
3088            let execute_block_start = linera_base::time::Instant::now();
3089            // TODO(#2066): Remove boxing once the call-stack is shallower
3090            tracing::debug!("calling execute_block");
3091            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
3092                Ok(ClientOutcome::Committed(certificate)) => {
3093                    tracing::debug!(
3094                        execute_block_ms = execute_block_start.elapsed().as_millis(),
3095                        "execute_block succeeded"
3096                    );
3097                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
3098                    break Ok(ClientOutcome::Committed(certificate));
3099                }
3100                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
3101                    break Ok(ClientOutcome::WaitForTimeout(timeout));
3102                }
3103                Ok(ClientOutcome::Conflict(certificate)) => {
3104                    info!(
3105                        height = %certificate.block().header.height,
3106                        "Another block was committed."
3107                    );
3108                    break Ok(ClientOutcome::Conflict(certificate));
3109                }
3110                Err(ChainClientError::CommunicationError(CommunicationError::Trusted(
3111                    NodeError::UnexpectedBlockHeight {
3112                        expected_block_height,
3113                        found_block_height,
3114                    },
3115                ))) if expected_block_height > found_block_height => {
3116                    tracing::info!(
3117                        "Local state is outdated; synchronizing chain {:.8}",
3118                        self.chain_id
3119                    );
3120                    self.synchronize_chain_state(self.chain_id).await?;
3121                }
3122                Err(err) => return Err(err),
3123            };
3124        };
3125
3126        self.send_timing(timing_start, TimingType::ExecuteOperations);
3127        tracing::debug!(
3128            total_execute_operations_ms = timing_start.elapsed().as_millis(),
3129            "execute_operations returning"
3130        );
3131
3132        result
3133    }
3134
3135    /// Executes an operation.
3136    pub async fn execute_operation(
3137        &self,
3138        operation: impl Into<Operation>,
3139    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3140        self.execute_operations(vec![operation.into()], vec![])
3141            .await
3142    }
3143
3144    /// Executes a new block.
3145    ///
3146    /// This must be preceded by a call to `prepare_chain()`.
3147    #[instrument(level = "trace", skip(operations, blobs))]
3148    async fn execute_block(
3149        &self,
3150        operations: Vec<Operation>,
3151        blobs: Vec<Blob>,
3152    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3153        #[cfg(with_metrics)]
3154        let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
3155
3156        let mutex = self.client_mutex();
3157        let lock_start = linera_base::time::Instant::now();
3158        let _guard = mutex.lock_owned().await;
3159        tracing::debug!(
3160            lock_wait_ms = lock_start.elapsed().as_millis(),
3161            "acquired client_mutex in execute_block"
3162        );
3163        // TODO(#5092): We shouldn't need to call this explicitly.
3164        match self.process_pending_block_without_prepare().await? {
3165            ClientOutcome::Committed(Some(certificate)) => {
3166                return Ok(ClientOutcome::Conflict(Box::new(certificate)))
3167            }
3168            ClientOutcome::WaitForTimeout(timeout) => {
3169                return Ok(ClientOutcome::WaitForTimeout(timeout))
3170            }
3171            ClientOutcome::Conflict(certificate) => {
3172                return Ok(ClientOutcome::Conflict(certificate))
3173            }
3174            ClientOutcome::Committed(None) => {}
3175        }
3176
3177        // Collect pending messages and epoch changes after acquiring the lock to avoid
3178        // race conditions where messages valid for one block height are proposed at a
3179        // different height.
3180        let transactions = self.prepend_epochs_messages_and_events(operations).await?;
3181
3182        if transactions.is_empty() {
3183            return Err(ChainClientError::LocalNodeError(
3184                LocalNodeError::WorkerError(WorkerError::ChainError(Box::new(
3185                    ChainError::EmptyBlock,
3186                ))),
3187            ));
3188        }
3189
3190        let block = self.new_pending_block(transactions, blobs).await?;
3191
3192        match self.process_pending_block_without_prepare().await? {
3193            ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
3194                Ok(ClientOutcome::Committed(certificate))
3195            }
3196            ClientOutcome::Committed(Some(certificate)) => {
3197                Ok(ClientOutcome::Conflict(Box::new(certificate)))
3198            }
3199            // Should be unreachable: We did set a pending block.
3200            ClientOutcome::Committed(None) => Err(ChainClientError::BlockProposalError(
3201                "Unexpected block proposal error",
3202            )),
3203            ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
3204            ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
3205        }
3206    }
3207
3208    /// Creates a vector of transactions which, in addition to the provided operations,
3209    /// also contains epoch changes, receiving message bundles and event stream updates
3210    /// (if there are any to be processed).
3211    /// This should be called when executing a block, in order to make sure that any pending
3212    /// messages or events are included in it.
3213    #[instrument(level = "trace", skip(operations))]
3214    async fn prepend_epochs_messages_and_events(
3215        &self,
3216        operations: Vec<Operation>,
3217    ) -> Result<Vec<Transaction>, ChainClientError> {
3218        let incoming_bundles = self.pending_message_bundles().await?;
3219        let stream_updates = self.collect_stream_updates().await?;
3220        Ok(self
3221            .collect_epoch_changes()
3222            .await?
3223            .into_iter()
3224            .map(Transaction::ExecuteOperation)
3225            .chain(
3226                incoming_bundles
3227                    .into_iter()
3228                    .map(Transaction::ReceiveMessages),
3229            )
3230            .chain(
3231                stream_updates
3232                    .into_iter()
3233                    .map(Transaction::ExecuteOperation),
3234            )
3235            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
3236            .collect::<Vec<_>>())
3237    }
3238
    /// Creates a new pending block and handles the proposal in the local node.
    /// Next time `process_pending_block_without_prepare` is called, this block will be proposed
    /// to the validators.
    ///
    /// Fails if the client already holds a pending proposal.
    #[instrument(level = "trace", skip(transactions, blobs))]
    async fn new_pending_block(
        &self,
        transactions: Vec<Transaction>,
        blobs: Vec<Blob>,
    ) -> Result<Block, ChainClientError> {
        let identity = self.identity().await?;

        // Refuse to overwrite an existing pending block: it must be committed (or
        // discarded) first.
        ensure!(
            self.pending_proposal().is_none(),
            ChainClientError::BlockProposalError(
                "Client state already has a pending block; \
                use the `linera retry-pending-block` command to commit that first"
            )
        );
        let info = self.chain_info_with_committees().await?;
        // Pick a timestamp no earlier than the incoming bundles or the previous block.
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let proposed_block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            authenticated_signer: Some(identity),
            timestamp,
        };

        let round = self.round_for_oracle(&info, &identity).await?;
        // Make sure every incoming message succeeds and otherwise remove them.
        // Also, compute the final certified hash while we're at it.
        let (block, _) = Box::pin(self.client.stage_block_execution_with_policy(
            proposed_block,
            round,
            blobs.clone(),
            self.options.bundle_execution_policy(),
        ))
        .await?;
        // Persist the staged block as the client's pending proposal so it survives
        // until the next call to `process_pending_block_without_prepare`.
        let (proposed_block, _) = block.clone().into_proposal();
        self.update_state(|state| {
            state.set_pending_proposal(proposed_block.clone(), blobs.clone())
        });
        Ok(block)
    }
3285
3286    /// Returns a suitable timestamp for the next block.
3287    ///
3288    /// This will usually be the current time according to the local clock, but may be slightly
3289    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
3290    #[instrument(level = "trace", skip(transactions))]
3291    fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
3292        let local_time = self.storage_client().clock().current_time();
3293        transactions
3294            .iter()
3295            .filter_map(Transaction::incoming_bundle)
3296            .map(|msg| msg.bundle.timestamp)
3297            .max()
3298            .map_or(local_time, |timestamp| timestamp.max(local_time))
3299            .max(block_time)
3300    }
3301
3302    /// Queries an application.
3303    #[instrument(level = "trace", skip(query))]
3304    pub async fn query_application(
3305        &self,
3306        query: Query,
3307        block_hash: Option<CryptoHash>,
3308    ) -> Result<(QueryOutcome, BlockHeight), ChainClientError> {
3309        loop {
3310            let result = self
3311                .client
3312                .local_node
3313                .query_application(self.chain_id, query.clone(), block_hash)
3314                .await;
3315            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
3316                let validators = self.client.validator_nodes().await?;
3317                self.client
3318                    .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
3319                    .await?;
3320                continue; // We found the missing blob: retry.
3321            }
3322            return Ok(result?);
3323        }
3324    }
3325
3326    /// Queries a system application.
3327    #[instrument(level = "trace", skip(query))]
3328    pub async fn query_system_application(
3329        &self,
3330        query: SystemQuery,
3331    ) -> Result<QueryOutcome<SystemResponse>, ChainClientError> {
3332        let (
3333            QueryOutcome {
3334                response,
3335                operations,
3336            },
3337            _,
3338        ) = self.query_application(Query::System(query), None).await?;
3339        match response {
3340            QueryResponse::System(response) => Ok(QueryOutcome {
3341                response,
3342                operations,
3343            }),
3344            _ => Err(ChainClientError::InternalError(
3345                "Unexpected response for system query",
3346            )),
3347        }
3348    }
3349
3350    /// Queries a user application.
3351    #[instrument(level = "trace", skip(application_id, query))]
3352    #[cfg(with_testing)]
3353    pub async fn query_user_application<A: Abi>(
3354        &self,
3355        application_id: ApplicationId<A>,
3356        query: &A::Query,
3357    ) -> Result<QueryOutcome<A::QueryResponse>, ChainClientError> {
3358        let query = Query::user(application_id, query)?;
3359        let (
3360            QueryOutcome {
3361                response,
3362                operations,
3363            },
3364            _,
3365        ) = self.query_application(query, None).await?;
3366        match response {
3367            QueryResponse::User(response_bytes) => {
3368                let response = serde_json::from_slice(&response_bytes)?;
3369                Ok(QueryOutcome {
3370                    response,
3371                    operations,
3372                })
3373            }
3374            _ => Err(ChainClientError::InternalError(
3375                "Unexpected response for user query",
3376            )),
3377        }
3378    }
3379
3380    /// Obtains the local balance of the chain account after staging the execution of
3381    /// incoming messages in a new block.
3382    ///
3383    /// Does not attempt to synchronize with validators. The result will reflect up to
3384    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
3385    /// block.
3386    #[instrument(level = "trace")]
3387    pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
3388        let (balance, _) = Box::pin(self.query_balances_with_owner(AccountOwner::CHAIN)).await?;
3389        Ok(balance)
3390    }
3391
3392    /// Obtains the local balance of an account after staging the execution of incoming messages in
3393    /// a new block.
3394    ///
3395    /// Does not attempt to synchronize with validators. The result will reflect up to
3396    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
3397    /// block.
3398    #[instrument(level = "trace", skip(owner))]
3399    pub async fn query_owner_balance(
3400        &self,
3401        owner: AccountOwner,
3402    ) -> Result<Amount, ChainClientError> {
3403        if owner.is_chain() {
3404            Box::pin(self.query_balance()).await
3405        } else {
3406            Ok(Box::pin(self.query_balances_with_owner(owner))
3407                .await?
3408                .1
3409                .unwrap_or(Amount::ZERO))
3410        }
3411    }
3412
    /// Obtains the local balance of an account and optionally another user after staging the
    /// execution of incoming messages in a new block.
    ///
    /// Does not attempt to synchronize with validators. The result will reflect up to
    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
    /// block.
    ///
    /// Returns `(chain_balance, owner_balance)`; the owner balance may be `None` if the
    /// staged execution did not report one.
    #[instrument(level = "trace", skip(owner))]
    async fn query_balances_with_owner(
        &self,
        owner: AccountOwner,
    ) -> Result<(Amount, Option<Amount>), ChainClientError> {
        let incoming_bundles = self.pending_message_bundles().await?;
        // Since we disallow empty blocks and there are no incoming messages that could
        // change the balances, we can query the local balances immediately.
        if incoming_bundles.is_empty() {
            let chain_balance = self.local_balance().await?;
            let owner_balance = self.local_owner_balance(owner).await?;
            return Ok((chain_balance, Some(owner_balance)));
        }
        // Otherwise, stage (but do not propose) a block that receives all pending
        // bundles and read the resulting balances.
        let info = self.chain_info().await?;
        let transactions = incoming_bundles
            .into_iter()
            .map(Transaction::ReceiveMessages)
            .collect::<Vec<_>>();
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            // The chain's own account queries are unauthenticated.
            authenticated_signer: if owner == AccountOwner::CHAIN {
                None
            } else {
                Some(owner)
            },
            timestamp,
        };
        match Box::pin(self.client.stage_block_execution_with_policy(
            block,
            None,
            Vec::new(),
            self.options.bundle_execution_policy(),
        ))
        .await
        {
            Ok((_, response)) => Ok((
                response.info.chain_balance,
                response.info.requested_owner_balance,
            )),
            Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
                WorkerError::ChainError(error),
            ))) if matches!(
                &*error,
                ChainError::ExecutionError(
                    execution_error,
                    ChainExecutionContext::Block
                ) if matches!(
                    **execution_error,
                    ExecutionError::FeesExceedFunding { .. }
                )
            ) =>
            {
                // We can't even pay for the execution of one empty block. Let's return zero.
                Ok((Amount::ZERO, Some(Amount::ZERO)))
            }
            Err(error) => Err(error),
        }
    }
3482
3483    /// Reads the local balance of the chain account.
3484    ///
3485    /// Does not process the inbox or attempt to synchronize with validators.
3486    #[instrument(level = "trace")]
3487    pub async fn local_balance(&self) -> Result<Amount, ChainClientError> {
3488        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
3489        Ok(balance)
3490    }
3491
3492    /// Reads the local balance of a user account.
3493    ///
3494    /// Does not process the inbox or attempt to synchronize with validators.
3495    #[instrument(level = "trace", skip(owner))]
3496    pub async fn local_owner_balance(
3497        &self,
3498        owner: AccountOwner,
3499    ) -> Result<Amount, ChainClientError> {
3500        if owner.is_chain() {
3501            self.local_balance().await
3502        } else {
3503            Ok(self
3504                .local_balances_with_owner(owner)
3505                .await?
3506                .1
3507                .unwrap_or(Amount::ZERO))
3508        }
3509    }
3510
3511    /// Reads the local balance of the chain account and optionally another user.
3512    ///
3513    /// Does not process the inbox or attempt to synchronize with validators.
3514    #[instrument(level = "trace", skip(owner))]
3515    async fn local_balances_with_owner(
3516        &self,
3517        owner: AccountOwner,
3518    ) -> Result<(Amount, Option<Amount>), ChainClientError> {
3519        ensure!(
3520            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
3521            ChainClientError::WalletSynchronizationError
3522        );
3523        let mut query = ChainInfoQuery::new(self.chain_id);
3524        query.request_owner_balance = owner;
3525        let response = self
3526            .client
3527            .local_node
3528            .handle_chain_info_query(query)
3529            .await?;
3530        Ok((
3531            response.info.chain_balance,
3532            response.info.requested_owner_balance,
3533        ))
3534    }
3535
3536    /// Sends tokens to a chain.
3537    #[instrument(level = "trace")]
3538    pub async fn transfer_to_account(
3539        &self,
3540        from: AccountOwner,
3541        amount: Amount,
3542        account: Account,
3543    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3544        self.transfer(from, amount, account).await
3545    }
3546
3547    /// Burns tokens (transfer to a special address).
3548    #[cfg(with_testing)]
3549    #[instrument(level = "trace")]
3550    pub async fn burn(
3551        &self,
3552        owner: AccountOwner,
3553        amount: Amount,
3554    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3555        let recipient = Account::burn_address(self.chain_id);
3556        self.transfer(owner, amount, recipient).await
3557    }
3558
3559    #[instrument(level = "trace")]
3560    pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3561        let validators = self.client.validator_nodes().await?;
3562        self.client
3563            .fetch_chain_info(self.chain_id, &validators)
3564            .await
3565    }
3566
3567    /// Attempts to synchronize chains that have sent us messages and populate our local
3568    /// inbox.
3569    ///
3570    /// To create a block that actually executes the messages in the inbox,
3571    /// `process_inbox` must be called separately.
3572    ///
3573    /// If the chain is in follow-only mode, this only downloads blocks for this chain without
3574    /// fetching manager values or sender/publisher chains.
3575    #[instrument(level = "trace")]
3576    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3577        if self.preferred_owner.is_none() {
3578            return self.client.synchronize_chain_state(self.chain_id).await;
3579        }
3580        let info = self.prepare_chain().await?;
3581        self.synchronize_publisher_chains().await?;
3582        self.find_received_certificates().await?;
3583        Ok(info)
3584    }
3585
3586    /// Processes the last pending block
3587    #[instrument(level = "trace")]
3588    pub async fn process_pending_block(
3589        &self,
3590    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3591        self.prepare_chain().await?;
3592        self.process_pending_block_without_prepare().await
3593    }
3594
    /// Processes the last pending block. Assumes that the local chain is up to date.
    ///
    /// Re-proposes a locking block if the chain manager has one; otherwise proposes the
    /// client's own pending block, if any. Returns `Committed(None)` when there is
    /// nothing to propose, and `WaitForTimeout` when no round is currently available.
    #[instrument(level = "trace")]
    async fn process_pending_block_without_prepare(
        &self,
    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
        let process_start = linera_base::time::Instant::now();
        tracing::debug!("process_pending_block_without_prepare started");
        let info = self.request_leader_timeout_if_needed().await?;

        // If there is a validated block in the current round, finalize it.
        if info.manager.has_locking_block_in_current_round()
            && !info.manager.current_round.is_fast()
        {
            return Box::pin(self.finalize_locking_block(info)).await;
        }
        let owner = self.identity().await?;

        let local_node = &self.client.local_node;
        // Otherwise we have to re-propose the highest validated block, if there is one.
        let pending_proposal = self.pending_proposal();
        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
            match &**locking {
                // A validated block from an earlier round: re-propose it as-is.
                LockingBlock::Regular(certificate) => {
                    let blob_ids = certificate.block().required_blob_ids();
                    let blobs = local_node
                        .get_locking_blobs(&blob_ids, self.chain_id)
                        .await?
                        .ok_or_else(|| {
                            ChainClientError::InternalError("Missing local locking blobs")
                        })?;
                    debug!("Retrying locking block from round {}", certificate.round);
                    (certificate.block().clone(), blobs)
                }
                // A fast-round proposal: re-execute it locally to obtain the block.
                LockingBlock::Fast(proposal) => {
                    let proposed_block = proposal.content.block.clone();
                    let blob_ids = proposed_block.published_blob_ids();
                    let blobs = local_node
                        .get_locking_blobs(&blob_ids, self.chain_id)
                        .await?
                        .ok_or_else(|| {
                            ChainClientError::InternalError("Missing local locking blobs")
                        })?;
                    let block = self
                        .client
                        .stage_block_execution(proposed_block, None, blobs.clone())
                        .await?
                        .0;
                    debug!("Retrying locking block from fast round.");
                    (block, blobs)
                }
            }
        } else if let Some(pending_proposal) = pending_proposal {
            // Otherwise we are free to propose our own pending block.
            let proposed_block = pending_proposal.block;
            let round = self.round_for_oracle(&info, &owner).await?;
            let (block, _) = self
                .client
                .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
                .await?;
            debug!("Proposing the local pending block.");
            (block, pending_proposal.blobs)
        } else {
            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
        };

        // Pick a round for the new proposal, or ask the caller to wait for a timeout
        // if none is available right now.
        let has_oracle_responses = block.has_oracle_responses();
        let (proposed_block, outcome) = block.into_proposal();
        let round = match self
            .round_for_new_proposal(&info, &owner, has_oracle_responses)
            .await?
        {
            Either::Left(round) => round,
            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
        };
        debug!("Proposing block for round {}", round);

        let already_handled_locally = info
            .manager
            .already_handled_proposal(round, &proposed_block);
        // Create the final block proposal.
        let proposal = if let Some(locking) = info.manager.requested_locking {
            Box::new(match *locking {
                LockingBlock::Regular(cert) => {
                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
                        .await
                        .map_err(ChainClientError::signer_failure)?
                }
                LockingBlock::Fast(proposal) => {
                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
                        .await
                        .map_err(ChainClientError::signer_failure)?
                }
            })
        } else {
            Box::new(
                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
                    .await
                    .map_err(ChainClientError::signer_failure)?,
            )
        };
        if !already_handled_locally {
            // Check the final block proposal. This will be cheaper after #1401.
            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
                match err {
                    // If blobs are missing locally, register the pending blobs and retry once.
                    LocalNodeError::BlobsNotFound(_) => {
                        local_node
                            .handle_pending_blobs(self.chain_id, blobs)
                            .await?;
                        local_node.handle_block_proposal(*proposal.clone()).await?;
                    }
                    err => return Err(err.into()),
                }
            }
        }
        let committee = self.local_committee().await?;
        let block = Block::new(proposed_block, outcome);
        // Send the query to validators. In a fast round the proposal yields a confirmed
        // block directly; otherwise we first obtain a validated block and finalize it.
        let submit_block_proposal_start = linera_base::time::Instant::now();
        let certificate = if round.is_fast() {
            let hashed_value = ConfirmedBlock::new(block);
            Box::pin(
                self.client
                    .submit_block_proposal(&committee, proposal, hashed_value),
            )
            .await?
        } else {
            let hashed_value = ValidatedBlock::new(block);
            let certificate = Box::pin(self.client.submit_block_proposal(
                &committee,
                proposal,
                hashed_value.clone(),
            ))
            .await?;
            Box::pin(self.client.finalize_block(&committee, certificate)).await?
        };
        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
        debug!(round = %certificate.round, "Sending confirmed block to validators");
        let update_start = linera_base::time::Instant::now();
        Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
        tracing::debug!(
            update_validators_ms = update_start.elapsed().as_millis(),
            total_process_ms = process_start.elapsed().as_millis(),
            "process_pending_block_without_prepare completing"
        );
        Ok(ClientOutcome::Committed(Some(certificate)))
    }
3741
3742    fn send_timing(&self, start: Instant, timing_type: TimingType) {
3743        let Some(sender) = &self.timing_sender else {
3744            return;
3745        };
3746        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
3747            tracing::warn!(%err, "Failed to send timing info");
3748        }
3749    }
3750
3751    /// Requests a leader timeout certificate if the current round has timed out. Returns the
3752    /// chain info for the (possibly new) current round.
3753    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3754        let mut info = self.chain_info_with_manager_values().await?;
3755        // If the current round has timed out, we request a timeout certificate and retry in
3756        // the next round.
3757        if let Some(round_timeout) = info.manager.round_timeout {
3758            if round_timeout <= self.storage_client().clock().current_time() {
3759                if let Err(e) = self.request_leader_timeout().await {
3760                    debug!("Failed to obtain a timeout certificate: {}", e);
3761                } else {
3762                    info = self.chain_info_with_manager_values().await?;
3763                }
3764            }
3765        }
3766        Ok(info)
3767    }
3768
3769    /// Finalizes the locking block.
3770    ///
3771    /// Panics if there is no locking block; fails if the locking block is not in the current round.
3772    async fn finalize_locking_block(
3773        &self,
3774        info: Box<ChainInfo>,
3775    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3776        let locking = info
3777            .manager
3778            .requested_locking
3779            .expect("Should have a locking block");
3780        let LockingBlock::Regular(certificate) = *locking else {
3781            panic!("Should have a locking validated block");
3782        };
3783        debug!(
3784            round = %certificate.round,
3785            "Finalizing locking block"
3786        );
3787        let committee = self.local_committee().await?;
3788        let certificate =
3789            Box::pin(self.client.finalize_block(&committee, certificate.clone())).await?;
3790        Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
3791        Ok(ClientOutcome::Committed(Some(certificate)))
3792    }
3793
3794    /// Returns the number for the round number oracle to use when staging a block proposal.
3795    async fn round_for_oracle(
3796        &self,
3797        info: &ChainInfo,
3798        identity: &AccountOwner,
3799    ) -> Result<Option<u32>, ChainClientError> {
3800        // Pretend we do use oracles: If we don't, the round number is never read anyway.
3801        match self.round_for_new_proposal(info, identity, true).await {
3802            // If it is a multi-leader round, use its number for the oracle.
3803            Ok(Either::Left(round)) => Ok(round.multi_leader()),
3804            // If there is no suitable round with oracles, use None: If it works without oracles,
3805            // the block won't read the value. If it returns a timeout, it will be a single-leader
3806            // round, in which the oracle returns None.
3807            Err(ChainClientError::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
3808            Err(err) => Err(err),
3809        }
3810    }
3811
    /// Returns a round in which we can propose a new block or the given one, if possible.
    ///
    /// Returns `Either::Left(round)` if we may propose in `round`, or `Either::Right(timeout)`
    /// if we must wait for the current round to time out first. Fails if there is a
    /// conflicting proposal and no follow-up round to move to, or if we are not a leader in
    /// the eligible round.
    async fn round_for_new_proposal(
        &self,
        info: &ChainInfo,
        identity: &AccountOwner,
        has_oracle_responses: bool,
    ) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
        let manager = &info.manager;
        // The seed determines the (pseudorandom) leader schedule for single-leader rounds.
        let seed = self
            .client
            .local_node
            .get_manager_seed(self.chain_id)
            .await?;
        // If there is a conflicting proposal in the current round, we can only propose if the
        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
        // Similarly, we cannot propose a block that uses oracles in the fast round, and also
        // skip the fast round if fast blocks are not allowed.
        let skip_fast = manager.current_round.is_fast()
            && (has_oracle_responses || !self.options.allow_fast_blocks);
        let conflict = manager
            .requested_signed_proposal
            .as_ref()
            .into_iter()
            .chain(&manager.requested_proposed)
            .any(|proposal| proposal.content.round == manager.current_round)
            || skip_fast;
        // Pick the current round if it is usable; otherwise try the next round, which can be
        // entered without a timeout only from a fast or multi-leader round. Failing that,
        // either wait for the round to time out or give up.
        let round = if !conflict {
            manager.current_round
        } else if let Some(round) = manager
            .ownership
            .next_round(manager.current_round)
            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
        {
            round
        } else if let Some(timeout) = info.round_timeout() {
            return Ok(Either::Right(timeout));
        } else {
            return Err(ChainClientError::BlockProposalError(
                "Conflicting proposal in the current round",
            ));
        };
        // Collect the validators' owners and voting weights, used for leader selection.
        let current_committee = info
            .current_committee()?
            .validators
            .values()
            .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
            .collect();
        if manager.should_propose(identity, round, seed, &current_committee) {
            return Ok(Either::Left(round));
        }
        // Not a leader in `round`: wait for the round to time out, if it can.
        if let Some(timeout) = info.round_timeout() {
            return Ok(Either::Right(timeout));
        }
        Err(ChainClientError::BlockProposalError(
            "Not a leader in the current round",
        ))
    }
3869
3870    /// Clears the information on any operation that previously failed.
3871    #[cfg(with_testing)]
3872    #[instrument(level = "trace")]
3873    pub fn clear_pending_proposal(&self) {
3874        self.update_state(|state| state.clear_pending_proposal());
3875    }
3876
3877    /// Rotates the key of the chain.
3878    ///
3879    /// Replaces current owners of the chain with the new key pair.
3880    #[instrument(level = "trace")]
3881    pub async fn rotate_key_pair(
3882        &self,
3883        public_key: AccountPublicKey,
3884    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3885        Box::pin(self.transfer_ownership(public_key.into())).await
3886    }
3887
3888    /// Transfers ownership of the chain to a single super owner.
3889    #[instrument(level = "trace")]
3890    pub async fn transfer_ownership(
3891        &self,
3892        new_owner: AccountOwner,
3893    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3894        Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
3895            super_owners: vec![new_owner],
3896            owners: Vec::new(),
3897            multi_leader_rounds: 2,
3898            open_multi_leader_rounds: false,
3899            timeout_config: TimeoutConfig::default(),
3900        }))
3901        .await
3902    }
3903
3904    /// Adds another owner to the chain, and turns existing super owners into regular owners.
3905    #[instrument(level = "trace")]
3906    pub async fn share_ownership(
3907        &self,
3908        new_owner: AccountOwner,
3909        new_weight: u64,
3910    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3911        let ownership = self.prepare_chain().await?.manager.ownership;
3912        ensure!(
3913            ownership.is_active(),
3914            ChainError::InactiveChain(self.chain_id)
3915        );
3916        let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
3917        owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
3918        owners.push((new_owner, new_weight));
3919        let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
3920            super_owners: Vec::new(),
3921            owners,
3922            multi_leader_rounds: ownership.multi_leader_rounds,
3923            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3924            timeout_config: ownership.timeout_config,
3925        })];
3926        match self.execute_block(operations, vec![]).await? {
3927            ClientOutcome::Committed(certificate) => Ok(ClientOutcome::Committed(certificate)),
3928            ClientOutcome::Conflict(certificate) => {
3929                info!(
3930                    height = %certificate.block().header.height,
3931                    "Another block was committed."
3932                );
3933                Ok(ClientOutcome::Conflict(certificate))
3934            }
3935            ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
3936        }
3937    }
3938
3939    /// Returns the current ownership settings on this chain.
3940    #[instrument(level = "trace")]
3941    pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, ChainClientError> {
3942        Ok(self
3943            .client
3944            .local_node
3945            .chain_state_view(self.chain_id)
3946            .await?
3947            .execution_state
3948            .system
3949            .ownership
3950            .get()
3951            .clone())
3952    }
3953
3954    /// Changes the ownership of this chain. Fails if it would remove existing owners, unless
3955    /// `remove_owners` is `true`.
3956    #[instrument(level = "trace")]
3957    pub async fn change_ownership(
3958        &self,
3959        ownership: ChainOwnership,
3960    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3961        Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
3962            super_owners: ownership.super_owners.into_iter().collect(),
3963            owners: ownership.owners.into_iter().collect(),
3964            multi_leader_rounds: ownership.multi_leader_rounds,
3965            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3966            timeout_config: ownership.timeout_config.clone(),
3967        }))
3968        .await
3969    }
3970
3971    /// Returns the current application permissions on this chain.
3972    #[instrument(level = "trace")]
3973    pub async fn query_application_permissions(
3974        &self,
3975    ) -> Result<ApplicationPermissions, ChainClientError> {
3976        Ok(self
3977            .client
3978            .local_node
3979            .chain_state_view(self.chain_id)
3980            .await?
3981            .execution_state
3982            .system
3983            .application_permissions
3984            .get()
3985            .clone())
3986    }
3987
3988    /// Changes the application permissions configuration on this chain.
3989    #[instrument(level = "trace", skip(application_permissions))]
3990    pub async fn change_application_permissions(
3991        &self,
3992        application_permissions: ApplicationPermissions,
3993    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3994        Box::pin(
3995            self.execute_operation(SystemOperation::ChangeApplicationPermissions(
3996                application_permissions,
3997            )),
3998        )
3999        .await
4000    }
4001
4002    /// Opens a new chain with a derived UID.
4003    #[instrument(level = "trace", skip(self))]
4004    pub async fn open_chain(
4005        &self,
4006        ownership: ChainOwnership,
4007        application_permissions: ApplicationPermissions,
4008        balance: Amount,
4009    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, ChainClientError>
4010    {
4011        let config = OpenChainConfig {
4012            ownership: ownership.clone(),
4013            balance,
4014            application_permissions: application_permissions.clone(),
4015        };
4016        let operation = Operation::system(SystemOperation::OpenChain(config));
4017        let certificate = match self.execute_block(vec![operation], vec![]).await? {
4018            ClientOutcome::Committed(certificate) => certificate,
4019            ClientOutcome::Conflict(certificate) => {
4020                return Ok(ClientOutcome::Conflict(certificate));
4021            }
4022            ClientOutcome::WaitForTimeout(timeout) => {
4023                return Ok(ClientOutcome::WaitForTimeout(timeout));
4024            }
4025        };
4026        // The only operation, i.e. the last transaction, created the new chain.
4027        let chain_blob = certificate
4028            .block()
4029            .body
4030            .blobs
4031            .last()
4032            .and_then(|blobs| blobs.last())
4033            .ok_or_else(|| ChainClientError::InternalError("Failed to create a new chain"))?;
4034        let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
4035        // If we have a key for any owner, add it to the list of tracked chains.
4036        for owner in ownership.all_owners() {
4037            if self.client.has_key_for(owner).await? {
4038                self.client
4039                    .extend_chain_mode(description.id(), ListeningMode::FullChain);
4040                break;
4041            }
4042        }
4043        self.client
4044            .local_node
4045            .retry_pending_cross_chain_requests(self.chain_id)
4046            .await?;
4047        Ok(ClientOutcome::Committed((description, certificate)))
4048    }
4049
4050    /// Closes the chain (and loses everything in it!!).
4051    /// Returns `None` if the chain was already closed.
4052    #[instrument(level = "trace")]
4053    pub async fn close_chain(
4054        &self,
4055    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
4056        match Box::pin(self.execute_operation(SystemOperation::CloseChain)).await {
4057            Ok(outcome) => Ok(outcome.map(Some)),
4058            Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
4059                WorkerError::ChainError(chain_error),
4060            ))) if matches!(*chain_error, ChainError::ClosedChain) => {
4061                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
4062            }
4063            Err(error) => Err(error),
4064        }
4065    }
4066
4067    /// Publishes some module.
4068    #[cfg(not(target_arch = "wasm32"))]
4069    #[instrument(level = "trace", skip(contract, service))]
4070    pub async fn publish_module(
4071        &self,
4072        contract: Bytecode,
4073        service: Bytecode,
4074        vm_runtime: VmRuntime,
4075    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
4076        let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
4077        Box::pin(self.publish_module_blobs(blobs, module_id)).await
4078    }
4079
4080    /// Publishes some module.
4081    #[cfg(not(target_arch = "wasm32"))]
4082    #[instrument(level = "trace", skip(blobs, module_id))]
4083    pub async fn publish_module_blobs(
4084        &self,
4085        blobs: Vec<Blob>,
4086        module_id: ModuleId,
4087    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
4088        self.execute_operations(
4089            vec![Operation::system(SystemOperation::PublishModule {
4090                module_id,
4091            })],
4092            blobs,
4093        )
4094        .await?
4095        .try_map(|certificate| Ok((module_id, certificate)))
4096    }
4097
4098    /// Publishes some data blobs.
4099    #[instrument(level = "trace", skip(bytes))]
4100    pub async fn publish_data_blobs(
4101        &self,
4102        bytes: Vec<Vec<u8>>,
4103    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4104        let blobs = bytes.into_iter().map(Blob::new_data);
4105        let publish_blob_operations = blobs
4106            .clone()
4107            .map(|blob| {
4108                Operation::system(SystemOperation::PublishDataBlob {
4109                    blob_hash: blob.id().hash,
4110                })
4111            })
4112            .collect();
4113        self.execute_operations(publish_blob_operations, blobs.collect())
4114            .await
4115    }
4116
4117    /// Publishes some data blob.
4118    #[instrument(level = "trace", skip(bytes))]
4119    pub async fn publish_data_blob(
4120        &self,
4121        bytes: Vec<u8>,
4122    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4123        Box::pin(self.publish_data_blobs(vec![bytes])).await
4124    }
4125
4126    /// Creates an application by instantiating some bytecode.
4127    #[instrument(
4128        level = "trace",
4129        skip(self, parameters, instantiation_argument, required_application_ids)
4130    )]
4131    pub async fn create_application<
4132        A: Abi,
4133        Parameters: Serialize,
4134        InstantiationArgument: Serialize,
4135    >(
4136        &self,
4137        module_id: ModuleId<A, Parameters, InstantiationArgument>,
4138        parameters: &Parameters,
4139        instantiation_argument: &InstantiationArgument,
4140        required_application_ids: Vec<ApplicationId>,
4141    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, ChainClientError>
4142    {
4143        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
4144        let parameters = serde_json::to_vec(parameters)?;
4145        Ok(Box::pin(self.create_application_untyped(
4146            module_id.forget_abi(),
4147            parameters,
4148            instantiation_argument,
4149            required_application_ids,
4150        ))
4151        .await?
4152        .map(|(app_id, cert)| (app_id.with_abi(), cert)))
4153    }
4154
4155    /// Creates an application by instantiating some bytecode.
4156    #[instrument(
4157        level = "trace",
4158        skip(
4159            self,
4160            module_id,
4161            parameters,
4162            instantiation_argument,
4163            required_application_ids
4164        )
4165    )]
4166    pub async fn create_application_untyped(
4167        &self,
4168        module_id: ModuleId,
4169        parameters: Vec<u8>,
4170        instantiation_argument: Vec<u8>,
4171        required_application_ids: Vec<ApplicationId>,
4172    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, ChainClientError> {
4173        Box::pin(self.execute_operation(SystemOperation::CreateApplication {
4174            module_id,
4175            parameters,
4176            instantiation_argument,
4177            required_application_ids,
4178        }))
4179        .await?
4180        .try_map(|certificate| {
4181            // The first message of the only operation created the application.
4182            let mut creation: Vec<_> = certificate
4183                .block()
4184                .created_blob_ids()
4185                .into_iter()
4186                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
4187                .collect();
4188            if creation.len() > 1 {
4189                return Err(ChainClientError::InternalError(
4190                    "Unexpected number of application descriptions published",
4191                ));
4192            }
4193            let blob_id = creation.pop().ok_or(ChainClientError::InternalError(
4194                "ApplicationDescription blob not found.",
4195            ))?;
4196            let id = ApplicationId::new(blob_id.hash);
4197            Ok((id, certificate))
4198        })
4199    }
4200
4201    /// Creates a new committee and starts using it (admin chains only).
4202    #[instrument(level = "trace", skip(committee))]
4203    pub async fn stage_new_committee(
4204        &self,
4205        committee: Committee,
4206    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4207        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
4208        let blob_hash = blob.id().hash;
4209        match self
4210            .execute_operations(
4211                vec![Operation::system(SystemOperation::Admin(
4212                    AdminOperation::PublishCommitteeBlob { blob_hash },
4213                ))],
4214                vec![blob],
4215            )
4216            .await?
4217        {
4218            ClientOutcome::Committed(_) => {}
4219            outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
4220            outcome @ ClientOutcome::Conflict(_) => return Ok(outcome),
4221        }
4222        let epoch = Box::pin(self.chain_info()).await?.epoch.try_add_one()?;
4223        Box::pin(
4224            self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
4225                epoch,
4226                blob_hash,
4227            })),
4228        )
4229        .await
4230    }
4231
    /// Synchronizes the chain with the validators and creates blocks without any operations to
    /// process all incoming messages. This may require several blocks.
    ///
    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
    /// is returned, too.
    #[instrument(level = "trace")]
    pub async fn process_inbox(
        &self,
    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
        // Bring the chain up to date with the validators, then drain the inbox.
        self.prepare_chain().await?;
        self.process_inbox_without_prepare().await
    }
4244
4245    /// Creates blocks without any operations to process all incoming messages. This may require
4246    /// several blocks.
4247    ///
4248    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
4249    /// is returned, too.
4250    #[instrument(level = "trace")]
4251    pub async fn process_inbox_without_prepare(
4252        &self,
4253    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
4254        #[cfg(with_metrics)]
4255        let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
4256
4257        let mut certificates = Vec::new();
4258        loop {
4259            // We provide no operations - this means that the only operations executed
4260            // will be epoch changes, receiving messages and processing event stream
4261            // updates, if any are pending.
4262            match self.execute_block(vec![], vec![]).await {
4263                Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
4264                Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
4265                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
4266                    return Ok((certificates, Some(timeout)));
4267                }
4268                // Nothing in the inbox and no stream updates to be processed.
4269                Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
4270                    WorkerError::ChainError(chain_error),
4271                ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
4272                    return Ok((certificates, None));
4273                }
4274                Err(error) => return Err(error),
4275            };
4276        }
4277    }
4278
4279    /// Returns operations to process all pending epoch changes: first the new epochs, in order,
4280    /// then the removed epochs, in order.
4281    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, ChainClientError> {
4282        let (mut min_epoch, mut next_epoch) = {
4283            let (epoch, committees) = self.epoch_and_committees().await?;
4284            let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
4285            (min_epoch, epoch.try_add_one()?)
4286        };
4287        let mut epoch_change_ops = Vec::new();
4288        while self
4289            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
4290            .await?
4291        {
4292            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
4293                next_epoch,
4294            )));
4295            next_epoch.try_add_assign_one()?;
4296        }
4297        while self
4298            .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
4299            .await?
4300        {
4301            epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
4302                min_epoch,
4303            )));
4304            min_epoch.try_add_assign_one()?;
4305        }
4306        Ok(epoch_change_ops)
4307    }
4308
4309    /// Returns whether the system event on the admin chain with the given stream name and key
4310    /// exists in storage.
4311    async fn has_admin_event(
4312        &self,
4313        stream_name: &[u8],
4314        index: u32,
4315    ) -> Result<bool, ChainClientError> {
4316        let event_id = EventId {
4317            chain_id: self.client.admin_chain_id,
4318            stream_id: StreamId::system(stream_name),
4319            index,
4320        };
4321        Ok(self
4322            .client
4323            .storage_client()
4324            .read_event(event_id)
4325            .await?
4326            .is_some())
4327    }
4328
4329    /// Returns the indices and events from the storage
4330    pub async fn events_from_index(
4331        &self,
4332        stream_id: StreamId,
4333        start_index: u32,
4334    ) -> Result<Vec<IndexAndEvent>, ChainClientError> {
4335        Ok(self
4336            .client
4337            .storage_client()
4338            .read_events_from_index(&self.chain_id, &stream_id, start_index)
4339            .await?)
4340    }
4341
4342    /// Deprecates all the configurations of voting rights up to the given one (admin chains
4343    /// only). Currently, each individual chain is still entitled to wait before accepting
4344    /// this command. However, it is expected that deprecated validators stop functioning
4345    /// shortly after such command is issued.
4346    #[instrument(level = "trace")]
4347    pub async fn revoke_epochs(
4348        &self,
4349        revoked_epoch: Epoch,
4350    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4351        self.prepare_chain().await?;
4352        let (current_epoch, committees) = self.epoch_and_committees().await?;
4353        ensure!(
4354            revoked_epoch < current_epoch,
4355            ChainClientError::CannotRevokeCurrentEpoch(current_epoch)
4356        );
4357        ensure!(
4358            committees.contains_key(&revoked_epoch),
4359            ChainClientError::EpochAlreadyRevoked
4360        );
4361        let operations = committees
4362            .keys()
4363            .filter_map(|epoch| {
4364                if *epoch <= revoked_epoch {
4365                    Some(Operation::system(SystemOperation::Admin(
4366                        AdminOperation::RemoveCommittee { epoch: *epoch },
4367                    )))
4368                } else {
4369                    None
4370                }
4371            })
4372            .collect();
4373        self.execute_operations(operations, vec![]).await
4374    }
4375
4376    /// Sends money to a chain.
4377    /// Do not check balance. (This may block the client)
4378    /// Do not confirm the transaction.
4379    #[instrument(level = "trace")]
4380    pub async fn transfer_to_account_unsafe_unconfirmed(
4381        &self,
4382        owner: AccountOwner,
4383        amount: Amount,
4384        recipient: Account,
4385    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4386        Box::pin(self.execute_operation(SystemOperation::Transfer {
4387            owner,
4388            recipient,
4389            amount,
4390        }))
4391        .await
4392    }
4393
4394    #[instrument(level = "trace", skip(hash))]
4395    pub async fn read_confirmed_block(
4396        &self,
4397        hash: CryptoHash,
4398    ) -> Result<ConfirmedBlock, ChainClientError> {
4399        let block = self
4400            .client
4401            .storage_client()
4402            .read_confirmed_block(hash)
4403            .await?;
4404        block.ok_or(ChainClientError::MissingConfirmedBlock(hash))
4405    }
4406
4407    #[instrument(level = "trace", skip(hash))]
4408    pub async fn read_certificate(
4409        &self,
4410        hash: CryptoHash,
4411    ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
4412        let certificate = self.client.storage_client().read_certificate(hash).await?;
4413        certificate.ok_or(ChainClientError::ReadCertificatesError(vec![hash]))
4414    }
4415
4416    /// Handles any cross-chain requests for any pending outgoing messages.
4417    #[instrument(level = "trace")]
4418    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), ChainClientError> {
4419        self.client
4420            .local_node
4421            .retry_pending_cross_chain_requests(self.chain_id)
4422            .await?;
4423        Ok(())
4424    }
4425
4426    #[instrument(level = "trace", skip(local_node))]
4427    async fn local_chain_info(
4428        &self,
4429        chain_id: ChainId,
4430        local_node: &mut LocalNodeClient<Env::Storage>,
4431    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
4432        match local_node.chain_info(chain_id).await {
4433            Ok(info) => {
4434                // Useful in case `chain_id` is the same as a local chain.
4435                self.client.update_from_info(&info);
4436                Ok(Some(info))
4437            }
4438            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
4439            Err(err) => Err(err.into()),
4440        }
4441    }
4442
4443    #[instrument(level = "trace", skip(chain_id, local_node))]
4444    async fn local_next_block_height(
4445        &self,
4446        chain_id: ChainId,
4447        local_node: &mut LocalNodeClient<Env::Storage>,
4448    ) -> Result<BlockHeight, ChainClientError> {
4449        Ok(self
4450            .local_chain_info(chain_id, local_node)
4451            .await?
4452            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
4453    }
4454
4455    /// Returns the next height we expect to receive from the given sender chain, according to the
4456    /// local inbox.
4457    #[instrument(level = "trace")]
4458    async fn local_next_height_to_receive(
4459        &self,
4460        origin: ChainId,
4461    ) -> Result<BlockHeight, ChainClientError> {
4462        Ok(self
4463            .client
4464            .local_node
4465            .get_inbox_next_height(self.chain_id, origin)
4466            .await?)
4467    }
4468
    /// Processes one `notification` from `remote_node`, synchronizing local state as
    /// needed.
    ///
    /// Notifications that are irrelevant to this chain's listening mode are dropped early.
    /// For the rest, if synchronization still lags behind the notified height afterwards
    /// the shortfall is only logged, not returned as an error — a later notification can
    /// catch us up.
    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
    async fn process_notification(
        &self,
        remote_node: RemoteNode<Env::ValidatorNode>,
        mut local_node: LocalNodeClient<Env::Storage>,
        notification: Notification,
    ) -> Result<(), ChainClientError> {
        // Ignore the notification if the chain's listening mode (if any) does not consider
        // this kind of event relevant.
        let mode = self.client.chain_mode(notification.chain_id);
        let dominated = mode
            .as_ref()
            .is_none_or(|mode| !mode.is_relevant(&notification.reason));
        if dominated {
            debug!(
                chain_id = %notification.chain_id,
                reason = ?notification.reason,
                listening_mode = ?mode,
                "Ignoring notification due to listening mode"
            );
            return Ok(());
        }
        match notification.reason {
            Reason::NewIncomingBundle { origin, height } => {
                // Our inbox already advanced past this height: nothing to do.
                if self.local_next_height_to_receive(origin).await? > height {
                    debug!(
                        chain_id = %self.chain_id,
                        "Accepting redundant notification for new message"
                    );
                    return Ok(());
                }
                self.client
                    .download_sender_block_with_sending_ancestors(
                        self.chain_id,
                        origin,
                        height,
                        &remote_node,
                    )
                    .await?;
                // If the inbox still hasn't advanced past `height`, the download fell
                // short; log and rely on future notifications to retry.
                if self.local_next_height_to_receive(origin).await? <= height {
                    info!(
                        chain_id = %self.chain_id,
                        "NewIncomingBundle: Fail to synchronize new message after notification"
                    );
                }
            }
            Reason::NewBlock {
                height,
                hash,
                event_streams,
                ..
            } => {
                let chain_id = notification.chain_id;
                let local_height = self
                    .local_next_block_height(chain_id, &mut local_node)
                    .await?;
                if local_height > height {
                    debug!(
                        chain_id = %self.chain_id,
                        "Accepting redundant notification for new block"
                    );
                    return Ok(());
                }
                // In EventsOnly mode, download only event-bearing blocks sparsely
                // instead of doing a full sync. This also handles old validators
                // that emit NewBlock but not NewEvents.
                if let Some(ListeningMode::EventsOnly(subscribed)) =
                    self.client.chain_mode(chain_id)
                {
                    if !event_streams.is_empty() {
                        self.client
                            .download_event_bearing_blocks(
                                chain_id,
                                height,
                                hash,
                                local_height,
                                &subscribed,
                                &remote_node,
                            )
                            .await?;
                    }
                } else {
                    // Full synchronization from the notifying validator.
                    self.client
                        .synchronize_chain_state_from(&remote_node, chain_id)
                        .await?;
                    if self
                        .local_next_block_height(chain_id, &mut local_node)
                        .await?
                        <= height
                    {
                        error!("NewBlock: Fail to synchronize new block after notification");
                    }
                }
            }
            Reason::NewEvents { height, hash, .. } => {
                let chain_id = notification.chain_id;
                let local_height = self
                    .local_next_block_height(chain_id, &mut local_node)
                    .await?;
                if local_height > height {
                    debug!(
                        chain_id = %self.chain_id,
                        "Accepting redundant notification for new events"
                    );
                    return Ok(());
                }
                // Only EventsOnly chains act on NewEvents; in other modes events arrive
                // via the full sync triggered by NewBlock above.
                let subscribed = match self.client.chain_mode(chain_id) {
                    Some(ListeningMode::EventsOnly(streams)) => streams,
                    _ => return Ok(()),
                };
                self.client
                    .download_event_bearing_blocks(
                        chain_id,
                        height,
                        hash,
                        local_height,
                        &subscribed,
                        &remote_node,
                    )
                    .await?;
            }
            Reason::NewRound { height, round } => {
                let chain_id = notification.chain_id;
                // Skip if we are already at (or past) the notified height and round.
                if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
                        debug!(
                            chain_id = %self.chain_id,
                            "Accepting redundant notification for new round"
                        );
                        return Ok(());
                    }
                }
                self.client
                    .synchronize_chain_state_from(&remote_node, chain_id)
                    .await?;
                let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
                    error!(
                        chain_id = %self.chain_id,
                        "NewRound: Fail to read local chain info for {chain_id}"
                    );
                    return Ok(());
                };
                if (info.next_block_height, info.manager.current_round) < (height, round) {
                    info!(
                        chain_id = %self.chain_id,
                        "NewRound: Fail to synchronize new block after notification"
                    );
                }
            }
            Reason::BlockExecuted { .. } => {
                // Ignored.
            }
        }
        Ok(())
    }
4622
4623    /// Returns whether this chain is tracked by the client, i.e. we are updating its inbox.
4624    pub fn is_tracked(&self) -> bool {
4625        self.client.is_tracked(self.chain_id)
4626    }
4627
4628    /// Returns the listening mode for this chain, if it is tracked.
4629    pub fn listening_mode(&self) -> Option<ListeningMode> {
4630        self.client.chain_mode(self.chain_id)
4631    }
4632
    /// Spawns a task that listens to notifications about the current chain from all validators,
    /// and synchronizes the local state accordingly.
    ///
    /// The listening mode must be set in `Client::chain_modes` before calling this method.
    ///
    /// Returns the listener future (which must be polled/awaited to make progress), an
    /// [`AbortOnDrop`] guard that stops the abortable subscription when dropped, and the
    /// stream of notifications for the caller.
    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
    pub async fn listen(
        &self,
    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), ChainClientError> {
        use future::FutureExt as _;

        // Awaits `future` while continuously polling `background_work`, so the
        // notification-processing tasks keep making progress in the meantime.
        async fn await_while_polling<F: FusedFuture>(
            future: F,
            background_work: impl FusedStream<Item = ()>,
        ) -> F::Output {
            tokio::pin!(future);
            tokio::pin!(background_work);
            loop {
                futures::select! {
                    _ = background_work.next() => (),
                    result = future => return result,
                }
            }
        }

        let mut senders = HashMap::new(); // Senders to cancel notification streams.
        // Two subscriptions: one handed back to the caller, one (abortable) driving the
        // re-subscription loop below.
        let notifications = self.subscribe()?;
        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);

        // Beware: if this future ceases to make progress, notification processing will
        // deadlock, because of the issue described in
        // https://github.com/linera-io/linera-protocol/pull/1173.

        // TODO(#2013): replace this lock with an asynchronous communication channel

        let mut process_notifications = FuturesUnordered::new();

        // Set up the initial per-validator notification streams.
        match self.update_notification_streams(&mut senders).await {
            Ok(handler) => process_notifications.push(handler),
            Err(error) => error!("Failed to update committee: {error}"),
        };

        let this = self.clone();
        let update_streams = async move {
            let mut abortable_notifications = abortable_notifications.fuse();

            while let Some(notification) =
                await_while_polling(abortable_notifications.next(), &mut process_notifications)
                    .await
            {
                // Re-subscribe to validators on NewBlock to handle committee changes.
                // Skip this for EventsOnly chains — they don't participate in governance
                // and re-subscribing would trigger a full sync that defeats sparse download.
                if let Reason::NewBlock { .. } = notification.reason {
                    let is_events_only = this
                        .listening_mode()
                        .is_some_and(|m| matches!(m, ListeningMode::EventsOnly(_)));
                    if !is_events_only {
                        match Box::pin(await_while_polling(
                            this.update_notification_streams(&mut senders).fuse(),
                            &mut process_notifications,
                        ))
                        .await
                        {
                            Ok(handler) => process_notifications.push(handler),
                            Err(error) => error!("Failed to update committee: {error}"),
                        }
                    }
                }
            }

            // The subscription ended (or was aborted): cancel every per-validator stream
            // and drain the remaining processing tasks.
            for abort in senders.into_values() {
                abort.abort();
            }

            let () = process_notifications.collect().await;
        }
        .in_current_span();

        Ok((update_streams, AbortOnDrop(abort), notifications))
    }
4713
    /// (Re)connects notification streams to the current committee's validators, returning
    /// a future that drives the per-validator listener tasks to completion.
    ///
    /// `senders` maps each validator's public key to the abort handle of its stream:
    /// validators no longer in the committee are aborted and removed; new validators get a
    /// fresh subscription and listener task.
    #[instrument(level = "trace", skip(senders))]
    async fn update_notification_streams(
        &self,
        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
    ) -> Result<impl Future<Output = ()>, ChainClientError> {
        let events_only = self
            .listening_mode()
            .is_some_and(|m| matches!(m, ListeningMode::EventsOnly(_)));
        let (nodes, local_node) = {
            // For EventsOnly chains, use the admin chain's committee: the chain's own
            // committee may be stale (we don't track epoch changes), and
            // `local_committee()` could trigger a full sync on `BlobsNotFound`.
            let committee = if events_only {
                let (_, committee) = self.admin_committee().await?;
                committee
            } else {
                self.local_committee().await?
            };
            let nodes: HashMap<_, _> = self
                .client
                .validator_node_provider()
                .make_nodes(&committee)?
                .collect();
            (nodes, self.client.local_node.clone())
        };
        // Drop removed validators.
        senders.retain(|validator, abort| {
            if !nodes.contains_key(validator) {
                abort.abort();
            }
            !abort.is_aborted()
        });
        // Add tasks for new validators.
        let validator_tasks = FuturesUnordered::new();
        for (public_key, node) in nodes {
            let address = node.address();
            // Skip validators we already have a live stream for.
            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
                continue;
            };
            let this = self.clone();
            // A one-shot future that subscribes and (optionally) syncs, flattened into
            // the validator's notification stream; connection failures are logged and
            // yield an empty stream.
            let stream = stream::once({
                let node = node.clone();
                async move {
                    let stream = node.subscribe(vec![this.chain_id]).await?;
                    // Only now the notification stream is established. We may have missed
                    // notifications since the last time we synchronized.
                    // For EventsOnly chains, skip the full sync — new events will be
                    // downloaded sparsely via NewEvents notifications.
                    if !events_only {
                        let remote_node = RemoteNode { public_key, node };
                        this.client
                            .synchronize_chain_state_from(&remote_node, this.chain_id)
                            .await?;
                    }
                    Ok::<_, ChainClientError>(stream)
                }
            })
            .filter_map(move |result| {
                let address = address.clone();
                async move {
                    if let Err(error) = &result {
                        info!(?error, address, "could not connect to validator");
                    } else {
                        debug!(address, "connected to validator");
                    }
                    result.ok()
                }
            })
            .flatten();
            let (stream, abort) = stream::abortable(stream);
            let mut stream = Box::pin(stream);
            let abort_on_exit = abort.clone();
            let this = self.clone();
            let local_node = local_node.clone();
            let remote_node = RemoteNode { public_key, node };
            // The listener task: process notifications until the stream ends, then mark
            // this validator's handle aborted so a later update can reconnect it.
            validator_tasks.push(async move {
                while let Some(notification) = stream.next().await {
                    if let Err(error) = this
                        .process_notification(
                            remote_node.clone(),
                            local_node.clone(),
                            notification.clone(),
                        )
                        .await
                    {
                        tracing::info!(
                            chain_id = %this.chain_id,
                            address = remote_node.address(),
                            ?notification,
                            %error,
                            "failed to process notification",
                        );
                    }
                }
                warn!(
                    chain_id = %this.chain_id,
                    address = remote_node.address(),
                    "Validator notification stream ended; will reconnect on next update"
                );
                abort_on_exit.abort();
            });
            entry.insert(abort);
        }
        Ok(validator_tasks.collect())
    }
4819
    /// Attempts to update a validator with the local information.
    ///
    /// Sends the validator every locally stored confirmed certificate from its reported
    /// next block height up to ours, uploading any blobs it reports missing and retrying
    /// that certificate once.
    #[instrument(level = "trace", skip(remote_node))]
    pub async fn sync_validator(
        &self,
        remote_node: Env::ValidatorNode,
    ) -> Result<(), ChainClientError> {
        // A validator that doesn't know the chain at all is treated as being at height 0.
        let validator_next_block_height = match remote_node
            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
            .await
        {
            Ok(info) => info.info.next_block_height,
            Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
            Err(err) => return Err(err.into()),
        };
        let local_next_block_height = self.chain_info().await?.next_block_height;

        if validator_next_block_height >= local_next_block_height {
            debug!("Validator is up-to-date with local state");
            return Ok(());
        }

        // The gap of heights the validator is missing, in ascending order.
        let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
            .map(BlockHeight)
            .collect();

        // NOTE(review): `flatten()` silently skips heights with no locally stored
        // certificate; presumably storage always has the full range below our next
        // height — confirm.
        let certificates = self
            .client
            .storage_client()
            .read_certificates_by_heights(self.chain_id, &heights)
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        for certificate in certificates {
            match remote_node
                .handle_confirmed_certificate(
                    certificate.clone(),
                    CrossChainMessageDelivery::NonBlocking,
                )
                .await
            {
                Ok(_) => (),
                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
                    // Upload the missing blobs we have and retry.
                    let missing_blobs: Vec<_> = self
                        .client
                        .storage_client()
                        .read_blobs(&missing_blob_ids)
                        .await?
                        .into_iter()
                        .flatten()
                        .collect();
                    remote_node.upload_blobs(missing_blobs).await?;
                    remote_node
                        .handle_confirmed_certificate(
                            certificate,
                            CrossChainMessageDelivery::NonBlocking,
                        )
                        .await?;
                }
                Err(err) => return Err(err.into()),
            }
        }

        Ok(())
    }
4887}
4888
4889/// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on
4890/// each subsequent node. Returns error `err` if all of the nodes fail.
4891async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
4892    nodes: &[RemoteNode<A>],
4893    f: F,
4894    err: G,
4895    timeout: Duration,
4896) -> Result<V, E2>
4897where
4898    F: Clone + FnOnce(RemoteNode<A>) -> R,
4899    RemoteNode<A>: Clone,
4900    G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
4901    R: Future<Output = Result<V, E1>> + 'a,
4902{
4903    let mut stream = nodes
4904        .iter()
4905        .zip(0..)
4906        .map(|(remote_node, i)| {
4907            let fun = f.clone();
4908            let node = remote_node.clone();
4909            async move {
4910                linera_base::time::timer::sleep(timeout * i * i).await;
4911                fun(node).await.map_err(|err| (remote_node.public_key, err))
4912            }
4913        })
4914        .collect::<FuturesUnordered<_>>();
4915    let mut errors = vec![];
4916    while let Some(maybe_result) = stream.next().await {
4917        match maybe_result {
4918            Ok(result) => return Ok(result),
4919            Err(error) => errors.push(error),
4920        };
4921    }
4922    Err(err(errors))
4923}
4924
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Test helper: runs `process_notification` as if `notification` came from the given
    /// `validator` (public key and address), panicking on any failure.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let remote_node = RemoteNode { node, public_key };
        let local_node = self.client.local_node.clone();
        self.process_notification(remote_node, local_node, notification)
            .await
            .unwrap();
    }
}
4945
/// Wrapper for `AbortHandle` that aborts when it's dropped.
#[must_use]
pub struct AbortOnDrop(pub AbortHandle);

impl Drop for AbortOnDrop {
    #[instrument(level = "trace", skip(self))]
    fn drop(&mut self) {
        // Cancel the associated abortable stream/future when the guard goes out of scope.
        self.0.abort();
    }
}
4956
/// A pending proposed block, together with its published blobs.
#[derive(Clone, Serialize, Deserialize)]
pub struct PendingProposal {
    /// The proposed block.
    pub block: ProposedBlock,
    /// The blobs published together with the proposed block.
    pub blobs: Vec<Blob>,
}
4963
/// Whether a received certificate still needs to be checked before processing.
enum ReceiveCertificateMode {
    /// The certificate must still be checked.
    NeedsCheck,
    /// The certificate was already checked by the caller.
    AlreadyChecked,
}
4968
/// Outcome of checking a certificate's epoch against the committees we know.
enum CheckCertificateResult {
    /// The certificate belongs to an old epoch (maps to a deprecation error).
    OldEpoch,
    /// The certificate is acceptable.
    New,
    /// The certificate belongs to an epoch ahead of ours (maps to a synchronization
    /// error).
    FutureEpoch,
}
4974
4975impl CheckCertificateResult {
4976    fn into_result(self) -> Result<(), ChainClientError> {
4977        match self {
4978            Self::OldEpoch => Err(ChainClientError::CommitteeDeprecationError),
4979            Self::New => Ok(()),
4980            Self::FutureEpoch => Err(ChainClientError::CommitteeSynchronizationError),
4981        }
4982    }
4983}
4984
4985/// Creates a compressed Contract, Service and bytecode.
4986#[cfg(not(target_arch = "wasm32"))]
4987pub async fn create_bytecode_blobs(
4988    contract: Bytecode,
4989    service: Bytecode,
4990    vm_runtime: VmRuntime,
4991) -> (Vec<Blob>, ModuleId) {
4992    match vm_runtime {
4993        VmRuntime::Wasm => {
4994            let (compressed_contract, compressed_service) =
4995                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
4996                    .await
4997                    .expect("Compression should not panic");
4998            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
4999            let service_blob = Blob::new_service_bytecode(compressed_service);
5000            let module_id =
5001                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
5002            (vec![contract_blob, service_blob], module_id)
5003        }
5004        VmRuntime::Evm => {
5005            let compressed_contract = contract.compress();
5006            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
5007            let module_id = ModuleId::new(
5008                evm_contract_blob.id().hash,
5009                evm_contract_blob.id().hash,
5010                vm_runtime,
5011            );
5012            (vec![evm_contract_blob], module_id)
5013        }
5014    }
5015}