Skip to main content

linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14    data_types::{
15        ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp,
16    },
17    doc_scalar,
18    hashed::Hashed,
19    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
20    time::Instant,
21    util::traits::DynError,
22};
23#[cfg(with_testing)]
24use linera_chain::ChainExecutionContext;
25use linera_chain::{
26    data_types::{
27        BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock,
28    },
29    types::{
30        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
31        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
32    },
33    ChainError, ChainStateView,
34};
35use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
36use linera_storage::Storage;
37use linera_views::{context::InactiveContext, ViewError};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
41use tracing::{error, instrument, trace, warn};
42
43/// Re-export of [`EventSubscriptionsResult`] for use by other crate modules.
44pub(crate) use crate::chain_worker::{
45    ChainWorkerRequestReceiver, ChainWorkerRequestSender, EventSubscriptionsResult,
46};
47use crate::{
48    chain_worker::{
49        BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
50    },
51    client::ListeningMode,
52    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
53    join_set_ext::{JoinSet, JoinSetExt},
54    notifier::Notifier,
55    value_cache::ValueCache,
56    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
57};
58
59const BLOCK_CACHE_SIZE: usize = 5_000;
60const EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
61
62#[cfg(test)]
63#[path = "unit_tests/worker_tests.rs"]
64mod worker_tests;
65
66#[cfg(with_metrics)]
67mod metrics {
68    use std::sync::LazyLock;
69
70    use linera_base::prometheus_util::{
71        exponential_bucket_interval, register_histogram, register_histogram_vec,
72        register_int_counter, register_int_counter_vec, register_int_gauge,
73    };
74    use linera_chain::types::ConfirmedBlockCertificate;
75    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge};
76
77    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
78        register_histogram_vec(
79            "num_rounds_in_certificate",
80            "Number of rounds in certificate",
81            &["certificate_value", "round_type"],
82            exponential_bucket_interval(0.1, 50.0),
83        )
84    });
85
86    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
87        register_histogram_vec(
88            "num_rounds_in_block_proposal",
89            "Number of rounds in block proposal",
90            &["round_type"],
91            exponential_bucket_interval(0.1, 50.0),
92        )
93    });
94
95    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
96        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));
97
98    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
99        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));
100
101    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
102        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));
103
104    pub static OPERATION_COUNT: LazyLock<IntCounter> =
105        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));
106
107    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
108        register_histogram(
109            "operations_per_block",
110            "Number of operations per block",
111            exponential_bucket_interval(1.0, 10000.0),
112        )
113    });
114
115    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
116        register_histogram(
117            "incoming_bundles_per_block",
118            "Number of incoming bundles per block",
119            exponential_bucket_interval(1.0, 10000.0),
120        )
121    });
122
123    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
124        register_histogram(
125            "transactions_per_block",
126            "Number of transactions per block",
127            exponential_bucket_interval(1.0, 10000.0),
128        )
129    });
130
131    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
132        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
133    });
134
135    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
136        register_int_counter_vec(
137            "certificates_signed",
138            "Number of confirmed block certificates signed by each validator",
139            &["validator_name"],
140        )
141    });
142
143    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
144        register_int_counter(
145            "chain_info_queries",
146            "Number of chain info queries processed",
147        )
148    });
149
150    /// Number of cached chain worker channel endpoints in the map.
151    pub static CHAIN_WORKER_ENDPOINTS_CACHED: LazyLock<IntGauge> = LazyLock::new(|| {
152        register_int_gauge(
153            "chain_worker_endpoints_cached",
154            "Number of cached chain worker channel endpoints",
155        )
156    });
157
158    /// Holds metrics data extracted from a confirmed block certificate.
159    pub struct MetricsData {
160        certificate_log_str: &'static str,
161        round_type: &'static str,
162        round_number: u32,
163        confirmed_transactions: u64,
164        confirmed_incoming_bundles: u64,
165        confirmed_incoming_messages: u64,
166        confirmed_operations: u64,
167        validators_with_signatures: Vec<String>,
168    }
169
170    impl MetricsData {
171        /// Creates a new `MetricsData` by extracting data from the certificate.
172        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
173            Self {
174                certificate_log_str: certificate.inner().to_log_str(),
175                round_type: certificate.round.type_name(),
176                round_number: certificate.round.number(),
177                confirmed_transactions: certificate.block().body.transactions.len() as u64,
178                confirmed_incoming_bundles: certificate.block().body.incoming_bundles().count()
179                    as u64,
180                confirmed_incoming_messages: certificate
181                    .block()
182                    .body
183                    .incoming_bundles()
184                    .map(|b| b.messages().count())
185                    .sum::<usize>() as u64,
186                confirmed_operations: certificate.block().body.operations().count() as u64,
187                validators_with_signatures: certificate
188                    .signatures()
189                    .iter()
190                    .map(|(validator_name, _)| validator_name.to_string())
191                    .collect(),
192            }
193        }
194
195        /// Records the metrics for a processed block.
196        pub fn record(self) {
197            NUM_BLOCKS.with_label_values(&[]).inc();
198            NUM_ROUNDS_IN_CERTIFICATE
199                .with_label_values(&[self.certificate_log_str, self.round_type])
200                .observe(self.round_number as f64);
201            TRANSACTIONS_PER_BLOCK.observe(self.confirmed_transactions as f64);
202            INCOMING_BUNDLES_PER_BLOCK.observe(self.confirmed_incoming_bundles as f64);
203            OPERATIONS_PER_BLOCK.observe(self.confirmed_operations as f64);
204            if self.confirmed_transactions > 0 {
205                TRANSACTION_COUNT
206                    .with_label_values(&[])
207                    .inc_by(self.confirmed_transactions);
208                if self.confirmed_incoming_bundles > 0 {
209                    INCOMING_BUNDLE_COUNT.inc_by(self.confirmed_incoming_bundles);
210                }
211                if self.confirmed_incoming_messages > 0 {
212                    INCOMING_MESSAGE_COUNT.inc_by(self.confirmed_incoming_messages);
213                }
214                if self.confirmed_operations > 0 {
215                    OPERATION_COUNT.inc_by(self.confirmed_operations);
216                }
217            }
218
219            for validator_name in self.validators_with_signatures {
220                CERTIFICATES_SIGNED
221                    .with_label_values(&[&validator_name])
222                    .inc();
223            }
224        }
225    }
226}
227
228/// Instruct the networking layer to send cross-chain requests and/or push notifications.
229#[derive(Default, Debug)]
230pub struct NetworkActions {
231    /// The cross-chain requests
232    pub cross_chain_requests: Vec<CrossChainRequest>,
233    /// The push notifications.
234    pub notifications: Vec<Notification>,
235}
236
237impl NetworkActions {
238    pub fn extend(&mut self, other: NetworkActions) {
239        self.cross_chain_requests.extend(other.cross_chain_requests);
240        self.notifications.extend(other.notifications);
241    }
242}
243
244#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
245/// Notification that a chain has a new certified block or a new message.
246pub struct Notification {
247    pub chain_id: ChainId,
248    pub reason: Reason,
249}
250
251doc_scalar!(
252    Notification,
253    "Notify that a chain has a new certified block or a new message"
254);
255
256#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
257/// Reason for the notification.
258pub enum Reason {
259    NewBlock {
260        height: BlockHeight,
261        hash: CryptoHash,
262        event_streams: BTreeSet<StreamId>,
263    },
264    NewIncomingBundle {
265        origin: ChainId,
266        height: BlockHeight,
267    },
268    NewRound {
269        height: BlockHeight,
270        round: Round,
271    },
272    BlockExecuted {
273        height: BlockHeight,
274        hash: CryptoHash,
275    },
276}
277
278/// Error type for worker operations.
279#[derive(Debug, Error)]
280pub enum WorkerError {
281    #[error(transparent)]
282    CryptoError(#[from] CryptoError),
283
284    #[error(transparent)]
285    ArithmeticError(#[from] ArithmeticError),
286
287    #[error(transparent)]
288    ViewError(#[from] ViewError),
289
290    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
291    ReadCertificatesError(Vec<CryptoHash>),
292
293    #[error(transparent)]
294    ChainError(#[from] Box<ChainError>),
295
296    #[error(transparent)]
297    BcsError(#[from] bcs::Error),
298
299    // Chain access control
300    #[error("Block was not signed by an authorized owner")]
301    InvalidOwner,
302
303    #[error("Operations in the block are not authenticated by the proper signer: {0}")]
304    InvalidSigner(AccountOwner),
305
306    // Chaining
307    #[error(
308        "Chain is expecting a next block at height {expected_block_height} but the given block \
309        is at height {found_block_height} instead"
310    )]
311    UnexpectedBlockHeight {
312        expected_block_height: BlockHeight,
313        found_block_height: BlockHeight,
314    },
315    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
316    InvalidEpoch {
317        chain_id: ChainId,
318        chain_epoch: Epoch,
319        epoch: Epoch,
320    },
321
322    #[error("Events not found: {0:?}")]
323    EventsNotFound(Vec<EventId>),
324
325    // Other server-side errors
326    #[error("Invalid cross-chain request")]
327    InvalidCrossChainRequest,
328    #[error("The block does not contain the hash that we expected for the previous block")]
329    InvalidBlockChaining,
330    #[error(
331        "The given outcome is not what we computed after executing the block.\n\
332        Computed: {computed:#?}\n\
333        Submitted: {submitted:#?}"
334    )]
335    IncorrectOutcome {
336        computed: Box<BlockExecutionOutcome>,
337        submitted: Box<BlockExecutionOutcome>,
338    },
339    #[error(
340        "Block timestamp ({block_timestamp}) is further in the future from local time \
341        ({local_time}) than block time grace period ({block_time_grace_period:?}) \
342        [us:{block_timestamp_us}:{local_time_us}]",
343        block_timestamp_us = block_timestamp.micros(),
344        local_time_us = local_time.micros(),
345    )]
346    InvalidTimestamp {
347        block_timestamp: Timestamp,
348        local_time: Timestamp,
349        block_time_grace_period: Duration,
350    },
351    #[error("We don't have the value for the certificate.")]
352    MissingCertificateValue,
353    #[error("The hash certificate doesn't match its value.")]
354    InvalidLiteCertificate,
355    #[error("Fast blocks cannot query oracles")]
356    FastBlockUsingOracles,
357    #[error("Blobs not found: {0:?}")]
358    BlobsNotFound(Vec<BlobId>),
359    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
360    ConfirmedLogEntryNotFound {
361        height: BlockHeight,
362        chain_id: ChainId,
363    },
364    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
365    PreprocessedBlocksEntryNotFound {
366        height: BlockHeight,
367        chain_id: ChainId,
368    },
369    #[error("The block proposal is invalid: {0}")]
370    InvalidBlockProposal(String),
371    #[error("Blob was not required by any pending block")]
372    UnexpectedBlob,
373    #[error("Number of published blobs per block must not exceed {0}")]
374    TooManyPublishedBlobs(u64),
375    #[error("Missing network description")]
376    MissingNetworkDescription,
377    #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
378    ChainActorSendError {
379        chain_id: ChainId,
380        error: Box<dyn DynError>,
381    },
382    #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
383    ChainActorRecvError {
384        chain_id: ChainId,
385        error: Box<dyn DynError>,
386    },
387
388    #[error("thread error: {0}")]
389    Thread(#[from] web_thread_pool::Error),
390
391    #[error("Fallback mode is not available on this network")]
392    NoFallbackMode,
393}
394
395impl WorkerError {
396    /// Returns whether this error is caused by an issue in the local node.
397    ///
398    /// Returns `false` whenever the error could be caused by a bad message from a peer.
399    pub fn is_local(&self) -> bool {
400        match self {
401            WorkerError::CryptoError(_)
402            | WorkerError::ArithmeticError(_)
403            | WorkerError::InvalidOwner
404            | WorkerError::InvalidSigner(_)
405            | WorkerError::UnexpectedBlockHeight { .. }
406            | WorkerError::InvalidEpoch { .. }
407            | WorkerError::EventsNotFound(_)
408            | WorkerError::InvalidBlockChaining
409            | WorkerError::IncorrectOutcome { .. }
410            | WorkerError::InvalidTimestamp { .. }
411            | WorkerError::MissingCertificateValue
412            | WorkerError::InvalidLiteCertificate
413            | WorkerError::FastBlockUsingOracles
414            | WorkerError::BlobsNotFound(_)
415            | WorkerError::InvalidBlockProposal(_)
416            | WorkerError::UnexpectedBlob
417            | WorkerError::TooManyPublishedBlobs(_)
418            | WorkerError::NoFallbackMode
419            | WorkerError::ViewError(ViewError::NotFound(_)) => false,
420            WorkerError::BcsError(_)
421            | WorkerError::InvalidCrossChainRequest
422            | WorkerError::ViewError(_)
423            | WorkerError::ConfirmedLogEntryNotFound { .. }
424            | WorkerError::PreprocessedBlocksEntryNotFound { .. }
425            | WorkerError::MissingNetworkDescription
426            | WorkerError::ChainActorSendError { .. }
427            | WorkerError::ChainActorRecvError { .. }
428            | WorkerError::Thread(_)
429            | WorkerError::ReadCertificatesError(_) => true,
430            WorkerError::ChainError(chain_error) => chain_error.is_local(),
431        }
432    }
433}
434
435impl From<ChainError> for WorkerError {
436    #[instrument(level = "trace", skip(chain_error))]
437    fn from(chain_error: ChainError) -> Self {
438        match chain_error {
439            ChainError::ExecutionError(execution_error, context) => match *execution_error {
440                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
441                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
442                _ => Self::ChainError(Box::new(ChainError::ExecutionError(
443                    execution_error,
444                    context,
445                ))),
446            },
447            error => Self::ChainError(Box::new(error)),
448        }
449    }
450}
451
452#[cfg(with_testing)]
453impl WorkerError {
454    /// Returns the inner [`ExecutionError`] in this error.
455    ///
456    /// # Panics
457    ///
458    /// If this is not caused by an [`ExecutionError`].
459    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
460        let WorkerError::ChainError(chain_error) = self else {
461            panic!("Expected an `ExecutionError`. Got: {self:#?}");
462        };
463
464        let ChainError::ExecutionError(execution_error, context) = *chain_error else {
465            panic!("Expected an `ExecutionError`. Got: {chain_error:#?}");
466        };
467
468        assert_eq!(context, expected_context);
469
470        *execution_error
471    }
472}
473
474/// State of a worker in a validator or a local node.
475pub struct WorkerState<StorageClient>
476where
477    StorageClient: Storage,
478{
479    /// A name used for logging
480    nickname: String,
481    /// Access to local persistent storage.
482    storage: StorageClient,
483    /// Configuration options for the [`ChainWorker`]s.
484    chain_worker_config: ChainWorkerConfig,
485    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
486    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
487    /// Chains tracked by a worker, along with their listening modes.
488    chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
489    /// One-shot channels to notify callers when messages of a particular chain have been
490    /// delivered.
491    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
492    /// The set of spawned [`ChainWorkerActor`] tasks.
493    chain_worker_tasks: Arc<Mutex<JoinSet>>,
494    /// The cache of running [`ChainWorkerActor`]s.
495    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
496}
497
498impl<StorageClient> Clone for WorkerState<StorageClient>
499where
500    StorageClient: Storage + Clone,
501{
502    fn clone(&self) -> Self {
503        WorkerState {
504            nickname: self.nickname.clone(),
505            storage: self.storage.clone(),
506            chain_worker_config: self.chain_worker_config.clone(),
507            block_cache: self.block_cache.clone(),
508            execution_state_cache: self.execution_state_cache.clone(),
509            chain_modes: self.chain_modes.clone(),
510            delivery_notifiers: self.delivery_notifiers.clone(),
511            chain_worker_tasks: self.chain_worker_tasks.clone(),
512            chain_workers: self.chain_workers.clone(),
513        }
514    }
515}
516
517/// The sender endpoint for [`ChainWorkerRequest`]s.
518type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
519    ChainWorkerRequest<<StorageClient as Storage>::Context>,
520    tracing::Span,
521    Instant,
522)>;
523
524pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
525
526impl<StorageClient> WorkerState<StorageClient>
527where
528    StorageClient: Storage,
529{
530    #[instrument(level = "trace", skip(nickname, key_pair, storage))]
531    pub fn new(
532        nickname: String,
533        key_pair: Option<ValidatorSecretKey>,
534        storage: StorageClient,
535    ) -> Self {
536        WorkerState {
537            nickname,
538            storage,
539            chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
540            block_cache: Arc::new(ValueCache::new(BLOCK_CACHE_SIZE)),
541            execution_state_cache: Arc::new(ValueCache::new(EXECUTION_STATE_CACHE_SIZE)),
542            chain_modes: None,
543            delivery_notifiers: Arc::default(),
544            chain_worker_tasks: Arc::default(),
545            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
546        }
547    }
548
549    #[instrument(level = "trace", skip(nickname, storage))]
550    pub fn new_for_client(
551        nickname: String,
552        storage: StorageClient,
553        chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
554    ) -> Self {
555        WorkerState {
556            nickname,
557            storage,
558            chain_worker_config: ChainWorkerConfig::default(),
559            block_cache: Arc::new(ValueCache::new(BLOCK_CACHE_SIZE)),
560            execution_state_cache: Arc::new(ValueCache::new(EXECUTION_STATE_CACHE_SIZE)),
561            chain_modes: Some(chain_modes),
562            delivery_notifiers: Arc::default(),
563            chain_worker_tasks: Arc::default(),
564            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
565        }
566    }
567
568    #[instrument(level = "trace", skip(self, value))]
569    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
570        self.chain_worker_config.allow_inactive_chains = value;
571        self
572    }
573
574    #[instrument(level = "trace", skip(self, value))]
575    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
576        self.chain_worker_config
577            .allow_messages_from_deprecated_epochs = value;
578        self
579    }
580
581    #[instrument(level = "trace", skip(self, value))]
582    pub fn with_long_lived_services(mut self, value: bool) -> Self {
583        self.chain_worker_config.long_lived_services = value;
584        self
585    }
586
587    /// Returns an instance with the specified block time grace period.
588    ///
589    /// Blocks with a timestamp this far in the future will still be accepted, but the validator
590    /// will wait until that timestamp before voting.
591    #[instrument(level = "trace", skip(self))]
592    pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self {
593        self.chain_worker_config.block_time_grace_period = block_time_grace_period;
594        self
595    }
596
597    /// Returns an instance with the specified chain worker TTL.
598    ///
599    /// Idle chain workers free their memory after that duration without requests.
600    #[instrument(level = "trace", skip(self))]
601    pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
602        self.chain_worker_config.ttl = chain_worker_ttl;
603        self
604    }
605
606    /// Returns an instance with the specified sender chain worker TTL.
607    ///
608    /// Idle sender chain workers free their memory after that duration without requests.
609    #[instrument(level = "trace", skip(self))]
610    pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
611        self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
612        self
613    }
614
615    /// Returns an instance with the specified maximum size for received_log entries.
616    ///
617    /// Sizes below `CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES` should be avoided.
618    #[instrument(level = "trace", skip(self))]
619    pub fn with_chain_info_max_received_log_entries(
620        mut self,
621        chain_info_max_received_log_entries: usize,
622    ) -> Self {
623        if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
624            warn!(
625                "The value set for the maximum size of received_log entries \
626                   may not be compatible with the latest clients: {} instead of {}",
627                chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
628            );
629        }
630        self.chain_worker_config.chain_info_max_received_log_entries =
631            chain_info_max_received_log_entries;
632        self
633    }
634
635    #[instrument(level = "trace", skip(self))]
636    pub fn nickname(&self) -> &str {
637        &self.nickname
638    }
639
640    /// Returns the storage client so that it can be manipulated or queried.
641    #[instrument(level = "trace", skip(self))]
642    #[cfg(not(feature = "test"))]
643    pub(crate) fn storage_client(&self) -> &StorageClient {
644        &self.storage
645    }
646
647    /// Returns the storage client so that it can be manipulated or queried by tests in other
648    /// crates.
649    #[instrument(level = "trace", skip(self))]
650    #[cfg(feature = "test")]
651    pub fn storage_client(&self) -> &StorageClient {
652        &self.storage
653    }
654
655    #[instrument(level = "trace", skip(self, certificate))]
656    pub(crate) async fn full_certificate(
657        &self,
658        certificate: LiteCertificate<'_>,
659    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
660        let block = self
661            .block_cache
662            .get(&certificate.value.value_hash)
663            .ok_or(WorkerError::MissingCertificateValue)?;
664
665        match certificate.value.kind {
666            linera_chain::types::CertificateKind::Confirmed => {
667                let value = ConfirmedBlock::from_hashed(block);
668                Ok(Either::Left(
669                    certificate
670                        .with_value(value)
671                        .ok_or(WorkerError::InvalidLiteCertificate)?,
672                ))
673            }
674            linera_chain::types::CertificateKind::Validated => {
675                let value = ValidatedBlock::from_hashed(block);
676                Ok(Either::Right(
677                    certificate
678                        .with_value(value)
679                        .ok_or(WorkerError::InvalidLiteCertificate)?,
680                ))
681            }
682            _ => Err(WorkerError::InvalidLiteCertificate),
683        }
684    }
685}
686
687#[allow(async_fn_in_trait)]
688#[cfg_attr(not(web), trait_variant::make(Send))]
689pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
690    async fn process_certificate<S: Storage + Clone + 'static>(
691        worker: &WorkerState<S>,
692        certificate: GenericCertificate<Self>,
693    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
694}
695
696impl ProcessableCertificate for ConfirmedBlock {
697    async fn process_certificate<S: Storage + Clone + 'static>(
698        worker: &WorkerState<S>,
699        certificate: ConfirmedBlockCertificate,
700    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
701        Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
702    }
703}
704
705impl ProcessableCertificate for ValidatedBlock {
706    async fn process_certificate<S: Storage + Clone + 'static>(
707        worker: &WorkerState<S>,
708        certificate: ValidatedBlockCertificate,
709    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
710        Box::pin(worker.handle_validated_certificate(certificate)).await
711    }
712}
713
714impl ProcessableCertificate for Timeout {
715    async fn process_certificate<S: Storage + Clone + 'static>(
716        worker: &WorkerState<S>,
717        certificate: TimeoutCertificate,
718    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
719        worker.handle_timeout_certificate(certificate).await
720    }
721}
722
723impl<StorageClient> WorkerState<StorageClient>
724where
725    StorageClient: Storage + Clone + 'static,
726{
727    #[instrument(level = "trace", skip(self, certificate, notifier))]
728    #[inline]
729    pub async fn fully_handle_certificate_with_notifications<T>(
730        &self,
731        certificate: GenericCertificate<T>,
732        notifier: &impl Notifier,
733    ) -> Result<ChainInfoResponse, WorkerError>
734    where
735        T: ProcessableCertificate,
736    {
737        let notifications = (*notifier).clone();
738        let this = self.clone();
739        linera_base::task::spawn(async move {
740            let (response, actions) =
741                ProcessableCertificate::process_certificate(&this, certificate).await?;
742            notifications.notify(&actions.notifications);
743            let mut requests = VecDeque::from(actions.cross_chain_requests);
744            while let Some(request) = requests.pop_front() {
745                let actions = this.handle_cross_chain_request(request).await?;
746                requests.extend(actions.cross_chain_requests);
747                notifications.notify(&actions.notifications);
748            }
749            Ok(response)
750        })
751        .await
752    }
753
754    /// Tries to execute a block proposal without any verification other than block execution.
755    #[instrument(level = "trace", skip(self, block))]
756    pub async fn stage_block_execution(
757        &self,
758        block: ProposedBlock,
759        round: Option<u32>,
760        published_blobs: Vec<Blob>,
761    ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> {
762        self.query_chain_worker(block.chain_id, move |callback| {
763            ChainWorkerRequest::StageBlockExecution {
764                block,
765                round,
766                published_blobs,
767                callback,
768            }
769        })
770        .await
771    }
772
773    /// Tries to execute a block proposal with a policy for handling bundle failures.
774    ///
775    /// Returns the modified block (bundles may be rejected/removed), the executed block,
776    /// chain info response, and resource tracker.
777    #[instrument(level = "trace", skip(self, block))]
778    pub async fn stage_block_execution_with_policy(
779        &self,
780        block: ProposedBlock,
781        round: Option<u32>,
782        published_blobs: Vec<Blob>,
783        policy: BundleExecutionPolicy,
784    ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> {
785        self.query_chain_worker(block.chain_id, move |callback| {
786            ChainWorkerRequest::StageBlockExecutionWithPolicy {
787                block,
788                round,
789                published_blobs,
790                policy,
791                callback,
792            }
793        })
794        .await
795    }
796
797    /// Executes a [`Query`] for an application's state on a specific chain.
798    ///
799    /// If `block_hash` is specified, system will query the application's state
800    /// at that block. If it doesn't exist, it uses latest state.
801    #[instrument(level = "trace", skip(self, chain_id, query))]
802    pub async fn query_application(
803        &self,
804        chain_id: ChainId,
805        query: Query,
806        block_hash: Option<CryptoHash>,
807    ) -> Result<QueryOutcome, WorkerError> {
808        self.query_chain_worker(chain_id, move |callback| {
809            ChainWorkerRequest::QueryApplication {
810                query,
811                block_hash,
812                callback,
813            }
814        })
815        .await
816    }
817
818    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
819        nickname = %self.nickname,
820        chain_id = %chain_id,
821        application_id = %application_id
822    ))]
823    pub async fn describe_application(
824        &self,
825        chain_id: ChainId,
826        application_id: ApplicationId,
827    ) -> Result<ApplicationDescription, WorkerError> {
828        self.query_chain_worker(chain_id, move |callback| {
829            ChainWorkerRequest::DescribeApplication {
830                application_id,
831                callback,
832            }
833        })
834        .await
835    }
836
837    /// Processes a confirmed block (aka a commit).
838    #[instrument(
839        level = "trace",
840        skip(self, certificate, notify_when_messages_are_delivered),
841        fields(
842            nickname = %self.nickname,
843            chain_id = %certificate.block().header.chain_id,
844            block_height = %certificate.block().header.height
845        )
846    )]
847    async fn process_confirmed_block(
848        &self,
849        certificate: ConfirmedBlockCertificate,
850        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
851    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
852        let chain_id = certificate.block().header.chain_id;
853        self.query_chain_worker(chain_id, move |callback| {
854            ChainWorkerRequest::ProcessConfirmedBlock {
855                certificate,
856                notify_when_messages_are_delivered,
857                callback,
858            }
859        })
860        .await
861    }
862
863    /// Processes a validated block issued from a multi-owner chain.
864    #[instrument(level = "trace", skip(self, certificate), fields(
865        nickname = %self.nickname,
866        chain_id = %certificate.block().header.chain_id,
867        block_height = %certificate.block().header.height
868    ))]
869    async fn process_validated_block(
870        &self,
871        certificate: ValidatedBlockCertificate,
872    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
873        let chain_id = certificate.block().header.chain_id;
874        self.query_chain_worker(chain_id, move |callback| {
875            ChainWorkerRequest::ProcessValidatedBlock {
876                certificate,
877                callback,
878            }
879        })
880        .await
881    }
882
883    /// Processes a leader timeout issued from a multi-owner chain.
884    #[instrument(level = "trace", skip(self, certificate), fields(
885        nickname = %self.nickname,
886        chain_id = %certificate.value().chain_id(),
887        height = %certificate.value().height()
888    ))]
889    async fn process_timeout(
890        &self,
891        certificate: TimeoutCertificate,
892    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
893        let chain_id = certificate.value().chain_id();
894        self.query_chain_worker(chain_id, move |callback| {
895            ChainWorkerRequest::ProcessTimeout {
896                certificate,
897                callback,
898            }
899        })
900        .await
901    }
902
903    #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
904        nickname = %self.nickname,
905        origin = %origin,
906        recipient = %recipient,
907        num_bundles = %bundles.len()
908    ))]
909    async fn process_cross_chain_update(
910        &self,
911        origin: ChainId,
912        recipient: ChainId,
913        bundles: Vec<(Epoch, MessageBundle)>,
914    ) -> Result<Option<BlockHeight>, WorkerError> {
915        self.query_chain_worker(recipient, move |callback| {
916            ChainWorkerRequest::ProcessCrossChainUpdate {
917                origin,
918                bundles,
919                callback,
920            }
921        })
922        .await
923    }
924
925    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
926    #[instrument(level = "trace", skip(self, chain_id, height), fields(
927        nickname = %self.nickname,
928        chain_id = %chain_id,
929        height = %height
930    ))]
931    #[cfg(with_testing)]
932    pub async fn read_certificate(
933        &self,
934        chain_id: ChainId,
935        height: BlockHeight,
936    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
937        self.query_chain_worker(chain_id, move |callback| {
938            ChainWorkerRequest::ReadCertificate { height, callback }
939        })
940        .await
941    }
942
943    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
944    /// [`ChainId`].
945    ///
946    /// The returned view holds a lock on the chain state, which prevents the worker from changing
947    /// the state of that chain.
948    #[instrument(level = "trace", skip(self), fields(
949        nickname = %self.nickname,
950        chain_id = %chain_id
951    ))]
952    pub async fn chain_state_view(
953        &self,
954        chain_id: ChainId,
955    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
956        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
957            callback,
958        })
959        .await
960    }
961
962    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
963    #[instrument(level = "trace", skip(self, request_builder), fields(
964        nickname = %self.nickname,
965        chain_id = %chain_id
966    ))]
967    async fn query_chain_worker<Response>(
968        &self,
969        chain_id: ChainId,
970        request_builder: impl FnOnce(
971            oneshot::Sender<Result<Response, WorkerError>>,
972        ) -> ChainWorkerRequest<StorageClient::Context>,
973    ) -> Result<Response, WorkerError> {
974        // Build the request.
975        let (callback, response) = oneshot::channel();
976        let request = request_builder(callback);
977
978        // Call the endpoint, possibly a new one.
979        let new_channel = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
980
981        // We just created a channel: spawn the actor.
982        if let Some((sender, receiver)) = new_channel {
983            let delivery_notifier = self
984                .delivery_notifiers
985                .lock()
986                .unwrap()
987                .entry(chain_id)
988                .or_default()
989                .clone();
990
991            let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
992                chain_modes
993                    .read()
994                    .unwrap()
995                    .get(&chain_id)
996                    .is_some_and(ListeningMode::is_full)
997            });
998
999            let actor_task = ChainWorkerActor::run(
1000                self.chain_worker_config.clone(),
1001                self.storage.clone(),
1002                self.block_cache.clone(),
1003                self.execution_state_cache.clone(),
1004                self.chain_modes.clone(),
1005                delivery_notifier,
1006                chain_id,
1007                sender,
1008                receiver,
1009                is_tracked,
1010            );
1011
1012            self.chain_worker_tasks
1013                .lock()
1014                .unwrap()
1015                .spawn_task(actor_task);
1016        }
1017
1018        // Finally, wait a response.
1019        match response.await {
1020            Err(e) => {
1021                // The actor endpoint was dropped. Better luck next time!
1022                Err(WorkerError::ChainActorRecvError {
1023                    chain_id,
1024                    error: Box::new(e),
1025                })
1026            }
1027            Ok(response) => response,
1028        }
1029    }
1030
1031    /// Find an endpoint and call it. Create the endpoint if necessary.
1032    ///
1033    /// Returns `Some((sender, receiver))` if a new channel was created and the actor needs to be
1034    /// spawned, or `None` if an existing endpoint was used.
1035    #[instrument(level = "trace", skip(self), fields(
1036        nickname = %self.nickname,
1037        chain_id = %chain_id
1038    ))]
1039    #[expect(clippy::type_complexity)]
1040    fn call_and_maybe_create_chain_worker_endpoint(
1041        &self,
1042        chain_id: ChainId,
1043        request: ChainWorkerRequest<StorageClient::Context>,
1044    ) -> Result<
1045        Option<(
1046            ChainWorkerRequestSender<StorageClient::Context>,
1047            ChainWorkerRequestReceiver<StorageClient::Context>,
1048        )>,
1049        WorkerError,
1050    > {
1051        let mut chain_workers = self.chain_workers.lock().unwrap();
1052
1053        let (sender, new_channel) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
1054            (endpoint, None)
1055        } else {
1056            let (sender, receiver) = mpsc::unbounded_channel();
1057            (sender.clone(), Some((sender, receiver)))
1058        };
1059
1060        if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) {
1061            // The actor was dropped. Give up without (re-)inserting the endpoint in the cache.
1062            return Err(WorkerError::ChainActorSendError {
1063                chain_id,
1064                error: Box::new(e),
1065            });
1066        }
1067
1068        // Put back the sender in the cache for next time.
1069        chain_workers.insert(chain_id, sender);
1070        #[cfg(with_metrics)]
1071        metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64);
1072
1073        Ok(new_channel)
1074    }
1075
1076    #[instrument(skip_all, fields(
1077        nick = self.nickname,
1078        chain_id = format!("{:.8}", proposal.content.block.chain_id),
1079        height = %proposal.content.block.height,
1080    ))]
1081    pub async fn handle_block_proposal(
1082        &self,
1083        proposal: BlockProposal,
1084    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1085        trace!("{} <-- {:?}", self.nickname, proposal);
1086        #[cfg(with_metrics)]
1087        let round = proposal.content.round;
1088        let response = self
1089            .query_chain_worker(proposal.content.block.chain_id, move |callback| {
1090                ChainWorkerRequest::HandleBlockProposal { proposal, callback }
1091            })
1092            .await?;
1093        #[cfg(with_metrics)]
1094        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1095            .with_label_values(&[round.type_name()])
1096            .observe(round.number() as f64);
1097        Ok(response)
1098    }
1099
1100    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
1101    // Other fields will be included in handle_certificate's span.
1102    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
1103    pub async fn handle_lite_certificate(
1104        &self,
1105        certificate: LiteCertificate<'_>,
1106        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1107    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1108        match self.full_certificate(certificate).await? {
1109            Either::Left(confirmed) => {
1110                Box::pin(
1111                    self.handle_confirmed_certificate(
1112                        confirmed,
1113                        notify_when_messages_are_delivered,
1114                    ),
1115                )
1116                .await
1117            }
1118            Either::Right(validated) => {
1119                if let Some(notifier) = notify_when_messages_are_delivered {
1120                    // Nothing to wait for.
1121                    if let Err(()) = notifier.send(()) {
1122                        warn!("Failed to notify message delivery to caller");
1123                    }
1124                }
1125                Box::pin(self.handle_validated_certificate(validated)).await
1126            }
1127        }
1128    }
1129
1130    /// Processes a confirmed block certificate.
1131    #[instrument(skip_all, fields(
1132        nick = self.nickname,
1133        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1134        height = %certificate.block().header.height,
1135    ))]
1136    pub async fn handle_confirmed_certificate(
1137        &self,
1138        certificate: ConfirmedBlockCertificate,
1139        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1140    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1141        trace!("{} <-- {:?}", self.nickname, certificate);
1142        #[cfg(with_metrics)]
1143        let metrics_data = metrics::MetricsData::new(&certificate);
1144
1145        let (info, actions, _outcome) =
1146            Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1147                .await?;
1148
1149        #[cfg(with_metrics)]
1150        if matches!(_outcome, BlockOutcome::Processed) {
1151            metrics_data.record();
1152        }
1153        Ok((info, actions))
1154    }
1155
1156    /// Processes a validated block certificate.
1157    #[instrument(skip_all, fields(
1158        nick = self.nickname,
1159        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1160        height = %certificate.block().header.height,
1161    ))]
1162    pub async fn handle_validated_certificate(
1163        &self,
1164        certificate: ValidatedBlockCertificate,
1165    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1166        trace!("{} <-- {:?}", self.nickname, certificate);
1167
1168        #[cfg(with_metrics)]
1169        let round = certificate.round;
1170        #[cfg(with_metrics)]
1171        let cert_str = certificate.inner().to_log_str();
1172
1173        let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1174        #[cfg(with_metrics)]
1175        {
1176            if matches!(_outcome, BlockOutcome::Processed) {
1177                metrics::NUM_ROUNDS_IN_CERTIFICATE
1178                    .with_label_values(&[cert_str, round.type_name()])
1179                    .observe(round.number() as f64);
1180            }
1181        }
1182        Ok((info, actions))
1183    }
1184
1185    /// Processes a timeout certificate
1186    #[instrument(skip_all, fields(
1187        nick = self.nickname,
1188        chain_id = format!("{:.8}", certificate.inner().chain_id()),
1189        height = %certificate.inner().height(),
1190    ))]
1191    pub async fn handle_timeout_certificate(
1192        &self,
1193        certificate: TimeoutCertificate,
1194    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1195        trace!("{} <-- {:?}", self.nickname, certificate);
1196        self.process_timeout(certificate).await
1197    }
1198
1199    #[instrument(skip_all, fields(
1200        nick = self.nickname,
1201        chain_id = format!("{:.8}", query.chain_id)
1202    ))]
1203    pub async fn handle_chain_info_query(
1204        &self,
1205        query: ChainInfoQuery,
1206    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1207        trace!("{} <-- {:?}", self.nickname, query);
1208        #[cfg(with_metrics)]
1209        metrics::CHAIN_INFO_QUERIES.inc();
1210        let result = self
1211            .query_chain_worker(query.chain_id, move |callback| {
1212                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1213            })
1214            .await;
1215        trace!("{} --> {:?}", self.nickname, result);
1216        result
1217    }
1218
1219    #[instrument(skip_all, fields(
1220        nick = self.nickname,
1221        chain_id = format!("{:.8}", chain_id)
1222    ))]
1223    pub async fn download_pending_blob(
1224        &self,
1225        chain_id: ChainId,
1226        blob_id: BlobId,
1227    ) -> Result<Blob, WorkerError> {
1228        trace!(
1229            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1230            self.nickname
1231        );
1232        let result = self
1233            .query_chain_worker(chain_id, move |callback| {
1234                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1235            })
1236            .await;
1237        trace!(
1238            "{} --> {:?}",
1239            self.nickname,
1240            result.as_ref().map(|_| blob_id)
1241        );
1242        result
1243    }
1244
1245    #[instrument(skip_all, fields(
1246        nick = self.nickname,
1247        chain_id = format!("{:.8}", chain_id)
1248    ))]
1249    pub async fn handle_pending_blob(
1250        &self,
1251        chain_id: ChainId,
1252        blob: Blob,
1253    ) -> Result<ChainInfoResponse, WorkerError> {
1254        let blob_id = blob.id();
1255        trace!(
1256            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1257            self.nickname
1258        );
1259        let result = self
1260            .query_chain_worker(chain_id, move |callback| {
1261                ChainWorkerRequest::HandlePendingBlob { blob, callback }
1262            })
1263            .await;
1264        trace!(
1265            "{} --> {:?}",
1266            self.nickname,
1267            result.as_ref().map(|_| blob_id)
1268        );
1269        result
1270    }
1271
1272    #[instrument(skip_all, fields(
1273        nick = self.nickname,
1274        chain_id = format!("{:.8}", request.target_chain_id())
1275    ))]
1276    pub async fn handle_cross_chain_request(
1277        &self,
1278        request: CrossChainRequest,
1279    ) -> Result<NetworkActions, WorkerError> {
1280        trace!("{} <-- {:?}", self.nickname, request);
1281        match request {
1282            CrossChainRequest::UpdateRecipient {
1283                sender,
1284                recipient,
1285                bundles,
1286            } => {
1287                let mut actions = NetworkActions::default();
1288                let origin = sender;
1289                let Some(height) = self
1290                    .process_cross_chain_update(origin, recipient, bundles)
1291                    .await?
1292                else {
1293                    return Ok(actions);
1294                };
1295                actions.notifications.push(Notification {
1296                    chain_id: recipient,
1297                    reason: Reason::NewIncomingBundle { origin, height },
1298                });
1299                actions
1300                    .cross_chain_requests
1301                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1302                        sender,
1303                        recipient,
1304                        latest_height: height,
1305                    });
1306                Ok(actions)
1307            }
1308            CrossChainRequest::ConfirmUpdatedRecipient {
1309                sender,
1310                recipient,
1311                latest_height,
1312            } => {
1313                self.query_chain_worker(sender, move |callback| {
1314                    ChainWorkerRequest::ConfirmUpdatedRecipient {
1315                        recipient,
1316                        latest_height,
1317                        callback,
1318                    }
1319                })
1320                .await?;
1321                Ok(NetworkActions::default())
1322            }
1323        }
1324    }
1325
1326    /// Updates the received certificate trackers to at least the given values.
1327    #[instrument(skip_all, fields(
1328        nickname = %self.nickname,
1329        chain_id = %chain_id,
1330        num_trackers = %new_trackers.len()
1331    ))]
1332    pub async fn update_received_certificate_trackers(
1333        &self,
1334        chain_id: ChainId,
1335        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1336    ) -> Result<(), WorkerError> {
1337        self.query_chain_worker(chain_id, move |callback| {
1338            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1339                new_trackers,
1340                callback,
1341            }
1342        })
1343        .await
1344    }
1345
1346    /// Gets preprocessed block hashes in a given height range.
1347    #[instrument(skip_all, fields(
1348        nickname = %self.nickname,
1349        chain_id = %chain_id,
1350        start = %start,
1351        end = %end
1352    ))]
1353    pub async fn get_preprocessed_block_hashes(
1354        &self,
1355        chain_id: ChainId,
1356        start: BlockHeight,
1357        end: BlockHeight,
1358    ) -> Result<Vec<CryptoHash>, WorkerError> {
1359        self.query_chain_worker(chain_id, move |callback| {
1360            ChainWorkerRequest::GetPreprocessedBlockHashes {
1361                start,
1362                end,
1363                callback,
1364            }
1365        })
1366        .await
1367    }
1368
1369    /// Gets the next block height to receive from an inbox.
1370    #[instrument(skip_all, fields(
1371        nickname = %self.nickname,
1372        chain_id = %chain_id,
1373        origin = %origin
1374    ))]
1375    pub async fn get_inbox_next_height(
1376        &self,
1377        chain_id: ChainId,
1378        origin: ChainId,
1379    ) -> Result<BlockHeight, WorkerError> {
1380        self.query_chain_worker(chain_id, move |callback| {
1381            ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1382        })
1383        .await
1384    }
1385
1386    /// Gets locking blobs for specific blob IDs.
1387    /// Returns `Ok(None)` if any of the blobs is not found.
1388    #[instrument(skip_all, fields(
1389        nickname = %self.nickname,
1390        chain_id = %chain_id,
1391        num_blob_ids = %blob_ids.len()
1392    ))]
1393    pub async fn get_locking_blobs(
1394        &self,
1395        chain_id: ChainId,
1396        blob_ids: Vec<BlobId>,
1397    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1398        self.query_chain_worker(chain_id, move |callback| {
1399            ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1400        })
1401        .await
1402    }
1403
1404    /// Gets block hashes for the given heights.
1405    pub async fn get_block_hashes(
1406        &self,
1407        chain_id: ChainId,
1408        heights: Vec<BlockHeight>,
1409    ) -> Result<Vec<CryptoHash>, WorkerError> {
1410        self.query_chain_worker(chain_id, move |callback| {
1411            ChainWorkerRequest::GetBlockHashes { heights, callback }
1412        })
1413        .await
1414    }
1415
1416    /// Gets proposed blobs from the manager for specified blob IDs.
1417    pub async fn get_proposed_blobs(
1418        &self,
1419        chain_id: ChainId,
1420        blob_ids: Vec<BlobId>,
1421    ) -> Result<Vec<Blob>, WorkerError> {
1422        self.query_chain_worker(chain_id, move |callback| {
1423            ChainWorkerRequest::GetProposedBlobs { blob_ids, callback }
1424        })
1425        .await
1426    }
1427
1428    /// Gets event subscriptions from the chain.
1429    pub async fn get_event_subscriptions(
1430        &self,
1431        chain_id: ChainId,
1432    ) -> Result<EventSubscriptionsResult, WorkerError> {
1433        self.query_chain_worker(chain_id, |callback| {
1434            ChainWorkerRequest::GetEventSubscriptions { callback }
1435        })
1436        .await
1437    }
1438
1439    /// Gets the stream event count for a stream.
1440    pub async fn get_stream_event_count(
1441        &self,
1442        chain_id: ChainId,
1443        stream_id: StreamId,
1444    ) -> Result<Option<u32>, WorkerError> {
1445        self.query_chain_worker(chain_id, move |callback| {
1446            ChainWorkerRequest::GetStreamEventCount {
1447                stream_id,
1448                callback,
1449            }
1450        })
1451        .await
1452    }
1453
1454    /// Gets received certificate trackers.
1455    pub async fn get_received_certificate_trackers(
1456        &self,
1457        chain_id: ChainId,
1458    ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1459        self.query_chain_worker(chain_id, |callback| {
1460            ChainWorkerRequest::GetReceivedCertificateTrackers { callback }
1461        })
1462        .await
1463    }
1464
1465    /// Gets tip state and outbox info for next_outbox_heights calculation.
1466    pub async fn get_tip_state_and_outbox_info(
1467        &self,
1468        chain_id: ChainId,
1469        receiver_id: ChainId,
1470    ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1471        self.query_chain_worker(chain_id, move |callback| {
1472            ChainWorkerRequest::GetTipStateAndOutboxInfo {
1473                receiver_id,
1474                callback,
1475            }
1476        })
1477        .await
1478    }
1479
1480    /// Gets the next height to preprocess.
1481    pub async fn get_next_height_to_preprocess(
1482        &self,
1483        chain_id: ChainId,
1484    ) -> Result<BlockHeight, WorkerError> {
1485        self.query_chain_worker(chain_id, |callback| {
1486            ChainWorkerRequest::GetNextHeightToPreprocess { callback }
1487        })
1488        .await
1489    }
1490
1491    /// Gets the chain manager's seed for leader election.
1492    pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, WorkerError> {
1493        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetManagerSeed {
1494            callback,
1495        })
1496        .await
1497    }
1498}
1499
1500#[cfg(with_testing)]
1501impl<StorageClient> WorkerState<StorageClient>
1502where
1503    StorageClient: Storage,
1504{
1505    /// Gets a reference to the validator's [`ValidatorPublicKey`].
1506    ///
1507    /// # Panics
1508    ///
1509    /// If the validator doesn't have a key pair assigned to it.
1510    #[instrument(level = "trace", skip(self))]
1511    pub fn public_key(&self) -> ValidatorPublicKey {
1512        self.chain_worker_config
1513            .key_pair()
1514            .expect(
1515                "Test validator should have a key pair assigned to it \
1516                in order to obtain it's public key",
1517            )
1518            .public()
1519    }
1520}