Skip to main content

linera_core/client/chain_client/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod state;
5use std::{
6    collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7    convert::Infallible,
8    iter,
9    sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14    future::{self, Either, FusedFuture, Future},
15    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20    abi::Abi,
21    crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
22    data_types::{
23        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
24        ChainDescription, Epoch, MessagePolicy, Round, Timestamp,
25    },
26    ensure,
27    identifiers::{
28        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
29        ModuleId, StreamId,
30    },
31    ownership::{ChainOwnership, TimeoutConfig},
32    time::{Duration, Instant},
33};
34#[cfg(not(target_arch = "wasm32"))]
35use linera_base::{data_types::Bytecode, vm::VmRuntime};
36use linera_chain::{
37    data_types::{
38        BlockProposal, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight, IncomingBundle,
39        ProposedBlock, Transaction,
40    },
41    manager::LockingBlock,
42    types::{
43        Block, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, Timeout,
44        TimeoutCertificate, ValidatedBlock,
45    },
46    ChainError, ChainExecutionContext,
47};
48use linera_execution::{
49    committee::Committee,
50    system::{
51        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
52        REMOVED_EPOCH_STREAM_NAME,
53    },
54    ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
55};
56use linera_storage::{Clock as _, Storage as _};
57use linera_views::ViewError;
58use serde::Serialize;
59pub(crate) use state::State;
60use thiserror::Error;
61use tokio::sync::mpsc;
62use tokio_stream::wrappers::UnboundedReceiverStream;
63use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
64
65#[cfg(not(target_arch = "wasm32"))]
66use super::create_bytecode_blobs;
67use super::{
68    received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
69    ListeningMode, PendingProposal, TimingType,
70};
71use crate::{
72    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
73    environment::Environment,
74    local_node::{LocalNodeClient, LocalNodeError},
75    node::{
76        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
77        ValidatorNodeProvider as _,
78    },
79    remote_node::RemoteNode,
80    updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
81    worker::{Notification, Reason, WorkerError},
82};
83
84#[derive(Debug, Clone)]
85pub struct Options {
86    /// Maximum number of pending message bundles processed at a time in a block.
87    pub max_pending_message_bundles: usize,
88    /// Maximum number of message bundles to discard from a block proposal due to block limit
89    /// errors before discarding all remaining bundles.
90    ///
91    /// Discarded bundles can be retried in the next block.
92    pub max_block_limit_errors: u32,
93    /// Maximum number of new stream events processed at a time in a block.
94    pub max_new_events_per_block: usize,
95    /// Time budget for staging message bundles. When set, limits bundle execution by
96    /// wall-clock time, in addition to the count limit from `max_pending_message_bundles`.
97    pub staging_bundles_time_budget: Option<Duration>,
98    /// The policy for automatically handling incoming messages.
99    pub message_policy: MessagePolicy,
100    /// Whether to block on cross-chain message delivery.
101    pub cross_chain_message_delivery: CrossChainMessageDelivery,
102    /// An additional delay, after reaching a quorum, to wait for additional validator signatures,
103    /// as a fraction of time taken to reach quorum.
104    pub quorum_grace_period: f64,
105    /// The delay when downloading a blob, after which we try a second validator.
106    pub blob_download_timeout: Duration,
107    /// The delay when downloading a batch of certificates, after which we try a second validator.
108    pub certificate_batch_download_timeout: Duration,
109    /// Maximum number of certificates that we download at a time from one validator when
110    /// synchronizing one of our chains.
111    pub certificate_download_batch_size: u64,
112    /// Maximum number of certificates read from local storage and uploaded to a validator
113    /// at a time when synchronizing a chain.
114    pub certificate_upload_batch_size: u64,
115    /// Maximum number of sender certificates we try to download and receive in one go
116    /// when syncing sender chains.
117    pub sender_certificate_download_batch_size: usize,
118    /// Maximum number of tasks that can be joined concurrently using buffer_unordered.
119    pub max_joined_tasks: usize,
120    /// Whether to allow creating blocks in the fast round. Fast blocks have lower latency but
121    /// must be used carefully so that there are never any conflicting fast block proposals.
122    pub allow_fast_blocks: bool,
123    /// Initial probe interval for the notification circuit breaker. When a validator's
124    /// notification stream exhausts retries, the circuit breaker waits this long before
125    /// probing again. Doubles on each failed probe.
126    pub notification_circuit_breaker_initial_probe_interval: Duration,
127    /// Maximum probe interval for the notification circuit breaker. The probe interval
128    /// doubles on each failure but is capped at this value.
129    pub notification_circuit_breaker_max_probe_interval: Duration,
130    /// Maximum number of event stream IDs to include in a single `PreviousEventBlocks`
131    /// request. Larger sets are split into multiple requests.
132    pub max_event_stream_queries: usize,
133}
134
135struct CircuitBreakerState {
136    next_probe_at: Instant,
137    probe_interval: Duration,
138}
139
140#[cfg(with_testing)]
141impl Options {
142    pub fn test_default() -> Self {
143        use super::{
144            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
145            DEFAULT_MAX_EVENT_STREAM_QUERIES, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
146        };
147        use crate::DEFAULT_QUORUM_GRACE_PERIOD;
148
149        Options {
150            max_pending_message_bundles: 10,
151            max_block_limit_errors: 3,
152            max_new_events_per_block: 10,
153            staging_bundles_time_budget: None,
154            message_policy: MessagePolicy::new_accept_all(),
155            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
156            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
157            blob_download_timeout: Duration::from_secs(1),
158            certificate_batch_download_timeout: Duration::from_secs(1),
159            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
160            certificate_upload_batch_size: DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
161            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
162            max_joined_tasks: 100,
163            allow_fast_blocks: false,
164            notification_circuit_breaker_initial_probe_interval: Duration::from_secs(300),
165            notification_circuit_breaker_max_probe_interval: Duration::from_secs(3600),
166            max_event_stream_queries: DEFAULT_MAX_EVENT_STREAM_QUERIES,
167        }
168    }
169}
170
171impl Options {
172    /// Builds the [`BundleExecutionPolicy`] based on the client options.
173    pub fn bundle_execution_policy(&self) -> BundleExecutionPolicy {
174        BundleExecutionPolicy {
175            on_failure: BundleFailurePolicy::AutoRetry {
176                max_failures: self.max_block_limit_errors,
177            },
178            time_budget: self.staging_bundles_time_budget,
179        }
180    }
181}
182
183/// Client to operate a chain by interacting with validators and the given local storage
184/// implementation.
185/// * The chain being operated is called the "local chain" or just the "chain".
186/// * As a rule, operations are considered successful (and communication may stop) when
187///   they succeeded in gathering a quorum of responses.
188#[derive(Debug)]
189pub struct ChainClient<Env: Environment> {
190    /// The Linera [`Client`] that manages operations for this chain client.
191    #[debug(skip)]
192    pub(crate) client: Arc<Client<Env>>,
193    /// The off-chain chain ID.
194    chain_id: ChainId,
195    /// The client options.
196    #[debug(skip)]
197    options: Options,
198    /// The preferred owner of the chain used to sign proposals.
199    /// `None` if we cannot propose on this chain.
200    preferred_owner: Option<AccountOwner>,
201    /// The next block height as read from the wallet.
202    initial_next_block_height: BlockHeight,
203    /// The last block hash as read from the wallet.
204    initial_block_hash: Option<CryptoHash>,
205    /// Optional timing sender for benchmarking.
206    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
207}
208
209impl<Env: Environment> Clone for ChainClient<Env> {
210    fn clone(&self) -> Self {
211        Self {
212            client: self.client.clone(),
213            chain_id: self.chain_id,
214            options: self.options.clone(),
215            preferred_owner: self.preferred_owner,
216            initial_next_block_height: self.initial_next_block_height,
217            initial_block_hash: self.initial_block_hash,
218            timing_sender: self.timing_sender.clone(),
219        }
220    }
221}
222
223/// Error type for [`ChainClient`].
224#[derive(Debug, Error)]
225pub enum Error {
226    #[error("Local node operation failed: {0}")]
227    LocalNodeError(#[from] LocalNodeError),
228
229    #[error("Remote node operation failed: {0}")]
230    RemoteNodeError(#[from] NodeError),
231
232    #[error(transparent)]
233    ArithmeticError(#[from] ArithmeticError),
234
235    #[error("Missing certificates: {0:?}")]
236    ReadCertificatesError(Vec<CryptoHash>),
237
238    #[error("Missing confirmed block: {0:?}")]
239    MissingConfirmedBlock(CryptoHash),
240
241    #[error("JSON (de)serialization error: {0}")]
242    JsonError(#[from] serde_json::Error),
243
244    #[error("Chain operation failed: {0}")]
245    ChainError(#[from] ChainError),
246
247    #[error(transparent)]
248    CommunicationError(#[from] CommunicationError<NodeError>),
249
250    #[error("Internal error within chain client: {0}")]
251    InternalError(&'static str),
252
253    #[error(
254        "Cannot accept a certificate from an unknown committee in the future. \
255         Please synchronize the local view of the admin chain"
256    )]
257    CommitteeSynchronizationError,
258
259    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
260    WalletSynchronizationError,
261
262    #[error("The state of the client is incompatible with the proposed block: {0}")]
263    BlockProposalError(&'static str),
264
265    #[error(
266        "Cannot accept a certificate from a committee that was retired. \
267         Try a newer certificate from the same origin"
268    )]
269    CommitteeDeprecationError,
270
271    #[error("Protocol error within chain client: {0}")]
272    ProtocolError(&'static str),
273
274    #[error("Signer doesn't have key to sign for chain {0}")]
275    CannotFindKeyForChain(ChainId),
276
277    #[error("client is not configured to propose on chain {0}")]
278    NoAccountKeyConfigured(ChainId),
279
280    #[error("The chain client isn't owner on chain {0}")]
281    NotAnOwner(ChainId),
282
283    #[error(transparent)]
284    ViewError(#[from] ViewError),
285
286    #[error(
287        "Failed to download certificates and update local node to the next height \
288         {target_next_block_height} of chain {chain_id}"
289    )]
290    CannotDownloadCertificates {
291        chain_id: ChainId,
292        target_next_block_height: BlockHeight,
293    },
294
295    #[error(transparent)]
296    BcsError(#[from] bcs::Error),
297
298    #[error(
299        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
300         expected block hash {expected_hash} in {expected_round}"
301    )]
302    UnexpectedQuorum {
303        hash: CryptoHash,
304        round: Round,
305        expected_hash: CryptoHash,
306        expected_round: Round,
307    },
308
309    #[error("signer error: {0:?}")]
310    Signer(#[source] Box<dyn signer::Error>),
311
312    #[error("Cannot revoke the current epoch {0}")]
313    CannotRevokeCurrentEpoch(Epoch),
314
315    #[error("Epoch is already revoked")]
316    EpochAlreadyRevoked,
317
318    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
319    CannotDownloadMissingSenderBlock {
320        chain_id: ChainId,
321        height: BlockHeight,
322    },
323
324    #[error(
325        "A different block was already committed at this height. \
326         The committed certificate hash is {0}"
327    )]
328    Conflict(CryptoHash),
329}
330
331impl From<Infallible> for Error {
332    fn from(infallible: Infallible) -> Self {
333        match infallible {}
334    }
335}
336
337impl Error {
338    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
339        Self::Signer(Box::new(err))
340    }
341}
342
343impl<Env: Environment> ChainClient<Env> {
344    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
345    pub(crate) fn new(
346        client: Arc<Client<Env>>,
347        chain_id: ChainId,
348        options: Options,
349        initial_block_hash: Option<CryptoHash>,
350        initial_next_block_height: BlockHeight,
351        preferred_owner: Option<AccountOwner>,
352        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
353    ) -> Self {
354        ChainClient {
355            client,
356            chain_id,
357            options,
358            preferred_owner,
359            initial_block_hash,
360            initial_next_block_height,
361            timing_sender,
362        }
363    }
364
365    /// Returns whether this chain is in follow-only mode.
366    #[instrument(level = "trace", skip(self))]
367    pub fn is_follow_only(&self) -> bool {
368        self.client
369            .chain_mode(self.chain_id)
370            .is_none_or(|mode| mode.is_follow_only())
371    }
372
373    /// Returns the proposal mutex for this chain.
374    ///
375    /// The mutex serializes block proposals and holds the pending proposal (if any).
376    #[instrument(level = "trace", skip(self))]
377    fn proposal_mutex(&self) -> Arc<tokio::sync::Mutex<Option<PendingProposal>>> {
378        self.client
379            .chains
380            .pin()
381            .get(&self.chain_id)
382            .expect("Chain client constructed for invalid chain")
383            .proposal_mutex()
384    }
385
386    /// Returns the pending proposal, if any.
387    #[instrument(level = "trace", skip(self))]
388    pub async fn pending_proposal(&self) -> Option<PendingProposal> {
389        self.proposal_mutex().lock().await.clone()
390    }
391
392    /// Gets a reference to the client's signer instance.
393    #[instrument(level = "trace", skip(self))]
394    pub fn signer(&self) -> &impl Signer {
395        self.client.signer()
396    }
397
398    /// Returns whether the signer has a key for the given owner.
399    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, Error> {
400        self.signer()
401            .contains_key(owner)
402            .await
403            .map_err(Error::signer_failure)
404    }
405
406    /// Gets a mutable reference to the per-`ChainClient` options.
407    #[instrument(level = "trace", skip(self))]
408    pub fn options_mut(&mut self) -> &mut Options {
409        &mut self.options
410    }
411
412    /// Gets a reference to the per-`ChainClient` options.
413    #[instrument(level = "trace", skip(self))]
414    pub fn options(&self) -> &Options {
415        &self.options
416    }
417
418    /// Gets the ID of the associated chain.
419    #[instrument(level = "trace", skip(self))]
420    pub fn chain_id(&self) -> ChainId {
421        self.chain_id
422    }
423
424    /// Gets a clone of the timing sender for benchmarking.
425    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
426        self.timing_sender.clone()
427    }
428
429    /// Gets the ID of the admin chain.
430    #[instrument(level = "trace", skip(self))]
431    pub fn admin_chain_id(&self) -> ChainId {
432        self.client.admin_chain_id
433    }
434
435    /// Gets the currently preferred owner for signing the blocks.
436    #[instrument(level = "trace", skip(self))]
437    pub fn preferred_owner(&self) -> Option<AccountOwner> {
438        self.preferred_owner
439    }
440
441    /// Sets the new, preferred owner for signing the blocks.
442    #[instrument(level = "trace", skip(self))]
443    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
444        self.preferred_owner = Some(preferred_owner);
445    }
446
447    /// Unsets the preferred owner for signing the blocks.
448    #[instrument(level = "trace", skip(self))]
449    pub fn unset_preferred_owner(&mut self) {
450        self.preferred_owner = None;
451    }
452
453    /// Obtains a `ChainStateView` for this client's chain.
454    #[instrument(level = "trace")]
455    pub async fn chain_state_view(
456        &self,
457    ) -> Result<crate::worker::ChainStateViewReadGuard<Env::Storage>, LocalNodeError> {
458        self.client.local_node.chain_state_view(self.chain_id).await
459    }
460
461    /// Returns chain IDs that this chain subscribes to, along with the subscribed streams.
462    /// An empty stream set for a chain means follow all event streams.
463    #[instrument(level = "trace", skip(self))]
464    pub async fn event_stream_publishers(
465        &self,
466    ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
467        let subscriptions = self
468            .client
469            .local_node
470            .get_event_subscriptions(self.chain_id)
471            .await?;
472        let mut publishers = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
473        for ((chain_id, stream_name), _) in subscriptions {
474            publishers.entry(chain_id).or_default().insert(stream_name);
475        }
476        if self.chain_id != self.client.admin_chain_id {
477            // Empty streams = follow all for admin chain.
478            publishers.entry(self.client.admin_chain_id).or_default();
479        }
480        Ok(publishers)
481    }
482
483    /// Subscribes to notifications from this client's chain.
484    #[instrument(level = "trace")]
485    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
486        self.subscribe_to(self.chain_id)
487    }
488
489    /// Subscribes to notifications from the specified chain.
490    #[instrument(level = "trace")]
491    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
492        Ok(Box::pin(UnboundedReceiverStream::new(
493            self.client.notifier.subscribe(vec![chain_id]),
494        )))
495    }
496
497    /// Returns the storage client used by this client's local node.
498    #[instrument(level = "trace")]
499    pub fn storage_client(&self) -> &Env::Storage {
500        self.client.storage_client()
501    }
502
503    /// Obtains the basic `ChainInfo` data for the local chain.
504    #[instrument(level = "trace")]
505    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
506        let query = ChainInfoQuery::new(self.chain_id);
507        let response = self
508            .client
509            .local_node
510            .handle_chain_info_query(query)
511            .await?;
512        Ok(response.info)
513    }
514
515    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
516    #[instrument(level = "trace")]
517    pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
518        let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
519        let response = self
520            .client
521            .local_node
522            .handle_chain_info_query(query)
523            .await?;
524        Ok(response.info)
525    }
526
527    /// Returns the chain's description. Fetches it from the validators if necessary.
528    pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
529        self.client.get_chain_description(self.chain_id).await
530    }
531
532    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
533    /// local chain.
534    #[instrument(level = "trace")]
535    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
536        if self.options.message_policy.is_ignore() {
537            // Ignore all messages.
538            return Ok(Vec::new());
539        }
540
541        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
542        let info = self
543            .client
544            .local_node
545            .handle_chain_info_query(query)
546            .await?
547            .info;
548        if self.preferred_owner.is_some_and(|owner| {
549            info.manager
550                .ownership
551                .is_super_owner_no_regular_owners(&owner)
552        }) {
553            // There are only super owners; they are expected to sync manually.
554            ensure!(
555                info.next_block_height >= self.initial_next_block_height,
556                Error::WalletSynchronizationError
557            );
558        }
559
560        Ok(info
561            .requested_pending_message_bundles
562            .into_iter()
563            .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
564            .take(self.options.max_pending_message_bundles)
565            .collect())
566    }
567
568    /// Returns an `UpdateStreams` operation that updates this client's chain about new events
569    /// in any of the streams its applications are subscribing to. Returns `None` if there are no
570    /// new events.
571    #[instrument(level = "trace")]
572    async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
573        // Load all our subscriptions.
574        let subscription_map = self
575            .client
576            .local_node
577            .get_event_subscriptions(self.chain_id)
578            .await?;
579        // Collect the indices of all new events.
580        let futures = subscription_map
581            .into_iter()
582            .filter(|((chain_id, _), _)| {
583                self.options
584                    .message_policy
585                    .restrict_chain_ids_to
586                    .as_ref()
587                    .is_none_or(|chain_set| chain_set.contains(chain_id))
588            })
589            .filter(|((_, stream_id), _)| {
590                self.options
591                    .message_policy
592                    .process_events_from_application_ids
593                    .as_ref()
594                    .is_none_or(|app_set| app_set.contains(&stream_id.application_id))
595            })
596            .map(|((chain_id, stream_id), subscriptions)| {
597                let client = self.client.clone();
598                let previous_index = subscriptions.next_index;
599                async move {
600                    let next_index = client
601                        .local_node
602                        .get_stream_event_count(chain_id, stream_id.clone())
603                        .await?;
604                    if let Some(next_index) =
605                        next_index.filter(|next_index| *next_index > previous_index)
606                    {
607                        Ok(Some((chain_id, stream_id, previous_index, next_index)))
608                    } else {
609                        Ok::<_, Error>(None)
610                    }
611                }
612            });
613        let all_updates = futures::stream::iter(futures)
614            .buffer_unordered(self.options.max_joined_tasks)
615            .try_collect::<Vec<_>>()
616            .await?
617            .into_iter()
618            .flatten()
619            .collect::<Vec<_>>();
620        // Apply the max_new_events_per_block limit.
621        let max_events = self.options.max_new_events_per_block;
622        let mut total_events: usize = 0;
623        let mut updates = Vec::new();
624        for (chain_id, stream_id, previous_index, next_index) in all_updates {
625            let new_events = (next_index - previous_index) as usize;
626            if total_events + new_events <= max_events {
627                total_events += new_events;
628                updates.push((chain_id, stream_id, next_index));
629            } else {
630                let remaining = max_events.saturating_sub(total_events);
631                if remaining > 0 {
632                    updates.push((chain_id, stream_id, previous_index + remaining as u32));
633                }
634                break;
635            }
636        }
637        if updates.is_empty() {
638            return Ok(None);
639        }
640        Ok(Some(SystemOperation::UpdateStreams(updates).into()))
641    }
642
643    #[instrument(level = "trace")]
644    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
645        self.client.chain_info_with_committees(self.chain_id).await
646    }
647
648    /// Obtains the current epoch of the local chain as well as its set of trusted committees.
649    #[instrument(level = "trace")]
650    async fn epoch_and_committees(
651        &self,
652    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
653        let info = self.chain_info_with_committees().await?;
654        let committees = info
655            .requested_committees
656            .ok_or(LocalNodeError::InvalidChainInfoResponse)?;
657        Ok((info.epoch, committees))
658    }
659
660    /// Obtains the committee for the current epoch of the local chain.
661    #[instrument(level = "trace")]
662    pub async fn local_committee(&self) -> Result<Arc<Committee>, Error> {
663        let info = match self.chain_info().await {
664            Ok(info) => info,
665            Err(LocalNodeError::BlobsNotFound(_)) => {
666                self.synchronize_chain_state(self.chain_id).await?;
667                self.chain_info().await?
668            }
669            Err(LocalNodeError::EventsNotFound(event_ids))
670                if event_ids
671                    .iter()
672                    .all(|event_id| event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)) =>
673            {
674                // `initialize_and_save_if_needed` couldn't start the chain because
675                // the admin chain's epoch events aren't synced yet.
676                self.synchronize_chain_state(self.client.admin_chain_id)
677                    .await?;
678                self.chain_info().await?
679            }
680            Err(err) => return Err(err.into()),
681        };
682        let committee = self
683            .client
684            .storage_client()
685            .get_or_load_committee(info.epoch)
686            .await?
687            .ok_or_else(|| LocalNodeError::InactiveChain(self.chain_id))?;
688        Ok(committee)
689    }
690
691    /// Obtains the committee for the latest epoch on the admin chain.
692    #[instrument(level = "trace")]
693    pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
694        self.client.admin_committee().await
695    }
696
697    /// Obtains the identity of the current owner of the chain.
698    ///
699    /// Returns an error if we don't have the private key for the identity.
700    #[instrument(level = "trace")]
701    pub async fn identity(&self) -> Result<AccountOwner, Error> {
702        let Some(preferred_owner) = self.preferred_owner else {
703            return Err(Error::NoAccountKeyConfigured(self.chain_id));
704        };
705        let manager = self.chain_info().await?.manager;
706        ensure!(
707            manager.ownership.is_active(),
708            LocalNodeError::InactiveChain(self.chain_id)
709        );
710
711        // Check if the preferred owner can propose on this chain: either they are an owner,
712        // the current leader, or open_multi_leader_rounds is enabled.
713        let is_owner = manager
714            .ownership
715            .can_propose_in_multi_leader_round(&preferred_owner);
716
717        if !is_owner {
718            let accepted_owners = manager
719                .ownership
720                .all_owners()
721                .chain(&manager.leader)
722                .collect::<Vec<_>>();
723            warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
724                "The preferred owner is not configured as an owner of this chain",
725            );
726            return Err(Error::NotAnOwner(self.chain_id));
727        }
728
729        let has_signer = self
730            .signer()
731            .contains_key(&preferred_owner)
732            .await
733            .map_err(Error::signer_failure)?;
734
735        if !has_signer {
736            warn!(%self.chain_id, ?preferred_owner,
737                "Chain is one of the owners but its Signer instance doesn't contain the key",
738            );
739            return Err(Error::CannotFindKeyForChain(self.chain_id));
740        }
741
742        Ok(preferred_owner)
743    }
744
745    /// Prepares the chain for a new owner by fetching the chain description and validating access.
746    ///
747    /// This is useful when assigning a chain to a client that may not have the owner key,
748    /// e.g. when a faucet creates a chain with `open_multi_leader_rounds`.
749    ///
750    /// Returns the chain info if the owner can propose on this chain (either because they are
751    /// an owner, or because `open_multi_leader_rounds` is enabled).
752    #[instrument(level = "trace")]
753    pub async fn prepare_for_owner(&self, owner: AccountOwner) -> Result<Box<ChainInfo>, Error> {
754        ensure!(
755            self.client.has_key_for(&owner).await?,
756            Error::CannotFindKeyForChain(self.chain_id)
757        );
758        // Ensure we have the chain description blob.
759        self.client
760            .get_chain_description_blob(self.chain_id)
761            .await?;
762
763        // Get chain info.
764        let info = self.chain_info().await?;
765
766        // Validate that the owner can propose on this chain.
767        ensure!(
768            info.manager
769                .ownership
770                .can_propose_in_multi_leader_round(&owner),
771            Error::NotAnOwner(self.chain_id)
772        );
773
774        Ok(info)
775    }
776
777    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
778    /// its current height.
779    #[instrument(level = "trace")]
780    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
781        #[cfg(with_metrics)]
782        let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
783
784        let mut info = self.synchronize_to_known_height().await?;
785
786        if self.preferred_owner.is_none_or(|owner| {
787            !info
788                .manager
789                .ownership
790                .is_super_owner_no_regular_owners(&owner)
791        }) {
792            // If we are not a super owner or there are regular owners, we could be missing recent
793            // certificates created by other clients. Further synchronize blocks from the network.
794            // This is a best-effort that depends on network conditions.
795            info = self.client.synchronize_chain_state(self.chain_id).await?;
796        }
797
798        if info.epoch > self.client.admin_committees().await?.0 {
799            self.client
800                .synchronize_chain_state(self.client.admin_chain_id)
801                .await?;
802        }
803
804        Ok(info)
805    }
806
807    // Verifies that our local storage contains enough history compared to the
808    // known block height. Otherwise, downloads the missing history from the
809    // network.
810    // The known height only differs if the wallet is ahead of storage.
811    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
812        let info = self
813            .client
814            .download_certificates(self.chain_id, self.initial_next_block_height)
815            .await?;
816        if info.next_block_height == self.initial_next_block_height {
817            // Check that our local node has the expected block hash.
818            ensure!(
819                self.initial_block_hash == info.block_hash,
820                Error::InternalError("Invalid chain of blocks in local node")
821            );
822        }
823        Ok(info)
824    }
825
826    /// Attempts to update all validators about the local chain.
827    #[instrument(level = "trace", skip(old_committee, latest_certificate))]
828    pub async fn update_validators(
829        &self,
830        old_committee: Option<&Committee>,
831        latest_certificate: Option<ConfirmedBlockCertificate>,
832    ) -> Result<(), Error> {
833        let update_validators_start = linera_base::time::Instant::now();
834        // Communicate the new certificate now.
835        if let Some(old_committee) = old_committee {
836            let old_committee_start = linera_base::time::Instant::now();
837            self.communicate_chain_updates(old_committee, latest_certificate.clone())
838                .await?;
839            tracing::debug!(
840                old_committee_ms = old_committee_start.elapsed().as_millis(),
841                "communicated chain updates to old committee"
842            );
843        };
844        if let Ok(new_committee) = self.local_committee().await {
845            if Some(&*new_committee) != old_committee {
846                // If the configuration just changed, communicate to the new committee as well.
847                // (This is actually more important that updating the previous committee.)
848                let new_committee_start = linera_base::time::Instant::now();
849                self.communicate_chain_updates(&new_committee, latest_certificate)
850                    .await?;
851                tracing::debug!(
852                    new_committee_ms = new_committee_start.elapsed().as_millis(),
853                    "communicated chain updates to new committee"
854                );
855            }
856        }
857        self.send_timing(update_validators_start, TimingType::UpdateValidators);
858        Ok(())
859    }
860
861    /// Broadcasts certified blocks to validators.
862    #[instrument(level = "trace", skip(committee, latest_certificate))]
863    pub async fn communicate_chain_updates(
864        &self,
865        committee: &Committee,
866        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
867    ) -> Result<(), Error> {
868        let delivery = self.options.cross_chain_message_delivery;
869        let height = self.chain_info().await?.next_block_height;
870        self.client
871            .communicate_chain_updates(
872                committee,
873                self.chain_id,
874                height,
875                delivery,
876                latest_certificate,
877            )
878            .await
879    }
880
881    /// Synchronizes all chains that any application on this chain subscribes to.
882    /// We always consider the admin chain a relevant publishing chain, for new epochs.
883    /// For event publisher chains, only event-bearing blocks are downloaded (partial sync).
884    async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
885        let subscriptions = self
886            .client
887            .local_node
888            .get_event_subscriptions(self.chain_id)
889            .await?;
890        // Group subscribed streams by publisher chain.
891        let mut streams_by_chain = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
892        for ((chain_id, stream_id), _) in &subscriptions {
893            if *chain_id != self.chain_id {
894                streams_by_chain
895                    .entry(*chain_id)
896                    .or_default()
897                    .insert(stream_id.clone());
898            }
899        }
900        // Always fully sync the admin chain for epoch changes.
901        let admin_chain_id = self.client.admin_chain_id;
902        if admin_chain_id != self.chain_id {
903            self.client.synchronize_chain_state(admin_chain_id).await?;
904        }
905        // For event publisher chains, do a partial sync using previous_event_blocks.
906        let (_, committee) = self.admin_committee().await?;
907        let nodes = self.client.make_nodes(&committee)?;
908        let tasks = streams_by_chain
909            .into_iter()
910            .filter(|(chain_id, _)| *chain_id != admin_chain_id)
911            .map(|(chain_id, stream_ids)| {
912                self.sync_publisher_chain_events(chain_id, stream_ids, &nodes, &committee)
913            })
914            .collect::<Vec<_>>();
915        stream::iter(tasks)
916            .buffer_unordered(self.options.max_joined_tasks)
917            .collect::<Vec<_>>()
918            .await
919            .into_iter()
920            .collect::<Result<Vec<_>, _>>()?;
921        Ok(())
922    }
923
924    /// Downloads only event-bearing blocks for the given publisher chain and streams.
925    ///
926    /// Uses `communicate_with_quorum` so that each validator is queried for
927    /// `previous_event_blocks` and the claimed blocks are downloaded from that same
928    /// validator. Waiting for a quorum guarantees that any confirmed event is discovered,
929    /// since at least one honest validator in the quorum must have it.
930    async fn sync_publisher_chain_events(
931        &self,
932        publisher_chain_id: ChainId,
933        stream_ids: BTreeSet<StreamId>,
934        nodes: &[RemoteNode<Env::ValidatorNode>],
935        committee: &Committee,
936    ) -> Result<(), Error> {
937        let stream_ids_ref = &stream_ids;
938        communicate_with_quorum(
939            nodes,
940            committee,
941            |_: &()| (),
942            |remote_node| async move {
943                self.client
944                    .sync_events_from_node(publisher_chain_id, stream_ids_ref, &remote_node)
945                    .await
946            },
947            self.options.quorum_grace_period,
948        )
949        .await?;
950        Ok(())
951    }
952
953    /// Attempts to download new received certificates.
954    ///
955    /// This is a best effort: it will only find certificates that have been confirmed
956    /// amongst sufficiently many validators of the current committee of the target
957    /// chain.
958    ///
959    /// However, this should be the case whenever a sender's chain is still in use and
960    /// is regularly upgraded to new committees.
961    #[instrument(level = "debug", skip(self), fields(chain_id = %self.chain_id))]
962    pub async fn find_received_certificates(&self) -> Result<(), Error> {
963        debug!("starting find_received_certificates");
964        #[cfg(with_metrics)]
965        let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
966        // Use network information from the local chain.
967        let chain_id = self.chain_id;
968        let (_, committee) = self.admin_committee().await?;
969        let nodes = self.client.make_nodes(&committee)?;
970
971        let trackers = self
972            .client
973            .local_node
974            .get_received_certificate_trackers(chain_id)
975            .await?;
976
977        trace!("find_received_certificates: read trackers");
978
979        let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
980        // Proceed to downloading received logs.
981        let result = communicate_with_quorum(
982            &nodes,
983            &committee,
984            |_| (),
985            |remote_node| {
986                let client = &self.client;
987                let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
988                let received_log_batches = Arc::clone(&received_log_batches);
989                Box::pin(async move {
990                    let batch = client
991                        .get_received_log_from_validator(chain_id, &remote_node, tracker)
992                        .await?;
993                    let mut batches = received_log_batches.lock().unwrap();
994                    batches.push((remote_node.public_key, batch));
995                    Ok(())
996                })
997            },
998            self.options.quorum_grace_period,
999        )
1000        .await;
1001
1002        if let Err(error) = result {
1003            error!(
1004                %error,
1005                "Failed to synchronize received_logs from at least a quorum of validators",
1006            );
1007        }
1008
1009        let received_logs: Vec<_> = {
1010            let mut received_log_batches = received_log_batches.lock().unwrap();
1011            std::mem::take(received_log_batches.as_mut())
1012        };
1013
1014        debug!(
1015            received_logs_len = %received_logs.len(),
1016            received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
1017            "collected received logs"
1018        );
1019
1020        let (received_logs, mut validator_trackers) = {
1021            (
1022                ReceivedLogs::from_received_result(received_logs.clone()),
1023                ValidatorTrackers::new(received_logs, &trackers),
1024            )
1025        };
1026
1027        debug!(
1028            num_chains = %received_logs.num_chains(),
1029            num_certs = %received_logs.num_certs(),
1030            "find_received_certificates: total number of chains and certificates to sync",
1031        );
1032
1033        let max_blocks_per_chain =
1034            self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
1035        for received_log in received_logs.into_batches(
1036            self.options.sender_certificate_download_batch_size,
1037            max_blocks_per_chain,
1038        ) {
1039            validator_trackers = self
1040                .receive_sender_certificates(received_log, validator_trackers, &nodes)
1041                .await?;
1042
1043            self.update_received_certificate_trackers(&validator_trackers)
1044                .await;
1045        }
1046
1047        info!("find_received_certificates finished");
1048
1049        Ok(())
1050    }
1051
1052    async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
1053        let updated_trackers = trackers.to_map();
1054        trace!(?updated_trackers, "updated tracker values");
1055
1056        // Update the trackers.
1057        if let Err(error) = self
1058            .client
1059            .local_node
1060            .update_received_certificate_trackers(self.chain_id, updated_trackers)
1061            .await
1062        {
1063            error!(
1064                chain_id = %self.chain_id,
1065                %error,
1066                "Failed to update the certificate trackers",
1067            );
1068        }
1069    }
1070
1071    /// Downloads and processes or preprocesses the certificates for blocks sending messages to
1072    /// this chain that we are still missing.
1073    async fn receive_sender_certificates(
1074        &self,
1075        mut received_logs: ReceivedLogs,
1076        mut validator_trackers: ValidatorTrackers,
1077        nodes: &[RemoteNode<Env::ValidatorNode>],
1078    ) -> Result<ValidatorTrackers, Error> {
1079        debug!(
1080            num_chains = %received_logs.num_chains(),
1081            num_certs = %received_logs.num_certs(),
1082            "receive_sender_certificates: number of chains and certificates to sync",
1083        );
1084
1085        // Obtain the next block height we need in the local node, for each chain.
1086        let local_next_heights = self
1087            .client
1088            .local_node
1089            .next_outbox_heights(received_logs.chains(), self.chain_id)
1090            .await?;
1091
1092        validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);
1093
1094        debug!(
1095            remaining_total_certificates = %received_logs.num_certs(),
1096            "receive_sender_certificates: computed remote_heights"
1097        );
1098
1099        let mut other_sender_chains = Vec::new();
1100        let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();
1101
1102        let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
1103            let received_logs = &received_logs;
1104            let other_sender_chains = &mut other_sender_chains;
1105
1106            move |(sender_chain_id, remote_heights)| {
1107                if remote_heights.is_empty() {
1108                    // Our highest, locally executed block is higher than any block height
1109                    // from the current batch. Skip this batch, but remember to wait for
1110                    // the messages to be delivered to the inboxes.
1111                    other_sender_chains.push(sender_chain_id);
1112                    return None;
1113                };
1114                let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
1115                let sender = sender.clone();
1116                let client = self.client.clone();
1117                let nodes = nodes.to_vec();
1118                Some(async move {
1119                    client
1120                        .download_and_process_sender_chain(
1121                            sender_chain_id,
1122                            &nodes,
1123                            received_logs,
1124                            remote_heights,
1125                            sender,
1126                        )
1127                        .await
1128                })
1129            }
1130        });
1131
1132        future::join(
1133            stream::iter(cert_futures)
1134                .buffer_unordered(self.options.max_joined_tasks)
1135                .collect::<()>(),
1136            async {
1137                while let Some(chain_and_height) = receiver.recv().await {
1138                    validator_trackers.downloaded_cert(chain_and_height);
1139                }
1140            },
1141        )
1142        .await;
1143
1144        debug!(
1145            num_other_chains = %other_sender_chains.len(),
1146            "receive_sender_certificates: processing certificates finished"
1147        );
1148
1149        // Certificates for these chains were omitted from `certificates` because they were
1150        // already processed locally. If they were processed in a concurrent task, it is not
1151        // guaranteed that their cross-chain messages were already handled.
1152        self.retry_pending_cross_chain_requests_from_sender_chains(nodes, other_sender_chains)
1153            .await;
1154
1155        debug!("receive_sender_certificates: finished processing other_sender_chains");
1156
1157        Ok(validator_trackers)
1158    }
1159
1160    /// Retries cross chain requests on the chains which may have been processed on
1161    /// another task without the messages being correctly handled. Fetches missing blobs from
1162    /// the given nodes if necessary.
1163    async fn retry_pending_cross_chain_requests_from_sender_chains(
1164        &self,
1165        nodes: &[RemoteNode<Env::ValidatorNode>],
1166        other_sender_chains: Vec<ChainId>,
1167    ) {
1168        let stream = other_sender_chains
1169            .into_iter()
1170            .map(|chain_id| async move {
1171                if let Err(error) = match self
1172                    .client
1173                    .retry_pending_cross_chain_requests(chain_id)
1174                    .await
1175                {
1176                    Ok(()) => Ok(()),
1177                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1178                        if let Err(error) = self
1179                            .client
1180                            .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
1181                            .await
1182                        {
1183                            error!(
1184                                ?blob_ids,
1185                                %error,
1186                                "Error while attempting to download blobs during retrying outgoing \
1187                                messages"
1188                            );
1189                        }
1190                        self.client
1191                            .retry_pending_cross_chain_requests(chain_id)
1192                            .await
1193                    }
1194                    err => err,
1195                } {
1196                    error!(
1197                        %chain_id,
1198                        %error,
1199                        "Failed to retry outgoing messages from chain"
1200                    );
1201                }
1202            })
1203            .collect::<FuturesUnordered<_>>();
1204        stream.for_each(future::ready).await;
1205    }
1206
1207    /// Sends money.
1208    #[instrument(level = "trace")]
1209    pub async fn transfer(
1210        &self,
1211        owner: AccountOwner,
1212        amount: Amount,
1213        recipient: Account,
1214    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1215        // TODO(#467): check the balance of `owner` before signing any block proposal.
1216        Box::pin(self.execute_operation(SystemOperation::Transfer {
1217            owner,
1218            recipient,
1219            amount,
1220        }))
1221        .await
1222    }
1223
1224    /// Verify if a data blob is readable from storage.
1225    // TODO(#2490): Consider removing or renaming this.
1226    #[instrument(level = "trace")]
1227    pub async fn read_data_blob(
1228        &self,
1229        hash: CryptoHash,
1230    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1231        let blob_id = BlobId {
1232            hash,
1233            blob_type: BlobType::Data,
1234        };
1235        Box::pin(self.execute_operation(SystemOperation::VerifyBlob { blob_id })).await
1236    }
1237
1238    /// Claims money in a remote chain.
1239    #[instrument(level = "trace")]
1240    pub async fn claim(
1241        &self,
1242        owner: AccountOwner,
1243        target_id: ChainId,
1244        recipient: Account,
1245        amount: Amount,
1246    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1247        Box::pin(self.execute_operation(SystemOperation::Claim {
1248            owner,
1249            target_id,
1250            recipient,
1251            amount,
1252        }))
1253        .await
1254    }
1255
1256    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
1257    /// certificate and sends it to all validators, to make them enter the next round.
1258    #[instrument(level = "trace")]
1259    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1260        let chain_id = self.chain_id;
1261        let committee = self.local_committee().await?;
1262        let info = self.chain_info().await?;
1263        let committee = &committee;
1264        let height = info.next_block_height;
1265        let round = info.manager.current_round;
1266        let action = CommunicateAction::RequestTimeout {
1267            height,
1268            round,
1269            chain_id,
1270        };
1271        let value = Timeout::new(chain_id, height, info.epoch);
1272        let certificate = Box::new(
1273            self.client
1274                .communicate_chain_action(committee, action, value)
1275                .await?,
1276        );
1277        self.client.handle_certificate(*certificate.clone()).await?;
1278        // The block height didn't increase, but this will communicate the timeout as well.
1279        self.client
1280            .communicate_chain_updates(
1281                committee,
1282                chain_id,
1283                height,
1284                CrossChainMessageDelivery::NonBlocking,
1285                None,
1286            )
1287            .await?;
1288        Ok(*certificate)
1289    }
1290
1291    /// Downloads and processes any certificates we are missing for the given chain.
1292    #[instrument(level = "trace", skip_all)]
1293    pub async fn synchronize_chain_state(
1294        &self,
1295        chain_id: ChainId,
1296    ) -> Result<Box<ChainInfo>, Error> {
1297        self.client.synchronize_chain_state(chain_id).await
1298    }
1299
1300    /// Downloads and processes any certificates we are missing for this chain, from the given
1301    /// committee.
1302    #[instrument(level = "trace", skip_all)]
1303    pub async fn synchronize_chain_state_from_committee(
1304        &self,
1305        committee: Arc<Committee>,
1306    ) -> Result<Box<ChainInfo>, Error> {
1307        Box::pin(
1308            self.client
1309                .synchronize_chain_state_from_committee(self.chain_id, committee),
1310        )
1311        .await
1312    }
1313
1314    /// Executes a list of operations.
1315    #[instrument(level = "trace", skip(operations, blobs))]
1316    pub async fn execute_operations(
1317        &self,
1318        operations: Vec<Operation>,
1319        blobs: Vec<Blob>,
1320    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1321        let timing_start = linera_base::time::Instant::now();
1322        tracing::debug!("execute_operations started");
1323
1324        let result = loop {
1325            let execute_block_start = linera_base::time::Instant::now();
1326            // TODO(#2066): Remove boxing once the call-stack is shallower
1327            tracing::debug!("calling execute_block");
1328            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1329                Ok(ClientOutcome::Committed(certificate)) => {
1330                    tracing::debug!(
1331                        execute_block_ms = execute_block_start.elapsed().as_millis(),
1332                        "execute_block succeeded"
1333                    );
1334                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1335                    break Ok(ClientOutcome::Committed(certificate));
1336                }
1337                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
1338                    break Ok(ClientOutcome::WaitForTimeout(timeout));
1339                }
1340                Ok(ClientOutcome::Conflict(certificate)) => {
1341                    info!(
1342                        height = %certificate.block().header.height,
1343                        "Another block was committed."
1344                    );
1345                    break Ok(ClientOutcome::Conflict(certificate));
1346                }
1347                Err(Error::CommunicationError(CommunicationError::Trusted(
1348                    NodeError::UnexpectedBlockHeight {
1349                        expected_block_height,
1350                        found_block_height,
1351                    },
1352                ))) if expected_block_height > found_block_height => {
1353                    tracing::info!(
1354                        chain_id = %self.chain_id,
1355                        "Local state is outdated; synchronizing chain"
1356                    );
1357                    self.synchronize_chain_state(self.chain_id).await?;
1358                }
1359                Err(err) => return Err(err),
1360            };
1361        };
1362
1363        self.send_timing(timing_start, TimingType::ExecuteOperations);
1364        tracing::debug!(
1365            total_execute_operations_ms = timing_start.elapsed().as_millis(),
1366            "execute_operations returning"
1367        );
1368
1369        result
1370    }
1371
1372    /// Executes an operation.
1373    pub async fn execute_operation(
1374        &self,
1375        operation: impl Into<Operation>,
1376    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1377        self.execute_operations(vec![operation.into()], vec![])
1378            .await
1379    }
1380
1381    /// Executes a new block.
1382    ///
1383    /// This must be preceded by a call to `prepare_chain()`.
1384    #[instrument(level = "trace", skip(operations, blobs))]
1385    async fn execute_block(
1386        &self,
1387        operations: Vec<Operation>,
1388        blobs: Vec<Blob>,
1389    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1390        #[cfg(with_metrics)]
1391        let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
1392
1393        let mutex = self.proposal_mutex();
1394        let lock_start = linera_base::time::Instant::now();
1395        let mut proposal_guard = mutex.lock_owned().await;
1396        tracing::debug!(
1397            chain_id = %self.chain_id,
1398            lock_wait_ms = lock_start.elapsed().as_millis(),
1399            "acquired proposal_mutex in execute_block"
1400        );
1401        // TODO(#5092): We shouldn't need to call this explicitly.
1402        // Process any leftover pending proposal from a previous interrupted call.
1403        // Even if there is no pending proposal, this still calls
1404        // `request_leader_timeout_if_needed` which ensures the local chain state
1405        // is synchronized with the current consensus round.
1406        match self
1407            .process_pending_block_without_prepare(&mut proposal_guard)
1408            .await?
1409        {
1410            ClientOutcome::Committed(Some(certificate)) => {
1411                return Ok(ClientOutcome::Conflict(Box::new(certificate)))
1412            }
1413            ClientOutcome::WaitForTimeout(timeout) => {
1414                return Ok(ClientOutcome::WaitForTimeout(timeout))
1415            }
1416            ClientOutcome::Conflict(certificate) => {
1417                return Ok(ClientOutcome::Conflict(certificate))
1418            }
1419            ClientOutcome::Committed(None) => {}
1420        }
1421
1422        // Collect pending messages and epoch changes after acquiring the lock to avoid
1423        // race conditions where messages valid for one block height are proposed at a
1424        // different height.
1425        let transactions = self.prepend_epochs_messages_and_events(operations).await?;
1426
1427        if transactions.is_empty() {
1428            return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
1429                WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
1430            )));
1431        }
1432
1433        let block = self
1434            .new_pending_block(transactions, blobs, &mut proposal_guard)
1435            .await?;
1436
1437        match self
1438            .process_pending_block_without_prepare(&mut proposal_guard)
1439            .await?
1440        {
1441            ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
1442                Ok(ClientOutcome::Committed(certificate))
1443            }
1444            ClientOutcome::Committed(Some(certificate)) => {
1445                Ok(ClientOutcome::Conflict(Box::new(certificate)))
1446            }
1447            // Unreachable: We just set the pending proposal in the guard.
1448            ClientOutcome::Committed(None) => {
1449                Err(Error::BlockProposalError("Unexpected block proposal error"))
1450            }
1451            ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
1452            ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
1453        }
1454    }
1455
1456    /// Creates a vector of transactions which, in addition to the provided operations,
1457    /// also contains epoch changes, receiving message bundles and event stream updates
1458    /// (if there are any to be processed).
1459    /// This should be called when executing a block, in order to make sure that any pending
1460    /// messages or events are included in it.
1461    #[instrument(level = "trace", skip(operations))]
1462    async fn prepend_epochs_messages_and_events(
1463        &self,
1464        operations: Vec<Operation>,
1465    ) -> Result<Vec<Transaction>, Error> {
1466        let incoming_bundles = self.pending_message_bundles().await?;
1467        let stream_updates = self.collect_stream_updates().await?;
1468        Ok(self
1469            .collect_epoch_changes()
1470            .await?
1471            .into_iter()
1472            .map(Transaction::ExecuteOperation)
1473            .chain(
1474                incoming_bundles
1475                    .into_iter()
1476                    .map(Transaction::ReceiveMessages),
1477            )
1478            .chain(
1479                stream_updates
1480                    .into_iter()
1481                    .map(Transaction::ExecuteOperation),
1482            )
1483            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1484            .collect::<Vec<_>>())
1485    }
1486
1487    /// Creates a new pending block and stores it in `proposal_guard`.
1488    ///
1489    /// The caller must hold the proposal mutex. The pending proposal is written directly
1490    /// into the guard so that it is always synchronized with the mutex.
1491    #[instrument(level = "trace", skip(transactions, blobs, proposal_guard))]
1492    async fn new_pending_block(
1493        &self,
1494        transactions: Vec<Transaction>,
1495        blobs: Vec<Blob>,
1496        proposal_guard: &mut Option<PendingProposal>,
1497    ) -> Result<Block, Error> {
1498        let identity = self.identity().await?;
1499
1500        ensure!(
1501            proposal_guard.is_none(),
1502            Error::BlockProposalError(
1503                "Client state already has a pending block; \
1504                use the `linera retry-pending-block` command to commit that first"
1505            )
1506        );
1507        let info = self.chain_info_with_manager_values().await?;
1508        let timestamp = self.next_timestamp(&transactions, info.timestamp);
1509        let proposed_block = ProposedBlock {
1510            epoch: info.epoch,
1511            chain_id: self.chain_id,
1512            transactions,
1513            previous_block_hash: info.block_hash,
1514            height: info.next_block_height,
1515            authenticated_signer: Some(identity),
1516            timestamp,
1517        };
1518
1519        let round = self.round_for_oracle(&info, &identity).await?;
1520        // Make sure every incoming message succeeds and otherwise remove them.
1521        // Also, compute the final certified hash while we're at it.
1522        let (block, _) = Box::pin(self.client.stage_block_execution(
1523            proposed_block,
1524            round,
1525            blobs.clone(),
1526            self.options.bundle_execution_policy(),
1527        ))
1528        .await?;
1529        let (proposed_block, _) = block.clone().into_proposal();
1530        *proposal_guard = Some(PendingProposal {
1531            block: proposed_block,
1532            blobs,
1533        });
1534        Ok(block)
1535    }
1536
1537    /// Returns a suitable timestamp for the next block.
1538    ///
1539    /// This will usually be the current time according to the local clock, but may be slightly
1540    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
1541    #[instrument(level = "trace", skip(transactions))]
1542    fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1543        let local_time = self.storage_client().clock().current_time();
1544        transactions
1545            .iter()
1546            .filter_map(Transaction::incoming_bundle)
1547            .map(|msg| msg.bundle.timestamp)
1548            .max()
1549            .map_or(local_time, |timestamp| timestamp.max(local_time))
1550            .max(block_time)
1551    }
1552
1553    /// Queries an application.
1554    #[instrument(level = "trace", skip(query))]
1555    pub async fn query_application(
1556        &self,
1557        query: Query,
1558        block_hash: Option<CryptoHash>,
1559    ) -> Result<(QueryOutcome, BlockHeight), Error> {
1560        loop {
1561            let result = self
1562                .client
1563                .local_node
1564                .query_application(self.chain_id, query.clone(), block_hash)
1565                .await;
1566            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1567                let validators = self.client.validator_nodes().await?;
1568                self.client
1569                    .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1570                    .await?;
1571                continue; // We found the missing blob: retry.
1572            }
1573            return Ok(result?);
1574        }
1575    }
1576
1577    /// Queries a system application.
1578    #[instrument(level = "trace", skip(query))]
1579    pub async fn query_system_application(
1580        &self,
1581        query: SystemQuery,
1582    ) -> Result<QueryOutcome<SystemResponse>, Error> {
1583        let (
1584            QueryOutcome {
1585                response,
1586                operations,
1587            },
1588            _,
1589        ) = self.query_application(Query::System(query), None).await?;
1590        match response {
1591            QueryResponse::System(response) => Ok(QueryOutcome {
1592                response,
1593                operations,
1594            }),
1595            _ => Err(Error::InternalError("Unexpected response for system query")),
1596        }
1597    }
1598
1599    /// Queries a user application.
1600    #[instrument(level = "trace", skip(application_id, query))]
1601    #[cfg(with_testing)]
1602    pub async fn query_user_application<A: Abi>(
1603        &self,
1604        application_id: ApplicationId<A>,
1605        query: &A::Query,
1606    ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1607        let query = Query::user(application_id, query)?;
1608        let (
1609            QueryOutcome {
1610                response,
1611                operations,
1612            },
1613            _,
1614        ) = self.query_application(query, None).await?;
1615        match response {
1616            QueryResponse::User(response_bytes) => {
1617                let response = serde_json::from_slice(&response_bytes)?;
1618                Ok(QueryOutcome {
1619                    response,
1620                    operations,
1621                })
1622            }
1623            _ => Err(Error::InternalError("Unexpected response for user query")),
1624        }
1625    }
1626
1627    /// Obtains the local balance of the chain account after staging the execution of
1628    /// incoming messages in a new block.
1629    ///
1630    /// Does not attempt to synchronize with validators. The result will reflect up to
1631    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1632    /// block.
1633    #[instrument(level = "trace")]
1634    pub async fn query_balance(&self) -> Result<Amount, Error> {
1635        let (balance, _) = Box::pin(self.query_balances_with_owner(AccountOwner::CHAIN)).await?;
1636        Ok(balance)
1637    }
1638
1639    /// Obtains the local balance of an account after staging the execution of incoming messages in
1640    /// a new block.
1641    ///
1642    /// Does not attempt to synchronize with validators. The result will reflect up to
1643    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1644    /// block.
1645    #[instrument(level = "trace", skip(owner))]
1646    pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1647        if owner.is_chain() {
1648            Box::pin(self.query_balance()).await
1649        } else {
1650            Ok(Box::pin(self.query_balances_with_owner(owner))
1651                .await?
1652                .1
1653                .unwrap_or(Amount::ZERO))
1654        }
1655    }
1656
1657    /// Obtains the local balance of an account and optionally another user after staging the
1658    /// execution of incoming messages in a new block.
1659    ///
1660    /// Does not attempt to synchronize with validators. The result will reflect up to
1661    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1662    /// block.
1663    #[instrument(level = "trace", skip(owner))]
1664    pub(crate) async fn query_balances_with_owner(
1665        &self,
1666        owner: AccountOwner,
1667    ) -> Result<(Amount, Option<Amount>), Error> {
1668        let incoming_bundles = self.pending_message_bundles().await?;
1669        // Since we disallow empty blocks, and there is no incoming messages,
1670        // that could change it, we query for the balance immediately.
1671        if incoming_bundles.is_empty() {
1672            let chain_balance = self.local_balance().await?;
1673            let owner_balance = self.local_owner_balance(owner).await?;
1674            return Ok((chain_balance, Some(owner_balance)));
1675        }
1676        let info = self.chain_info().await?;
1677        let transactions = incoming_bundles
1678            .into_iter()
1679            .map(Transaction::ReceiveMessages)
1680            .collect::<Vec<_>>();
1681        let timestamp = self.next_timestamp(&transactions, info.timestamp);
1682        let block = ProposedBlock {
1683            epoch: info.epoch,
1684            chain_id: self.chain_id,
1685            transactions,
1686            previous_block_hash: info.block_hash,
1687            height: info.next_block_height,
1688            authenticated_signer: if owner == AccountOwner::CHAIN {
1689                None
1690            } else {
1691                Some(owner)
1692            },
1693            timestamp,
1694        };
1695        match Box::pin(self.client.stage_block_execution(
1696            block,
1697            None,
1698            Vec::new(),
1699            self.options.bundle_execution_policy(),
1700        ))
1701        .await
1702        {
1703            Ok((_, response)) => Ok((
1704                response.info.chain_balance,
1705                response.info.requested_owner_balance,
1706            )),
1707            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
1708                error,
1709            )))) if matches!(
1710                &*error,
1711                ChainError::ExecutionError(
1712                    execution_error,
1713                    ChainExecutionContext::Block
1714                ) if matches!(
1715                    **execution_error,
1716                    ExecutionError::FeesExceedFunding { .. }
1717                )
1718            ) =>
1719            {
1720                // We can't even pay for the execution of one empty block. Let's return zero.
1721                Ok((Amount::ZERO, Some(Amount::ZERO)))
1722            }
1723            Err(error) => Err(error),
1724        }
1725    }
1726
1727    /// Reads the local balance of the chain account.
1728    ///
1729    /// Does not process the inbox or attempt to synchronize with validators.
1730    #[instrument(level = "trace")]
1731    pub async fn local_balance(&self) -> Result<Amount, Error> {
1732        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1733        Ok(balance)
1734    }
1735
1736    /// Reads the local balance of a user account.
1737    ///
1738    /// Does not process the inbox or attempt to synchronize with validators.
1739    #[instrument(level = "trace", skip(owner))]
1740    pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1741        if owner.is_chain() {
1742            self.local_balance().await
1743        } else {
1744            Ok(self
1745                .local_balances_with_owner(owner)
1746                .await?
1747                .1
1748                .unwrap_or(Amount::ZERO))
1749        }
1750    }
1751
1752    /// Reads the local balance of the chain account and optionally another user.
1753    ///
1754    /// Does not process the inbox or attempt to synchronize with validators.
1755    #[instrument(level = "trace", skip(owner))]
1756    pub(crate) async fn local_balances_with_owner(
1757        &self,
1758        owner: AccountOwner,
1759    ) -> Result<(Amount, Option<Amount>), Error> {
1760        ensure!(
1761            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1762            Error::WalletSynchronizationError
1763        );
1764        let mut query = ChainInfoQuery::new(self.chain_id);
1765        query.request_owner_balance = owner;
1766        let response = self
1767            .client
1768            .local_node
1769            .handle_chain_info_query(query)
1770            .await?;
1771        Ok((
1772            response.info.chain_balance,
1773            response.info.requested_owner_balance,
1774        ))
1775    }
1776
1777    /// Sends tokens to a chain.
1778    #[instrument(level = "trace")]
1779    pub async fn transfer_to_account(
1780        &self,
1781        from: AccountOwner,
1782        amount: Amount,
1783        account: Account,
1784    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1785        self.transfer(from, amount, account).await
1786    }
1787
1788    /// Burns tokens (transfer to a special address).
1789    #[cfg(with_testing)]
1790    #[instrument(level = "trace")]
1791    pub async fn burn(
1792        &self,
1793        owner: AccountOwner,
1794        amount: Amount,
1795    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1796        let recipient = Account::burn_address(self.chain_id);
1797        self.transfer(owner, amount, recipient).await
1798    }
1799
1800    #[instrument(level = "trace")]
1801    pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1802        let validators = self.client.validator_nodes().await?;
1803        self.client
1804            .fetch_chain_info(self.chain_id, &validators)
1805            .await
1806    }
1807
1808    /// Attempts to synchronize chains that have sent us messages and populate our local
1809    /// inbox.
1810    ///
1811    /// To create a block that actually executes the messages in the inbox,
1812    /// `process_inbox` must be called separately.
1813    ///
1814    /// If the chain is in follow-only mode, this only downloads blocks for this chain without
1815    /// fetching manager values or sender/publisher chains.
1816    /// Synchronizes the chain state from validators, optionally stopping at a given
1817    /// block height or block timestamp.
1818    ///
1819    /// - If `next_height` is `Some`, downloads blocks up to (but not including) that height.
1820    /// - If `until_block_time` is `Some`, downloads blocks until one with timestamp greater than
1821    ///   the given value is found.
1822    #[instrument(level = "trace")]
1823    pub async fn synchronize_up_to(
1824        &self,
1825        next_height: Option<BlockHeight>,
1826        until_block_time: Option<Timestamp>,
1827    ) -> Result<Box<ChainInfo>, Error> {
1828        let (_, committee) = self.client.admin_committee().await?;
1829        let validators = self.client.make_nodes(&committee)?;
1830        Box::pin(self.client.fetch_chain_info(self.chain_id, &validators)).await?;
1831        communicate_with_quorum(
1832            &validators,
1833            &committee,
1834            |_: &()| (),
1835            |remote_node| async move {
1836                self.client
1837                    .download_certificates_from(
1838                        &remote_node,
1839                        self.chain_id,
1840                        next_height.unwrap_or(BlockHeight::MAX),
1841                        until_block_time,
1842                    )
1843                    .await?;
1844                Ok(())
1845            },
1846            self.client.options.quorum_grace_period,
1847        )
1848        .await?;
1849        self.client
1850            .local_node
1851            .chain_info(self.chain_id)
1852            .await
1853            .map_err(Into::into)
1854    }
1855
1856    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1857        if self.preferred_owner.is_none() {
1858            return self.client.synchronize_chain_state(self.chain_id).await;
1859        }
1860        let info = self.prepare_chain().await?;
1861        self.synchronize_publisher_chains().await?;
1862        self.find_received_certificates().await?;
1863        Ok(info)
1864    }
1865
1866    /// Processes the last pending block.
1867    #[instrument(level = "trace")]
1868    pub async fn process_pending_block(
1869        &self,
1870    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1871        self.prepare_chain().await?;
1872        let mutex = self.proposal_mutex();
1873        let mut proposal_guard = mutex.lock_owned().await;
1874        self.process_pending_block_without_prepare(&mut proposal_guard)
1875            .await
1876    }
1877
1878    /// Processes the last pending block. Assumes that the local chain is up to date.
1879    ///
1880    /// The caller must hold the proposal mutex via `proposal_guard`. The pending proposal
1881    /// is read from and cleared through the guard, ensuring synchronization.
1882    #[instrument(level = "debug", skip(self, proposal_guard), fields(chain_id = %self.chain_id))]
1883    async fn process_pending_block_without_prepare(
1884        &self,
1885        proposal_guard: &mut Option<PendingProposal>,
1886    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1887        let process_start = linera_base::time::Instant::now();
1888        tracing::debug!("process_pending_block_without_prepare started");
1889        let info = self.request_leader_timeout_if_needed().await?;
1890
1891        // Clear stale pending proposals whose height has already been committed.
1892        if let Some(pending) = &*proposal_guard {
1893            if pending.block.height < info.next_block_height {
1894                tracing::debug!(
1895                    "Clearing pending proposal: a block was committed at height {}",
1896                    pending.block.height
1897                );
1898                *proposal_guard = None;
1899            }
1900        }
1901
1902        // If there is a validated block in the current round, finalize it.
1903        if info.manager.has_locking_block_in_current_round()
1904            && !info.manager.current_round.is_fast()
1905        {
1906            return Box::pin(self.finalize_locking_block(info)).await;
1907        }
1908        let owner = self.identity().await?;
1909
1910        let local_node = &self.client.local_node;
1911        // Otherwise we have to re-propose the highest validated block, if there is one.
1912        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
1913            match &**locking {
1914                LockingBlock::Regular(certificate) => {
1915                    let blob_ids = certificate.block().required_blob_ids();
1916                    let blobs = local_node
1917                        .get_locking_blobs(&blob_ids, self.chain_id)
1918                        .await?
1919                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1920                    debug!("Retrying locking block from round {}", certificate.round);
1921                    (certificate.block().clone(), blobs)
1922                }
1923                LockingBlock::Fast(proposal) => {
1924                    let proposed_block = proposal.content.block.clone();
1925                    let blob_ids = proposed_block.published_blob_ids();
1926                    let blobs = local_node
1927                        .get_locking_blobs(&blob_ids, self.chain_id)
1928                        .await?
1929                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1930                    let block = self
1931                        .client
1932                        .stage_block_execution(
1933                            proposed_block,
1934                            None,
1935                            blobs.clone(),
1936                            BundleExecutionPolicy::committed(),
1937                        )
1938                        .await?
1939                        .0;
1940                    debug!("Retrying locking block from fast round.");
1941                    (block, blobs)
1942                }
1943            }
1944        } else if let Some(pending) = proposal_guard.as_ref() {
1945            // Otherwise we are free to propose our own pending block.
1946            let proposed_block = pending.block.clone();
1947            let blobs = pending.blobs.clone();
1948            let round = self.round_for_oracle(&info, &owner).await?;
1949            let (block, _) = self
1950                .client
1951                .stage_block_execution(
1952                    proposed_block,
1953                    round,
1954                    blobs.clone(),
1955                    BundleExecutionPolicy::committed(),
1956                )
1957                .await?;
1958            debug!("Proposing the local pending block.");
1959            (block, blobs)
1960        } else {
1961            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
1962        };
1963
1964        let has_oracle_responses = block.has_oracle_responses();
1965        let (proposed_block, outcome) = block.into_proposal();
1966        let round = match self
1967            .round_for_new_proposal(&info, &owner, has_oracle_responses)
1968            .await?
1969        {
1970            Either::Left(round) => round,
1971            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
1972        };
1973        debug!("Proposing block for round {}", round);
1974
1975        let already_handled_locally = info
1976            .manager
1977            .already_handled_proposal(round, &proposed_block);
1978        // Create the final block proposal.
1979        let proposal = if let Some(locking) = info.manager.requested_locking {
1980            Box::new(match *locking {
1981                LockingBlock::Regular(cert) => {
1982                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
1983                        .await
1984                        .map_err(Error::signer_failure)?
1985                }
1986                LockingBlock::Fast(proposal) => {
1987                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
1988                        .await
1989                        .map_err(Error::signer_failure)?
1990                }
1991            })
1992        } else {
1993            Box::new(
1994                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
1995                    .await
1996                    .map_err(Error::signer_failure)?,
1997            )
1998        };
1999        if !already_handled_locally {
2000            // Check the final block proposal. This will be cheaper after #1401.
2001            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
2002                match err {
2003                    LocalNodeError::BlobsNotFound(_) => {
2004                        local_node
2005                            .handle_pending_blobs(self.chain_id, blobs)
2006                            .await?;
2007                        local_node.handle_block_proposal(*proposal.clone()).await?;
2008                    }
2009                    err => return Err(err.into()),
2010                }
2011            }
2012        }
2013        let committee = self.local_committee().await?;
2014        let block = Block::new(proposed_block, outcome);
2015        // Send the query to validators.
2016        let submit_block_proposal_start = linera_base::time::Instant::now();
2017        let certificate = if round.is_fast() {
2018            let hashed_value = ConfirmedBlock::new(block);
2019            Box::pin(
2020                self.client
2021                    .submit_block_proposal(&committee, proposal, hashed_value),
2022            )
2023            .await?
2024        } else {
2025            let hashed_value = ValidatedBlock::new(block);
2026            let certificate = Box::pin(self.client.submit_block_proposal(
2027                &committee,
2028                proposal,
2029                hashed_value.clone(),
2030            ))
2031            .await?;
2032            Box::pin(self.client.finalize_block(&committee, certificate)).await?
2033        };
2034        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
2035        debug!(round = %certificate.round, "Sending confirmed block to validators");
2036        let update_start = linera_base::time::Instant::now();
2037        Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
2038        tracing::debug!(
2039            update_validators_ms = update_start.elapsed().as_millis(),
2040            total_process_ms = process_start.elapsed().as_millis(),
2041            "process_pending_block_without_prepare completing"
2042        );
2043        // Clear the pending proposal now that the block has been committed.
2044        *proposal_guard = None;
2045        Ok(ClientOutcome::Committed(Some(certificate)))
2046    }
2047
2048    fn send_timing(&self, start: Instant, timing_type: TimingType) {
2049        let Some(sender) = &self.timing_sender else {
2050            return;
2051        };
2052        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2053            tracing::warn!(%err, "Failed to send timing info");
2054        }
2055    }
2056
2057    /// Requests a leader timeout certificate if the current round has timed out. Returns the
2058    /// chain info for the (possibly new) current round.
2059    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
2060        let mut info = self.chain_info_with_manager_values().await?;
2061        // If the current round has timed out, we request a timeout certificate and retry in
2062        // the next round.
2063        if let Some(round_timeout) = info.manager.round_timeout {
2064            if round_timeout <= self.storage_client().clock().current_time() {
2065                if let Err(e) = self.request_leader_timeout().await {
2066                    debug!("Failed to obtain a timeout certificate: {}", e);
2067                } else {
2068                    info = self.chain_info_with_manager_values().await?;
2069                }
2070            }
2071        }
2072        Ok(info)
2073    }
2074
2075    /// Finalizes the locking block.
2076    ///
2077    /// Panics if there is no locking block; fails if the locking block is not in the current round.
2078    async fn finalize_locking_block(
2079        &self,
2080        info: Box<ChainInfo>,
2081    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2082        let locking = info
2083            .manager
2084            .requested_locking
2085            .expect("Should have a locking block");
2086        let LockingBlock::Regular(certificate) = *locking else {
2087            panic!("Should have a locking validated block");
2088        };
2089        debug!(
2090            round = %certificate.round,
2091            "Finalizing locking block"
2092        );
2093        let committee = self.local_committee().await?;
2094        let certificate =
2095            Box::pin(self.client.finalize_block(&committee, certificate.clone())).await?;
2096        Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
2097        Ok(ClientOutcome::Committed(Some(certificate)))
2098    }
2099
2100    /// Returns the number for the round number oracle to use when staging a block proposal.
2101    async fn round_for_oracle(
2102        &self,
2103        info: &ChainInfo,
2104        identity: &AccountOwner,
2105    ) -> Result<Option<u32>, Error> {
2106        // Pretend we do use oracles: If we don't, the round number is never read anyway.
2107        match self.round_for_new_proposal(info, identity, true).await {
2108            // If it is a multi-leader round, use its number for the oracle.
2109            Ok(Either::Left(round)) => Ok(round.multi_leader()),
2110            // If there is no suitable round with oracles, use None: If it works without oracles,
2111            // the block won't read the value. If it returns a timeout, it will be a single-leader
2112            // round, in which the oracle returns None.
2113            Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
2114            Err(err) => Err(err),
2115        }
2116    }
2117
2118    /// Returns a round in which we can propose a new block or the given one, if possible.
2119    async fn round_for_new_proposal(
2120        &self,
2121        info: &ChainInfo,
2122        identity: &AccountOwner,
2123        has_oracle_responses: bool,
2124    ) -> Result<Either<Round, RoundTimeout>, Error> {
2125        let manager = &info.manager;
2126        let seed = self
2127            .client
2128            .local_node
2129            .get_manager_seed(self.chain_id)
2130            .await?;
2131        // If there is a conflicting proposal in the current round, we can only propose if the
2132        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
2133        // Similarly, we cannot propose a block that uses oracles in the fast round, and also
2134        // skip the fast round if fast blocks are not allowed.
2135        let skip_fast = manager.current_round.is_fast()
2136            && (has_oracle_responses || !self.options.allow_fast_blocks);
2137        let conflict = manager
2138            .requested_signed_proposal
2139            .as_ref()
2140            .into_iter()
2141            .chain(&manager.requested_proposed)
2142            .any(|proposal| proposal.content.round == manager.current_round)
2143            || skip_fast;
2144        let round = if !conflict {
2145            manager.current_round
2146        } else if let Some(round) = manager
2147            .ownership
2148            .next_round(manager.current_round)
2149            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
2150        {
2151            round
2152        } else if let Some(timeout) = info.round_timeout() {
2153            return Ok(Either::Right(timeout));
2154        } else {
2155            return Err(Error::BlockProposalError(
2156                "Conflicting proposal in the current round",
2157            ));
2158        };
2159        let current_committee = self
2160            .local_committee()
2161            .await?
2162            .validators
2163            .values()
2164            .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
2165            .collect();
2166        if manager.should_propose(identity, round, seed, &current_committee) {
2167            return Ok(Either::Left(round));
2168        }
2169        if let Some(timeout) = info.round_timeout() {
2170            return Ok(Either::Right(timeout));
2171        }
2172        Err(Error::BlockProposalError(
2173            "Not a leader in the current round",
2174        ))
2175    }
2176
2177    /// Clears the information on any operation that previously failed.
2178    #[cfg(with_testing)]
2179    #[instrument(level = "trace")]
2180    pub async fn clear_pending_proposal(&self) {
2181        *self.proposal_mutex().lock().await = None;
2182    }
2183
2184    /// Rotates the key of the chain.
2185    ///
2186    /// Replaces current owners of the chain with the new key pair.
2187    #[instrument(level = "trace")]
2188    pub async fn rotate_key_pair(
2189        &self,
2190        public_key: AccountPublicKey,
2191    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2192        Box::pin(self.transfer_ownership(public_key.into())).await
2193    }
2194
2195    /// Transfers ownership of the chain to a single super owner.
2196    #[instrument(level = "trace")]
2197    pub async fn transfer_ownership(
2198        &self,
2199        new_owner: AccountOwner,
2200    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2201        Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
2202            super_owners: vec![new_owner],
2203            owners: Vec::new(),
2204            multi_leader_rounds: 2,
2205            open_multi_leader_rounds: false,
2206            timeout_config: TimeoutConfig::default(),
2207        }))
2208        .await
2209    }
2210
2211    /// Adds another owner to the chain, and turns existing super owners into regular owners.
2212    #[instrument(level = "trace")]
2213    pub async fn share_ownership(
2214        &self,
2215        new_owner: AccountOwner,
2216        new_weight: u64,
2217    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2218        let ownership = self.prepare_chain().await?.manager.ownership;
2219        ensure!(
2220            ownership.is_active(),
2221            ChainError::InactiveChain(self.chain_id)
2222        );
2223        let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
2224        owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
2225        owners.push((new_owner, new_weight));
2226        let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
2227            super_owners: Vec::new(),
2228            owners,
2229            multi_leader_rounds: ownership.multi_leader_rounds,
2230            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2231            timeout_config: ownership.timeout_config,
2232        })];
2233        match self.execute_block(operations, vec![]).await? {
2234            ClientOutcome::Committed(certificate) => Ok(ClientOutcome::Committed(certificate)),
2235            ClientOutcome::Conflict(certificate) => {
2236                info!(
2237                    height = %certificate.block().header.height,
2238                    "Another block was committed."
2239                );
2240                Ok(ClientOutcome::Conflict(certificate))
2241            }
2242            ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
2243        }
2244    }
2245
2246    /// Returns the current ownership settings on this chain.
2247    #[instrument(level = "trace")]
2248    pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, Error> {
2249        Ok(self
2250            .client
2251            .local_node
2252            .chain_state_view(self.chain_id)
2253            .await?
2254            .execution_state
2255            .system
2256            .ownership
2257            .get()
2258            .await?
2259            .clone())
2260    }
2261
2262    /// Changes the ownership of this chain. Fails if it would remove existing owners, unless
2263    /// `remove_owners` is `true`.
2264    #[instrument(level = "trace")]
2265    pub async fn change_ownership(
2266        &self,
2267        ownership: ChainOwnership,
2268    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2269        Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
2270            super_owners: ownership.super_owners.into_iter().collect(),
2271            owners: ownership.owners.into_iter().collect(),
2272            multi_leader_rounds: ownership.multi_leader_rounds,
2273            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2274            timeout_config: ownership.timeout_config.clone(),
2275        }))
2276        .await
2277    }
2278
2279    /// Returns the current application permissions on this chain.
2280    #[instrument(level = "trace")]
2281    pub async fn query_application_permissions(&self) -> Result<ApplicationPermissions, Error> {
2282        Ok(self
2283            .client
2284            .local_node
2285            .chain_state_view(self.chain_id)
2286            .await?
2287            .execution_state
2288            .system
2289            .application_permissions
2290            .get()
2291            .await?
2292            .clone())
2293    }
2294
2295    /// Changes the application permissions configuration on this chain.
2296    #[instrument(level = "trace", skip(application_permissions))]
2297    pub async fn change_application_permissions(
2298        &self,
2299        application_permissions: ApplicationPermissions,
2300    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2301        Box::pin(
2302            self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2303                application_permissions,
2304            )),
2305        )
2306        .await
2307    }
2308
2309    /// Opens a new chain with a derived UID.
2310    #[instrument(level = "trace", skip(self))]
2311    pub async fn open_chain(
2312        &self,
2313        ownership: ChainOwnership,
2314        application_permissions: ApplicationPermissions,
2315        balance: Amount,
2316    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2317        let config = OpenChainConfig {
2318            ownership: ownership.clone(),
2319            balance,
2320            application_permissions: application_permissions.clone(),
2321        };
2322        let operation = Operation::system(SystemOperation::OpenChain(config));
2323        let certificate = match self.execute_block(vec![operation], vec![]).await? {
2324            ClientOutcome::Committed(certificate) => certificate,
2325            ClientOutcome::Conflict(certificate) => {
2326                return Ok(ClientOutcome::Conflict(certificate));
2327            }
2328            ClientOutcome::WaitForTimeout(timeout) => {
2329                return Ok(ClientOutcome::WaitForTimeout(timeout));
2330            }
2331        };
2332        // The only operation, i.e. the last transaction, created the new chain.
2333        let chain_blob = certificate
2334            .block()
2335            .body
2336            .blobs
2337            .last()
2338            .and_then(|blobs| blobs.last())
2339            .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2340        let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2341        // If we have a key for any owner, add it to the list of tracked chains.
2342        for owner in ownership.all_owners() {
2343            if self.client.has_key_for(owner).await? {
2344                self.client
2345                    .extend_chain_mode(description.id(), ListeningMode::FullChain);
2346                break;
2347            }
2348        }
2349        self.client
2350            .retry_pending_cross_chain_requests(self.chain_id)
2351            .await?;
2352        Ok(ClientOutcome::Committed((description, certificate)))
2353    }
2354
2355    /// Closes the chain (and loses everything in it!!).
2356    /// Returns `None` if the chain was already closed.
2357    #[instrument(level = "trace")]
2358    pub async fn close_chain(
2359        &self,
2360    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2361        match Box::pin(self.execute_operation(SystemOperation::CloseChain)).await {
2362            Ok(outcome) => Ok(outcome.map(Some)),
2363            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2364                chain_error,
2365            )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2366                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
2367            }
2368            Err(error) => Err(error),
2369        }
2370    }
2371
2372    /// Publishes some module.
2373    #[cfg(not(target_arch = "wasm32"))]
2374    #[instrument(level = "trace", skip(contract, service))]
2375    pub async fn publish_module(
2376        &self,
2377        contract: Bytecode,
2378        service: Bytecode,
2379        vm_runtime: VmRuntime,
2380    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2381        let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
2382        Box::pin(self.publish_module_blobs(blobs, module_id)).await
2383    }
2384
2385    /// Publishes some module.
2386    #[cfg(not(target_arch = "wasm32"))]
2387    #[instrument(level = "trace", skip(blobs, module_id))]
2388    pub async fn publish_module_blobs(
2389        &self,
2390        blobs: Vec<Blob>,
2391        module_id: ModuleId,
2392    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2393        self.execute_operations(
2394            vec![Operation::system(SystemOperation::PublishModule {
2395                module_id,
2396            })],
2397            blobs,
2398        )
2399        .await?
2400        .try_map(|certificate| Ok((module_id, certificate)))
2401    }
2402
2403    /// Publishes some data blobs.
2404    #[instrument(level = "trace", skip(bytes))]
2405    pub async fn publish_data_blobs(
2406        &self,
2407        bytes: Vec<Vec<u8>>,
2408    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2409        let blobs = bytes.into_iter().map(Blob::new_data);
2410        let publish_blob_operations = blobs
2411            .clone()
2412            .map(|blob| {
2413                Operation::system(SystemOperation::PublishDataBlob {
2414                    blob_hash: blob.id().hash,
2415                })
2416            })
2417            .collect();
2418        self.execute_operations(publish_blob_operations, blobs.collect())
2419            .await
2420    }
2421
2422    /// Publishes some data blob.
2423    #[instrument(level = "trace", skip(bytes))]
2424    pub async fn publish_data_blob(
2425        &self,
2426        bytes: Vec<u8>,
2427    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2428        Box::pin(self.publish_data_blobs(vec![bytes])).await
2429    }
2430
2431    /// Creates an application by instantiating some bytecode.
2432    #[instrument(
2433        level = "trace",
2434        skip(self, parameters, instantiation_argument, required_application_ids)
2435    )]
2436    pub async fn create_application<
2437        A: Abi,
2438        Parameters: Serialize,
2439        InstantiationArgument: Serialize,
2440    >(
2441        &self,
2442        module_id: ModuleId<A, Parameters, InstantiationArgument>,
2443        parameters: &Parameters,
2444        instantiation_argument: &InstantiationArgument,
2445        required_application_ids: Vec<ApplicationId>,
2446    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2447        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2448        let parameters = serde_json::to_vec(parameters)?;
2449        Ok(Box::pin(self.create_application_untyped(
2450            module_id.forget_abi(),
2451            parameters,
2452            instantiation_argument,
2453            required_application_ids,
2454        ))
2455        .await?
2456        .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2457    }
2458
2459    /// Creates an application by instantiating some bytecode.
2460    #[instrument(
2461        level = "trace",
2462        skip(
2463            self,
2464            module_id,
2465            parameters,
2466            instantiation_argument,
2467            required_application_ids
2468        )
2469    )]
2470    pub async fn create_application_untyped(
2471        &self,
2472        module_id: ModuleId,
2473        parameters: Vec<u8>,
2474        instantiation_argument: Vec<u8>,
2475        required_application_ids: Vec<ApplicationId>,
2476    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2477        Box::pin(self.execute_operation(SystemOperation::CreateApplication {
2478            module_id,
2479            parameters,
2480            instantiation_argument,
2481            required_application_ids,
2482        }))
2483        .await?
2484        .try_map(|certificate| {
2485            // The first message of the only operation created the application.
2486            let mut creation: Vec<_> = certificate
2487                .block()
2488                .created_blob_ids()
2489                .into_iter()
2490                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2491                .collect();
2492            if creation.len() > 1 {
2493                return Err(Error::InternalError(
2494                    "Unexpected number of application descriptions published",
2495                ));
2496            }
2497            let blob_id = creation.pop().ok_or(Error::InternalError(
2498                "ApplicationDescription blob not found.",
2499            ))?;
2500            let id = ApplicationId::new(blob_id.hash);
2501            Ok((id, certificate))
2502        })
2503    }
2504
2505    /// Creates a new committee and starts using it (admin chains only).
2506    #[instrument(level = "trace", skip(committee))]
2507    pub async fn stage_new_committee(
2508        &self,
2509        committee: Committee,
2510    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2511        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2512        let blob_hash = blob.id().hash;
2513        match self
2514            .execute_operations(
2515                vec![Operation::system(SystemOperation::Admin(
2516                    AdminOperation::PublishCommitteeBlob { blob_hash },
2517                ))],
2518                vec![blob],
2519            )
2520            .await?
2521        {
2522            ClientOutcome::Committed(_) => {}
2523            outcome @ ClientOutcome::WaitForTimeout(_) | outcome @ ClientOutcome::Conflict(_) => {
2524                return Ok(outcome)
2525            }
2526        }
2527        let epoch = Box::pin(self.chain_info()).await?.epoch.try_add_one()?;
2528        Box::pin(
2529            self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2530                epoch,
2531                blob_hash,
2532            })),
2533        )
2534        .await
2535    }
2536
2537    /// Synchronizes the chain with the validators and creates blocks without any operations to
2538    /// process all incoming messages. This may require several blocks.
2539    ///
2540    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2541    /// is returned, too.
2542    #[instrument(level = "trace")]
2543    pub async fn process_inbox(
2544        &self,
2545    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2546        self.prepare_chain().await?;
2547        self.process_inbox_without_prepare().await
2548    }
2549
2550    /// Creates blocks without any operations to process all incoming messages. This may require
2551    /// several blocks.
2552    ///
2553    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2554    /// is returned, too.
2555    #[instrument(level = "trace")]
2556    pub async fn process_inbox_without_prepare(
2557        &self,
2558    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2559        #[cfg(with_metrics)]
2560        let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2561
2562        let mut certificates = Vec::new();
2563        loop {
2564            // We provide no operations - this means that the only operations executed
2565            // will be epoch changes, receiving messages and processing event stream
2566            // updates, if any are pending.
2567            match self.execute_block(vec![], vec![]).await {
2568                Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
2569                Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
2570                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2571                    return Ok((certificates, Some(timeout)));
2572                }
2573                // Nothing in the inbox and no stream updates to be processed.
2574                Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2575                    WorkerError::ChainError(chain_error),
2576                ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2577                    return Ok((certificates, None));
2578                }
2579                Err(error) => return Err(error),
2580            };
2581        }
2582    }
2583
2584    /// Returns operations to process all pending epoch changes: first the new epochs, in order,
2585    /// then the removed epochs, in order.
2586    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2587        let (mut min_epoch, mut next_epoch) = {
2588            let (epoch, committees) = self.epoch_and_committees().await?;
2589            let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2590            (min_epoch, epoch.try_add_one()?)
2591        };
2592        let mut epoch_change_ops = Vec::new();
2593        while self
2594            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2595            .await?
2596        {
2597            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2598                next_epoch,
2599            )));
2600            next_epoch.try_add_assign_one()?;
2601        }
2602        while self
2603            .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2604            .await?
2605        {
2606            epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2607                min_epoch,
2608            )));
2609            min_epoch.try_add_assign_one()?;
2610        }
2611        Ok(epoch_change_ops)
2612    }
2613
2614    /// Returns whether the system event on the admin chain with the given stream name and key
2615    /// exists in storage.
2616    async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2617        let event_id = EventId {
2618            chain_id: self.client.admin_chain_id,
2619            stream_id: StreamId::system(stream_name),
2620            index,
2621        };
2622        Ok(self
2623            .client
2624            .storage_client()
2625            .read_event(event_id)
2626            .await?
2627            .is_some())
2628    }
2629
2630    /// Returns the indices and events from the storage
2631    pub async fn events_from_index(
2632        &self,
2633        stream_id: StreamId,
2634        start_index: u32,
2635    ) -> Result<Vec<IndexAndEvent>, Error> {
2636        Ok(self
2637            .client
2638            .storage_client()
2639            .read_events_from_index(&self.chain_id, &stream_id, start_index)
2640            .await?)
2641    }
2642
2643    /// Deprecates all the configurations of voting rights up to the given one (admin chains
2644    /// only). Currently, each individual chain is still entitled to wait before accepting
2645    /// this command. However, it is expected that deprecated validators stop functioning
2646    /// shortly after such command is issued.
2647    #[instrument(level = "trace")]
2648    pub async fn revoke_epochs(
2649        &self,
2650        revoked_epoch: Epoch,
2651    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2652        self.prepare_chain().await?;
2653        let (current_epoch, committees) = self.epoch_and_committees().await?;
2654        ensure!(
2655            revoked_epoch < current_epoch,
2656            Error::CannotRevokeCurrentEpoch(current_epoch)
2657        );
2658        ensure!(
2659            committees.contains_key(&revoked_epoch),
2660            Error::EpochAlreadyRevoked
2661        );
2662        let operations = committees
2663            .keys()
2664            .filter_map(|epoch| {
2665                if *epoch <= revoked_epoch {
2666                    Some(Operation::system(SystemOperation::Admin(
2667                        AdminOperation::RemoveCommittee { epoch: *epoch },
2668                    )))
2669                } else {
2670                    None
2671                }
2672            })
2673            .collect();
2674        self.execute_operations(operations, vec![]).await
2675    }
2676
2677    /// Sends money to a chain.
2678    /// Do not check balance. (This may block the client)
2679    /// Do not confirm the transaction.
2680    #[instrument(level = "trace")]
2681    pub async fn transfer_to_account_unsafe_unconfirmed(
2682        &self,
2683        owner: AccountOwner,
2684        amount: Amount,
2685        recipient: Account,
2686    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2687        Box::pin(self.execute_operation(SystemOperation::Transfer {
2688            owner,
2689            recipient,
2690            amount,
2691        }))
2692        .await
2693    }
2694
2695    #[instrument(level = "trace", skip(hash))]
2696    pub async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, Error> {
2697        let block = self
2698            .client
2699            .storage_client()
2700            .read_confirmed_block(hash)
2701            .await?;
2702        block
2703            .map(Arc::unwrap_or_clone)
2704            .ok_or(Error::MissingConfirmedBlock(hash))
2705    }
2706
2707    #[instrument(level = "trace", skip(hash))]
2708    pub async fn read_certificate(
2709        &self,
2710        hash: CryptoHash,
2711    ) -> Result<ConfirmedBlockCertificate, Error> {
2712        let certificate = self.client.storage_client().read_certificate(hash).await?;
2713        certificate
2714            .map(Arc::unwrap_or_clone)
2715            .ok_or(Error::ReadCertificatesError(vec![hash]))
2716    }
2717
2718    /// Handles any cross-chain requests for any pending outgoing messages.
2719    #[instrument(level = "trace")]
2720    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2721        self.client
2722            .retry_pending_cross_chain_requests(self.chain_id)
2723            .await?;
2724        Ok(())
2725    }
2726
2727    #[instrument(level = "trace", skip(local_node))]
2728    async fn local_chain_info(
2729        &self,
2730        chain_id: ChainId,
2731        local_node: &mut LocalNodeClient<Env::Storage>,
2732    ) -> Result<Option<Box<ChainInfo>>, Error> {
2733        match local_node.chain_info(chain_id).await {
2734            Ok(info) => Ok(Some(info)),
2735            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
2736            Err(err) => Err(err.into()),
2737        }
2738    }
2739
2740    #[instrument(level = "trace", skip(chain_id, local_node))]
2741    async fn local_next_block_height(
2742        &self,
2743        chain_id: ChainId,
2744        local_node: &mut LocalNodeClient<Env::Storage>,
2745    ) -> Result<BlockHeight, Error> {
2746        Ok(self
2747            .local_chain_info(chain_id, local_node)
2748            .await?
2749            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
2750    }
2751
2752    /// Returns the next height we expect to receive from the given sender chain, according to the
2753    /// local inbox.
2754    #[instrument(level = "trace")]
2755    async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
2756        Ok(self
2757            .client
2758            .local_node
2759            .get_inbox_next_height(self.chain_id, origin)
2760            .await?)
2761    }
2762
2763    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
2764    async fn process_notification(
2765        &self,
2766        remote_node: RemoteNode<Env::ValidatorNode>,
2767        mut local_node: LocalNodeClient<Env::Storage>,
2768        notification: Notification,
2769    ) -> Result<(), Error> {
2770        let listening_mode = self.client.chain_mode(notification.chain_id);
2771        let is_relevant = listening_mode
2772            .as_ref()
2773            .is_some_and(|mode| mode.is_relevant(&notification.reason));
2774        if !is_relevant {
2775            tracing::trace!(
2776                chain_id = %notification.chain_id,
2777                reason = ?notification.reason,
2778                ?listening_mode,
2779                "Ignoring notification due to listening mode"
2780            );
2781            return Ok(());
2782        }
2783        match notification.reason {
2784            Reason::NewIncomingBundle { origin, height } => {
2785                if self.local_next_height_to_receive(origin).await? > height {
2786                    debug!(
2787                        chain_id = %self.chain_id,
2788                        "Accepting redundant notification for new message"
2789                    );
2790                    return Ok(());
2791                }
2792                self.client
2793                    .download_sender_block_with_sending_ancestors(
2794                        self.chain_id,
2795                        origin,
2796                        height,
2797                        &remote_node,
2798                    )
2799                    .await?;
2800                if self.local_next_height_to_receive(origin).await? <= height {
2801                    info!(
2802                        chain_id = %self.chain_id,
2803                        "NewIncomingBundle: Fail to synchronize new message after notification"
2804                    );
2805                }
2806            }
2807            Reason::NewBlock {
2808                height,
2809                hash,
2810                event_streams,
2811                ..
2812            } => {
2813                let chain_id = notification.chain_id;
2814                let local_height = self
2815                    .local_next_block_height(chain_id, &mut local_node)
2816                    .await?;
2817                if local_height > height {
2818                    debug!(
2819                        chain_id = %self.chain_id,
2820                        "Accepting redundant notification for new block"
2821                    );
2822                    return Ok(());
2823                }
2824                // In EventsOnly mode, download only event-bearing blocks sparsely
2825                // instead of doing a full sync. This also handles old validators
2826                // that emit NewBlock but not NewEvents.
2827                if let Some(ListeningMode::EventsOnly(subscribed)) =
2828                    self.client.chain_mode(chain_id)
2829                {
2830                    if !event_streams.is_empty() {
2831                        self.client
2832                            .download_event_bearing_blocks(
2833                                chain_id,
2834                                BTreeSet::from([(height, hash)]),
2835                                local_height,
2836                                &subscribed,
2837                                &remote_node,
2838                            )
2839                            .await?;
2840                    }
2841                } else {
2842                    self.client
2843                        .synchronize_chain_state_from(&remote_node, chain_id)
2844                        .await?;
2845                    if self
2846                        .local_next_block_height(chain_id, &mut local_node)
2847                        .await?
2848                        <= height
2849                    {
2850                        error!("NewBlock: Fail to synchronize new block after notification");
2851                    }
2852                }
2853            }
2854            Reason::NewEvents { height, hash, .. } => {
2855                let chain_id = notification.chain_id;
2856                let local_height = self
2857                    .local_next_block_height(chain_id, &mut local_node)
2858                    .await?;
2859                if local_height > height {
2860                    debug!(
2861                        chain_id = %self.chain_id,
2862                        "Accepting redundant notification for new events"
2863                    );
2864                    return Ok(());
2865                }
2866                let subscribed = match self.client.chain_mode(chain_id) {
2867                    Some(ListeningMode::EventsOnly(streams)) => streams,
2868                    _ => return Ok(()),
2869                };
2870                self.client
2871                    .download_event_bearing_blocks(
2872                        chain_id,
2873                        BTreeSet::from([(height, hash)]),
2874                        local_height,
2875                        &subscribed,
2876                        &remote_node,
2877                    )
2878                    .await?;
2879            }
2880            Reason::NewRound { height, round } => {
2881                let chain_id = notification.chain_id;
2882                if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
2883                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
2884                        debug!(
2885                            chain_id = %self.chain_id,
2886                            "Accepting redundant notification for new round"
2887                        );
2888                        return Ok(());
2889                    }
2890                }
2891                self.client
2892                    .synchronize_chain_state_from(&remote_node, chain_id)
2893                    .await?;
2894                let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
2895                    error!(
2896                        chain_id = %self.chain_id,
2897                        "NewRound: Fail to read local chain info for {chain_id}"
2898                    );
2899                    return Ok(());
2900                };
2901                if (info.next_block_height, info.manager.current_round) < (height, round) {
2902                    info!(
2903                        chain_id = %self.chain_id,
2904                        "NewRound: Fail to synchronize new block after notification"
2905                    );
2906                }
2907            }
2908            Reason::BlockExecuted { .. } => {
2909                // No action needed.
2910            }
2911        }
2912        Ok(())
2913    }
2914
2915    /// Returns whether this chain is tracked by the client, i.e. we are updating its inbox.
2916    pub fn is_tracked(&self) -> bool {
2917        self.client.is_tracked(self.chain_id)
2918    }
2919
2920    /// Returns the listening mode for this chain, if it is tracked.
2921    pub fn listening_mode(&self) -> Option<ListeningMode> {
2922        self.client.chain_mode(self.chain_id)
2923    }
2924
2925    /// Spawns a task that listens to notifications about the current chain from all validators,
2926    /// and synchronizes the local state accordingly.
2927    ///
2928    /// The listening mode must be set in `Client::chain_modes` before calling this method.
2929    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
2930    pub async fn listen(
2931        &self,
2932    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
2933        use future::FutureExt as _;
2934
2935        async fn await_while_polling<F: FusedFuture>(
2936            future: F,
2937            background_work: impl FusedStream<Item = ()>,
2938        ) -> F::Output {
2939            tokio::pin!(future);
2940            tokio::pin!(background_work);
2941            loop {
2942                futures::select! {
2943                    _ = background_work.next() => (),
2944                    result = future => return result,
2945                }
2946            }
2947        }
2948
2949        let mut senders = HashMap::new();
2950        let mut circuit_breakers: HashMap<ValidatorPublicKey, CircuitBreakerState> = HashMap::new();
2951        let notifications = self.subscribe()?;
2952        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
2953
2954        // Beware: if this future ceases to make progress, notification processing will
2955        // deadlock, because of the issue described in
2956        // https://github.com/linera-io/linera-protocol/pull/1173.
2957
2958        // TODO(#2013): replace this lock with an asynchronous communication channel
2959
2960        let mut process_notifications = FuturesUnordered::new();
2961
2962        match self
2963            .update_notification_streams(&mut senders, &mut circuit_breakers)
2964            .await
2965        {
2966            Ok(handler) => process_notifications.push(handler),
2967            Err(error) => error!("Failed to update committee: {error}"),
2968        };
2969
2970        let this = self.clone();
2971        let update_streams = async move {
2972            let mut abortable_notifications = abortable_notifications.fuse();
2973
2974            while let Some(notification) =
2975                await_while_polling(abortable_notifications.next(), &mut process_notifications)
2976                    .await
2977            {
2978                // Re-subscribe to validators on NewBlock to handle committee changes.
2979                // Skip this for EventsOnly chains — they don't participate in governance
2980                // and re-subscribing would trigger a full sync that defeats sparse download.
2981                if let Reason::NewBlock { .. } = notification.reason {
2982                    let is_events_only = this
2983                        .listening_mode()
2984                        .is_some_and(|m| matches!(m, ListeningMode::EventsOnly(_)));
2985                    if !is_events_only {
2986                        match Box::pin(await_while_polling(
2987                            this.update_notification_streams(&mut senders, &mut circuit_breakers)
2988                                .fuse(),
2989                            &mut process_notifications,
2990                        ))
2991                        .await
2992                        {
2993                            Ok(handler) => process_notifications.push(handler),
2994                            Err(error) => error!("Failed to update committee: {error}"),
2995                        }
2996                    }
2997                }
2998            }
2999
3000            for abort in senders.into_values() {
3001                abort.abort();
3002            }
3003
3004            let () = process_notifications.collect().await;
3005        }
3006        .in_current_span();
3007
3008        Ok((update_streams, AbortOnDrop(abort), notifications))
3009    }
3010
3011    #[instrument(level = "trace", skip(senders, circuit_breakers))]
3012    async fn update_notification_streams(
3013        &self,
3014        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
3015        circuit_breakers: &mut HashMap<ValidatorPublicKey, CircuitBreakerState>,
3016    ) -> Result<impl Future<Output = ()>, Error> {
3017        let initial_probe_interval = self
3018            .options
3019            .notification_circuit_breaker_initial_probe_interval;
3020        let max_probe_interval = self.options.notification_circuit_breaker_max_probe_interval;
3021
3022        let events_only = self
3023            .listening_mode()
3024            .is_none_or(|m| matches!(m, ListeningMode::EventsOnly(_)));
3025        let (nodes, local_node) = {
3026            // For EventsOnly chains, use the admin chain's committee: the chain's own
3027            // committee may be stale (we don't track epoch changes), and
3028            // `local_committee()` could trigger a full sync on `BlobsNotFound`.
3029            let committee = if events_only {
3030                let (_, committee) = self.admin_committee().await?;
3031                committee
3032            } else {
3033                self.local_committee().await?
3034            };
3035            let nodes: HashMap<_, _> = self
3036                .client
3037                .validator_node_provider()
3038                .make_nodes(&committee)?
3039                .collect();
3040            (nodes, self.client.local_node.clone())
3041        };
3042
3043        // Detect circuit breaker state transitions before cleaning up senders.
3044        for (validator, abort) in senders.iter() {
3045            if abort.is_aborted() && nodes.contains_key(validator) {
3046                if let Some(state) = circuit_breakers.get_mut(validator) {
3047                    // Was probing -> probe failed -> escalate interval.
3048                    state.probe_interval = (state.probe_interval * 2).min(max_probe_interval);
3049                    state.next_probe_at = Instant::now() + state.probe_interval;
3050                    warn!(
3051                        %validator,
3052                        chain_id = %self.chain_id,
3053                        next_probe_in = ?state.probe_interval,
3054                        "Validator still unhealthy after probe; increasing probe interval"
3055                    );
3056                } else {
3057                    // First failure -> enter circuit breaker.
3058                    circuit_breakers.insert(
3059                        *validator,
3060                        CircuitBreakerState {
3061                            next_probe_at: Instant::now() + initial_probe_interval,
3062                            probe_interval: initial_probe_interval,
3063                        },
3064                    );
3065                    error!(
3066                        %validator,
3067                        chain_id = %self.chain_id,
3068                        next_probe_in = ?initial_probe_interval,
3069                        "Validator notification stream ended; entering circuit breaker"
3070                    );
3071                }
3072            } else if !abort.is_aborted() && circuit_breakers.contains_key(validator) {
3073                // Stream alive while in circuit breaker -> probe succeeded -> recovered.
3074                info!(
3075                    %validator,
3076                    chain_id = %self.chain_id,
3077                    "Validator recovered from circuit breaker"
3078                );
3079                circuit_breakers.remove(validator);
3080            }
3081        }
3082
3083        senders.retain(|validator, abort| {
3084            if !nodes.contains_key(validator) {
3085                abort.abort();
3086            }
3087            !abort.is_aborted()
3088        });
3089        circuit_breakers.retain(|validator, _| nodes.contains_key(validator));
3090
3091        let validator_tasks = FuturesUnordered::new();
3092        for (public_key, node) in nodes {
3093            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
3094                continue;
3095            };
3096
3097            // Circuit breaker: skip if not time to probe yet.
3098            if let Some(state) = circuit_breakers.get(&public_key) {
3099                if Instant::now() < state.next_probe_at {
3100                    continue;
3101                }
3102                debug!(
3103                    validator = %public_key,
3104                    chain_id = %self.chain_id,
3105                    "Probing unhealthy validator"
3106                );
3107            }
3108
3109            let address = node.address();
3110            let this = self.clone();
3111            let stream = stream::once({
3112                let node = node.clone();
3113                async move {
3114                    let stream = node.subscribe(vec![this.chain_id]).await?;
3115                    // Only now the notification stream is established. We may have missed
3116                    // notifications since the last time we synchronized.
3117                    if !events_only {
3118                        let remote_node = RemoteNode { public_key, node };
3119                        this.client
3120                            .synchronize_chain_state_from(&remote_node, this.chain_id)
3121                            .await?;
3122                    } else {
3123                        // For EventsOnly chains, do a lightweight initial sync:
3124                        // query the validator for the latest event-bearing blocks
3125                        // for our subscribed streams, then download them sparsely.
3126                        let remote_node = RemoteNode { public_key, node };
3127                        if let Some(ListeningMode::EventsOnly(subscribed)) = this.listening_mode() {
3128                            if let Err(error) = this
3129                                .client
3130                                .sync_events_from_node(this.chain_id, &subscribed, &remote_node)
3131                                .await
3132                            {
3133                                debug!(
3134                                    chain_id = %this.chain_id,
3135                                    %error,
3136                                    "Failed initial sparse sync for EventsOnly chain"
3137                                );
3138                            }
3139                        }
3140                    }
3141                    Ok::<_, Error>(stream)
3142                }
3143            })
3144            .filter_map(move |result| {
3145                let address = address.clone();
3146                async move {
3147                    if let Err(error) = &result {
3148                        info!(?error, address, "could not connect to validator");
3149                    } else {
3150                        debug!(address, "connected to validator");
3151                    }
3152                    result.ok()
3153                }
3154            })
3155            .flatten();
3156            let (stream, abort) = stream::abortable(stream);
3157            let abort_on_exit = abort.clone();
3158            let mut stream = Box::pin(stream);
3159            let this = self.clone();
3160            let local_node = local_node.clone();
3161            let remote_node = RemoteNode { public_key, node };
3162            validator_tasks.push(async move {
3163                while let Some(notification) = stream.next().await {
3164                    if let Err(error) = this
3165                        .process_notification(
3166                            remote_node.clone(),
3167                            local_node.clone(),
3168                            notification.clone(),
3169                        )
3170                        .await
3171                    {
3172                        tracing::info!(
3173                            chain_id = %this.chain_id,
3174                            address = remote_node.address(),
3175                            ?notification,
3176                            %error,
3177                            "failed to process notification",
3178                        );
3179                    }
3180                }
3181                warn!(
3182                    chain_id = %this.chain_id,
3183                    address = remote_node.address(),
3184                    "Validator notification stream ended"
3185                );
3186                abort_on_exit.abort();
3187            });
3188            entry.insert(abort);
3189        }
3190        Ok(validator_tasks.collect())
3191    }
3192
3193    /// Attempts to update a validator with the local information.
3194    #[instrument(level = "trace", skip(remote_node))]
3195    pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
3196        let validator_next_block_height = match remote_node
3197            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
3198            .await
3199        {
3200            Ok(info) => info.info.next_block_height,
3201            Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
3202            Err(err) => return Err(err.into()),
3203        };
3204        let local_next_block_height = self.chain_info().await?.next_block_height;
3205
3206        if validator_next_block_height >= local_next_block_height {
3207            debug!("Validator is up-to-date with local state");
3208            return Ok(());
3209        }
3210
3211        let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
3212            .map(BlockHeight)
3213            .collect();
3214
3215        let certificates = self
3216            .client
3217            .storage_client()
3218            .read_certificates_by_heights(self.chain_id, &heights)
3219            .await?
3220            .into_iter()
3221            .flatten()
3222            .map(Arc::unwrap_or_clone)
3223            .collect::<Vec<_>>();
3224
3225        for certificate in certificates {
3226            match remote_node
3227                .handle_confirmed_certificate(
3228                    certificate.clone(),
3229                    CrossChainMessageDelivery::NonBlocking,
3230                )
3231                .await
3232            {
3233                Ok(_) => (),
3234                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
3235                    // Upload the missing blobs we have and retry.
3236                    let missing_blobs: Vec<_> = self
3237                        .client
3238                        .storage_client()
3239                        .read_blobs(&missing_blob_ids)
3240                        .await?
3241                        .into_iter()
3242                        .flatten()
3243                        .map(Arc::unwrap_or_clone)
3244                        .collect();
3245                    remote_node.upload_blobs(missing_blobs).await?;
3246                    remote_node
3247                        .handle_confirmed_certificate(
3248                            certificate,
3249                            CrossChainMessageDelivery::NonBlocking,
3250                        )
3251                        .await?;
3252                }
3253                Err(err) => return Err(err.into()),
3254            }
3255        }
3256
3257        Ok(())
3258    }
3259}
3260
3261#[cfg(with_testing)]
3262impl<Env: Environment> ChainClient<Env> {
3263    pub async fn process_notification_from(
3264        &self,
3265        notification: Notification,
3266        validator: (ValidatorPublicKey, &str),
3267    ) {
3268        let mut node_list = self
3269            .client
3270            .validator_node_provider()
3271            .make_nodes_from_list(vec![validator])
3272            .unwrap();
3273        let (public_key, node) = node_list.next().unwrap();
3274        let remote_node = RemoteNode { node, public_key };
3275        let local_node = self.client.local_node.clone();
3276        self.process_notification(remote_node, local_node, notification)
3277            .await
3278            .unwrap();
3279    }
3280}