Skip to main content

linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14    data_types::{
15        ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp,
16    },
17    doc_scalar,
18    hashed::Hashed,
19    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
20    time::Instant,
21    util::traits::DynError,
22};
23#[cfg(with_testing)]
24use linera_chain::ChainExecutionContext;
25use linera_chain::{
26    data_types::{
27        BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock,
28    },
29    types::{
30        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
31        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
32    },
33    ChainError, ChainStateView,
34};
35use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
36use linera_storage::Storage;
37use linera_views::{context::InactiveContext, ViewError};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
41use tracing::{error, instrument, trace, warn};
42
43/// Re-export of [`EventSubscriptionsResult`] for use by other crate modules.
44pub(crate) use crate::chain_worker::{
45    ChainWorkerRequestReceiver, ChainWorkerRequestSender, EventSubscriptionsResult,
46};
47use crate::{
48    chain_worker::{
49        BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
50    },
51    client::ListeningMode,
52    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
53    join_set_ext::{JoinSet, JoinSetExt},
54    notifier::Notifier,
55    value_cache::ValueCache,
56    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
57};
58
59const BLOCK_CACHE_SIZE: usize = 5_000;
60const EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
61
62#[cfg(test)]
63#[path = "unit_tests/worker_tests.rs"]
64mod worker_tests;
65
66#[cfg(with_metrics)]
67mod metrics {
68    use std::sync::LazyLock;
69
70    use linera_base::prometheus_util::{
71        exponential_bucket_interval, register_histogram, register_histogram_vec,
72        register_int_counter, register_int_counter_vec, register_int_gauge,
73    };
74    use linera_chain::types::ConfirmedBlockCertificate;
75    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge};
76
77    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
78        register_histogram_vec(
79            "num_rounds_in_certificate",
80            "Number of rounds in certificate",
81            &["certificate_value", "round_type"],
82            exponential_bucket_interval(0.1, 50.0),
83        )
84    });
85
86    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
87        register_histogram_vec(
88            "num_rounds_in_block_proposal",
89            "Number of rounds in block proposal",
90            &["round_type"],
91            exponential_bucket_interval(0.1, 50.0),
92        )
93    });
94
95    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
96        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));
97
98    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
99        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));
100
101    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
102        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));
103
104    pub static OPERATION_COUNT: LazyLock<IntCounter> =
105        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));
106
107    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
108        register_histogram(
109            "operations_per_block",
110            "Number of operations per block",
111            exponential_bucket_interval(1.0, 10000.0),
112        )
113    });
114
115    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
116        register_histogram(
117            "incoming_bundles_per_block",
118            "Number of incoming bundles per block",
119            exponential_bucket_interval(1.0, 10000.0),
120        )
121    });
122
123    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
124        register_histogram(
125            "transactions_per_block",
126            "Number of transactions per block",
127            exponential_bucket_interval(1.0, 10000.0),
128        )
129    });
130
131    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
132        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
133    });
134
135    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
136        register_int_counter_vec(
137            "certificates_signed",
138            "Number of confirmed block certificates signed by each validator",
139            &["validator_name"],
140        )
141    });
142
143    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
144        register_int_counter(
145            "chain_info_queries",
146            "Number of chain info queries processed",
147        )
148    });
149
150    /// Number of cached chain worker channel endpoints in the map.
151    pub static CHAIN_WORKER_ENDPOINTS_CACHED: LazyLock<IntGauge> = LazyLock::new(|| {
152        register_int_gauge(
153            "chain_worker_endpoints_cached",
154            "Number of cached chain worker channel endpoints",
155        )
156    });
157
158    /// Holds metrics data extracted from a confirmed block certificate.
159    pub struct MetricsData {
160        certificate_log_str: &'static str,
161        round_type: &'static str,
162        round_number: u32,
163        confirmed_transactions: u64,
164        confirmed_incoming_bundles: u64,
165        confirmed_incoming_messages: u64,
166        confirmed_operations: u64,
167        validators_with_signatures: Vec<String>,
168    }
169
170    impl MetricsData {
171        /// Creates a new `MetricsData` by extracting data from the certificate.
172        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
173            Self {
174                certificate_log_str: certificate.inner().to_log_str(),
175                round_type: certificate.round.type_name(),
176                round_number: certificate.round.number(),
177                confirmed_transactions: certificate.block().body.transactions.len() as u64,
178                confirmed_incoming_bundles: certificate.block().body.incoming_bundles().count()
179                    as u64,
180                confirmed_incoming_messages: certificate
181                    .block()
182                    .body
183                    .incoming_bundles()
184                    .map(|b| b.messages().count())
185                    .sum::<usize>() as u64,
186                confirmed_operations: certificate.block().body.operations().count() as u64,
187                validators_with_signatures: certificate
188                    .signatures()
189                    .iter()
190                    .map(|(validator_name, _)| validator_name.to_string())
191                    .collect(),
192            }
193        }
194
195        /// Records the metrics for a processed block.
196        pub fn record(self) {
197            NUM_BLOCKS.with_label_values(&[]).inc();
198            NUM_ROUNDS_IN_CERTIFICATE
199                .with_label_values(&[self.certificate_log_str, self.round_type])
200                .observe(self.round_number as f64);
201            TRANSACTIONS_PER_BLOCK.observe(self.confirmed_transactions as f64);
202            INCOMING_BUNDLES_PER_BLOCK.observe(self.confirmed_incoming_bundles as f64);
203            OPERATIONS_PER_BLOCK.observe(self.confirmed_operations as f64);
204            if self.confirmed_transactions > 0 {
205                TRANSACTION_COUNT
206                    .with_label_values(&[])
207                    .inc_by(self.confirmed_transactions);
208                if self.confirmed_incoming_bundles > 0 {
209                    INCOMING_BUNDLE_COUNT.inc_by(self.confirmed_incoming_bundles);
210                }
211                if self.confirmed_incoming_messages > 0 {
212                    INCOMING_MESSAGE_COUNT.inc_by(self.confirmed_incoming_messages);
213                }
214                if self.confirmed_operations > 0 {
215                    OPERATION_COUNT.inc_by(self.confirmed_operations);
216                }
217            }
218
219            for validator_name in self.validators_with_signatures {
220                CERTIFICATES_SIGNED
221                    .with_label_values(&[&validator_name])
222                    .inc();
223            }
224        }
225    }
226}
227
228/// Instruct the networking layer to send cross-chain requests and/or push notifications.
229#[derive(Default, Debug)]
230pub struct NetworkActions {
231    /// The cross-chain requests
232    pub cross_chain_requests: Vec<CrossChainRequest>,
233    /// The push notifications.
234    pub notifications: Vec<Notification>,
235}
236
237impl NetworkActions {
238    pub fn extend(&mut self, other: NetworkActions) {
239        self.cross_chain_requests.extend(other.cross_chain_requests);
240        self.notifications.extend(other.notifications);
241    }
242}
243
244#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
245/// Notification that a chain has a new certified block or a new message.
246pub struct Notification {
247    pub chain_id: ChainId,
248    pub reason: Reason,
249}
250
251doc_scalar!(
252    Notification,
253    "Notify that a chain has a new certified block or a new message"
254);
255
256#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
257/// Reason for the notification.
258pub enum Reason {
259    NewBlock {
260        height: BlockHeight,
261        hash: CryptoHash,
262        event_streams: BTreeSet<StreamId>,
263    },
264    NewIncomingBundle {
265        origin: ChainId,
266        height: BlockHeight,
267    },
268    NewRound {
269        height: BlockHeight,
270        round: Round,
271    },
272    BlockExecuted {
273        height: BlockHeight,
274        hash: CryptoHash,
275    },
276    // NOTE: Keep this at the end for backward compatibility with old validators
277    // that use bincode integer-based variant indices. Old validators don't emit
278    // NewEvents, and inserting it before existing variants would shift their indices.
279    NewEvents {
280        height: BlockHeight,
281        hash: CryptoHash,
282        event_streams: BTreeSet<StreamId>,
283    },
284}
285
286/// Error type for worker operations.
287#[derive(Debug, Error)]
288pub enum WorkerError {
289    #[error(transparent)]
290    CryptoError(#[from] CryptoError),
291
292    #[error(transparent)]
293    ArithmeticError(#[from] ArithmeticError),
294
295    #[error(transparent)]
296    ViewError(#[from] ViewError),
297
298    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
299    ReadCertificatesError(Vec<CryptoHash>),
300
301    #[error(transparent)]
302    ChainError(#[from] Box<ChainError>),
303
304    #[error(transparent)]
305    BcsError(#[from] bcs::Error),
306
307    // Chain access control
308    #[error("Block was not signed by an authorized owner")]
309    InvalidOwner,
310
311    #[error("Operations in the block are not authenticated by the proper signer: {0}")]
312    InvalidSigner(AccountOwner),
313
314    // Chaining
315    #[error(
316        "Chain is expecting a next block at height {expected_block_height} but the given block \
317        is at height {found_block_height} instead"
318    )]
319    UnexpectedBlockHeight {
320        expected_block_height: BlockHeight,
321        found_block_height: BlockHeight,
322    },
323    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
324    InvalidEpoch {
325        chain_id: ChainId,
326        chain_epoch: Epoch,
327        epoch: Epoch,
328    },
329
330    #[error("Events not found: {0:?}")]
331    EventsNotFound(Vec<EventId>),
332
333    // Other server-side errors
334    #[error("Invalid cross-chain request")]
335    InvalidCrossChainRequest,
336    #[error("The block does not contain the hash that we expected for the previous block")]
337    InvalidBlockChaining,
338    #[error(
339        "The given outcome is not what we computed after executing the block.\n\
340        Computed: {computed:#?}\n\
341        Submitted: {submitted:#?}"
342    )]
343    IncorrectOutcome {
344        computed: Box<BlockExecutionOutcome>,
345        submitted: Box<BlockExecutionOutcome>,
346    },
347    #[error(
348        "Block timestamp ({block_timestamp}) is further in the future from local time \
349        ({local_time}) than block time grace period ({block_time_grace_period:?}) \
350        [us:{block_timestamp_us}:{local_time_us}]",
351        block_timestamp_us = block_timestamp.micros(),
352        local_time_us = local_time.micros(),
353    )]
354    InvalidTimestamp {
355        block_timestamp: Timestamp,
356        local_time: Timestamp,
357        block_time_grace_period: Duration,
358    },
359    #[error("We don't have the value for the certificate.")]
360    MissingCertificateValue,
361    #[error("The hash certificate doesn't match its value.")]
362    InvalidLiteCertificate,
363    #[error("Fast blocks cannot query oracles")]
364    FastBlockUsingOracles,
365    #[error("Blobs not found: {0:?}")]
366    BlobsNotFound(Vec<BlobId>),
367    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
368    ConfirmedLogEntryNotFound {
369        height: BlockHeight,
370        chain_id: ChainId,
371    },
372    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
373    PreprocessedBlocksEntryNotFound {
374        height: BlockHeight,
375        chain_id: ChainId,
376    },
377    #[error("The block proposal is invalid: {0}")]
378    InvalidBlockProposal(String),
379    #[error("Blob was not required by any pending block")]
380    UnexpectedBlob,
381    #[error("Number of published blobs per block must not exceed {0}")]
382    TooManyPublishedBlobs(u64),
383    #[error("Missing network description")]
384    MissingNetworkDescription,
385    #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
386    ChainActorSendError {
387        chain_id: ChainId,
388        error: Box<dyn DynError>,
389    },
390    #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
391    ChainActorRecvError {
392        chain_id: ChainId,
393        error: Box<dyn DynError>,
394    },
395
396    #[error("thread error: {0}")]
397    Thread(#[from] web_thread_pool::Error),
398
399    #[error("Fallback mode is not available on this network")]
400    NoFallbackMode,
401}
402
403impl WorkerError {
404    /// Returns whether this error is caused by an issue in the local node.
405    ///
406    /// Returns `false` whenever the error could be caused by a bad message from a peer.
407    pub fn is_local(&self) -> bool {
408        match self {
409            WorkerError::CryptoError(_)
410            | WorkerError::ArithmeticError(_)
411            | WorkerError::InvalidOwner
412            | WorkerError::InvalidSigner(_)
413            | WorkerError::UnexpectedBlockHeight { .. }
414            | WorkerError::InvalidEpoch { .. }
415            | WorkerError::EventsNotFound(_)
416            | WorkerError::InvalidBlockChaining
417            | WorkerError::IncorrectOutcome { .. }
418            | WorkerError::InvalidTimestamp { .. }
419            | WorkerError::MissingCertificateValue
420            | WorkerError::InvalidLiteCertificate
421            | WorkerError::FastBlockUsingOracles
422            | WorkerError::BlobsNotFound(_)
423            | WorkerError::InvalidBlockProposal(_)
424            | WorkerError::UnexpectedBlob
425            | WorkerError::TooManyPublishedBlobs(_)
426            | WorkerError::NoFallbackMode
427            | WorkerError::ViewError(ViewError::NotFound(_)) => false,
428            WorkerError::BcsError(_)
429            | WorkerError::InvalidCrossChainRequest
430            | WorkerError::ViewError(_)
431            | WorkerError::ConfirmedLogEntryNotFound { .. }
432            | WorkerError::PreprocessedBlocksEntryNotFound { .. }
433            | WorkerError::MissingNetworkDescription
434            | WorkerError::ChainActorSendError { .. }
435            | WorkerError::ChainActorRecvError { .. }
436            | WorkerError::Thread(_)
437            | WorkerError::ReadCertificatesError(_) => true,
438            WorkerError::ChainError(chain_error) => chain_error.is_local(),
439        }
440    }
441}
442
443impl From<ChainError> for WorkerError {
444    #[instrument(level = "trace", skip(chain_error))]
445    fn from(chain_error: ChainError) -> Self {
446        match chain_error {
447            ChainError::ExecutionError(execution_error, context) => match *execution_error {
448                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
449                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
450                _ => Self::ChainError(Box::new(ChainError::ExecutionError(
451                    execution_error,
452                    context,
453                ))),
454            },
455            error => Self::ChainError(Box::new(error)),
456        }
457    }
458}
459
460#[cfg(with_testing)]
461impl WorkerError {
462    /// Returns the inner [`ExecutionError`] in this error.
463    ///
464    /// # Panics
465    ///
466    /// If this is not caused by an [`ExecutionError`].
467    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
468        let WorkerError::ChainError(chain_error) = self else {
469            panic!("Expected an `ExecutionError`. Got: {self:#?}");
470        };
471
472        let ChainError::ExecutionError(execution_error, context) = *chain_error else {
473            panic!("Expected an `ExecutionError`. Got: {chain_error:#?}");
474        };
475
476        assert_eq!(context, expected_context);
477
478        *execution_error
479    }
480}
481
482/// State of a worker in a validator or a local node.
483pub struct WorkerState<StorageClient>
484where
485    StorageClient: Storage,
486{
487    /// A name used for logging
488    nickname: String,
489    /// Access to local persistent storage.
490    storage: StorageClient,
491    /// Configuration options for the [`ChainWorker`]s.
492    chain_worker_config: ChainWorkerConfig,
493    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
494    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
495    /// Chains tracked by a worker, along with their listening modes.
496    chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
497    /// One-shot channels to notify callers when messages of a particular chain have been
498    /// delivered.
499    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
500    /// The set of spawned [`ChainWorkerActor`] tasks.
501    chain_worker_tasks: Arc<Mutex<JoinSet>>,
502    /// The cache of running [`ChainWorkerActor`]s.
503    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
504}
505
506impl<StorageClient> Clone for WorkerState<StorageClient>
507where
508    StorageClient: Storage + Clone,
509{
510    fn clone(&self) -> Self {
511        WorkerState {
512            nickname: self.nickname.clone(),
513            storage: self.storage.clone(),
514            chain_worker_config: self.chain_worker_config.clone(),
515            block_cache: self.block_cache.clone(),
516            execution_state_cache: self.execution_state_cache.clone(),
517            chain_modes: self.chain_modes.clone(),
518            delivery_notifiers: self.delivery_notifiers.clone(),
519            chain_worker_tasks: self.chain_worker_tasks.clone(),
520            chain_workers: self.chain_workers.clone(),
521        }
522    }
523}
524
525/// The sender endpoint for [`ChainWorkerRequest`]s.
526type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
527    ChainWorkerRequest<<StorageClient as Storage>::Context>,
528    tracing::Span,
529    Instant,
530)>;
531
532pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
533
534impl<StorageClient> WorkerState<StorageClient>
535where
536    StorageClient: Storage,
537{
538    #[instrument(level = "trace", skip(nickname, key_pair, storage))]
539    pub fn new(
540        nickname: String,
541        key_pair: Option<ValidatorSecretKey>,
542        storage: StorageClient,
543    ) -> Self {
544        WorkerState {
545            nickname,
546            storage,
547            chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
548            block_cache: Arc::new(ValueCache::new(BLOCK_CACHE_SIZE)),
549            execution_state_cache: Arc::new(ValueCache::new(EXECUTION_STATE_CACHE_SIZE)),
550            chain_modes: None,
551            delivery_notifiers: Arc::default(),
552            chain_worker_tasks: Arc::default(),
553            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
554        }
555    }
556
557    #[instrument(level = "trace", skip(nickname, storage))]
558    pub fn new_for_client(
559        nickname: String,
560        storage: StorageClient,
561        chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
562    ) -> Self {
563        WorkerState {
564            nickname,
565            storage,
566            chain_worker_config: ChainWorkerConfig::default(),
567            block_cache: Arc::new(ValueCache::new(BLOCK_CACHE_SIZE)),
568            execution_state_cache: Arc::new(ValueCache::new(EXECUTION_STATE_CACHE_SIZE)),
569            chain_modes: Some(chain_modes),
570            delivery_notifiers: Arc::default(),
571            chain_worker_tasks: Arc::default(),
572            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
573        }
574    }
575
576    #[instrument(level = "trace", skip(self, value))]
577    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
578        self.chain_worker_config.allow_inactive_chains = value;
579        self
580    }
581
582    #[instrument(level = "trace", skip(self, value))]
583    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
584        self.chain_worker_config
585            .allow_messages_from_deprecated_epochs = value;
586        self
587    }
588
589    #[instrument(level = "trace", skip(self, value))]
590    pub fn with_long_lived_services(mut self, value: bool) -> Self {
591        self.chain_worker_config.long_lived_services = value;
592        self
593    }
594
595    /// Returns an instance with the specified block time grace period.
596    ///
597    /// Blocks with a timestamp this far in the future will still be accepted, but the validator
598    /// will wait until that timestamp before voting.
599    #[instrument(level = "trace", skip(self))]
600    pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self {
601        self.chain_worker_config.block_time_grace_period = block_time_grace_period;
602        self
603    }
604
605    /// Returns an instance with the specified chain worker TTL.
606    ///
607    /// Idle chain workers free their memory after that duration without requests.
608    #[instrument(level = "trace", skip(self))]
609    pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
610        self.chain_worker_config.ttl = chain_worker_ttl;
611        self
612    }
613
614    /// Returns an instance with the specified sender chain worker TTL.
615    ///
616    /// Idle sender chain workers free their memory after that duration without requests.
617    #[instrument(level = "trace", skip(self))]
618    pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
619        self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
620        self
621    }
622
623    /// Returns an instance with the specified maximum size for received_log entries.
624    ///
625    /// Sizes below `CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES` should be avoided.
626    #[instrument(level = "trace", skip(self))]
627    pub fn with_chain_info_max_received_log_entries(
628        mut self,
629        chain_info_max_received_log_entries: usize,
630    ) -> Self {
631        if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
632            warn!(
633                "The value set for the maximum size of received_log entries \
634                   may not be compatible with the latest clients: {} instead of {}",
635                chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
636            );
637        }
638        self.chain_worker_config.chain_info_max_received_log_entries =
639            chain_info_max_received_log_entries;
640        self
641    }
642
643    #[instrument(level = "trace", skip(self))]
644    pub fn nickname(&self) -> &str {
645        &self.nickname
646    }
647
648    /// Returns the storage client so that it can be manipulated or queried.
649    #[instrument(level = "trace", skip(self))]
650    #[cfg(not(feature = "test"))]
651    pub(crate) fn storage_client(&self) -> &StorageClient {
652        &self.storage
653    }
654
655    /// Returns the storage client so that it can be manipulated or queried by tests in other
656    /// crates.
657    #[instrument(level = "trace", skip(self))]
658    #[cfg(feature = "test")]
659    pub fn storage_client(&self) -> &StorageClient {
660        &self.storage
661    }
662
663    #[instrument(level = "trace", skip(self, certificate))]
664    pub(crate) async fn full_certificate(
665        &self,
666        certificate: LiteCertificate<'_>,
667    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
668        let block = self
669            .block_cache
670            .get(&certificate.value.value_hash)
671            .ok_or(WorkerError::MissingCertificateValue)?;
672
673        match certificate.value.kind {
674            linera_chain::types::CertificateKind::Confirmed => {
675                let value = ConfirmedBlock::from_hashed(block);
676                Ok(Either::Left(
677                    certificate
678                        .with_value(value)
679                        .ok_or(WorkerError::InvalidLiteCertificate)?,
680                ))
681            }
682            linera_chain::types::CertificateKind::Validated => {
683                let value = ValidatedBlock::from_hashed(block);
684                Ok(Either::Right(
685                    certificate
686                        .with_value(value)
687                        .ok_or(WorkerError::InvalidLiteCertificate)?,
688                ))
689            }
690            _ => Err(WorkerError::InvalidLiteCertificate),
691        }
692    }
693}
694
695#[allow(async_fn_in_trait)]
696#[cfg_attr(not(web), trait_variant::make(Send))]
697pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
698    async fn process_certificate<S: Storage + Clone + 'static>(
699        worker: &WorkerState<S>,
700        certificate: GenericCertificate<Self>,
701    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
702}
703
704impl ProcessableCertificate for ConfirmedBlock {
705    async fn process_certificate<S: Storage + Clone + 'static>(
706        worker: &WorkerState<S>,
707        certificate: ConfirmedBlockCertificate,
708    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
709        Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
710    }
711}
712
713impl ProcessableCertificate for ValidatedBlock {
714    async fn process_certificate<S: Storage + Clone + 'static>(
715        worker: &WorkerState<S>,
716        certificate: ValidatedBlockCertificate,
717    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
718        Box::pin(worker.handle_validated_certificate(certificate)).await
719    }
720}
721
722impl ProcessableCertificate for Timeout {
723    async fn process_certificate<S: Storage + Clone + 'static>(
724        worker: &WorkerState<S>,
725        certificate: TimeoutCertificate,
726    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
727        worker.handle_timeout_certificate(certificate).await
728    }
729}
730
731impl<StorageClient> WorkerState<StorageClient>
732where
733    StorageClient: Storage + Clone + 'static,
734{
735    #[instrument(level = "trace", skip(self, certificate, notifier))]
736    #[inline]
737    pub async fn fully_handle_certificate_with_notifications<T>(
738        &self,
739        certificate: GenericCertificate<T>,
740        notifier: &impl Notifier,
741    ) -> Result<ChainInfoResponse, WorkerError>
742    where
743        T: ProcessableCertificate,
744    {
745        let notifications = (*notifier).clone();
746        let this = self.clone();
747        linera_base::Task::spawn(async move {
748            let (response, actions) =
749                ProcessableCertificate::process_certificate(&this, certificate).await?;
750            notifications.notify(&actions.notifications);
751            let mut requests = VecDeque::from(actions.cross_chain_requests);
752            while let Some(request) = requests.pop_front() {
753                let actions = this.handle_cross_chain_request(request).await?;
754                requests.extend(actions.cross_chain_requests);
755                notifications.notify(&actions.notifications);
756            }
757            Ok(response)
758        })
759        .await
760    }
761
762    /// Tries to execute a block proposal without any verification other than block execution.
763    #[instrument(level = "trace", skip(self, block))]
764    pub async fn stage_block_execution(
765        &self,
766        block: ProposedBlock,
767        round: Option<u32>,
768        published_blobs: Vec<Blob>,
769    ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> {
770        self.query_chain_worker(block.chain_id, move |callback| {
771            ChainWorkerRequest::StageBlockExecution {
772                block,
773                round,
774                published_blobs,
775                callback,
776            }
777        })
778        .await
779    }
780
781    /// Tries to execute a block proposal with a policy for handling bundle failures.
782    ///
783    /// Returns the modified block (bundles may be rejected/removed), the executed block,
784    /// chain info response, and resource tracker.
785    #[instrument(level = "trace", skip(self, block))]
786    pub async fn stage_block_execution_with_policy(
787        &self,
788        block: ProposedBlock,
789        round: Option<u32>,
790        published_blobs: Vec<Blob>,
791        policy: BundleExecutionPolicy,
792    ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> {
793        self.query_chain_worker(block.chain_id, move |callback| {
794            ChainWorkerRequest::StageBlockExecutionWithPolicy {
795                block,
796                round,
797                published_blobs,
798                policy,
799                callback,
800            }
801        })
802        .await
803    }
804
805    /// Executes a [`Query`] for an application's state on a specific chain.
806    ///
807    /// If `block_hash` is specified, system will query the application's state
808    /// at that block. If it doesn't exist, it uses latest state.
809    #[instrument(level = "trace", skip(self, chain_id, query))]
810    pub async fn query_application(
811        &self,
812        chain_id: ChainId,
813        query: Query,
814        block_hash: Option<CryptoHash>,
815    ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
816        self.query_chain_worker(chain_id, move |callback| {
817            ChainWorkerRequest::QueryApplication {
818                query,
819                block_hash,
820                callback,
821            }
822        })
823        .await
824    }
825
826    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
827        nickname = %self.nickname,
828        chain_id = %chain_id,
829        application_id = %application_id
830    ))]
831    pub async fn describe_application(
832        &self,
833        chain_id: ChainId,
834        application_id: ApplicationId,
835    ) -> Result<ApplicationDescription, WorkerError> {
836        self.query_chain_worker(chain_id, move |callback| {
837            ChainWorkerRequest::DescribeApplication {
838                application_id,
839                callback,
840            }
841        })
842        .await
843    }
844
845    /// Processes a confirmed block (aka a commit).
846    #[instrument(
847        level = "trace",
848        skip(self, certificate, notify_when_messages_are_delivered),
849        fields(
850            nickname = %self.nickname,
851            chain_id = %certificate.block().header.chain_id,
852            block_height = %certificate.block().header.height
853        )
854    )]
855    async fn process_confirmed_block(
856        &self,
857        certificate: ConfirmedBlockCertificate,
858        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
859    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
860        let chain_id = certificate.block().header.chain_id;
861        self.query_chain_worker(chain_id, move |callback| {
862            ChainWorkerRequest::ProcessConfirmedBlock {
863                certificate,
864                notify_when_messages_are_delivered,
865                callback,
866            }
867        })
868        .await
869    }
870
871    /// Processes a validated block issued from a multi-owner chain.
872    #[instrument(level = "trace", skip(self, certificate), fields(
873        nickname = %self.nickname,
874        chain_id = %certificate.block().header.chain_id,
875        block_height = %certificate.block().header.height
876    ))]
877    async fn process_validated_block(
878        &self,
879        certificate: ValidatedBlockCertificate,
880    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
881        let chain_id = certificate.block().header.chain_id;
882        self.query_chain_worker(chain_id, move |callback| {
883            ChainWorkerRequest::ProcessValidatedBlock {
884                certificate,
885                callback,
886            }
887        })
888        .await
889    }
890
891    /// Processes a leader timeout issued from a multi-owner chain.
892    #[instrument(level = "trace", skip(self, certificate), fields(
893        nickname = %self.nickname,
894        chain_id = %certificate.value().chain_id(),
895        height = %certificate.value().height()
896    ))]
897    async fn process_timeout(
898        &self,
899        certificate: TimeoutCertificate,
900    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
901        let chain_id = certificate.value().chain_id();
902        self.query_chain_worker(chain_id, move |callback| {
903            ChainWorkerRequest::ProcessTimeout {
904                certificate,
905                callback,
906            }
907        })
908        .await
909    }
910
911    #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
912        nickname = %self.nickname,
913        origin = %origin,
914        recipient = %recipient,
915        num_bundles = %bundles.len()
916    ))]
917    async fn process_cross_chain_update(
918        &self,
919        origin: ChainId,
920        recipient: ChainId,
921        bundles: Vec<(Epoch, MessageBundle)>,
922    ) -> Result<Option<BlockHeight>, WorkerError> {
923        self.query_chain_worker(recipient, move |callback| {
924            ChainWorkerRequest::ProcessCrossChainUpdate {
925                origin,
926                bundles,
927                callback,
928            }
929        })
930        .await
931    }
932
933    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
934    #[instrument(level = "trace", skip(self, chain_id, height), fields(
935        nickname = %self.nickname,
936        chain_id = %chain_id,
937        height = %height
938    ))]
939    #[cfg(with_testing)]
940    pub async fn read_certificate(
941        &self,
942        chain_id: ChainId,
943        height: BlockHeight,
944    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
945        self.query_chain_worker(chain_id, move |callback| {
946            ChainWorkerRequest::ReadCertificate { height, callback }
947        })
948        .await
949    }
950
951    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
952    /// [`ChainId`].
953    ///
954    /// The returned view holds a lock on the chain state, which prevents the worker from changing
955    /// the state of that chain.
956    #[instrument(level = "trace", skip(self), fields(
957        nickname = %self.nickname,
958        chain_id = %chain_id
959    ))]
960    pub async fn chain_state_view(
961        &self,
962        chain_id: ChainId,
963    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
964        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
965            callback,
966        })
967        .await
968    }
969
970    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
971    #[instrument(level = "trace", skip(self, request_builder), fields(
972        nickname = %self.nickname,
973        chain_id = %chain_id
974    ))]
975    async fn query_chain_worker<Response>(
976        &self,
977        chain_id: ChainId,
978        request_builder: impl FnOnce(
979            oneshot::Sender<Result<Response, WorkerError>>,
980        ) -> ChainWorkerRequest<StorageClient::Context>,
981    ) -> Result<Response, WorkerError> {
982        // Build the request.
983        let (callback, response) = oneshot::channel();
984        let request = request_builder(callback);
985
986        // Call the endpoint, possibly a new one.
987        let new_channel = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
988
989        // We just created a channel: spawn the actor.
990        if let Some((sender, receiver)) = new_channel {
991            let delivery_notifier = self
992                .delivery_notifiers
993                .lock()
994                .unwrap()
995                .entry(chain_id)
996                .or_default()
997                .clone();
998
999            let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
1000                chain_modes
1001                    .read()
1002                    .unwrap()
1003                    .get(&chain_id)
1004                    .is_some_and(ListeningMode::is_full)
1005            });
1006
1007            let actor_task = ChainWorkerActor::run(
1008                self.chain_worker_config.clone(),
1009                self.storage.clone(),
1010                self.block_cache.clone(),
1011                self.execution_state_cache.clone(),
1012                self.chain_modes.clone(),
1013                delivery_notifier,
1014                chain_id,
1015                sender,
1016                receiver,
1017                is_tracked,
1018            );
1019
1020            self.chain_worker_tasks
1021                .lock()
1022                .unwrap()
1023                .spawn_task(actor_task);
1024        }
1025
1026        // Finally, wait a response.
1027        match response.await {
1028            Err(e) => {
1029                // The actor endpoint was dropped. Better luck next time!
1030                Err(WorkerError::ChainActorRecvError {
1031                    chain_id,
1032                    error: Box::new(e),
1033                })
1034            }
1035            Ok(response) => response,
1036        }
1037    }
1038
1039    /// Find an endpoint and call it. Create the endpoint if necessary.
1040    ///
1041    /// Returns `Some((sender, receiver))` if a new channel was created and the actor needs to be
1042    /// spawned, or `None` if an existing endpoint was used.
1043    #[instrument(level = "trace", skip(self), fields(
1044        nickname = %self.nickname,
1045        chain_id = %chain_id
1046    ))]
1047    #[expect(clippy::type_complexity)]
1048    fn call_and_maybe_create_chain_worker_endpoint(
1049        &self,
1050        chain_id: ChainId,
1051        request: ChainWorkerRequest<StorageClient::Context>,
1052    ) -> Result<
1053        Option<(
1054            ChainWorkerRequestSender<StorageClient::Context>,
1055            ChainWorkerRequestReceiver<StorageClient::Context>,
1056        )>,
1057        WorkerError,
1058    > {
1059        let mut chain_workers = self.chain_workers.lock().unwrap();
1060
1061        let (sender, new_channel) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
1062            (endpoint, None)
1063        } else {
1064            let (sender, receiver) = mpsc::unbounded_channel();
1065            (sender.clone(), Some((sender, receiver)))
1066        };
1067
1068        if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) {
1069            // The actor was dropped. Give up without (re-)inserting the endpoint in the cache.
1070            return Err(WorkerError::ChainActorSendError {
1071                chain_id,
1072                error: Box::new(e),
1073            });
1074        }
1075
1076        // Put back the sender in the cache for next time.
1077        chain_workers.insert(chain_id, sender);
1078        #[cfg(with_metrics)]
1079        metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64);
1080
1081        Ok(new_channel)
1082    }
1083
1084    #[instrument(skip_all, fields(
1085        nick = self.nickname,
1086        chain_id = format!("{:.8}", proposal.content.block.chain_id),
1087        height = %proposal.content.block.height,
1088    ))]
1089    pub async fn handle_block_proposal(
1090        &self,
1091        proposal: BlockProposal,
1092    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1093        trace!("{} <-- {:?}", self.nickname, proposal);
1094        #[cfg(with_metrics)]
1095        let round = proposal.content.round;
1096        let response = self
1097            .query_chain_worker(proposal.content.block.chain_id, move |callback| {
1098                ChainWorkerRequest::HandleBlockProposal { proposal, callback }
1099            })
1100            .await?;
1101        #[cfg(with_metrics)]
1102        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1103            .with_label_values(&[round.type_name()])
1104            .observe(round.number() as f64);
1105        Ok(response)
1106    }
1107
1108    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
1109    // Other fields will be included in the caller's span.
1110    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
1111    pub async fn handle_lite_certificate(
1112        &self,
1113        certificate: LiteCertificate<'_>,
1114        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1115    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1116        match self.full_certificate(certificate).await? {
1117            Either::Left(confirmed) => {
1118                Box::pin(
1119                    self.handle_confirmed_certificate(
1120                        confirmed,
1121                        notify_when_messages_are_delivered,
1122                    ),
1123                )
1124                .await
1125            }
1126            Either::Right(validated) => {
1127                if let Some(notifier) = notify_when_messages_are_delivered {
1128                    // Nothing to wait for.
1129                    if let Err(()) = notifier.send(()) {
1130                        warn!("Failed to notify message delivery to caller");
1131                    }
1132                }
1133                Box::pin(self.handle_validated_certificate(validated)).await
1134            }
1135        }
1136    }
1137
1138    /// Processes a confirmed block certificate.
1139    #[instrument(skip_all, fields(
1140        nick = self.nickname,
1141        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1142        height = %certificate.block().header.height,
1143    ))]
1144    pub async fn handle_confirmed_certificate(
1145        &self,
1146        certificate: ConfirmedBlockCertificate,
1147        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1148    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1149        trace!("{} <-- {:?}", self.nickname, certificate);
1150        #[cfg(with_metrics)]
1151        let metrics_data = metrics::MetricsData::new(&certificate);
1152
1153        let (info, actions, _outcome) =
1154            Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1155                .await?;
1156
1157        #[cfg(with_metrics)]
1158        if matches!(_outcome, BlockOutcome::Processed) {
1159            metrics_data.record();
1160        }
1161        Ok((info, actions))
1162    }
1163
1164    /// Processes a validated block certificate.
1165    #[instrument(skip_all, fields(
1166        nick = self.nickname,
1167        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1168        height = %certificate.block().header.height,
1169    ))]
1170    pub async fn handle_validated_certificate(
1171        &self,
1172        certificate: ValidatedBlockCertificate,
1173    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1174        trace!("{} <-- {:?}", self.nickname, certificate);
1175
1176        #[cfg(with_metrics)]
1177        let round = certificate.round;
1178        #[cfg(with_metrics)]
1179        let cert_str = certificate.inner().to_log_str();
1180
1181        let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1182        #[cfg(with_metrics)]
1183        {
1184            if matches!(_outcome, BlockOutcome::Processed) {
1185                metrics::NUM_ROUNDS_IN_CERTIFICATE
1186                    .with_label_values(&[cert_str, round.type_name()])
1187                    .observe(round.number() as f64);
1188            }
1189        }
1190        Ok((info, actions))
1191    }
1192
1193    /// Processes a timeout certificate
1194    #[instrument(skip_all, fields(
1195        nick = self.nickname,
1196        chain_id = format!("{:.8}", certificate.inner().chain_id()),
1197        height = %certificate.inner().height(),
1198    ))]
1199    pub async fn handle_timeout_certificate(
1200        &self,
1201        certificate: TimeoutCertificate,
1202    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1203        trace!("{} <-- {:?}", self.nickname, certificate);
1204        self.process_timeout(certificate).await
1205    }
1206
1207    #[instrument(skip_all, fields(
1208        nick = self.nickname,
1209        chain_id = format!("{:.8}", query.chain_id)
1210    ))]
1211    pub async fn handle_chain_info_query(
1212        &self,
1213        query: ChainInfoQuery,
1214    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1215        trace!("{} <-- {:?}", self.nickname, query);
1216        #[cfg(with_metrics)]
1217        metrics::CHAIN_INFO_QUERIES.inc();
1218        let result = self
1219            .query_chain_worker(query.chain_id, move |callback| {
1220                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1221            })
1222            .await;
1223        trace!("{} --> {:?}", self.nickname, result);
1224        result
1225    }
1226
1227    #[instrument(skip_all, fields(
1228        nick = self.nickname,
1229        chain_id = format!("{:.8}", chain_id)
1230    ))]
1231    pub async fn download_pending_blob(
1232        &self,
1233        chain_id: ChainId,
1234        blob_id: BlobId,
1235    ) -> Result<Blob, WorkerError> {
1236        trace!(
1237            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1238            self.nickname
1239        );
1240        let result = self
1241            .query_chain_worker(chain_id, move |callback| {
1242                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1243            })
1244            .await;
1245        trace!(
1246            "{} --> {:?}",
1247            self.nickname,
1248            result.as_ref().map(|_| blob_id)
1249        );
1250        result
1251    }
1252
1253    #[instrument(skip_all, fields(
1254        nick = self.nickname,
1255        chain_id = format!("{:.8}", chain_id)
1256    ))]
1257    pub async fn handle_pending_blob(
1258        &self,
1259        chain_id: ChainId,
1260        blob: Blob,
1261    ) -> Result<ChainInfoResponse, WorkerError> {
1262        let blob_id = blob.id();
1263        trace!(
1264            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1265            self.nickname
1266        );
1267        let result = self
1268            .query_chain_worker(chain_id, move |callback| {
1269                ChainWorkerRequest::HandlePendingBlob { blob, callback }
1270            })
1271            .await;
1272        trace!(
1273            "{} --> {:?}",
1274            self.nickname,
1275            result.as_ref().map(|_| blob_id)
1276        );
1277        result
1278    }
1279
1280    #[instrument(skip_all, fields(
1281        nick = self.nickname,
1282        chain_id = format!("{:.8}", request.target_chain_id())
1283    ))]
1284    pub async fn handle_cross_chain_request(
1285        &self,
1286        request: CrossChainRequest,
1287    ) -> Result<NetworkActions, WorkerError> {
1288        trace!("{} <-- {:?}", self.nickname, request);
1289        match request {
1290            CrossChainRequest::UpdateRecipient {
1291                sender,
1292                recipient,
1293                bundles,
1294            } => {
1295                let mut actions = NetworkActions::default();
1296                let origin = sender;
1297                let Some(height) = self
1298                    .process_cross_chain_update(origin, recipient, bundles)
1299                    .await?
1300                else {
1301                    return Ok(actions);
1302                };
1303                actions.notifications.push(Notification {
1304                    chain_id: recipient,
1305                    reason: Reason::NewIncomingBundle { origin, height },
1306                });
1307                actions
1308                    .cross_chain_requests
1309                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1310                        sender,
1311                        recipient,
1312                        latest_height: height,
1313                    });
1314                Ok(actions)
1315            }
1316            CrossChainRequest::ConfirmUpdatedRecipient {
1317                sender,
1318                recipient,
1319                latest_height,
1320            } => {
1321                self.query_chain_worker(sender, move |callback| {
1322                    ChainWorkerRequest::ConfirmUpdatedRecipient {
1323                        recipient,
1324                        latest_height,
1325                        callback,
1326                    }
1327                })
1328                .await?;
1329                Ok(NetworkActions::default())
1330            }
1331        }
1332    }
1333
1334    /// Updates the received certificate trackers to at least the given values.
1335    #[instrument(skip_all, fields(
1336        nickname = %self.nickname,
1337        chain_id = %chain_id,
1338        num_trackers = %new_trackers.len()
1339    ))]
1340    pub async fn update_received_certificate_trackers(
1341        &self,
1342        chain_id: ChainId,
1343        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1344    ) -> Result<(), WorkerError> {
1345        self.query_chain_worker(chain_id, move |callback| {
1346            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1347                new_trackers,
1348                callback,
1349            }
1350        })
1351        .await
1352    }
1353
1354    /// Gets preprocessed block hashes in a given height range.
1355    #[instrument(skip_all, fields(
1356        nickname = %self.nickname,
1357        chain_id = %chain_id,
1358        start = %start,
1359        end = %end
1360    ))]
1361    pub async fn get_preprocessed_block_hashes(
1362        &self,
1363        chain_id: ChainId,
1364        start: BlockHeight,
1365        end: BlockHeight,
1366    ) -> Result<Vec<CryptoHash>, WorkerError> {
1367        self.query_chain_worker(chain_id, move |callback| {
1368            ChainWorkerRequest::GetPreprocessedBlockHashes {
1369                start,
1370                end,
1371                callback,
1372            }
1373        })
1374        .await
1375    }
1376
1377    /// Gets the next block height to receive from an inbox.
1378    #[instrument(skip_all, fields(
1379        nickname = %self.nickname,
1380        chain_id = %chain_id,
1381        origin = %origin
1382    ))]
1383    pub async fn get_inbox_next_height(
1384        &self,
1385        chain_id: ChainId,
1386        origin: ChainId,
1387    ) -> Result<BlockHeight, WorkerError> {
1388        self.query_chain_worker(chain_id, move |callback| {
1389            ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1390        })
1391        .await
1392    }
1393
1394    /// Gets locking blobs for specific blob IDs.
1395    /// Returns `Ok(None)` if any of the blobs is not found.
1396    #[instrument(skip_all, fields(
1397        nickname = %self.nickname,
1398        chain_id = %chain_id,
1399        num_blob_ids = %blob_ids.len()
1400    ))]
1401    pub async fn get_locking_blobs(
1402        &self,
1403        chain_id: ChainId,
1404        blob_ids: Vec<BlobId>,
1405    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1406        self.query_chain_worker(chain_id, move |callback| {
1407            ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1408        })
1409        .await
1410    }
1411
1412    /// Gets block hashes for the given heights.
1413    pub async fn get_block_hashes(
1414        &self,
1415        chain_id: ChainId,
1416        heights: Vec<BlockHeight>,
1417    ) -> Result<Vec<CryptoHash>, WorkerError> {
1418        self.query_chain_worker(chain_id, move |callback| {
1419            ChainWorkerRequest::GetBlockHashes { heights, callback }
1420        })
1421        .await
1422    }
1423
1424    /// Gets proposed blobs from the manager for specified blob IDs.
1425    pub async fn get_proposed_blobs(
1426        &self,
1427        chain_id: ChainId,
1428        blob_ids: Vec<BlobId>,
1429    ) -> Result<Vec<Blob>, WorkerError> {
1430        self.query_chain_worker(chain_id, move |callback| {
1431            ChainWorkerRequest::GetProposedBlobs { blob_ids, callback }
1432        })
1433        .await
1434    }
1435
1436    /// Gets event subscriptions from the chain.
1437    pub async fn get_event_subscriptions(
1438        &self,
1439        chain_id: ChainId,
1440    ) -> Result<EventSubscriptionsResult, WorkerError> {
1441        self.query_chain_worker(chain_id, |callback| {
1442            ChainWorkerRequest::GetEventSubscriptions { callback }
1443        })
1444        .await
1445    }
1446
1447    /// Gets the stream event count for a stream.
1448    pub async fn get_stream_event_count(
1449        &self,
1450        chain_id: ChainId,
1451        stream_id: StreamId,
1452    ) -> Result<Option<u32>, WorkerError> {
1453        self.query_chain_worker(chain_id, move |callback| {
1454            ChainWorkerRequest::GetStreamEventCount {
1455                stream_id,
1456                callback,
1457            }
1458        })
1459        .await
1460    }
1461
1462    /// Gets received certificate trackers.
1463    pub async fn get_received_certificate_trackers(
1464        &self,
1465        chain_id: ChainId,
1466    ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1467        self.query_chain_worker(chain_id, |callback| {
1468            ChainWorkerRequest::GetReceivedCertificateTrackers { callback }
1469        })
1470        .await
1471    }
1472
1473    /// Gets tip state and outbox info for next_outbox_heights calculation.
1474    pub async fn get_tip_state_and_outbox_info(
1475        &self,
1476        chain_id: ChainId,
1477        receiver_id: ChainId,
1478    ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1479        self.query_chain_worker(chain_id, move |callback| {
1480            ChainWorkerRequest::GetTipStateAndOutboxInfo {
1481                receiver_id,
1482                callback,
1483            }
1484        })
1485        .await
1486    }
1487
1488    /// Gets the next height to preprocess.
1489    pub async fn get_next_height_to_preprocess(
1490        &self,
1491        chain_id: ChainId,
1492    ) -> Result<BlockHeight, WorkerError> {
1493        self.query_chain_worker(chain_id, |callback| {
1494            ChainWorkerRequest::GetNextHeightToPreprocess { callback }
1495        })
1496        .await
1497    }
1498
1499    /// Gets the chain manager's seed for leader election.
1500    pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, WorkerError> {
1501        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetManagerSeed {
1502            callback,
1503        })
1504        .await
1505    }
1506}
1507
1508#[cfg(with_testing)]
1509impl<StorageClient> WorkerState<StorageClient>
1510where
1511    StorageClient: Storage,
1512{
1513    /// Gets a reference to the validator's [`ValidatorPublicKey`].
1514    ///
1515    /// # Panics
1516    ///
1517    /// If the validator doesn't have a key pair assigned to it.
1518    #[instrument(level = "trace", skip(self))]
1519    pub fn public_key(&self) -> ValidatorPublicKey {
1520        self.chain_worker_config
1521            .key_pair()
1522            .expect(
1523                "Test validator should have a key pair assigned to it \
1524                in order to obtain its public key",
1525            )
1526            .public()
1527    }
1528}