linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14    data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round},
15    doc_scalar,
16    hashed::Hashed,
17    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
18    time::timer::{sleep, timeout},
19};
20#[cfg(with_testing)]
21use linera_chain::ChainExecutionContext;
22use linera_chain::{
23    data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24    types::{
25        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
26        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
27    },
28    ChainError, ChainStateView,
29};
30use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
31use linera_storage::Storage;
32use linera_views::{context::InactiveContext, ViewError};
33use serde::{Deserialize, Serialize};
34use thiserror::Error;
35use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
36use tracing::{error, instrument, trace, warn};
37
38use crate::{
39    chain_worker::{ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier},
40    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
41    join_set_ext::{JoinSet, JoinSetExt},
42    notifier::Notifier,
43    value_cache::ValueCache,
44};
45
46#[cfg(test)]
47#[path = "unit_tests/worker_tests.rs"]
48mod worker_tests;
49
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics reported by the worker.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Round numbers of confirmed block certificates, labeled by certificate value and round
    /// type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Round numbers of handled block proposals, labeled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Total number of transactions in confirmed blocks (no labels).
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("transaction_count", "Transaction count", &[])
    });

    /// Number of blocks added to chains (no labels).
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Number of confirmed block certificates signed by each validator, labeled by validator.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Number of chain info queries processed.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });
}
100
101/// Instruct the networking layer to send cross-chain requests and/or push notifications.
102#[derive(Default, Debug)]
103pub struct NetworkActions {
104    /// The cross-chain requests
105    pub cross_chain_requests: Vec<CrossChainRequest>,
106    /// The push notifications.
107    pub notifications: Vec<Notification>,
108}
109
110impl NetworkActions {
111    pub fn extend(&mut self, other: NetworkActions) {
112        self.cross_chain_requests.extend(other.cross_chain_requests);
113        self.notifications.extend(other.notifications);
114    }
115}
116
117#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
118/// Notification that a chain has a new certified block or a new message.
119pub struct Notification {
120    pub chain_id: ChainId,
121    pub reason: Reason,
122}
123
124doc_scalar!(
125    Notification,
126    "Notify that a chain has a new certified block or a new message"
127);
128
129#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
130/// Reason for the notification.
131pub enum Reason {
132    NewBlock {
133        height: BlockHeight,
134        hash: CryptoHash,
135        event_streams: BTreeSet<StreamId>,
136    },
137    NewIncomingBundle {
138        origin: ChainId,
139        height: BlockHeight,
140    },
141    NewRound {
142        height: BlockHeight,
143        round: Round,
144    },
145}
146
147/// Error type for worker operations.
148#[derive(Debug, Error)]
149pub enum WorkerError {
150    #[error(transparent)]
151    CryptoError(#[from] CryptoError),
152
153    #[error(transparent)]
154    ArithmeticError(#[from] ArithmeticError),
155
156    #[error(transparent)]
157    ViewError(#[from] ViewError),
158
159    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
160    ReadCertificatesError(Vec<CryptoHash>),
161
162    #[error(transparent)]
163    ChainError(#[from] Box<ChainError>),
164
165    // Chain access control
166    #[error("Block was not signed by an authorized owner")]
167    InvalidOwner,
168
169    #[error("Operations in the block are not authenticated by the proper signer: {0}")]
170    InvalidSigner(AccountOwner),
171
172    // Chaining
173    #[error(
174        "Was expecting block height {expected_block_height} but found {found_block_height} instead"
175    )]
176    UnexpectedBlockHeight {
177        expected_block_height: BlockHeight,
178        found_block_height: BlockHeight,
179    },
180    #[error("Unexpected epoch {epoch:}: chain {chain_id:} is at {chain_epoch:}")]
181    InvalidEpoch {
182        chain_id: ChainId,
183        chain_epoch: Epoch,
184        epoch: Epoch,
185    },
186
187    #[error("Events not found: {0:?}")]
188    EventsNotFound(Vec<EventId>),
189
190    // Other server-side errors
191    #[error("Invalid cross-chain request")]
192    InvalidCrossChainRequest,
193    #[error("The block does not contain the hash that we expected for the previous block")]
194    InvalidBlockChaining,
195    #[error(
196        "The given outcome is not what we computed after executing the block.\n\
197        Computed: {computed:#?}\n\
198        Submitted: {submitted:#?}"
199    )]
200    IncorrectOutcome {
201        computed: Box<BlockExecutionOutcome>,
202        submitted: Box<BlockExecutionOutcome>,
203    },
204    #[error("The block timestamp is in the future.")]
205    InvalidTimestamp,
206    #[error("We don't have the value for the certificate.")]
207    MissingCertificateValue,
208    #[error("The hash certificate doesn't match its value.")]
209    InvalidLiteCertificate,
210    #[error("Fast blocks cannot query oracles")]
211    FastBlockUsingOracles,
212    #[error("Blobs not found: {0:?}")]
213    BlobsNotFound(Vec<BlobId>),
214    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
215    ConfirmedLogEntryNotFound {
216        height: BlockHeight,
217        chain_id: ChainId,
218    },
219    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
220    PreprocessedBlocksEntryNotFound {
221        height: BlockHeight,
222        chain_id: ChainId,
223    },
224    #[error("The block proposal is invalid: {0}")]
225    InvalidBlockProposal(String),
226    #[error("The worker is too busy to handle new chains")]
227    FullChainWorkerCache,
228    #[error("Failed to join spawned worker task")]
229    JoinError,
230    #[error("Blob was not required by any pending block")]
231    UnexpectedBlob,
232    #[error("Number of published blobs per block must not exceed {0}")]
233    TooManyPublishedBlobs(u64),
234    #[error("Missing network description")]
235    MissingNetworkDescription,
236}
237
238impl From<ChainError> for WorkerError {
239    #[instrument(level = "trace", skip(chain_error))]
240    fn from(chain_error: ChainError) -> Self {
241        match chain_error {
242            ChainError::ExecutionError(execution_error, context) => {
243                if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
244                    Self::BlobsNotFound(blob_ids)
245                } else {
246                    Self::ChainError(Box::new(ChainError::ExecutionError(
247                        execution_error,
248                        context,
249                    )))
250                }
251            }
252            error => Self::ChainError(Box::new(error)),
253        }
254    }
255}
256
#[cfg(with_testing)]
impl WorkerError {
    /// Extracts the [`ExecutionError`] wrapped in this error, checking its execution context.
    ///
    /// # Panics
    ///
    /// If this is not a [`WorkerError::ChainError`] holding a [`ChainError::ExecutionError`],
    /// or if the recorded context differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            _ => panic!("Expected an `ExecutionError`. Got: {self:#?}"),
        };

        let (execution_error, context) = match *chain_error {
            ChainError::ExecutionError(execution_error, context) => (execution_error, context),
            _ => panic!("Expected an `ExecutionError`. Got: {chain_error:#?}"),
        };

        assert_eq!(context, expected_context);
        *execution_error
    }
}
278
279/// State of a worker in a validator or a local node.
280pub struct WorkerState<StorageClient>
281where
282    StorageClient: Storage,
283{
284    /// A name used for logging
285    nickname: String,
286    /// Access to local persistent storage.
287    storage: StorageClient,
288    /// Configuration options for the [`ChainWorker`]s.
289    chain_worker_config: ChainWorkerConfig,
290    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
291    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
292    /// Chain IDs that should be tracked by a worker.
293    tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
294    /// One-shot channels to notify callers when messages of a particular chain have been
295    /// delivered.
296    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
297    /// The set of spawned [`ChainWorkerActor`] tasks.
298    chain_worker_tasks: Arc<Mutex<JoinSet>>,
299    /// The cache of running [`ChainWorkerActor`]s.
300    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
301}
302
303impl<StorageClient> Clone for WorkerState<StorageClient>
304where
305    StorageClient: Storage + Clone,
306{
307    fn clone(&self) -> Self {
308        WorkerState {
309            nickname: self.nickname.clone(),
310            storage: self.storage.clone(),
311            chain_worker_config: self.chain_worker_config.clone(),
312            block_cache: self.block_cache.clone(),
313            execution_state_cache: self.execution_state_cache.clone(),
314            tracked_chains: self.tracked_chains.clone(),
315            delivery_notifiers: self.delivery_notifiers.clone(),
316            chain_worker_tasks: self.chain_worker_tasks.clone(),
317            chain_workers: self.chain_workers.clone(),
318        }
319    }
320}
321
322/// The sender endpoint for [`ChainWorkerRequest`]s.
323type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
324    ChainWorkerRequest<<StorageClient as Storage>::Context>,
325    tracing::Span,
326)>;
327
328pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
329
330impl<StorageClient> WorkerState<StorageClient>
331where
332    StorageClient: Storage,
333{
334    #[instrument(level = "trace", skip(nickname, key_pair, storage))]
335    pub fn new(
336        nickname: String,
337        key_pair: Option<ValidatorSecretKey>,
338        storage: StorageClient,
339    ) -> Self {
340        WorkerState {
341            nickname,
342            storage,
343            chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
344            block_cache: Arc::new(ValueCache::default()),
345            execution_state_cache: Arc::new(ValueCache::default()),
346            tracked_chains: None,
347            delivery_notifiers: Arc::default(),
348            chain_worker_tasks: Arc::default(),
349            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
350        }
351    }
352
353    #[instrument(level = "trace", skip(nickname, storage))]
354    pub fn new_for_client(
355        nickname: String,
356        storage: StorageClient,
357        tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
358    ) -> Self {
359        WorkerState {
360            nickname,
361            storage,
362            chain_worker_config: ChainWorkerConfig::default(),
363            block_cache: Arc::new(ValueCache::default()),
364            execution_state_cache: Arc::new(ValueCache::default()),
365            tracked_chains: Some(tracked_chains),
366            delivery_notifiers: Arc::default(),
367            chain_worker_tasks: Arc::default(),
368            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
369        }
370    }
371
372    #[instrument(level = "trace", skip(self, value))]
373    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
374        self.chain_worker_config.allow_inactive_chains = value;
375        self
376    }
377
378    #[instrument(level = "trace", skip(self, value))]
379    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
380        self.chain_worker_config
381            .allow_messages_from_deprecated_epochs = value;
382        self
383    }
384
385    #[instrument(level = "trace", skip(self, value))]
386    pub fn with_long_lived_services(mut self, value: bool) -> Self {
387        self.chain_worker_config.long_lived_services = value;
388        self
389    }
390
391    /// Returns an instance with the specified grace period.
392    ///
393    /// Blocks with a timestamp this far in the future will still be accepted, but the validator
394    /// will wait until that timestamp before voting.
395    #[instrument(level = "trace", skip(self))]
396    pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
397        self.chain_worker_config.grace_period = grace_period;
398        self
399    }
400
401    /// Returns an instance with the specified chain worker TTL.
402    ///
403    /// Idle chain workers free their memory after that duration without requests.
404    #[instrument(level = "trace", skip(self))]
405    pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
406        self.chain_worker_config.ttl = chain_worker_ttl;
407        self
408    }
409
410    #[instrument(level = "trace", skip(self))]
411    pub fn nickname(&self) -> &str {
412        &self.nickname
413    }
414
415    /// Returns the storage client so that it can be manipulated or queried.
416    #[instrument(level = "trace", skip(self))]
417    #[cfg(not(feature = "test"))]
418    pub(crate) fn storage_client(&self) -> &StorageClient {
419        &self.storage
420    }
421
422    /// Returns the storage client so that it can be manipulated or queried by tests in other
423    /// crates.
424    #[instrument(level = "trace", skip(self))]
425    #[cfg(feature = "test")]
426    pub fn storage_client(&self) -> &StorageClient {
427        &self.storage
428    }
429
430    #[instrument(level = "trace", skip(self, certificate))]
431    pub(crate) async fn full_certificate(
432        &self,
433        certificate: LiteCertificate<'_>,
434    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
435        let block = self
436            .block_cache
437            .get(&certificate.value.value_hash)
438            .ok_or(WorkerError::MissingCertificateValue)?;
439
440        match certificate.value.kind {
441            linera_chain::types::CertificateKind::Confirmed => {
442                let value = ConfirmedBlock::from_hashed(block);
443                Ok(Either::Left(
444                    certificate
445                        .with_value(value)
446                        .ok_or(WorkerError::InvalidLiteCertificate)?,
447                ))
448            }
449            linera_chain::types::CertificateKind::Validated => {
450                let value = ValidatedBlock::from_hashed(block);
451                Ok(Either::Right(
452                    certificate
453                        .with_value(value)
454                        .ok_or(WorkerError::InvalidLiteCertificate)?,
455                ))
456            }
457            _ => return Err(WorkerError::InvalidLiteCertificate),
458        }
459    }
460}
461
462#[allow(async_fn_in_trait)]
463#[cfg_attr(not(web), trait_variant::make(Send))]
464pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
465    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
466        worker: &WorkerState<S>,
467        certificate: GenericCertificate<Self>,
468    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
469}
470
471impl ProcessableCertificate for ConfirmedBlock {
472    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
473        worker: &WorkerState<S>,
474        certificate: ConfirmedBlockCertificate,
475    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
476        worker.handle_confirmed_certificate(certificate, None).await
477    }
478}
479
480impl ProcessableCertificate for ValidatedBlock {
481    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
482        worker: &WorkerState<S>,
483        certificate: ValidatedBlockCertificate,
484    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
485        worker.handle_validated_certificate(certificate).await
486    }
487}
488
489impl ProcessableCertificate for Timeout {
490    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
491        worker: &WorkerState<S>,
492        certificate: TimeoutCertificate,
493    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
494        worker.handle_timeout_certificate(certificate).await
495    }
496}
497
498impl<StorageClient> WorkerState<StorageClient>
499where
500    StorageClient: Storage + Clone + Send + Sync + 'static,
501{
502    #[instrument(level = "trace", skip(self, certificate, notifier))]
503    #[inline]
504    pub async fn fully_handle_certificate_with_notifications<T>(
505        &self,
506        certificate: GenericCertificate<T>,
507        notifier: &impl Notifier,
508    ) -> Result<ChainInfoResponse, WorkerError>
509    where
510        T: ProcessableCertificate,
511    {
512        let notifications = (*notifier).clone();
513        let this = self.clone();
514        linera_base::task::spawn(async move {
515            let (response, actions) =
516                ProcessableCertificate::process_certificate(&this, certificate).await?;
517            notifications.notify(&actions.notifications);
518            let mut requests = VecDeque::from(actions.cross_chain_requests);
519            while let Some(request) = requests.pop_front() {
520                let actions = this.handle_cross_chain_request(request).await?;
521                requests.extend(actions.cross_chain_requests);
522                notifications.notify(&actions.notifications);
523            }
524            Ok(response)
525        })
526        .await
527        .unwrap_or_else(|_| Err(WorkerError::JoinError))
528    }
529
530    /// Tries to execute a block proposal without any verification other than block execution.
531    #[instrument(level = "trace", skip(self, block))]
532    pub async fn stage_block_execution(
533        &self,
534        block: ProposedBlock,
535        round: Option<u32>,
536        published_blobs: Vec<Blob>,
537    ) -> Result<(Block, ChainInfoResponse), WorkerError> {
538        self.query_chain_worker(block.chain_id, move |callback| {
539            ChainWorkerRequest::StageBlockExecution {
540                block,
541                round,
542                published_blobs,
543                callback,
544            }
545        })
546        .await
547    }
548
549    /// Executes a [`Query`] for an application's state on a specific chain.
550    #[instrument(level = "trace", skip(self, chain_id, query))]
551    pub async fn query_application(
552        &self,
553        chain_id: ChainId,
554        query: Query,
555    ) -> Result<QueryOutcome, WorkerError> {
556        self.query_chain_worker(chain_id, move |callback| {
557            ChainWorkerRequest::QueryApplication { query, callback }
558        })
559        .await
560    }
561
562    #[instrument(level = "trace", skip(self, chain_id, application_id))]
563    pub async fn describe_application(
564        &self,
565        chain_id: ChainId,
566        application_id: ApplicationId,
567    ) -> Result<ApplicationDescription, WorkerError> {
568        self.query_chain_worker(chain_id, move |callback| {
569            ChainWorkerRequest::DescribeApplication {
570                application_id,
571                callback,
572            }
573        })
574        .await
575    }
576
577    /// Processes a confirmed block (aka a commit).
578    #[instrument(
579        level = "trace",
580        skip(self, certificate, notify_when_messages_are_delivered)
581    )]
582    async fn process_confirmed_block(
583        &self,
584        certificate: ConfirmedBlockCertificate,
585        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
586    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
587        let chain_id = certificate.block().header.chain_id;
588        self.query_chain_worker(chain_id, move |callback| {
589            ChainWorkerRequest::ProcessConfirmedBlock {
590                certificate,
591                notify_when_messages_are_delivered,
592                callback,
593            }
594        })
595        .await
596    }
597
598    /// Processes a validated block issued from a multi-owner chain.
599    #[instrument(level = "trace", skip(self, certificate))]
600    async fn process_validated_block(
601        &self,
602        certificate: ValidatedBlockCertificate,
603    ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
604        let chain_id = certificate.block().header.chain_id;
605        self.query_chain_worker(chain_id, move |callback| {
606            ChainWorkerRequest::ProcessValidatedBlock {
607                certificate,
608                callback,
609            }
610        })
611        .await
612    }
613
614    /// Processes a leader timeout issued from a multi-owner chain.
615    #[instrument(level = "trace", skip(self, certificate))]
616    async fn process_timeout(
617        &self,
618        certificate: TimeoutCertificate,
619    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
620        let chain_id = certificate.value().chain_id();
621        self.query_chain_worker(chain_id, move |callback| {
622            ChainWorkerRequest::ProcessTimeout {
623                certificate,
624                callback,
625            }
626        })
627        .await
628    }
629
630    #[instrument(level = "trace", skip(self, origin, recipient, bundles))]
631    async fn process_cross_chain_update(
632        &self,
633        origin: ChainId,
634        recipient: ChainId,
635        bundles: Vec<(Epoch, MessageBundle)>,
636    ) -> Result<Option<BlockHeight>, WorkerError> {
637        self.query_chain_worker(recipient, move |callback| {
638            ChainWorkerRequest::ProcessCrossChainUpdate {
639                origin,
640                bundles,
641                callback,
642            }
643        })
644        .await
645    }
646
647    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
648    #[instrument(level = "trace", skip(self, chain_id, height))]
649    #[cfg(with_testing)]
650    pub async fn read_certificate(
651        &self,
652        chain_id: ChainId,
653        height: BlockHeight,
654    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
655        self.query_chain_worker(chain_id, move |callback| {
656            ChainWorkerRequest::ReadCertificate { height, callback }
657        })
658        .await
659    }
660
661    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
662    /// [`ChainId`].
663    ///
664    /// The returned view holds a lock on the chain state, which prevents the worker from changing
665    /// the state of that chain.
666    #[instrument(level = "trace", skip(self))]
667    pub async fn chain_state_view(
668        &self,
669        chain_id: ChainId,
670    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
671        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
672            callback,
673        })
674        .await
675    }
676
677    #[instrument(level = "trace", skip(self, request_builder))]
678    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
679    async fn query_chain_worker<Response>(
680        &self,
681        chain_id: ChainId,
682        request_builder: impl FnOnce(
683            oneshot::Sender<Result<Response, WorkerError>>,
684        ) -> ChainWorkerRequest<StorageClient::Context>,
685    ) -> Result<Response, WorkerError> {
686        let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
687        let (callback, response) = oneshot::channel();
688
689        chain_actor
690            .send((request_builder(callback), tracing::Span::current()))
691            .expect("`ChainWorkerActor` stopped executing unexpectedly");
692
693        response
694            .await
695            .expect("`ChainWorkerActor` stopped executing without responding")
696    }
697
698    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, creating one and adding it
699    /// to the cache if needed.
700    #[instrument(level = "trace", skip(self))]
701    async fn get_chain_worker_endpoint(
702        &self,
703        chain_id: ChainId,
704    ) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
705        let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
706            loop {
707                match self.try_get_chain_worker_endpoint(chain_id) {
708                    Some(endpoint) => break endpoint,
709                    None => sleep(Duration::from_millis(250)).await,
710                }
711            }
712        })
713        .await
714        .map_err(|_| WorkerError::FullChainWorkerCache)?;
715
716        if let Some(receiver) = new_receiver {
717            let delivery_notifier = self
718                .delivery_notifiers
719                .lock()
720                .unwrap()
721                .entry(chain_id)
722                .or_default()
723                .clone();
724
725            let actor_task = ChainWorkerActor::run(
726                self.chain_worker_config.clone(),
727                self.storage.clone(),
728                self.block_cache.clone(),
729                self.execution_state_cache.clone(),
730                self.tracked_chains.clone(),
731                delivery_notifier,
732                chain_id,
733                receiver,
734            );
735
736            self.chain_worker_tasks
737                .lock()
738                .unwrap()
739                .spawn_task(actor_task);
740        }
741
742        Ok(sender)
743    }
744
745    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, attempting to create one
746    /// and add it to the cache if needed.
747    ///
748    /// Returns [`None`] if the cache is full and no candidate for eviction was found.
749    #[instrument(level = "trace", skip(self))]
750    #[expect(clippy::type_complexity)]
751    fn try_get_chain_worker_endpoint(
752        &self,
753        chain_id: ChainId,
754    ) -> Option<(
755        ChainActorEndpoint<StorageClient>,
756        Option<
757            mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
758        >,
759    )> {
760        let mut chain_workers = self.chain_workers.lock().unwrap();
761
762        if let Some(endpoint) = chain_workers.get(&chain_id) {
763            Some((endpoint.clone(), None))
764        } else {
765            let (sender, receiver) = mpsc::unbounded_channel();
766            chain_workers.insert(chain_id, sender.clone());
767            Some((sender, Some(receiver)))
768        }
769    }
770
771    #[instrument(skip_all, fields(
772        nick = self.nickname,
773        chain_id = format!("{:.8}", proposal.content.block.chain_id),
774        height = %proposal.content.block.height,
775    ))]
776    pub async fn handle_block_proposal(
777        &self,
778        proposal: BlockProposal,
779    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
780        trace!("{} <-- {:?}", self.nickname, proposal);
781        #[cfg(with_metrics)]
782        let round = proposal.content.round;
783        let response = self
784            .query_chain_worker(proposal.content.block.chain_id, move |callback| {
785                ChainWorkerRequest::HandleBlockProposal { proposal, callback }
786            })
787            .await?;
788        #[cfg(with_metrics)]
789        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
790            .with_label_values(&[round.type_name()])
791            .observe(round.number() as f64);
792        Ok(response)
793    }
794
795    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
796    // Other fields will be included in handle_certificate's span.
797    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
798    pub async fn handle_lite_certificate(
799        &self,
800        certificate: LiteCertificate<'_>,
801        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
802    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
803        match self.full_certificate(certificate).await? {
804            Either::Left(confirmed) => {
805                self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
806                    .await
807            }
808            Either::Right(validated) => {
809                if let Some(notifier) = notify_when_messages_are_delivered {
810                    // Nothing to wait for.
811                    if let Err(()) = notifier.send(()) {
812                        warn!("Failed to notify message delivery to caller");
813                    }
814                }
815                self.handle_validated_certificate(validated).await
816            }
817        }
818    }
819
820    /// Processes a confirmed block certificate.
821    #[instrument(skip_all, fields(
822        nick = self.nickname,
823        chain_id = format!("{:.8}", certificate.block().header.chain_id),
824        height = %certificate.block().header.height,
825    ))]
826    pub async fn handle_confirmed_certificate(
827        &self,
828        certificate: ConfirmedBlockCertificate,
829        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
830    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
831        trace!("{} <-- {:?}", self.nickname, certificate);
832        #[cfg(with_metrics)]
833        let metrics_data = if self
834            .chain_state_view(certificate.block().header.chain_id)
835            .await?
836            .tip_state
837            .get()
838            .next_block_height
839            == certificate.block().header.height
840        {
841            Some((
842                certificate.inner().to_log_str(),
843                certificate.round.type_name(),
844                certificate.round.number(),
845                certificate.block().body.transactions.len() as u64,
846                certificate
847                    .signatures()
848                    .iter()
849                    .map(|(validator_name, _)| validator_name.to_string())
850                    .collect::<Vec<_>>(),
851            ))
852        } else {
853            // Block already processed or will only be preprocessed, no metrics to report.
854            None
855        };
856
857        let result = self
858            .process_confirmed_block(certificate, notify_when_messages_are_delivered)
859            .await?;
860
861        #[cfg(with_metrics)]
862        {
863            if let Some(metrics_data) = metrics_data {
864                let (
865                    certificate_log_str,
866                    round_type,
867                    round_number,
868                    confirmed_transactions,
869                    validators_with_signatures,
870                ) = metrics_data;
871                metrics::NUM_BLOCKS.with_label_values(&[]).inc();
872                metrics::NUM_ROUNDS_IN_CERTIFICATE
873                    .with_label_values(&[certificate_log_str, round_type])
874                    .observe(round_number as f64);
875                if confirmed_transactions > 0 {
876                    metrics::TRANSACTION_COUNT
877                        .with_label_values(&[])
878                        .inc_by(confirmed_transactions);
879                }
880
881                for validator_name in validators_with_signatures {
882                    metrics::CERTIFICATES_SIGNED
883                        .with_label_values(&[&validator_name])
884                        .inc();
885                }
886            }
887        }
888        Ok(result)
889    }
890
891    /// Processes a validated block certificate.
892    #[instrument(skip_all, fields(
893        nick = self.nickname,
894        chain_id = format!("{:.8}", certificate.block().header.chain_id),
895        height = %certificate.block().header.height,
896    ))]
897    pub async fn handle_validated_certificate(
898        &self,
899        certificate: ValidatedBlockCertificate,
900    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
901        trace!("{} <-- {:?}", self.nickname, certificate);
902
903        #[cfg(with_metrics)]
904        let round = certificate.round;
905        #[cfg(with_metrics)]
906        let cert_str = certificate.inner().to_log_str();
907
908        let (info, actions, _duplicated) = self.process_validated_block(certificate).await?;
909        #[cfg(with_metrics)]
910        {
911            if !_duplicated {
912                metrics::NUM_ROUNDS_IN_CERTIFICATE
913                    .with_label_values(&[cert_str, round.type_name()])
914                    .observe(round.number() as f64);
915            }
916        }
917        Ok((info, actions))
918    }
919
920    /// Processes a timeout certificate
921    #[instrument(skip_all, fields(
922        nick = self.nickname,
923        chain_id = format!("{:.8}", certificate.inner().chain_id()),
924        height = %certificate.inner().height(),
925    ))]
926    pub async fn handle_timeout_certificate(
927        &self,
928        certificate: TimeoutCertificate,
929    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
930        trace!("{} <-- {:?}", self.nickname, certificate);
931        self.process_timeout(certificate).await
932    }
933
934    #[instrument(skip_all, fields(
935        nick = self.nickname,
936        chain_id = format!("{:.8}", query.chain_id)
937    ))]
938    pub async fn handle_chain_info_query(
939        &self,
940        query: ChainInfoQuery,
941    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
942        trace!("{} <-- {:?}", self.nickname, query);
943        #[cfg(with_metrics)]
944        metrics::CHAIN_INFO_QUERIES.inc();
945        let result = self
946            .query_chain_worker(query.chain_id, move |callback| {
947                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
948            })
949            .await;
950        trace!("{} --> {:?}", self.nickname, result);
951        result
952    }
953
954    #[instrument(skip_all, fields(
955        nick = self.nickname,
956        chain_id = format!("{:.8}", chain_id)
957    ))]
958    pub async fn download_pending_blob(
959        &self,
960        chain_id: ChainId,
961        blob_id: BlobId,
962    ) -> Result<Blob, WorkerError> {
963        trace!(
964            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
965            self.nickname
966        );
967        let result = self
968            .query_chain_worker(chain_id, move |callback| {
969                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
970            })
971            .await;
972        trace!(
973            "{} --> {:?}",
974            self.nickname,
975            result.as_ref().map(|_| blob_id)
976        );
977        result
978    }
979
980    #[instrument(skip_all, fields(
981        nick = self.nickname,
982        chain_id = format!("{:.8}", chain_id)
983    ))]
984    pub async fn handle_pending_blob(
985        &self,
986        chain_id: ChainId,
987        blob: Blob,
988    ) -> Result<ChainInfoResponse, WorkerError> {
989        let blob_id = blob.id();
990        trace!(
991            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
992            self.nickname
993        );
994        let result = self
995            .query_chain_worker(chain_id, move |callback| {
996                ChainWorkerRequest::HandlePendingBlob { blob, callback }
997            })
998            .await;
999        trace!(
1000            "{} --> {:?}",
1001            self.nickname,
1002            result.as_ref().map(|_| blob_id)
1003        );
1004        result
1005    }
1006
1007    #[instrument(skip_all, fields(
1008        nick = self.nickname,
1009        chain_id = format!("{:.8}", request.target_chain_id())
1010    ))]
1011    pub async fn handle_cross_chain_request(
1012        &self,
1013        request: CrossChainRequest,
1014    ) -> Result<NetworkActions, WorkerError> {
1015        trace!("{} <-- {:?}", self.nickname, request);
1016        match request {
1017            CrossChainRequest::UpdateRecipient {
1018                sender,
1019                recipient,
1020                bundles,
1021            } => {
1022                let mut actions = NetworkActions::default();
1023                let origin = sender;
1024                let Some(height) = self
1025                    .process_cross_chain_update(origin, recipient, bundles)
1026                    .await?
1027                else {
1028                    return Ok(actions);
1029                };
1030                actions.notifications.push(Notification {
1031                    chain_id: recipient,
1032                    reason: Reason::NewIncomingBundle { origin, height },
1033                });
1034                actions
1035                    .cross_chain_requests
1036                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1037                        sender,
1038                        recipient,
1039                        latest_height: height,
1040                    });
1041                Ok(actions)
1042            }
1043            CrossChainRequest::ConfirmUpdatedRecipient {
1044                sender,
1045                recipient,
1046                latest_height,
1047            } => {
1048                self.query_chain_worker(sender, move |callback| {
1049                    ChainWorkerRequest::ConfirmUpdatedRecipient {
1050                        recipient,
1051                        latest_height,
1052                        callback,
1053                    }
1054                })
1055                .await?;
1056                Ok(NetworkActions::default())
1057            }
1058        }
1059    }
1060
1061    /// Updates the received certificate trackers to at least the given values.
1062    pub async fn update_received_certificate_trackers(
1063        &self,
1064        chain_id: ChainId,
1065        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1066    ) -> Result<(), WorkerError> {
1067        self.query_chain_worker(chain_id, move |callback| {
1068            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1069                new_trackers,
1070                callback,
1071            }
1072        })
1073        .await
1074    }
1075}
1076
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the validator's [`ValidatorPublicKey`] (by value), derived from the key pair
    /// stored in the chain worker configuration.
    ///
    /// # Panics
    ///
    /// If the validator doesn't have a key pair assigned to it.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        self.chain_worker_config
            .key_pair()
            .expect(
                "Test validator should have a key pair assigned to it \
                in order to obtain its public key",
            )
            .public()
    }
}