1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14 data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round},
15 doc_scalar,
16 hashed::Hashed,
17 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
18 time::timer::{sleep, timeout},
19};
20#[cfg(with_testing)]
21use linera_chain::ChainExecutionContext;
22use linera_chain::{
23 data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24 types::{
25 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
26 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
27 },
28 ChainError, ChainStateView,
29};
30use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
31use linera_storage::Storage;
32use linera_views::{context::InactiveContext, ViewError};
33use serde::{Deserialize, Serialize};
34use thiserror::Error;
35use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
36use tracing::{error, instrument, trace, warn};
37
38use crate::{
39 chain_worker::{ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier},
40 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
41 join_set_ext::{JoinSet, JoinSetExt},
42 notifier::Notifier,
43 value_cache::ValueCache,
44};
45
46#[cfg(test)]
47#[path = "unit_tests/worker_tests.rs"]
48mod worker_tests;
49
/// Prometheus metrics recorded by the worker. Only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Histogram of certificate round numbers, labeled by certificate value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        let labels = ["certificate_value", "round_type"];
        let buckets = exponential_bucket_interval(0.1, 50.0);
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &labels,
            buckets,
        )
    });

    /// Histogram of block proposal round numbers, labeled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        let labels = ["round_type"];
        let buckets = exponential_bucket_interval(0.1, 50.0);
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &labels,
            buckets,
        )
    });

    /// Counter of transactions; registered without labels.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Counter of blocks added to chains; registered without labels.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Counter of signed confirmed block certificates, labeled by validator name.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        let labels = ["validator_name"];
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &labels,
        )
    });

    /// Counter of processed chain info queries.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });
}
100
101#[derive(Default, Debug)]
103pub struct NetworkActions {
104 pub cross_chain_requests: Vec<CrossChainRequest>,
106 pub notifications: Vec<Notification>,
108}
109
110impl NetworkActions {
111 pub fn extend(&mut self, other: NetworkActions) {
112 self.cross_chain_requests.extend(other.cross_chain_requests);
113 self.notifications.extend(other.notifications);
114 }
115}
116
117#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
118pub struct Notification {
120 pub chain_id: ChainId,
121 pub reason: Reason,
122}
123
124doc_scalar!(
125 Notification,
126 "Notify that a chain has a new certified block or a new message"
127);
128
129#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
130pub enum Reason {
132 NewBlock {
133 height: BlockHeight,
134 hash: CryptoHash,
135 event_streams: BTreeSet<StreamId>,
136 },
137 NewIncomingBundle {
138 origin: ChainId,
139 height: BlockHeight,
140 },
141 NewRound {
142 height: BlockHeight,
143 round: Round,
144 },
145}
146
147#[derive(Debug, Error)]
149pub enum WorkerError {
150 #[error(transparent)]
151 CryptoError(#[from] CryptoError),
152
153 #[error(transparent)]
154 ArithmeticError(#[from] ArithmeticError),
155
156 #[error(transparent)]
157 ViewError(#[from] ViewError),
158
159 #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
160 ReadCertificatesError(Vec<CryptoHash>),
161
162 #[error(transparent)]
163 ChainError(#[from] Box<ChainError>),
164
165 #[error("Block was not signed by an authorized owner")]
167 InvalidOwner,
168
169 #[error("Operations in the block are not authenticated by the proper signer: {0}")]
170 InvalidSigner(AccountOwner),
171
172 #[error(
174 "Was expecting block height {expected_block_height} but found {found_block_height} instead"
175 )]
176 UnexpectedBlockHeight {
177 expected_block_height: BlockHeight,
178 found_block_height: BlockHeight,
179 },
180 #[error("Unexpected epoch {epoch:}: chain {chain_id:} is at {chain_epoch:}")]
181 InvalidEpoch {
182 chain_id: ChainId,
183 chain_epoch: Epoch,
184 epoch: Epoch,
185 },
186
187 #[error("Events not found: {0:?}")]
188 EventsNotFound(Vec<EventId>),
189
190 #[error("Invalid cross-chain request")]
192 InvalidCrossChainRequest,
193 #[error("The block does not contain the hash that we expected for the previous block")]
194 InvalidBlockChaining,
195 #[error(
196 "The given outcome is not what we computed after executing the block.\n\
197 Computed: {computed:#?}\n\
198 Submitted: {submitted:#?}"
199 )]
200 IncorrectOutcome {
201 computed: Box<BlockExecutionOutcome>,
202 submitted: Box<BlockExecutionOutcome>,
203 },
204 #[error("The block timestamp is in the future.")]
205 InvalidTimestamp,
206 #[error("We don't have the value for the certificate.")]
207 MissingCertificateValue,
208 #[error("The hash certificate doesn't match its value.")]
209 InvalidLiteCertificate,
210 #[error("Fast blocks cannot query oracles")]
211 FastBlockUsingOracles,
212 #[error("Blobs not found: {0:?}")]
213 BlobsNotFound(Vec<BlobId>),
214 #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
215 ConfirmedLogEntryNotFound {
216 height: BlockHeight,
217 chain_id: ChainId,
218 },
219 #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
220 PreprocessedBlocksEntryNotFound {
221 height: BlockHeight,
222 chain_id: ChainId,
223 },
224 #[error("The block proposal is invalid: {0}")]
225 InvalidBlockProposal(String),
226 #[error("The worker is too busy to handle new chains")]
227 FullChainWorkerCache,
228 #[error("Failed to join spawned worker task")]
229 JoinError,
230 #[error("Blob was not required by any pending block")]
231 UnexpectedBlob,
232 #[error("Number of published blobs per block must not exceed {0}")]
233 TooManyPublishedBlobs(u64),
234 #[error("Missing network description")]
235 MissingNetworkDescription,
236}
237
238impl From<ChainError> for WorkerError {
239 #[instrument(level = "trace", skip(chain_error))]
240 fn from(chain_error: ChainError) -> Self {
241 match chain_error {
242 ChainError::ExecutionError(execution_error, context) => {
243 if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
244 Self::BlobsNotFound(blob_ids)
245 } else {
246 Self::ChainError(Box::new(ChainError::ExecutionError(
247 execution_error,
248 context,
249 )))
250 }
251 }
252 error => Self::ChainError(Box::new(error)),
253 }
254 }
255}
256
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the [`ExecutionError`] nested inside this error.
    ///
    /// # Panics
    ///
    /// Panics if this is not a [`WorkerError::ChainError`] wrapping a
    /// [`ChainError::ExecutionError`], or if the execution context differs from
    /// `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        match self {
            WorkerError::ChainError(chain_error) => match *chain_error {
                ChainError::ExecutionError(execution_error, context) => {
                    assert_eq!(context, expected_context);
                    *execution_error
                }
                other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
            },
            other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
        }
    }
}
278
279pub struct WorkerState<StorageClient>
281where
282 StorageClient: Storage,
283{
284 nickname: String,
286 storage: StorageClient,
288 chain_worker_config: ChainWorkerConfig,
290 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
291 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
292 tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
294 delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
297 chain_worker_tasks: Arc<Mutex<JoinSet>>,
299 chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
301}
302
303impl<StorageClient> Clone for WorkerState<StorageClient>
304where
305 StorageClient: Storage + Clone,
306{
307 fn clone(&self) -> Self {
308 WorkerState {
309 nickname: self.nickname.clone(),
310 storage: self.storage.clone(),
311 chain_worker_config: self.chain_worker_config.clone(),
312 block_cache: self.block_cache.clone(),
313 execution_state_cache: self.execution_state_cache.clone(),
314 tracked_chains: self.tracked_chains.clone(),
315 delivery_notifiers: self.delivery_notifiers.clone(),
316 chain_worker_tasks: self.chain_worker_tasks.clone(),
317 chain_workers: self.chain_workers.clone(),
318 }
319 }
320}
321
322type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
324 ChainWorkerRequest<<StorageClient as Storage>::Context>,
325 tracing::Span,
326)>;
327
328pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
329
330impl<StorageClient> WorkerState<StorageClient>
331where
332 StorageClient: Storage,
333{
334 #[instrument(level = "trace", skip(nickname, key_pair, storage))]
335 pub fn new(
336 nickname: String,
337 key_pair: Option<ValidatorSecretKey>,
338 storage: StorageClient,
339 ) -> Self {
340 WorkerState {
341 nickname,
342 storage,
343 chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
344 block_cache: Arc::new(ValueCache::default()),
345 execution_state_cache: Arc::new(ValueCache::default()),
346 tracked_chains: None,
347 delivery_notifiers: Arc::default(),
348 chain_worker_tasks: Arc::default(),
349 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
350 }
351 }
352
353 #[instrument(level = "trace", skip(nickname, storage))]
354 pub fn new_for_client(
355 nickname: String,
356 storage: StorageClient,
357 tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
358 ) -> Self {
359 WorkerState {
360 nickname,
361 storage,
362 chain_worker_config: ChainWorkerConfig::default(),
363 block_cache: Arc::new(ValueCache::default()),
364 execution_state_cache: Arc::new(ValueCache::default()),
365 tracked_chains: Some(tracked_chains),
366 delivery_notifiers: Arc::default(),
367 chain_worker_tasks: Arc::default(),
368 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
369 }
370 }
371
372 #[instrument(level = "trace", skip(self, value))]
373 pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
374 self.chain_worker_config.allow_inactive_chains = value;
375 self
376 }
377
378 #[instrument(level = "trace", skip(self, value))]
379 pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
380 self.chain_worker_config
381 .allow_messages_from_deprecated_epochs = value;
382 self
383 }
384
385 #[instrument(level = "trace", skip(self, value))]
386 pub fn with_long_lived_services(mut self, value: bool) -> Self {
387 self.chain_worker_config.long_lived_services = value;
388 self
389 }
390
391 #[instrument(level = "trace", skip(self))]
396 pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
397 self.chain_worker_config.grace_period = grace_period;
398 self
399 }
400
401 #[instrument(level = "trace", skip(self))]
405 pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
406 self.chain_worker_config.ttl = chain_worker_ttl;
407 self
408 }
409
410 #[instrument(level = "trace", skip(self))]
411 pub fn nickname(&self) -> &str {
412 &self.nickname
413 }
414
415 #[instrument(level = "trace", skip(self))]
417 #[cfg(not(feature = "test"))]
418 pub(crate) fn storage_client(&self) -> &StorageClient {
419 &self.storage
420 }
421
422 #[instrument(level = "trace", skip(self))]
425 #[cfg(feature = "test")]
426 pub fn storage_client(&self) -> &StorageClient {
427 &self.storage
428 }
429
430 #[instrument(level = "trace", skip(self, certificate))]
431 pub(crate) async fn full_certificate(
432 &self,
433 certificate: LiteCertificate<'_>,
434 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
435 let block = self
436 .block_cache
437 .get(&certificate.value.value_hash)
438 .ok_or(WorkerError::MissingCertificateValue)?;
439
440 match certificate.value.kind {
441 linera_chain::types::CertificateKind::Confirmed => {
442 let value = ConfirmedBlock::from_hashed(block);
443 Ok(Either::Left(
444 certificate
445 .with_value(value)
446 .ok_or(WorkerError::InvalidLiteCertificate)?,
447 ))
448 }
449 linera_chain::types::CertificateKind::Validated => {
450 let value = ValidatedBlock::from_hashed(block);
451 Ok(Either::Right(
452 certificate
453 .with_value(value)
454 .ok_or(WorkerError::InvalidLiteCertificate)?,
455 ))
456 }
457 _ => return Err(WorkerError::InvalidLiteCertificate),
458 }
459 }
460}
461
462#[allow(async_fn_in_trait)]
463#[cfg_attr(not(web), trait_variant::make(Send))]
464pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
465 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
466 worker: &WorkerState<S>,
467 certificate: GenericCertificate<Self>,
468 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
469}
470
471impl ProcessableCertificate for ConfirmedBlock {
472 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
473 worker: &WorkerState<S>,
474 certificate: ConfirmedBlockCertificate,
475 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
476 worker.handle_confirmed_certificate(certificate, None).await
477 }
478}
479
480impl ProcessableCertificate for ValidatedBlock {
481 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
482 worker: &WorkerState<S>,
483 certificate: ValidatedBlockCertificate,
484 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
485 worker.handle_validated_certificate(certificate).await
486 }
487}
488
489impl ProcessableCertificate for Timeout {
490 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
491 worker: &WorkerState<S>,
492 certificate: TimeoutCertificate,
493 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
494 worker.handle_timeout_certificate(certificate).await
495 }
496}
497
498impl<StorageClient> WorkerState<StorageClient>
499where
500 StorageClient: Storage + Clone + Send + Sync + 'static,
501{
502 #[instrument(level = "trace", skip(self, certificate, notifier))]
503 #[inline]
504 pub async fn fully_handle_certificate_with_notifications<T>(
505 &self,
506 certificate: GenericCertificate<T>,
507 notifier: &impl Notifier,
508 ) -> Result<ChainInfoResponse, WorkerError>
509 where
510 T: ProcessableCertificate,
511 {
512 let notifications = (*notifier).clone();
513 let this = self.clone();
514 linera_base::task::spawn(async move {
515 let (response, actions) =
516 ProcessableCertificate::process_certificate(&this, certificate).await?;
517 notifications.notify(&actions.notifications);
518 let mut requests = VecDeque::from(actions.cross_chain_requests);
519 while let Some(request) = requests.pop_front() {
520 let actions = this.handle_cross_chain_request(request).await?;
521 requests.extend(actions.cross_chain_requests);
522 notifications.notify(&actions.notifications);
523 }
524 Ok(response)
525 })
526 .await
527 .unwrap_or_else(|_| Err(WorkerError::JoinError))
528 }
529
530 #[instrument(level = "trace", skip(self, block))]
532 pub async fn stage_block_execution(
533 &self,
534 block: ProposedBlock,
535 round: Option<u32>,
536 published_blobs: Vec<Blob>,
537 ) -> Result<(Block, ChainInfoResponse), WorkerError> {
538 self.query_chain_worker(block.chain_id, move |callback| {
539 ChainWorkerRequest::StageBlockExecution {
540 block,
541 round,
542 published_blobs,
543 callback,
544 }
545 })
546 .await
547 }
548
549 #[instrument(level = "trace", skip(self, chain_id, query))]
551 pub async fn query_application(
552 &self,
553 chain_id: ChainId,
554 query: Query,
555 ) -> Result<QueryOutcome, WorkerError> {
556 self.query_chain_worker(chain_id, move |callback| {
557 ChainWorkerRequest::QueryApplication { query, callback }
558 })
559 .await
560 }
561
562 #[instrument(level = "trace", skip(self, chain_id, application_id))]
563 pub async fn describe_application(
564 &self,
565 chain_id: ChainId,
566 application_id: ApplicationId,
567 ) -> Result<ApplicationDescription, WorkerError> {
568 self.query_chain_worker(chain_id, move |callback| {
569 ChainWorkerRequest::DescribeApplication {
570 application_id,
571 callback,
572 }
573 })
574 .await
575 }
576
577 #[instrument(
579 level = "trace",
580 skip(self, certificate, notify_when_messages_are_delivered)
581 )]
582 async fn process_confirmed_block(
583 &self,
584 certificate: ConfirmedBlockCertificate,
585 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
586 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
587 let chain_id = certificate.block().header.chain_id;
588 self.query_chain_worker(chain_id, move |callback| {
589 ChainWorkerRequest::ProcessConfirmedBlock {
590 certificate,
591 notify_when_messages_are_delivered,
592 callback,
593 }
594 })
595 .await
596 }
597
598 #[instrument(level = "trace", skip(self, certificate))]
600 async fn process_validated_block(
601 &self,
602 certificate: ValidatedBlockCertificate,
603 ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
604 let chain_id = certificate.block().header.chain_id;
605 self.query_chain_worker(chain_id, move |callback| {
606 ChainWorkerRequest::ProcessValidatedBlock {
607 certificate,
608 callback,
609 }
610 })
611 .await
612 }
613
614 #[instrument(level = "trace", skip(self, certificate))]
616 async fn process_timeout(
617 &self,
618 certificate: TimeoutCertificate,
619 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
620 let chain_id = certificate.value().chain_id();
621 self.query_chain_worker(chain_id, move |callback| {
622 ChainWorkerRequest::ProcessTimeout {
623 certificate,
624 callback,
625 }
626 })
627 .await
628 }
629
630 #[instrument(level = "trace", skip(self, origin, recipient, bundles))]
631 async fn process_cross_chain_update(
632 &self,
633 origin: ChainId,
634 recipient: ChainId,
635 bundles: Vec<(Epoch, MessageBundle)>,
636 ) -> Result<Option<BlockHeight>, WorkerError> {
637 self.query_chain_worker(recipient, move |callback| {
638 ChainWorkerRequest::ProcessCrossChainUpdate {
639 origin,
640 bundles,
641 callback,
642 }
643 })
644 .await
645 }
646
647 #[instrument(level = "trace", skip(self, chain_id, height))]
649 #[cfg(with_testing)]
650 pub async fn read_certificate(
651 &self,
652 chain_id: ChainId,
653 height: BlockHeight,
654 ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
655 self.query_chain_worker(chain_id, move |callback| {
656 ChainWorkerRequest::ReadCertificate { height, callback }
657 })
658 .await
659 }
660
661 #[instrument(level = "trace", skip(self))]
667 pub async fn chain_state_view(
668 &self,
669 chain_id: ChainId,
670 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
671 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
672 callback,
673 })
674 .await
675 }
676
677 #[instrument(level = "trace", skip(self, request_builder))]
678 async fn query_chain_worker<Response>(
680 &self,
681 chain_id: ChainId,
682 request_builder: impl FnOnce(
683 oneshot::Sender<Result<Response, WorkerError>>,
684 ) -> ChainWorkerRequest<StorageClient::Context>,
685 ) -> Result<Response, WorkerError> {
686 let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
687 let (callback, response) = oneshot::channel();
688
689 chain_actor
690 .send((request_builder(callback), tracing::Span::current()))
691 .expect("`ChainWorkerActor` stopped executing unexpectedly");
692
693 response
694 .await
695 .expect("`ChainWorkerActor` stopped executing without responding")
696 }
697
698 #[instrument(level = "trace", skip(self))]
701 async fn get_chain_worker_endpoint(
702 &self,
703 chain_id: ChainId,
704 ) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
705 let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
706 loop {
707 match self.try_get_chain_worker_endpoint(chain_id) {
708 Some(endpoint) => break endpoint,
709 None => sleep(Duration::from_millis(250)).await,
710 }
711 }
712 })
713 .await
714 .map_err(|_| WorkerError::FullChainWorkerCache)?;
715
716 if let Some(receiver) = new_receiver {
717 let delivery_notifier = self
718 .delivery_notifiers
719 .lock()
720 .unwrap()
721 .entry(chain_id)
722 .or_default()
723 .clone();
724
725 let actor_task = ChainWorkerActor::run(
726 self.chain_worker_config.clone(),
727 self.storage.clone(),
728 self.block_cache.clone(),
729 self.execution_state_cache.clone(),
730 self.tracked_chains.clone(),
731 delivery_notifier,
732 chain_id,
733 receiver,
734 );
735
736 self.chain_worker_tasks
737 .lock()
738 .unwrap()
739 .spawn_task(actor_task);
740 }
741
742 Ok(sender)
743 }
744
745 #[instrument(level = "trace", skip(self))]
750 #[expect(clippy::type_complexity)]
751 fn try_get_chain_worker_endpoint(
752 &self,
753 chain_id: ChainId,
754 ) -> Option<(
755 ChainActorEndpoint<StorageClient>,
756 Option<
757 mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
758 >,
759 )> {
760 let mut chain_workers = self.chain_workers.lock().unwrap();
761
762 if let Some(endpoint) = chain_workers.get(&chain_id) {
763 Some((endpoint.clone(), None))
764 } else {
765 let (sender, receiver) = mpsc::unbounded_channel();
766 chain_workers.insert(chain_id, sender.clone());
767 Some((sender, Some(receiver)))
768 }
769 }
770
771 #[instrument(skip_all, fields(
772 nick = self.nickname,
773 chain_id = format!("{:.8}", proposal.content.block.chain_id),
774 height = %proposal.content.block.height,
775 ))]
776 pub async fn handle_block_proposal(
777 &self,
778 proposal: BlockProposal,
779 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
780 trace!("{} <-- {:?}", self.nickname, proposal);
781 #[cfg(with_metrics)]
782 let round = proposal.content.round;
783 let response = self
784 .query_chain_worker(proposal.content.block.chain_id, move |callback| {
785 ChainWorkerRequest::HandleBlockProposal { proposal, callback }
786 })
787 .await?;
788 #[cfg(with_metrics)]
789 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
790 .with_label_values(&[round.type_name()])
791 .observe(round.number() as f64);
792 Ok(response)
793 }
794
795 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
798 pub async fn handle_lite_certificate(
799 &self,
800 certificate: LiteCertificate<'_>,
801 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
802 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
803 match self.full_certificate(certificate).await? {
804 Either::Left(confirmed) => {
805 self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
806 .await
807 }
808 Either::Right(validated) => {
809 if let Some(notifier) = notify_when_messages_are_delivered {
810 if let Err(()) = notifier.send(()) {
812 warn!("Failed to notify message delivery to caller");
813 }
814 }
815 self.handle_validated_certificate(validated).await
816 }
817 }
818 }
819
820 #[instrument(skip_all, fields(
822 nick = self.nickname,
823 chain_id = format!("{:.8}", certificate.block().header.chain_id),
824 height = %certificate.block().header.height,
825 ))]
826 pub async fn handle_confirmed_certificate(
827 &self,
828 certificate: ConfirmedBlockCertificate,
829 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
830 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
831 trace!("{} <-- {:?}", self.nickname, certificate);
832 #[cfg(with_metrics)]
833 let metrics_data = if self
834 .chain_state_view(certificate.block().header.chain_id)
835 .await?
836 .tip_state
837 .get()
838 .next_block_height
839 == certificate.block().header.height
840 {
841 Some((
842 certificate.inner().to_log_str(),
843 certificate.round.type_name(),
844 certificate.round.number(),
845 certificate.block().body.transactions.len() as u64,
846 certificate
847 .signatures()
848 .iter()
849 .map(|(validator_name, _)| validator_name.to_string())
850 .collect::<Vec<_>>(),
851 ))
852 } else {
853 None
855 };
856
857 let result = self
858 .process_confirmed_block(certificate, notify_when_messages_are_delivered)
859 .await?;
860
861 #[cfg(with_metrics)]
862 {
863 if let Some(metrics_data) = metrics_data {
864 let (
865 certificate_log_str,
866 round_type,
867 round_number,
868 confirmed_transactions,
869 validators_with_signatures,
870 ) = metrics_data;
871 metrics::NUM_BLOCKS.with_label_values(&[]).inc();
872 metrics::NUM_ROUNDS_IN_CERTIFICATE
873 .with_label_values(&[certificate_log_str, round_type])
874 .observe(round_number as f64);
875 if confirmed_transactions > 0 {
876 metrics::TRANSACTION_COUNT
877 .with_label_values(&[])
878 .inc_by(confirmed_transactions);
879 }
880
881 for validator_name in validators_with_signatures {
882 metrics::CERTIFICATES_SIGNED
883 .with_label_values(&[&validator_name])
884 .inc();
885 }
886 }
887 }
888 Ok(result)
889 }
890
891 #[instrument(skip_all, fields(
893 nick = self.nickname,
894 chain_id = format!("{:.8}", certificate.block().header.chain_id),
895 height = %certificate.block().header.height,
896 ))]
897 pub async fn handle_validated_certificate(
898 &self,
899 certificate: ValidatedBlockCertificate,
900 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
901 trace!("{} <-- {:?}", self.nickname, certificate);
902
903 #[cfg(with_metrics)]
904 let round = certificate.round;
905 #[cfg(with_metrics)]
906 let cert_str = certificate.inner().to_log_str();
907
908 let (info, actions, _duplicated) = self.process_validated_block(certificate).await?;
909 #[cfg(with_metrics)]
910 {
911 if !_duplicated {
912 metrics::NUM_ROUNDS_IN_CERTIFICATE
913 .with_label_values(&[cert_str, round.type_name()])
914 .observe(round.number() as f64);
915 }
916 }
917 Ok((info, actions))
918 }
919
920 #[instrument(skip_all, fields(
922 nick = self.nickname,
923 chain_id = format!("{:.8}", certificate.inner().chain_id()),
924 height = %certificate.inner().height(),
925 ))]
926 pub async fn handle_timeout_certificate(
927 &self,
928 certificate: TimeoutCertificate,
929 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
930 trace!("{} <-- {:?}", self.nickname, certificate);
931 self.process_timeout(certificate).await
932 }
933
934 #[instrument(skip_all, fields(
935 nick = self.nickname,
936 chain_id = format!("{:.8}", query.chain_id)
937 ))]
938 pub async fn handle_chain_info_query(
939 &self,
940 query: ChainInfoQuery,
941 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
942 trace!("{} <-- {:?}", self.nickname, query);
943 #[cfg(with_metrics)]
944 metrics::CHAIN_INFO_QUERIES.inc();
945 let result = self
946 .query_chain_worker(query.chain_id, move |callback| {
947 ChainWorkerRequest::HandleChainInfoQuery { query, callback }
948 })
949 .await;
950 trace!("{} --> {:?}", self.nickname, result);
951 result
952 }
953
954 #[instrument(skip_all, fields(
955 nick = self.nickname,
956 chain_id = format!("{:.8}", chain_id)
957 ))]
958 pub async fn download_pending_blob(
959 &self,
960 chain_id: ChainId,
961 blob_id: BlobId,
962 ) -> Result<Blob, WorkerError> {
963 trace!(
964 "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
965 self.nickname
966 );
967 let result = self
968 .query_chain_worker(chain_id, move |callback| {
969 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
970 })
971 .await;
972 trace!(
973 "{} --> {:?}",
974 self.nickname,
975 result.as_ref().map(|_| blob_id)
976 );
977 result
978 }
979
980 #[instrument(skip_all, fields(
981 nick = self.nickname,
982 chain_id = format!("{:.8}", chain_id)
983 ))]
984 pub async fn handle_pending_blob(
985 &self,
986 chain_id: ChainId,
987 blob: Blob,
988 ) -> Result<ChainInfoResponse, WorkerError> {
989 let blob_id = blob.id();
990 trace!(
991 "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
992 self.nickname
993 );
994 let result = self
995 .query_chain_worker(chain_id, move |callback| {
996 ChainWorkerRequest::HandlePendingBlob { blob, callback }
997 })
998 .await;
999 trace!(
1000 "{} --> {:?}",
1001 self.nickname,
1002 result.as_ref().map(|_| blob_id)
1003 );
1004 result
1005 }
1006
1007 #[instrument(skip_all, fields(
1008 nick = self.nickname,
1009 chain_id = format!("{:.8}", request.target_chain_id())
1010 ))]
1011 pub async fn handle_cross_chain_request(
1012 &self,
1013 request: CrossChainRequest,
1014 ) -> Result<NetworkActions, WorkerError> {
1015 trace!("{} <-- {:?}", self.nickname, request);
1016 match request {
1017 CrossChainRequest::UpdateRecipient {
1018 sender,
1019 recipient,
1020 bundles,
1021 } => {
1022 let mut actions = NetworkActions::default();
1023 let origin = sender;
1024 let Some(height) = self
1025 .process_cross_chain_update(origin, recipient, bundles)
1026 .await?
1027 else {
1028 return Ok(actions);
1029 };
1030 actions.notifications.push(Notification {
1031 chain_id: recipient,
1032 reason: Reason::NewIncomingBundle { origin, height },
1033 });
1034 actions
1035 .cross_chain_requests
1036 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1037 sender,
1038 recipient,
1039 latest_height: height,
1040 });
1041 Ok(actions)
1042 }
1043 CrossChainRequest::ConfirmUpdatedRecipient {
1044 sender,
1045 recipient,
1046 latest_height,
1047 } => {
1048 self.query_chain_worker(sender, move |callback| {
1049 ChainWorkerRequest::ConfirmUpdatedRecipient {
1050 recipient,
1051 latest_height,
1052 callback,
1053 }
1054 })
1055 .await?;
1056 Ok(NetworkActions::default())
1057 }
1058 }
1059 }
1060
1061 pub async fn update_received_certificate_trackers(
1063 &self,
1064 chain_id: ChainId,
1065 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1066 ) -> Result<(), WorkerError> {
1067 self.query_chain_worker(chain_id, move |callback| {
1068 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1069 new_trackers,
1070 callback,
1071 }
1072 })
1073 .await
1074 }
1075}
1076
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the public key of the key pair stored in the chain worker configuration.
    ///
    /// # Panics
    ///
    /// Panics if no key pair is configured.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain it's public key",
        );
        key_pair.public()
    }
}