Skip to main content

linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::{
12    future::{Either, Shared},
13    FutureExt as _,
14};
15use linera_base::{
16    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
17    data_types::{
18        ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta,
19        Timestamp,
20    },
21    doc_scalar,
22    hashed::Hashed,
23    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
24};
25use linera_cache::{UniqueValueCache, ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
26#[cfg(with_testing)]
27use linera_chain::ChainExecutionContext;
28use linera_chain::{
29    data_types::{BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock},
30    types::{
31        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
32        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
33    },
34    ChainError, ChainStateView,
35};
36use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
37use linera_storage::{Clock as _, Storage};
38use linera_views::{context::InactiveContext, ViewError};
39use serde::{Deserialize, Serialize};
40use thiserror::Error;
41use tokio::sync::{oneshot, OwnedRwLockReadGuard};
42use tracing::{instrument, trace, warn};
43
44/// A read guard providing access to a chain's [`ChainStateView`].
45///
46/// Holds a read lock on the chain worker state, preventing writes for its
47/// lifetime. The `OwnedRwLockReadGuard` internally holds a strong `Arc`
48/// reference to the `RwLock<ChainWorkerState>`, keeping the state alive.
49/// Dereferences to `ChainStateView`.
50pub struct ChainStateViewReadGuard<S: Storage>(
51    OwnedRwLockReadGuard<ChainWorkerState<S>, ChainStateView<S::Context>>,
52);
53
54impl<S: Storage> std::ops::Deref for ChainStateViewReadGuard<S> {
55    type Target = ChainStateView<S::Context>;
56
57    fn deref(&self) -> &Self::Target {
58        &self.0
59    }
60}
61
62/// Re-export of [`EventSubscriptionsResult`] for use by other crate modules.
63pub(crate) use crate::chain_worker::EventSubscriptionsResult;
64use crate::{
65    chain_worker::{
66        handle, state::ChainWorkerState, BlockOutcome, ChainWorkerConfig, CrossChainUpdateResult,
67        DeliveryNotifier,
68    },
69    client::ListeningMode,
70    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
71    notifier::Notifier,
72};
73
74#[cfg(test)]
75#[path = "unit_tests/worker_tests.rs"]
76mod worker_tests;
77
78/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds.
79/// On web targets the future is returned as-is.
80#[cfg(not(web))]
81pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> sync_wrapper::SyncFuture<F> {
82    sync_wrapper::SyncFuture::new(f)
83}
84
85/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds.
86/// On web targets the future is returned as-is.
87#[cfg(web)]
88pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> F {
89    f
90}
91
/// Default size for the worker's block cache.
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000;
/// Default size for the worker's execution state cache.
pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
94
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics recorded by the worker.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec,
    };
    use linera_chain::{data_types::MessageAction, types::ConfirmedBlockCertificate};
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Round number of each processed certificate, labeled by certificate value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Round number of block proposals, labeled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Running total of confirmed transactions (label-less counter vector).
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Running total of confirmed incoming bundles.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Running total of confirmed incoming bundles whose action is `Reject`.
    pub static REJECTED_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("rejected_bundle_count", "Rejected bundle count"));

    /// Running total of messages contained in confirmed incoming bundles.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Running total of confirmed operations.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Distribution of the number of operations per confirmed block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of incoming bundles per confirmed block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of transactions per confirmed block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Number of blocks added to chains (label-less counter vector).
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Confirmed block certificates signed, per validator.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Number of event streams requested per `PreviousEventBlocks` query.
    pub static PREVIOUS_EVENT_BLOCKS_STREAM_COUNT: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "previous_event_blocks_stream_count",
            "Number of event streams requested per PreviousEventBlocks query",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Number of chain info queries processed.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Per-block figures captured from a confirmed block certificate, to be published
    /// later via [`MetricsData::record`].
    pub struct MetricsData {
        certificate_log_str: &'static str,
        round_type: &'static str,
        round_number: u32,
        confirmed_transactions: u64,
        confirmed_incoming_bundles: u64,
        confirmed_rejected_bundles: u64,
        confirmed_incoming_messages: u64,
        confirmed_operations: u64,
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Captures the metrics-relevant figures of `certificate`.
        ///
        /// The incoming bundles are scanned once, producing the bundle count, the
        /// rejected-bundle count and the message count together.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let body = &certificate.block().body;

            let mut incoming_bundles = 0u64;
            let mut rejected_bundles = 0u64;
            let mut incoming_messages = 0u64;
            for bundle in body.incoming_bundles() {
                incoming_bundles += 1;
                if bundle.action == MessageAction::Reject {
                    rejected_bundles += 1;
                }
                incoming_messages += bundle.messages().count() as u64;
            }

            let validators_with_signatures = certificate
                .signatures()
                .iter()
                .map(|(signer, _)| signer.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: certificate.round.type_name(),
                round_number: certificate.round.number(),
                confirmed_transactions: body.transactions.len() as u64,
                confirmed_incoming_bundles: incoming_bundles,
                confirmed_rejected_bundles: rejected_bundles,
                confirmed_incoming_messages: incoming_messages,
                confirmed_operations: body.operations().count() as u64,
                validators_with_signatures,
            }
        }

        /// Publishes the captured figures to the Prometheus metrics above.
        pub fn record(self) {
            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[self.certificate_log_str, self.round_type])
                .observe(f64::from(self.round_number));
            TRANSACTIONS_PER_BLOCK.observe(self.confirmed_transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(self.confirmed_incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(self.confirmed_operations as f64);

            // Running totals are only touched for blocks that contain transactions.
            if self.confirmed_transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(self.confirmed_transactions);
                let totals: [(&IntCounter, u64); 4] = [
                    (&*INCOMING_BUNDLE_COUNT, self.confirmed_incoming_bundles),
                    (&*REJECTED_BUNDLE_COUNT, self.confirmed_rejected_bundles),
                    (&*INCOMING_MESSAGE_COUNT, self.confirmed_incoming_messages),
                    (&*OPERATION_COUNT, self.confirmed_operations),
                ];
                for (counter, amount) in totals {
                    if amount > 0 {
                        counter.inc_by(amount);
                    }
                }
            }

            for validator_name in &self.validators_with_signatures {
                CERTIFICATES_SIGNED
                    .with_label_values(&[validator_name.as_str()])
                    .inc();
            }
        }
    }
}
264                    .inc();
265            }
266        }
267    }
268}
269
270/// Instruct the networking layer to send cross-chain requests and/or push notifications.
271#[derive(Default, Debug)]
272pub struct NetworkActions {
273    /// The cross-chain requests
274    pub cross_chain_requests: Vec<CrossChainRequest>,
275    /// The push notifications.
276    pub notifications: Vec<Notification>,
277}
278
279impl NetworkActions {
280    pub fn extend(&mut self, other: NetworkActions) {
281        self.cross_chain_requests.extend(other.cross_chain_requests);
282        self.notifications.extend(other.notifications);
283    }
284}
285
286#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
287/// Notification that a chain has a new certified block or a new message.
288pub struct Notification {
289    pub chain_id: ChainId,
290    pub reason: Reason,
291}
292
293doc_scalar!(
294    Notification,
295    "Notify that a chain has a new certified block or a new message"
296);
297
298#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
299/// Reason for the notification.
300pub enum Reason {
301    NewBlock {
302        height: BlockHeight,
303        hash: CryptoHash,
304        event_streams: BTreeSet<StreamId>,
305    },
306    NewIncomingBundle {
307        origin: ChainId,
308        height: BlockHeight,
309    },
310    NewRound {
311        height: BlockHeight,
312        round: Round,
313    },
314    BlockExecuted {
315        height: BlockHeight,
316        hash: CryptoHash,
317    },
318    // NOTE: Keep this at the end for backward compatibility with old validators
319    // that use bincode integer-based variant indices. Old validators don't emit
320    // NewEvents, and inserting it before existing variants would shift their indices.
321    NewEvents {
322        height: BlockHeight,
323        hash: CryptoHash,
324        event_streams: BTreeSet<StreamId>,
325    },
326}
327
328/// Error type for worker operations.
329#[derive(Debug, Error, strum::IntoStaticStr)]
330pub enum WorkerError {
331    #[error(transparent)]
332    CryptoError(#[from] CryptoError),
333
334    #[error(transparent)]
335    ArithmeticError(#[from] ArithmeticError),
336
337    #[error(transparent)]
338    ViewError(#[from] ViewError),
339
340    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
341    ReadCertificatesError(Vec<CryptoHash>),
342
343    #[error(transparent)]
344    ChainError(#[from] Box<ChainError>),
345
346    #[error(transparent)]
347    BcsError(#[from] bcs::Error),
348
349    // Chain access control
350    #[error("Block was not signed by an authorized owner")]
351    InvalidOwner,
352
353    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
354    InvalidSigner(AccountOwner),
355
356    // Chaining
357    #[error(
358        "Chain is expecting a next block at height {expected_block_height} but the given block \
359        is at height {found_block_height} instead"
360    )]
361    UnexpectedBlockHeight {
362        expected_block_height: BlockHeight,
363        found_block_height: BlockHeight,
364    },
365    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
366    InvalidEpoch {
367        chain_id: ChainId,
368        chain_epoch: Epoch,
369        epoch: Epoch,
370    },
371
372    #[error("Events not found: {0:?}")]
373    EventsNotFound(Vec<EventId>),
374
375    // Other server-side errors
376    #[error("Invalid cross-chain request")]
377    InvalidCrossChainRequest,
378    #[error("The block does not contain the hash that we expected for the previous block")]
379    InvalidBlockChaining,
380    #[error(
381        "Block timestamp ({block_timestamp}) is further in the future from local time \
382        ({local_time}) than block time grace period ({block_time_grace_period:?}) \
383        [us:{block_timestamp_us}:{local_time_us}]",
384        block_timestamp_us = block_timestamp.micros(),
385        local_time_us = local_time.micros(),
386    )]
387    InvalidTimestamp {
388        block_timestamp: Timestamp,
389        local_time: Timestamp,
390        block_time_grace_period: Duration,
391    },
392    #[error("We don't have the value for the certificate.")]
393    MissingCertificateValue,
394    #[error("Block at height {height} on chain {chain_id} not found in local storage")]
395    LocalBlockNotFound {
396        height: BlockHeight,
397        chain_id: ChainId,
398    },
399    #[error("The hash certificate doesn't match its value.")]
400    InvalidLiteCertificate,
401    #[error("Fast blocks cannot query oracles")]
402    FastBlockUsingOracles,
403    #[error("Blobs not found: {0:?}")]
404    BlobsNotFound(Vec<BlobId>),
405    #[error(
406        "confirmed_log/preprocessed_blocks entry at height {height} for chain {chain_id} not found"
407    )]
408    ConfirmedBlockHashNotFound {
409        height: BlockHeight,
410        chain_id: ChainId,
411    },
412    #[error("The block proposal is invalid: {0}")]
413    InvalidBlockProposal(String),
414    #[error("Blob was not required by any pending block")]
415    UnexpectedBlob,
416    #[error("Number of published blobs per block must not exceed {0}")]
417    TooManyPublishedBlobs(u64),
418    #[error("Missing network description")]
419    MissingNetworkDescription,
420    #[error("thread error: {0}")]
421    Thread(#[from] web_thread_pool::Error),
422
423    #[error("Fallback mode is not available on this network")]
424    NoFallbackMode,
425    #[error("Chain worker was poisoned by a journal resolution failure")]
426    PoisonedWorker,
427}
428
429impl WorkerError {
430    /// Returns whether this error is caused by an issue in the local node.
431    ///
432    /// Returns `false` whenever the error could be caused by a bad message from a peer.
433    pub fn is_local(&self) -> bool {
434        match self {
435            WorkerError::CryptoError(_)
436            | WorkerError::ArithmeticError(_)
437            | WorkerError::InvalidOwner
438            | WorkerError::InvalidSigner(_)
439            | WorkerError::UnexpectedBlockHeight { .. }
440            | WorkerError::InvalidEpoch { .. }
441            | WorkerError::EventsNotFound(_)
442            | WorkerError::InvalidBlockChaining
443            | WorkerError::InvalidTimestamp { .. }
444            | WorkerError::MissingCertificateValue
445            | WorkerError::InvalidLiteCertificate
446            | WorkerError::FastBlockUsingOracles
447            | WorkerError::BlobsNotFound(_)
448            | WorkerError::InvalidBlockProposal(_)
449            | WorkerError::UnexpectedBlob
450            | WorkerError::TooManyPublishedBlobs(_)
451            | WorkerError::NoFallbackMode
452            | WorkerError::ViewError(ViewError::NotFound(_)) => false,
453            WorkerError::BcsError(_)
454            | WorkerError::InvalidCrossChainRequest
455            | WorkerError::ViewError(_)
456            | WorkerError::ConfirmedBlockHashNotFound { .. }
457            | WorkerError::LocalBlockNotFound { .. }
458            | WorkerError::MissingNetworkDescription
459            | WorkerError::Thread(_)
460            | WorkerError::ReadCertificatesError(_)
461            | WorkerError::PoisonedWorker => true,
462            WorkerError::ChainError(chain_error) => chain_error.is_local(),
463        }
464    }
465
466    /// Returns the qualified error variant name for the `error_type` metric label,
467    /// e.g. `"WorkerError::UnexpectedBlockHeight"`.
468    ///
469    /// For `ChainError` variants, delegates to `ChainError::error_type()` to
470    /// surface the underlying error name rather than just `"ChainError"`.
471    pub fn error_type(&self) -> String {
472        match self {
473            WorkerError::ChainError(chain_error) => chain_error.error_type(),
474            other => {
475                let variant: &'static str = other.into();
476                format!("WorkerError::{variant}")
477            }
478        }
479    }
480
481    /// Returns `true` if this error indicates that the chain worker's in-memory
482    /// state may be inconsistent and must be evicted from the cache.
483    fn must_reload_view(&self) -> bool {
484        matches!(
485            self,
486            WorkerError::PoisonedWorker
487                | WorkerError::ViewError(ViewError::StoreError {
488                    must_reload_view: true,
489                    ..
490                })
491        )
492    }
493
494    /// Returns `true` if this error indicates that the chain's persisted state is
495    /// internally inconsistent, so the worker should consider resetting and
496    /// re-executing it from storage.
497    fn indicates_corrupted_chain_state(&self) -> bool {
498        matches!(
499            self,
500            WorkerError::ChainError(chain_error)
501                if matches!(chain_error.as_ref(), ChainError::CorruptedChainState(_))
502        )
503    }
504}
505
506impl From<ChainError> for WorkerError {
507    #[instrument(level = "trace", skip(chain_error))]
508    fn from(chain_error: ChainError) -> Self {
509        match chain_error {
510            ChainError::ExecutionError(execution_error, context) => match *execution_error {
511                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
512                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
513                _ => Self::ChainError(Box::new(ChainError::ExecutionError(
514                    execution_error,
515                    context,
516                ))),
517            },
518            error => Self::ChainError(Box::new(error)),
519        }
520    }
521}
522
523#[cfg(with_testing)]
524impl WorkerError {
525    /// Returns the inner [`ExecutionError`] in this error.
526    ///
527    /// # Panics
528    ///
529    /// If this is not caused by an [`ExecutionError`].
530    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
531        let WorkerError::ChainError(chain_error) = self else {
532            panic!("Expected an `ExecutionError`. Got: {self:#?}");
533        };
534
535        let ChainError::ExecutionError(execution_error, context) = *chain_error else {
536            panic!("Expected an `ExecutionError`. Got: {chain_error:#?}");
537        };
538
539        assert_eq!(context, expected_context);
540
541        *execution_error
542    }
543}
544
545type ChainWorkerArc<S> = Arc<tokio::sync::RwLock<ChainWorkerState<S>>>;
546type ChainWorkerWeak<S> = std::sync::Weak<tokio::sync::RwLock<ChainWorkerState<S>>>;
547type ChainWorkerFuture<S> = Shared<oneshot::Receiver<ChainWorkerWeak<S>>>;
548
549/// Each map entry is a `Shared<oneshot::Receiver<Weak<...>>>`:
550///
551/// - `peek()` returns `None` while a task is loading the worker from storage.
552/// - `peek()` returns `Some(Ok(weak))` once the worker is loaded.
553/// - `peek()` returns `Some(Err(_))` if loading failed (sender dropped).
554///
555/// Callers that find a pending entry clone the `Shared` future and await it.
556type ChainWorkerMap<S> = Arc<papaya::HashMap<ChainId, ChainWorkerFuture<S>>>;
557
558/// Starts a background task that periodically removes dead weak references
559/// from the chain handle map. The actual lifetime management is handled by
560/// each handle's keep-alive task.
561fn start_sweep<S: Storage + Clone + 'static>(
562    chain_workers: &ChainWorkerMap<S>,
563    config: &ChainWorkerConfig,
564) {
565    // Sweep at the smaller of the two TTLs. If both are None, workers
566    // live forever so there's nothing to sweep.
567    let interval = match (config.ttl, config.sender_chain_ttl) {
568        (None, None) => return,
569        (Some(d), None) | (None, Some(d)) => d,
570        (Some(a), Some(b)) => a.min(b),
571    };
572    let weak_map = Arc::downgrade(chain_workers);
573    linera_base::Task::spawn(async move {
574        loop {
575            linera_base::time::timer::sleep(interval).await;
576            let Some(map) = weak_map.upgrade() else {
577                break;
578            };
579            map.pin_owned().retain(|_, shared| match shared.peek() {
580                Some(Ok(weak)) => weak.strong_count() > 0,
581                Some(Err(_)) => false, // Loading failed; clean up.
582                None => true,          // Still loading; keep.
583            });
584        }
585    })
586    .forget();
587}
588
589/// State of a worker in a validator or a local node.
590pub struct WorkerState<StorageClient: Storage> {
591    /// Access to local persistent storage.
592    storage: StorageClient,
593    /// Configuration options for chain workers.
594    pub(crate) chain_worker_config: ChainWorkerConfig,
595    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
596    execution_state_cache:
597        Option<Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>>,
598    /// Chains tracked by a worker, along with their listening modes.
599    chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
600    /// One-shot channels to notify callers when messages of a particular chain have been
601    /// delivered.
602    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
603    /// The cache of loaded chain workers. Stores weak references; each worker
604    /// manages its own lifetime via a keep-alive task. A background sweep
605    /// periodically removes dead entries.
606    chain_workers: ChainWorkerMap<StorageClient>,
607    /// Shard-routing dispatcher for outbound cross-chain requests. Used when we need
608    /// to send cross-chain requests outside of the normal `NetworkActions` return
609    /// path — in particular, the `RevertConfirm`s emitted after resetting a
610    /// corrupted chain. The RPC server layer installs this; without it, we fall
611    /// back to dispatching locally through `handle_cross_chain_request`.
612    outbound_cross_chain_sender: Option<OutboundCrossChainSender>,
613}
614
615/// Dispatcher for outbound cross-chain requests that handles the source-shard-to-
616/// target-shard routing that the worker itself is not aware of.
617pub type OutboundCrossChainSender = Arc<dyn Fn(CrossChainRequest) + Send + Sync>;
618
619impl<StorageClient> Clone for WorkerState<StorageClient>
620where
621    StorageClient: Storage + Clone,
622{
623    fn clone(&self) -> Self {
624        WorkerState {
625            storage: self.storage.clone(),
626            chain_worker_config: self.chain_worker_config.clone(),
627            block_cache: self.block_cache.clone(),
628            execution_state_cache: self.execution_state_cache.clone(),
629            chain_modes: self.chain_modes.clone(),
630            delivery_notifiers: self.delivery_notifiers.clone(),
631            chain_workers: self.chain_workers.clone(),
632            outbound_cross_chain_sender: self.outbound_cross_chain_sender.clone(),
633        }
634    }
635}
636
637pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
638
639impl<StorageClient> WorkerState<StorageClient>
640where
641    StorageClient: Storage,
642{
643    /// Sets the cross-chain message chunk limit.
644    pub fn set_cross_chain_message_chunk_limit(&mut self, limit: usize) {
645        self.chain_worker_config.cross_chain_message_chunk_limit = limit;
646    }
647
648    #[instrument(level = "trace", skip(self))]
649    pub fn nickname(&self) -> &str {
650        &self.chain_worker_config.nickname
651    }
652
653    /// Sets the priority bundle origins.
654    pub fn with_priority_bundle_origins(
655        mut self,
656        origins: std::collections::HashSet<ChainId>,
657    ) -> Self {
658        self.chain_worker_config.priority_bundle_origins = origins;
659        self
660    }
661
662    /// Returns the storage client so that it can be manipulated or queried.
663    #[instrument(level = "trace", skip(self))]
664    #[cfg(not(feature = "test"))]
665    pub(crate) fn storage_client(&self) -> &StorageClient {
666        &self.storage
667    }
668
669    /// Returns the storage client so that it can be manipulated or queried by tests in other
670    /// crates.
671    #[instrument(level = "trace", skip(self))]
672    #[cfg(feature = "test")]
673    pub fn storage_client(&self) -> &StorageClient {
674        &self.storage
675    }
676
677    #[instrument(level = "trace", skip(self, certificate))]
678    pub(crate) async fn full_certificate(
679        &self,
680        certificate: LiteCertificate<'_>,
681    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
682        let block = self
683            .block_cache
684            .get(&certificate.value.value_hash)
685            .ok_or(WorkerError::MissingCertificateValue)?;
686        let block = Arc::unwrap_or_clone(block);
687
688        match certificate.value.kind {
689            linera_chain::types::CertificateKind::Confirmed => {
690                let value = ConfirmedBlock::from_hashed(block);
691                Ok(Either::Left(
692                    certificate
693                        .with_value(value)
694                        .ok_or(WorkerError::InvalidLiteCertificate)?,
695                ))
696            }
697            linera_chain::types::CertificateKind::Validated => {
698                let value = ValidatedBlock::from_hashed(block);
699                Ok(Either::Right(
700                    certificate
701                        .with_value(value)
702                        .ok_or(WorkerError::InvalidLiteCertificate)?,
703                ))
704            }
705            _ => Err(WorkerError::InvalidLiteCertificate),
706        }
707    }
708}
709
710#[allow(async_fn_in_trait)]
711#[cfg_attr(not(web), trait_variant::make(Send))]
712pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
713    async fn process_certificate<S: Storage + Clone + 'static>(
714        worker: &WorkerState<S>,
715        certificate: GenericCertificate<Self>,
716    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
717}
718
719impl ProcessableCertificate for ConfirmedBlock {
720    async fn process_certificate<S: Storage + Clone + 'static>(
721        worker: &WorkerState<S>,
722        certificate: ConfirmedBlockCertificate,
723    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
724        Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
725    }
726}
727
728impl ProcessableCertificate for ValidatedBlock {
729    async fn process_certificate<S: Storage + Clone + 'static>(
730        worker: &WorkerState<S>,
731        certificate: ValidatedBlockCertificate,
732    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
733        Box::pin(worker.handle_validated_certificate(certificate)).await
734    }
735}
736
737impl ProcessableCertificate for Timeout {
738    async fn process_certificate<S: Storage + Clone + 'static>(
739        worker: &WorkerState<S>,
740        certificate: TimeoutCertificate,
741    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
742        worker.handle_timeout_certificate(certificate).await
743    }
744}
745
746impl<StorageClient> WorkerState<StorageClient>
747where
748    StorageClient: Storage + Clone + 'static,
749{
750    /// Creates a new `WorkerState`.
751    ///
752    /// The `chain_worker_config` must be fully configured before calling this, because the
753    /// TTL sweep task is started immediately based on the config's TTL values.
754    #[instrument(level = "trace", skip(storage, chain_worker_config))]
755    pub fn new(
756        storage: StorageClient,
757        chain_worker_config: ChainWorkerConfig,
758        chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
759    ) -> Self {
760        let chain_workers = Arc::new(papaya::HashMap::new());
761        start_sweep(&chain_workers, &chain_worker_config);
762        let block_cache_size = chain_worker_config.block_cache_size;
763        let execution_state_cache_size = chain_worker_config.execution_state_cache_size;
764        WorkerState {
765            storage,
766            chain_worker_config,
767            block_cache: Arc::new(ValueCache::new(
768                block_cache_size,
769                DEFAULT_CLEANUP_INTERVAL_SECS,
770            )),
771            execution_state_cache: (execution_state_cache_size > 0)
772                .then(|| Arc::new(UniqueValueCache::new(execution_state_cache_size))),
773            chain_modes,
774            delivery_notifiers: Arc::default(),
775            chain_workers,
776            outbound_cross_chain_sender: None,
777        }
778    }
779
780    /// Installs a shard-routing dispatcher used for outbound cross-chain requests
781    /// generated outside the normal response path (specifically, after resetting a
782    /// corrupted chain). Without it, such requests are dispatched locally in a
783    /// loop via `handle_cross_chain_request`.
784    pub fn with_outbound_cross_chain_sender(mut self, sender: OutboundCrossChainSender) -> Self {
785        self.outbound_cross_chain_sender = Some(sender);
786        self
787    }
788
789    #[instrument(level = "trace", skip(self, certificate, notifier))]
790    #[inline]
791    pub async fn fully_handle_certificate_with_notifications<T>(
792        &self,
793        certificate: GenericCertificate<T>,
794        notifier: &impl Notifier,
795    ) -> Result<ChainInfoResponse, WorkerError>
796    where
797        T: ProcessableCertificate,
798    {
799        let notifications = (*notifier).clone();
800        let this = self.clone();
801        linera_base::Task::spawn(async move {
802            let (response, actions) =
803                ProcessableCertificate::process_certificate(&this, certificate).await?;
804            notifications.notify(&actions.notifications);
805            let mut requests = VecDeque::from(actions.cross_chain_requests);
806            while let Some(request) = requests.pop_front() {
807                let actions = this.handle_cross_chain_request(request).await?;
808                requests.extend(actions.cross_chain_requests);
809                notifications.notify(&actions.notifications);
810            }
811            Ok(response)
812        })
813        .await
814    }
815
816    /// Acquires a read lock on the chain worker and executes the given closure.
817    ///
818    /// The future is boxed to keep deeply nested types off the stack. On non-web
819    /// targets it is also wrapped in `SyncFuture` to satisfy `Sync` bounds.
820    async fn chain_read<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
821    where
822        F: FnOnce(OwnedRwLockReadGuard<ChainWorkerState<StorageClient>>) -> Fut,
823        Fut: std::future::Future<Output = Result<R, WorkerError>>,
824    {
825        let state = self.get_or_create_chain_worker(chain_id).await?;
826        let state_ref = &state;
827        let result = Box::pin(wrap_future(async move {
828            let guard = handle::read_lock(state_ref).await;
829            guard.check_not_poisoned()?;
830            f(guard).await
831        }))
832        .await;
833        if let Err(error) = &result {
834            if error.must_reload_view() {
835                self.evict_poisoned_worker(chain_id, &state);
836            }
837        }
838        result
839    }
840
841    /// Acquires a write lock on the chain worker and executes the given closure.
842    ///
843    /// The write work runs on a detached task (via [`linera_base::task::run_detached`])
844    /// so that caller cancellation does not unwind the task mid-save. The
845    /// [`RollbackGuard`] lives inside the detached task, so the write lock is held
846    /// until the DB round-trip and `post_save` have fully completed — subsequent
847    /// readers, including a freshly-loaded replacement worker, only see the
848    /// committed state.
849    async fn chain_write<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
850    where
851        F: FnOnce(handle::RollbackGuard<StorageClient>) -> Fut
852            + linera_base::task::MaybeSend
853            + 'static,
854        Fut: std::future::Future<Output = Result<R, WorkerError>> + linera_base::task::MaybeSend,
855        R: linera_base::task::MaybeSend + 'static,
856    {
857        let state = self.get_or_create_chain_worker(chain_id).await?;
858        let state_for_task = state.clone();
859        let result = Box::pin(wrap_future(linera_base::task::run_detached(async move {
860            let guard = handle::write_lock(&state_for_task).await;
861            guard.check_not_poisoned()?;
862            f(guard).await
863        })))
864        .await;
865        if let Err(error) = &result {
866            if error.must_reload_view() {
867                self.evict_poisoned_worker(chain_id, &state);
868            } else if error.indicates_corrupted_chain_state() {
869                self.spawn_reset_corrupted_chain_state(chain_id, state);
870            }
871        }
872        result
873    }
874
875    /// Spawns a detached task that re-acquires the write lock and recovers the
876    /// chain from a detected state corruption. Running the recovery in a separate
877    /// task ensures it survives cancellation of the originating request: if the
878    /// caller's future is dropped mid-way through re-execution, the chain would
879    /// otherwise be left at a partial tip and our safety snapshot would be lost.
880    /// Generated `RevertConfirm` requests are dispatched via the installed
881    /// shard-routing sender when present (sharded validators), or locally
882    /// through `handle_cross_chain_request` otherwise (client nodes and tests).
883    /// Errors are logged; the caller already has the original error to
884    /// propagate.
885    fn spawn_reset_corrupted_chain_state(
886        &self,
887        chain_id: ChainId,
888        state: ChainWorkerArc<StorageClient>,
889    ) where
890        StorageClient: Clone,
891    {
892        let this = self.clone();
893        linera_base::Task::spawn(async move {
894            let requests = {
895                let mut guard = handle::write_lock(&state).await;
896                match guard.maybe_reset_corrupted_chain_state().await {
897                    Ok(Some(requests)) => requests,
898                    Ok(None) => return,
899                    Err(error) => {
900                        tracing::error!(
901                            %chain_id, %error, "Failed to reset corrupted chain state"
902                        );
903                        return;
904                    }
905                }
906            };
907            if let Some(sender) = &this.outbound_cross_chain_sender {
908                // Sharded validator path: let the RPC layer route each request to
909                // the shard that owns the target chain.
910                for request in requests {
911                    sender(request);
912                }
913            } else {
914                // No routing dispatcher is installed (client node or test), so all
915                // involved chains are co-located on this worker. Dispatch locally
916                // in a loop, following any cascading cross-chain requests.
917                let mut queue = VecDeque::from(requests);
918                while let Some(request) = queue.pop_front() {
919                    match this.handle_cross_chain_request(request).await {
920                        Ok(actions) => queue.extend(actions.cross_chain_requests),
921                        Err(error) => {
922                            warn!(
923                                %chain_id, %error,
924                                "Failed to dispatch cross-chain request after \
925                                resetting corrupted chain state"
926                            );
927                        }
928                    }
929                }
930            }
931        })
932        .forget();
933    }
934
935    /// Evicts a poisoned chain worker from the cache, but only if the entry still
936    /// points to the same instance. This avoids removing a fresh replacement that
937    /// another task may have already loaded.
938    fn evict_poisoned_worker(&self, chain_id: ChainId, poisoned: &ChainWorkerArc<StorageClient>) {
939        tracing::warn!(%chain_id, "Evicting poisoned chain worker from cache");
940        let pin = self.chain_workers.pin();
941        let weak_poisoned = Arc::downgrade(poisoned);
942        let _ = pin.remove_if(&chain_id, |_key, future| {
943            future
944                .peek()
945                .and_then(|r| r.clone().ok())
946                .is_some_and(|weak| weak.ptr_eq(&weak_poisoned))
947        });
948    }
949
950    /// Gets or creates a chain worker for the given chain.
951    ///
952    /// The oneshot channel is created outside the `compute` closure to keep
953    /// the closure pure (papaya may call it more than once on CAS retry and
954    /// may memoize the output). If the fast path hits, the unused channel is
955    /// dropped harmlessly.
956    ///
957    /// Returns a type-erased future to keep `!Sync` intermediate types (e.g.
958    /// `std::sync::mpsc::Receiver` from `handle::ServiceRuntimeActor::spawn`) out of
959    /// the caller's future type.
960    fn get_or_create_chain_worker(
961        &self,
962        chain_id: ChainId,
963    ) -> std::pin::Pin<
964        Box<
965            impl std::future::Future<Output = Result<ChainWorkerArc<StorageClient>, WorkerError>> + '_,
966        >,
967    > {
968        Box::pin(wrap_future(async move {
969            loop {
970                // Create the channel outside the closure so that the tx/rx
971                // always match regardless of CAS retries.
972                let (tx, rx) = oneshot::channel();
973                let shared_rx = rx.shared();
974
975                // The papaya guard is !Send, so it must be dropped before
976                // any .await point.
977                let wait_or_tx = {
978                    let pin = self.chain_workers.pin();
979                    match pin.compute(chain_id, |existing| match existing {
980                        Some((_, entry)) => match entry.peek() {
981                            Some(Ok(weak)) => match weak.upgrade() {
982                                Some(arc) => papaya::Operation::Abort(Ok(arc)),
983                                None => papaya::Operation::Insert(shared_rx.clone()),
984                            },
985                            Some(Err(_)) => papaya::Operation::Insert(shared_rx.clone()),
986                            None => papaya::Operation::Abort(Err(entry.clone())),
987                        },
988                        None => papaya::Operation::Insert(shared_rx.clone()),
989                    }) {
990                        papaya::Compute::Aborted(Ok(arc), ..) => return Ok(arc),
991                        papaya::Compute::Aborted(Err(wait), ..) => Either::Left(wait),
992                        papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => {
993                            Either::Right(tx)
994                        }
995                        papaya::Compute::Removed { .. } => unreachable!(),
996                    }
997                };
998
999                match wait_or_tx {
1000                    Either::Left(wait) => {
1001                        // Another task is loading. Await the shared future.
1002                        if let Ok(weak) = wait.await {
1003                            if let Some(arc) = weak.upgrade() {
1004                                return Ok(arc);
1005                            }
1006                        }
1007                        // Loading failed or worker already dead; retry.
1008                    }
1009                    Either::Right(tx) => {
1010                        // We claimed the loading slot. Load from storage.
1011                        // On success, send the Weak through the channel.
1012                        // On error, dropping tx wakes waiters so they can retry.
1013                        let worker = self.load_chain_worker(chain_id).await?;
1014                        if tx.send(Arc::downgrade(&worker)).is_err() {
1015                            tracing::error!(%chain_id, "Receiver dropped while loading worker state.");
1016                            continue;
1017                        }
1018                        return Ok(worker);
1019                    }
1020                }
1021            }
1022        }))
1023    }
1024
1025    /// Loads a chain worker state from storage and wraps it in an Arc.
1026    async fn load_chain_worker(
1027        &self,
1028        chain_id: ChainId,
1029    ) -> Result<ChainWorkerArc<StorageClient>, WorkerError> {
1030        let delivery_notifier = self
1031            .delivery_notifiers
1032            .lock()
1033            .unwrap()
1034            .entry(chain_id)
1035            .or_default()
1036            .clone();
1037
1038        let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
1039            chain_modes
1040                .read()
1041                .unwrap()
1042                .get(&chain_id)
1043                .is_some_and(ListeningMode::is_full)
1044        });
1045
1046        let (service_runtime_endpoint, service_runtime_task) =
1047            if self.chain_worker_config.long_lived_services {
1048                let actor =
1049                    handle::ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await;
1050                (Some(actor.endpoint), Some(actor.task))
1051            } else {
1052                (None, None)
1053            };
1054
1055        let state = crate::chain_worker::state::ChainWorkerState::load(
1056            self.chain_worker_config.clone(),
1057            self.storage.clone(),
1058            self.block_cache.clone(),
1059            self.execution_state_cache.clone(),
1060            self.chain_modes.clone(),
1061            delivery_notifier,
1062            chain_id,
1063            service_runtime_endpoint,
1064            service_runtime_task,
1065        )
1066        .await?;
1067
1068        Ok(handle::create_chain_worker(
1069            state,
1070            is_tracked,
1071            &self.chain_worker_config,
1072        ))
1073    }
1074
1075    /// Tries to execute a block proposal without any verification other than block execution.
1076    #[instrument(level = "trace", skip(self, block))]
1077    pub async fn stage_block_execution(
1078        &self,
1079        block: ProposedBlock,
1080        round: Option<u32>,
1081        published_blobs: Vec<Blob>,
1082        policy: BundleExecutionPolicy,
1083    ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> {
1084        let chain_id = block.chain_id;
1085        self.chain_write(chain_id, move |mut guard| async move {
1086            guard
1087                .stage_block_execution(block, round, &published_blobs, policy)
1088                .await
1089        })
1090        .await
1091    }
1092
1093    /// Executes a [`Query`] for an application's state on a specific chain.
1094    ///
1095    /// If `block_hash` is specified, system will query the application's state
1096    /// at that block. If it doesn't exist, it uses latest state.
1097    #[instrument(level = "trace", skip(self, chain_id, query))]
1098    pub async fn query_application(
1099        &self,
1100        chain_id: ChainId,
1101        query: Query,
1102        block_hash: Option<CryptoHash>,
1103    ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
1104        self.chain_write(chain_id, move |mut guard| async move {
1105            guard.query_application(query, block_hash).await
1106        })
1107        .await
1108    }
1109
1110    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
1111        nickname = %self.nickname(),
1112        chain_id = %chain_id,
1113        application_id = %application_id
1114    ))]
1115    pub async fn describe_application(
1116        &self,
1117        chain_id: ChainId,
1118        application_id: ApplicationId,
1119    ) -> Result<ApplicationDescription, WorkerError> {
1120        let state = self.get_or_create_chain_worker(chain_id).await?;
1121        let guard = handle::read_lock_initialized(&state).await?;
1122        guard.describe_application_readonly(application_id).await
1123    }
1124
1125    /// Processes a confirmed block (aka a commit).
1126    #[instrument(
1127        level = "trace",
1128        skip(self, certificate, notify_when_messages_are_delivered),
1129        fields(
1130            nickname = %self.nickname(),
1131            chain_id = %certificate.block().header.chain_id,
1132            block_height = %certificate.block().header.height
1133        )
1134    )]
1135    async fn process_confirmed_block(
1136        &self,
1137        certificate: ConfirmedBlockCertificate,
1138        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1139    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1140        let chain_id = certificate.block().header.chain_id;
1141        self.chain_write(chain_id, move |mut guard| async move {
1142            guard
1143                .process_confirmed_block(certificate, notify_when_messages_are_delivered)
1144                .await
1145        })
1146        .await
1147    }
1148
1149    /// Processes a validated block issued from a multi-owner chain.
1150    #[instrument(level = "trace", skip(self, certificate), fields(
1151        nickname = %self.nickname(),
1152        chain_id = %certificate.block().header.chain_id,
1153        block_height = %certificate.block().header.height
1154    ))]
1155    async fn process_validated_block(
1156        &self,
1157        certificate: ValidatedBlockCertificate,
1158    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1159        let chain_id = certificate.block().header.chain_id;
1160        self.chain_write(chain_id, move |mut guard| async move {
1161            guard.process_validated_block(certificate).await
1162        })
1163        .await
1164    }
1165
1166    /// Processes a leader timeout issued from a multi-owner chain.
1167    #[instrument(level = "trace", skip(self, certificate), fields(
1168        nickname = %self.nickname(),
1169        chain_id = %certificate.value().chain_id(),
1170        height = %certificate.value().height()
1171    ))]
1172    async fn process_timeout(
1173        &self,
1174        certificate: TimeoutCertificate,
1175    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1176        let chain_id = certificate.value().chain_id();
1177        self.chain_write(chain_id, move |mut guard| async move {
1178            guard.process_timeout(certificate).await
1179        })
1180        .await
1181    }
1182
1183    #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
1184        nickname = %self.nickname(),
1185        origin = %origin,
1186        recipient = %recipient,
1187        num_bundles = %bundles.len()
1188    ))]
1189    async fn process_cross_chain_update(
1190        &self,
1191        origin: ChainId,
1192        recipient: ChainId,
1193        bundles: Vec<(Epoch, MessageBundle)>,
1194        previous_height: Option<BlockHeight>,
1195    ) -> Result<CrossChainUpdateResult, WorkerError> {
1196        self.chain_write(recipient, move |mut guard| async move {
1197            guard
1198                .process_cross_chain_update(origin, bundles, previous_height)
1199                .await
1200        })
1201        .await
1202    }
1203
1204    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
1205    #[instrument(level = "trace", skip(self, chain_id, height), fields(
1206        nickname = %self.nickname(),
1207        chain_id = %chain_id,
1208        height = %height
1209    ))]
1210    #[cfg(with_testing)]
1211    pub async fn read_certificate(
1212        &self,
1213        chain_id: ChainId,
1214        height: BlockHeight,
1215    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
1216        let state = self.get_or_create_chain_worker(chain_id).await?;
1217        let guard = handle::read_lock_initialized(&state).await?;
1218        guard.read_certificate(height).await
1219    }
1220
1221    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
1222    /// [`ChainId`].
1223    ///
1224    /// The returned guard holds a read lock on the chain state, preventing writes for
1225    /// its lifetime. Multiple concurrent readers are allowed.
1226    #[instrument(level = "trace", skip(self), fields(
1227        nickname = %self.nickname(),
1228        chain_id = %chain_id
1229    ))]
1230    pub async fn chain_state_view(
1231        &self,
1232        chain_id: ChainId,
1233    ) -> Result<ChainStateViewReadGuard<StorageClient>, WorkerError> {
1234        let state = self.get_or_create_chain_worker(chain_id).await?;
1235        let guard = handle::read_lock(&state).await;
1236        Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map(
1237            guard,
1238            |s| s.chain(),
1239        )))
1240    }
1241
1242    #[instrument(skip_all, fields(
1243        nick = self.nickname(),
1244        chain_id = format!("{:.8}", proposal.content.block.chain_id),
1245        height = %proposal.content.block.height,
1246    ))]
1247    pub async fn handle_block_proposal(
1248        &self,
1249        proposal: BlockProposal,
1250    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1251        trace!("{} <-- {:?}", self.nickname(), proposal);
1252        #[cfg(with_metrics)]
1253        let round = proposal.content.round;
1254
1255        let chain_id = proposal.content.block.chain_id;
1256        // Delay if block timestamp is in the future but within grace period.
1257        let now = self.storage.clock().current_time();
1258        let block_timestamp = proposal.content.block.timestamp;
1259        let delta = block_timestamp.delta_since(now);
1260        let grace_period = TimeDelta::from_micros(
1261            u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros())
1262                .unwrap_or(u64::MAX),
1263        );
1264        if delta > TimeDelta::ZERO && delta <= grace_period {
1265            self.storage.clock().sleep_until(block_timestamp).await;
1266        }
1267
1268        let response = self
1269            .chain_write(chain_id, move |mut guard| async move {
1270                guard.handle_block_proposal(proposal).await
1271            })
1272            .await?;
1273        #[cfg(with_metrics)]
1274        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1275            .with_label_values(&[round.type_name()])
1276            .observe(round.number() as f64);
1277        Ok(response)
1278    }
1279
1280    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
1281    // Other fields will be included in the caller's span.
1282    #[instrument(skip_all, fields(
1283        chain_id = %certificate.value.chain_id,
1284        hash = %certificate.value.value_hash,
1285    ))]
1286    pub async fn handle_lite_certificate(
1287        &self,
1288        certificate: LiteCertificate<'_>,
1289        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1290    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1291        match self.full_certificate(certificate).await? {
1292            Either::Left(confirmed) => {
1293                Box::pin(
1294                    self.handle_confirmed_certificate(
1295                        confirmed,
1296                        notify_when_messages_are_delivered,
1297                    ),
1298                )
1299                .await
1300            }
1301            Either::Right(validated) => {
1302                if let Some(notifier) = notify_when_messages_are_delivered {
1303                    // Nothing to wait for.
1304                    if let Err(()) = notifier.send(()) {
1305                        warn!("Failed to notify message delivery to caller");
1306                    }
1307                }
1308                Box::pin(self.handle_validated_certificate(validated)).await
1309            }
1310        }
1311    }
1312
1313    /// Processes a confirmed block certificate.
1314    #[instrument(skip_all, fields(
1315        nick = self.nickname(),
1316        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1317        height = %certificate.block().header.height,
1318    ))]
1319    pub async fn handle_confirmed_certificate(
1320        &self,
1321        certificate: ConfirmedBlockCertificate,
1322        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1323    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1324        trace!("{} <-- {:?}", self.nickname(), certificate);
1325        #[cfg(with_metrics)]
1326        let metrics_data = metrics::MetricsData::new(&certificate);
1327
1328        #[allow(unused_variables)]
1329        let (info, actions, outcome) =
1330            Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1331                .await?;
1332
1333        #[cfg(with_metrics)]
1334        if matches!(outcome, BlockOutcome::Processed) {
1335            metrics_data.record();
1336        }
1337        Ok((info, actions))
1338    }
1339
1340    /// Processes a validated block certificate.
1341    #[instrument(skip_all, fields(
1342        nick = self.nickname(),
1343        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1344        height = %certificate.block().header.height,
1345    ))]
1346    pub async fn handle_validated_certificate(
1347        &self,
1348        certificate: ValidatedBlockCertificate,
1349    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1350        trace!("{} <-- {:?}", self.nickname(), certificate);
1351
1352        #[cfg(with_metrics)]
1353        let round = certificate.round;
1354        #[cfg(with_metrics)]
1355        let cert_str = certificate.inner().to_log_str();
1356
1357        #[allow(unused_variables)]
1358        let (info, actions, outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1359        #[cfg(with_metrics)]
1360        {
1361            if matches!(outcome, BlockOutcome::Processed) {
1362                metrics::NUM_ROUNDS_IN_CERTIFICATE
1363                    .with_label_values(&[cert_str, round.type_name()])
1364                    .observe(round.number() as f64);
1365            }
1366        }
1367        Ok((info, actions))
1368    }
1369
1370    /// Processes a timeout certificate
1371    #[instrument(skip_all, fields(
1372        nick = self.nickname(),
1373        chain_id = format!("{:.8}", certificate.inner().chain_id()),
1374        height = %certificate.inner().height(),
1375    ))]
1376    pub async fn handle_timeout_certificate(
1377        &self,
1378        certificate: TimeoutCertificate,
1379    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1380        trace!("{} <-- {:?}", self.nickname(), certificate);
1381        self.process_timeout(certificate).await
1382    }
1383
1384    #[instrument(skip_all, fields(
1385        nick = self.nickname(),
1386        chain_id = format!("{:.8}", query.chain_id)
1387    ))]
1388    pub async fn handle_chain_info_query(
1389        &self,
1390        query: ChainInfoQuery,
1391    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1392        trace!("{} <-- {:?}", self.nickname(), query);
1393        #[cfg(with_metrics)]
1394        metrics::CHAIN_INFO_QUERIES.inc();
1395        let chain_id = query.chain_id;
1396        let result = self
1397            .chain_write(chain_id, move |mut guard| async move {
1398                guard.handle_chain_info_query(query).await
1399            })
1400            .await;
1401        trace!("{} --> {:?}", self.nickname(), result);
1402        result
1403    }
1404
1405    #[instrument(skip_all, fields(
1406        nick = self.nickname(),
1407        chain_id = format!("{:.8}", chain_id)
1408    ))]
1409    pub async fn download_pending_blob(
1410        &self,
1411        chain_id: ChainId,
1412        blob_id: BlobId,
1413    ) -> Result<Blob, WorkerError> {
1414        trace!("{} <-- download_pending_blob({blob_id:8})", self.nickname());
1415        let result = self
1416            .chain_read(chain_id, |guard| async move {
1417                guard.download_pending_blob(blob_id).await
1418            })
1419            .await;
1420        trace!(
1421            "{} --> {:?}",
1422            self.nickname(),
1423            result.as_ref().map(|_| blob_id)
1424        );
1425        result
1426    }
1427
1428    #[instrument(skip_all, fields(
1429        nick = self.nickname(),
1430        chain_id = format!("{:.8}", chain_id)
1431    ))]
1432    pub async fn handle_pending_blob(
1433        &self,
1434        chain_id: ChainId,
1435        blob: Blob,
1436    ) -> Result<ChainInfoResponse, WorkerError> {
1437        let blob_id = blob.id();
1438        trace!("{} <-- handle_pending_blob({blob_id:8})", self.nickname());
1439        let result = self
1440            .chain_write(chain_id, move |mut guard| async move {
1441                guard.handle_pending_blob(blob).await
1442            })
1443            .await;
1444        trace!(
1445            "{} --> {:?}",
1446            self.nickname(),
1447            result.as_ref().map(|_| blob_id)
1448        );
1449        result
1450    }
1451
1452    #[instrument(skip_all, fields(
1453        nick = self.nickname(),
1454        chain_id = format!("{:.8}", request.target_chain_id())
1455    ))]
1456    pub async fn handle_cross_chain_request(
1457        &self,
1458        request: CrossChainRequest,
1459    ) -> Result<NetworkActions, WorkerError> {
1460        trace!("{} <-- {:?}", self.nickname(), request);
1461        match request {
1462            CrossChainRequest::UpdateRecipient {
1463                sender,
1464                recipient,
1465                bundles,
1466                previous_height,
1467            } => {
1468                let mut actions = NetworkActions::default();
1469                let origin = sender;
1470                match self
1471                    .process_cross_chain_update(origin, recipient, bundles, previous_height)
1472                    .await?
1473                {
1474                    CrossChainUpdateResult::NothingToDo => {}
1475                    CrossChainUpdateResult::Updated(height) => {
1476                        actions.notifications.push(Notification {
1477                            chain_id: recipient,
1478                            reason: Reason::NewIncomingBundle { origin, height },
1479                        });
1480                        actions.cross_chain_requests.push(
1481                            CrossChainRequest::ConfirmUpdatedRecipient {
1482                                sender,
1483                                recipient,
1484                                latest_height: height,
1485                            },
1486                        );
1487                    }
1488                    CrossChainUpdateResult::GapDetected {
1489                        origin,
1490                        retransmit_from,
1491                    } => {
1492                        actions
1493                            .cross_chain_requests
1494                            .push(CrossChainRequest::RevertConfirm {
1495                                sender: origin,
1496                                recipient,
1497                                retransmit_from,
1498                            });
1499                    }
1500                }
1501                Ok(actions)
1502            }
1503            CrossChainRequest::ConfirmUpdatedRecipient {
1504                sender,
1505                recipient,
1506                latest_height,
1507            } => {
1508                self.chain_write(sender, move |mut guard| async move {
1509                    guard
1510                        .confirm_updated_recipient(recipient, latest_height)
1511                        .await
1512                })
1513                .await
1514            }
1515            CrossChainRequest::RevertConfirm {
1516                sender,
1517                recipient,
1518                retransmit_from,
1519            } => {
1520                self.chain_write(sender, move |mut guard| async move {
1521                    guard
1522                        .handle_revert_confirm(recipient, retransmit_from)
1523                        .await
1524                })
1525                .await
1526            }
1527        }
1528    }
1529
1530    /// Updates the received certificate trackers to at least the given values.
1531    #[instrument(skip_all, fields(
1532        nickname = %self.nickname(),
1533        chain_id = %chain_id,
1534        num_trackers = %new_trackers.len()
1535    ))]
1536    pub async fn update_received_certificate_trackers(
1537        &self,
1538        chain_id: ChainId,
1539        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1540    ) -> Result<(), WorkerError> {
1541        self.chain_write(chain_id, move |mut guard| async move {
1542            guard
1543                .update_received_certificate_trackers(new_trackers)
1544                .await
1545        })
1546        .await
1547    }
1548
1549    /// Gets preprocessed block hashes in a given height range.
1550    #[instrument(skip_all, fields(
1551        nickname = %self.nickname(),
1552        chain_id = %chain_id,
1553        start = %start,
1554        end = %end
1555    ))]
1556    pub async fn get_preprocessed_block_hashes(
1557        &self,
1558        chain_id: ChainId,
1559        start: BlockHeight,
1560        end: BlockHeight,
1561    ) -> Result<Vec<CryptoHash>, WorkerError> {
1562        self.chain_read(chain_id, |guard| async move {
1563            guard.get_preprocessed_block_hashes(start, end).await
1564        })
1565        .await
1566    }
1567
1568    /// Gets the next block height to receive from an inbox.
1569    #[instrument(skip_all, fields(
1570        nickname = %self.nickname(),
1571        chain_id = %chain_id,
1572        origin = %origin
1573    ))]
1574    pub async fn get_inbox_next_height(
1575        &self,
1576        chain_id: ChainId,
1577        origin: ChainId,
1578    ) -> Result<BlockHeight, WorkerError> {
1579        self.chain_read(chain_id, |guard| async move {
1580            guard.get_inbox_next_height(origin).await
1581        })
1582        .await
1583    }
1584
1585    /// Gets locking blobs for specific blob IDs.
1586    /// Returns `Ok(None)` if any of the blobs is not found.
1587    #[instrument(skip_all, fields(
1588        nickname = %self.nickname(),
1589        chain_id = %chain_id,
1590        num_blob_ids = %blob_ids.len()
1591    ))]
1592    pub async fn get_locking_blobs(
1593        &self,
1594        chain_id: ChainId,
1595        blob_ids: Vec<BlobId>,
1596    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1597        self.chain_read(chain_id, |guard| async move {
1598            guard.get_locking_blobs(blob_ids).await
1599        })
1600        .await
1601    }
1602
1603    /// Gets block hashes for the given heights.
1604    pub async fn get_block_hashes(
1605        &self,
1606        chain_id: ChainId,
1607        heights: Vec<BlockHeight>,
1608    ) -> Result<Vec<CryptoHash>, WorkerError> {
1609        self.chain_read(chain_id, |guard| async move {
1610            guard.get_block_hashes(heights).await
1611        })
1612        .await
1613    }
1614
1615    /// Gets proposed blobs from the manager for specified blob IDs.
1616    pub async fn get_proposed_blobs(
1617        &self,
1618        chain_id: ChainId,
1619        blob_ids: Vec<BlobId>,
1620    ) -> Result<Vec<Blob>, WorkerError> {
1621        self.chain_read(chain_id, |guard| async move {
1622            guard.get_proposed_blobs(blob_ids).await
1623        })
1624        .await
1625    }
1626
1627    /// Gets event subscriptions from the chain.
1628    pub async fn get_event_subscriptions(
1629        &self,
1630        chain_id: ChainId,
1631    ) -> Result<EventSubscriptionsResult, WorkerError> {
1632        self.chain_read(chain_id, |guard| async move {
1633            guard.get_event_subscriptions().await
1634        })
1635        .await
1636    }
1637
1638    /// Gets received certificate trackers.
1639    pub async fn get_received_certificate_trackers(
1640        &self,
1641        chain_id: ChainId,
1642    ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1643        self.chain_read(chain_id, |guard| async move {
1644            guard.get_received_certificate_trackers().await
1645        })
1646        .await
1647    }
1648
1649    /// Returns the pending cross-chain network actions for this chain without
1650    /// initializing its execution state. Safe to call on chains whose
1651    /// `ChainDescription` blob is not available locally.
1652    pub async fn cross_chain_network_actions(
1653        &self,
1654        chain_id: ChainId,
1655    ) -> Result<NetworkActions, WorkerError> {
1656        self.chain_read(chain_id, |guard| async move {
1657            guard.cross_chain_network_actions().await
1658        })
1659        .await
1660    }
1661
1662    /// Gets tip state and outbox info for next_outbox_heights calculation.
1663    pub async fn get_tip_state_and_outbox_info(
1664        &self,
1665        chain_id: ChainId,
1666        receiver_id: ChainId,
1667    ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1668        self.chain_read(chain_id, |guard| async move {
1669            guard.get_tip_state_and_outbox_info(receiver_id).await
1670        })
1671        .await
1672    }
1673
1674    /// Gets the next height to preprocess.
1675    pub async fn get_next_height_to_preprocess(
1676        &self,
1677        chain_id: ChainId,
1678    ) -> Result<BlockHeight, WorkerError> {
1679        self.chain_read(chain_id, |guard| async move {
1680            guard.get_next_height_to_preprocess().await
1681        })
1682        .await
1683    }
1684
1685    /// Gets the chain manager's seed for leader election.
1686    pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, WorkerError> {
1687        self.chain_read(
1688            chain_id,
1689            |guard| async move { guard.get_manager_seed().await },
1690        )
1691        .await
1692    }
1693
1694    /// Gets the stream event count for a stream.
1695    pub async fn get_stream_event_count(
1696        &self,
1697        chain_id: ChainId,
1698        stream_id: StreamId,
1699    ) -> Result<Option<u32>, WorkerError> {
1700        self.chain_read(chain_id, |guard| async move {
1701            guard.get_stream_event_count(stream_id).await
1702        })
1703        .await
1704    }
1705
1706    /// Gets the `next_expected_events` indices for the given streams.
1707    pub async fn next_expected_events(
1708        &self,
1709        chain_id: ChainId,
1710        stream_ids: Vec<StreamId>,
1711    ) -> Result<BTreeMap<StreamId, u32>, WorkerError> {
1712        self.chain_read(chain_id, |guard| async move {
1713            guard.get_next_expected_events(stream_ids).await
1714        })
1715        .await
1716    }
1717
1718    /// Gets the previous event blocks for specific streams.
1719    pub async fn previous_event_blocks(
1720        &self,
1721        chain_id: ChainId,
1722        stream_ids: Vec<StreamId>,
1723    ) -> Result<BTreeMap<StreamId, (BlockHeight, CryptoHash)>, WorkerError> {
1724        #[cfg(with_metrics)]
1725        metrics::PREVIOUS_EVENT_BLOCKS_STREAM_COUNT.observe(stream_ids.len() as f64);
1726        self.chain_read(chain_id, |guard| async move {
1727            guard.get_previous_event_blocks(stream_ids).await
1728        })
1729        .await
1730    }
1731}
1732
// Test-only helpers, compiled only under `with_testing`.
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage + Clone + 'static,
{
    /// Returns the validator's [`ValidatorPublicKey`].
    ///
    /// The key is obtained from the key pair in `chain_worker_config` via
    /// `.public()`. It is returned by value, not as a reference.
    ///
    /// # Panics
    ///
    /// If the validator doesn't have a key pair assigned to it.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        self.chain_worker_config
            .key_pair()
            .expect(
                "Test validator should have a key pair assigned to it \
                in order to obtain its public key",
            )
            .public()
    }
}