linera_core/client/mod.rs

// Copyright (c) Facebook, Inc. and its affiliates.
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
    convert::Infallible,
    iter,
    ops::{Deref, DerefMut},
    sync::{Arc, RwLock},
};

use chain_client_state::ChainClientState;
use custom_debug_derive::Debug;
use dashmap::{
    mapref::one::{MappedRef as DashMapMappedRef, Ref as DashMapRef, RefMut as DashMapRefMut},
    DashMap,
};
use futures::{
    future::{self, Either, FusedFuture, Future},
    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt},
};
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use linera_base::{
    abi::Abi,
    crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
    data_types::{
        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
        ChainDescription, Epoch, Round, Timestamp,
    },
    ensure,
    identifiers::{
        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
        ModuleId, StreamId,
    },
    ownership::{ChainOwnership, TimeoutConfig},
    time::{Duration, Instant},
};
#[cfg(not(target_arch = "wasm32"))]
use linera_base::{data_types::Bytecode, vm::VmRuntime};
use linera_chain::{
    data_types::{
        BlockProposal, ChainAndHeight, IncomingBundle, LiteVote, MessageAction, ProposedBlock,
        Transaction,
    },
    manager::LockingBlock,
    types::{
        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
    },
    ChainError, ChainExecutionContext, ChainStateView,
};
use linera_execution::{
    committee::Committee,
    system::{
        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
        REMOVED_EPOCH_STREAM_NAME,
    },
    ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
};
use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
use linera_views::ViewError;
use rand::prelude::SliceRandom as _;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::{mpsc, OwnedRwLockReadGuard};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info, instrument, warn, Instrument as _};

use crate::{
    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout},
    environment::Environment,
    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
    node::{
        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
        ValidatorNodeProvider as _,
    },
    notifier::ChannelNotifier,
    remote_node::RemoteNode,
    updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
    worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
};

mod chain_client_state;
#[cfg(test)]
#[path = "../unit_tests/client_tests.rs"]
mod client_tests;

#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "process_inbox_latency",
                "process_inbox latency",
                &[],
                exponential_bucket_latencies(500.0),
            )
        });

    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "prepare_chain_latency",
            "prepare_chain latency",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "execute_block_latency",
            "execute_block latency",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "find_received_certificates_latency",
            "find_received_certificates latency",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });
}

/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
pub struct Client<Env: Environment> {
    environment: Env,
    /// Local node to manage the execution state and the local storage of the chains that we are
    /// tracking.
    local_node: LocalNodeClient<Env::Storage>,
    /// The admin chain ID.
    admin_id: ChainId,
    /// Chains that should be tracked by the client.
    // TODO(#2412): Merge with set of chains the client is receiving notifications from validators
    tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
    /// References to clients waiting for chain notifications.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Chain state for the managed chains.
    chains: DashMap<ChainId, ChainClientState>,
    /// Configuration options.
    options: ChainClientOptions,
}

impl<Env: Environment> Client<Env> {
    /// Creates a new `Client` with a new cache and notifiers.
    #[instrument(level = "trace", skip_all)]
    pub fn new(
        environment: Env,
        admin_id: ChainId,
        long_lived_services: bool,
        tracked_chains: impl IntoIterator<Item = ChainId>,
        name: impl Into<String>,
        chain_worker_ttl: Duration,
        options: ChainClientOptions,
    ) -> Self {
        let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
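        // Configure the client-side worker: keep long-lived services if requested, allow
        // inactive chains and messages from deprecated epochs, and expire idle chain
        // workers after `chain_worker_ttl`.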
        let state = WorkerState::new_for_client(
            name.into(),
            environment.storage().clone(),
            tracked_chains.clone(),
        )
        .with_long_lived_services(long_lived_services)
        .with_allow_inactive_chains(true)
        .with_allow_messages_from_deprecated_epochs(true)
        .with_chain_worker_ttl(chain_worker_ttl);
        let local_node = LocalNodeClient::new(state);

        Self {
            environment,
            local_node,
            chains: DashMap::new(),
            admin_id,
            tracked_chains,
            notifier: Arc::new(ChannelNotifier::default()),
            options,
        }
    }

    /// Returns the storage client used by this client's local node.
    pub fn storage_client(&self) -> &Env::Storage {
        self.environment.storage()
    }

    pub fn validator_node_provider(&self) -> &Env::Network {
        self.environment.network()
    }

    /// Returns a reference to the [`Signer`] of the client.
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &impl Signer {
        self.environment.signer()
    }

    /// Adds a chain to the set of chains tracked by the local node.
    #[instrument(level = "trace", skip(self))]
    pub fn track_chain(&self, chain_id: ChainId) {
        self.tracked_chains
            .write()
            .expect("Panics should not happen while holding a lock to `tracked_chains`")
            .insert(chain_id);
    }

    /// Creates a new `ChainClient`.
    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
    pub fn create_chain_client(
        self: &Arc<Self>,
        chain_id: ChainId,
        block_hash: Option<CryptoHash>,
        next_block_height: BlockHeight,
        pending_proposal: Option<PendingProposal>,
        preferred_owner: Option<AccountOwner>,
        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
    ) -> ChainClient<Env> {
        // If the entry already exists, we assume that it is more up to date than the
        // arguments: if they were read from the wallet file, they might be stale.
        if let dashmap::mapref::entry::Entry::Vacant(e) = self.chains.entry(chain_id) {
            e.insert(ChainClientState::new(pending_proposal));
        }

        ChainClient {
            client: self.clone(),
            chain_id,
            options: self.options.clone(),
            preferred_owner,
            initial_block_hash: block_hash,
            initial_next_block_height: next_block_height,
            timing_sender,
        }
    }

    /// Fetches the chain description blob if needed, and returns the chain info.
    async fn fetch_chain_info(
        &self,
        chain_id: ChainId,
        validators: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<Box<ChainInfo>, ChainClientError> {
        match self.local_node.chain_info(chain_id).await {
            Ok(info) => Ok(info),
            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                // Make sure the admin chain is up to date.
                self.synchronize_chain_state(self.admin_id).await?;
                // If the chain is missing locally, the error surfaces as a `BlobsNotFound`,
                // so fetch the missing blobs from the validators and retry.
                self.update_local_node_with_blobs_from(blob_ids, validators)
                    .await?;
                Ok(self.local_node.chain_info(chain_id).await?)
            }
            Err(err) => Err(err.into()),
        }
    }

    /// Downloads and processes all certificates up to (excluding) the specified height.
    #[instrument(level = "trace", skip(self))]
    async fn download_certificates(
        &self,
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    ) -> Result<Box<ChainInfo>, ChainClientError> {
        let mut validators = self.validator_nodes().await?;
        // Sequentially try each validator in random order.
        validators.shuffle(&mut rand::thread_rng());
        let mut info = self.fetch_chain_info(chain_id, &validators).await?;
        for remote_node in validators {
            if target_next_block_height <= info.next_block_height {
                return Ok(info);
            }
            match self
                .download_certificates_from(&remote_node, chain_id, target_next_block_height)
                .await
            {
                Err(err) => warn!(
                    "Failed to download certificates from validator {:?}: {err}",
                    remote_node.public_key
                ),
                Ok(Some(new_info)) => info = new_info,
                Ok(None) => {}
            }
        }
        ensure!(
            target_next_block_height <= info.next_block_height,
            ChainClientError::CannotDownloadCertificates {
                chain_id,
                target_next_block_height,
            }
        );
        Ok(info)
    }

    /// Downloads and processes all certificates up to (excluding) the specified height from the
    /// given validator.
    #[instrument(level = "trace", skip_all)]
    async fn download_certificates_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        stop: BlockHeight,
    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
        let mut last_info = None;
        // First load any blocks from local storage, if available.
        let mut hashes = Vec::new();
        let mut next_height = BlockHeight::ZERO;
        {
            let chain = self.local_node.chain_state_view(chain_id).await?;
            next_height = next_height.max(chain.tip_state.get().next_block_height);
            while next_height < stop {
                let Some(hash) = chain.preprocessed_blocks.get(&next_height).await? else {
                    break;
                };
                hashes.push(hash);
                next_height = next_height.try_add_one()?;
            }
        }
        let certificates = self
            .storage_client()
            .read_certificates(hashes.clone())
            .await?;
        let certificates = match ResultReadCertificates::new(certificates, hashes) {
            ResultReadCertificates::Certificates(certificates) => certificates,
            ResultReadCertificates::InvalidHashes(hashes) => {
                return Err(ChainClientError::ReadCertificatesError(hashes))
            }
        };
        for certificate in certificates {
            last_info = Some(self.handle_certificate(Box::new(certificate)).await?.info);
        }
        // Now download the rest in batches from the remote node.
        while next_height < stop {
            // TODO(#2045): Analyze network errors instead of guessing the batch size.
            let limit = u64::from(stop)
                .checked_sub(u64::from(next_height))
                .ok_or(ArithmeticError::Overflow)?
                .min(1000);
            let certificates = remote_node
                .query_certificates_from(chain_id, next_height, limit)
                .await?;
            let Some(info) = self.process_certificates(remote_node, certificates).await? else {
                break;
            };
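            // Each successful batch must advance the height; otherwise we would loop forever.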
            assert!(info.next_block_height > next_height);
            next_height = info.next_block_height;
            last_info = Some(info);
        }
        Ok(last_info)
    }

    /// Tries to process all the certificates, requesting any missing blobs from the given node.
    /// Returns the chain info of the last successfully processed certificate.
    #[instrument(level = "trace", skip_all)]
    async fn process_certificates(
        &self,
        remote_node: &RemoteNode<impl ValidatorNode>,
        certificates: Vec<ConfirmedBlockCertificate>,
    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
        let mut info = None;
        for certificate in certificates {
            let certificate = Box::new(certificate);
            let mut result = self.handle_certificate(certificate.clone()).await;

            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
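                // The certificate references blobs we don't have yet: download them from
                // the same validator, store them locally, and retry the certificate.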
                let blobs = future::join_all(blob_ids.iter().map(|blob_id| async move {
                    remote_node.try_download_blob(*blob_id).await.unwrap()
                }))
                .await;
                self.local_node.store_blobs(&blobs).await?;
                result = self.handle_certificate(certificate.clone()).await;
            }

            info = Some(result?.info);
        }
        // Done with all certificates.
        Ok(info)
    }

    async fn handle_certificate<T: ProcessableCertificate>(
        &self,
        certificate: Box<GenericCertificate<T>>,
    ) -> Result<ChainInfoResponse, LocalNodeError> {
        self.local_node
            .handle_certificate(*certificate, &self.notifier)
            .await
    }

    async fn chain_info_with_committees(
        &self,
        chain_id: ChainId,
    ) -> Result<Box<ChainInfo>, LocalNodeError> {
        let query = ChainInfoQuery::new(chain_id).with_committees();
        let info = self.local_node.handle_chain_info_query(query).await?.info;
        Ok(info)
    }

    /// Obtains all the committees trusted by the admin chain, together with the admin
    /// chain's current (highest) epoch.
    #[instrument(level = "trace", skip_all)]
    async fn admin_committees(
        &self,
    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
        let info = self.chain_info_with_committees(self.admin_id).await?;
        Ok((info.epoch, info.into_committees()?))
    }

    /// Obtains the committee for the latest epoch on the admin chain.
    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
        let info = self.chain_info_with_committees(self.admin_id).await?;
        Ok((info.epoch, info.into_current_committee()?))
    }

    /// Obtains the validators for the latest epoch.
    async fn validator_nodes(
        &self,
    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, ChainClientError> {
        let (_, committee) = self.admin_committee().await?;
        Ok(self.make_nodes(&committee)?)
    }

    /// Creates a [`RemoteNode`] for each validator in the committee.
    fn make_nodes(
        &self,
        committee: &Committee,
    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
        Ok(self
            .validator_node_provider()
            .make_nodes(committee)?
            .map(|(public_key, node)| RemoteNode { public_key, node })
            .collect())
    }

    /// Ensures that the client has the `ChainDescription` blob corresponding to the given
    /// `ChainId`, and returns the deserialized description.
    pub async fn get_chain_description(
        &self,
        chain_id: ChainId,
    ) -> Result<ChainDescription, ChainClientError> {
        let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
        let blob = self
            .local_node
            .storage_client()
            .read_blob(chain_desc_id)
            .await?;
        if let Some(blob) = blob {
            // We have the blob - return it.
            return Ok(bcs::from_bytes(blob.bytes())?);
        };
        // Recover history from the current validators, according to the admin chain.
        self.synchronize_chain_state(self.admin_id).await?;
        let nodes = self.validator_nodes().await?;
        let blob = self
            .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
            .await?
            .pop()
            .unwrap(); // Returns exactly as many blobs as passed-in IDs.
        Ok(bcs::from_bytes(blob.bytes())?)
    }

    /// Updates the latest block hash, next block height and round information from the chain info.
    #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
    fn update_from_info(&self, info: &ChainInfo) {
        if let Some(mut state) = self.chains.get_mut(&info.chain_id) {
            state.value_mut().update_from_info(info);
        }
    }

    /// Handles the certificate in the local node and the resulting notifications.
    #[instrument(level = "trace", skip_all)]
    async fn process_certificate<T: ProcessableCertificate>(
        &self,
        certificate: Box<GenericCertificate<T>>,
    ) -> Result<(), LocalNodeError> {
        let info = self.handle_certificate(certificate).await?.info;
        self.update_from_info(&info);
        Ok(())
    }

    /// Submits a validated block for finalization and returns the confirmed block certificate.
    #[instrument(level = "trace", skip_all)]
    async fn finalize_block(
        &self,
        committee: &Committee,
        certificate: ValidatedBlockCertificate,
    ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
        debug!(round = %certificate.round, "Submitting block for confirmation");
        let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
        let finalize_action = CommunicateAction::FinalizeBlock {
            certificate: Box::new(certificate),
            delivery: self.options.cross_chain_message_delivery,
        };
        let certificate = self
            .communicate_chain_action(committee, finalize_action, hashed_value)
            .await?;
        self.receive_certificate(certificate.clone(), ReceiveCertificateMode::AlreadyChecked)
            .await?;
        Ok(certificate)
    }

    /// Submits a block proposal to the validators.
    #[instrument(level = "trace", skip_all)]
    async fn submit_block_proposal<T: ProcessableCertificate>(
        &self,
        committee: &Committee,
        proposal: Box<BlockProposal>,
        value: T,
    ) -> Result<GenericCertificate<T>, ChainClientError> {
        debug!(
            round = %proposal.content.round,
            "Submitting block proposal to validators"
        );
        let submit_action = CommunicateAction::SubmitBlock {
            proposal,
            blob_ids: value.required_blob_ids().into_iter().collect(),
        };
        let certificate = self
            .communicate_chain_action(committee, submit_action, value)
            .await?;
        self.process_certificate(Box::new(certificate.clone()))
            .await?;
        Ok(certificate)
    }

    /// Broadcasts certified blocks to validators.
    #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
    async fn communicate_chain_updates(
        &self,
        committee: &Committee,
        chain_id: ChainId,
        height: BlockHeight,
        delivery: CrossChainMessageDelivery,
    ) -> Result<(), ChainClientError> {
        let nodes = self.make_nodes(committee)?;
        communicate_with_quorum(
            &nodes,
            committee,
            |_: &()| (),
            |remote_node| {
                let mut updater = ValidatorUpdater {
                    remote_node,
                    local_node: self.local_node.clone(),
                    admin_id: self.admin_id,
                };
                Box::pin(async move {
                    updater
                        .send_chain_information(chain_id, height, delivery)
                        .await
                })
            },
            self.options.grace_period,
        )
        .await?;
        Ok(())
    }

    /// Broadcasts certified blocks and sends one action (a block proposal, a certificate to
    /// finalize, or a leader timeout request) to the validators.
    ///
    /// Verifies that the validator votes are for the provided value, and returns a
    /// certificate built from those votes.
    #[instrument(level = "trace", skip_all)]
    async fn communicate_chain_action<T: CertificateValue>(
        &self,
        committee: &Committee,
        action: CommunicateAction,
        value: T,
    ) -> Result<GenericCertificate<T>, ChainClientError> {
        let nodes = self.make_nodes(committee)?;
        let ((votes_hash, votes_round), votes) = communicate_with_quorum(
            &nodes,
            committee,
            |vote: &LiteVote| (vote.value.value_hash, vote.round),
            |remote_node| {
                let mut updater = ValidatorUpdater {
                    remote_node,
                    local_node: self.local_node.clone(),
                    admin_id: self.admin_id,
                };
                let action = action.clone();
                Box::pin(async move { updater.send_chain_update(action).await })
            },
            self.options.grace_period,
        )
        .await?;
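        // The quorum must have voted for exactly the value and round we submitted;
        // anything else means the validators are building a different certificate.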
        ensure!(
            (votes_hash, votes_round) == (value.hash(), action.round()),
            ChainClientError::UnexpectedQuorum {
                hash: votes_hash,
                round: votes_round,
                expected_hash: value.hash(),
                expected_round: action.round(),
            }
        );
        // Certificate is valid because
        // * `communicate_with_quorum` ensured a sufficient "weight" of
        // (non-error) answers were returned by validators.
        // * each answer is a vote signed by the expected validator.
        let certificate = LiteCertificate::try_from_votes(votes)
            .ok_or_else(|| {
                ChainClientError::InternalError("Vote values or rounds don't match; this is a bug")
            })?
            .with_value(value)
            .ok_or_else(|| {
                ChainClientError::ProtocolError("A quorum voted for an unexpected value")
            })?;
        Ok(certificate)
    }

    /// Processes the confirmed block certificate and its ancestors in the local node, then
    /// updates the validators up to that certificate.
    #[instrument(level = "trace", skip_all)]
    async fn receive_certificate_and_update_validators(
        &self,
        certificate: ConfirmedBlockCertificate,
        mode: ReceiveCertificateMode,
    ) -> Result<(), ChainClientError> {
        let block_chain_id = certificate.block().header.chain_id;
        let block_height = certificate.block().header.height;

        self.receive_certificate(certificate, mode).await?;

        // Make sure a quorum of validators (according to the chain's new committee) are up-to-date
        // for data availability.
        let local_committee = self
            .chain_info_with_committees(block_chain_id)
            .await?
            .into_current_committee()?;
        self.communicate_chain_updates(
            &local_committee,
            block_chain_id,
            block_height.try_add_one()?,
            CrossChainMessageDelivery::Blocking,
        )
        .await?;
        Ok(())
    }

    /// Processes the confirmed block certificate in the local node. Also downloads and processes
    /// all ancestors that are still missing.
    #[instrument(level = "trace", skip_all)]
    async fn receive_certificate(
        &self,
        certificate: ConfirmedBlockCertificate,
        mode: ReceiveCertificateMode,
    ) -> Result<(), ChainClientError> {
        let certificate = Box::new(certificate);
        let block = certificate.block();

        // Verify the certificate before doing any expensive networking.
        let (max_epoch, committees) = self.admin_committees().await?;
        if let ReceiveCertificateMode::NeedsCheck = mode {
            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
        }
        // Recover history from the network.
        self.download_certificates(block.header.chain_id, block.header.height)
            .await?;
        // Process the received operations. Download required hashed certificate values if
        // necessary.
        if let Err(err) = self.process_certificate(certificate.clone()).await {
            match &err {
                LocalNodeError::BlobsNotFound(blob_ids) => {
                    let blobs = RemoteNode::download_blobs(
                        blob_ids,
                        &self.validator_nodes().await?,
                        self.options.blob_download_timeout,
                    )
                    .await
                    .ok_or(err)?;
                    self.local_node.store_blobs(&blobs).await?;
                    self.process_certificate(certificate).await?;
                }
                _ => {
                    // The certificate is not as expected. Give up.
                    warn!("Failed to process network hashed certificate value");
                    return Err(err.into());
                }
            }
        }

        Ok(())
    }

    /// Processes the confirmed block in the local node without executing it.
    #[instrument(level = "trace", skip_all)]
    #[allow(dead_code)] // Otherwise CI fails when built for docker.
    async fn receive_sender_certificate(
        &self,
        certificate: ConfirmedBlockCertificate,
        mode: ReceiveCertificateMode,
        nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
    ) -> Result<(), ChainClientError> {
        let certificate = Box::new(certificate);

        // Verify the certificate before doing any expensive networking.
        let (max_epoch, committees) = self.admin_committees().await?;
        if let ReceiveCertificateMode::NeedsCheck = mode {
            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
        }
        // Determine which validators to download any missing data from.
        let nodes = if let Some(nodes) = nodes {
            nodes
        } else {
            self.validator_nodes().await?
        };
        if let Err(err) = self.handle_certificate(certificate.clone()).await {
            match &err {
                LocalNodeError::BlobsNotFound(blob_ids) => {
                    let blobs = RemoteNode::download_blobs(
                        blob_ids,
                        &nodes,
                        self.options.blob_download_timeout,
                    )
                    .await
                    .ok_or(err)?;
                    self.local_node.store_blobs(&blobs).await?;
                    self.handle_certificate(certificate.clone()).await?;
                }
                _ => {
                    // The certificate is not as expected. Give up.
                    warn!("Failed to process network hashed certificate value");
                    return Err(err.into());
                }
            }
        }

        Ok(())
    }

    /// Downloads and preprocesses all confirmed block certificates for blocks that sent any
    /// message to this chain.
    #[instrument(level = "trace", skip(self))]
    async fn synchronize_received_certificates_from_validator(
        &self,
        chain_id: ChainId,
        remote_node: &RemoteNode<Env::ValidatorNode>,
    ) -> Result<ReceivedCertificatesFromValidator, ChainClientError> {
        let mut tracker = self
            .local_node
            .chain_state_view(chain_id)
            .await?
            .received_certificate_trackers
            .get()
            .get(&remote_node.public_key)
            .copied()
            .unwrap_or(0);
        let (max_epoch, committees) = self.admin_committees().await?;

        // Retrieve the list of newly received certificates from this validator.
        let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(tracker);
        let info = remote_node.handle_chain_info_query(query).await?;
        let remote_log = info.requested_received_log;
        let remote_heights = Self::heights_per_chain(&remote_log);

        // Obtain the next block height we need in the local node, for each chain.
        let local_next_heights = self
            .local_node
            .next_outbox_heights(remote_heights.keys(), chain_id)
            .await?;

        // We keep track of the height we've successfully downloaded and checked, per chain.
        let mut downloaded_heights = BTreeMap::new();
        // And we make a list of chains we already fully have locally. We need to make sure to
        // put all their sent messages into the inbox.
        let mut other_sender_chains = Vec::new();

        let certificates = future::try_join_all(remote_heights.into_iter().filter_map(
            |(sender_chain_id, remote_heights)| {
                let local_next = *local_next_heights.get(&sender_chain_id)?;
                if let Ok(height) = local_next.try_sub_one() {
                    downloaded_heights.insert(sender_chain_id, height);
                }
                let remote_heights = remote_heights
                    .into_iter()
                    .filter(|h| *h >= local_next)
                    .collect::<Vec<_>>();
                if remote_heights.is_empty() {
                    // Our highest, locally executed block is higher than any block height
                    // from the current batch. Skip this batch, but remember to wait for
                    // the messages to be delivered to the inboxes.
                    other_sender_chains.push(sender_chain_id);
                    return None;
                };
                Some(async move {
                    let certificates = remote_node
                        .download_certificates_by_heights(sender_chain_id, remote_heights)
                        .await?;
                    Ok::<Vec<_>, ChainClientError>(certificates)
                })
            },
        ))
        .await?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

        let mut certificates_by_height_by_chain = BTreeMap::new();

        // Check the signatures and keep only the ones that are valid.
        for confirmed_block_certificate in certificates {
            let block_header = &confirmed_block_certificate.inner().block().header;
            let sender_chain_id = block_header.chain_id;
            let height = block_header.height;
            let epoch = block_header.epoch;
            match Self::check_certificate(max_epoch, &committees, &confirmed_block_certificate)? {
                CheckCertificateResult::FutureEpoch => {
                    warn!(
                        "Postponing received certificate from {sender_chain_id:.8} at height \
                         {height} from future epoch {epoch}"
                    );
                    // Do not process this certificate now. It can still be
                    // downloaded later, once our committee is updated.
                }
                CheckCertificateResult::OldEpoch => {
                    // This epoch is not recognized any more. Let's skip the certificate.
                    // If a higher block with a recognized epoch comes up later from the
                    // same chain, the call to `receive_certificate` below will download
                    // the skipped certificate again.
                    warn!("Skipping received certificate from past epoch {epoch:?}");
                }
                CheckCertificateResult::New => {
                    certificates_by_height_by_chain
                        .entry(sender_chain_id)
                        .or_insert_with(BTreeMap::new)
                        .insert(height, confirmed_block_certificate);
                }
            }
        }

        // Increase the tracker up to the first position we haven't downloaded.
        for entry in remote_log {
            if certificates_by_height_by_chain
                .get(&entry.chain_id)
                .is_some_and(|certs| certs.contains_key(&entry.height))
            {
                tracker += 1;
            } else {
                break;
            }
        }

        for (sender_chain_id, certs) in &mut certificates_by_height_by_chain {
            if certs
                .values()
                .any(|cert| !cert.block().recipients().contains(&chain_id))
            {
                warn!(
                    "Skipping received certificates from chain {sender_chain_id:.8}: \
                     no messages for {chain_id:.8}."
                );
                certs.clear();
            }
        }

        Ok(ReceivedCertificatesFromValidator {
            public_key: remote_node.public_key,
            tracker,
            certificates: certificates_by_height_by_chain
                .into_values()
                .flat_map(BTreeMap::into_values)
                .collect(),
            other_sender_chains,
        })
    }

    #[instrument(
        level = "trace", skip_all,
        fields(certificate_hash = ?incoming_certificate.hash()),
    )]
    fn check_certificate(
        highest_known_epoch: Epoch,
        committees: &BTreeMap<Epoch, Committee>,
        incoming_certificate: &ConfirmedBlockCertificate,
    ) -> Result<CheckCertificateResult, NodeError> {
        let block = incoming_certificate.block();
        // Check that certificates are valid w.r.t one of our trusted committees.
        if block.header.epoch > highest_known_epoch {
            return Ok(CheckCertificateResult::FutureEpoch);
        }
        if let Some(known_committee) = committees.get(&block.header.epoch) {
            // This epoch is recognized by our chain. Let's verify the
            // certificate.
            incoming_certificate.check(known_committee)?;
            Ok(CheckCertificateResult::New)
        } else {
            // We don't accept a certificate from a committee that was retired.
            Ok(CheckCertificateResult::OldEpoch)
        }
    }

    /// Given a set of chain ID-block height pairs, returns a map that assigns to each chain ID
    /// the set of heights. The returned map contains no empty values.
    fn heights_per_chain(
        remote_log: &[ChainAndHeight],
    ) -> BTreeMap<ChainId, BTreeSet<BlockHeight>> {
        remote_log.iter().fold(
            BTreeMap::<ChainId, BTreeSet<_>>::new(),
            |mut chain_to_info, entry| {
                chain_to_info
                    .entry(entry.chain_id)
                    .or_default()
                    .insert(entry.height);
                chain_to_info
            },
        )
    }

    /// Downloads and processes any certificates we are missing for the given chain.
    #[instrument(level = "trace", skip_all)]
    async fn synchronize_chain_state(
        &self,
        chain_id: ChainId,
    ) -> Result<Box<ChainInfo>, ChainClientError> {
        let (_, committee) = self.admin_committee().await?;
        self.synchronize_chain_state_from_committee(chain_id, committee)
            .await
    }

    /// Downloads and processes any certificates we are missing for the given chain, from the given
    /// committee.
    #[instrument(level = "trace", skip_all)]
    pub async fn synchronize_chain_state_from_committee(
        &self,
        chain_id: ChainId,
        committee: Committee,
    ) -> Result<Box<ChainInfo>, ChainClientError> {
        #[cfg(with_metrics)]
        let _latency = metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency();

        let validators = self.make_nodes(&committee)?;
        Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
        communicate_with_quorum(
            &validators,
            &committee,
            |_: &()| (),
            |remote_node| async move {
                self.synchronize_chain_state_from(&remote_node, chain_id)
                    .await
            },
            self.options.grace_period,
        )
        .await?;

        self.local_node
            .chain_info(chain_id)
            .await
            .map_err(Into::into)
    }

    /// Downloads any certificates from the specified validator that we are missing for the given
    /// chain, and processes them.
    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
    async fn synchronize_chain_state_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
    ) -> Result<(), ChainClientError> {
        let mut local_info = self.local_node.chain_info(chain_id).await?;
        let query = ChainInfoQuery::new(chain_id).with_manager_values();
        let remote_info = remote_node.handle_chain_info_query(query).await?;
        if let Some(new_info) = self
            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
            .await?
        {
            local_info = new_info;
        };

        // If we are at the same height as the remote node, we also update our chain manager.
        if local_info.next_block_height != remote_info.next_block_height {
            debug!(
                "Synced from validator {}; but remote height is {} and local height is {}",
                remote_node.public_key, remote_info.next_block_height, local_info.next_block_height
            );
            return Ok(());
        };

        if let Some(timeout) = remote_info.manager.timeout {
            self.handle_certificate(Box::new(*timeout)).await?;
        }
        let mut proposals = Vec::new();
        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
            proposals.push(*proposal);
        }
        if let Some(proposal) = remote_info.manager.requested_proposed {
            proposals.push(*proposal);
        }
        if let Some(locking) = remote_info.manager.requested_locking {
            match *locking {
                LockingBlock::Fast(proposal) => {
                    proposals.push(proposal);
                }
                LockingBlock::Regular(cert) => {
                    let hash = cert.hash();
                    if let Err(err) = self.try_process_locking_block_from(remote_node, cert).await {
                        debug!(
                            "Skipping locked block {hash} from validator {} at height {}: {err}",
                            remote_node.public_key, local_info.next_block_height,
                        );
                    }
                }
            }
        }
        'proposal_loop: for proposal in proposals {
            let owner: AccountOwner = proposal.owner();
            if let Err(mut err) = self
                .local_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                if let LocalNodeError::BlobsNotFound(_) = &err {
                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                    if !required_blob_ids.is_empty() {
                        let mut blobs = Vec::new();
                        for blob_id in required_blob_ids {
                            let blob_content = match remote_node
                                .node
                                .download_pending_blob(chain_id, blob_id)
                                .await
                            {
                                Ok(content) => content,
                                Err(err) => {
                                    warn!(
                                        "Skipping proposal from {owner} and validator {} at \
                                        height {}; failed to download {blob_id}: {err}",
                                        remote_node.public_key, local_info.next_block_height
                                    );
                                    continue 'proposal_loop;
                                }
                            };
                            blobs.push(Blob::new(blob_content));
                        }
                        self.local_node
                            .handle_pending_blobs(chain_id, blobs)
                            .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                        self.update_local_node_with_blobs_from(
                            blob_ids.clone(),
                            &[remote_node.clone()],
                        )
                        .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                }

                debug!(
                    "Skipping proposal from {owner} and validator {} at height {}: {err}",
                    remote_node.public_key, local_info.next_block_height
                );
            }
        }
        Ok(())
    }

    async fn try_process_locking_block_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        certificate: GenericCertificate<ValidatedBlock>,
    ) -> Result<(), ChainClientError> {
        let chain_id = certificate.inner().chain_id();
        let certificate = Box::new(certificate);
        match self.process_certificate(certificate.clone()).await {
            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                let mut blobs = Vec::new();
                for blob_id in blob_ids {
                    let blob_content = remote_node
                        .node
                        .download_pending_blob(chain_id, blob_id)
                        .await?;
                    blobs.push(Blob::new(blob_content));
                }
                self.local_node
                    .handle_pending_blobs(chain_id, blobs)
                    .await?;
                self.process_certificate(certificate).await?;
                Ok(())
            }
            Err(err) => Err(err.into()),
            Ok(()) => Ok(()),
        }
    }

    /// Downloads and processes, from the given validators, confirmed block certificates that
    /// use the given blobs. If this succeeds, the blobs will be in our storage.
    async fn update_local_node_with_blobs_from(
        &self,
        blob_ids: Vec<BlobId>,
        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<Vec<Blob>, ChainClientError> {
        let timeout = self.options.blob_download_timeout;
        future::try_join_all(blob_ids.into_iter().map(|blob_id| async move {
            let mut stream = remote_nodes
                .iter()
                .zip(0..)
                .map(|(remote_node, i)| async move {
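                    // Stagger the requests quadratically: the first validator is tried
                    // immediately, each later one only after an increasing delay, so we
                    // only fan out if earlier validators are slow to answer.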
                    linera_base::time::timer::sleep(timeout * i * i).await;
                    let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
                    // Process the certificate locally, fetching any missing blobs from this validator.
                    self.receive_sender_certificate(
                        certificate,
                        ReceiveCertificateMode::NeedsCheck,
                        Some(vec![remote_node.clone()]),
                    )
                    .await?;
                    let blob = self
                        .local_node
                        .storage_client()
                        .read_blob(blob_id)
                        .await?
                        .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
                    Result::<_, ChainClientError>::Ok(blob)
                })
                .collect::<FuturesUnordered<_>>();
            while let Some(maybe_blob) = stream.next().await {
                if let Ok(blob) = maybe_blob {
                    return Ok(blob);
                }
            }
            Err(LocalNodeError::BlobsNotFound(vec![blob_id]).into())
        }))
        .await
    }

    /// Downloads and processes confirmed block certificates that use the given blobs.
    /// If this succeeds, the blobs will be in our storage.
    async fn receive_certificates_for_blobs(
        &self,
        blob_ids: Vec<BlobId>,
    ) -> Result<(), ChainClientError> {
        // Deduplicate IDs.
        let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
        let validators = self.validator_nodes().await?;

        let mut missing_blobs = Vec::new();
        for blob_id in blob_ids {
            let mut certificate_stream = validators
                .iter()
                .map(|remote_node| async move {
                    let cert = remote_node.download_certificate_for_blob(blob_id).await?;
                    Ok::<_, NodeError>((remote_node.clone(), cert))
                })
                .collect::<FuturesUnordered<_>>();
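            // Try validators until one returns a certificate that we can receive
            // successfully; if the stream runs dry first, record the blob as missing.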
            loop {
                let Some(result) = certificate_stream.next().await else {
                    missing_blobs.push(blob_id);
                    break;
                };
                if let Ok((remote_node, cert)) = result {
                    if self
                        .receive_sender_certificate(
                            cert,
                            ReceiveCertificateMode::NeedsCheck,
                            Some(vec![remote_node]),
                        )
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }

        if missing_blobs.is_empty() {
            Ok(())
        } else {
            Err(NodeError::BlobsNotFound(missing_blobs).into())
        }
    }

    /// Attempts to execute the block locally. If any incoming message execution fails, that
    /// message is rejected and execution is retried, until the block accepts only messages
    /// that succeed.
    // TODO(#2806): Measure how failing messages affect the execution times.
    #[tracing::instrument(level = "trace", skip(self, block))]
    async fn stage_block_execution_and_discard_failing_messages(
        &self,
        mut block: ProposedBlock,
        round: Option<u32>,
        published_blobs: Vec<Blob>,
    ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
        loop {
            let result = self
                .stage_block_execution(block.clone(), round, published_blobs.clone())
                .await;
            if let Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
                WorkerError::ChainError(chain_error),
            ))) = &result
            {
                if let ChainError::ExecutionError(
                    error,
                    ChainExecutionContext::IncomingBundle(index),
                ) = &**chain_error
                {
                    let transaction = block
                        .transactions
                        .get_mut(*index as usize)
                        .expect("Transaction at given index should exist");
                    let Transaction::ReceiveMessages(message) = transaction else {
                        panic!(
                            "Expected incoming bundle at transaction index {}, found operation",
                            index
                        );
                    };
                    ensure!(
                        !message.bundle.is_protected(),
                        ChainClientError::BlockProposalError(
                            "Protected incoming message failed to execute locally"
                        )
                    );
                    // Reject the faulty message from the block and continue.
                    // TODO(#1420): This is potentially a bit heavy-handed for
                    // retryable errors.
                    info!(
                        %error, origin = ?message.origin,
                        "Message failed to execute locally and will be rejected."
                    );
                    message.action = MessageAction::Reject;
                    continue;
                }
            }
            return result;
        }
    }

    /// Attempts to execute the block locally. If any attempt to read a blob fails, the blob is
    /// downloaded and execution is retried.
    #[instrument(level = "trace", skip(self, block))]
    async fn stage_block_execution(
        &self,
        block: ProposedBlock,
        round: Option<u32>,
        published_blobs: Vec<Blob>,
    ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
        loop {
            let result = self
                .local_node
                .stage_block_execution(block.clone(), round, published_blobs.clone())
                .await;
            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
                self.receive_certificates_for_blobs(blob_ids.clone())
                    .await?;
                continue; // We found the missing blob: retry.
            }
            return Ok(result?);
        }
    }
}

/// Policies for automatically handling incoming messages.
#[derive(Clone, Debug)]
pub struct MessagePolicy {
    /// The blanket policy applied to all messages.
    blanket: BlanketMessagePolicy,
    /// An optional set of chains restricting the origins of accepted messages. `None` means
    /// that messages from all chains are accepted. An empty `HashSet` means that messages
    /// from no chain are accepted.
    restrict_chain_ids_to: Option<HashSet<ChainId>>,
}

#[derive(Copy, Clone, Debug, clap::ValueEnum)]
pub enum BlanketMessagePolicy {
    /// Automatically accept all incoming messages. Reject them only if execution fails.
    Accept,
    /// Automatically reject tracked messages, ignore or skip untracked messages, but accept
    /// protected ones.
    Reject,
    /// Don't include any messages in blocks, and don't make any decision whether to accept or
    /// reject.
    Ignore,
}

impl MessagePolicy {
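    /// Creates a new `MessagePolicy` from a blanket policy and an optional set of allowed
    /// origin chains.
    ///
    /// A minimal sketch (assuming some `chain_id: ChainId` is in scope) that accepts
    /// messages only from that one chain:
    ///
    /// ```ignore
    /// let policy = MessagePolicy::new(
    ///     BlanketMessagePolicy::Accept,
    ///     Some([chain_id].into_iter().collect()),
    /// );
    /// ```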
    pub fn new(
        blanket: BlanketMessagePolicy,
        restrict_chain_ids_to: Option<HashSet<ChainId>>,
    ) -> Self {
        Self {
            blanket,
            restrict_chain_ids_to,
        }
    }

    #[cfg(with_testing)]
    pub fn new_accept_all() -> Self {
        Self {
            blanket: BlanketMessagePolicy::Accept,
            restrict_chain_ids_to: None,
        }
    }

    #[instrument(level = "trace", skip(self))]
    fn must_handle(&self, bundle: &mut IncomingBundle) -> bool {
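        // Under the `Reject` policy, skippable bundles are left out of the block entirely,
        // protected bundles keep their existing action, and all other bundles are rejected.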
1326        if self.is_reject() {
1327            if bundle.bundle.is_skippable() {
1328                return false;
1329            } else if !bundle.bundle.is_protected() {
1330                bundle.action = MessageAction::Reject;
1331            }
1332        }
1333        match &self.restrict_chain_ids_to {
1334            None => true,
1335            Some(chains) => chains.contains(&bundle.origin),
1336        }
1337    }
1338
1339    #[instrument(level = "trace", skip(self))]
1340    fn is_ignore(&self) -> bool {
1341        matches!(self.blanket, BlanketMessagePolicy::Ignore)
1342    }
1343
1344    #[instrument(level = "trace", skip(self))]
1345    fn is_reject(&self) -> bool {
1346        matches!(self.blanket, BlanketMessagePolicy::Reject)
1347    }
1348}
1349
1350#[derive(Debug, Clone, Copy)]
1351pub enum TimingType {
1352    ExecuteOperations,
1353    ExecuteBlock,
1354    SubmitBlockProposal,
1355    UpdateValidators,
1356}
1357
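/// Configuration options for a [`ChainClient`].
///
/// A construction sketch; the values below are illustrative, not recommended defaults
/// (`DEFAULT_GRACE_PERIOD` is the crate constant also used by `test_default`):
///
/// ```ignore
/// let options = ChainClientOptions {
///     max_pending_message_bundles: 10,
///     message_policy: MessagePolicy::new(BlanketMessagePolicy::Accept, None),
///     cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
///     grace_period: crate::DEFAULT_GRACE_PERIOD,
///     blob_download_timeout: Duration::from_secs(1),
/// };
/// ```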
1358#[derive(Debug, Clone)]
1359pub struct ChainClientOptions {
1360    /// Maximum number of pending message bundles processed at a time in a block.
1361    pub max_pending_message_bundles: usize,
1362    /// The policy for automatically handling incoming messages.
1363    pub message_policy: MessagePolicy,
1364    /// Whether to block on cross-chain message delivery.
1365    pub cross_chain_message_delivery: CrossChainMessageDelivery,
1366    /// An additional delay, after reaching a quorum, to wait for additional validator signatures,
1367    /// as a fraction of the time taken to reach the quorum.
1368    pub grace_period: f64,
1369    /// The delay when downloading a blob, after which we try a second validator.
1370    pub blob_download_timeout: Duration,
1371}
1372
1373#[cfg(with_testing)]
1374impl ChainClientOptions {
1375    pub fn test_default() -> Self {
1376        use crate::DEFAULT_GRACE_PERIOD;
1377
1378        ChainClientOptions {
1379            max_pending_message_bundles: 10,
1380            message_policy: MessagePolicy::new_accept_all(),
1381            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
1382            grace_period: DEFAULT_GRACE_PERIOD,
1383            blob_download_timeout: Duration::from_secs(1),
1384        }
1385    }
1386}
1387
1388/// Client to operate a chain by interacting with validators and the given local storage
1389/// implementation.
1390/// * The chain being operated is called the "local chain" or just the "chain".
1391/// * As a rule, operations are considered successful (and communication may stop) once
1392///   they have gathered a quorum of responses.
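///
/// A `ChainClient` is cheap to clone: clones share the same underlying `Client` (held in an
/// [`Arc`]), so a clone can be moved into another task. A sketch, assuming the environment
/// types are `Send`:
///
/// ```ignore
/// let client2 = chain_client.clone();
/// tokio::spawn(async move { client2.chain_info().await });
/// ```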
1393#[derive(Debug)]
1394pub struct ChainClient<Env: Environment> {
1395    /// The Linera [`Client`] that manages operations for this chain client.
1396    #[debug(skip)]
1397    client: Arc<Client<Env>>,
1398    /// The off-chain chain ID.
1399    chain_id: ChainId,
1400    /// The client options.
1401    #[debug(skip)]
1402    options: ChainClientOptions,
1403    /// The preferred owner of the chain used to sign proposals.
1404    /// `None` if we cannot propose on this chain.
1405    preferred_owner: Option<AccountOwner>,
1406    /// The next block height as read from the wallet.
1407    initial_next_block_height: BlockHeight,
1408    /// The last block hash as read from the wallet.
1409    initial_block_hash: Option<CryptoHash>,
1410    /// Optional timing sender for benchmarking.
1411    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
1412}
1413
1414impl<Env: Environment> Clone for ChainClient<Env> {
1415    fn clone(&self) -> Self {
1416        Self {
1417            client: self.client.clone(),
1418            chain_id: self.chain_id,
1419            options: self.options.clone(),
1420            preferred_owner: self.preferred_owner,
1421            initial_next_block_height: self.initial_next_block_height,
1422            initial_block_hash: self.initial_block_hash,
1423            timing_sender: self.timing_sender.clone(),
1424        }
1425    }
1426}
1427
1428/// Error type for [`ChainClient`].
1429#[derive(Debug, Error)]
1430pub enum ChainClientError {
1431    #[error("Local node operation failed: {0}")]
1432    LocalNodeError(#[from] LocalNodeError),
1433
1434    #[error("Remote node operation failed: {0}")]
1435    RemoteNodeError(#[from] NodeError),
1436
1437    #[error(transparent)]
1438    ArithmeticError(#[from] ArithmeticError),
1439
1440    #[error("Missing certificates: {0:?}")]
1441    ReadCertificatesError(Vec<CryptoHash>),
1442
1443    #[error("Missing confirmed block: {0:?}")]
1444    MissingConfirmedBlock(CryptoHash),
1445
1446    #[error("JSON (de)serialization error: {0}")]
1447    JsonError(#[from] serde_json::Error),
1448
1449    #[error("Chain operation failed: {0}")]
1450    ChainError(#[from] ChainError),
1451
1452    #[error(transparent)]
1453    CommunicationError(#[from] CommunicationError<NodeError>),
1454
1455    #[error("Internal error within chain client: {0}")]
1456    InternalError(&'static str),
1457
1458    #[error(
1459        "Cannot accept a certificate from an unknown committee in the future. \
1460         Please synchronize the local view of the admin chain"
1461    )]
1462    CommitteeSynchronizationError,
1463
1464    #[error("The local node is behind the trusted state in the wallet and needs to synchronize with validators")]
1465    WalletSynchronizationError,
1466
1467    #[error("The state of the client is incompatible with the proposed block: {0}")]
1468    BlockProposalError(&'static str),
1469
1470    #[error(
1471        "Cannot accept a certificate from a committee that was retired. \
1472         Try a newer certificate from the same origin"
1473    )]
1474    CommitteeDeprecationError,
1475
1476    #[error("Protocol error within chain client: {0}")]
1477    ProtocolError(&'static str),
1478
1479    #[error("Signer doesn't have a key to sign for chain {0}")]
1480    CannotFindKeyForChain(ChainId),
1481
1482    #[error("Client is not configured to propose on chain {0}")]
1483    NoAccountKeyConfigured(ChainId),
1484
1485    #[error("The chain client isn't an owner of chain {0}")]
1486    NotAnOwner(ChainId),
1487
1488    #[error(transparent)]
1489    ViewError(#[from] ViewError),
1490
1491    #[error(
1492        "Failed to download certificates and update local node to the next height \
1493         {target_next_block_height} of chain {chain_id:?}"
1494    )]
1495    CannotDownloadCertificates {
1496        chain_id: ChainId,
1497        target_next_block_height: BlockHeight,
1498    },
1499
1500    #[error(transparent)]
1501    BcsError(#[from] bcs::Error),
1502
1503    #[error(
1504        "Unexpected quorum: validators voted for block {hash} in {round}, \
1505         expected block {expected_hash} in {expected_round}"
1506    )]
1507    UnexpectedQuorum {
1508        hash: CryptoHash,
1509        round: Round,
1510        expected_hash: CryptoHash,
1511        expected_round: Round,
1512    },
1513
1514    #[error("signer error: {0:?}")]
1515    Signer(#[source] Box<dyn signer::Error>),
1516
1517    #[error("Cannot revoke the current epoch {0}")]
1518    CannotRevokeCurrentEpoch(Epoch),
1519
1520    #[error("Epoch is already revoked")]
1521    EpochAlreadyRevoked,
1522}
1523
1524impl From<Infallible> for ChainClientError {
1525    fn from(infallible: Infallible) -> Self {
1526        match infallible {}
1527    }
1528}
1529
1530impl ChainClientError {
1531    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
1532        Self::Signer(Box::new(err))
1533    }
1534}
1535
1536// We never want to pass the DashMap references over an `await` point, for fear of
1537// deadlocks. The following construct will cause a (relatively) helpful error if we do.
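//
// For example (a hypothetical, non-compiling sketch): holding a `ChainGuard` across an
// `.await` makes the enclosing future `!Send`, so `tokio::spawn` rejects it at compile time:
//
//     let guard = chain_client.state();
//     some_async_call().await; // error: future cannot be sent between threads safely
//     drop(guard);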
1538
1539pub struct Unsend<T> {
1540    inner: T,
1541    _phantom: std::marker::PhantomData<*mut u8>,
1542}
1543
1544impl<T> Unsend<T> {
1545    fn new(inner: T) -> Self {
1546        Self {
1547            inner,
1548            _phantom: Default::default(),
1549        }
1550    }
1551}
1552
1553impl<T: Deref> Deref for Unsend<T> {
1554    type Target = T::Target;
1555    fn deref(&self) -> &T::Target {
1556        self.inner.deref()
1557    }
1558}
1559
1560impl<T: DerefMut> DerefMut for Unsend<T> {
1561    fn deref_mut(&mut self) -> &mut T::Target {
1562        self.inner.deref_mut()
1563    }
1564}
1565
1566pub type ChainGuard<'a, T> = Unsend<DashMapRef<'a, ChainId, T>>;
1567pub type ChainGuardMut<'a, T> = Unsend<DashMapRefMut<'a, ChainId, T>>;
1568pub type ChainGuardMapped<'a, T> = Unsend<DashMapMappedRef<'a, ChainId, ChainClientState, T>>;
1569
1570impl<Env: Environment> ChainClient<Env> {
1571    /// Gets a shared reference to the chain's state.
1572    #[instrument(level = "trace", skip(self))]
1573    pub fn state(&self) -> ChainGuard<ChainClientState> {
1574        Unsend::new(
1575            self.client
1576                .chains
1577                .get(&self.chain_id)
1578                .expect("Chain client constructed for invalid chain"),
1579        )
1580    }
1581
1582    /// Gets a mutable reference to the state.
1583    /// Beware: this will block any other reference to any chain's state!
1584    #[instrument(level = "trace", skip(self))]
1585    fn state_mut(&self) -> ChainGuardMut<ChainClientState> {
1586        Unsend::new(
1587            self.client
1588                .chains
1589                .get_mut(&self.chain_id)
1590                .expect("Chain client constructed for invalid chain"),
1591        )
1592    }
1593
1594    /// Gets a reference to the client's signer instance.
1595    #[instrument(level = "trace", skip(self))]
1596    pub fn signer(&self) -> &impl Signer {
1597        self.client.signer()
1598    }
1599
1600    /// Gets a mutable reference to the per-`ChainClient` options.
1601    #[instrument(level = "trace", skip(self))]
1602    pub fn options_mut(&mut self) -> &mut ChainClientOptions {
1603        &mut self.options
1604    }
1605
1606    /// Gets a reference to the per-`ChainClient` options.
1607    #[instrument(level = "trace", skip(self))]
1608    pub fn options(&self) -> &ChainClientOptions {
1609        &self.options
1610    }
1611
1612    /// Gets the ID of the associated chain.
1613    #[instrument(level = "trace", skip(self))]
1614    pub fn chain_id(&self) -> ChainId {
1615        self.chain_id
1616    }
1617
1618    /// Gets a clone of the timing sender for benchmarking.
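    ///
    /// A hedged usage sketch (`elapsed_ms` and the call site are hypothetical):
    ///
    /// ```ignore
    /// if let Some(sender) = chain_client.timing_sender() {
    ///     // Timings are reported as (milliseconds, TimingType) pairs.
    ///     let _ = sender.send((elapsed_ms, TimingType::ExecuteBlock));
    /// }
    /// ```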
1619    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
1620        self.timing_sender.clone()
1621    }
1622
1623    /// Gets the ID of the admin chain.
1624    #[instrument(level = "trace", skip(self))]
1625    pub fn admin_id(&self) -> ChainId {
1626        self.client.admin_id
1627    }
1628
1629    /// Gets a guarded reference to the next pending block.
1630    #[instrument(level = "trace", skip(self))]
1631    pub fn pending_proposal(&self) -> ChainGuardMapped<Option<PendingProposal>> {
1632        Unsend::new(self.state().inner.map(|state| state.pending_proposal()))
1633    }
1634
1635    /// Gets the currently preferred owner for signing the blocks.
1636    #[instrument(level = "trace", skip(self))]
1637    pub fn preferred_owner(&self) -> Option<AccountOwner> {
1638        self.preferred_owner
1639    }
1640
1641    /// Sets the new preferred owner for signing blocks.
1642    #[instrument(level = "trace", skip(self))]
1643    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
1644        self.preferred_owner = Some(preferred_owner);
1645    }
1646
1647    /// Unsets the preferred owner for signing the blocks.
1648    #[instrument(level = "trace", skip(self))]
1649    pub fn unset_preferred_owner(&mut self) {
1650        self.preferred_owner = None;
1651    }
1652
1653    /// Obtains a `ChainStateView` for this client's chain.
1654    #[instrument(level = "trace")]
1655    pub async fn chain_state_view(
1656        &self,
1657    ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
1658        self.client.local_node.chain_state_view(self.chain_id).await
1659    }
1660
1661    /// Returns the chain IDs that this chain subscribes to.
1662    #[instrument(level = "trace", skip(self))]
1663    pub async fn event_stream_publishers(&self) -> Result<BTreeSet<ChainId>, LocalNodeError> {
1664        let mut publishers = self
1665            .chain_state_view()
1666            .await?
1667            .execution_state
1668            .system
1669            .event_subscriptions
1670            .indices()
1671            .await?
1672            .into_iter()
1673            .map(|(chain_id, _)| chain_id)
1674            .collect::<BTreeSet<_>>();
1675        if self.chain_id != self.client.admin_id {
1676            publishers.insert(self.client.admin_id);
1677        }
1678        Ok(publishers)
1679    }
1680
1681    /// Subscribes to notifications from this client's chain.
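    ///
    /// A minimal consumption sketch (assumes a `chain_client` in scope; uses
    /// `futures::StreamExt`):
    ///
    /// ```ignore
    /// use futures::StreamExt as _;
    ///
    /// let mut notifications = chain_client.subscribe()?;
    /// while let Some(notification) = notifications.next().await {
    ///     tracing::debug!(?notification, "received chain notification");
    /// }
    /// ```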
1682    #[instrument(level = "trace")]
1683    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
1684        self.subscribe_to(self.chain_id)
1685    }
1686
1687    /// Subscribes to notifications from the specified chain.
1688    #[instrument(level = "trace")]
1689    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
1690        Ok(Box::pin(UnboundedReceiverStream::new(
1691            self.client.notifier.subscribe(vec![chain_id]),
1692        )))
1693    }
1694
1695    /// Returns the storage client used by this client's local node.
1696    #[instrument(level = "trace")]
1697    pub fn storage_client(&self) -> &Env::Storage {
1698        self.client.storage_client()
1699    }
1700
1701    /// Obtains the basic `ChainInfo` data for the local chain.
1702    #[instrument(level = "trace")]
1703    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1704        let query = ChainInfoQuery::new(self.chain_id);
1705        let response = self
1706            .client
1707            .local_node
1708            .handle_chain_info_query(query)
1709            .await?;
1710        self.client.update_from_info(&response.info);
1711        Ok(response.info)
1712    }
1713
1714    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
1715    #[instrument(level = "trace")]
1716    async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1717        let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
1718        let response = self
1719            .client
1720            .local_node
1721            .handle_chain_info_query(query)
1722            .await?;
1723        self.client.update_from_info(&response.info);
1724        Ok(response.info)
1725    }
1726
1727    /// Returns the chain's description. Fetches it from the validators if necessary.
1728    pub async fn get_chain_description(&self) -> Result<ChainDescription, ChainClientError> {
1729        self.client.get_chain_description(self.chain_id).await
1730    }
1731
1732    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
1733    /// local chain.
1734    #[instrument(level = "trace")]
1735    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
1736        if self.options.message_policy.is_ignore() {
1737            // Ignore all messages.
1738            return Ok(Vec::new());
1739        }
1740
1741        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
1742        let info = self
1743            .client
1744            .local_node
1745            .handle_chain_info_query(query)
1746            .await?
1747            .info;
1748        {
1749            ensure!(
1750                self.has_other_owners(&info.manager.ownership)
1751                    || info.next_block_height >= self.initial_next_block_height,
1752                ChainClientError::WalletSynchronizationError
1753            );
1754        }
1755
1756        Ok(info
1757            .requested_pending_message_bundles
1758            .into_iter()
1759            .filter_map(|mut bundle| {
1760                self.options
1761                    .message_policy
1762                    .must_handle(&mut bundle)
1763                    .then_some(bundle)
1764            })
1765            .take(self.options.max_pending_message_bundles)
1766            .collect())
1767    }
1768
1769    /// Returns an `UpdateStreams` operation that updates this client's chain about new events
1770    /// in any of the streams its applications are subscribing to. Returns `None` if there are no
1771    /// new events.
1772    #[instrument(level = "trace")]
1773    async fn collect_stream_updates(&self) -> Result<Option<Operation>, ChainClientError> {
1774        // Load all our subscriptions.
1775        let subscription_map = self
1776            .chain_state_view()
1777            .await?
1778            .execution_state
1779            .system
1780            .event_subscriptions
1781            .index_values()
1782            .await?;
1783        // Collect the indices of all new events.
1784        let futures = subscription_map
1785            .into_iter()
1786            .map(|((chain_id, stream_id), subscriptions)| {
1787                let client = self.client.clone();
1788                async move {
1789                    let chain = client.local_node.chain_state_view(chain_id).await?;
1790                    if let Some(next_index) = chain
1791                        .execution_state
1792                        .stream_event_counts
1793                        .get(&stream_id)
1794                        .await?
1795                        .filter(|next_index| *next_index > subscriptions.next_index)
1796                    {
1797                        Ok(Some((chain_id, stream_id, next_index)))
1798                    } else {
1799                        Ok::<_, ChainClientError>(None)
1800                    }
1801                }
1802            });
1803        let updates = future::try_join_all(futures)
1804            .await?
1805            .into_iter()
1806            .flatten()
1807            .collect::<Vec<_>>();
1808        if updates.is_empty() {
1809            return Ok(None);
1810        }
1811        Ok(Some(SystemOperation::UpdateStreams(updates).into()))
1812    }
1813
1814    #[instrument(level = "trace")]
1815    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1816        self.client.chain_info_with_committees(self.chain_id).await
1817    }
1818
1819    /// Obtains the current epoch of the local chain as well as its set of trusted committees.
1820    #[instrument(level = "trace")]
1821    async fn epoch_and_committees(
1822        &self,
1823    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
1824        let info = self.chain_info_with_committees().await?;
1825        let epoch = info.epoch;
1826        let committees = info.into_committees()?;
1827        Ok((epoch, committees))
1828    }
1829
1830    /// Obtains the committee for the current epoch of the local chain.
1831    #[instrument(level = "trace")]
1832    pub async fn local_committee(&self) -> Result<Committee, ChainClientError> {
1833        let info = match self.chain_info_with_committees().await {
1834            Ok(info) => info,
1835            Err(LocalNodeError::BlobsNotFound(_)) => {
1836                self.synchronize_chain_state(self.chain_id).await?;
1837                self.chain_info_with_committees().await?
1838            }
1839            Err(err) => return Err(err.into()),
1840        };
1841        Ok(info.into_current_committee()?)
1842    }
1843
1844    /// Obtains the committee for the latest epoch on the admin chain.
1845    #[instrument(level = "trace")]
1846    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
1847        self.client.admin_committee().await
1848    }
1849
1850    /// Obtains the identity of the current owner of the chain.
1851    ///
1852    /// Returns an error if we don't have the private key for the identity.
1853    #[instrument(level = "trace")]
1854    pub async fn identity(&self) -> Result<AccountOwner, ChainClientError> {
1855        let Some(preferred_owner) = self.preferred_owner else {
1856            return Err(ChainClientError::NoAccountKeyConfigured(self.chain_id));
1857        };
1858        let manager = self.chain_info().await?.manager;
1859        ensure!(
1860            manager.ownership.is_active(),
1861            LocalNodeError::InactiveChain(self.chain_id)
1862        );
1863
1864        let is_owner = manager
1865            .ownership
1866            .all_owners()
1867            .chain(&manager.leader)
1868            .any(|owner| *owner == preferred_owner);
1869
1870        if !is_owner {
1871            let accepted_owners = manager
1872                .ownership
1873                .all_owners()
1874                .chain(&manager.leader)
1875                .collect::<Vec<_>>();
1876            warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
1877                "The preferred owner is not among the chain's configured owners",
1878            );
1879            return Err(ChainClientError::NotAnOwner(self.chain_id));
1880        }
1881
1882        let has_signer = self
1883            .signer()
1884            .contains_key(&preferred_owner)
1885            .await
1886            .map_err(ChainClientError::signer_failure)?;
1887
1888        if !has_signer {
1889            warn!(%self.chain_id, ?preferred_owner,
1890                "The preferred owner is one of the chain's owners, but the Signer instance doesn't contain its key",
1891            );
1892            return Err(ChainClientError::CannotFindKeyForChain(self.chain_id));
1893        }
1894
1895        Ok(preferred_owner)
1896    }
1897
1898    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
1899    /// its current height and are not missing any received messages from the inbox.
1900    #[instrument(level = "trace")]
1901    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
1902        #[cfg(with_metrics)]
1903        let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();
1904
1905        let mut info = self.synchronize_to_known_height().await?;
1906
1907        if self.has_other_owners(&info.manager.ownership) {
1908            // For chains with any owner other than ourselves, we could be missing recent
1909            // certificates created by other owners. Further synchronize blocks from the network.
1910            // This is a best-effort attempt that depends on network conditions.
1911            info = self.client.synchronize_chain_state(self.chain_id).await?;
1912        }
1913
1914        if info.epoch > self.client.admin_committees().await?.0 {
1915            self.client
1916                .synchronize_chain_state(self.client.admin_id)
1917                .await?;
1918        }
1919
1920        let result = self
1921            .chain_state_view()
1922            .await?
1923            .validate_incoming_bundles()
1924            .await;
1925        if matches!(result, Err(ChainError::MissingCrossChainUpdate { .. })) {
1926            self.find_received_certificates().await?;
1927        }
1928        self.client.update_from_info(&info);
1929        Ok(info)
1930    }
1931
1932    // Verifies that our local storage contains enough history compared to the
1933    // known block height. Otherwise, downloads the missing history from the
1934    // network.
1935    // The known height only differs if the wallet is ahead of storage.
1936    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, ChainClientError> {
1937        let info = self
1938            .client
1939            .download_certificates(self.chain_id, self.initial_next_block_height)
1940            .await?;
1941        if info.next_block_height == self.initial_next_block_height {
1942            // Check that our local node has the expected block hash.
1943            ensure!(
1944                self.initial_block_hash == info.block_hash,
1945                ChainClientError::InternalError("Invalid chain of blocks in local node")
1946            );
1947        }
1948        Ok(info)
1949    }
1950
1951    /// Submits a fast block proposal to the validators.
1952    ///
1953    /// This must only be used with a valid epoch and a super owner.
1954    #[instrument(level = "trace", skip(committee, operations))]
1955    pub async fn submit_fast_block_proposal(
1956        &self,
1957        committee: &Committee,
1958        operations: &[Operation],
1959        incoming_bundles: &[IncomingBundle],
1960        super_owner: AccountOwner,
1961    ) -> Result<(u64, u64, u64, u64), ChainClientError> {
1962        let creating_proposal_start = Instant::now();
1963        let info = self.chain_info().await?;
1964        let timestamp = self.next_timestamp(incoming_bundles, info.timestamp);
1965        let transactions = incoming_bundles
1966            .iter()
1967            .map(|bundle| Transaction::ReceiveMessages(bundle.clone()))
1968            .chain(
1969                operations
1970                    .iter()
1971                    .map(|operation| Transaction::ExecuteOperation(operation.clone())),
1972            )
1973            .collect::<Vec<_>>();
1974        let proposed_block = ProposedBlock {
1975            epoch: info.epoch,
1976            chain_id: self.chain_id,
1977            transactions,
1978            previous_block_hash: info.block_hash,
1979            height: info.next_block_height,
1980            authenticated_signer: Some(super_owner),
1981            timestamp,
1982        };
1983        let proposal = Box::new(
1984            BlockProposal::new_initial(
1985                super_owner,
1986                Round::Fast,
1987                proposed_block.clone(),
1988                self.signer(),
1989            )
1990            .await
1991            .map_err(ChainClientError::signer_failure)?,
1992        );
1993        let creating_proposal_ms = creating_proposal_start.elapsed().as_millis() as u64;
1994        let stage_block_execution_start = Instant::now();
1995        let block = self
1996            .client
1997            .local_node
1998            .stage_block_execution(proposed_block, None, Vec::new())
1999            .await?
2000            .0;
2001        let stage_block_execution_ms = stage_block_execution_start.elapsed().as_millis() as u64;
2002        let creating_confirmed_block_start = Instant::now();
2003        let value = ConfirmedBlock::new(block);
2004        let creating_confirmed_block_ms =
2005            creating_confirmed_block_start.elapsed().as_millis() as u64;
2006        let submitting_block_proposal_start = Instant::now();
2007        self.client
2008            .submit_block_proposal(committee, proposal, value)
2009            .await?;
2010        let submitting_block_proposal_ms =
2011            submitting_block_proposal_start.elapsed().as_millis() as u64;
2012        Ok((
2013            creating_proposal_ms,
2014            stage_block_execution_ms,
2015            creating_confirmed_block_ms,
2016            submitting_block_proposal_ms,
2017        ))
2018    }
2019
2020    /// Attempts to update all validators about the local chain.
2021    #[instrument(level = "trace", skip(old_committee))]
2022    pub async fn update_validators(
2023        &self,
2024        old_committee: Option<&Committee>,
2025    ) -> Result<(), ChainClientError> {
2026        let update_validators_start = linera_base::time::Instant::now();
2027        // Communicate the new certificate now.
2028        if let Some(old_committee) = old_committee {
2029            self.communicate_chain_updates(old_committee).await?
2030        };
2031        if let Ok(new_committee) = self.local_committee().await {
2032            if Some(&new_committee) != old_committee {
2033                // If the configuration just changed, communicate to the new committee as well.
2034                // (This is actually more important than updating the previous committee.)
2035                self.communicate_chain_updates(&new_committee).await?;
2036            }
2037        }
2038        self.send_timing(update_validators_start, TimingType::UpdateValidators);
2039        Ok(())
2040    }
2041
2042    /// Broadcasts certified blocks to validators.
2043    #[instrument(level = "trace", skip(committee))]
2044    pub async fn communicate_chain_updates(
2045        &self,
2046        committee: &Committee,
2047    ) -> Result<(), ChainClientError> {
2048        let delivery = self.options.cross_chain_message_delivery;
2049        let height = self.chain_info().await?.next_block_height;
2050        self.client
2051            .communicate_chain_updates(committee, self.chain_id, height, delivery)
2052            .await
2053    }
2054
2055    /// Processes the results of [`synchronize_received_certificates_from_validator`] and updates
2056    /// the trackers for the validators.
2057    #[tracing::instrument(level = "trace", skip(received_certificates_batches))]
2058    async fn receive_certificates_from_validators(
2059        &self,
2060        received_certificates_batches: Vec<ReceivedCertificatesFromValidator>,
2061    ) {
2062        let validator_count = received_certificates_batches.len();
2063        let mut other_sender_chains = BTreeSet::new();
2064        let mut certificates =
2065            BTreeMap::<ChainId, BTreeMap<BlockHeight, ConfirmedBlockCertificate>>::new();
2066        let mut new_trackers = BTreeMap::new();
2067        for response in received_certificates_batches {
2068            other_sender_chains.extend(response.other_sender_chains);
2069            new_trackers.insert(response.public_key, response.tracker);
2070            for certificate in response.certificates {
2071                certificates
2072                    .entry(certificate.block().header.chain_id)
2073                    .or_default()
2074                    .insert(certificate.block().header.height, certificate);
2075            }
2076        }
2077        let certificate_count = certificates.values().map(BTreeMap::len).sum::<usize>();
2078
2079        tracing::info!(
2080            "Received {certificate_count} certificates from {validator_count} validator(s)."
2081        );
2082
2083        // Process the certificates sorted by chain and in ascending order of block height.
2084        let stream = FuturesUnordered::from_iter(certificates.into_values().map(|certificates| {
2085            let client = self.client.clone();
2086            async move {
2087                for certificate in certificates.into_values() {
2088                    let hash = certificate.hash();
2089                    let mode = ReceiveCertificateMode::AlreadyChecked;
2090                    if let Err(err) = client
2091                        .receive_sender_certificate(certificate, mode, None)
2092                        .await
2093                    {
2094                        error!("Received invalid certificate {hash}: {err}");
2095                    }
2096                }
2097            }
2098        }));
2099        stream.for_each(future::ready).await;
2100
2101        // Certificates for these chains were omitted from `certificates` because they were
2102        // already processed locally. If they were processed in a concurrent task, it is not
2103        // guaranteed that their cross-chain messages were already handled.
2104        let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
2105            let local_node = self.client.local_node.clone();
2106            async move {
2107                if let Err(error) = local_node
2108                    .retry_pending_cross_chain_requests(chain_id)
2109                    .await
2110                {
2111                    error!("Failed to retry outgoing messages from {chain_id}: {error}");
2112                }
2113            }
2114        }));
2115        stream.for_each(future::ready).await;
2116
2117        // Update the trackers.
2118        if let Err(error) = self
2119            .client
2120            .local_node
2121            .update_received_certificate_trackers(self.chain_id, new_trackers)
2122            .await
2123        {
2124            error!(
2125                "Failed to update the certificate trackers for chain {:.8}: {error}",
2126                self.chain_id
2127            );
2128        }
2129    }
2130
2131    /// Synchronizes all chains that any application on this chain subscribes to.
2132    /// We always consider the admin chain a relevant publishing chain, for new epochs.
2133    async fn synchronize_publisher_chains(&self) -> Result<(), ChainClientError> {
2134        let chain_ids = self
2135            .chain_state_view()
2136            .await?
2137            .execution_state
2138            .system
2139            .event_subscriptions
2140            .indices()
2141            .await?
2142            .iter()
2143            .map(|(chain_id, _)| *chain_id)
2144            .chain(iter::once(self.client.admin_id))
2145            .filter(|chain_id| *chain_id != self.chain_id)
2146            .collect::<BTreeSet<_>>();
2147        future::try_join_all(
2148            chain_ids
2149                .into_iter()
2150                .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
2151        )
2152        .await?;
2153        Ok(())
2154    }
2155
2156    /// Attempts to download new received certificates.
2157    ///
2158    /// This is a best effort: it will only find certificates that have been confirmed
2159    /// amongst sufficiently many validators of the current committee of the target
2160    /// chain.
2161    ///
2162    /// However, this should be the case whenever a sender's chain is still in use and
2163    /// is regularly upgraded to new committees.
2164    #[instrument(level = "trace")]
2165    async fn find_received_certificates(&self) -> Result<(), ChainClientError> {
2166        #[cfg(with_metrics)]
2167        let _latency = metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
2168
2169        // Use network information from the local chain.
2170        let chain_id = self.chain_id;
2171        let (_, committee) = self.admin_committee().await?;
2172        let nodes = self.client.make_nodes(&committee)?;
2173        // Proceed to downloading received certificates.
2174        let result = communicate_with_quorum(
2175            &nodes,
2176            &committee,
2177            |_| (),
2178            |remote_node| {
2179                let client = &self.client;
2180                Box::pin(async move {
2181                    client
2182                        .synchronize_received_certificates_from_validator(chain_id, &remote_node)
2183                        .await
2184                })
2185            },
2186            self.options.grace_period,
2187        )
2188        .await;
2189        let received_certificate_batches = match result {
2190            Ok(((), received_certificate_batches)) => received_certificate_batches
2191                .into_iter()
2192                .map(|(_, batch)| batch)
2193                .collect(),
2194            Err(CommunicationError::Trusted(NodeError::InactiveChain(id))) if id == chain_id => {
2195                // The chain is visibly not active (yet or any more) so there is no need
2196                // to synchronize received certificates.
2197                return Ok(());
2198            }
2199            Err(error) => {
2200                return Err(error.into());
2201            }
2202        };
2203        self.receive_certificates_from_validators(received_certificate_batches)
2204            .await;
2205        Ok(())
2206    }
2207
2208    /// Sends money.
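    ///
    /// A minimal sketch (`recipient_account` is hypothetical; the chain account is used as
    /// the sender):
    ///
    /// ```ignore
    /// let outcome = chain_client
    ///     .transfer(AccountOwner::CHAIN, Amount::from_tokens(1), recipient_account)
    ///     .await?;
    /// ```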
2209    #[instrument(level = "trace")]
2210    pub async fn transfer(
2211        &self,
2212        owner: AccountOwner,
2213        amount: Amount,
2214        recipient: Account,
2215    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2216        // TODO(#467): check the balance of `owner` before signing any block proposal.
2217        self.execute_operation(SystemOperation::Transfer {
2218            owner,
2219            recipient,
2220            amount,
2221        })
2222        .await
2223    }
2224
2225    /// Verifies that a data blob is readable from storage.
2226    // TODO(#2490): Consider removing or renaming this.
2227    #[instrument(level = "trace")]
2228    pub async fn read_data_blob(
2229        &self,
2230        hash: CryptoHash,
2231    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2232        let blob_id = BlobId {
2233            hash,
2234            blob_type: BlobType::Data,
2235        };
2236        self.execute_operation(SystemOperation::VerifyBlob { blob_id })
2237            .await
2238    }
2239
2240    /// Claims money in a remote chain.
2241    #[instrument(level = "trace")]
2242    pub async fn claim(
2243        &self,
2244        owner: AccountOwner,
2245        target_id: ChainId,
2246        recipient: Account,
2247        amount: Amount,
2248    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2249        self.execute_operation(SystemOperation::Claim {
2250            owner,
2251            target_id,
2252            recipient,
2253            amount,
2254        })
2255        .await
2256    }
2257
2258    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
2259    /// certificate and sends it to all validators, to make them enter the next round.
2260    #[instrument(level = "trace")]
2261    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, ChainClientError> {
2262        let chain_id = self.chain_id;
2263        let info = self.chain_info_with_committees().await?;
2264        let committee = info.current_committee()?;
2265        let height = info.next_block_height;
2266        let round = info.manager.current_round;
2267        let action = CommunicateAction::RequestTimeout {
2268            height,
2269            round,
2270            chain_id,
2271        };
2272        let value = Timeout::new(chain_id, height, info.epoch);
2273        let certificate = Box::new(
2274            self.client
2275                .communicate_chain_action(committee, action, value)
2276                .await?,
2277        );
2278        self.client.process_certificate(certificate.clone()).await?;
2279        // The block height didn't increase, but this will communicate the timeout as well.
2280        self.client
2281            .communicate_chain_updates(
2282                committee,
2283                chain_id,
2284                height,
2285                CrossChainMessageDelivery::NonBlocking,
2286            )
2287            .await?;
2288        Ok(*certificate)
2289    }
2290
2291    /// Downloads and processes any certificates we are missing for the given chain.
2292    #[instrument(level = "trace", skip_all)]
2293    pub async fn synchronize_chain_state(
2294        &self,
2295        chain_id: ChainId,
2296    ) -> Result<Box<ChainInfo>, ChainClientError> {
2297        self.client.synchronize_chain_state(chain_id).await
2298    }
2299
2300    /// Downloads and processes any certificates we are missing for this chain, from the given
2301    /// committee.
2302    #[instrument(level = "trace", skip_all)]
2303    pub async fn synchronize_chain_state_from_committee(
2304        &self,
2305        committee: Committee,
2306    ) -> Result<Box<ChainInfo>, ChainClientError> {
2307        self.client
2308            .synchronize_chain_state_from_committee(self.chain_id, committee)
2309            .await
2310    }
2311
2312    /// Executes a list of operations.
2313    #[instrument(level = "trace", skip(operations, blobs))]
2314    pub async fn execute_operations(
2315        &self,
2316        operations: Vec<Operation>,
2317        blobs: Vec<Blob>,
2318    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2319        let timing_start = linera_base::time::Instant::now();
2320
2321        let result = loop {
2322            let execute_block_start = linera_base::time::Instant::now();
2323            // TODO(#2066): Remove boxing once the call-stack is shallower
2324            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
2325                Ok(ExecuteBlockOutcome::Executed(certificate)) => {
2326                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
2327                    break Ok(ClientOutcome::Committed(certificate));
2328                }
2329                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
2330                    break Ok(ClientOutcome::WaitForTimeout(timeout));
2331                }
2332                Ok(ExecuteBlockOutcome::Conflict(certificate)) => {
2333                    info!(
2334                        height = %certificate.block().header.height,
2335                        "Another block was committed; retrying."
2336                    );
2337                }
2338                Err(ChainClientError::CommunicationError(CommunicationError::Trusted(
2339                    NodeError::UnexpectedBlockHeight {
2340                        expected_block_height,
2341                        found_block_height,
2342                    },
2343                ))) if expected_block_height > found_block_height => {
2344                    tracing::info!(
2345                        "Local state is outdated; synchronizing chain {:.8}",
2346                        self.chain_id
2347                    );
2348                    self.synchronize_chain_state(self.chain_id).await?;
2349                }
2350                Err(err) => return Err(err),
2351            };
2352        };
2353
2354        self.send_timing(timing_start, TimingType::ExecuteOperations);
2355
2356        result
2357    }
2358
2359    /// Executes an operation.
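    ///
    /// A usage sketch with a system operation also used elsewhere in this module (`blob_id`
    /// is hypothetical):
    ///
    /// ```ignore
    /// let outcome = chain_client
    ///     .execute_operation(SystemOperation::VerifyBlob { blob_id })
    ///     .await?;
    /// ```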
2360    pub async fn execute_operation(
2361        &self,
2362        operation: impl Into<Operation>,
2363    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2364        self.execute_operations(vec![operation.into()], vec![])
2365            .await
2366    }
2367
2368    /// Executes a new block.
2369    ///
2370    /// This must be preceded by a call to `prepare_chain()`.
2371    #[instrument(level = "trace", skip(operations, blobs))]
2372    async fn execute_block(
2373        &self,
2374        operations: Vec<Operation>,
2375        blobs: Vec<Blob>,
2376    ) -> Result<ExecuteBlockOutcome, ChainClientError> {
2377        #[cfg(with_metrics)]
2378        let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
2379
2380        let mutex = self.state().client_mutex();
2381        let _guard = mutex.lock_owned().await;
2382        // TODO: We shouldn't need to call this explicitly.
2383        match self.process_pending_block_without_prepare().await? {
2384            ClientOutcome::Committed(Some(certificate)) => {
2385                return Ok(ExecuteBlockOutcome::Conflict(certificate))
2386            }
2387            ClientOutcome::WaitForTimeout(timeout) => {
2388                return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
2389            }
2390            ClientOutcome::Committed(None) => {}
2391        }
2392
2393        let incoming_bundles = self.pending_message_bundles().await?;
2394        let identity = self.identity().await?;
2395        let confirmed_value = self
2396            .new_pending_block(incoming_bundles, operations, blobs, identity)
2397            .await?;
2398
2399        match self.process_pending_block_without_prepare().await? {
2400            ClientOutcome::Committed(Some(certificate))
2401                if certificate.block() == confirmed_value.block() =>
2402            {
2403                Ok(ExecuteBlockOutcome::Executed(certificate))
2404            }
2405            ClientOutcome::Committed(Some(certificate)) => {
2406                Ok(ExecuteBlockOutcome::Conflict(certificate))
2407            }
2408            // Should be unreachable: We did set a pending block.
2409            ClientOutcome::Committed(None) => Err(ChainClientError::BlockProposalError(
2410                "Unexpected block proposal error",
2411            )),
2412            ClientOutcome::WaitForTimeout(timeout) => {
2413                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
2414            }
2415        }
2416    }
2417
2418    /// Creates a new pending block and handles the proposal in the local node.
2419    /// Next time `process_pending_block_without_prepare` is called, this block will be proposed
2420    /// to the validators.
2421    #[instrument(level = "trace", skip(incoming_bundles, operations, blobs))]
2422    async fn new_pending_block(
2423        &self,
2424        incoming_bundles: Vec<IncomingBundle>,
2425        operations: Vec<Operation>,
2426        blobs: Vec<Blob>,
2427        identity: AccountOwner,
2428    ) -> Result<ConfirmedBlock, ChainClientError> {
2429        ensure!(
2430            self.state().pending_proposal().is_none(),
2431            ChainClientError::BlockProposalError(
2432                "Client state already has a pending block; \
2433                    use the `linera retry-pending-block` command to commit that first"
2434            )
2435        );
2436        let info = self.chain_info().await?;
2437        let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2438        let transactions = incoming_bundles
2439            .into_iter()
2440            .map(Transaction::ReceiveMessages)
2441            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
2442            .collect::<Vec<_>>();
2443        let proposed_block = ProposedBlock {
2444            epoch: info.epoch,
2445            chain_id: self.chain_id,
2446            transactions,
2447            previous_block_hash: info.block_hash,
2448            height: info.next_block_height,
2449            authenticated_signer: Some(identity),
2450            timestamp,
2451        };
2452
2453        // Use the round number assuming there are oracle responses.
2454        // Using the round number during execution counts as an oracle.
2455        // Accessing the round number in single-leader rounds where we are not the leader
2456        // is not currently supported.
2457        let round = match Self::round_for_new_proposal(&info, &identity, true)? {
2458            Either::Left(round) => round.multi_leader(),
2459            Either::Right(_) => None,
2460        };
2461        // Make sure every incoming message succeeds; discard or reject the ones that fail.
2462        // Also, compute the final certified hash while we're at it.
2463        let (block, _) = self
2464            .client
2465            .stage_block_execution_and_discard_failing_messages(
2466                proposed_block,
2467                round,
2468                blobs.clone(),
2469            )
2470            .await?;
2471        let (proposed_block, _) = block.clone().into_proposal();
2472        self.state_mut().set_pending_proposal(proposed_block, blobs);
2473        Ok(ConfirmedBlock::new(block))
2474    }
2475
2476    /// Returns a suitable timestamp for the next block.
2477    ///
2478    /// This will usually be the current time according to the local clock, but may be slightly
2479    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
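    ///
    /// For example, with the local clock at `t = 100`, the newest incoming bundle at `t = 105`
    /// and the previous block at `t = 98`, the proposed timestamp is `max(100, 105, 98) = 105`.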
2480    #[instrument(level = "trace", skip(incoming_bundles))]
2481    fn next_timestamp(
2482        &self,
2483        incoming_bundles: &[IncomingBundle],
2484        block_time: Timestamp,
2485    ) -> Timestamp {
2486        let local_time = self.storage_client().clock().current_time();
2487        incoming_bundles
2488            .iter()
2489            .map(|msg| msg.bundle.timestamp)
2490            .max()
2491            .map_or(local_time, |timestamp| timestamp.max(local_time))
2492            .max(block_time)
2493    }
2494
2495    /// Queries an application.
2496    #[instrument(level = "trace", skip(query))]
2497    pub async fn query_application(&self, query: Query) -> Result<QueryOutcome, ChainClientError> {
2498        loop {
2499            let result = self
2500                .client
2501                .local_node
2502                .query_application(self.chain_id, query.clone())
2503                .await;
2504            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2505                self.client
2506                    .receive_certificates_for_blobs(blob_ids.clone())
2507                    .await?;
2508                continue; // We found the missing blob: retry.
2509            }
2510            return Ok(result?);
2511        }
2512    }
2513
2514    /// Queries a system application.
2515    #[instrument(level = "trace", skip(query))]
2516    pub async fn query_system_application(
2517        &self,
2518        query: SystemQuery,
2519    ) -> Result<QueryOutcome<SystemResponse>, ChainClientError> {
2520        let QueryOutcome {
2521            response,
2522            operations,
2523        } = self.query_application(Query::System(query)).await?;
2524        match response {
2525            QueryResponse::System(response) => Ok(QueryOutcome {
2526                response,
2527                operations,
2528            }),
2529            _ => Err(ChainClientError::InternalError(
2530                "Unexpected response for system query",
2531            )),
2532        }
2533    }
2534
2535    /// Queries a user application.
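    ///
    /// A sketch with a hypothetical ABI (`MyAbi`) and query type (`MyQuery`):
    ///
    /// ```ignore
    /// // `application_id: ApplicationId<MyAbi>` identifies the deployed application.
    /// let outcome = chain_client
    ///     .query_user_application(application_id, &MyQuery::CurrentValue)
    ///     .await?;
    /// let response = outcome.response; // deserialized as `MyAbi`'s query response type
    /// ```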
2536    #[instrument(level = "trace", skip(application_id, query))]
2537    pub async fn query_user_application<A: Abi>(
2538        &self,
2539        application_id: ApplicationId<A>,
2540        query: &A::Query,
2541    ) -> Result<QueryOutcome<A::QueryResponse>, ChainClientError> {
2542        let query = Query::user(application_id, query)?;
2543        let QueryOutcome {
2544            response,
2545            operations,
2546        } = self.query_application(query).await?;
2547        match response {
2548            QueryResponse::User(response_bytes) => {
2549                let response = serde_json::from_slice(&response_bytes)?;
2550                Ok(QueryOutcome {
2551                    response,
2552                    operations,
2553                })
2554            }
2555            _ => Err(ChainClientError::InternalError(
2556                "Unexpected response for user query",
2557            )),
2558        }
2559    }
2560
2561    /// Obtains the local balance of the chain account after staging the execution of
2562    /// incoming messages in a new block.
2563    ///
2564    /// Does not attempt to synchronize with validators. The result will reflect up to
2565    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
2566    /// block.
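    ///
    /// A sketch (`owner` is a hypothetical account owner):
    ///
    /// ```ignore
    /// let chain_balance = chain_client.query_balance().await?;
    /// let owner_balance = chain_client.query_owner_balance(owner).await?;
    /// ```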
2567    #[instrument(level = "trace")]
2568    pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
2569        let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
2570        Ok(balance)
2571    }
2572
2573    /// Obtains the local balance of an account after staging the execution of incoming messages in
2574    /// a new block.
2575    ///
2576    /// Does not attempt to synchronize with validators. The result will reflect up to
2577    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
2578    /// block.
2579    #[instrument(level = "trace", skip(owner))]
2580    pub async fn query_owner_balance(
2581        &self,
2582        owner: AccountOwner,
2583    ) -> Result<Amount, ChainClientError> {
2584        if owner.is_chain() {
2585            self.query_balance().await
2586        } else {
2587            Ok(self
2588                .query_balances_with_owner(owner)
2589                .await?
2590                .1
2591                .unwrap_or(Amount::ZERO))
2592        }
2593    }
2594
2595    /// Obtains the local balance of the chain account, and optionally of another owner, after
2596    /// staging the execution of incoming messages in a new block.
2597    ///
2598    /// Does not attempt to synchronize with validators. The result will reflect up to
2599    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
2600    /// block.
2601    #[instrument(level = "trace", skip(owner))]
2602    async fn query_balances_with_owner(
2603        &self,
2604        owner: AccountOwner,
2605    ) -> Result<(Amount, Option<Amount>), ChainClientError> {
2606        let incoming_bundles = self.pending_message_bundles().await?;
2607        // Since we disallow empty blocks and there are no incoming messages
2608        // that could change the balance, we query for it immediately.
2609        if incoming_bundles.is_empty() {
2610            let chain_balance = self.local_balance().await?;
2611            let owner_balance = self.local_owner_balance(owner).await?;
2612            return Ok((chain_balance, Some(owner_balance)));
2613        }
2614        let info = self.chain_info().await?;
2615        let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2616        let transactions = incoming_bundles
2617            .into_iter()
2618            .map(Transaction::ReceiveMessages)
2619            .collect::<Vec<_>>();
2620        let block = ProposedBlock {
2621            epoch: info.epoch,
2622            chain_id: self.chain_id,
2623            transactions,
2624            previous_block_hash: info.block_hash,
2625            height: info.next_block_height,
2626            authenticated_signer: if owner == AccountOwner::CHAIN {
2627                None
2628            } else {
2629                Some(owner)
2630            },
2631            timestamp,
2632        };
2633        match self
2634            .client
2635            .stage_block_execution_and_discard_failing_messages(block, None, Vec::new())
2636            .await
2637        {
2638            Ok((_, response)) => Ok((
2639                response.info.chain_balance,
2640                response.info.requested_owner_balance,
2641            )),
2642            Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
2643                WorkerError::ChainError(error),
2644            ))) if matches!(
2645                &*error,
2646                ChainError::ExecutionError(
2647                    execution_error,
2648                    ChainExecutionContext::Block
2649                ) if matches!(
2650                    **execution_error,
2651                    ExecutionError::FeesExceedFunding { .. }
2652                )
2653            ) =>
2654            {
2655                // We can't even pay for the execution of one empty block. Let's return zero.
2656                Ok((Amount::ZERO, Some(Amount::ZERO)))
2657            }
2658            Err(error) => Err(error),
2659        }
2660    }
2661
2662    /// Reads the local balance of the chain account.
2663    ///
2664    /// Does not process the inbox or attempt to synchronize with validators.
2665    #[instrument(level = "trace")]
2666    pub async fn local_balance(&self) -> Result<Amount, ChainClientError> {
2667        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
2668        Ok(balance)
2669    }
2670
2671    /// Reads the local balance of a user account.
2672    ///
2673    /// Does not process the inbox or attempt to synchronize with validators.
2674    #[instrument(level = "trace", skip(owner))]
2675    pub async fn local_owner_balance(
2676        &self,
2677        owner: AccountOwner,
2678    ) -> Result<Amount, ChainClientError> {
2679        if owner.is_chain() {
2680            self.local_balance().await
2681        } else {
2682            Ok(self
2683                .local_balances_with_owner(owner)
2684                .await?
2685                .1
2686                .unwrap_or(Amount::ZERO))
2687        }
2688    }
2689
2690    /// Reads the local balance of the chain account and optionally another user.
2691    ///
2692    /// Does not process the inbox or attempt to synchronize with validators.
2693    #[instrument(level = "trace", skip(owner))]
2694    async fn local_balances_with_owner(
2695        &self,
2696        owner: AccountOwner,
2697    ) -> Result<(Amount, Option<Amount>), ChainClientError> {
2698        ensure!(
2699            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
2700            ChainClientError::WalletSynchronizationError
2701        );
2702        let mut query = ChainInfoQuery::new(self.chain_id);
2703        query.request_owner_balance = owner;
2704        let response = self
2705            .client
2706            .local_node
2707            .handle_chain_info_query(query)
2708            .await?;
2709        Ok((
2710            response.info.chain_balance,
2711            response.info.requested_owner_balance,
2712        ))
2713    }
2714
2715    /// Sends tokens to an account.
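    ///
    /// A minimal usage sketch of transferring one token from the chain's own account, assuming
    /// `client` is a `ChainClient` and `recipient` is an `Account` on the destination chain
    /// (both placeholders):
    ///
    /// ```ignore
    /// use linera_base::{data_types::Amount, identifiers::AccountOwner};
    ///
    /// // Transfer one token from the chain account to `recipient`.
    /// let outcome = client
    ///     .transfer_to_account(AccountOwner::CHAIN, Amount::from_tokens(1), recipient)
    ///     .await?;
    /// ```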
2716    #[instrument(level = "trace")]
2717    pub async fn transfer_to_account(
2718        &self,
2719        from: AccountOwner,
2720        amount: Amount,
2721        account: Account,
2722    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2723        self.transfer(from, amount, account).await
2724    }
2725
2726    /// Burns tokens (transfer to a special address).
2727    #[cfg(with_testing)]
2728    #[instrument(level = "trace")]
2729    pub async fn burn(
2730        &self,
2731        owner: AccountOwner,
2732        amount: Amount,
2733    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2734        let recipient = Account::burn_address(self.chain_id);
2735        self.transfer(owner, amount, recipient).await
2736    }
2737
2738    #[instrument(level = "trace")]
2739    pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2740        let validators = self.client.validator_nodes().await?;
2741        self.client
2742            .fetch_chain_info(self.chain_id, &validators)
2743            .await
2744    }
2745
2746    /// Attempts to synchronize chains that have sent us messages and populate our local
2747    /// inbox.
2748    ///
2749    /// To create a block that actually executes the messages in the inbox,
2750    /// `process_inbox` must be called separately.
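    ///
    /// A minimal usage sketch of the synchronize-then-process flow, assuming `client` is a
    /// `ChainClient` (placeholder name):
    ///
    /// ```ignore
    /// // Pull received certificates and populate the local inbox.
    /// client.synchronize_from_validators().await?;
    /// // Then create blocks that actually execute the inbox messages.
    /// let (certificates, _maybe_timeout) = client.process_inbox().await?;
    /// ```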
2751    #[instrument(level = "trace")]
2752    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2753        let info = self.prepare_chain().await?;
2754        self.synchronize_publisher_chains().await?;
2755        self.find_received_certificates().await?;
2756        Ok(info)
2757    }
2758
2759    /// Processes the last pending block, synchronizing with the validators first.
2760    #[instrument(level = "trace")]
2761    pub async fn process_pending_block(
2762        &self,
2763    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2764        self.synchronize_from_validators().await?;
2765        self.process_pending_block_without_prepare().await
2766    }
2767
2768    /// Processes the last pending block. Assumes that the local chain is up to date.
2769    #[instrument(level = "trace")]
2770    async fn process_pending_block_without_prepare(
2771        &self,
2772    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2773        let info = self.request_leader_timeout_if_needed().await?;
2774
2775        // If there is a validated block in the current round, finalize it.
2776        if info.manager.has_locking_block_in_current_round()
2777            && !info.manager.current_round.is_fast()
2778        {
2779            return self.finalize_locking_block(info).await;
2780        }
2781        let owner = self.identity().await?;
2782
2783        let local_node = &self.client.local_node;
2784        // Otherwise we have to re-propose the highest validated block, if there is one.
2785        let pending_proposal = self.state().pending_proposal().clone();
2786        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
2787            match &**locking {
2788                LockingBlock::Regular(certificate) => {
2789                    let blob_ids = certificate.block().required_blob_ids();
2790                    let blobs = local_node
2791                        .get_locking_blobs(&blob_ids, self.chain_id)
2792                        .await?
2793                        .ok_or_else(|| {
2794                            ChainClientError::InternalError("Missing local locking blobs")
2795                        })?;
2796                    debug!("Retrying locking block from round {}", certificate.round);
2797                    (certificate.block().clone(), blobs)
2798                }
2799                LockingBlock::Fast(proposal) => {
2800                    let proposed_block = proposal.content.block.clone();
2801                    let blob_ids = proposed_block.published_blob_ids();
2802                    let blobs = local_node
2803                        .get_locking_blobs(&blob_ids, self.chain_id)
2804                        .await?
2805                        .ok_or_else(|| {
2806                            ChainClientError::InternalError("Missing local locking blobs")
2807                        })?;
2808                    let block = self
2809                        .client
2810                        .stage_block_execution(proposed_block, None, blobs.clone())
2811                        .await?
2812                        .0;
2813                    debug!("Retrying locking block from fast round.");
2814                    (block, blobs)
2815                }
2816            }
2817        } else if let Some(pending_proposal) = pending_proposal {
2818            // Otherwise we are free to propose our own pending block.
2819            // Use the round number assuming there are oracle responses.
2820            // Using the round number during execution counts as an oracle.
2821            let proposed_block = pending_proposal.block;
2822            let round = match Self::round_for_new_proposal(&info, &owner, true)? {
2823                Either::Left(round) => round.multi_leader(),
2824                Either::Right(_) => None,
2825            };
2826            let (block, _) = self
2827                .client
2828                .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
2829                .await?;
2830            debug!("Proposing the local pending block.");
2831            (block, pending_proposal.blobs)
2832        } else {
2833            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
2834        };
2835
2836        let has_oracle_responses = block.has_oracle_responses();
2837        let (proposed_block, outcome) = block.into_proposal();
2838        let round = match Self::round_for_new_proposal(&info, &owner, has_oracle_responses)? {
2839            Either::Left(round) => round,
2840            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
2841        };
2842        debug!("Proposing block for round {}", round);
2843
2844        let already_handled_locally = info
2845            .manager
2846            .already_handled_proposal(round, &proposed_block);
2847        // Create the final block proposal.
2848        let proposal = if let Some(locking) = info.manager.requested_locking {
2849            Box::new(match *locking {
2850                LockingBlock::Regular(cert) => {
2851                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
2852                        .await
2853                        .map_err(ChainClientError::signer_failure)?
2854                }
2855                LockingBlock::Fast(proposal) => {
2856                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
2857                        .await
2858                        .map_err(ChainClientError::signer_failure)?
2859                }
2860            })
2861        } else {
2862            Box::new(
2863                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
2864                    .await
2865                    .map_err(ChainClientError::signer_failure)?,
2866            )
2867        };
2868        if !already_handled_locally {
2869            // Check the final block proposal. This will be cheaper after #1401.
2870            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
2871                match err {
2872                    LocalNodeError::BlobsNotFound(_) => {
2873                        local_node
2874                            .handle_pending_blobs(self.chain_id, blobs)
2875                            .await?;
2876                        local_node.handle_block_proposal(*proposal.clone()).await?;
2877                    }
2878                    err => return Err(err.into()),
2879                }
2880            }
2881        }
2882        let committee = self.local_committee().await?;
2883        let block = Block::new(proposed_block, outcome);
2884        // Send the query to validators.
2885        let submit_block_proposal_start = linera_base::time::Instant::now();
2886        let certificate = if round.is_fast() {
2887            let hashed_value = ConfirmedBlock::new(block);
2888            self.client
2889                .submit_block_proposal(&committee, proposal, hashed_value)
2890                .await?
2891        } else {
2892            let hashed_value = ValidatedBlock::new(block);
2893            let certificate = self
2894                .client
2895                .submit_block_proposal(&committee, proposal, hashed_value.clone())
2896                .await?;
2897            self.client.finalize_block(&committee, certificate).await?
2898        };
2899        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
2900        debug!(round = %certificate.round, "Sending confirmed block to validators");
2901        self.update_validators(Some(&committee)).await?;
2902        Ok(ClientOutcome::Committed(Some(certificate)))
2903    }
2904
2905    fn send_timing(&self, start: Instant, timing_type: TimingType) {
2906        let Some(sender) = &self.timing_sender else {
2907            return;
2908        };
2909        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2910            tracing::warn!(%err, "Failed to send timing info");
2911        }
2912    }
2913
2914    /// Requests a leader timeout certificate if the current round has timed out. Returns the
2915    /// chain info for the (possibly new) current round.
2916    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2917        let mut info = self.chain_info_with_manager_values().await?;
2918        // If the current round has timed out, we request a timeout certificate and retry in
2919        // the next round.
2920        if let Some(round_timeout) = info.manager.round_timeout {
2921            if round_timeout <= self.storage_client().clock().current_time() {
2922                self.request_leader_timeout().await?;
2923                info = self.chain_info_with_manager_values().await?;
2924            }
2925        }
2926        Ok(info)
2927    }
2928
2929    /// Finalizes the locking block.
2930    ///
2931    /// Panics if there is no locking validated block; fails if it is not in the current round.
2932    async fn finalize_locking_block(
2933        &self,
2934        info: Box<ChainInfo>,
2935    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2936        let locking = info
2937            .manager
2938            .requested_locking
2939            .expect("Should have a locking block");
2940        let LockingBlock::Regular(certificate) = *locking else {
2941            panic!("Should have a locking validated block");
2942        };
2943        debug!(
2944            round = %certificate.round,
2945            "Finalizing locking block"
2946        );
2947        let committee = self.local_committee().await?;
2948        match self
2949            .client
2950            .finalize_block(&committee, certificate.clone())
2951            .await
2952        {
2953            Ok(certificate) => {
2954                self.update_validators(Some(&committee)).await?;
2955                Ok(ClientOutcome::Committed(Some(certificate)))
2956            }
2957            Err(ChainClientError::CommunicationError(error)) => {
2958                // Communication errors in this case often mean that someone else already
2959                // finalized the block or started another round.
2960                let timestamp = info.manager.round_timeout.ok_or(error)?;
2961                Ok(ClientOutcome::WaitForTimeout(RoundTimeout {
2962                    timestamp,
2963                    current_round: info.manager.current_round,
2964                    next_block_height: info.next_block_height,
2965                }))
2966            }
2967            Err(error) => Err(error),
2968        }
2969    }
2970
2971    /// Returns a round in which we can propose a new block (or retry the given one), if possible.
2972    fn round_for_new_proposal(
2973        info: &ChainInfo,
2974        identity: &AccountOwner,
2975        has_oracle_responses: bool,
2976    ) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
2977        let manager = &info.manager;
2978        // If there is a conflicting proposal in the current round, we can only propose if the
2979        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
2980        // Similarly, we cannot propose a block that uses oracles in the fast round.
2981        let conflict = manager
2982            .requested_signed_proposal
2983            .as_ref()
2984            .into_iter()
2985            .chain(&manager.requested_proposed)
2986            .any(|proposal| proposal.content.round == manager.current_round)
2987            || (manager.current_round.is_fast() && has_oracle_responses);
2988        let round = if !conflict {
2989            manager.current_round
2990        } else if let Some(round) = manager
2991            .ownership
2992            .next_round(manager.current_round)
2993            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
2994        {
2995            round
2996        } else if let Some(timeout) = info.round_timeout() {
2997            return Ok(Either::Right(timeout));
2998        } else {
2999            return Err(ChainClientError::BlockProposalError(
3000                "Conflicting proposal in the current round",
3001            ));
3002        };
3003        if manager.can_propose(identity, round) {
3004            return Ok(Either::Left(round));
3005        }
3006        if let Some(timeout) = info.round_timeout() {
3007            return Ok(Either::Right(timeout));
3008        }
3009        Err(ChainClientError::BlockProposalError(
3010            "Not a leader in the current round",
3011        ))
3012    }
3013
3014    /// Clears the information on any operation that previously failed.
3015    #[instrument(level = "trace")]
3016    pub fn clear_pending_proposal(&self) {
3017        self.state_mut().clear_pending_proposal();
3018    }
3019
3020    /// Processes a confirmed block for which this chain is a recipient and updates validators.
3021    #[instrument(
3022        level = "trace",
3023        skip(certificate),
3024        fields(certificate_hash = ?certificate.hash()),
3025    )]
3026    pub async fn receive_certificate_and_update_validators(
3027        &self,
3028        certificate: ConfirmedBlockCertificate,
3029    ) -> Result<(), ChainClientError> {
3030        self.client
3031            .receive_certificate_and_update_validators(
3032                certificate,
3033                ReceiveCertificateMode::NeedsCheck,
3034            )
3035            .await
3036    }
3037
3038    /// Rotates the key of the chain.
3039    ///
3040    /// Replaces the current owners of the chain with the new public key.
3041    #[instrument(level = "trace")]
3042    pub async fn rotate_key_pair(
3043        &self,
3044        public_key: AccountPublicKey,
3045    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3046        self.transfer_ownership(public_key.into()).await
3047    }
3048
3049    /// Transfers ownership of the chain to a single super owner.
3050    #[instrument(level = "trace")]
3051    pub async fn transfer_ownership(
3052        &self,
3053        new_owner: AccountOwner,
3054    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3055        self.execute_operation(SystemOperation::ChangeOwnership {
3056            super_owners: vec![new_owner],
3057            owners: Vec::new(),
3058            multi_leader_rounds: 2,
3059            open_multi_leader_rounds: false,
3060            timeout_config: TimeoutConfig::default(),
3061        })
3062        .await
3063    }
3064
3065    /// Adds another owner to the chain, and turns existing super owners into regular owners.
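    ///
    /// A minimal usage sketch, assuming `client` is a `ChainClient` and `new_owner` is the
    /// `AccountOwner` to add (both placeholders):
    ///
    /// ```ignore
    /// // Give the new owner the same weight (100) that converted super owners receive.
    /// let outcome = client.share_ownership(new_owner, 100).await?;
    /// ```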
3066    #[instrument(level = "trace")]
3067    pub async fn share_ownership(
3068        &self,
3069        new_owner: AccountOwner,
3070        new_weight: u64,
3071    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3072        loop {
3073            let ownership = self.prepare_chain().await?.manager.ownership;
3074            ensure!(
3075                ownership.is_active(),
3076                ChainError::InactiveChain(self.chain_id)
3077            );
3078            let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
3079            owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
3080            owners.push((new_owner, new_weight));
3081            let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
3082                super_owners: Vec::new(),
3083                owners,
3084                multi_leader_rounds: ownership.multi_leader_rounds,
3085                open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3086                timeout_config: ownership.timeout_config,
3087            })];
3088            match self.execute_block(operations, vec![]).await? {
3089                ExecuteBlockOutcome::Executed(certificate) => {
3090                    return Ok(ClientOutcome::Committed(certificate));
3091                }
3092                ExecuteBlockOutcome::Conflict(certificate) => {
3093                    info!(
3094                        height = %certificate.block().header.height,
3095                        "Another block was committed; retrying."
3096                    );
3097                }
3098                ExecuteBlockOutcome::WaitForTimeout(timeout) => {
3099                    return Ok(ClientOutcome::WaitForTimeout(timeout));
3100                }
3101            };
3102        }
3103    }
3104
3105    /// Changes the ownership of this chain, replacing the current owners and round settings
3106    /// with the given `ChainOwnership` configuration.
3107    #[instrument(level = "trace")]
3108    pub async fn change_ownership(
3109        &self,
3110        ownership: ChainOwnership,
3111    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3112        self.execute_operation(SystemOperation::ChangeOwnership {
3113            super_owners: ownership.super_owners.into_iter().collect(),
3114            owners: ownership.owners.into_iter().collect(),
3115            multi_leader_rounds: ownership.multi_leader_rounds,
3116            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3117            timeout_config: ownership.timeout_config.clone(),
3118        })
3119        .await
3120    }
3121
3122    /// Changes the application permissions configuration on this chain.
3123    #[instrument(level = "trace", skip(application_permissions))]
3124    pub async fn change_application_permissions(
3125        &self,
3126        application_permissions: ApplicationPermissions,
3127    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3128        self.execute_operation(SystemOperation::ChangeApplicationPermissions(
3129            application_permissions,
3130        ))
3131        .await
3132    }
3133
3134    /// Opens a new chain with a derived UID.
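    ///
    /// A minimal usage sketch, assuming `client` is a `ChainClient` and `ownership` describes
    /// the new chain's owners (both placeholders); the default application permissions and a
    /// zero initial balance stand in for real values:
    ///
    /// ```ignore
    /// use linera_base::data_types::{Amount, ApplicationPermissions};
    ///
    /// let outcome = client
    ///     .open_chain(ownership, ApplicationPermissions::default(), Amount::ZERO)
    ///     .await?;
    /// if let ClientOutcome::Committed((description, _certificate)) = outcome {
    ///     println!("new chain: {}", description.id());
    /// }
    /// ```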
3135    #[instrument(level = "trace", skip(self))]
3136    pub async fn open_chain(
3137        &self,
3138        ownership: ChainOwnership,
3139        application_permissions: ApplicationPermissions,
3140        balance: Amount,
3141    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, ChainClientError>
3142    {
3143        loop {
3144            let config = OpenChainConfig {
3145                ownership: ownership.clone(),
3146                balance,
3147                application_permissions: application_permissions.clone(),
3148            };
3149            let operation = Operation::system(SystemOperation::OpenChain(config));
3150            let certificate = match self.execute_block(vec![operation], vec![]).await? {
3151                ExecuteBlockOutcome::Executed(certificate) => certificate,
3152                ExecuteBlockOutcome::Conflict(_) => continue,
3153                ExecuteBlockOutcome::WaitForTimeout(timeout) => {
3154                    return Ok(ClientOutcome::WaitForTimeout(timeout));
3155                }
3156            };
3157            // The only operation, i.e. the last transaction, created the new chain.
3158            let chain_blob = certificate
3159                .block()
3160                .body
3161                .blobs
3162                .last()
3163                .and_then(|blobs| blobs.last())
3164                .ok_or_else(|| ChainClientError::InternalError("Failed to create a new chain"))?;
3165            let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
3166            // Add the new chain to the list of tracked chains.
3167            self.client.track_chain(description.id());
3168            self.client
3169                .local_node
3170                .retry_pending_cross_chain_requests(self.chain_id)
3171                .await?;
3172            return Ok(ClientOutcome::Committed((description, certificate)));
3173        }
3174    }
3175
3176    /// Closes the chain (and loses everything in it!!).
3177    /// Returns `None` if the chain was already closed.
3178    #[instrument(level = "trace")]
3179    pub async fn close_chain(
3180        &self,
3181    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3182        match self.execute_operation(SystemOperation::CloseChain).await {
3183            Ok(outcome) => Ok(outcome.map(Some)),
3184            Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
3185                WorkerError::ChainError(chain_error),
3186            ))) if matches!(*chain_error, ChainError::ClosedChain) => {
3187                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
3188            }
3189            Err(error) => Err(error),
3190        }
3191    }
3192
3193    /// Publishes some module.
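    ///
    /// A minimal usage sketch, assuming `client` is a `ChainClient` and `contract` / `service`
    /// are `Bytecode` values loaded elsewhere (all placeholders):
    ///
    /// ```ignore
    /// use linera_base::vm::VmRuntime;
    ///
    /// let outcome = client.publish_module(contract, service, VmRuntime::Wasm).await?;
    /// if let ClientOutcome::Committed((module_id, _certificate)) = outcome {
    ///     println!("published module: {module_id:?}");
    /// }
    /// ```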
3194    #[cfg(not(target_arch = "wasm32"))]
3195    #[instrument(level = "trace", skip(contract, service))]
3196    pub async fn publish_module(
3197        &self,
3198        contract: Bytecode,
3199        service: Bytecode,
3200        vm_runtime: VmRuntime,
3201    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3202        let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
3203        self.publish_module_blobs(blobs, module_id).await
3204    }
3205
3206    /// Publishes a module from already-created bytecode blobs.
3207    #[cfg(not(target_arch = "wasm32"))]
3208    #[instrument(level = "trace", skip(blobs, module_id))]
3209    pub async fn publish_module_blobs(
3210        &self,
3211        blobs: Vec<Blob>,
3212        module_id: ModuleId,
3213    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3214        self.execute_operations(
3215            vec![Operation::system(SystemOperation::PublishModule {
3216                module_id,
3217            })],
3218            blobs,
3219        )
3220        .await?
3221        .try_map(|certificate| Ok((module_id, certificate)))
3222    }
3223
3224    /// Publishes some data blobs.
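    ///
    /// A minimal usage sketch, assuming `client` is a `ChainClient` (placeholder name):
    ///
    /// ```ignore
    /// // Publish two small data blobs in a single block.
    /// let payloads = vec![b"hello".to_vec(), b"world".to_vec()];
    /// let outcome = client.publish_data_blobs(payloads).await?;
    /// ```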
3225    #[instrument(level = "trace", skip(bytes))]
3226    pub async fn publish_data_blobs(
3227        &self,
3228        bytes: Vec<Vec<u8>>,
3229    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3230        let blobs = bytes.into_iter().map(Blob::new_data);
3231        let publish_blob_operations = blobs
3232            .clone()
3233            .map(|blob| {
3234                Operation::system(SystemOperation::PublishDataBlob {
3235                    blob_hash: blob.id().hash,
3236                })
3237            })
3238            .collect();
3239        self.execute_operations(publish_blob_operations, blobs.collect())
3240            .await
3241    }
3242
3243    /// Publishes a single data blob.
3244    #[instrument(level = "trace", skip(bytes))]
3245    pub async fn publish_data_blob(
3246        &self,
3247        bytes: Vec<u8>,
3248    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3249        self.publish_data_blobs(vec![bytes]).await
3250    }
3251
3252    /// Creates an application by instantiating some bytecode.
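    ///
    /// A minimal usage sketch, assuming `client` is a `ChainClient`, `module_id` is a typed
    /// `ModuleId` for the application, and `parameters` / `instantiation_argument` are values of
    /// the application's own types (all placeholders):
    ///
    /// ```ignore
    /// let outcome = client
    ///     .create_application(module_id, &parameters, &instantiation_argument, vec![])
    ///     .await?;
    /// if let ClientOutcome::Committed((application_id, _certificate)) = outcome {
    ///     println!("created application: {application_id:?}");
    /// }
    /// ```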
3253    #[instrument(
3254        level = "trace",
3255        skip(self, parameters, instantiation_argument, required_application_ids)
3256    )]
3257    pub async fn create_application<
3258        A: Abi,
3259        Parameters: Serialize,
3260        InstantiationArgument: Serialize,
3261    >(
3262        &self,
3263        module_id: ModuleId<A, Parameters, InstantiationArgument>,
3264        parameters: &Parameters,
3265        instantiation_argument: &InstantiationArgument,
3266        required_application_ids: Vec<ApplicationId>,
3267    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, ChainClientError>
3268    {
3269        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
3270        let parameters = serde_json::to_vec(parameters)?;
3271        Ok(self
3272            .create_application_untyped(
3273                module_id.forget_abi(),
3274                parameters,
3275                instantiation_argument,
3276                required_application_ids,
3277            )
3278            .await?
3279            .map(|(app_id, cert)| (app_id.with_abi(), cert)))
3280    }
3281
3282    /// Creates an application by instantiating some bytecode.
3283    #[instrument(
3284        level = "trace",
3285        skip(
3286            self,
3287            module_id,
3288            parameters,
3289            instantiation_argument,
3290            required_application_ids
3291        )
3292    )]
3293    pub async fn create_application_untyped(
3294        &self,
3295        module_id: ModuleId,
3296        parameters: Vec<u8>,
3297        instantiation_argument: Vec<u8>,
3298        required_application_ids: Vec<ApplicationId>,
3299    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, ChainClientError> {
3300        self.execute_operation(SystemOperation::CreateApplication {
3301            module_id,
3302            parameters,
3303            instantiation_argument,
3304            required_application_ids,
3305        })
3306        .await?
3307        .try_map(|certificate| {
3308            // The first message of the only operation created the application.
3309            let mut creation: Vec<_> = certificate
3310                .block()
3311                .created_blob_ids()
3312                .into_iter()
3313                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
3314                .collect();
3315            if creation.len() > 1 {
3316                return Err(ChainClientError::InternalError(
3317                    "Unexpected number of application descriptions published",
3318                ));
3319            }
3320            let blob_id = creation.pop().ok_or(ChainClientError::InternalError(
3321                "ApplicationDescription blob not found.",
3322            ))?;
3323            let id = ApplicationId::new(blob_id.hash);
3324            Ok((id, certificate))
3325        })
3326    }
3327
3328    /// Creates a new committee and starts using it (admin chains only).
3329    #[instrument(level = "trace", skip(committee))]
3330    pub async fn stage_new_committee(
3331        &self,
3332        committee: Committee,
3333    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3334        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
3335        let blob_hash = blob.id().hash;
3336        match self
3337            .execute_operations(
3338                vec![Operation::system(SystemOperation::Admin(
3339                    AdminOperation::PublishCommitteeBlob { blob_hash },
3340                ))],
3341                vec![blob],
3342            )
3343            .await?
3344        {
3345            ClientOutcome::Committed(_) => {}
3346            outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
3347        }
3348        let epoch = self.chain_info().await?.epoch.try_add_one()?;
3349        self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
3350            epoch,
3351            blob_hash,
3352        }))
3353        .await
3354    }
3355
3356    /// Synchronizes the chain with the validators and creates blocks without any operations to
3357    /// process all incoming messages. This may require several blocks.
3358    ///
3359    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
3360    /// is returned, too.
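    ///
    /// A minimal usage sketch of handling the returned certificates and optional timeout,
    /// assuming `client` is a `ChainClient` (placeholder name):
    ///
    /// ```ignore
    /// let (certificates, maybe_timeout) = client.process_inbox().await?;
    /// println!("processed inbox in {} block(s)", certificates.len());
    /// if let Some(timeout) = maybe_timeout {
    ///     // Not everything could be processed yet; retry after `timeout.timestamp`.
    /// }
    /// ```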
3361    #[instrument(level = "trace")]
3362    pub async fn process_inbox(
3363        &self,
3364    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3365        self.prepare_chain().await?;
3366        self.process_inbox_without_prepare().await
3367    }
3368
3369    /// Creates blocks without any operations to process all incoming messages. This may require
3370    /// several blocks.
3371    ///
3372    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
3373    /// is returned, too.
3374    #[instrument(level = "trace")]
3375    pub async fn process_inbox_without_prepare(
3376        &self,
3377    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3378        #[cfg(with_metrics)]
3379        let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
3380
3381        let mut epoch_change_ops = self.collect_epoch_changes().await?.into_iter();
3382
3383        let mut certificates = Vec::new();
3384        loop {
3385            let incoming_bundles = self.pending_message_bundles().await?;
3386            let stream_updates = self.collect_stream_updates().await?;
3387            let block_operations = stream_updates
3388                .into_iter()
3389                .chain(epoch_change_ops.next())
3390                .collect::<Vec<_>>();
3391            if incoming_bundles.is_empty() && block_operations.is_empty() {
3392                return Ok((certificates, None));
3393            }
3394            match self.execute_block(block_operations, vec![]).await {
3395                Ok(ExecuteBlockOutcome::Executed(certificate))
3396                | Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
3397                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
3398                    return Ok((certificates, Some(timeout)));
3399                }
3400                Err(error) => return Err(error),
3401            };
3402        }
3403    }
3404
3405    /// Returns operations to process all pending epoch changes: first the new epochs, in order,
3406    /// then the removed epochs, in order.
3407    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, ChainClientError> {
3408        let (mut min_epoch, mut next_epoch) = {
3409            let (epoch, committees) = self.epoch_and_committees().await?;
3410            let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
3411            (min_epoch, epoch.try_add_one()?)
3412        };
3413        let mut epoch_change_ops = Vec::new();
3414        while self
3415            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
3416            .await?
3417        {
3418            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
3419                next_epoch,
3420            )));
3421            next_epoch.try_add_assign_one()?;
3422        }
3423        while self
3424            .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
3425            .await?
3426        {
3427            epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
3428                min_epoch,
3429            )));
3430            min_epoch.try_add_assign_one()?;
3431        }
3432        Ok(epoch_change_ops)
3433    }
3434
3435    /// Returns whether the system event on the admin chain with the given stream name and
3436    /// index exists in storage.
3437    async fn has_admin_event(
3438        &self,
3439        stream_name: &[u8],
3440        index: u32,
3441    ) -> Result<bool, ChainClientError> {
3442        let event_id = EventId {
3443            chain_id: self.client.admin_id,
3444            stream_id: StreamId::system(stream_name),
3445            index,
3446        };
3447        Ok(self
3448            .client
3449            .storage_client()
3450            .read_event(event_id)
3451            .await?
3452            .is_some())
3453    }
3454
3455    /// Returns the events (with their indices) for the given stream, starting at `start_index`.
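    ///
    /// A minimal usage sketch, assuming `client` is a `ChainClient` and `stream_id` identifies
    /// the event stream of interest (both placeholders):
    ///
    /// ```ignore
    /// let events = client.events_from_index(stream_id, 0).await?;
    /// for index_and_event in events {
    ///     println!("{index_and_event:?}");
    /// }
    /// ```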
3456    pub async fn events_from_index(
3457        &self,
3458        stream_id: StreamId,
3459        start_index: u32,
3460    ) -> Result<Vec<IndexAndEvent>, ChainClientError> {
3461        Ok(self
3462            .client
3463            .storage_client()
3464            .read_events_from_index(&self.chain_id, &stream_id, start_index)
3465            .await?)
3466    }
3467
3468    /// Revokes all epochs up to and including the given one (admin chains only), removing
3469    /// the corresponding committees. Currently, each individual chain is still entitled to
3470    /// wait before accepting this command. However, deprecated validators are expected to
3471    /// stop functioning shortly after such a command is issued.
3472    #[instrument(level = "trace")]
3473    pub async fn revoke_epochs(
3474        &self,
3475        revoked_epoch: Epoch,
3476    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3477        self.prepare_chain().await?;
3478        let (current_epoch, committees) = self.epoch_and_committees().await?;
3479        ensure!(
3480            revoked_epoch < current_epoch,
3481            ChainClientError::CannotRevokeCurrentEpoch(current_epoch)
3482        );
3483        ensure!(
3484            committees.contains_key(&revoked_epoch),
3485            ChainClientError::EpochAlreadyRevoked
3486        );
3487        let operations = committees
3488            .keys()
3489            .filter_map(|epoch| {
3490                if *epoch <= revoked_epoch {
3491                    Some(Operation::system(SystemOperation::Admin(
3492                        AdminOperation::RemoveCommittee { epoch: *epoch },
3493                    )))
3494                } else {
3495                    None
3496                }
3497            })
3498            .collect();
3499        self.execute_operations(operations, vec![]).await
3500    }
3501
3502    /// Sends tokens to an account.
3503    /// Does not check the balance. (This may block the client.)
3504    /// Does not confirm the transaction.
3505    #[instrument(level = "trace")]
3506    pub async fn transfer_to_account_unsafe_unconfirmed(
3507        &self,
3508        owner: AccountOwner,
3509        amount: Amount,
3510        recipient: Account,
3511    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3512        self.execute_operation(SystemOperation::Transfer {
3513            owner,
3514            recipient,
3515            amount,
3516        })
3517        .await
3518    }
3519
3520    #[instrument(level = "trace", skip(hash))]
3521    pub async fn read_confirmed_block(
3522        &self,
3523        hash: CryptoHash,
3524    ) -> Result<ConfirmedBlock, ChainClientError> {
3525        let block = self
3526            .client
3527            .storage_client()
3528            .read_confirmed_block(hash)
3529            .await?;
3530        block.ok_or(ChainClientError::MissingConfirmedBlock(hash))
3531    }
3532
3533    /// Handles any cross-chain requests for any pending outgoing messages.
3534    #[instrument(level = "trace")]
3535    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), ChainClientError> {
3536        self.client
3537            .local_node
3538            .retry_pending_cross_chain_requests(self.chain_id)
3539            .await?;
3540        Ok(())
3541    }
3542
3543    #[instrument(level = "trace", skip(local_node))]
3544    async fn local_chain_info(
3545        &self,
3546        chain_id: ChainId,
3547        local_node: &mut LocalNodeClient<Env::Storage>,
3548    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
3549        match local_node.chain_info(chain_id).await {
3550            Ok(info) => {
3551                // Useful in case `chain_id` is the same as a local chain.
3552                self.client.update_from_info(&info);
3553                Ok(Some(info))
3554            }
3555            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
3556            Err(err) => Err(err.into()),
3557        }
3558    }
3559
3560    #[instrument(level = "trace", skip(chain_id, local_node))]
3561    async fn local_next_block_height(
3562        &self,
3563        chain_id: ChainId,
3564        local_node: &mut LocalNodeClient<Env::Storage>,
3565    ) -> Result<BlockHeight, ChainClientError> {
3566        Ok(self
3567            .local_chain_info(chain_id, local_node)
3568            .await?
3569            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
3570    }
3571
3572    /// Returns the next height we expect to receive from the given sender chain, according to the
3573    /// local inbox.
3574    #[instrument(level = "trace")]
3575    async fn local_next_height_to_receive(
3576        &self,
3577        origin: ChainId,
3578    ) -> Result<BlockHeight, ChainClientError> {
3579        let chain = self.chain_state_view().await?;
3580        Ok(match chain.inboxes.try_load_entry(&origin).await? {
3581            Some(inbox) => inbox.next_block_height_to_receive()?,
3582            None => BlockHeight::ZERO,
3583        })
3584    }
3585
3586    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
3587    async fn process_notification(
3588        &self,
3589        remote_node: RemoteNode<Env::ValidatorNode>,
3590        mut local_node: LocalNodeClient<Env::Storage>,
3591        notification: Notification,
3592    ) -> Result<(), ChainClientError> {
3593        match notification.reason {
3594            Reason::NewIncomingBundle { origin, height } => {
3595                if self.local_next_height_to_receive(origin).await? > height {
3596                    debug!(
3597                        chain_id = %self.chain_id,
3598                        "Accepting redundant notification for new message"
3599                    );
3600                    return Ok(());
3601                }
3602                self.find_received_certificates_from_validator(remote_node)
3603                    .await?;
3604                if self.local_next_height_to_receive(origin).await? <= height {
3605                    warn!(
3606                        chain_id = %self.chain_id,
3607                        "NewIncomingBundle: Failed to synchronize new message after notification"
3608                    );
3609                }
3610            }
3611            Reason::NewBlock { height, .. } => {
3612                let chain_id = notification.chain_id;
3613                if self
3614                    .local_next_block_height(chain_id, &mut local_node)
3615                    .await?
3616                    > height
3617                {
3618                    debug!(
3619                        chain_id = %self.chain_id,
3620                        "Accepting redundant notification for new block"
3621                    );
3622                    return Ok(());
3623                }
3624                self.client
3625                    .synchronize_chain_state_from(&remote_node, chain_id)
3626                    .await?;
3627                if self
3628                    .local_next_block_height(chain_id, &mut local_node)
3629                    .await?
3630                    <= height
3631                {
3632                    error!("NewBlock: Failed to synchronize new block after notification");
3633                }
3634            }
3635            Reason::NewRound { height, round } => {
3636                let chain_id = notification.chain_id;
3637                if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
3638                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
3639                        debug!(
3640                            chain_id = %self.chain_id,
3641                            "Accepting redundant notification for new round"
3642                        );
3643                        return Ok(());
3644                    }
3645                }
3646                self.client
3647                    .synchronize_chain_state_from(&remote_node, chain_id)
3648                    .await?;
3649                let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
3650                    error!(
3651                        chain_id = %self.chain_id,
3652                        "NewRound: Failed to read local chain info for {chain_id}"
3653                    );
3654                    return Ok(());
3655                };
3656                if (info.next_block_height, info.manager.current_round) < (height, round) {
3657                    error!(
3658                        chain_id = %self.chain_id,
3659                        "NewRound: Failed to synchronize new block after notification"
3660                    );
3661                }
3662            }
3663        }
3664        Ok(())
3665    }
3666
3667    /// Returns whether this chain is tracked by the client, i.e. we are updating its inbox.
3668    pub fn is_tracked(&self) -> bool {
3669        self.client
3670            .tracked_chains
3671            .read()
3672            .unwrap()
3673            .contains(&self.chain_id)
3674    }
3675
3676    /// Spawns a task that listens to notifications about the current chain from all validators,
3677    /// and synchronizes the local state accordingly.
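    ///
    /// A minimal usage sketch, assuming `client` is a `ChainClient` (placeholder name) and
    /// using `futures::StreamExt` for `next()`:
    ///
    /// ```ignore
    /// let (listen_task, _abort_on_drop, mut notifications) = client.listen().await?;
    /// tokio::spawn(listen_task);
    /// // Keep `_abort_on_drop` alive for as long as notifications should keep flowing.
    /// while let Some(notification) = notifications.next().await {
    ///     // React to `notification.reason`, e.g. new blocks or incoming bundles.
    /// }
    /// ```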
3678    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
3679    pub async fn listen(
3680        &self,
3681    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), ChainClientError> {
3682        use future::FutureExt as _;
3683
3684        async fn await_while_polling<F: FusedFuture>(
3685            future: F,
3686            background_work: impl FusedStream<Item = ()>,
3687        ) -> F::Output {
3688            tokio::pin!(future);
3689            tokio::pin!(background_work);
3690            loop {
3691                futures::select! {
3692                    _ = background_work.next() => (),
3693                    result = future => return result,
3694                }
3695            }
3696        }
3697
3698        let mut senders = HashMap::new(); // Senders to cancel notification streams.
3699        let notifications = self.subscribe()?;
3700        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
3701
3702        // Beware: if this future ceases to make progress, notification processing will
3703        // deadlock, because of the issue described in
3704        // https://github.com/linera-io/linera-protocol/pull/1173.
3705
3706        // TODO(#2013): replace this lock with an asynchronous communication channel
3707
3708        let mut process_notifications = FuturesUnordered::new();
3709
3710        match self.update_notification_streams(&mut senders).await {
3711            Ok(handler) => process_notifications.push(handler),
3712            Err(error) => error!("Failed to update notification streams: {error}"),
3713        };
3714
3715        let this = self.clone();
3716        let update_streams = async move {
3717            let mut abortable_notifications = abortable_notifications.fuse();
3718
3719            while let Some(notification) =
3720                await_while_polling(abortable_notifications.next(), &mut process_notifications)
3721                    .await
3722            {
3723                if let Reason::NewBlock { .. } = notification.reason {
3724                    match Box::pin(await_while_polling(
3725                        this.update_notification_streams(&mut senders).fuse(),
3726                        &mut process_notifications,
3727                    ))
3728                    .await
3729                    {
3730                        Ok(handler) => process_notifications.push(handler),
3731                        Err(error) => error!("Failed to update notification streams: {error}"),
3732                    }
3733                }
3734            }
3735
3736            for abort in senders.into_values() {
3737                abort.abort();
3738            }
3739
3740            let () = process_notifications.collect().await;
3741        }
3742        .in_current_span();
3743
3744        Ok((update_streams, AbortOnDrop(abort), notifications))
3745    }
3746
3747    #[instrument(level = "trace", skip(senders))]
3748    async fn update_notification_streams(
3749        &self,
3750        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
3751    ) -> Result<impl Future<Output = ()>, ChainClientError> {
3752        let (nodes, local_node) = {
3753            let committee = self.local_committee().await?;
3754            let nodes: HashMap<_, _> = self
3755                .client
3756                .validator_node_provider()
3757                .make_nodes(&committee)?
3758                .collect();
3759            (nodes, self.client.local_node.clone())
3760        };
3761        // Drop removed validators.
3762        senders.retain(|validator, abort| {
3763            if !nodes.contains_key(validator) {
3764                abort.abort();
3765            }
3766            !abort.is_aborted()
3767        });
3768        // Add tasks for new validators.
3769        let validator_tasks = FuturesUnordered::new();
3770        for (public_key, node) in nodes {
3771            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
3772                continue;
3773            };
3774            let this = self.clone();
3775            let stream = stream::once({
3776                let node = node.clone();
3777                async move {
3778                    let stream = node.subscribe(vec![this.chain_id]).await?;
3779                    // Only now is the notification stream established. We may have missed
3780                    // notifications since the last time we synchronized.
3781                    let remote_node = RemoteNode { public_key, node };
3782                    this.client
3783                        .synchronize_chain_state_from(&remote_node, this.chain_id)
3784                        .await?;
3785                    Ok::<_, ChainClientError>(stream)
3786                }
3787            })
3788            .filter_map(move |result| async move {
3789                if let Err(error) = &result {
3790                    warn!(?error, "Could not connect to validator {public_key}");
3791                } else {
3792                    info!("Connected to validator {public_key}");
3793                }
3794                result.ok()
3795            })
3796            .flatten();
3797            let (stream, abort) = stream::abortable(stream);
3798            let mut stream = Box::pin(stream);
3799            let this = self.clone();
3800            let local_node = local_node.clone();
3801            let remote_node = RemoteNode { public_key, node };
3802            validator_tasks.push(async move {
3803                while let Some(notification) = stream.next().await {
3804                    if let Err(err) = this
3805                        .process_notification(
3806                            remote_node.clone(),
3807                            local_node.clone(),
3808                            notification.clone(),
3809                        )
3810                        .await
3811                    {
3812                        tracing::warn!(
3813                            chain_id = %this.chain_id,
3814                            validator_public_key = ?remote_node.public_key,
3815                            ?notification,
3816                            "Failed to process notification: {err}",
3817                        );
3818                    }
3819                }
3820            });
3821            entry.insert(abort);
3822        }
3823        Ok(validator_tasks.collect())
3824    }
3825
3826    /// Attempts to download new received certificates from a particular validator.
3827    ///
3828    /// This is similar to `find_received_certificates` but for only one validator.
3829    /// We also don't try to synchronize the admin chain.
3830    #[instrument(level = "trace")]
3831    async fn find_received_certificates_from_validator(
3832        &self,
3833        remote_node: RemoteNode<Env::ValidatorNode>,
3834    ) -> Result<(), ChainClientError> {
3835        let chain_id = self.chain_id;
3836        // Proceed to downloading received certificates.
3837        let received_certificates = self
3838            .client
3839            .synchronize_received_certificates_from_validator(chain_id, &remote_node)
3840            .await?;
3841        // Process received certificates. If the client state has changed during the
3842        // network calls, we should still be fine.
3843        self.receive_certificates_from_validators(vec![received_certificates])
3844            .await;
3845        Ok(())
3846    }
3847
3848    /// Attempts to update a validator with the local information.
3849    #[instrument(level = "trace", skip(remote_node))]
3850    pub async fn sync_validator(
3851        &self,
3852        remote_node: Env::ValidatorNode,
3853    ) -> Result<(), ChainClientError> {
3854        let validator_next_block_height = match remote_node
3855            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
3856            .await
3857        {
3858            Ok(info) => info.info.next_block_height.0,
3859            Err(NodeError::BlobsNotFound(_)) => 0,
3860            Err(err) => return Err(err.into()),
3861        };
3862        let local_chain_state = self.chain_info().await?;
3863
3864        let Some(missing_certificate_count) = local_chain_state
3865            .next_block_height
3866            .0
3867            .checked_sub(validator_next_block_height)
3868            .filter(|count| *count > 0)
3869        else {
3870            debug!("Validator is up-to-date with local state");
3871            return Ok(());
3872        };
3873
3874        let missing_certificates_end = usize::try_from(local_chain_state.next_block_height.0)
3875            .expect("`usize` should be at least `u64`");
3876        let missing_certificates_start = missing_certificates_end
3877            - usize::try_from(missing_certificate_count).expect("`usize` should be at least `u64`");
3878
3879        let missing_certificate_hashes = self
3880            .chain_state_view()
3881            .await?
3882            .confirmed_log
3883            .read(missing_certificates_start..missing_certificates_end)
3884            .await?;
3885
3886        let certificates = self
3887            .client
3888            .storage_client()
3889            .read_certificates(missing_certificate_hashes.clone())
3890            .await?;
3891        let certificates =
3892            match ResultReadCertificates::new(certificates, missing_certificate_hashes) {
3893                ResultReadCertificates::Certificates(certificates) => certificates,
3894                ResultReadCertificates::InvalidHashes(hashes) => {
3895                    return Err(ChainClientError::ReadCertificatesError(hashes))
3896                }
3897            };
3898        for certificate in certificates {
3899            match remote_node
3900                .handle_confirmed_certificate(
3901                    certificate.clone(),
3902                    CrossChainMessageDelivery::NonBlocking,
3903                )
3904                .await
3905            {
3906                Ok(_) => (),
3907                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
3908                    // Upload the missing blobs we have and retry.
3909                    let missing_blobs: Vec<_> = self
3910                        .client
3911                        .storage_client()
3912                        .read_blobs(&missing_blob_ids)
3913                        .await?
3914                        .into_iter()
3915                        .flatten()
3916                        .collect();
3917                    remote_node.upload_blobs(missing_blobs).await?;
3918                    remote_node
3919                        .handle_confirmed_certificate(
3920                            certificate,
3921                            CrossChainMessageDelivery::NonBlocking,
3922                        )
3923                        .await?;
3924                }
3925                Err(err) => return Err(err.into()),
3926            }
3927        }
3928
3929        Ok(())
3930    }
3931
3932    /// Returns whether the given ownership includes any owner other than this client's preferred owner.
3933    fn has_other_owners(&self, ownership: &ChainOwnership) -> bool {
3934        ownership
3935            .all_owners()
3936            .any(|owner| Some(owner) != self.preferred_owner.as_ref())
3937    }
3938}
3939
3940#[cfg(with_testing)]
3941impl<Env: Environment> ChainClient<Env> {
3942    pub async fn process_notification_from(
3943        &self,
3944        notification: Notification,
3945        validator: (ValidatorPublicKey, &str),
3946    ) {
3947        let mut node_list = self
3948            .client
3949            .validator_node_provider()
3950            .make_nodes_from_list(vec![validator])
3951            .unwrap();
3952        let (public_key, node) = node_list.next().unwrap();
3953        let remote_node = RemoteNode { node, public_key };
3954        let local_node = self.client.local_node.clone();
3955        self.process_notification(remote_node, local_node, notification)
3956            .await
3957            .unwrap();
3958    }
3959}
3960
3961/// The outcome of trying to commit a list of incoming messages and operations to the chain.
3962#[derive(Debug)]
3963enum ExecuteBlockOutcome {
3964    /// A block with the messages and operations was committed.
3965    Executed(ConfirmedBlockCertificate),
3966    /// A different block was already proposed and got committed. Check whether the messages and
3967    /// operations are still suitable, and try again at the next block height.
3968    Conflict(ConfirmedBlockCertificate),
3969    /// We are not the round leader and cannot do anything. Try again at the specified time
3970    /// or whenever the round or block height changes.
3971    WaitForTimeout(RoundTimeout),
3972}
3973
3974/// Wrapper for `AbortHandle` that aborts when it is dropped.
3975#[must_use]
3976pub struct AbortOnDrop(pub AbortHandle);
3977
3978impl Drop for AbortOnDrop {
3979    #[instrument(level = "trace", skip(self))]
3980    fn drop(&mut self) {
3981        self.0.abort();
3982    }
3983}
3984
3985/// The result of `synchronize_received_certificates_from_validator`.
3986struct ReceivedCertificatesFromValidator {
3987    /// The public key of the validator we downloaded from.
3988    public_key: ValidatorPublicKey,
3989    /// The new tracker value for that validator.
3990    tracker: u64,
3991    /// The downloaded certificates. The signatures were already checked and they are ready
3992    /// to be processed.
3993    certificates: Vec<ConfirmedBlockCertificate>,
3994    /// Sender chains that were already up to date locally. We need to ensure their messages
3995    /// are delivered.
3996    other_sender_chains: Vec<ChainId>,
3997}
3998
3999/// A pending proposed block, together with its published blobs.
4000#[derive(Clone, Serialize, Deserialize)]
4001pub struct PendingProposal {
4002    pub block: ProposedBlock,
4003    pub blobs: Vec<Blob>,
4004}
4005
4006enum ReceiveCertificateMode {
4007    NeedsCheck,
4008    AlreadyChecked,
4009}
4010
4011enum CheckCertificateResult {
4012    OldEpoch,
4013    New,
4014    FutureEpoch,
4015}
4016
4017impl CheckCertificateResult {
4018    fn into_result(self) -> Result<(), ChainClientError> {
4019        match self {
4020            Self::OldEpoch => Err(ChainClientError::CommitteeDeprecationError),
4021            Self::New => Ok(()),
4022            Self::FutureEpoch => Err(ChainClientError::CommitteeSynchronizationError),
4023        }
4024    }
4025}
4026
4027/// Compresses the contract and service bytecode and returns the resulting blobs together with the module ID.
4028#[cfg(not(target_arch = "wasm32"))]
4029pub async fn create_bytecode_blobs(
4030    contract: Bytecode,
4031    service: Bytecode,
4032    vm_runtime: VmRuntime,
4033) -> (Vec<Blob>, ModuleId) {
4034    match vm_runtime {
4035        VmRuntime::Wasm => {
4036            let (compressed_contract, compressed_service) =
4037                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
4038                    .await
4039                    .expect("Compression should not panic");
4040            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
4041            let service_blob = Blob::new_service_bytecode(compressed_service);
4042            let module_id =
4043                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
4044            (vec![contract_blob, service_blob], module_id)
4045        }
4046        VmRuntime::Evm => {
4047            let compressed_contract = contract.compress();
4048            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
4049            let module_id = ModuleId::new(
4050                evm_contract_blob.id().hash,
4051                evm_contract_blob.id().hash,
4052                vm_runtime,
4053            );
4054            (vec![evm_contract_blob], module_id)
4055        }
4056    }
4057}