1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::{
12 future::{Either, Shared},
13 FutureExt as _,
14};
15use linera_base::{
16 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
17 data_types::{
18 ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta,
19 Timestamp,
20 },
21 doc_scalar,
22 hashed::Hashed,
23 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
24};
25use linera_cache::{UniqueValueCache, ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
26#[cfg(with_testing)]
27use linera_chain::ChainExecutionContext;
28use linera_chain::{
29 data_types::{BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock},
30 types::{
31 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
32 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
33 },
34 ChainError, ChainStateView,
35};
36use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
37use linera_storage::{Clock as _, Storage};
38use linera_views::{context::InactiveContext, ViewError};
39use serde::{Deserialize, Serialize};
40use thiserror::Error;
41use tokio::sync::{oneshot, OwnedRwLockReadGuard};
42use tracing::{instrument, trace, warn};
43
/// Read guard giving shared access to a chain's [`ChainStateView`].
///
/// This is a newtype around a tokio [`OwnedRwLockReadGuard`] that is mapped from the
/// locked [`ChainWorkerState`] to its chain state view, so the worker stays read-locked
/// for as long as this guard is alive.
pub struct ChainStateViewReadGuard<S: Storage>(
    OwnedRwLockReadGuard<ChainWorkerState<S>, ChainStateView<S::Context>>,
);
53
54impl<S: Storage> std::ops::Deref for ChainStateViewReadGuard<S> {
55 type Target = ChainStateView<S::Context>;
56
57 fn deref(&self) -> &Self::Target {
58 &self.0
59 }
60}
61
62pub(crate) use crate::chain_worker::EventSubscriptionsResult;
64use crate::{
65 chain_worker::{
66 handle, state::ChainWorkerState, BlockOutcome, ChainWorkerConfig, CrossChainUpdateResult,
67 DeliveryNotifier,
68 },
69 client::ListeningMode,
70 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
71 notifier::Notifier,
72};
73
// The unit tests for this module live in a separate file and are only compiled in test builds.
#[cfg(test)]
#[path = "unit_tests/worker_tests.rs"]
mod worker_tests;
77
78#[cfg(not(web))]
81pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> sync_wrapper::SyncFuture<F> {
82 sync_wrapper::SyncFuture::new(f)
83}
84
/// Web counterpart of `wrap_future`: returns the future unchanged.
///
/// It exists so that call sites can use `wrap_future` on every target.
#[cfg(web)]
pub(crate) fn wrap_future<F>(f: F) -> F
where
    F: std::future::Future,
{
    f
}
91
/// Default size of the block cache.
// NOTE(review): presumably the default for `ChainWorkerConfig::block_cache_size` — confirm at the use site.
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000;
/// Default size of the execution state cache.
// NOTE(review): presumably the default for `ChainWorkerConfig::execution_state_cache_size`;
// `WorkerState::new` disables that cache when the configured size is 0.
pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
94
/// Prometheus metrics recorded by the worker; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec,
    };
    use linera_chain::{data_types::MessageAction, types::ConfirmedBlockCertificate};
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Histogram of the round number of certificates, labelled by value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Histogram of the round number of block proposals, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Total number of transactions in confirmed blocks.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Total number of incoming bundles in confirmed blocks.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Total number of rejected incoming bundles in confirmed blocks.
    pub static REJECTED_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("rejected_bundle_count", "Rejected bundle count"));

    /// Total number of messages contained in incoming bundles of confirmed blocks.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Total number of operations in confirmed blocks.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Distribution of the number of operations per block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of incoming bundles per block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of transactions per block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Total number of blocks added to chains.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Number of confirmed block certificates signed, per validator.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Distribution of the number of event streams per `PreviousEventBlocks` query.
    pub static PREVIOUS_EVENT_BLOCKS_STREAM_COUNT: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "previous_event_blocks_stream_count",
            "Number of event streams requested per PreviousEventBlocks query",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Total number of chain info queries processed.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Snapshot of the figures of one confirmed block certificate.
    ///
    /// Building the snapshot (`new`) and publishing it (`record`) are separate steps, so the
    /// values can be extracted from a borrowed certificate and recorded later.
    pub struct MetricsData {
        certificate_log_str: &'static str,
        round_type: &'static str,
        round_number: u32,
        confirmed_transactions: u64,
        confirmed_incoming_bundles: u64,
        confirmed_rejected_bundles: u64,
        confirmed_incoming_messages: u64,
        confirmed_operations: u64,
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Extracts all recorded figures from `certificate` without touching any metric.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let body = &certificate.block().body;

            let incoming_bundle_total = body.incoming_bundles().count();
            let rejected_bundle_total = body
                .incoming_bundles()
                .filter(|bundle| bundle.action == MessageAction::Reject)
                .count();
            let incoming_message_total: usize = body
                .incoming_bundles()
                .map(|bundle| bundle.messages().count())
                .sum();
            let signers: Vec<String> = certificate
                .signatures()
                .iter()
                .map(|(validator_name, _)| validator_name.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: certificate.round.type_name(),
                round_number: certificate.round.number(),
                confirmed_transactions: body.transactions.len() as u64,
                confirmed_incoming_bundles: incoming_bundle_total as u64,
                confirmed_rejected_bundles: rejected_bundle_total as u64,
                confirmed_incoming_messages: incoming_message_total as u64,
                confirmed_operations: body.operations().count() as u64,
                validators_with_signatures: signers,
            }
        }

        /// Publishes the snapshot to the Prometheus metrics of this module.
        pub fn record(self) {
            let Self {
                certificate_log_str,
                round_type,
                round_number,
                confirmed_transactions,
                confirmed_incoming_bundles,
                confirmed_rejected_bundles,
                confirmed_incoming_messages,
                confirmed_operations,
                validators_with_signatures,
            } = self;

            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[certificate_log_str, round_type])
                .observe(f64::from(round_number));
            TRANSACTIONS_PER_BLOCK.observe(confirmed_transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(confirmed_incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(confirmed_operations as f64);

            // The counters are only touched when there is something to add; the detailed
            // counters are skipped entirely for blocks without transactions.
            if confirmed_transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(confirmed_transactions);
                if confirmed_incoming_bundles > 0 {
                    INCOMING_BUNDLE_COUNT.inc_by(confirmed_incoming_bundles);
                }
                if confirmed_rejected_bundles > 0 {
                    REJECTED_BUNDLE_COUNT.inc_by(confirmed_rejected_bundles);
                }
                if confirmed_incoming_messages > 0 {
                    INCOMING_MESSAGE_COUNT.inc_by(confirmed_incoming_messages);
                }
                if confirmed_operations > 0 {
                    OPERATION_COUNT.inc_by(confirmed_operations);
                }
            }

            for validator_name in &validators_with_signatures {
                CERTIFICATES_SIGNED
                    .with_label_values(&[validator_name.as_str()])
                    .inc();
            }
        }
    }
}
269
/// Follow-up actions produced while handling a request.
///
/// In this file the cross-chain requests are fed back into `handle_cross_chain_request`
/// (or handed to the outbound sender) and the notifications are passed to a `Notifier`.
#[derive(Default, Debug)]
pub struct NetworkActions {
    /// Cross-chain requests that still have to be dispatched.
    pub cross_chain_requests: Vec<CrossChainRequest>,
    /// Notifications that still have to be delivered.
    pub notifications: Vec<Notification>,
}
278
279impl NetworkActions {
280 pub fn extend(&mut self, other: NetworkActions) {
281 self.cross_chain_requests.extend(other.cross_chain_requests);
282 self.notifications.extend(other.notifications);
283 }
284}
285
/// A notification about something that happened on a chain.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Notification {
    /// The chain the notification is about.
    pub chain_id: ChainId,
    /// What happened on that chain.
    pub reason: Reason,
}

// Registers `Notification` through the `doc_scalar!` macro with the description below.
doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
297
/// The reason carried by a [`Notification`].
// NOTE(review): the per-variant descriptions below are derived from the variant and field
// names only; confirm them against the code that emits the notifications.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Reason {
    /// A new block exists at `height` with the given `hash`; `event_streams` lists the
    /// streams associated with it (presumably the streams the block published events to).
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
        event_streams: BTreeSet<StreamId>,
    },
    /// A new incoming bundle from chain `origin`, sent at `height`.
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// A new consensus `round` started for the block at `height`.
    NewRound {
        height: BlockHeight,
        round: Round,
    },
    /// The block at `height` with the given `hash` was executed.
    BlockExecuted {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// New events are available from the block at `height` with the given `hash`, on the
    /// listed `event_streams`.
    NewEvents {
        height: BlockHeight,
        hash: CryptoHash,
        event_streams: BTreeSet<StreamId>,
    },
}
327
/// Error type for the worker.
///
/// `strum::IntoStaticStr` provides the variant name as a `&'static str`, which
/// `WorkerError::error_type` uses to build its label.
#[derive(Debug, Error, strum::IntoStaticStr)]
pub enum WorkerError {
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// A chain error; boxed, and usually created through `From<ChainError>`, which maps
    /// some execution errors to dedicated variants instead.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    // Chain access control
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
    InvalidSigner(AccountOwner),

    // Chaining
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
        is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // Other server-side errors
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    // The message also prints both timestamps in microseconds for easier debugging.
    #[error(
        "Block timestamp ({block_timestamp}) is further in the future from local time \
        ({local_time}) than block time grace period ({block_time_grace_period:?}) \
        [us:{block_timestamp_us}:{local_time_us}]",
        block_timestamp_us = block_timestamp.micros(),
        local_time_us = local_time.micros(),
    )]
    InvalidTimestamp {
        block_timestamp: Timestamp,
        local_time: Timestamp,
        block_time_grace_period: Duration,
    },
    /// Returned by `WorkerState::full_certificate` when the block cache has no entry for
    /// the certificate's value hash.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    #[error("Block at height {height} on chain {chain_id} not found in local storage")]
    LocalBlockNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    /// Returned by `WorkerState::full_certificate` when the cached value cannot be
    /// combined with the lite certificate.
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    #[error(
        "confirmed_log/preprocessed_blocks entry at height {height} for chain {chain_id} not found"
    )]
    ConfirmedBlockHashNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    #[error("Missing network description")]
    MissingNetworkDescription,
    #[error("thread error: {0}")]
    Thread(#[from] web_thread_pool::Error),

    #[error("Fallback mode is not available on this network")]
    NoFallbackMode,
    /// Makes `must_reload_view` return `true`, which causes the cached chain worker to be
    /// evicted by `chain_read`/`chain_write`.
    #[error("Chain worker was poisoned by a journal resolution failure")]
    PoisonedWorker,
}
428
429impl WorkerError {
430 pub fn is_local(&self) -> bool {
434 match self {
435 WorkerError::CryptoError(_)
436 | WorkerError::ArithmeticError(_)
437 | WorkerError::InvalidOwner
438 | WorkerError::InvalidSigner(_)
439 | WorkerError::UnexpectedBlockHeight { .. }
440 | WorkerError::InvalidEpoch { .. }
441 | WorkerError::EventsNotFound(_)
442 | WorkerError::InvalidBlockChaining
443 | WorkerError::InvalidTimestamp { .. }
444 | WorkerError::MissingCertificateValue
445 | WorkerError::InvalidLiteCertificate
446 | WorkerError::FastBlockUsingOracles
447 | WorkerError::BlobsNotFound(_)
448 | WorkerError::InvalidBlockProposal(_)
449 | WorkerError::UnexpectedBlob
450 | WorkerError::TooManyPublishedBlobs(_)
451 | WorkerError::NoFallbackMode
452 | WorkerError::ViewError(ViewError::NotFound(_)) => false,
453 WorkerError::BcsError(_)
454 | WorkerError::InvalidCrossChainRequest
455 | WorkerError::ViewError(_)
456 | WorkerError::ConfirmedBlockHashNotFound { .. }
457 | WorkerError::LocalBlockNotFound { .. }
458 | WorkerError::MissingNetworkDescription
459 | WorkerError::Thread(_)
460 | WorkerError::ReadCertificatesError(_)
461 | WorkerError::PoisonedWorker => true,
462 WorkerError::ChainError(chain_error) => chain_error.is_local(),
463 }
464 }
465
466 pub fn error_type(&self) -> String {
472 match self {
473 WorkerError::ChainError(chain_error) => chain_error.error_type(),
474 other => {
475 let variant: &'static str = other.into();
476 format!("WorkerError::{variant}")
477 }
478 }
479 }
480
481 fn must_reload_view(&self) -> bool {
484 matches!(
485 self,
486 WorkerError::PoisonedWorker
487 | WorkerError::ViewError(ViewError::StoreError {
488 must_reload_view: true,
489 ..
490 })
491 )
492 }
493
494 fn indicates_corrupted_chain_state(&self) -> bool {
498 matches!(
499 self,
500 WorkerError::ChainError(chain_error)
501 if matches!(chain_error.as_ref(), ChainError::CorruptedChainState(_))
502 )
503 }
504}
505
506impl From<ChainError> for WorkerError {
507 #[instrument(level = "trace", skip(chain_error))]
508 fn from(chain_error: ChainError) -> Self {
509 match chain_error {
510 ChainError::ExecutionError(execution_error, context) => match *execution_error {
511 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
512 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
513 _ => Self::ChainError(Box::new(ChainError::ExecutionError(
514 execution_error,
515 context,
516 ))),
517 },
518 error => Self::ChainError(Box::new(error)),
519 }
520 }
521}
522
#[cfg(with_testing)]
impl WorkerError {
    /// Test helper that unwraps the [`ExecutionError`] nested inside this error.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not a `ChainError::ExecutionError`, or if the error's execution
    /// context differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            _ => panic!("Expected an `ExecutionError`. Got: {self:#?}"),
        };

        let (execution_error, context) = match *chain_error {
            ChainError::ExecutionError(execution_error, context) => (execution_error, context),
            _ => panic!("Expected an `ExecutionError`. Got: {chain_error:#?}"),
        };

        assert_eq!(context, expected_context);

        *execution_error
    }
}
544
/// Shared, lock-protected state of a single chain worker.
type ChainWorkerArc<S> = Arc<tokio::sync::RwLock<ChainWorkerState<S>>>;
/// Non-owning handle to a chain worker; upgrading fails once every strong handle is gone.
type ChainWorkerWeak<S> = std::sync::Weak<tokio::sync::RwLock<ChainWorkerState<S>>>;
/// Cloneable future that resolves to the weak worker handle once it has been sent, or to
/// an error if the sending side was dropped first.
type ChainWorkerFuture<S> = Shared<oneshot::Receiver<ChainWorkerWeak<S>>>;

/// Concurrent map from chain ID to the (possibly still loading) worker of that chain.
type ChainWorkerMap<S> = Arc<papaya::HashMap<ChainId, ChainWorkerFuture<S>>>;
557
558fn start_sweep<S: Storage + Clone + 'static>(
562 chain_workers: &ChainWorkerMap<S>,
563 config: &ChainWorkerConfig,
564) {
565 let interval = match (config.ttl, config.sender_chain_ttl) {
568 (None, None) => return,
569 (Some(d), None) | (None, Some(d)) => d,
570 (Some(a), Some(b)) => a.min(b),
571 };
572 let weak_map = Arc::downgrade(chain_workers);
573 linera_base::Task::spawn(async move {
574 loop {
575 linera_base::time::timer::sleep(interval).await;
576 let Some(map) = weak_map.upgrade() else {
577 break;
578 };
579 map.pin_owned().retain(|_, shared| match shared.peek() {
580 Some(Ok(weak)) => weak.strong_count() > 0,
581 Some(Err(_)) => false, None => true, });
584 }
585 })
586 .forget();
587}
588
/// State of a worker: shared storage, caches, and the chain workers loaded so far.
///
/// Cloning is cheap in the sense that the caches, notifiers and chain worker map are
/// shared through `Arc`s (see the `Clone` impl).
pub struct WorkerState<StorageClient: Storage> {
    /// Storage backend; cloned into every chain worker that gets loaded.
    storage: StorageClient,
    /// Configuration handed to the chain workers.
    pub(crate) chain_worker_config: ChainWorkerConfig,
    /// Cache of hashed blocks by hash; used to turn lite certificates into full ones.
    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Cache of execution states by hash; `None` when the configured cache size is 0.
    execution_state_cache:
        Option<Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>>,
    /// Optional listening mode per chain; a chain in "full" mode is loaded as tracked.
    chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
    /// One delivery notifier per chain, created on demand when a chain worker is loaded.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// The chain workers that are loaded or currently being loaded.
    chain_workers: ChainWorkerMap<StorageClient>,
    /// Optional callback for outgoing cross-chain requests. When set, the requests produced
    /// by a corrupted-chain-state reset are handed to it instead of being handled in place.
    outbound_cross_chain_sender: Option<OutboundCrossChainSender>,
}
614
/// Shareable callback that takes ownership of an outgoing [`CrossChainRequest`].
pub type OutboundCrossChainSender = Arc<dyn Fn(CrossChainRequest) + Send + Sync>;
618
619impl<StorageClient> Clone for WorkerState<StorageClient>
620where
621 StorageClient: Storage + Clone,
622{
623 fn clone(&self) -> Self {
624 WorkerState {
625 storage: self.storage.clone(),
626 chain_worker_config: self.chain_worker_config.clone(),
627 block_cache: self.block_cache.clone(),
628 execution_state_cache: self.execution_state_cache.clone(),
629 chain_modes: self.chain_modes.clone(),
630 delivery_notifiers: self.delivery_notifiers.clone(),
631 chain_workers: self.chain_workers.clone(),
632 outbound_cross_chain_sender: self.outbound_cross_chain_sender.clone(),
633 }
634 }
635}
636
/// The delivery notifier of each chain, keyed by chain ID.
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
638
639impl<StorageClient> WorkerState<StorageClient>
640where
641 StorageClient: Storage,
642{
643 pub fn set_cross_chain_message_chunk_limit(&mut self, limit: usize) {
645 self.chain_worker_config.cross_chain_message_chunk_limit = limit;
646 }
647
648 #[instrument(level = "trace", skip(self))]
649 pub fn nickname(&self) -> &str {
650 &self.chain_worker_config.nickname
651 }
652
653 pub fn with_priority_bundle_origins(
655 mut self,
656 origins: std::collections::HashSet<ChainId>,
657 ) -> Self {
658 self.chain_worker_config.priority_bundle_origins = origins;
659 self
660 }
661
662 #[instrument(level = "trace", skip(self))]
664 #[cfg(not(feature = "test"))]
665 pub(crate) fn storage_client(&self) -> &StorageClient {
666 &self.storage
667 }
668
669 #[instrument(level = "trace", skip(self))]
672 #[cfg(feature = "test")]
673 pub fn storage_client(&self) -> &StorageClient {
674 &self.storage
675 }
676
677 #[instrument(level = "trace", skip(self, certificate))]
678 pub(crate) async fn full_certificate(
679 &self,
680 certificate: LiteCertificate<'_>,
681 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
682 let block = self
683 .block_cache
684 .get(&certificate.value.value_hash)
685 .ok_or(WorkerError::MissingCertificateValue)?;
686 let block = Arc::unwrap_or_clone(block);
687
688 match certificate.value.kind {
689 linera_chain::types::CertificateKind::Confirmed => {
690 let value = ConfirmedBlock::from_hashed(block);
691 Ok(Either::Left(
692 certificate
693 .with_value(value)
694 .ok_or(WorkerError::InvalidLiteCertificate)?,
695 ))
696 }
697 linera_chain::types::CertificateKind::Validated => {
698 let value = ValidatedBlock::from_hashed(block);
699 Ok(Either::Right(
700 certificate
701 .with_value(value)
702 .ok_or(WorkerError::InvalidLiteCertificate)?,
703 ))
704 }
705 _ => Err(WorkerError::InvalidLiteCertificate),
706 }
707 }
708}
709
/// Certificate values that a [`WorkerState`] knows how to process.
///
/// Implemented in this file for `ConfirmedBlock`, `ValidatedBlock` and `Timeout`; each
/// impl forwards to the matching `handle_*_certificate` method of the worker.
#[allow(async_fn_in_trait)]
// On non-web targets the futures returned by the trait methods are required to be `Send`.
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` with `worker` and returns the chain info response together
    /// with the follow-up network actions.
    async fn process_certificate<S: Storage + Clone + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
718
719impl ProcessableCertificate for ConfirmedBlock {
720 async fn process_certificate<S: Storage + Clone + 'static>(
721 worker: &WorkerState<S>,
722 certificate: ConfirmedBlockCertificate,
723 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
724 Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
725 }
726}
727
728impl ProcessableCertificate for ValidatedBlock {
729 async fn process_certificate<S: Storage + Clone + 'static>(
730 worker: &WorkerState<S>,
731 certificate: ValidatedBlockCertificate,
732 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
733 Box::pin(worker.handle_validated_certificate(certificate)).await
734 }
735}
736
737impl ProcessableCertificate for Timeout {
738 async fn process_certificate<S: Storage + Clone + 'static>(
739 worker: &WorkerState<S>,
740 certificate: TimeoutCertificate,
741 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
742 worker.handle_timeout_certificate(certificate).await
743 }
744}
745
746impl<StorageClient> WorkerState<StorageClient>
747where
748 StorageClient: Storage + Clone + 'static,
749{
750 #[instrument(level = "trace", skip(storage, chain_worker_config))]
755 pub fn new(
756 storage: StorageClient,
757 chain_worker_config: ChainWorkerConfig,
758 chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
759 ) -> Self {
760 let chain_workers = Arc::new(papaya::HashMap::new());
761 start_sweep(&chain_workers, &chain_worker_config);
762 let block_cache_size = chain_worker_config.block_cache_size;
763 let execution_state_cache_size = chain_worker_config.execution_state_cache_size;
764 WorkerState {
765 storage,
766 chain_worker_config,
767 block_cache: Arc::new(ValueCache::new(
768 block_cache_size,
769 DEFAULT_CLEANUP_INTERVAL_SECS,
770 )),
771 execution_state_cache: (execution_state_cache_size > 0)
772 .then(|| Arc::new(UniqueValueCache::new(execution_state_cache_size))),
773 chain_modes,
774 delivery_notifiers: Arc::default(),
775 chain_workers,
776 outbound_cross_chain_sender: None,
777 }
778 }
779
780 pub fn with_outbound_cross_chain_sender(mut self, sender: OutboundCrossChainSender) -> Self {
785 self.outbound_cross_chain_sender = Some(sender);
786 self
787 }
788
789 #[instrument(level = "trace", skip(self, certificate, notifier))]
790 #[inline]
791 pub async fn fully_handle_certificate_with_notifications<T>(
792 &self,
793 certificate: GenericCertificate<T>,
794 notifier: &impl Notifier,
795 ) -> Result<ChainInfoResponse, WorkerError>
796 where
797 T: ProcessableCertificate,
798 {
799 let notifications = (*notifier).clone();
800 let this = self.clone();
801 linera_base::Task::spawn(async move {
802 let (response, actions) =
803 ProcessableCertificate::process_certificate(&this, certificate).await?;
804 notifications.notify(&actions.notifications);
805 let mut requests = VecDeque::from(actions.cross_chain_requests);
806 while let Some(request) = requests.pop_front() {
807 let actions = this.handle_cross_chain_request(request).await?;
808 requests.extend(actions.cross_chain_requests);
809 notifications.notify(&actions.notifications);
810 }
811 Ok(response)
812 })
813 .await
814 }
815
816 async fn chain_read<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
821 where
822 F: FnOnce(OwnedRwLockReadGuard<ChainWorkerState<StorageClient>>) -> Fut,
823 Fut: std::future::Future<Output = Result<R, WorkerError>>,
824 {
825 let state = self.get_or_create_chain_worker(chain_id).await?;
826 let state_ref = &state;
827 let result = Box::pin(wrap_future(async move {
828 let guard = handle::read_lock(state_ref).await;
829 guard.check_not_poisoned()?;
830 f(guard).await
831 }))
832 .await;
833 if let Err(error) = &result {
834 if error.must_reload_view() {
835 self.evict_poisoned_worker(chain_id, &state);
836 }
837 }
838 result
839 }
840
841 async fn chain_write<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
850 where
851 F: FnOnce(handle::RollbackGuard<StorageClient>) -> Fut
852 + linera_base::task::MaybeSend
853 + 'static,
854 Fut: std::future::Future<Output = Result<R, WorkerError>> + linera_base::task::MaybeSend,
855 R: linera_base::task::MaybeSend + 'static,
856 {
857 let state = self.get_or_create_chain_worker(chain_id).await?;
858 let state_for_task = state.clone();
859 let result = Box::pin(wrap_future(linera_base::task::run_detached(async move {
860 let guard = handle::write_lock(&state_for_task).await;
861 guard.check_not_poisoned()?;
862 f(guard).await
863 })))
864 .await;
865 if let Err(error) = &result {
866 if error.must_reload_view() {
867 self.evict_poisoned_worker(chain_id, &state);
868 } else if error.indicates_corrupted_chain_state() {
869 self.spawn_reset_corrupted_chain_state(chain_id, state);
870 }
871 }
872 result
873 }
874
875 fn spawn_reset_corrupted_chain_state(
886 &self,
887 chain_id: ChainId,
888 state: ChainWorkerArc<StorageClient>,
889 ) where
890 StorageClient: Clone,
891 {
892 let this = self.clone();
893 linera_base::Task::spawn(async move {
894 let requests = {
895 let mut guard = handle::write_lock(&state).await;
896 match guard.maybe_reset_corrupted_chain_state().await {
897 Ok(Some(requests)) => requests,
898 Ok(None) => return,
899 Err(error) => {
900 tracing::error!(
901 %chain_id, %error, "Failed to reset corrupted chain state"
902 );
903 return;
904 }
905 }
906 };
907 if let Some(sender) = &this.outbound_cross_chain_sender {
908 for request in requests {
911 sender(request);
912 }
913 } else {
914 let mut queue = VecDeque::from(requests);
918 while let Some(request) = queue.pop_front() {
919 match this.handle_cross_chain_request(request).await {
920 Ok(actions) => queue.extend(actions.cross_chain_requests),
921 Err(error) => {
922 warn!(
923 %chain_id, %error,
924 "Failed to dispatch cross-chain request after \
925 resetting corrupted chain state"
926 );
927 }
928 }
929 }
930 }
931 })
932 .forget();
933 }
934
935 fn evict_poisoned_worker(&self, chain_id: ChainId, poisoned: &ChainWorkerArc<StorageClient>) {
939 tracing::warn!(%chain_id, "Evicting poisoned chain worker from cache");
940 let pin = self.chain_workers.pin();
941 let weak_poisoned = Arc::downgrade(poisoned);
942 let _ = pin.remove_if(&chain_id, |_key, future| {
943 future
944 .peek()
945 .and_then(|r| r.clone().ok())
946 .is_some_and(|weak| weak.ptr_eq(&weak_poisoned))
947 });
948 }
949
    /// Returns the worker of chain `chain_id`, loading it if necessary.
    ///
    /// The map stores, per chain, a shared future that resolves to a weak handle of the
    /// worker. Each loop iteration atomically inspects the entry and then either:
    ///
    /// * returns the existing worker if it is still alive,
    /// * waits for a load that is already in progress, or
    /// * installs its own future, loads the worker and publishes it through the channel.
    ///
    /// The loop retries whenever the awaited load did not produce a live worker.
    ///
    /// # Errors
    ///
    /// Returns the error of `load_chain_worker` if this call performed the load and it failed.
    fn get_or_create_chain_worker(
        &self,
        chain_id: ChainId,
    ) -> std::pin::Pin<
        Box<
            impl std::future::Future<Output = Result<ChainWorkerArc<StorageClient>, WorkerError>> + '_,
        >,
    > {
        Box::pin(wrap_future(async move {
            loop {
                // Prepared up front: the closure passed to `compute` below can only insert a
                // value that already exists. `tx` is used only if our future gets inserted.
                let (tx, rx) = oneshot::channel();
                let shared_rx = rx.shared();

                let wait_or_tx = {
                    // The pin guard is confined to this block, so it is released before
                    // any of the awaits below.
                    let pin = self.chain_workers.pin();
                    match pin.compute(chain_id, |existing| match existing {
                        Some((_, entry)) => match entry.peek() {
                            // Loaded earlier: reuse the worker if it is still alive,
                            // otherwise replace the stale entry with our future.
                            Some(Ok(weak)) => match weak.upgrade() {
                                Some(arc) => papaya::Operation::Abort(Ok(arc)),
                                None => papaya::Operation::Insert(shared_rx.clone()),
                            },
                            // The previous loader dropped its sender: take over.
                            Some(Err(_)) => papaya::Operation::Insert(shared_rx.clone()),
                            // A load is in progress: leave the entry and wait for it.
                            None => papaya::Operation::Abort(Err(entry.clone())),
                        },
                        None => papaya::Operation::Insert(shared_rx.clone()),
                    }) {
                        papaya::Compute::Aborted(Ok(arc), ..) => return Ok(arc),
                        papaya::Compute::Aborted(Err(wait), ..) => Either::Left(wait),
                        papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => {
                            Either::Right(tx)
                        }
                        // The closure above never requests a removal.
                        papaya::Compute::Removed { .. } => unreachable!(),
                    }
                };

                match wait_or_tx {
                    Either::Left(wait) => {
                        // If the other load failed or its worker is already gone again,
                        // fall through and retry from the top.
                        if let Ok(weak) = wait.await {
                            if let Some(arc) = weak.upgrade() {
                                return Ok(arc);
                            }
                        }
                    }
                    Either::Right(tx) => {
                        // On failure, `?` drops `tx`, which resolves the shared future of
                        // any waiter with an error so that it retries.
                        let worker = self.load_chain_worker(chain_id).await?;
                        if tx.send(Arc::downgrade(&worker)).is_err() {
                            tracing::error!(%chain_id, "Receiver dropped while loading worker state.");
                            continue;
                        }
                        return Ok(worker);
                    }
                }
            }
        }))
    }
1024
1025 async fn load_chain_worker(
1027 &self,
1028 chain_id: ChainId,
1029 ) -> Result<ChainWorkerArc<StorageClient>, WorkerError> {
1030 let delivery_notifier = self
1031 .delivery_notifiers
1032 .lock()
1033 .unwrap()
1034 .entry(chain_id)
1035 .or_default()
1036 .clone();
1037
1038 let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
1039 chain_modes
1040 .read()
1041 .unwrap()
1042 .get(&chain_id)
1043 .is_some_and(ListeningMode::is_full)
1044 });
1045
1046 let (service_runtime_endpoint, service_runtime_task) =
1047 if self.chain_worker_config.long_lived_services {
1048 let actor =
1049 handle::ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await;
1050 (Some(actor.endpoint), Some(actor.task))
1051 } else {
1052 (None, None)
1053 };
1054
1055 let state = crate::chain_worker::state::ChainWorkerState::load(
1056 self.chain_worker_config.clone(),
1057 self.storage.clone(),
1058 self.block_cache.clone(),
1059 self.execution_state_cache.clone(),
1060 self.chain_modes.clone(),
1061 delivery_notifier,
1062 chain_id,
1063 service_runtime_endpoint,
1064 service_runtime_task,
1065 )
1066 .await?;
1067
1068 Ok(handle::create_chain_worker(
1069 state,
1070 is_tracked,
1071 &self.chain_worker_config,
1072 ))
1073 }
1074
1075 #[instrument(level = "trace", skip(self, block))]
1077 pub async fn stage_block_execution(
1078 &self,
1079 block: ProposedBlock,
1080 round: Option<u32>,
1081 published_blobs: Vec<Blob>,
1082 policy: BundleExecutionPolicy,
1083 ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> {
1084 let chain_id = block.chain_id;
1085 self.chain_write(chain_id, move |mut guard| async move {
1086 guard
1087 .stage_block_execution(block, round, &published_blobs, policy)
1088 .await
1089 })
1090 .await
1091 }
1092
1093 #[instrument(level = "trace", skip(self, chain_id, query))]
1098 pub async fn query_application(
1099 &self,
1100 chain_id: ChainId,
1101 query: Query,
1102 block_hash: Option<CryptoHash>,
1103 ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
1104 self.chain_write(chain_id, move |mut guard| async move {
1105 guard.query_application(query, block_hash).await
1106 })
1107 .await
1108 }
1109
1110 #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
1111 nickname = %self.nickname(),
1112 chain_id = %chain_id,
1113 application_id = %application_id
1114 ))]
1115 pub async fn describe_application(
1116 &self,
1117 chain_id: ChainId,
1118 application_id: ApplicationId,
1119 ) -> Result<ApplicationDescription, WorkerError> {
1120 let state = self.get_or_create_chain_worker(chain_id).await?;
1121 let guard = handle::read_lock_initialized(&state).await?;
1122 guard.describe_application_readonly(application_id).await
1123 }
1124
1125 #[instrument(
1127 level = "trace",
1128 skip(self, certificate, notify_when_messages_are_delivered),
1129 fields(
1130 nickname = %self.nickname(),
1131 chain_id = %certificate.block().header.chain_id,
1132 block_height = %certificate.block().header.height
1133 )
1134 )]
1135 async fn process_confirmed_block(
1136 &self,
1137 certificate: ConfirmedBlockCertificate,
1138 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1139 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1140 let chain_id = certificate.block().header.chain_id;
1141 self.chain_write(chain_id, move |mut guard| async move {
1142 guard
1143 .process_confirmed_block(certificate, notify_when_messages_are_delivered)
1144 .await
1145 })
1146 .await
1147 }
1148
1149 #[instrument(level = "trace", skip(self, certificate), fields(
1151 nickname = %self.nickname(),
1152 chain_id = %certificate.block().header.chain_id,
1153 block_height = %certificate.block().header.height
1154 ))]
1155 async fn process_validated_block(
1156 &self,
1157 certificate: ValidatedBlockCertificate,
1158 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1159 let chain_id = certificate.block().header.chain_id;
1160 self.chain_write(chain_id, move |mut guard| async move {
1161 guard.process_validated_block(certificate).await
1162 })
1163 .await
1164 }
1165
1166 #[instrument(level = "trace", skip(self, certificate), fields(
1168 nickname = %self.nickname(),
1169 chain_id = %certificate.value().chain_id(),
1170 height = %certificate.value().height()
1171 ))]
1172 async fn process_timeout(
1173 &self,
1174 certificate: TimeoutCertificate,
1175 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1176 let chain_id = certificate.value().chain_id();
1177 self.chain_write(chain_id, move |mut guard| async move {
1178 guard.process_timeout(certificate).await
1179 })
1180 .await
1181 }
1182
1183 #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
1184 nickname = %self.nickname(),
1185 origin = %origin,
1186 recipient = %recipient,
1187 num_bundles = %bundles.len()
1188 ))]
1189 async fn process_cross_chain_update(
1190 &self,
1191 origin: ChainId,
1192 recipient: ChainId,
1193 bundles: Vec<(Epoch, MessageBundle)>,
1194 previous_height: Option<BlockHeight>,
1195 ) -> Result<CrossChainUpdateResult, WorkerError> {
1196 self.chain_write(recipient, move |mut guard| async move {
1197 guard
1198 .process_cross_chain_update(origin, bundles, previous_height)
1199 .await
1200 })
1201 .await
1202 }
1203
/// Reads the confirmed-block certificate stored at `height` on chain `chain_id`.
///
/// Only compiled with testing enabled. The lookup goes through a guard
/// obtained from `handle::read_lock_initialized` and yields `None` when the
/// chain worker's `read_certificate` reports no certificate.
#[instrument(level = "trace", skip(self, chain_id, height), fields(
    nickname = %self.nickname(),
    chain_id = %chain_id,
    height = %height
))]
#[cfg(with_testing)]
pub async fn read_certificate(
    &self,
    chain_id: ChainId,
    height: BlockHeight,
) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
    let worker = self.get_or_create_chain_worker(chain_id).await?;
    let reader = handle::read_lock_initialized(&worker).await?;
    let maybe_certificate = reader.read_certificate(height).await?;
    Ok(maybe_certificate)
}
1220
1221 #[instrument(level = "trace", skip(self), fields(
1227 nickname = %self.nickname(),
1228 chain_id = %chain_id
1229 ))]
1230 pub async fn chain_state_view(
1231 &self,
1232 chain_id: ChainId,
1233 ) -> Result<ChainStateViewReadGuard<StorageClient>, WorkerError> {
1234 let state = self.get_or_create_chain_worker(chain_id).await?;
1235 let guard = handle::read_lock(&state).await;
1236 Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map(
1237 guard,
1238 |s| s.chain(),
1239 )))
1240 }
1241
1242 #[instrument(skip_all, fields(
1243 nick = self.nickname(),
1244 chain_id = format!("{:.8}", proposal.content.block.chain_id),
1245 height = %proposal.content.block.height,
1246 ))]
1247 pub async fn handle_block_proposal(
1248 &self,
1249 proposal: BlockProposal,
1250 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1251 trace!("{} <-- {:?}", self.nickname(), proposal);
1252 #[cfg(with_metrics)]
1253 let round = proposal.content.round;
1254
1255 let chain_id = proposal.content.block.chain_id;
1256 let now = self.storage.clock().current_time();
1258 let block_timestamp = proposal.content.block.timestamp;
1259 let delta = block_timestamp.delta_since(now);
1260 let grace_period = TimeDelta::from_micros(
1261 u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros())
1262 .unwrap_or(u64::MAX),
1263 );
1264 if delta > TimeDelta::ZERO && delta <= grace_period {
1265 self.storage.clock().sleep_until(block_timestamp).await;
1266 }
1267
1268 let response = self
1269 .chain_write(chain_id, move |mut guard| async move {
1270 guard.handle_block_proposal(proposal).await
1271 })
1272 .await?;
1273 #[cfg(with_metrics)]
1274 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1275 .with_label_values(&[round.type_name()])
1276 .observe(round.number() as f64);
1277 Ok(response)
1278 }
1279
1280 #[instrument(skip_all, fields(
1283 chain_id = %certificate.value.chain_id,
1284 hash = %certificate.value.value_hash,
1285 ))]
1286 pub async fn handle_lite_certificate(
1287 &self,
1288 certificate: LiteCertificate<'_>,
1289 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1290 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1291 match self.full_certificate(certificate).await? {
1292 Either::Left(confirmed) => {
1293 Box::pin(
1294 self.handle_confirmed_certificate(
1295 confirmed,
1296 notify_when_messages_are_delivered,
1297 ),
1298 )
1299 .await
1300 }
1301 Either::Right(validated) => {
1302 if let Some(notifier) = notify_when_messages_are_delivered {
1303 if let Err(()) = notifier.send(()) {
1305 warn!("Failed to notify message delivery to caller");
1306 }
1307 }
1308 Box::pin(self.handle_validated_certificate(validated)).await
1309 }
1310 }
1311 }
1312
1313 #[instrument(skip_all, fields(
1315 nick = self.nickname(),
1316 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1317 height = %certificate.block().header.height,
1318 ))]
1319 pub async fn handle_confirmed_certificate(
1320 &self,
1321 certificate: ConfirmedBlockCertificate,
1322 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1323 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1324 trace!("{} <-- {:?}", self.nickname(), certificate);
1325 #[cfg(with_metrics)]
1326 let metrics_data = metrics::MetricsData::new(&certificate);
1327
1328 #[allow(unused_variables)]
1329 let (info, actions, outcome) =
1330 Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1331 .await?;
1332
1333 #[cfg(with_metrics)]
1334 if matches!(outcome, BlockOutcome::Processed) {
1335 metrics_data.record();
1336 }
1337 Ok((info, actions))
1338 }
1339
1340 #[instrument(skip_all, fields(
1342 nick = self.nickname(),
1343 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1344 height = %certificate.block().header.height,
1345 ))]
1346 pub async fn handle_validated_certificate(
1347 &self,
1348 certificate: ValidatedBlockCertificate,
1349 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1350 trace!("{} <-- {:?}", self.nickname(), certificate);
1351
1352 #[cfg(with_metrics)]
1353 let round = certificate.round;
1354 #[cfg(with_metrics)]
1355 let cert_str = certificate.inner().to_log_str();
1356
1357 #[allow(unused_variables)]
1358 let (info, actions, outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1359 #[cfg(with_metrics)]
1360 {
1361 if matches!(outcome, BlockOutcome::Processed) {
1362 metrics::NUM_ROUNDS_IN_CERTIFICATE
1363 .with_label_values(&[cert_str, round.type_name()])
1364 .observe(round.number() as f64);
1365 }
1366 }
1367 Ok((info, actions))
1368 }
1369
1370 #[instrument(skip_all, fields(
1372 nick = self.nickname(),
1373 chain_id = format!("{:.8}", certificate.inner().chain_id()),
1374 height = %certificate.inner().height(),
1375 ))]
1376 pub async fn handle_timeout_certificate(
1377 &self,
1378 certificate: TimeoutCertificate,
1379 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1380 trace!("{} <-- {:?}", self.nickname(), certificate);
1381 self.process_timeout(certificate).await
1382 }
1383
1384 #[instrument(skip_all, fields(
1385 nick = self.nickname(),
1386 chain_id = format!("{:.8}", query.chain_id)
1387 ))]
1388 pub async fn handle_chain_info_query(
1389 &self,
1390 query: ChainInfoQuery,
1391 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1392 trace!("{} <-- {:?}", self.nickname(), query);
1393 #[cfg(with_metrics)]
1394 metrics::CHAIN_INFO_QUERIES.inc();
1395 let chain_id = query.chain_id;
1396 let result = self
1397 .chain_write(chain_id, move |mut guard| async move {
1398 guard.handle_chain_info_query(query).await
1399 })
1400 .await;
1401 trace!("{} --> {:?}", self.nickname(), result);
1402 result
1403 }
1404
1405 #[instrument(skip_all, fields(
1406 nick = self.nickname(),
1407 chain_id = format!("{:.8}", chain_id)
1408 ))]
1409 pub async fn download_pending_blob(
1410 &self,
1411 chain_id: ChainId,
1412 blob_id: BlobId,
1413 ) -> Result<Blob, WorkerError> {
1414 trace!("{} <-- download_pending_blob({blob_id:8})", self.nickname());
1415 let result = self
1416 .chain_read(chain_id, |guard| async move {
1417 guard.download_pending_blob(blob_id).await
1418 })
1419 .await;
1420 trace!(
1421 "{} --> {:?}",
1422 self.nickname(),
1423 result.as_ref().map(|_| blob_id)
1424 );
1425 result
1426 }
1427
1428 #[instrument(skip_all, fields(
1429 nick = self.nickname(),
1430 chain_id = format!("{:.8}", chain_id)
1431 ))]
1432 pub async fn handle_pending_blob(
1433 &self,
1434 chain_id: ChainId,
1435 blob: Blob,
1436 ) -> Result<ChainInfoResponse, WorkerError> {
1437 let blob_id = blob.id();
1438 trace!("{} <-- handle_pending_blob({blob_id:8})", self.nickname());
1439 let result = self
1440 .chain_write(chain_id, move |mut guard| async move {
1441 guard.handle_pending_blob(blob).await
1442 })
1443 .await;
1444 trace!(
1445 "{} --> {:?}",
1446 self.nickname(),
1447 result.as_ref().map(|_| blob_id)
1448 );
1449 result
1450 }
1451
1452 #[instrument(skip_all, fields(
1453 nick = self.nickname(),
1454 chain_id = format!("{:.8}", request.target_chain_id())
1455 ))]
1456 pub async fn handle_cross_chain_request(
1457 &self,
1458 request: CrossChainRequest,
1459 ) -> Result<NetworkActions, WorkerError> {
1460 trace!("{} <-- {:?}", self.nickname(), request);
1461 match request {
1462 CrossChainRequest::UpdateRecipient {
1463 sender,
1464 recipient,
1465 bundles,
1466 previous_height,
1467 } => {
1468 let mut actions = NetworkActions::default();
1469 let origin = sender;
1470 match self
1471 .process_cross_chain_update(origin, recipient, bundles, previous_height)
1472 .await?
1473 {
1474 CrossChainUpdateResult::NothingToDo => {}
1475 CrossChainUpdateResult::Updated(height) => {
1476 actions.notifications.push(Notification {
1477 chain_id: recipient,
1478 reason: Reason::NewIncomingBundle { origin, height },
1479 });
1480 actions.cross_chain_requests.push(
1481 CrossChainRequest::ConfirmUpdatedRecipient {
1482 sender,
1483 recipient,
1484 latest_height: height,
1485 },
1486 );
1487 }
1488 CrossChainUpdateResult::GapDetected {
1489 origin,
1490 retransmit_from,
1491 } => {
1492 actions
1493 .cross_chain_requests
1494 .push(CrossChainRequest::RevertConfirm {
1495 sender: origin,
1496 recipient,
1497 retransmit_from,
1498 });
1499 }
1500 }
1501 Ok(actions)
1502 }
1503 CrossChainRequest::ConfirmUpdatedRecipient {
1504 sender,
1505 recipient,
1506 latest_height,
1507 } => {
1508 self.chain_write(sender, move |mut guard| async move {
1509 guard
1510 .confirm_updated_recipient(recipient, latest_height)
1511 .await
1512 })
1513 .await
1514 }
1515 CrossChainRequest::RevertConfirm {
1516 sender,
1517 recipient,
1518 retransmit_from,
1519 } => {
1520 self.chain_write(sender, move |mut guard| async move {
1521 guard
1522 .handle_revert_confirm(recipient, retransmit_from)
1523 .await
1524 })
1525 .await
1526 }
1527 }
1528 }
1529
1530 #[instrument(skip_all, fields(
1532 nickname = %self.nickname(),
1533 chain_id = %chain_id,
1534 num_trackers = %new_trackers.len()
1535 ))]
1536 pub async fn update_received_certificate_trackers(
1537 &self,
1538 chain_id: ChainId,
1539 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1540 ) -> Result<(), WorkerError> {
1541 self.chain_write(chain_id, move |mut guard| async move {
1542 guard
1543 .update_received_certificate_trackers(new_trackers)
1544 .await
1545 })
1546 .await
1547 }
1548
1549 #[instrument(skip_all, fields(
1551 nickname = %self.nickname(),
1552 chain_id = %chain_id,
1553 start = %start,
1554 end = %end
1555 ))]
1556 pub async fn get_preprocessed_block_hashes(
1557 &self,
1558 chain_id: ChainId,
1559 start: BlockHeight,
1560 end: BlockHeight,
1561 ) -> Result<Vec<CryptoHash>, WorkerError> {
1562 self.chain_read(chain_id, |guard| async move {
1563 guard.get_preprocessed_block_hashes(start, end).await
1564 })
1565 .await
1566 }
1567
1568 #[instrument(skip_all, fields(
1570 nickname = %self.nickname(),
1571 chain_id = %chain_id,
1572 origin = %origin
1573 ))]
1574 pub async fn get_inbox_next_height(
1575 &self,
1576 chain_id: ChainId,
1577 origin: ChainId,
1578 ) -> Result<BlockHeight, WorkerError> {
1579 self.chain_read(chain_id, |guard| async move {
1580 guard.get_inbox_next_height(origin).await
1581 })
1582 .await
1583 }
1584
1585 #[instrument(skip_all, fields(
1588 nickname = %self.nickname(),
1589 chain_id = %chain_id,
1590 num_blob_ids = %blob_ids.len()
1591 ))]
1592 pub async fn get_locking_blobs(
1593 &self,
1594 chain_id: ChainId,
1595 blob_ids: Vec<BlobId>,
1596 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1597 self.chain_read(chain_id, |guard| async move {
1598 guard.get_locking_blobs(blob_ids).await
1599 })
1600 .await
1601 }
1602
1603 pub async fn get_block_hashes(
1605 &self,
1606 chain_id: ChainId,
1607 heights: Vec<BlockHeight>,
1608 ) -> Result<Vec<CryptoHash>, WorkerError> {
1609 self.chain_read(chain_id, |guard| async move {
1610 guard.get_block_hashes(heights).await
1611 })
1612 .await
1613 }
1614
1615 pub async fn get_proposed_blobs(
1617 &self,
1618 chain_id: ChainId,
1619 blob_ids: Vec<BlobId>,
1620 ) -> Result<Vec<Blob>, WorkerError> {
1621 self.chain_read(chain_id, |guard| async move {
1622 guard.get_proposed_blobs(blob_ids).await
1623 })
1624 .await
1625 }
1626
1627 pub async fn get_event_subscriptions(
1629 &self,
1630 chain_id: ChainId,
1631 ) -> Result<EventSubscriptionsResult, WorkerError> {
1632 self.chain_read(chain_id, |guard| async move {
1633 guard.get_event_subscriptions().await
1634 })
1635 .await
1636 }
1637
1638 pub async fn get_received_certificate_trackers(
1640 &self,
1641 chain_id: ChainId,
1642 ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1643 self.chain_read(chain_id, |guard| async move {
1644 guard.get_received_certificate_trackers().await
1645 })
1646 .await
1647 }
1648
1649 pub async fn cross_chain_network_actions(
1653 &self,
1654 chain_id: ChainId,
1655 ) -> Result<NetworkActions, WorkerError> {
1656 self.chain_read(chain_id, |guard| async move {
1657 guard.cross_chain_network_actions().await
1658 })
1659 .await
1660 }
1661
1662 pub async fn get_tip_state_and_outbox_info(
1664 &self,
1665 chain_id: ChainId,
1666 receiver_id: ChainId,
1667 ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1668 self.chain_read(chain_id, |guard| async move {
1669 guard.get_tip_state_and_outbox_info(receiver_id).await
1670 })
1671 .await
1672 }
1673
1674 pub async fn get_next_height_to_preprocess(
1676 &self,
1677 chain_id: ChainId,
1678 ) -> Result<BlockHeight, WorkerError> {
1679 self.chain_read(chain_id, |guard| async move {
1680 guard.get_next_height_to_preprocess().await
1681 })
1682 .await
1683 }
1684
1685 pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, WorkerError> {
1687 self.chain_read(
1688 chain_id,
1689 |guard| async move { guard.get_manager_seed().await },
1690 )
1691 .await
1692 }
1693
1694 pub async fn get_stream_event_count(
1696 &self,
1697 chain_id: ChainId,
1698 stream_id: StreamId,
1699 ) -> Result<Option<u32>, WorkerError> {
1700 self.chain_read(chain_id, |guard| async move {
1701 guard.get_stream_event_count(stream_id).await
1702 })
1703 .await
1704 }
1705
1706 pub async fn next_expected_events(
1708 &self,
1709 chain_id: ChainId,
1710 stream_ids: Vec<StreamId>,
1711 ) -> Result<BTreeMap<StreamId, u32>, WorkerError> {
1712 self.chain_read(chain_id, |guard| async move {
1713 guard.get_next_expected_events(stream_ids).await
1714 })
1715 .await
1716 }
1717
1718 pub async fn previous_event_blocks(
1720 &self,
1721 chain_id: ChainId,
1722 stream_ids: Vec<StreamId>,
1723 ) -> Result<BTreeMap<StreamId, (BlockHeight, CryptoHash)>, WorkerError> {
1724 #[cfg(with_metrics)]
1725 metrics::PREVIOUS_EVENT_BLOCKS_STREAM_COUNT.observe(stream_ids.len() as f64);
1726 self.chain_read(chain_id, |guard| async move {
1727 guard.get_previous_event_blocks(stream_ids).await
1728 })
1729 .await
1730 }
1731}
1732
/// Helpers that are only compiled with testing enabled.
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage + Clone + 'static,
{
    /// Returns the public key of the key pair held in the chain worker
    /// configuration.
    ///
    /// # Panics
    ///
    /// Panics if the configuration has no key pair.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain its public key",
        );
        key_pair.public()
    }
}