1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14 data_types::{
15 ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp,
16 },
17 doc_scalar,
18 hashed::Hashed,
19 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
20 time::Instant,
21 util::traits::DynError,
22};
23#[cfg(with_testing)]
24use linera_chain::ChainExecutionContext;
25use linera_chain::{
26 data_types::{
27 BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock,
28 },
29 types::{
30 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
31 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
32 },
33 ChainError, ChainStateView,
34};
35use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
36use linera_storage::Storage;
37use linera_views::{context::InactiveContext, ViewError};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
41use tracing::{error, instrument, trace, warn};
42
43pub(crate) use crate::chain_worker::{
45 ChainWorkerRequestReceiver, ChainWorkerRequestSender, EventSubscriptionsResult,
46};
47use crate::{
48 chain_worker::{
49 BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
50 },
51 client::ListeningMode,
52 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
53 join_set_ext::{JoinSet, JoinSetExt},
54 notifier::Notifier,
55 value_cache::ValueCache,
56 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
57};
58
59const BLOCK_CACHE_SIZE: usize = 5_000;
60const EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
61
62#[cfg(test)]
63#[path = "unit_tests/worker_tests.rs"]
64mod worker_tests;
65
66#[cfg(with_metrics)]
67mod metrics {
68 use std::sync::LazyLock;
69
70 use linera_base::prometheus_util::{
71 exponential_bucket_interval, register_histogram, register_histogram_vec,
72 register_int_counter, register_int_counter_vec, register_int_gauge,
73 };
74 use linera_chain::types::ConfirmedBlockCertificate;
75 use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge};
76
77 pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
78 register_histogram_vec(
79 "num_rounds_in_certificate",
80 "Number of rounds in certificate",
81 &["certificate_value", "round_type"],
82 exponential_bucket_interval(0.1, 50.0),
83 )
84 });
85
86 pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
87 register_histogram_vec(
88 "num_rounds_in_block_proposal",
89 "Number of rounds in block proposal",
90 &["round_type"],
91 exponential_bucket_interval(0.1, 50.0),
92 )
93 });
94
95 pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
96 LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));
97
98 pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
99 LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));
100
101 pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
102 LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));
103
104 pub static OPERATION_COUNT: LazyLock<IntCounter> =
105 LazyLock::new(|| register_int_counter("operation_count", "Operation count"));
106
107 pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
108 register_histogram(
109 "operations_per_block",
110 "Number of operations per block",
111 exponential_bucket_interval(1.0, 10000.0),
112 )
113 });
114
115 pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
116 register_histogram(
117 "incoming_bundles_per_block",
118 "Number of incoming bundles per block",
119 exponential_bucket_interval(1.0, 10000.0),
120 )
121 });
122
123 pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
124 register_histogram(
125 "transactions_per_block",
126 "Number of transactions per block",
127 exponential_bucket_interval(1.0, 10000.0),
128 )
129 });
130
131 pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
132 register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
133 });
134
135 pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
136 register_int_counter_vec(
137 "certificates_signed",
138 "Number of confirmed block certificates signed by each validator",
139 &["validator_name"],
140 )
141 });
142
143 pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
144 register_int_counter(
145 "chain_info_queries",
146 "Number of chain info queries processed",
147 )
148 });
149
150 pub static CHAIN_WORKER_ENDPOINTS_CACHED: LazyLock<IntGauge> = LazyLock::new(|| {
152 register_int_gauge(
153 "chain_worker_endpoints_cached",
154 "Number of cached chain worker channel endpoints",
155 )
156 });
157
158 pub struct MetricsData {
160 certificate_log_str: &'static str,
161 round_type: &'static str,
162 round_number: u32,
163 confirmed_transactions: u64,
164 confirmed_incoming_bundles: u64,
165 confirmed_incoming_messages: u64,
166 confirmed_operations: u64,
167 validators_with_signatures: Vec<String>,
168 }
169
170 impl MetricsData {
171 pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
173 Self {
174 certificate_log_str: certificate.inner().to_log_str(),
175 round_type: certificate.round.type_name(),
176 round_number: certificate.round.number(),
177 confirmed_transactions: certificate.block().body.transactions.len() as u64,
178 confirmed_incoming_bundles: certificate.block().body.incoming_bundles().count()
179 as u64,
180 confirmed_incoming_messages: certificate
181 .block()
182 .body
183 .incoming_bundles()
184 .map(|b| b.messages().count())
185 .sum::<usize>() as u64,
186 confirmed_operations: certificate.block().body.operations().count() as u64,
187 validators_with_signatures: certificate
188 .signatures()
189 .iter()
190 .map(|(validator_name, _)| validator_name.to_string())
191 .collect(),
192 }
193 }
194
195 pub fn record(self) {
197 NUM_BLOCKS.with_label_values(&[]).inc();
198 NUM_ROUNDS_IN_CERTIFICATE
199 .with_label_values(&[self.certificate_log_str, self.round_type])
200 .observe(self.round_number as f64);
201 TRANSACTIONS_PER_BLOCK.observe(self.confirmed_transactions as f64);
202 INCOMING_BUNDLES_PER_BLOCK.observe(self.confirmed_incoming_bundles as f64);
203 OPERATIONS_PER_BLOCK.observe(self.confirmed_operations as f64);
204 if self.confirmed_transactions > 0 {
205 TRANSACTION_COUNT
206 .with_label_values(&[])
207 .inc_by(self.confirmed_transactions);
208 if self.confirmed_incoming_bundles > 0 {
209 INCOMING_BUNDLE_COUNT.inc_by(self.confirmed_incoming_bundles);
210 }
211 if self.confirmed_incoming_messages > 0 {
212 INCOMING_MESSAGE_COUNT.inc_by(self.confirmed_incoming_messages);
213 }
214 if self.confirmed_operations > 0 {
215 OPERATION_COUNT.inc_by(self.confirmed_operations);
216 }
217 }
218
219 for validator_name in self.validators_with_signatures {
220 CERTIFICATES_SIGNED
221 .with_label_values(&[&validator_name])
222 .inc();
223 }
224 }
225 }
226}
227
228#[derive(Default, Debug)]
230pub struct NetworkActions {
231 pub cross_chain_requests: Vec<CrossChainRequest>,
233 pub notifications: Vec<Notification>,
235}
236
237impl NetworkActions {
238 pub fn extend(&mut self, other: NetworkActions) {
239 self.cross_chain_requests.extend(other.cross_chain_requests);
240 self.notifications.extend(other.notifications);
241 }
242}
243
244#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
245pub struct Notification {
247 pub chain_id: ChainId,
248 pub reason: Reason,
249}
250
251doc_scalar!(
252 Notification,
253 "Notify that a chain has a new certified block or a new message"
254);
255
256#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
257pub enum Reason {
259 NewBlock {
260 height: BlockHeight,
261 hash: CryptoHash,
262 event_streams: BTreeSet<StreamId>,
263 },
264 NewIncomingBundle {
265 origin: ChainId,
266 height: BlockHeight,
267 },
268 NewRound {
269 height: BlockHeight,
270 round: Round,
271 },
272 BlockExecuted {
273 height: BlockHeight,
274 hash: CryptoHash,
275 },
276 NewEvents {
280 height: BlockHeight,
281 hash: CryptoHash,
282 event_streams: BTreeSet<StreamId>,
283 },
284}
285
286#[derive(Debug, Error)]
288pub enum WorkerError {
289 #[error(transparent)]
290 CryptoError(#[from] CryptoError),
291
292 #[error(transparent)]
293 ArithmeticError(#[from] ArithmeticError),
294
295 #[error(transparent)]
296 ViewError(#[from] ViewError),
297
298 #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
299 ReadCertificatesError(Vec<CryptoHash>),
300
301 #[error(transparent)]
302 ChainError(#[from] Box<ChainError>),
303
304 #[error(transparent)]
305 BcsError(#[from] bcs::Error),
306
307 #[error("Block was not signed by an authorized owner")]
309 InvalidOwner,
310
311 #[error("Operations in the block are not authenticated by the proper signer: {0}")]
312 InvalidSigner(AccountOwner),
313
314 #[error(
316 "Chain is expecting a next block at height {expected_block_height} but the given block \
317 is at height {found_block_height} instead"
318 )]
319 UnexpectedBlockHeight {
320 expected_block_height: BlockHeight,
321 found_block_height: BlockHeight,
322 },
323 #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
324 InvalidEpoch {
325 chain_id: ChainId,
326 chain_epoch: Epoch,
327 epoch: Epoch,
328 },
329
330 #[error("Events not found: {0:?}")]
331 EventsNotFound(Vec<EventId>),
332
333 #[error("Invalid cross-chain request")]
335 InvalidCrossChainRequest,
336 #[error("The block does not contain the hash that we expected for the previous block")]
337 InvalidBlockChaining,
338 #[error(
339 "The given outcome is not what we computed after executing the block.\n\
340 Computed: {computed:#?}\n\
341 Submitted: {submitted:#?}"
342 )]
343 IncorrectOutcome {
344 computed: Box<BlockExecutionOutcome>,
345 submitted: Box<BlockExecutionOutcome>,
346 },
347 #[error(
348 "Block timestamp ({block_timestamp}) is further in the future from local time \
349 ({local_time}) than block time grace period ({block_time_grace_period:?}) \
350 [us:{block_timestamp_us}:{local_time_us}]",
351 block_timestamp_us = block_timestamp.micros(),
352 local_time_us = local_time.micros(),
353 )]
354 InvalidTimestamp {
355 block_timestamp: Timestamp,
356 local_time: Timestamp,
357 block_time_grace_period: Duration,
358 },
359 #[error("We don't have the value for the certificate.")]
360 MissingCertificateValue,
361 #[error("The hash certificate doesn't match its value.")]
362 InvalidLiteCertificate,
363 #[error("Fast blocks cannot query oracles")]
364 FastBlockUsingOracles,
365 #[error("Blobs not found: {0:?}")]
366 BlobsNotFound(Vec<BlobId>),
367 #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
368 ConfirmedLogEntryNotFound {
369 height: BlockHeight,
370 chain_id: ChainId,
371 },
372 #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
373 PreprocessedBlocksEntryNotFound {
374 height: BlockHeight,
375 chain_id: ChainId,
376 },
377 #[error("The block proposal is invalid: {0}")]
378 InvalidBlockProposal(String),
379 #[error("Blob was not required by any pending block")]
380 UnexpectedBlob,
381 #[error("Number of published blobs per block must not exceed {0}")]
382 TooManyPublishedBlobs(u64),
383 #[error("Missing network description")]
384 MissingNetworkDescription,
385 #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
386 ChainActorSendError {
387 chain_id: ChainId,
388 error: Box<dyn DynError>,
389 },
390 #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
391 ChainActorRecvError {
392 chain_id: ChainId,
393 error: Box<dyn DynError>,
394 },
395
396 #[error("thread error: {0}")]
397 Thread(#[from] web_thread_pool::Error),
398
399 #[error("Fallback mode is not available on this network")]
400 NoFallbackMode,
401}
402
403impl WorkerError {
404 pub fn is_local(&self) -> bool {
408 match self {
409 WorkerError::CryptoError(_)
410 | WorkerError::ArithmeticError(_)
411 | WorkerError::InvalidOwner
412 | WorkerError::InvalidSigner(_)
413 | WorkerError::UnexpectedBlockHeight { .. }
414 | WorkerError::InvalidEpoch { .. }
415 | WorkerError::EventsNotFound(_)
416 | WorkerError::InvalidBlockChaining
417 | WorkerError::IncorrectOutcome { .. }
418 | WorkerError::InvalidTimestamp { .. }
419 | WorkerError::MissingCertificateValue
420 | WorkerError::InvalidLiteCertificate
421 | WorkerError::FastBlockUsingOracles
422 | WorkerError::BlobsNotFound(_)
423 | WorkerError::InvalidBlockProposal(_)
424 | WorkerError::UnexpectedBlob
425 | WorkerError::TooManyPublishedBlobs(_)
426 | WorkerError::NoFallbackMode
427 | WorkerError::ViewError(ViewError::NotFound(_)) => false,
428 WorkerError::BcsError(_)
429 | WorkerError::InvalidCrossChainRequest
430 | WorkerError::ViewError(_)
431 | WorkerError::ConfirmedLogEntryNotFound { .. }
432 | WorkerError::PreprocessedBlocksEntryNotFound { .. }
433 | WorkerError::MissingNetworkDescription
434 | WorkerError::ChainActorSendError { .. }
435 | WorkerError::ChainActorRecvError { .. }
436 | WorkerError::Thread(_)
437 | WorkerError::ReadCertificatesError(_) => true,
438 WorkerError::ChainError(chain_error) => chain_error.is_local(),
439 }
440 }
441}
442
443impl From<ChainError> for WorkerError {
444 #[instrument(level = "trace", skip(chain_error))]
445 fn from(chain_error: ChainError) -> Self {
446 match chain_error {
447 ChainError::ExecutionError(execution_error, context) => match *execution_error {
448 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
449 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
450 _ => Self::ChainError(Box::new(ChainError::ExecutionError(
451 execution_error,
452 context,
453 ))),
454 },
455 error => Self::ChainError(Box::new(error)),
456 }
457 }
458}
459
460#[cfg(with_testing)]
461impl WorkerError {
462 pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
468 let WorkerError::ChainError(chain_error) = self else {
469 panic!("Expected an `ExecutionError`. Got: {self:#?}");
470 };
471
472 let ChainError::ExecutionError(execution_error, context) = *chain_error else {
473 panic!("Expected an `ExecutionError`. Got: {chain_error:#?}");
474 };
475
476 assert_eq!(context, expected_context);
477
478 *execution_error
479 }
480}
481
482pub struct WorkerState<StorageClient>
484where
485 StorageClient: Storage,
486{
487 nickname: String,
489 storage: StorageClient,
491 chain_worker_config: ChainWorkerConfig,
493 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
494 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
495 chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
497 delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
500 chain_worker_tasks: Arc<Mutex<JoinSet>>,
502 chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
504}
505
506impl<StorageClient> Clone for WorkerState<StorageClient>
507where
508 StorageClient: Storage + Clone,
509{
510 fn clone(&self) -> Self {
511 WorkerState {
512 nickname: self.nickname.clone(),
513 storage: self.storage.clone(),
514 chain_worker_config: self.chain_worker_config.clone(),
515 block_cache: self.block_cache.clone(),
516 execution_state_cache: self.execution_state_cache.clone(),
517 chain_modes: self.chain_modes.clone(),
518 delivery_notifiers: self.delivery_notifiers.clone(),
519 chain_worker_tasks: self.chain_worker_tasks.clone(),
520 chain_workers: self.chain_workers.clone(),
521 }
522 }
523}
524
525type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
527 ChainWorkerRequest<<StorageClient as Storage>::Context>,
528 tracing::Span,
529 Instant,
530)>;
531
532pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
533
534impl<StorageClient> WorkerState<StorageClient>
535where
536 StorageClient: Storage,
537{
538 #[instrument(level = "trace", skip(nickname, key_pair, storage))]
539 pub fn new(
540 nickname: String,
541 key_pair: Option<ValidatorSecretKey>,
542 storage: StorageClient,
543 ) -> Self {
544 WorkerState {
545 nickname,
546 storage,
547 chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
548 block_cache: Arc::new(ValueCache::new(BLOCK_CACHE_SIZE)),
549 execution_state_cache: Arc::new(ValueCache::new(EXECUTION_STATE_CACHE_SIZE)),
550 chain_modes: None,
551 delivery_notifiers: Arc::default(),
552 chain_worker_tasks: Arc::default(),
553 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
554 }
555 }
556
557 #[instrument(level = "trace", skip(nickname, storage))]
558 pub fn new_for_client(
559 nickname: String,
560 storage: StorageClient,
561 chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
562 ) -> Self {
563 WorkerState {
564 nickname,
565 storage,
566 chain_worker_config: ChainWorkerConfig::default(),
567 block_cache: Arc::new(ValueCache::new(BLOCK_CACHE_SIZE)),
568 execution_state_cache: Arc::new(ValueCache::new(EXECUTION_STATE_CACHE_SIZE)),
569 chain_modes: Some(chain_modes),
570 delivery_notifiers: Arc::default(),
571 chain_worker_tasks: Arc::default(),
572 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
573 }
574 }
575
576 #[instrument(level = "trace", skip(self, value))]
577 pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
578 self.chain_worker_config.allow_inactive_chains = value;
579 self
580 }
581
582 #[instrument(level = "trace", skip(self, value))]
583 pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
584 self.chain_worker_config
585 .allow_messages_from_deprecated_epochs = value;
586 self
587 }
588
589 #[instrument(level = "trace", skip(self, value))]
590 pub fn with_long_lived_services(mut self, value: bool) -> Self {
591 self.chain_worker_config.long_lived_services = value;
592 self
593 }
594
595 #[instrument(level = "trace", skip(self))]
600 pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self {
601 self.chain_worker_config.block_time_grace_period = block_time_grace_period;
602 self
603 }
604
605 #[instrument(level = "trace", skip(self))]
609 pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
610 self.chain_worker_config.ttl = chain_worker_ttl;
611 self
612 }
613
614 #[instrument(level = "trace", skip(self))]
618 pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
619 self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
620 self
621 }
622
623 #[instrument(level = "trace", skip(self))]
627 pub fn with_chain_info_max_received_log_entries(
628 mut self,
629 chain_info_max_received_log_entries: usize,
630 ) -> Self {
631 if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
632 warn!(
633 "The value set for the maximum size of received_log entries \
634 may not be compatible with the latest clients: {} instead of {}",
635 chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
636 );
637 }
638 self.chain_worker_config.chain_info_max_received_log_entries =
639 chain_info_max_received_log_entries;
640 self
641 }
642
643 #[instrument(level = "trace", skip(self))]
644 pub fn nickname(&self) -> &str {
645 &self.nickname
646 }
647
648 #[instrument(level = "trace", skip(self))]
650 #[cfg(not(feature = "test"))]
651 pub(crate) fn storage_client(&self) -> &StorageClient {
652 &self.storage
653 }
654
655 #[instrument(level = "trace", skip(self))]
658 #[cfg(feature = "test")]
659 pub fn storage_client(&self) -> &StorageClient {
660 &self.storage
661 }
662
663 #[instrument(level = "trace", skip(self, certificate))]
664 pub(crate) async fn full_certificate(
665 &self,
666 certificate: LiteCertificate<'_>,
667 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
668 let block = self
669 .block_cache
670 .get(&certificate.value.value_hash)
671 .ok_or(WorkerError::MissingCertificateValue)?;
672
673 match certificate.value.kind {
674 linera_chain::types::CertificateKind::Confirmed => {
675 let value = ConfirmedBlock::from_hashed(block);
676 Ok(Either::Left(
677 certificate
678 .with_value(value)
679 .ok_or(WorkerError::InvalidLiteCertificate)?,
680 ))
681 }
682 linera_chain::types::CertificateKind::Validated => {
683 let value = ValidatedBlock::from_hashed(block);
684 Ok(Either::Right(
685 certificate
686 .with_value(value)
687 .ok_or(WorkerError::InvalidLiteCertificate)?,
688 ))
689 }
690 _ => Err(WorkerError::InvalidLiteCertificate),
691 }
692 }
693}
694
695#[allow(async_fn_in_trait)]
696#[cfg_attr(not(web), trait_variant::make(Send))]
697pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
698 async fn process_certificate<S: Storage + Clone + 'static>(
699 worker: &WorkerState<S>,
700 certificate: GenericCertificate<Self>,
701 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
702}
703
704impl ProcessableCertificate for ConfirmedBlock {
705 async fn process_certificate<S: Storage + Clone + 'static>(
706 worker: &WorkerState<S>,
707 certificate: ConfirmedBlockCertificate,
708 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
709 Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
710 }
711}
712
713impl ProcessableCertificate for ValidatedBlock {
714 async fn process_certificate<S: Storage + Clone + 'static>(
715 worker: &WorkerState<S>,
716 certificate: ValidatedBlockCertificate,
717 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
718 Box::pin(worker.handle_validated_certificate(certificate)).await
719 }
720}
721
722impl ProcessableCertificate for Timeout {
723 async fn process_certificate<S: Storage + Clone + 'static>(
724 worker: &WorkerState<S>,
725 certificate: TimeoutCertificate,
726 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
727 worker.handle_timeout_certificate(certificate).await
728 }
729}
730
731impl<StorageClient> WorkerState<StorageClient>
732where
733 StorageClient: Storage + Clone + 'static,
734{
735 #[instrument(level = "trace", skip(self, certificate, notifier))]
736 #[inline]
737 pub async fn fully_handle_certificate_with_notifications<T>(
738 &self,
739 certificate: GenericCertificate<T>,
740 notifier: &impl Notifier,
741 ) -> Result<ChainInfoResponse, WorkerError>
742 where
743 T: ProcessableCertificate,
744 {
745 let notifications = (*notifier).clone();
746 let this = self.clone();
747 linera_base::Task::spawn(async move {
748 let (response, actions) =
749 ProcessableCertificate::process_certificate(&this, certificate).await?;
750 notifications.notify(&actions.notifications);
751 let mut requests = VecDeque::from(actions.cross_chain_requests);
752 while let Some(request) = requests.pop_front() {
753 let actions = this.handle_cross_chain_request(request).await?;
754 requests.extend(actions.cross_chain_requests);
755 notifications.notify(&actions.notifications);
756 }
757 Ok(response)
758 })
759 .await
760 }
761
762 #[instrument(level = "trace", skip(self, block))]
764 pub async fn stage_block_execution(
765 &self,
766 block: ProposedBlock,
767 round: Option<u32>,
768 published_blobs: Vec<Blob>,
769 ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> {
770 self.query_chain_worker(block.chain_id, move |callback| {
771 ChainWorkerRequest::StageBlockExecution {
772 block,
773 round,
774 published_blobs,
775 callback,
776 }
777 })
778 .await
779 }
780
781 #[instrument(level = "trace", skip(self, block))]
786 pub async fn stage_block_execution_with_policy(
787 &self,
788 block: ProposedBlock,
789 round: Option<u32>,
790 published_blobs: Vec<Blob>,
791 policy: BundleExecutionPolicy,
792 ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> {
793 self.query_chain_worker(block.chain_id, move |callback| {
794 ChainWorkerRequest::StageBlockExecutionWithPolicy {
795 block,
796 round,
797 published_blobs,
798 policy,
799 callback,
800 }
801 })
802 .await
803 }
804
805 #[instrument(level = "trace", skip(self, chain_id, query))]
810 pub async fn query_application(
811 &self,
812 chain_id: ChainId,
813 query: Query,
814 block_hash: Option<CryptoHash>,
815 ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
816 self.query_chain_worker(chain_id, move |callback| {
817 ChainWorkerRequest::QueryApplication {
818 query,
819 block_hash,
820 callback,
821 }
822 })
823 .await
824 }
825
826 #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
827 nickname = %self.nickname,
828 chain_id = %chain_id,
829 application_id = %application_id
830 ))]
831 pub async fn describe_application(
832 &self,
833 chain_id: ChainId,
834 application_id: ApplicationId,
835 ) -> Result<ApplicationDescription, WorkerError> {
836 self.query_chain_worker(chain_id, move |callback| {
837 ChainWorkerRequest::DescribeApplication {
838 application_id,
839 callback,
840 }
841 })
842 .await
843 }
844
845 #[instrument(
847 level = "trace",
848 skip(self, certificate, notify_when_messages_are_delivered),
849 fields(
850 nickname = %self.nickname,
851 chain_id = %certificate.block().header.chain_id,
852 block_height = %certificate.block().header.height
853 )
854 )]
855 async fn process_confirmed_block(
856 &self,
857 certificate: ConfirmedBlockCertificate,
858 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
859 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
860 let chain_id = certificate.block().header.chain_id;
861 self.query_chain_worker(chain_id, move |callback| {
862 ChainWorkerRequest::ProcessConfirmedBlock {
863 certificate,
864 notify_when_messages_are_delivered,
865 callback,
866 }
867 })
868 .await
869 }
870
871 #[instrument(level = "trace", skip(self, certificate), fields(
873 nickname = %self.nickname,
874 chain_id = %certificate.block().header.chain_id,
875 block_height = %certificate.block().header.height
876 ))]
877 async fn process_validated_block(
878 &self,
879 certificate: ValidatedBlockCertificate,
880 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
881 let chain_id = certificate.block().header.chain_id;
882 self.query_chain_worker(chain_id, move |callback| {
883 ChainWorkerRequest::ProcessValidatedBlock {
884 certificate,
885 callback,
886 }
887 })
888 .await
889 }
890
891 #[instrument(level = "trace", skip(self, certificate), fields(
893 nickname = %self.nickname,
894 chain_id = %certificate.value().chain_id(),
895 height = %certificate.value().height()
896 ))]
897 async fn process_timeout(
898 &self,
899 certificate: TimeoutCertificate,
900 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
901 let chain_id = certificate.value().chain_id();
902 self.query_chain_worker(chain_id, move |callback| {
903 ChainWorkerRequest::ProcessTimeout {
904 certificate,
905 callback,
906 }
907 })
908 .await
909 }
910
911 #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
912 nickname = %self.nickname,
913 origin = %origin,
914 recipient = %recipient,
915 num_bundles = %bundles.len()
916 ))]
917 async fn process_cross_chain_update(
918 &self,
919 origin: ChainId,
920 recipient: ChainId,
921 bundles: Vec<(Epoch, MessageBundle)>,
922 ) -> Result<Option<BlockHeight>, WorkerError> {
923 self.query_chain_worker(recipient, move |callback| {
924 ChainWorkerRequest::ProcessCrossChainUpdate {
925 origin,
926 bundles,
927 callback,
928 }
929 })
930 .await
931 }
932
933 #[instrument(level = "trace", skip(self, chain_id, height), fields(
935 nickname = %self.nickname,
936 chain_id = %chain_id,
937 height = %height
938 ))]
939 #[cfg(with_testing)]
940 pub async fn read_certificate(
941 &self,
942 chain_id: ChainId,
943 height: BlockHeight,
944 ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
945 self.query_chain_worker(chain_id, move |callback| {
946 ChainWorkerRequest::ReadCertificate { height, callback }
947 })
948 .await
949 }
950
951 #[instrument(level = "trace", skip(self), fields(
957 nickname = %self.nickname,
958 chain_id = %chain_id
959 ))]
960 pub async fn chain_state_view(
961 &self,
962 chain_id: ChainId,
963 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
964 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
965 callback,
966 })
967 .await
968 }
969
970 #[instrument(level = "trace", skip(self, request_builder), fields(
972 nickname = %self.nickname,
973 chain_id = %chain_id
974 ))]
975 async fn query_chain_worker<Response>(
976 &self,
977 chain_id: ChainId,
978 request_builder: impl FnOnce(
979 oneshot::Sender<Result<Response, WorkerError>>,
980 ) -> ChainWorkerRequest<StorageClient::Context>,
981 ) -> Result<Response, WorkerError> {
982 let (callback, response) = oneshot::channel();
984 let request = request_builder(callback);
985
986 let new_channel = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
988
989 if let Some((sender, receiver)) = new_channel {
991 let delivery_notifier = self
992 .delivery_notifiers
993 .lock()
994 .unwrap()
995 .entry(chain_id)
996 .or_default()
997 .clone();
998
999 let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
1000 chain_modes
1001 .read()
1002 .unwrap()
1003 .get(&chain_id)
1004 .is_some_and(ListeningMode::is_full)
1005 });
1006
1007 let actor_task = ChainWorkerActor::run(
1008 self.chain_worker_config.clone(),
1009 self.storage.clone(),
1010 self.block_cache.clone(),
1011 self.execution_state_cache.clone(),
1012 self.chain_modes.clone(),
1013 delivery_notifier,
1014 chain_id,
1015 sender,
1016 receiver,
1017 is_tracked,
1018 );
1019
1020 self.chain_worker_tasks
1021 .lock()
1022 .unwrap()
1023 .spawn_task(actor_task);
1024 }
1025
1026 match response.await {
1028 Err(e) => {
1029 Err(WorkerError::ChainActorRecvError {
1031 chain_id,
1032 error: Box::new(e),
1033 })
1034 }
1035 Ok(response) => response,
1036 }
1037 }
1038
1039 #[instrument(level = "trace", skip(self), fields(
1044 nickname = %self.nickname,
1045 chain_id = %chain_id
1046 ))]
1047 #[expect(clippy::type_complexity)]
1048 fn call_and_maybe_create_chain_worker_endpoint(
1049 &self,
1050 chain_id: ChainId,
1051 request: ChainWorkerRequest<StorageClient::Context>,
1052 ) -> Result<
1053 Option<(
1054 ChainWorkerRequestSender<StorageClient::Context>,
1055 ChainWorkerRequestReceiver<StorageClient::Context>,
1056 )>,
1057 WorkerError,
1058 > {
1059 let mut chain_workers = self.chain_workers.lock().unwrap();
1060
1061 let (sender, new_channel) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
1062 (endpoint, None)
1063 } else {
1064 let (sender, receiver) = mpsc::unbounded_channel();
1065 (sender.clone(), Some((sender, receiver)))
1066 };
1067
1068 if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) {
1069 return Err(WorkerError::ChainActorSendError {
1071 chain_id,
1072 error: Box::new(e),
1073 });
1074 }
1075
1076 chain_workers.insert(chain_id, sender);
1078 #[cfg(with_metrics)]
1079 metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64);
1080
1081 Ok(new_channel)
1082 }
1083
1084 #[instrument(skip_all, fields(
1085 nick = self.nickname,
1086 chain_id = format!("{:.8}", proposal.content.block.chain_id),
1087 height = %proposal.content.block.height,
1088 ))]
1089 pub async fn handle_block_proposal(
1090 &self,
1091 proposal: BlockProposal,
1092 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1093 trace!("{} <-- {:?}", self.nickname, proposal);
1094 #[cfg(with_metrics)]
1095 let round = proposal.content.round;
1096 let response = self
1097 .query_chain_worker(proposal.content.block.chain_id, move |callback| {
1098 ChainWorkerRequest::HandleBlockProposal { proposal, callback }
1099 })
1100 .await?;
1101 #[cfg(with_metrics)]
1102 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1103 .with_label_values(&[round.type_name()])
1104 .observe(round.number() as f64);
1105 Ok(response)
1106 }
1107
1108 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
1111 pub async fn handle_lite_certificate(
1112 &self,
1113 certificate: LiteCertificate<'_>,
1114 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1115 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1116 match self.full_certificate(certificate).await? {
1117 Either::Left(confirmed) => {
1118 Box::pin(
1119 self.handle_confirmed_certificate(
1120 confirmed,
1121 notify_when_messages_are_delivered,
1122 ),
1123 )
1124 .await
1125 }
1126 Either::Right(validated) => {
1127 if let Some(notifier) = notify_when_messages_are_delivered {
1128 if let Err(()) = notifier.send(()) {
1130 warn!("Failed to notify message delivery to caller");
1131 }
1132 }
1133 Box::pin(self.handle_validated_certificate(validated)).await
1134 }
1135 }
1136 }
1137
1138 #[instrument(skip_all, fields(
1140 nick = self.nickname,
1141 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1142 height = %certificate.block().header.height,
1143 ))]
1144 pub async fn handle_confirmed_certificate(
1145 &self,
1146 certificate: ConfirmedBlockCertificate,
1147 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1148 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1149 trace!("{} <-- {:?}", self.nickname, certificate);
1150 #[cfg(with_metrics)]
1151 let metrics_data = metrics::MetricsData::new(&certificate);
1152
1153 let (info, actions, _outcome) =
1154 Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1155 .await?;
1156
1157 #[cfg(with_metrics)]
1158 if matches!(_outcome, BlockOutcome::Processed) {
1159 metrics_data.record();
1160 }
1161 Ok((info, actions))
1162 }
1163
1164 #[instrument(skip_all, fields(
1166 nick = self.nickname,
1167 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1168 height = %certificate.block().header.height,
1169 ))]
1170 pub async fn handle_validated_certificate(
1171 &self,
1172 certificate: ValidatedBlockCertificate,
1173 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1174 trace!("{} <-- {:?}", self.nickname, certificate);
1175
1176 #[cfg(with_metrics)]
1177 let round = certificate.round;
1178 #[cfg(with_metrics)]
1179 let cert_str = certificate.inner().to_log_str();
1180
1181 let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1182 #[cfg(with_metrics)]
1183 {
1184 if matches!(_outcome, BlockOutcome::Processed) {
1185 metrics::NUM_ROUNDS_IN_CERTIFICATE
1186 .with_label_values(&[cert_str, round.type_name()])
1187 .observe(round.number() as f64);
1188 }
1189 }
1190 Ok((info, actions))
1191 }
1192
1193 #[instrument(skip_all, fields(
1195 nick = self.nickname,
1196 chain_id = format!("{:.8}", certificate.inner().chain_id()),
1197 height = %certificate.inner().height(),
1198 ))]
1199 pub async fn handle_timeout_certificate(
1200 &self,
1201 certificate: TimeoutCertificate,
1202 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1203 trace!("{} <-- {:?}", self.nickname, certificate);
1204 self.process_timeout(certificate).await
1205 }
1206
1207 #[instrument(skip_all, fields(
1208 nick = self.nickname,
1209 chain_id = format!("{:.8}", query.chain_id)
1210 ))]
1211 pub async fn handle_chain_info_query(
1212 &self,
1213 query: ChainInfoQuery,
1214 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1215 trace!("{} <-- {:?}", self.nickname, query);
1216 #[cfg(with_metrics)]
1217 metrics::CHAIN_INFO_QUERIES.inc();
1218 let result = self
1219 .query_chain_worker(query.chain_id, move |callback| {
1220 ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1221 })
1222 .await;
1223 trace!("{} --> {:?}", self.nickname, result);
1224 result
1225 }
1226
1227 #[instrument(skip_all, fields(
1228 nick = self.nickname,
1229 chain_id = format!("{:.8}", chain_id)
1230 ))]
1231 pub async fn download_pending_blob(
1232 &self,
1233 chain_id: ChainId,
1234 blob_id: BlobId,
1235 ) -> Result<Blob, WorkerError> {
1236 trace!(
1237 "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1238 self.nickname
1239 );
1240 let result = self
1241 .query_chain_worker(chain_id, move |callback| {
1242 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1243 })
1244 .await;
1245 trace!(
1246 "{} --> {:?}",
1247 self.nickname,
1248 result.as_ref().map(|_| blob_id)
1249 );
1250 result
1251 }
1252
1253 #[instrument(skip_all, fields(
1254 nick = self.nickname,
1255 chain_id = format!("{:.8}", chain_id)
1256 ))]
1257 pub async fn handle_pending_blob(
1258 &self,
1259 chain_id: ChainId,
1260 blob: Blob,
1261 ) -> Result<ChainInfoResponse, WorkerError> {
1262 let blob_id = blob.id();
1263 trace!(
1264 "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1265 self.nickname
1266 );
1267 let result = self
1268 .query_chain_worker(chain_id, move |callback| {
1269 ChainWorkerRequest::HandlePendingBlob { blob, callback }
1270 })
1271 .await;
1272 trace!(
1273 "{} --> {:?}",
1274 self.nickname,
1275 result.as_ref().map(|_| blob_id)
1276 );
1277 result
1278 }
1279
1280 #[instrument(skip_all, fields(
1281 nick = self.nickname,
1282 chain_id = format!("{:.8}", request.target_chain_id())
1283 ))]
1284 pub async fn handle_cross_chain_request(
1285 &self,
1286 request: CrossChainRequest,
1287 ) -> Result<NetworkActions, WorkerError> {
1288 trace!("{} <-- {:?}", self.nickname, request);
1289 match request {
1290 CrossChainRequest::UpdateRecipient {
1291 sender,
1292 recipient,
1293 bundles,
1294 } => {
1295 let mut actions = NetworkActions::default();
1296 let origin = sender;
1297 let Some(height) = self
1298 .process_cross_chain_update(origin, recipient, bundles)
1299 .await?
1300 else {
1301 return Ok(actions);
1302 };
1303 actions.notifications.push(Notification {
1304 chain_id: recipient,
1305 reason: Reason::NewIncomingBundle { origin, height },
1306 });
1307 actions
1308 .cross_chain_requests
1309 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1310 sender,
1311 recipient,
1312 latest_height: height,
1313 });
1314 Ok(actions)
1315 }
1316 CrossChainRequest::ConfirmUpdatedRecipient {
1317 sender,
1318 recipient,
1319 latest_height,
1320 } => {
1321 self.query_chain_worker(sender, move |callback| {
1322 ChainWorkerRequest::ConfirmUpdatedRecipient {
1323 recipient,
1324 latest_height,
1325 callback,
1326 }
1327 })
1328 .await?;
1329 Ok(NetworkActions::default())
1330 }
1331 }
1332 }
1333
1334 #[instrument(skip_all, fields(
1336 nickname = %self.nickname,
1337 chain_id = %chain_id,
1338 num_trackers = %new_trackers.len()
1339 ))]
1340 pub async fn update_received_certificate_trackers(
1341 &self,
1342 chain_id: ChainId,
1343 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1344 ) -> Result<(), WorkerError> {
1345 self.query_chain_worker(chain_id, move |callback| {
1346 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1347 new_trackers,
1348 callback,
1349 }
1350 })
1351 .await
1352 }
1353
1354 #[instrument(skip_all, fields(
1356 nickname = %self.nickname,
1357 chain_id = %chain_id,
1358 start = %start,
1359 end = %end
1360 ))]
1361 pub async fn get_preprocessed_block_hashes(
1362 &self,
1363 chain_id: ChainId,
1364 start: BlockHeight,
1365 end: BlockHeight,
1366 ) -> Result<Vec<CryptoHash>, WorkerError> {
1367 self.query_chain_worker(chain_id, move |callback| {
1368 ChainWorkerRequest::GetPreprocessedBlockHashes {
1369 start,
1370 end,
1371 callback,
1372 }
1373 })
1374 .await
1375 }
1376
1377 #[instrument(skip_all, fields(
1379 nickname = %self.nickname,
1380 chain_id = %chain_id,
1381 origin = %origin
1382 ))]
1383 pub async fn get_inbox_next_height(
1384 &self,
1385 chain_id: ChainId,
1386 origin: ChainId,
1387 ) -> Result<BlockHeight, WorkerError> {
1388 self.query_chain_worker(chain_id, move |callback| {
1389 ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1390 })
1391 .await
1392 }
1393
1394 #[instrument(skip_all, fields(
1397 nickname = %self.nickname,
1398 chain_id = %chain_id,
1399 num_blob_ids = %blob_ids.len()
1400 ))]
1401 pub async fn get_locking_blobs(
1402 &self,
1403 chain_id: ChainId,
1404 blob_ids: Vec<BlobId>,
1405 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1406 self.query_chain_worker(chain_id, move |callback| {
1407 ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1408 })
1409 .await
1410 }
1411
1412 pub async fn get_block_hashes(
1414 &self,
1415 chain_id: ChainId,
1416 heights: Vec<BlockHeight>,
1417 ) -> Result<Vec<CryptoHash>, WorkerError> {
1418 self.query_chain_worker(chain_id, move |callback| {
1419 ChainWorkerRequest::GetBlockHashes { heights, callback }
1420 })
1421 .await
1422 }
1423
1424 pub async fn get_proposed_blobs(
1426 &self,
1427 chain_id: ChainId,
1428 blob_ids: Vec<BlobId>,
1429 ) -> Result<Vec<Blob>, WorkerError> {
1430 self.query_chain_worker(chain_id, move |callback| {
1431 ChainWorkerRequest::GetProposedBlobs { blob_ids, callback }
1432 })
1433 .await
1434 }
1435
1436 pub async fn get_event_subscriptions(
1438 &self,
1439 chain_id: ChainId,
1440 ) -> Result<EventSubscriptionsResult, WorkerError> {
1441 self.query_chain_worker(chain_id, |callback| {
1442 ChainWorkerRequest::GetEventSubscriptions { callback }
1443 })
1444 .await
1445 }
1446
1447 pub async fn get_stream_event_count(
1449 &self,
1450 chain_id: ChainId,
1451 stream_id: StreamId,
1452 ) -> Result<Option<u32>, WorkerError> {
1453 self.query_chain_worker(chain_id, move |callback| {
1454 ChainWorkerRequest::GetStreamEventCount {
1455 stream_id,
1456 callback,
1457 }
1458 })
1459 .await
1460 }
1461
1462 pub async fn get_received_certificate_trackers(
1464 &self,
1465 chain_id: ChainId,
1466 ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1467 self.query_chain_worker(chain_id, |callback| {
1468 ChainWorkerRequest::GetReceivedCertificateTrackers { callback }
1469 })
1470 .await
1471 }
1472
1473 pub async fn get_tip_state_and_outbox_info(
1475 &self,
1476 chain_id: ChainId,
1477 receiver_id: ChainId,
1478 ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1479 self.query_chain_worker(chain_id, move |callback| {
1480 ChainWorkerRequest::GetTipStateAndOutboxInfo {
1481 receiver_id,
1482 callback,
1483 }
1484 })
1485 .await
1486 }
1487
1488 pub async fn get_next_height_to_preprocess(
1490 &self,
1491 chain_id: ChainId,
1492 ) -> Result<BlockHeight, WorkerError> {
1493 self.query_chain_worker(chain_id, |callback| {
1494 ChainWorkerRequest::GetNextHeightToPreprocess { callback }
1495 })
1496 .await
1497 }
1498
1499 pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, WorkerError> {
1501 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetManagerSeed {
1502 callback,
1503 })
1504 .await
1505 }
1506}
1507
1508#[cfg(with_testing)]
1509impl<StorageClient> WorkerState<StorageClient>
1510where
1511 StorageClient: Storage,
1512{
1513 #[instrument(level = "trace", skip(self))]
1519 pub fn public_key(&self) -> ValidatorPublicKey {
1520 self.chain_worker_config
1521 .key_pair()
1522 .expect(
1523 "Test validator should have a key pair assigned to it \
1524 in order to obtain its public key",
1525 )
1526 .public()
1527 }
1528}