1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14 data_types::{
15 ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp,
16 },
17 doc_scalar,
18 hashed::Hashed,
19 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
20 time::Instant,
21 util::traits::DynError,
22};
23#[cfg(with_testing)]
24use linera_chain::ChainExecutionContext;
25use linera_chain::{
26 data_types::{
27 BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock,
28 },
29 types::{
30 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
31 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
32 },
33 ChainError, ChainStateView,
34};
35use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
36use linera_storage::Storage;
37use linera_views::{context::InactiveContext, ViewError};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
41use tracing::{error, instrument, trace, warn};
42
43pub(crate) use crate::chain_worker::{
45 ChainWorkerRequestReceiver, ChainWorkerRequestSender, EventSubscriptionsResult,
46};
47use crate::{
48 chain_worker::{
49 BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
50 },
51 client::ListeningMode,
52 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
53 join_set_ext::{JoinSet, JoinSetExt},
54 notifier::Notifier,
55 value_cache::ValueCache,
56 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
57};
58
/// Size given to the `ValueCache` that holds hashed blocks.
const BLOCK_CACHE_SIZE: usize = 5_000;
/// Size given to the `ValueCache` that holds execution state views.
const EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
61
62#[cfg(test)]
63#[path = "unit_tests/worker_tests.rs"]
64mod worker_tests;
65
/// Prometheus metrics recorded by the worker. Only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec, register_int_gauge,
    };
    use linera_chain::types::ConfirmedBlockCertificate;
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge};

    /// Histogram of the round number of certificates, labelled by certificate value and
    /// round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Histogram of the round number of block proposals, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Running total of transactions.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Running total of incoming bundles.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Running total of incoming messages.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Running total of operations.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Histogram of the number of operations per block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Histogram of the number of incoming bundles per block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Histogram of the number of transactions per block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Running total of blocks added to chains.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Per-validator count of signed confirmed block certificates.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Running total of processed chain info queries.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Gauge holding the number of cached chain worker channel endpoints.
    pub static CHAIN_WORKER_ENDPOINTS_CACHED: LazyLock<IntGauge> = LazyLock::new(|| {
        register_int_gauge(
            "chain_worker_endpoints_cached",
            "Number of cached chain worker channel endpoints",
        )
    });

    /// Figures extracted from a confirmed block certificate so that they can be recorded
    /// after the certificate itself has been moved elsewhere.
    pub struct MetricsData {
        certificate_log_str: &'static str,
        round_type: &'static str,
        round_number: u32,
        confirmed_transactions: u64,
        confirmed_incoming_bundles: u64,
        confirmed_incoming_messages: u64,
        confirmed_operations: u64,
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Collects the round information, the block body counts and the names of the
        /// signing validators from `certificate`.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let body = &certificate.block().body;
            let incoming_messages: usize = body
                .incoming_bundles()
                .map(|bundle| bundle.messages().count())
                .sum();
            let validators_with_signatures = certificate
                .signatures()
                .iter()
                .map(|(validator_name, _)| validator_name.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: certificate.round.type_name(),
                round_number: certificate.round.number(),
                confirmed_transactions: body.transactions.len() as u64,
                confirmed_incoming_bundles: body.incoming_bundles().count() as u64,
                confirmed_incoming_messages: incoming_messages as u64,
                confirmed_operations: body.operations().count() as u64,
                validators_with_signatures,
            }
        }

        /// Writes the collected figures to the metrics above.
        ///
        /// The bundle, message and operation counters are only touched when the block has at
        /// least one transaction, and each of them only when its own count is non-zero.
        pub fn record(self) {
            let Self {
                certificate_log_str,
                round_type,
                round_number,
                confirmed_transactions,
                confirmed_incoming_bundles,
                confirmed_incoming_messages,
                confirmed_operations,
                validators_with_signatures,
            } = self;

            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[certificate_log_str, round_type])
                .observe(f64::from(round_number));
            TRANSACTIONS_PER_BLOCK.observe(confirmed_transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(confirmed_incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(confirmed_operations as f64);

            if confirmed_transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(confirmed_transactions);
                if confirmed_incoming_bundles > 0 {
                    INCOMING_BUNDLE_COUNT.inc_by(confirmed_incoming_bundles);
                }
                if confirmed_incoming_messages > 0 {
                    INCOMING_MESSAGE_COUNT.inc_by(confirmed_incoming_messages);
                }
                if confirmed_operations > 0 {
                    OPERATION_COUNT.inc_by(confirmed_operations);
                }
            }

            for validator_name in &validators_with_signatures {
                CERTIFICATES_SIGNED
                    .with_label_values(&[validator_name.as_str()])
                    .inc();
            }
        }
    }
}
227
228#[derive(Default, Debug)]
230pub struct NetworkActions {
231 pub cross_chain_requests: Vec<CrossChainRequest>,
233 pub notifications: Vec<Notification>,
235}
236
237impl NetworkActions {
238 pub fn extend(&mut self, other: NetworkActions) {
239 self.cross_chain_requests.extend(other.cross_chain_requests);
240 self.notifications.extend(other.notifications);
241 }
242}
243
244#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
245pub struct Notification {
247 pub chain_id: ChainId,
248 pub reason: Reason,
249}
250
251doc_scalar!(
252 Notification,
253 "Notify that a chain has a new certified block or a new message"
254);
255
256#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
257pub enum Reason {
259 NewBlock {
260 height: BlockHeight,
261 hash: CryptoHash,
262 event_streams: BTreeSet<StreamId>,
263 },
264 NewIncomingBundle {
265 origin: ChainId,
266 height: BlockHeight,
267 },
268 NewRound {
269 height: BlockHeight,
270 round: Round,
271 },
272 BlockExecuted {
273 height: BlockHeight,
274 hash: CryptoHash,
275 },
276}
277
278#[derive(Debug, Error)]
280pub enum WorkerError {
281 #[error(transparent)]
282 CryptoError(#[from] CryptoError),
283
284 #[error(transparent)]
285 ArithmeticError(#[from] ArithmeticError),
286
287 #[error(transparent)]
288 ViewError(#[from] ViewError),
289
290 #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
291 ReadCertificatesError(Vec<CryptoHash>),
292
293 #[error(transparent)]
294 ChainError(#[from] Box<ChainError>),
295
296 #[error(transparent)]
297 BcsError(#[from] bcs::Error),
298
299 #[error("Block was not signed by an authorized owner")]
301 InvalidOwner,
302
303 #[error("Operations in the block are not authenticated by the proper signer: {0}")]
304 InvalidSigner(AccountOwner),
305
306 #[error(
308 "Chain is expecting a next block at height {expected_block_height} but the given block \
309 is at height {found_block_height} instead"
310 )]
311 UnexpectedBlockHeight {
312 expected_block_height: BlockHeight,
313 found_block_height: BlockHeight,
314 },
315 #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
316 InvalidEpoch {
317 chain_id: ChainId,
318 chain_epoch: Epoch,
319 epoch: Epoch,
320 },
321
322 #[error("Events not found: {0:?}")]
323 EventsNotFound(Vec<EventId>),
324
325 #[error("Invalid cross-chain request")]
327 InvalidCrossChainRequest,
328 #[error("The block does not contain the hash that we expected for the previous block")]
329 InvalidBlockChaining,
330 #[error(
331 "The given outcome is not what we computed after executing the block.\n\
332 Computed: {computed:#?}\n\
333 Submitted: {submitted:#?}"
334 )]
335 IncorrectOutcome {
336 computed: Box<BlockExecutionOutcome>,
337 submitted: Box<BlockExecutionOutcome>,
338 },
339 #[error(
340 "Block timestamp ({block_timestamp}) is further in the future from local time \
341 ({local_time}) than block time grace period ({block_time_grace_period:?}) \
342 [us:{block_timestamp_us}:{local_time_us}]",
343 block_timestamp_us = block_timestamp.micros(),
344 local_time_us = local_time.micros(),
345 )]
346 InvalidTimestamp {
347 block_timestamp: Timestamp,
348 local_time: Timestamp,
349 block_time_grace_period: Duration,
350 },
351 #[error("We don't have the value for the certificate.")]
352 MissingCertificateValue,
353 #[error("The hash certificate doesn't match its value.")]
354 InvalidLiteCertificate,
355 #[error("Fast blocks cannot query oracles")]
356 FastBlockUsingOracles,
357 #[error("Blobs not found: {0:?}")]
358 BlobsNotFound(Vec<BlobId>),
359 #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
360 ConfirmedLogEntryNotFound {
361 height: BlockHeight,
362 chain_id: ChainId,
363 },
364 #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
365 PreprocessedBlocksEntryNotFound {
366 height: BlockHeight,
367 chain_id: ChainId,
368 },
369 #[error("The block proposal is invalid: {0}")]
370 InvalidBlockProposal(String),
371 #[error("Blob was not required by any pending block")]
372 UnexpectedBlob,
373 #[error("Number of published blobs per block must not exceed {0}")]
374 TooManyPublishedBlobs(u64),
375 #[error("Missing network description")]
376 MissingNetworkDescription,
377 #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
378 ChainActorSendError {
379 chain_id: ChainId,
380 error: Box<dyn DynError>,
381 },
382 #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
383 ChainActorRecvError {
384 chain_id: ChainId,
385 error: Box<dyn DynError>,
386 },
387
388 #[error("thread error: {0}")]
389 Thread(#[from] web_thread_pool::Error),
390
391 #[error("Fallback mode is not available on this network")]
392 NoFallbackMode,
393}
394
395impl WorkerError {
396 pub fn is_local(&self) -> bool {
400 match self {
401 WorkerError::CryptoError(_)
402 | WorkerError::ArithmeticError(_)
403 | WorkerError::InvalidOwner
404 | WorkerError::InvalidSigner(_)
405 | WorkerError::UnexpectedBlockHeight { .. }
406 | WorkerError::InvalidEpoch { .. }
407 | WorkerError::EventsNotFound(_)
408 | WorkerError::InvalidBlockChaining
409 | WorkerError::IncorrectOutcome { .. }
410 | WorkerError::InvalidTimestamp { .. }
411 | WorkerError::MissingCertificateValue
412 | WorkerError::InvalidLiteCertificate
413 | WorkerError::FastBlockUsingOracles
414 | WorkerError::BlobsNotFound(_)
415 | WorkerError::InvalidBlockProposal(_)
416 | WorkerError::UnexpectedBlob
417 | WorkerError::TooManyPublishedBlobs(_)
418 | WorkerError::NoFallbackMode
419 | WorkerError::ViewError(ViewError::NotFound(_)) => false,
420 WorkerError::BcsError(_)
421 | WorkerError::InvalidCrossChainRequest
422 | WorkerError::ViewError(_)
423 | WorkerError::ConfirmedLogEntryNotFound { .. }
424 | WorkerError::PreprocessedBlocksEntryNotFound { .. }
425 | WorkerError::MissingNetworkDescription
426 | WorkerError::ChainActorSendError { .. }
427 | WorkerError::ChainActorRecvError { .. }
428 | WorkerError::Thread(_)
429 | WorkerError::ReadCertificatesError(_) => true,
430 WorkerError::ChainError(chain_error) => chain_error.is_local(),
431 }
432 }
433}
434
435impl From<ChainError> for WorkerError {
436 #[instrument(level = "trace", skip(chain_error))]
437 fn from(chain_error: ChainError) -> Self {
438 match chain_error {
439 ChainError::ExecutionError(execution_error, context) => match *execution_error {
440 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
441 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
442 _ => Self::ChainError(Box::new(ChainError::ExecutionError(
443 execution_error,
444 context,
445 ))),
446 },
447 error => Self::ChainError(Box::new(error)),
448 }
449 }
450}
451
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the [`ExecutionError`] contained in this error.
    ///
    /// # Panics
    ///
    /// Panics if this is not a `ChainError::ExecutionError`, or if its context differs from
    /// `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            _ => panic!("Expected an `ExecutionError`. Got: {self:#?}"),
        };

        match *chain_error {
            ChainError::ExecutionError(execution_error, context) => {
                assert_eq!(context, expected_context);
                *execution_error
            }
            _ => panic!("Expected an `ExecutionError`. Got: {chain_error:#?}"),
        }
    }
}
473
474pub struct WorkerState<StorageClient>
476where
477 StorageClient: Storage,
478{
479 nickname: String,
481 storage: StorageClient,
483 chain_worker_config: ChainWorkerConfig,
485 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
486 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
487 chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
489 delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
492 chain_worker_tasks: Arc<Mutex<JoinSet>>,
494 chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
496}
497
498impl<StorageClient> Clone for WorkerState<StorageClient>
499where
500 StorageClient: Storage + Clone,
501{
502 fn clone(&self) -> Self {
503 WorkerState {
504 nickname: self.nickname.clone(),
505 storage: self.storage.clone(),
506 chain_worker_config: self.chain_worker_config.clone(),
507 block_cache: self.block_cache.clone(),
508 execution_state_cache: self.execution_state_cache.clone(),
509 chain_modes: self.chain_modes.clone(),
510 delivery_notifiers: self.delivery_notifiers.clone(),
511 chain_worker_tasks: self.chain_worker_tasks.clone(),
512 chain_workers: self.chain_workers.clone(),
513 }
514 }
515}
516
517type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
519 ChainWorkerRequest<<StorageClient as Storage>::Context>,
520 tracing::Span,
521 Instant,
522)>;
523
524pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
525
526impl<StorageClient> WorkerState<StorageClient>
527where
528 StorageClient: Storage,
529{
530 #[instrument(level = "trace", skip(nickname, key_pair, storage))]
531 pub fn new(
532 nickname: String,
533 key_pair: Option<ValidatorSecretKey>,
534 storage: StorageClient,
535 ) -> Self {
536 WorkerState {
537 nickname,
538 storage,
539 chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
540 block_cache: Arc::new(ValueCache::new(BLOCK_CACHE_SIZE)),
541 execution_state_cache: Arc::new(ValueCache::new(EXECUTION_STATE_CACHE_SIZE)),
542 chain_modes: None,
543 delivery_notifiers: Arc::default(),
544 chain_worker_tasks: Arc::default(),
545 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
546 }
547 }
548
549 #[instrument(level = "trace", skip(nickname, storage))]
550 pub fn new_for_client(
551 nickname: String,
552 storage: StorageClient,
553 chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
554 ) -> Self {
555 WorkerState {
556 nickname,
557 storage,
558 chain_worker_config: ChainWorkerConfig::default(),
559 block_cache: Arc::new(ValueCache::new(BLOCK_CACHE_SIZE)),
560 execution_state_cache: Arc::new(ValueCache::new(EXECUTION_STATE_CACHE_SIZE)),
561 chain_modes: Some(chain_modes),
562 delivery_notifiers: Arc::default(),
563 chain_worker_tasks: Arc::default(),
564 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
565 }
566 }
567
568 #[instrument(level = "trace", skip(self, value))]
569 pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
570 self.chain_worker_config.allow_inactive_chains = value;
571 self
572 }
573
574 #[instrument(level = "trace", skip(self, value))]
575 pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
576 self.chain_worker_config
577 .allow_messages_from_deprecated_epochs = value;
578 self
579 }
580
581 #[instrument(level = "trace", skip(self, value))]
582 pub fn with_long_lived_services(mut self, value: bool) -> Self {
583 self.chain_worker_config.long_lived_services = value;
584 self
585 }
586
587 #[instrument(level = "trace", skip(self))]
592 pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self {
593 self.chain_worker_config.block_time_grace_period = block_time_grace_period;
594 self
595 }
596
597 #[instrument(level = "trace", skip(self))]
601 pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
602 self.chain_worker_config.ttl = chain_worker_ttl;
603 self
604 }
605
606 #[instrument(level = "trace", skip(self))]
610 pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
611 self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
612 self
613 }
614
615 #[instrument(level = "trace", skip(self))]
619 pub fn with_chain_info_max_received_log_entries(
620 mut self,
621 chain_info_max_received_log_entries: usize,
622 ) -> Self {
623 if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
624 warn!(
625 "The value set for the maximum size of received_log entries \
626 may not be compatible with the latest clients: {} instead of {}",
627 chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
628 );
629 }
630 self.chain_worker_config.chain_info_max_received_log_entries =
631 chain_info_max_received_log_entries;
632 self
633 }
634
635 #[instrument(level = "trace", skip(self))]
636 pub fn nickname(&self) -> &str {
637 &self.nickname
638 }
639
640 #[instrument(level = "trace", skip(self))]
642 #[cfg(not(feature = "test"))]
643 pub(crate) fn storage_client(&self) -> &StorageClient {
644 &self.storage
645 }
646
647 #[instrument(level = "trace", skip(self))]
650 #[cfg(feature = "test")]
651 pub fn storage_client(&self) -> &StorageClient {
652 &self.storage
653 }
654
655 #[instrument(level = "trace", skip(self, certificate))]
656 pub(crate) async fn full_certificate(
657 &self,
658 certificate: LiteCertificate<'_>,
659 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
660 let block = self
661 .block_cache
662 .get(&certificate.value.value_hash)
663 .ok_or(WorkerError::MissingCertificateValue)?;
664
665 match certificate.value.kind {
666 linera_chain::types::CertificateKind::Confirmed => {
667 let value = ConfirmedBlock::from_hashed(block);
668 Ok(Either::Left(
669 certificate
670 .with_value(value)
671 .ok_or(WorkerError::InvalidLiteCertificate)?,
672 ))
673 }
674 linera_chain::types::CertificateKind::Validated => {
675 let value = ValidatedBlock::from_hashed(block);
676 Ok(Either::Right(
677 certificate
678 .with_value(value)
679 .ok_or(WorkerError::InvalidLiteCertificate)?,
680 ))
681 }
682 _ => Err(WorkerError::InvalidLiteCertificate),
683 }
684 }
685}
686
687#[allow(async_fn_in_trait)]
688#[cfg_attr(not(web), trait_variant::make(Send))]
689pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
690 async fn process_certificate<S: Storage + Clone + 'static>(
691 worker: &WorkerState<S>,
692 certificate: GenericCertificate<Self>,
693 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
694}
695
696impl ProcessableCertificate for ConfirmedBlock {
697 async fn process_certificate<S: Storage + Clone + 'static>(
698 worker: &WorkerState<S>,
699 certificate: ConfirmedBlockCertificate,
700 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
701 Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
702 }
703}
704
705impl ProcessableCertificate for ValidatedBlock {
706 async fn process_certificate<S: Storage + Clone + 'static>(
707 worker: &WorkerState<S>,
708 certificate: ValidatedBlockCertificate,
709 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
710 Box::pin(worker.handle_validated_certificate(certificate)).await
711 }
712}
713
714impl ProcessableCertificate for Timeout {
715 async fn process_certificate<S: Storage + Clone + 'static>(
716 worker: &WorkerState<S>,
717 certificate: TimeoutCertificate,
718 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
719 worker.handle_timeout_certificate(certificate).await
720 }
721}
722
723impl<StorageClient> WorkerState<StorageClient>
724where
725 StorageClient: Storage + Clone + 'static,
726{
727 #[instrument(level = "trace", skip(self, certificate, notifier))]
728 #[inline]
729 pub async fn fully_handle_certificate_with_notifications<T>(
730 &self,
731 certificate: GenericCertificate<T>,
732 notifier: &impl Notifier,
733 ) -> Result<ChainInfoResponse, WorkerError>
734 where
735 T: ProcessableCertificate,
736 {
737 let notifications = (*notifier).clone();
738 let this = self.clone();
739 linera_base::task::spawn(async move {
740 let (response, actions) =
741 ProcessableCertificate::process_certificate(&this, certificate).await?;
742 notifications.notify(&actions.notifications);
743 let mut requests = VecDeque::from(actions.cross_chain_requests);
744 while let Some(request) = requests.pop_front() {
745 let actions = this.handle_cross_chain_request(request).await?;
746 requests.extend(actions.cross_chain_requests);
747 notifications.notify(&actions.notifications);
748 }
749 Ok(response)
750 })
751 .await
752 }
753
754 #[instrument(level = "trace", skip(self, block))]
756 pub async fn stage_block_execution(
757 &self,
758 block: ProposedBlock,
759 round: Option<u32>,
760 published_blobs: Vec<Blob>,
761 ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> {
762 self.query_chain_worker(block.chain_id, move |callback| {
763 ChainWorkerRequest::StageBlockExecution {
764 block,
765 round,
766 published_blobs,
767 callback,
768 }
769 })
770 .await
771 }
772
773 #[instrument(level = "trace", skip(self, block))]
778 pub async fn stage_block_execution_with_policy(
779 &self,
780 block: ProposedBlock,
781 round: Option<u32>,
782 published_blobs: Vec<Blob>,
783 policy: BundleExecutionPolicy,
784 ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> {
785 self.query_chain_worker(block.chain_id, move |callback| {
786 ChainWorkerRequest::StageBlockExecutionWithPolicy {
787 block,
788 round,
789 published_blobs,
790 policy,
791 callback,
792 }
793 })
794 .await
795 }
796
797 #[instrument(level = "trace", skip(self, chain_id, query))]
802 pub async fn query_application(
803 &self,
804 chain_id: ChainId,
805 query: Query,
806 block_hash: Option<CryptoHash>,
807 ) -> Result<QueryOutcome, WorkerError> {
808 self.query_chain_worker(chain_id, move |callback| {
809 ChainWorkerRequest::QueryApplication {
810 query,
811 block_hash,
812 callback,
813 }
814 })
815 .await
816 }
817
818 #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
819 nickname = %self.nickname,
820 chain_id = %chain_id,
821 application_id = %application_id
822 ))]
823 pub async fn describe_application(
824 &self,
825 chain_id: ChainId,
826 application_id: ApplicationId,
827 ) -> Result<ApplicationDescription, WorkerError> {
828 self.query_chain_worker(chain_id, move |callback| {
829 ChainWorkerRequest::DescribeApplication {
830 application_id,
831 callback,
832 }
833 })
834 .await
835 }
836
837 #[instrument(
839 level = "trace",
840 skip(self, certificate, notify_when_messages_are_delivered),
841 fields(
842 nickname = %self.nickname,
843 chain_id = %certificate.block().header.chain_id,
844 block_height = %certificate.block().header.height
845 )
846 )]
847 async fn process_confirmed_block(
848 &self,
849 certificate: ConfirmedBlockCertificate,
850 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
851 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
852 let chain_id = certificate.block().header.chain_id;
853 self.query_chain_worker(chain_id, move |callback| {
854 ChainWorkerRequest::ProcessConfirmedBlock {
855 certificate,
856 notify_when_messages_are_delivered,
857 callback,
858 }
859 })
860 .await
861 }
862
863 #[instrument(level = "trace", skip(self, certificate), fields(
865 nickname = %self.nickname,
866 chain_id = %certificate.block().header.chain_id,
867 block_height = %certificate.block().header.height
868 ))]
869 async fn process_validated_block(
870 &self,
871 certificate: ValidatedBlockCertificate,
872 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
873 let chain_id = certificate.block().header.chain_id;
874 self.query_chain_worker(chain_id, move |callback| {
875 ChainWorkerRequest::ProcessValidatedBlock {
876 certificate,
877 callback,
878 }
879 })
880 .await
881 }
882
883 #[instrument(level = "trace", skip(self, certificate), fields(
885 nickname = %self.nickname,
886 chain_id = %certificate.value().chain_id(),
887 height = %certificate.value().height()
888 ))]
889 async fn process_timeout(
890 &self,
891 certificate: TimeoutCertificate,
892 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
893 let chain_id = certificate.value().chain_id();
894 self.query_chain_worker(chain_id, move |callback| {
895 ChainWorkerRequest::ProcessTimeout {
896 certificate,
897 callback,
898 }
899 })
900 .await
901 }
902
903 #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
904 nickname = %self.nickname,
905 origin = %origin,
906 recipient = %recipient,
907 num_bundles = %bundles.len()
908 ))]
909 async fn process_cross_chain_update(
910 &self,
911 origin: ChainId,
912 recipient: ChainId,
913 bundles: Vec<(Epoch, MessageBundle)>,
914 ) -> Result<Option<BlockHeight>, WorkerError> {
915 self.query_chain_worker(recipient, move |callback| {
916 ChainWorkerRequest::ProcessCrossChainUpdate {
917 origin,
918 bundles,
919 callback,
920 }
921 })
922 .await
923 }
924
/// Sends a `ReadCertificate` request for `height` to the worker of `chain_id` and returns
/// its answer. Only available with `with_testing`.
#[instrument(level = "trace", skip(self, chain_id, height), fields(
    nickname = %self.nickname,
    chain_id = %chain_id,
    height = %height
))]
#[cfg(with_testing)]
pub async fn read_certificate(
    &self,
    chain_id: ChainId,
    height: BlockHeight,
) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
    let build_request =
        move |callback| ChainWorkerRequest::ReadCertificate { height, callback };
    self.query_chain_worker(chain_id, build_request).await
}
942
943 #[instrument(level = "trace", skip(self), fields(
949 nickname = %self.nickname,
950 chain_id = %chain_id
951 ))]
952 pub async fn chain_state_view(
953 &self,
954 chain_id: ChainId,
955 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
956 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
957 callback,
958 })
959 .await
960 }
961
962 #[instrument(level = "trace", skip(self, request_builder), fields(
964 nickname = %self.nickname,
965 chain_id = %chain_id
966 ))]
967 async fn query_chain_worker<Response>(
968 &self,
969 chain_id: ChainId,
970 request_builder: impl FnOnce(
971 oneshot::Sender<Result<Response, WorkerError>>,
972 ) -> ChainWorkerRequest<StorageClient::Context>,
973 ) -> Result<Response, WorkerError> {
974 let (callback, response) = oneshot::channel();
976 let request = request_builder(callback);
977
978 let new_channel = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
980
981 if let Some((sender, receiver)) = new_channel {
983 let delivery_notifier = self
984 .delivery_notifiers
985 .lock()
986 .unwrap()
987 .entry(chain_id)
988 .or_default()
989 .clone();
990
991 let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
992 chain_modes
993 .read()
994 .unwrap()
995 .get(&chain_id)
996 .is_some_and(ListeningMode::is_full)
997 });
998
999 let actor_task = ChainWorkerActor::run(
1000 self.chain_worker_config.clone(),
1001 self.storage.clone(),
1002 self.block_cache.clone(),
1003 self.execution_state_cache.clone(),
1004 self.chain_modes.clone(),
1005 delivery_notifier,
1006 chain_id,
1007 sender,
1008 receiver,
1009 is_tracked,
1010 );
1011
1012 self.chain_worker_tasks
1013 .lock()
1014 .unwrap()
1015 .spawn_task(actor_task);
1016 }
1017
1018 match response.await {
1020 Err(e) => {
1021 Err(WorkerError::ChainActorRecvError {
1023 chain_id,
1024 error: Box::new(e),
1025 })
1026 }
1027 Ok(response) => response,
1028 }
1029 }
1030
1031 #[instrument(level = "trace", skip(self), fields(
1036 nickname = %self.nickname,
1037 chain_id = %chain_id
1038 ))]
1039 #[expect(clippy::type_complexity)]
1040 fn call_and_maybe_create_chain_worker_endpoint(
1041 &self,
1042 chain_id: ChainId,
1043 request: ChainWorkerRequest<StorageClient::Context>,
1044 ) -> Result<
1045 Option<(
1046 ChainWorkerRequestSender<StorageClient::Context>,
1047 ChainWorkerRequestReceiver<StorageClient::Context>,
1048 )>,
1049 WorkerError,
1050 > {
1051 let mut chain_workers = self.chain_workers.lock().unwrap();
1052
1053 let (sender, new_channel) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
1054 (endpoint, None)
1055 } else {
1056 let (sender, receiver) = mpsc::unbounded_channel();
1057 (sender.clone(), Some((sender, receiver)))
1058 };
1059
1060 if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) {
1061 return Err(WorkerError::ChainActorSendError {
1063 chain_id,
1064 error: Box::new(e),
1065 });
1066 }
1067
1068 chain_workers.insert(chain_id, sender);
1070 #[cfg(with_metrics)]
1071 metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64);
1072
1073 Ok(new_channel)
1074 }
1075
1076 #[instrument(skip_all, fields(
1077 nick = self.nickname,
1078 chain_id = format!("{:.8}", proposal.content.block.chain_id),
1079 height = %proposal.content.block.height,
1080 ))]
1081 pub async fn handle_block_proposal(
1082 &self,
1083 proposal: BlockProposal,
1084 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1085 trace!("{} <-- {:?}", self.nickname, proposal);
1086 #[cfg(with_metrics)]
1087 let round = proposal.content.round;
1088 let response = self
1089 .query_chain_worker(proposal.content.block.chain_id, move |callback| {
1090 ChainWorkerRequest::HandleBlockProposal { proposal, callback }
1091 })
1092 .await?;
1093 #[cfg(with_metrics)]
1094 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1095 .with_label_values(&[round.type_name()])
1096 .observe(round.number() as f64);
1097 Ok(response)
1098 }
1099
1100 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
1103 pub async fn handle_lite_certificate(
1104 &self,
1105 certificate: LiteCertificate<'_>,
1106 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1107 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1108 match self.full_certificate(certificate).await? {
1109 Either::Left(confirmed) => {
1110 Box::pin(
1111 self.handle_confirmed_certificate(
1112 confirmed,
1113 notify_when_messages_are_delivered,
1114 ),
1115 )
1116 .await
1117 }
1118 Either::Right(validated) => {
1119 if let Some(notifier) = notify_when_messages_are_delivered {
1120 if let Err(()) = notifier.send(()) {
1122 warn!("Failed to notify message delivery to caller");
1123 }
1124 }
1125 Box::pin(self.handle_validated_certificate(validated)).await
1126 }
1127 }
1128 }
1129
1130 #[instrument(skip_all, fields(
1132 nick = self.nickname,
1133 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1134 height = %certificate.block().header.height,
1135 ))]
1136 pub async fn handle_confirmed_certificate(
1137 &self,
1138 certificate: ConfirmedBlockCertificate,
1139 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1140 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1141 trace!("{} <-- {:?}", self.nickname, certificate);
1142 #[cfg(with_metrics)]
1143 let metrics_data = metrics::MetricsData::new(&certificate);
1144
1145 let (info, actions, _outcome) =
1146 Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1147 .await?;
1148
1149 #[cfg(with_metrics)]
1150 if matches!(_outcome, BlockOutcome::Processed) {
1151 metrics_data.record();
1152 }
1153 Ok((info, actions))
1154 }
1155
1156 #[instrument(skip_all, fields(
1158 nick = self.nickname,
1159 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1160 height = %certificate.block().header.height,
1161 ))]
1162 pub async fn handle_validated_certificate(
1163 &self,
1164 certificate: ValidatedBlockCertificate,
1165 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1166 trace!("{} <-- {:?}", self.nickname, certificate);
1167
1168 #[cfg(with_metrics)]
1169 let round = certificate.round;
1170 #[cfg(with_metrics)]
1171 let cert_str = certificate.inner().to_log_str();
1172
1173 let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1174 #[cfg(with_metrics)]
1175 {
1176 if matches!(_outcome, BlockOutcome::Processed) {
1177 metrics::NUM_ROUNDS_IN_CERTIFICATE
1178 .with_label_values(&[cert_str, round.type_name()])
1179 .observe(round.number() as f64);
1180 }
1181 }
1182 Ok((info, actions))
1183 }
1184
1185 #[instrument(skip_all, fields(
1187 nick = self.nickname,
1188 chain_id = format!("{:.8}", certificate.inner().chain_id()),
1189 height = %certificate.inner().height(),
1190 ))]
1191 pub async fn handle_timeout_certificate(
1192 &self,
1193 certificate: TimeoutCertificate,
1194 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1195 trace!("{} <-- {:?}", self.nickname, certificate);
1196 self.process_timeout(certificate).await
1197 }
1198
1199 #[instrument(skip_all, fields(
1200 nick = self.nickname,
1201 chain_id = format!("{:.8}", query.chain_id)
1202 ))]
1203 pub async fn handle_chain_info_query(
1204 &self,
1205 query: ChainInfoQuery,
1206 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1207 trace!("{} <-- {:?}", self.nickname, query);
1208 #[cfg(with_metrics)]
1209 metrics::CHAIN_INFO_QUERIES.inc();
1210 let result = self
1211 .query_chain_worker(query.chain_id, move |callback| {
1212 ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1213 })
1214 .await;
1215 trace!("{} --> {:?}", self.nickname, result);
1216 result
1217 }
1218
1219 #[instrument(skip_all, fields(
1220 nick = self.nickname,
1221 chain_id = format!("{:.8}", chain_id)
1222 ))]
1223 pub async fn download_pending_blob(
1224 &self,
1225 chain_id: ChainId,
1226 blob_id: BlobId,
1227 ) -> Result<Blob, WorkerError> {
1228 trace!(
1229 "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1230 self.nickname
1231 );
1232 let result = self
1233 .query_chain_worker(chain_id, move |callback| {
1234 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1235 })
1236 .await;
1237 trace!(
1238 "{} --> {:?}",
1239 self.nickname,
1240 result.as_ref().map(|_| blob_id)
1241 );
1242 result
1243 }
1244
1245 #[instrument(skip_all, fields(
1246 nick = self.nickname,
1247 chain_id = format!("{:.8}", chain_id)
1248 ))]
1249 pub async fn handle_pending_blob(
1250 &self,
1251 chain_id: ChainId,
1252 blob: Blob,
1253 ) -> Result<ChainInfoResponse, WorkerError> {
1254 let blob_id = blob.id();
1255 trace!(
1256 "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1257 self.nickname
1258 );
1259 let result = self
1260 .query_chain_worker(chain_id, move |callback| {
1261 ChainWorkerRequest::HandlePendingBlob { blob, callback }
1262 })
1263 .await;
1264 trace!(
1265 "{} --> {:?}",
1266 self.nickname,
1267 result.as_ref().map(|_| blob_id)
1268 );
1269 result
1270 }
1271
1272 #[instrument(skip_all, fields(
1273 nick = self.nickname,
1274 chain_id = format!("{:.8}", request.target_chain_id())
1275 ))]
1276 pub async fn handle_cross_chain_request(
1277 &self,
1278 request: CrossChainRequest,
1279 ) -> Result<NetworkActions, WorkerError> {
1280 trace!("{} <-- {:?}", self.nickname, request);
1281 match request {
1282 CrossChainRequest::UpdateRecipient {
1283 sender,
1284 recipient,
1285 bundles,
1286 } => {
1287 let mut actions = NetworkActions::default();
1288 let origin = sender;
1289 let Some(height) = self
1290 .process_cross_chain_update(origin, recipient, bundles)
1291 .await?
1292 else {
1293 return Ok(actions);
1294 };
1295 actions.notifications.push(Notification {
1296 chain_id: recipient,
1297 reason: Reason::NewIncomingBundle { origin, height },
1298 });
1299 actions
1300 .cross_chain_requests
1301 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1302 sender,
1303 recipient,
1304 latest_height: height,
1305 });
1306 Ok(actions)
1307 }
1308 CrossChainRequest::ConfirmUpdatedRecipient {
1309 sender,
1310 recipient,
1311 latest_height,
1312 } => {
1313 self.query_chain_worker(sender, move |callback| {
1314 ChainWorkerRequest::ConfirmUpdatedRecipient {
1315 recipient,
1316 latest_height,
1317 callback,
1318 }
1319 })
1320 .await?;
1321 Ok(NetworkActions::default())
1322 }
1323 }
1324 }
1325
1326 #[instrument(skip_all, fields(
1328 nickname = %self.nickname,
1329 chain_id = %chain_id,
1330 num_trackers = %new_trackers.len()
1331 ))]
1332 pub async fn update_received_certificate_trackers(
1333 &self,
1334 chain_id: ChainId,
1335 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1336 ) -> Result<(), WorkerError> {
1337 self.query_chain_worker(chain_id, move |callback| {
1338 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1339 new_trackers,
1340 callback,
1341 }
1342 })
1343 .await
1344 }
1345
1346 #[instrument(skip_all, fields(
1348 nickname = %self.nickname,
1349 chain_id = %chain_id,
1350 start = %start,
1351 end = %end
1352 ))]
1353 pub async fn get_preprocessed_block_hashes(
1354 &self,
1355 chain_id: ChainId,
1356 start: BlockHeight,
1357 end: BlockHeight,
1358 ) -> Result<Vec<CryptoHash>, WorkerError> {
1359 self.query_chain_worker(chain_id, move |callback| {
1360 ChainWorkerRequest::GetPreprocessedBlockHashes {
1361 start,
1362 end,
1363 callback,
1364 }
1365 })
1366 .await
1367 }
1368
1369 #[instrument(skip_all, fields(
1371 nickname = %self.nickname,
1372 chain_id = %chain_id,
1373 origin = %origin
1374 ))]
1375 pub async fn get_inbox_next_height(
1376 &self,
1377 chain_id: ChainId,
1378 origin: ChainId,
1379 ) -> Result<BlockHeight, WorkerError> {
1380 self.query_chain_worker(chain_id, move |callback| {
1381 ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1382 })
1383 .await
1384 }
1385
1386 #[instrument(skip_all, fields(
1389 nickname = %self.nickname,
1390 chain_id = %chain_id,
1391 num_blob_ids = %blob_ids.len()
1392 ))]
1393 pub async fn get_locking_blobs(
1394 &self,
1395 chain_id: ChainId,
1396 blob_ids: Vec<BlobId>,
1397 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1398 self.query_chain_worker(chain_id, move |callback| {
1399 ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1400 })
1401 .await
1402 }
1403
1404 pub async fn get_block_hashes(
1406 &self,
1407 chain_id: ChainId,
1408 heights: Vec<BlockHeight>,
1409 ) -> Result<Vec<CryptoHash>, WorkerError> {
1410 self.query_chain_worker(chain_id, move |callback| {
1411 ChainWorkerRequest::GetBlockHashes { heights, callback }
1412 })
1413 .await
1414 }
1415
1416 pub async fn get_proposed_blobs(
1418 &self,
1419 chain_id: ChainId,
1420 blob_ids: Vec<BlobId>,
1421 ) -> Result<Vec<Blob>, WorkerError> {
1422 self.query_chain_worker(chain_id, move |callback| {
1423 ChainWorkerRequest::GetProposedBlobs { blob_ids, callback }
1424 })
1425 .await
1426 }
1427
1428 pub async fn get_event_subscriptions(
1430 &self,
1431 chain_id: ChainId,
1432 ) -> Result<EventSubscriptionsResult, WorkerError> {
1433 self.query_chain_worker(chain_id, |callback| {
1434 ChainWorkerRequest::GetEventSubscriptions { callback }
1435 })
1436 .await
1437 }
1438
1439 pub async fn get_stream_event_count(
1441 &self,
1442 chain_id: ChainId,
1443 stream_id: StreamId,
1444 ) -> Result<Option<u32>, WorkerError> {
1445 self.query_chain_worker(chain_id, move |callback| {
1446 ChainWorkerRequest::GetStreamEventCount {
1447 stream_id,
1448 callback,
1449 }
1450 })
1451 .await
1452 }
1453
1454 pub async fn get_received_certificate_trackers(
1456 &self,
1457 chain_id: ChainId,
1458 ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1459 self.query_chain_worker(chain_id, |callback| {
1460 ChainWorkerRequest::GetReceivedCertificateTrackers { callback }
1461 })
1462 .await
1463 }
1464
1465 pub async fn get_tip_state_and_outbox_info(
1467 &self,
1468 chain_id: ChainId,
1469 receiver_id: ChainId,
1470 ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1471 self.query_chain_worker(chain_id, move |callback| {
1472 ChainWorkerRequest::GetTipStateAndOutboxInfo {
1473 receiver_id,
1474 callback,
1475 }
1476 })
1477 .await
1478 }
1479
1480 pub async fn get_next_height_to_preprocess(
1482 &self,
1483 chain_id: ChainId,
1484 ) -> Result<BlockHeight, WorkerError> {
1485 self.query_chain_worker(chain_id, |callback| {
1486 ChainWorkerRequest::GetNextHeightToPreprocess { callback }
1487 })
1488 .await
1489 }
1490
1491 pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, WorkerError> {
1493 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetManagerSeed {
1494 callback,
1495 })
1496 .await
1497 }
1498}
1499
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the public key derived from the key pair in this worker's
    /// `chain_worker_config`. Only available in testing builds.
    ///
    /// # Panics
    ///
    /// Panics if the configuration holds no key pair.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain it's public key",
        );
        key_pair.public()
    }
}