1use std::{
6 cmp::Ordering,
7 collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
8 convert::Infallible,
9 iter,
10 sync::{Arc, RwLock},
11};
12
13use chain_client_state::ChainClientState;
14use custom_debug_derive::Debug;
15use futures::{
16 future::{self, Either, FusedFuture, Future},
17 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
18};
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{
22 abi::Abi,
23 crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
24 data_types::{
25 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
26 ChainDescription, Epoch, MessagePolicy, Round, TimeDelta, Timestamp,
27 },
28 ensure,
29 identifiers::{
30 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
31 ModuleId, StreamId,
32 },
33 ownership::{ChainOwnership, TimeoutConfig},
34 time::{Duration, Instant},
35};
36#[cfg(not(target_arch = "wasm32"))]
37use linera_base::{data_types::Bytecode, vm::VmRuntime};
38use linera_chain::{
39 data_types::{
40 BlockProposal, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight, IncomingBundle,
41 LiteVote, ProposedBlock, Transaction,
42 },
43 manager::LockingBlock,
44 types::{
45 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
46 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
47 },
48 ChainError, ChainExecutionContext, ChainStateView,
49};
50use linera_execution::{
51 committee::Committee,
52 system::{
53 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
54 REMOVED_EPOCH_STREAM_NAME,
55 },
56 ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
57};
58use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
59use linera_views::ViewError;
60use rand::prelude::SliceRandom as _;
61use received_log::ReceivedLogs;
62use serde::{Deserialize, Serialize};
63use thiserror::Error;
64use tokio::sync::{mpsc, OwnedRwLockReadGuard};
65use tokio_stream::wrappers::UnboundedReceiverStream;
66use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
67use validator_trackers::ValidatorTrackers;
68
69use crate::{
70 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout},
71 environment::{wallet::Wallet as _, Environment},
72 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
73 node::{
74 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
75 ValidatorNodeProvider as _,
76 },
77 notifier::{ChannelNotifier, Notifier as _},
78 remote_node::RemoteNode,
79 updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
80 worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
81 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
82};
83
84mod chain_client_state;
85#[cfg(test)]
86#[path = "../unit_tests/client_tests.rs"]
87mod client_tests;
88pub mod requests_scheduler;
89
90pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
91mod received_log;
92mod validator_trackers;
93
94#[cfg(with_metrics)]
95mod metrics {
96 use std::sync::LazyLock;
97
98 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
99 use prometheus::HistogramVec;
100
101 pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
102 LazyLock::new(|| {
103 register_histogram_vec(
104 "process_inbox_latency",
105 "process_inbox latency",
106 &[],
107 exponential_bucket_latencies(500.0),
108 )
109 });
110
111 pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
112 register_histogram_vec(
113 "prepare_chain_latency",
114 "prepare_chain latency",
115 &[],
116 exponential_bucket_latencies(500.0),
117 )
118 });
119
120 pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
121 register_histogram_vec(
122 "synchronize_chain_state_latency",
123 "synchronize_chain_state latency",
124 &[],
125 exponential_bucket_latencies(500.0),
126 )
127 });
128
129 pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
130 register_histogram_vec(
131 "execute_block_latency",
132 "execute_block latency",
133 &[],
134 exponential_bucket_latencies(500.0),
135 )
136 });
137
138 pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
139 register_histogram_vec(
140 "find_received_certificates_latency",
141 "find_received_certificates latency",
142 &[],
143 exponential_bucket_latencies(500.0),
144 )
145 });
146}
147
148#[derive(Debug, Clone, PartialEq, Eq)]
153pub enum ListeningMode {
154 FullChain,
157 FollowChain,
161 EventsOnly(BTreeSet<StreamId>),
163}
164
165impl PartialOrd for ListeningMode {
166 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
167 match (self, other) {
168 (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
169 (ListeningMode::FullChain, _) => Some(Ordering::Greater),
170 (_, ListeningMode::FullChain) => Some(Ordering::Less),
171 (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
172 (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
173 (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
174 (ListeningMode::EventsOnly(a), ListeningMode::EventsOnly(b)) => {
175 if a == b {
176 Some(Ordering::Equal)
177 } else if a.is_superset(b) {
178 Some(Ordering::Greater)
179 } else if b.is_superset(a) {
180 Some(Ordering::Less)
181 } else {
182 None
183 }
184 }
185 }
186 }
187}
188
189impl ListeningMode {
190 pub fn is_relevant(&self, reason: &Reason) -> bool {
193 match (reason, self) {
194 (Reason::NewEvents { .. }, ListeningMode::FollowChain | ListeningMode::FullChain) => {
195 false
196 }
197 (_, ListeningMode::FullChain) => true,
199 (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
202 (_, ListeningMode::FollowChain) => false,
203 (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant))
207 | (Reason::NewBlock { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
208 relevant.intersection(event_streams).next().is_some()
209 }
210 (_, ListeningMode::EventsOnly(_)) => false,
211 }
212 }
213
214 pub fn extend(&mut self, other: Option<ListeningMode>) {
215 match (self, other) {
216 (_, None) => (),
217 (ListeningMode::FullChain, _) => (),
218 (mode, Some(ListeningMode::FullChain)) => {
219 *mode = ListeningMode::FullChain;
220 }
221 (ListeningMode::FollowChain, _) => (),
222 (mode, Some(ListeningMode::FollowChain)) => {
223 *mode = ListeningMode::FollowChain;
224 }
225 (
226 ListeningMode::EventsOnly(self_events),
227 Some(ListeningMode::EventsOnly(other_events)),
228 ) => {
229 self_events.extend(other_events);
230 }
231 }
232 }
233
234 pub fn is_follow_only(&self) -> bool {
237 !matches!(self, ListeningMode::FullChain)
238 }
239
240 pub fn is_full(&self) -> bool {
243 matches!(self, ListeningMode::FullChain)
244 }
245}
246
247pub struct Client<Env: Environment> {
249 environment: Env,
250 pub local_node: LocalNodeClient<Env::Storage>,
253 requests_scheduler: RequestsScheduler<Env>,
255 admin_chain_id: ChainId,
257 chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
260 notifier: Arc<ChannelNotifier<Notification>>,
262 chains: papaya::HashMap<ChainId, ChainClientState>,
264 options: ChainClientOptions,
266}
267
268impl<Env: Environment> Client<Env> {
269 #[expect(clippy::too_many_arguments)]
271 #[instrument(level = "trace", skip_all)]
272 pub fn new(
273 environment: Env,
274 admin_chain_id: ChainId,
275 long_lived_services: bool,
276 chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
277 name: impl Into<String>,
278 chain_worker_ttl: Duration,
279 sender_chain_worker_ttl: Duration,
280 options: ChainClientOptions,
281 requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
282 ) -> Self {
283 let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect()));
284 let state = WorkerState::new_for_client(
285 name.into(),
286 environment.storage().clone(),
287 chain_modes.clone(),
288 )
289 .with_long_lived_services(long_lived_services)
290 .with_allow_inactive_chains(true)
291 .with_allow_messages_from_deprecated_epochs(true)
292 .with_chain_worker_ttl(chain_worker_ttl)
293 .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
294 let local_node = LocalNodeClient::new(state);
295 let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
296
297 Self {
298 environment,
299 local_node,
300 requests_scheduler,
301 chains: papaya::HashMap::new(),
302 admin_chain_id,
303 chain_modes,
304 notifier: Arc::new(ChannelNotifier::default()),
305 options,
306 }
307 }
308
309 pub fn admin_chain_id(&self) -> ChainId {
311 self.admin_chain_id
312 }
313
314 pub fn subscribe(
316 &self,
317 chain_ids: Vec<ChainId>,
318 ) -> tokio::sync::mpsc::UnboundedReceiver<Notification> {
319 self.notifier.subscribe(chain_ids)
320 }
321
322 pub fn subscribe_extra(
324 &self,
325 chain_ids: Vec<ChainId>,
326 sender: &tokio::sync::mpsc::UnboundedSender<Notification>,
327 ) {
328 self.notifier.add_sender(chain_ids, sender);
329 }
330
331 pub fn storage_client(&self) -> &Env::Storage {
333 self.environment.storage()
334 }
335
336 pub fn validator_node_provider(&self) -> &Env::Network {
337 self.environment.network()
338 }
339
340 #[instrument(level = "trace", skip(self))]
342 pub fn signer(&self) -> &Env::Signer {
343 self.environment.signer()
344 }
345
346 pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, ChainClientError> {
348 self.signer()
349 .contains_key(owner)
350 .await
351 .map_err(ChainClientError::signer_failure)
352 }
353
354 pub fn wallet(&self) -> &Env::Wallet {
356 self.environment.wallet()
357 }
358
359 async fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
364 match self.wallet().get(chain_id).await {
365 Ok(Some(chain)) => chain.owner.is_none(),
366 Ok(None) | Err(_) => true,
368 }
369 }
370
371 #[instrument(level = "trace", skip(self))]
374 pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
375 let mut chain_modes = self
376 .chain_modes
377 .write()
378 .expect("Panics should not happen while holding a lock to `chain_modes`");
379 let entry = chain_modes.entry(chain_id).or_insert(mode.clone());
380 entry.extend(Some(mode));
381 entry.clone()
382 }
383
384 pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
386 self.chain_modes
387 .read()
388 .expect("Panics should not happen while holding a lock to `chain_modes`")
389 .get(&chain_id)
390 .cloned()
391 }
392
393 pub fn is_tracked(&self, chain_id: ChainId) -> bool {
395 self.chain_modes
396 .read()
397 .expect("Panics should not happen while holding a lock to `chain_modes`")
398 .get(&chain_id)
399 .is_some_and(ListeningMode::is_full)
400 }
401
402 #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
404 pub fn create_chain_client(
405 self: &Arc<Self>,
406 chain_id: ChainId,
407 block_hash: Option<CryptoHash>,
408 next_block_height: BlockHeight,
409 pending_proposal: Option<PendingProposal>,
410 preferred_owner: Option<AccountOwner>,
411 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
412 ) -> ChainClient<Env> {
413 self.chains
416 .pin()
417 .get_or_insert_with(chain_id, || ChainClientState::new(pending_proposal.clone()));
418
419 ChainClient {
420 client: self.clone(),
421 chain_id,
422 options: self.options.clone(),
423 preferred_owner,
424 initial_block_hash: block_hash,
425 initial_next_block_height: next_block_height,
426 timing_sender,
427 }
428 }
429
430 async fn fetch_chain_info(
432 &self,
433 chain_id: ChainId,
434 validators: &[RemoteNode<Env::ValidatorNode>],
435 ) -> Result<Box<ChainInfo>, ChainClientError> {
436 match self.local_node.chain_info(chain_id).await {
437 Ok(info) => Ok(info),
438 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
439 Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
441 self.update_local_node_with_blobs_from(blob_ids, validators)
444 .await?;
445 Ok(self.local_node.chain_info(chain_id).await?)
446 }
447 Err(err) => Err(err.into()),
448 }
449 }
450
451 #[instrument(level = "trace", skip(self))]
453 async fn download_certificates(
454 &self,
455 chain_id: ChainId,
456 target_next_block_height: BlockHeight,
457 ) -> Result<Box<ChainInfo>, ChainClientError> {
458 let validators = self.validator_nodes().await?;
459 let mut info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
460 if target_next_block_height <= info.next_block_height {
461 return Ok(info);
462 }
463 if let Some(new_info) = self
464 .download_certificates_using_all(&validators, chain_id, target_next_block_height)
465 .await?
466 {
467 info = new_info;
468 }
469 ensure!(
470 target_next_block_height <= info.next_block_height,
471 ChainClientError::CannotDownloadCertificates {
472 chain_id,
473 target_next_block_height,
474 }
475 );
476 Ok(info)
477 }
478
479 #[instrument(level = "trace", skip_all)]
482 async fn download_certificates_from(
483 &self,
484 remote_node: &RemoteNode<Env::ValidatorNode>,
485 chain_id: ChainId,
486 stop: BlockHeight,
487 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
488 let mut last_info = None;
489 let chain_info = self.local_node.chain_info(chain_id).await?;
491 let mut next_height = chain_info.next_block_height;
492 let hashes = self
493 .local_node
494 .get_preprocessed_block_hashes(chain_id, next_height, stop)
495 .await?;
496 let certificates = self.storage_client().read_certificates(&hashes).await?;
497 let certificates = match ResultReadCertificates::new(certificates, hashes) {
498 ResultReadCertificates::Certificates(certificates) => certificates,
499 ResultReadCertificates::InvalidHashes(hashes) => {
500 return Err(ChainClientError::ReadCertificatesError(hashes))
501 }
502 };
503 for certificate in certificates {
504 last_info = Some(self.handle_certificate(certificate).await?.info);
505 }
506 while next_height < stop {
508 let limit = u64::from(stop)
510 .checked_sub(u64::from(next_height))
511 .ok_or(ArithmeticError::Overflow)?
512 .min(self.options.certificate_download_batch_size);
513
514 let certificates = self
515 .requests_scheduler
516 .download_certificates(remote_node, chain_id, next_height, limit)
517 .await?;
518 let Some(info) = self.process_certificates(remote_node, certificates).await? else {
519 break;
520 };
521 assert!(info.next_block_height > next_height);
522 next_height = info.next_block_height;
523 last_info = Some(info);
524 }
525 Ok(last_info)
526 }
527
528 #[instrument(level = "trace", skip_all)]
532 async fn download_certificates_using_all(
533 &self,
534 validators: &[RemoteNode<Env::ValidatorNode>],
535 chain_id: ChainId,
536 stop: BlockHeight,
537 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
538 let mut last_info = None;
539 let chain_info = self.local_node.chain_info(chain_id).await?;
541 let mut next_height = chain_info.next_block_height;
542 let hashes = self
543 .local_node
544 .get_preprocessed_block_hashes(chain_id, next_height, stop)
545 .await?;
546 let certificates = self.storage_client().read_certificates(&hashes).await?;
547 let certificates = match ResultReadCertificates::new(certificates, hashes) {
548 ResultReadCertificates::Certificates(certificates) => certificates,
549 ResultReadCertificates::InvalidHashes(hashes) => {
550 return Err(ChainClientError::ReadCertificatesError(hashes))
551 }
552 };
553 for certificate in certificates {
554 last_info = Some(self.handle_certificate(certificate).await?.info);
555 }
556 while next_height < stop {
558 let limit = u64::from(stop)
559 .checked_sub(u64::from(next_height))
560 .ok_or(ArithmeticError::Overflow)?
561 .min(self.options.certificate_download_batch_size);
562 let certificates = self
563 .requests_scheduler
564 .download_certificates_from_validators(
565 validators,
566 chain_id,
567 next_height,
568 limit,
569 self.options.certificate_batch_download_timeout,
570 )
571 .await?;
572 let Some(info) = self
573 .process_certificates_using_all(validators, certificates)
574 .await?
575 else {
576 break;
577 };
578 assert!(info.next_block_height > next_height);
579 next_height = info.next_block_height;
580 last_info = Some(info);
581 }
582 Ok(last_info)
583 }
584
585 #[instrument(level = "trace", skip_all)]
587 async fn process_certificates_using_all(
588 &self,
589 validators: &[RemoteNode<Env::ValidatorNode>],
590 certificates: Vec<ConfirmedBlockCertificate>,
591 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
592 let mut info = None;
593 let required_blob_ids: Vec<_> = certificates
594 .iter()
595 .flat_map(|certificate| certificate.value().required_blob_ids())
596 .collect();
597
598 match self
599 .local_node
600 .read_blob_states_from_storage(&required_blob_ids)
601 .await
602 {
603 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
604 self.download_blobs(validators, &blob_ids).await?;
605 }
606 x => {
607 x?;
608 }
609 }
610
611 for certificate in certificates {
612 info = Some(
613 match self.handle_certificate(certificate.clone()).await {
614 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
615 self.download_blobs(validators, &blob_ids).await?;
616 self.handle_certificate(certificate).await?
617 }
618 x => x?,
619 }
620 .info,
621 );
622 }
623
624 Ok(info)
625 }
626
627 async fn download_blobs(
628 &self,
629 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
630 blob_ids: &[BlobId],
631 ) -> Result<(), ChainClientError> {
632 let blobs = &self
633 .requests_scheduler
634 .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
635 .await?
636 .ok_or_else(|| {
637 ChainClientError::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
638 })?;
639 self.local_node.store_blobs(blobs).await.map_err(Into::into)
640 }
641
642 #[instrument(level = "trace", skip_all)]
647 async fn download_publisher_chains_for_events(
648 &self,
649 event_ids: &[EventId],
650 ) -> Result<(), ChainClientError> {
651 let mut events_by_chain: BTreeMap<ChainId, Vec<EventId>> = BTreeMap::new();
653 for event_id in event_ids {
654 events_by_chain
655 .entry(event_id.chain_id)
656 .or_default()
657 .push(event_id.clone());
658 }
659 for (chain_id, chain_event_ids) in events_by_chain {
660 self.download_chain_until_events_found(chain_id, &chain_event_ids)
661 .await?;
662 }
663 Ok(())
664 }
665
666 #[instrument(level = "trace", skip_all, fields(chain_id, num_events = event_ids.len()))]
671 async fn download_chain_until_events_found(
672 &self,
673 chain_id: ChainId,
674 event_ids: &[EventId],
675 ) -> Result<(), ChainClientError> {
676 let validators = self.validator_nodes().await?;
677 let info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
678 let mut next_height = info.next_block_height;
679 loop {
681 let result = self
682 .requests_scheduler
683 .download_certificates_from_validators(
684 &validators,
685 chain_id,
686 next_height,
687 1,
688 self.options.certificate_batch_download_timeout,
689 )
690 .await;
691 let certificates = match result {
692 Ok(certificates) => certificates,
693 Err(_) => break, };
695 let Some(batch_info) = self
696 .process_certificates_using_all(&validators, certificates)
697 .await?
698 else {
699 break;
700 };
701 assert!(batch_info.next_block_height > next_height);
702 next_height = batch_info.next_block_height;
703 if self.has_all_events(event_ids).await? {
704 return Ok(());
705 }
706 }
707 Ok(())
708 }
709
710 async fn has_all_events(&self, event_ids: &[EventId]) -> Result<bool, ChainClientError> {
712 for event_id in event_ids {
713 if !self
714 .storage_client()
715 .contains_event(event_id.clone())
716 .await?
717 {
718 return Ok(false);
719 }
720 }
721 Ok(true)
722 }
723
724 #[instrument(level = "trace", skip_all)]
727 async fn process_certificates(
728 &self,
729 remote_node: &RemoteNode<Env::ValidatorNode>,
730 certificates: Vec<ConfirmedBlockCertificate>,
731 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
732 let mut info = None;
733 let required_blob_ids: Vec<_> = certificates
734 .iter()
735 .flat_map(|certificate| certificate.value().required_blob_ids())
736 .collect();
737
738 match self
739 .local_node
740 .read_blob_states_from_storage(&required_blob_ids)
741 .await
742 {
743 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
744 self.download_blobs(&[remote_node.clone()], &blob_ids)
745 .await?;
746 }
747 x => {
748 x?;
749 }
750 }
751
752 for certificate in certificates {
753 info = Some(
754 match self.handle_certificate(certificate.clone()).await {
755 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
756 self.download_blobs(&[remote_node.clone()], &blob_ids)
757 .await?;
758 self.handle_certificate(certificate).await?
759 }
760 x => x?,
761 }
762 .info,
763 );
764 }
765
766 Ok(info)
768 }
769
770 async fn handle_certificate<T: ProcessableCertificate>(
771 &self,
772 certificate: GenericCertificate<T>,
773 ) -> Result<ChainInfoResponse, LocalNodeError> {
774 let chain_id = certificate.inner().chain_id();
775 let response = self
776 .local_node
777 .handle_certificate(certificate, &self.notifier)
778 .await?;
779 if self.is_tracked(chain_id) {
780 self.update_publisher_chain_modes(chain_id).await?;
781 }
782 Ok(response)
783 }
784
785 async fn update_publisher_chain_modes(&self, chain_id: ChainId) -> Result<(), LocalNodeError> {
788 let subscriptions = self.local_node.get_event_subscriptions(chain_id).await?;
789 let mut publishers = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
790 for ((publisher_id, stream_name), _) in subscriptions {
791 publishers
792 .entry(publisher_id)
793 .or_default()
794 .insert(stream_name);
795 }
796 if chain_id != self.admin_chain_id {
797 publishers.entry(self.admin_chain_id).or_default();
798 }
799 for (publisher_id, streams) in publishers {
800 if publisher_id != chain_id {
801 self.extend_chain_mode(publisher_id, ListeningMode::EventsOnly(streams));
802 }
803 }
804 Ok(())
805 }
806
807 async fn chain_info_with_committees(
808 &self,
809 chain_id: ChainId,
810 ) -> Result<Box<ChainInfo>, LocalNodeError> {
811 let query = ChainInfoQuery::new(chain_id).with_committees();
812 let info = self.local_node.handle_chain_info_query(query).await?.info;
813 Ok(info)
814 }
815
816 #[instrument(level = "trace", skip_all)]
819 async fn admin_committees(
820 &self,
821 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
822 let info = self.chain_info_with_committees(self.admin_chain_id).await?;
823 Ok((info.epoch, info.into_committees()?))
824 }
825
826 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
828 let info = self.chain_info_with_committees(self.admin_chain_id).await?;
829 Ok((info.epoch, info.into_current_committee()?))
830 }
831
832 async fn validator_nodes(
834 &self,
835 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, ChainClientError> {
836 let (_, committee) = self.admin_committee().await?;
837 Ok(self.make_nodes(&committee)?)
838 }
839
840 fn make_nodes(
842 &self,
843 committee: &Committee,
844 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
845 Ok(self
846 .validator_node_provider()
847 .make_nodes(committee)?
848 .map(|(public_key, node)| RemoteNode { public_key, node })
849 .collect())
850 }
851
852 pub async fn get_chain_description_blob(
855 &self,
856 chain_id: ChainId,
857 ) -> Result<Blob, ChainClientError> {
858 let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
859 let blob = self
860 .local_node
861 .storage_client()
862 .read_blob(chain_desc_id)
863 .await?;
864 if let Some(blob) = blob {
865 return Ok(blob);
867 }
868 Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
870 let nodes = self.validator_nodes().await?;
871 Ok(self
872 .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
873 .await?
874 .pop()
875 .unwrap()) }
877
878 pub async fn get_chain_description(
881 &self,
882 chain_id: ChainId,
883 ) -> Result<ChainDescription, ChainClientError> {
884 let blob = self.get_chain_description_blob(chain_id).await?;
885 Ok(bcs::from_bytes(blob.bytes())?)
886 }
887
888 #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
890 fn update_from_info(&self, info: &ChainInfo) {
891 self.chains.pin().update(info.chain_id, |state| {
892 let mut state = state.clone_for_update_unchecked();
893 state.update_from_info(info);
894 state
895 });
896 }
897
898 #[instrument(level = "trace", skip_all)]
900 async fn process_certificate<T: ProcessableCertificate>(
901 &self,
902 certificate: Box<GenericCertificate<T>>,
903 ) -> Result<(), LocalNodeError> {
904 let info = self.handle_certificate(*certificate).await?.info;
905 self.update_from_info(&info);
906 Ok(())
907 }
908
909 #[instrument(level = "trace", skip_all)]
911 async fn finalize_block(
912 self: &Arc<Self>,
913 committee: &Committee,
914 certificate: ValidatedBlockCertificate,
915 ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
916 debug!(round = %certificate.round, "Submitting block for confirmation");
917 let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
918 let finalize_action = CommunicateAction::FinalizeBlock {
919 certificate: Box::new(certificate),
920 delivery: self.options.cross_chain_message_delivery,
921 };
922 let certificate = self
923 .communicate_chain_action(committee, finalize_action, hashed_value)
924 .await?;
925 self.receive_certificate_with_checked_signatures(certificate.clone())
926 .await?;
927 Ok(certificate)
928 }
929
930 #[instrument(level = "trace", skip_all)]
932 async fn submit_block_proposal<T: ProcessableCertificate>(
933 self: &Arc<Self>,
934 committee: &Committee,
935 proposal: Box<BlockProposal>,
936 value: T,
937 ) -> Result<GenericCertificate<T>, ChainClientError> {
938 debug!(
939 round = %proposal.content.round,
940 "Submitting block proposal to validators"
941 );
942
943 let block_timestamp = proposal.content.block.timestamp;
945 let local_time = self.local_node.storage_client().clock().current_time();
946 if block_timestamp > local_time {
947 info!(
948 chain_id = %proposal.content.block.chain_id,
949 %block_timestamp,
950 %local_time,
951 "Block timestamp is in the future; waiting until it can be proposed",
952 );
953 }
954
955 let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
957 let submit_action = CommunicateAction::SubmitBlock {
958 proposal,
959 blob_ids: value.required_blob_ids().into_iter().collect(),
960 clock_skew_sender,
961 };
962
963 let validity_threshold = committee.validity_threshold();
965 let committee_clone = committee.clone();
966 let clock_skew_check_handle = linera_base::Task::spawn(async move {
967 let mut skew_weight = 0u64;
968 let mut min_skew = TimeDelta::MAX;
969 let mut max_skew = TimeDelta::ZERO;
970 while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
971 if clock_skew.as_micros() > 0 {
972 skew_weight += committee_clone.weight(&public_key);
973 min_skew = min_skew.min(clock_skew);
974 max_skew = max_skew.max(clock_skew);
975 if skew_weight >= validity_threshold {
976 warn!(
977 skew_weight,
978 validity_threshold,
979 min_skew_ms = min_skew.as_micros() / 1000,
980 max_skew_ms = max_skew.as_micros() / 1000,
981 "A validity threshold of validators reported clock skew; \
982 consider checking your system clock",
983 );
984 return;
985 }
986 }
987 }
988 });
989
990 let certificate = self
991 .communicate_chain_action(committee, submit_action, value)
992 .await?;
993
994 clock_skew_check_handle.await;
995
996 self.process_certificate(Box::new(certificate.clone()))
997 .await?;
998 Ok(certificate)
999 }
1000
1001 #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
1003 async fn communicate_chain_updates(
1004 self: &Arc<Self>,
1005 committee: &Committee,
1006 chain_id: ChainId,
1007 height: BlockHeight,
1008 delivery: CrossChainMessageDelivery,
1009 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
1010 ) -> Result<(), ChainClientError> {
1011 let nodes = self.make_nodes(committee)?;
1012 communicate_with_quorum(
1013 &nodes,
1014 committee,
1015 |_: &()| (),
1016 |remote_node| {
1017 let mut updater = ValidatorUpdater {
1018 remote_node,
1019 client: self.clone(),
1020 admin_chain_id: self.admin_chain_id,
1021 };
1022 let certificate = latest_certificate.clone();
1023 Box::pin(async move {
1024 updater
1025 .send_chain_information(chain_id, height, delivery, certificate)
1026 .await
1027 })
1028 },
1029 self.options.quorum_grace_period,
1030 )
1031 .await?;
1032 Ok(())
1033 }
1034
1035 #[instrument(level = "trace", skip_all)]
1041 async fn communicate_chain_action<T: CertificateValue>(
1042 self: &Arc<Self>,
1043 committee: &Committee,
1044 action: CommunicateAction,
1045 value: T,
1046 ) -> Result<GenericCertificate<T>, ChainClientError> {
1047 let nodes = self.make_nodes(committee)?;
1048 let ((votes_hash, votes_round), votes) = communicate_with_quorum(
1049 &nodes,
1050 committee,
1051 |vote: &LiteVote| (vote.value.value_hash, vote.round),
1052 |remote_node| {
1053 let mut updater = ValidatorUpdater {
1054 remote_node,
1055 client: self.clone(),
1056 admin_chain_id: self.admin_chain_id,
1057 };
1058 let action = action.clone();
1059 Box::pin(async move { updater.send_chain_update(action).await })
1060 },
1061 self.options.quorum_grace_period,
1062 )
1063 .await?;
1064 ensure!(
1065 (votes_hash, votes_round) == (value.hash(), action.round()),
1066 ChainClientError::UnexpectedQuorum {
1067 hash: votes_hash,
1068 round: votes_round,
1069 expected_hash: value.hash(),
1070 expected_round: action.round(),
1071 }
1072 );
1073 let certificate = LiteCertificate::try_from_votes(votes)
1078 .ok_or_else(|| {
1079 ChainClientError::InternalError("Vote values or rounds don't match; this is a bug")
1080 })?
1081 .with_value(value)
1082 .ok_or_else(|| {
1083 ChainClientError::ProtocolError("A quorum voted for an unexpected value")
1084 })?;
1085 Ok(certificate)
1086 }
1087
1088 #[instrument(level = "trace", skip_all)]
1091 async fn receive_certificate_with_checked_signatures(
1092 &self,
1093 certificate: ConfirmedBlockCertificate,
1094 ) -> Result<(), ChainClientError> {
1095 let certificate = Box::new(certificate);
1096 let block = certificate.block();
1097 self.download_certificates(block.header.chain_id, block.header.height)
1099 .await?;
1100 if let Err(err) = self.process_certificate(certificate.clone()).await {
1103 match &err {
1104 LocalNodeError::BlobsNotFound(blob_ids) => {
1105 self.download_blobs(&self.validator_nodes().await?, blob_ids)
1106 .await
1107 .map_err(|_| err)?;
1108 self.process_certificate(certificate).await?;
1109 }
1110 _ => {
1111 warn!("Failed to process network hashed certificate value");
1113 return Err(err.into());
1114 }
1115 }
1116 }
1117
1118 Ok(())
1119 }
1120
1121 #[instrument(level = "trace", skip_all)]
1123 async fn receive_sender_certificate(
1124 &self,
1125 certificate: ConfirmedBlockCertificate,
1126 mode: ReceiveCertificateMode,
1127 nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
1128 ) -> Result<(), ChainClientError> {
1129 let (max_epoch, committees) = self.admin_committees().await?;
1131 if let ReceiveCertificateMode::NeedsCheck = mode {
1132 Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
1133 }
1134 let nodes = if let Some(nodes) = nodes {
1136 nodes
1137 } else {
1138 self.validator_nodes().await?
1139 };
1140 if let Err(err) = self.handle_certificate(certificate.clone()).await {
1141 match &err {
1142 LocalNodeError::BlobsNotFound(blob_ids) => {
1143 self.download_blobs(&nodes, blob_ids).await?;
1144 self.handle_certificate(certificate.clone()).await?;
1145 }
1146 _ => {
1147 warn!("Failed to process network hashed certificate value");
1149 return Err(err.into());
1150 }
1151 }
1152 }
1153
1154 Ok(())
1155 }
1156
1157 #[instrument(level = "trace", skip_all)]
1159 async fn download_and_process_sender_chain(
1160 &self,
1161 sender_chain_id: ChainId,
1162 nodes: &[RemoteNode<Env::ValidatorNode>],
1163 received_log: &ReceivedLogs,
1164 mut remote_heights: Vec<BlockHeight>,
1165 sender: mpsc::UnboundedSender<ChainAndHeight>,
1166 ) {
1167 let (max_epoch, committees) = match self.admin_committees().await {
1168 Ok(result) => result,
1169 Err(error) => {
1170 error!(%error, %sender_chain_id, "could not read admin committees");
1171 return;
1172 }
1173 };
1174 let committees_ref = &committees;
1175 let mut nodes = nodes.to_vec();
1176 while !remote_heights.is_empty() {
1177 let remote_heights_ref = &remote_heights;
1178 nodes.shuffle(&mut rand::thread_rng());
1179 let certificates = match communicate_concurrently(
1180 &nodes,
1181 async move |remote_node| {
1182 let mut remote_heights = remote_heights_ref.clone();
1183 remote_heights.retain(|height| {
1186 received_log.validator_has_block(
1187 &remote_node.public_key,
1188 sender_chain_id,
1189 *height,
1190 )
1191 });
1192 if remote_heights.is_empty() {
1193 return Err(());
1196 }
1197 let certificates = self
1198 .requests_scheduler
1199 .download_certificates_by_heights(
1200 &remote_node,
1201 sender_chain_id,
1202 remote_heights,
1203 )
1204 .await
1205 .map_err(|_| ())?;
1206 let mut certificates_with_check_results = vec![];
1207 for cert in certificates {
1208 if let Ok(check_result) =
1209 Self::check_certificate(max_epoch, committees_ref, &cert)
1210 {
1211 certificates_with_check_results
1212 .push((cert, check_result.into_result().is_ok()));
1213 } else {
1214 return Err(());
1216 }
1217 }
1218 Ok(certificates_with_check_results)
1219 },
1220 |errors| {
1221 errors
1222 .into_iter()
1223 .map(|(validator, _error)| validator)
1224 .collect::<BTreeSet<_>>()
1225 },
1226 self.options.certificate_batch_download_timeout,
1227 )
1228 .await
1229 {
1230 Ok(certificates_with_check_results) => certificates_with_check_results,
1231 Err(faulty_validators) => {
1232 nodes.retain(|node| !faulty_validators.contains(&node.public_key));
1234 if nodes.is_empty() {
1235 info!(
1236 chain_id = %sender_chain_id,
1237 "could not download certificates for chain - no more correct validators left"
1238 );
1239 return;
1240 }
1241 continue;
1242 }
1243 };
1244
1245 trace!(
1246 chain_id = %sender_chain_id,
1247 num_certificates = %certificates.len(),
1248 "received certificates",
1249 );
1250
1251 let mut to_remove_from_queue = BTreeSet::new();
1252
1253 for (certificate, check_result) in certificates {
1254 let hash = certificate.hash();
1255 let chain_id = certificate.block().header.chain_id;
1256 let height = certificate.block().header.height;
1257 if !check_result {
1258 to_remove_from_queue.insert(height);
1262 continue;
1263 }
1264 let mode = ReceiveCertificateMode::AlreadyChecked;
1266 if let Err(error) = self
1267 .receive_sender_certificate(certificate, mode, None)
1268 .await
1269 {
1270 warn!(%error, %hash, "Received invalid certificate");
1271 } else {
1272 to_remove_from_queue.insert(height);
1273 if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
1274 error!(
1275 %chain_id,
1276 %height,
1277 %error,
1278 "failed to send chain and height over the channel",
1279 );
1280 }
1281 }
1282 }
1283
1284 remote_heights.retain(|height| !to_remove_from_queue.contains(height));
1285 }
1286 trace!(
1287 chain_id = %sender_chain_id,
1288 "find_received_certificates: finished processing chain",
1289 );
1290 }
1291
1292 #[instrument(level = "trace", skip(self))]
1294 async fn get_received_log_from_validator(
1295 &self,
1296 chain_id: ChainId,
1297 remote_node: &RemoteNode<Env::ValidatorNode>,
1298 tracker: u64,
1299 ) -> Result<Vec<ChainAndHeight>, ChainClientError> {
1300 let mut offset = tracker;
1301
1302 let mut remote_log = Vec::new();
1304 loop {
1305 trace!("get_received_log_from_validator: looping");
1306 let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1307 let info = remote_node.handle_chain_info_query(query).await?;
1308 let received_entries = info.requested_received_log.len();
1309 offset += received_entries as u64;
1310 remote_log.extend(info.requested_received_log);
1311 trace!(
1312 remote_node = remote_node.address(),
1313 %received_entries,
1314 "get_received_log_from_validator: received log batch",
1315 );
1316 if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1317 break;
1318 }
1319 }
1320
1321 trace!(
1322 remote_node = remote_node.address(),
1323 num_entries = remote_log.len(),
1324 "get_received_log_from_validator: returning downloaded log",
1325 );
1326
1327 Ok(remote_log)
1328 }
1329
1330 async fn download_sender_block_with_sending_ancestors(
1336 &self,
1337 receiver_chain_id: ChainId,
1338 sender_chain_id: ChainId,
1339 height: BlockHeight,
1340 remote_node: &RemoteNode<Env::ValidatorNode>,
1341 ) -> Result<(), ChainClientError> {
1342 let next_outbox_height = self
1343 .local_node
1344 .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1345 .await?
1346 .get(&sender_chain_id)
1347 .copied()
1348 .unwrap_or(BlockHeight::ZERO);
1349 let (max_epoch, committees) = self.admin_committees().await?;
1350
1351 let mut certificates = BTreeMap::new();
1354 let mut current_height = height;
1355
1356 while current_height >= next_outbox_height {
1358 let downloaded = self
1360 .requests_scheduler
1361 .download_certificates_by_heights(
1362 remote_node,
1363 sender_chain_id,
1364 vec![current_height],
1365 )
1366 .await?;
1367 let Some(certificate) = downloaded.into_iter().next() else {
1368 return Err(ChainClientError::CannotDownloadMissingSenderBlock {
1369 chain_id: sender_chain_id,
1370 height: current_height,
1371 });
1372 };
1373
1374 Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1376 .into_result()?;
1377
1378 let block = certificate.block();
1380 let next_height = block
1381 .body
1382 .previous_message_blocks
1383 .get(&receiver_chain_id)
1384 .map(|(_prev_hash, prev_height)| *prev_height);
1385
1386 certificates.insert(current_height, certificate);
1388
1389 if let Some(prev_height) = next_height {
1390 current_height = prev_height;
1392 } else {
1393 break;
1395 }
1396 }
1397
1398 if certificates.is_empty() {
1399 self.local_node
1400 .retry_pending_cross_chain_requests(sender_chain_id)
1401 .await?;
1402 }
1403
1404 for certificate in certificates.into_values() {
1406 self.receive_sender_certificate(
1407 certificate,
1408 ReceiveCertificateMode::AlreadyChecked,
1409 Some(vec![remote_node.clone()]),
1410 )
1411 .await?;
1412 }
1413
1414 Ok(())
1415 }
1416
1417 async fn download_event_bearing_blocks(
1421 &self,
1422 sender_chain_id: ChainId,
1423 height: BlockHeight,
1424 hash: CryptoHash,
1425 local_next_block_height: BlockHeight,
1426 subscribed_streams: &BTreeSet<StreamId>,
1427 remote_node: &RemoteNode<Env::ValidatorNode>,
1428 ) -> Result<(), ChainClientError> {
1429 let (max_epoch, committees) = self.admin_committees().await?;
1430
1431 let mut certificates = BTreeMap::new();
1432 let mut blocks_to_fetch = BTreeSet::<_>::from([(height, hash)]);
1433 let next_expected_events = subscribed_streams
1434 .iter()
1435 .zip(
1436 self.local_node
1437 .chain_state_view(sender_chain_id)
1438 .await?
1439 .next_expected_events
1440 .multi_get(subscribed_streams)
1441 .await?
1442 .into_iter()
1443 .map(|maybe_index| maybe_index.unwrap_or_default()),
1444 )
1445 .collect::<BTreeMap<_, _>>();
1446
1447 while let Some((current_height, current_hash)) = blocks_to_fetch.pop_last() {
1448 if current_height < local_next_block_height {
1449 continue; }
1451 if certificates.contains_key(¤t_height) {
1452 continue;
1453 }
1454
1455 let certificate = if let Some(certificate) =
1456 self.storage_client().read_certificate(current_hash).await?
1457 {
1458 certificate
1459 } else {
1460 let downloaded = self
1461 .requests_scheduler
1462 .download_certificates(remote_node, sender_chain_id, current_height, 1)
1463 .await?;
1464 let Some(certificate) = downloaded.into_iter().next() else {
1465 continue;
1466 };
1467
1468 Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1469 .into_result()?;
1470
1471 certificate
1472 };
1473 let block = certificate.block();
1474 for stream_id in subscribed_streams {
1476 if let Some((prev_hash, prev_height)) =
1477 block.body.previous_event_blocks.get(stream_id)
1478 {
1479 if next_expected_events.get(stream_id).is_some_and(|index| {
1480 block
1481 .body
1482 .events
1483 .iter()
1484 .flatten()
1485 .find(|event| event.stream_id == *stream_id)
1486 .is_some_and(|event| event.index == *index)
1487 }) {
1488 continue;
1489 }
1490 if !certificates.contains_key(prev_height) {
1491 blocks_to_fetch.insert((*prev_height, *prev_hash));
1492 }
1493 }
1494 }
1495
1496 certificates.insert(current_height, certificate);
1497 }
1498
1499 for certificate in certificates.into_values() {
1501 self.receive_sender_certificate(
1502 certificate,
1503 ReceiveCertificateMode::AlreadyChecked,
1504 Some(vec![remote_node.clone()]),
1505 )
1506 .await?;
1507 }
1508
1509 Ok(())
1510 }
1511
1512 #[instrument(
1513 level = "trace", skip_all,
1514 fields(certificate_hash = ?incoming_certificate.hash()),
1515 )]
1516 fn check_certificate(
1517 highest_known_epoch: Epoch,
1518 committees: &BTreeMap<Epoch, Committee>,
1519 incoming_certificate: &ConfirmedBlockCertificate,
1520 ) -> Result<CheckCertificateResult, NodeError> {
1521 let block = incoming_certificate.block();
1522 if block.header.epoch > highest_known_epoch {
1524 return Ok(CheckCertificateResult::FutureEpoch);
1525 }
1526 if let Some(known_committee) = committees.get(&block.header.epoch) {
1527 incoming_certificate.check(known_committee)?;
1530 Ok(CheckCertificateResult::New)
1531 } else {
1532 Ok(CheckCertificateResult::OldEpoch)
1534 }
1535 }
1536
1537 #[instrument(level = "trace", skip_all)]
1541 pub(crate) async fn synchronize_chain_state(
1542 &self,
1543 chain_id: ChainId,
1544 ) -> Result<Box<ChainInfo>, ChainClientError> {
1545 let (_, committee) = self.admin_committee().await?;
1546 Box::pin(self.synchronize_chain_state_from_committee(chain_id, committee)).await
1547 }
1548
1549 #[instrument(level = "trace", skip_all)]
1554 pub async fn synchronize_chain_state_from_committee(
1555 &self,
1556 chain_id: ChainId,
1557 committee: Committee,
1558 ) -> Result<Box<ChainInfo>, ChainClientError> {
1559 #[cfg(with_metrics)]
1560 let _latency = if !self.is_chain_follow_only(chain_id).await {
1561 Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1562 } else {
1563 None
1564 };
1565
1566 let validators = self.make_nodes(&committee)?;
1567 Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1568 communicate_with_quorum(
1569 &validators,
1570 &committee,
1571 |_: &()| (),
1572 |remote_node| async move {
1573 self.synchronize_chain_state_from(&remote_node, chain_id)
1574 .await
1575 },
1576 self.options.quorum_grace_period,
1577 )
1578 .await?;
1579
1580 self.local_node
1581 .chain_info(chain_id)
1582 .await
1583 .map_err(Into::into)
1584 }
1585
1586 #[instrument(level = "trace", skip(self, remote_node, chain_id))]
1592 pub(crate) async fn synchronize_chain_state_from(
1593 &self,
1594 remote_node: &RemoteNode<Env::ValidatorNode>,
1595 chain_id: ChainId,
1596 ) -> Result<(), ChainClientError> {
1597 let with_manager_values = !self.is_chain_follow_only(chain_id).await;
1598 let query = if with_manager_values {
1599 ChainInfoQuery::new(chain_id).with_manager_values()
1600 } else {
1601 ChainInfoQuery::new(chain_id)
1602 };
1603 let remote_info = remote_node.handle_chain_info_query(query).await?;
1604 let local_info = self
1605 .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
1606 .await?;
1607
1608 if !with_manager_values {
1609 return Ok(());
1610 }
1611
1612 let local_height = match local_info {
1614 Some(info) => info.next_block_height,
1615 None => {
1616 self.local_node
1617 .chain_info(chain_id)
1618 .await?
1619 .next_block_height
1620 }
1621 };
1622 if local_height != remote_info.next_block_height {
1623 debug!(
1624 remote_node = remote_node.address(),
1625 remote_height = %remote_info.next_block_height,
1626 local_height = %local_height,
1627 "synced from validator, but remote height and local height are different",
1628 );
1629 return Ok(());
1630 };
1631
1632 if let Some(timeout) = remote_info.manager.timeout {
1633 self.handle_certificate(*timeout).await?;
1634 }
1635 let mut proposals = Vec::new();
1636 if let Some(proposal) = remote_info.manager.requested_signed_proposal {
1637 proposals.push(*proposal);
1638 }
1639 if let Some(proposal) = remote_info.manager.requested_proposed {
1640 proposals.push(*proposal);
1641 }
1642 if let Some(locking) = remote_info.manager.requested_locking {
1643 match *locking {
1644 LockingBlock::Fast(proposal) => {
1645 proposals.push(proposal);
1646 }
1647 LockingBlock::Regular(cert) => {
1648 let hash = cert.hash();
1649 if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
1650 {
1651 debug!(
1652 remote_node = remote_node.address(),
1653 %hash,
1654 height = %local_height,
1655 %error,
1656 "skipping locked block from validator",
1657 );
1658 }
1659 }
1660 }
1661 }
1662 'proposal_loop: for proposal in proposals {
1663 let owner: AccountOwner = proposal.owner();
1664 if let Err(mut err) =
1665 Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
1666 {
1667 if let LocalNodeError::BlobsNotFound(_) = &err {
1668 let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
1669 if !required_blob_ids.is_empty() {
1670 let mut blobs = Vec::new();
1671 for blob_id in required_blob_ids {
1672 let blob_content = match self
1673 .requests_scheduler
1674 .download_pending_blob(remote_node, chain_id, blob_id)
1675 .await
1676 {
1677 Ok(content) => content,
1678 Err(error) => {
1679 info!(
1680 remote_node = remote_node.address(),
1681 height = %local_height,
1682 proposer = %owner,
1683 %blob_id,
1684 %error,
1685 "skipping proposal from validator; failed to download blob",
1686 );
1687 continue 'proposal_loop;
1688 }
1689 };
1690 blobs.push(Blob::new(blob_content));
1691 }
1692 self.local_node
1693 .handle_pending_blobs(chain_id, blobs)
1694 .await?;
1695 if let Err(new_err) =
1697 Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
1698 {
1699 err = new_err;
1700 } else {
1701 continue;
1702 }
1703 }
1704 if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
1705 self.update_local_node_with_blobs_from(
1706 blob_ids.clone(),
1707 &[remote_node.clone()],
1708 )
1709 .await?;
1710 if let Err(new_err) =
1712 Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
1713 {
1714 err = new_err;
1715 } else {
1716 continue;
1717 }
1718 }
1719 }
1720 while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
1721 if let ChainError::MissingCrossChainUpdate {
1722 chain_id,
1723 origin,
1724 height,
1725 } = &**chain_err
1726 {
1727 self.download_sender_block_with_sending_ancestors(
1728 *chain_id,
1729 *origin,
1730 *height,
1731 remote_node,
1732 )
1733 .await?;
1734 if let Err(new_err) =
1736 Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
1737 {
1738 err = new_err;
1739 } else {
1740 continue 'proposal_loop;
1741 }
1742 } else {
1743 break;
1744 }
1745 }
1746
1747 debug!(
1748 remote_node = remote_node.address(),
1749 proposer = %owner,
1750 height = %local_height,
1751 error = %err,
1752 "skipping proposal from validator",
1753 );
1754 }
1755 }
1756 Ok(())
1757 }
1758
1759 async fn try_process_locking_block_from(
1760 &self,
1761 remote_node: &RemoteNode<Env::ValidatorNode>,
1762 certificate: GenericCertificate<ValidatedBlock>,
1763 ) -> Result<(), ChainClientError> {
1764 let chain_id = certificate.inner().chain_id();
1765 let certificate = Box::new(certificate);
1766 match self.process_certificate(certificate.clone()).await {
1767 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1768 let mut blobs = Vec::new();
1769 for blob_id in blob_ids {
1770 let blob_content = self
1771 .requests_scheduler
1772 .download_pending_blob(remote_node, chain_id, blob_id)
1773 .await?;
1774 blobs.push(Blob::new(blob_content));
1775 }
1776 self.local_node
1777 .handle_pending_blobs(chain_id, blobs)
1778 .await?;
1779 self.process_certificate(certificate).await?;
1780 Ok(())
1781 }
1782 Err(err) => Err(err.into()),
1783 Ok(()) => Ok(()),
1784 }
1785 }
1786
1787 async fn update_local_node_with_blobs_from(
1790 &self,
1791 blob_ids: Vec<BlobId>,
1792 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1793 ) -> Result<Vec<Blob>, ChainClientError> {
1794 let timeout = self.options.blob_download_timeout;
1795 let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1797 stream::iter(blob_ids.into_iter().map(|blob_id| {
1798 communicate_concurrently(
1799 remote_nodes,
1800 async move |remote_node| {
1801 let certificate = self
1802 .requests_scheduler
1803 .download_certificate_for_blob(&remote_node, blob_id)
1804 .await?;
1805 self.receive_sender_certificate(
1806 certificate,
1807 ReceiveCertificateMode::NeedsCheck,
1808 Some(vec![remote_node.clone()]),
1809 )
1810 .await?;
1811 let blob = self
1812 .local_node
1813 .storage_client()
1814 .read_blob(blob_id)
1815 .await?
1816 .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1817 Result::<_, ChainClientError>::Ok(blob)
1818 },
1819 move |_| ChainClientError::from(NodeError::BlobsNotFound(vec![blob_id])),
1820 timeout,
1821 )
1822 }))
1823 .buffer_unordered(self.options.max_joined_tasks)
1824 .collect::<Vec<_>>()
1825 .await
1826 .into_iter()
1827 .collect()
1828 }
1829
1830 #[instrument(level = "trace", skip(self, block))]
1840 async fn stage_block_execution_with_policy(
1841 &self,
1842 block: ProposedBlock,
1843 round: Option<u32>,
1844 published_blobs: Vec<Blob>,
1845 policy: BundleExecutionPolicy,
1846 ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1847 let mut downloaded_events = HashSet::<EventId>::new();
1848 loop {
1849 let result = self
1850 .local_node
1851 .stage_block_execution_with_policy(
1852 block.clone(),
1853 round,
1854 published_blobs.clone(),
1855 policy,
1856 )
1857 .await;
1858 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1859 let validators = self.validator_nodes().await?;
1860 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1861 .await?;
1862 continue; }
1864 if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
1865 let new_events: Vec<_> = event_ids
1866 .iter()
1867 .filter(|id| !downloaded_events.contains(id))
1868 .cloned()
1869 .collect();
1870 if !new_events.is_empty() {
1871 self.download_publisher_chains_for_events(&new_events)
1872 .await?;
1873 downloaded_events.extend(new_events);
1874 continue; }
1876 }
1878 if let Ok((_, executed_block, _, _)) = &result {
1879 let hash = CryptoHash::new(executed_block);
1880 let notification = Notification {
1881 chain_id: executed_block.header.chain_id,
1882 reason: Reason::BlockExecuted {
1883 height: executed_block.header.height,
1884 hash,
1885 },
1886 };
1887 self.notifier.notify(&[notification]);
1888 }
1889 let (_modified_block, executed_block, response, _resource_tracker) = result?;
1890 return Ok((executed_block, response));
1891 }
1892 }
1893
1894 #[instrument(level = "trace", skip(self, block))]
1897 async fn stage_block_execution(
1898 &self,
1899 block: ProposedBlock,
1900 round: Option<u32>,
1901 published_blobs: Vec<Blob>,
1902 ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1903 let mut downloaded_events = HashSet::<EventId>::new();
1904 loop {
1905 let result = self
1906 .local_node
1907 .stage_block_execution(block.clone(), round, published_blobs.clone())
1908 .await;
1909 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1910 let validators = self.validator_nodes().await?;
1911 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1912 .await?;
1913 continue; }
1915 if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
1916 let new_events: Vec<_> = event_ids
1917 .iter()
1918 .filter(|id| !downloaded_events.contains(id))
1919 .cloned()
1920 .collect();
1921 if !new_events.is_empty() {
1922 self.download_publisher_chains_for_events(&new_events)
1923 .await?;
1924 downloaded_events.extend(new_events);
1925 continue; }
1927 }
1929 if let Ok((block, _, _)) = &result {
1930 let hash = CryptoHash::new(block);
1931 let notification = Notification {
1932 chain_id: block.header.chain_id,
1933 reason: Reason::BlockExecuted {
1934 height: block.header.height,
1935 hash,
1936 },
1937 };
1938 self.notifier.notify(&[notification]);
1939 }
1940 let (block, response, _resource_tracker) = result?;
1941 return Ok((block, response));
1942 }
1943 }
1944}
1945
1946#[derive(Debug, Clone, Copy)]
1947pub enum TimingType {
1948 ExecuteOperations,
1949 ExecuteBlock,
1950 SubmitBlockProposal,
1951 UpdateValidators,
1952}
1953
1954#[derive(Debug, Clone)]
1955pub struct ChainClientOptions {
1956 pub max_pending_message_bundles: usize,
1958 pub max_block_limit_errors: u32,
1963 pub max_new_events_per_block: usize,
1965 pub staging_bundles_time_budget: Option<Duration>,
1968 pub message_policy: MessagePolicy,
1970 pub cross_chain_message_delivery: CrossChainMessageDelivery,
1972 pub quorum_grace_period: f64,
1975 pub blob_download_timeout: Duration,
1977 pub certificate_batch_download_timeout: Duration,
1979 pub certificate_download_batch_size: u64,
1982 pub sender_certificate_download_batch_size: usize,
1985 pub max_joined_tasks: usize,
1987 pub allow_fast_blocks: bool,
1990}
1991
1992pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
1993pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
1994
1995#[cfg(with_testing)]
1996impl ChainClientOptions {
1997 pub fn test_default() -> Self {
1998 use crate::DEFAULT_QUORUM_GRACE_PERIOD;
1999
2000 ChainClientOptions {
2001 max_pending_message_bundles: 10,
2002 max_block_limit_errors: 3,
2003 max_new_events_per_block: 10,
2004 staging_bundles_time_budget: None,
2005 message_policy: MessagePolicy::new_accept_all(),
2006 cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
2007 quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
2008 blob_download_timeout: Duration::from_secs(1),
2009 certificate_batch_download_timeout: Duration::from_secs(1),
2010 certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
2011 sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
2012 max_joined_tasks: 100,
2013 allow_fast_blocks: false,
2014 }
2015 }
2016}
2017
2018impl ChainClientOptions {
2019 pub fn bundle_execution_policy(&self) -> BundleExecutionPolicy {
2021 BundleExecutionPolicy {
2022 on_failure: BundleFailurePolicy::AutoRetry {
2023 max_failures: self.max_block_limit_errors,
2024 },
2025 time_budget: self.staging_bundles_time_budget,
2026 }
2027 }
2028}
2029
2030#[derive(Debug)]
2036pub struct ChainClient<Env: Environment> {
2037 #[debug(skip)]
2039 client: Arc<Client<Env>>,
2040 chain_id: ChainId,
2042 #[debug(skip)]
2044 options: ChainClientOptions,
2045 preferred_owner: Option<AccountOwner>,
2048 initial_next_block_height: BlockHeight,
2050 initial_block_hash: Option<CryptoHash>,
2052 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
2054}
2055
2056impl<Env: Environment> Clone for ChainClient<Env> {
2057 fn clone(&self) -> Self {
2058 Self {
2059 client: self.client.clone(),
2060 chain_id: self.chain_id,
2061 options: self.options.clone(),
2062 preferred_owner: self.preferred_owner,
2063 initial_next_block_height: self.initial_next_block_height,
2064 initial_block_hash: self.initial_block_hash,
2065 timing_sender: self.timing_sender.clone(),
2066 }
2067 }
2068}
2069
2070#[derive(Debug, Error)]
2072pub enum ChainClientError {
2073 #[error("Local node operation failed: {0}")]
2074 LocalNodeError(#[from] LocalNodeError),
2075
2076 #[error("Remote node operation failed: {0}")]
2077 RemoteNodeError(#[from] NodeError),
2078
2079 #[error(transparent)]
2080 ArithmeticError(#[from] ArithmeticError),
2081
2082 #[error("Missing certificates: {0:?}")]
2083 ReadCertificatesError(Vec<CryptoHash>),
2084
2085 #[error("Missing confirmed block: {0:?}")]
2086 MissingConfirmedBlock(CryptoHash),
2087
2088 #[error("JSON (de)serialization error: {0}")]
2089 JsonError(#[from] serde_json::Error),
2090
2091 #[error("Chain operation failed: {0}")]
2092 ChainError(#[from] ChainError),
2093
2094 #[error(transparent)]
2095 CommunicationError(#[from] CommunicationError<NodeError>),
2096
2097 #[error("Internal error within chain client: {0}")]
2098 InternalError(&'static str),
2099
2100 #[error(
2101 "Cannot accept a certificate from an unknown committee in the future. \
2102 Please synchronize the local view of the admin chain"
2103 )]
2104 CommitteeSynchronizationError,
2105
2106 #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
2107 WalletSynchronizationError,
2108
2109 #[error("The state of the client is incompatible with the proposed block: {0}")]
2110 BlockProposalError(&'static str),
2111
2112 #[error(
2113 "Cannot accept a certificate from a committee that was retired. \
2114 Try a newer certificate from the same origin"
2115 )]
2116 CommitteeDeprecationError,
2117
2118 #[error("Protocol error within chain client: {0}")]
2119 ProtocolError(&'static str),
2120
2121 #[error("Signer doesn't have key to sign for chain {0}")]
2122 CannotFindKeyForChain(ChainId),
2123
2124 #[error("client is not configured to propose on chain {0}")]
2125 NoAccountKeyConfigured(ChainId),
2126
2127 #[error("The chain client isn't owner on chain {0}")]
2128 NotAnOwner(ChainId),
2129
2130 #[error(transparent)]
2131 ViewError(#[from] ViewError),
2132
2133 #[error(
2134 "Failed to download certificates and update local node to the next height \
2135 {target_next_block_height} of chain {chain_id}"
2136 )]
2137 CannotDownloadCertificates {
2138 chain_id: ChainId,
2139 target_next_block_height: BlockHeight,
2140 },
2141
2142 #[error(transparent)]
2143 BcsError(#[from] bcs::Error),
2144
2145 #[error(
2146 "Unexpected quorum: validators voted for block hash {hash} in {round}, \
2147 expected block hash {expected_hash} in {expected_round}"
2148 )]
2149 UnexpectedQuorum {
2150 hash: CryptoHash,
2151 round: Round,
2152 expected_hash: CryptoHash,
2153 expected_round: Round,
2154 },
2155
2156 #[error("signer error: {0:?}")]
2157 Signer(#[source] Box<dyn signer::Error>),
2158
2159 #[error("Cannot revoke the current epoch {0}")]
2160 CannotRevokeCurrentEpoch(Epoch),
2161
2162 #[error("Epoch is already revoked")]
2163 EpochAlreadyRevoked,
2164
2165 #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
2166 CannotDownloadMissingSenderBlock {
2167 chain_id: ChainId,
2168 height: BlockHeight,
2169 },
2170
2171 #[error(
2172 "A different block was already committed at this height. \
2173 The committed certificate hash is {0}"
2174 )]
2175 Conflict(CryptoHash),
2176}
2177
2178impl From<Infallible> for ChainClientError {
2179 fn from(infallible: Infallible) -> Self {
2180 match infallible {}
2181 }
2182}
2183
2184impl ChainClientError {
2185 pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
2186 Self::Signer(Box::new(err))
2187 }
2188}
2189
2190impl<Env: Environment> ChainClient<Env> {
2191 #[instrument(level = "trace", skip(self))]
2193 fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
2194 self.client
2195 .chains
2196 .pin()
2197 .get(&self.chain_id)
2198 .expect("Chain client constructed for invalid chain")
2199 .client_mutex()
2200 }
2201
2202 #[instrument(level = "trace", skip(self))]
2204 pub fn pending_proposal(&self) -> Option<PendingProposal> {
2205 self.client
2206 .chains
2207 .pin()
2208 .get(&self.chain_id)
2209 .expect("Chain client constructed for invalid chain")
2210 .pending_proposal()
2211 .clone()
2212 }
2213
2214 #[instrument(level = "trace", skip(self, f))]
2216 fn update_state<F>(&self, f: F)
2217 where
2218 F: Fn(&mut ChainClientState),
2219 {
2220 let chains = self.client.chains.pin();
2221 chains
2222 .update(self.chain_id, |state| {
2223 let mut state = state.clone_for_update_unchecked();
2224 f(&mut state);
2225 state
2226 })
2227 .expect("Chain client constructed for invalid chain");
2228 }
2229
2230 #[instrument(level = "trace", skip(self))]
2232 pub fn signer(&self) -> &impl Signer {
2233 self.client.signer()
2234 }
2235
2236 #[instrument(level = "trace", skip(self))]
2238 pub fn options_mut(&mut self) -> &mut ChainClientOptions {
2239 &mut self.options
2240 }
2241
2242 #[instrument(level = "trace", skip(self))]
2244 pub fn options(&self) -> &ChainClientOptions {
2245 &self.options
2246 }
2247
2248 #[instrument(level = "trace", skip(self))]
2250 pub fn chain_id(&self) -> ChainId {
2251 self.chain_id
2252 }
2253
2254 pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
2256 self.timing_sender.clone()
2257 }
2258
2259 #[instrument(level = "trace", skip(self))]
2261 pub fn admin_chain_id(&self) -> ChainId {
2262 self.client.admin_chain_id
2263 }
2264
2265 #[instrument(level = "trace", skip(self))]
2267 pub fn preferred_owner(&self) -> Option<AccountOwner> {
2268 self.preferred_owner
2269 }
2270
2271 #[instrument(level = "trace", skip(self))]
2273 pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
2274 self.preferred_owner = Some(preferred_owner);
2275 }
2276
2277 #[instrument(level = "trace", skip(self))]
2279 pub fn unset_preferred_owner(&mut self) {
2280 self.preferred_owner = None;
2281 }
2282
2283 #[instrument(level = "trace")]
2285 pub async fn chain_state_view(
2286 &self,
2287 ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
2288 self.client.local_node.chain_state_view(self.chain_id).await
2289 }
2290
2291 #[instrument(level = "trace", skip(self))]
2294 pub async fn event_stream_publishers(
2295 &self,
2296 ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
2297 let subscriptions = self
2298 .client
2299 .local_node
2300 .get_event_subscriptions(self.chain_id)
2301 .await?;
2302 let mut publishers = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
2303 for ((chain_id, stream_name), _) in subscriptions {
2304 publishers.entry(chain_id).or_default().insert(stream_name);
2305 }
2306 if self.chain_id != self.client.admin_chain_id {
2307 publishers.entry(self.client.admin_chain_id).or_default();
2309 }
2310 Ok(publishers)
2311 }
2312
2313 #[instrument(level = "trace")]
2315 pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
2316 self.subscribe_to(self.chain_id)
2317 }
2318
2319 #[instrument(level = "trace")]
2321 pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
2322 Ok(Box::pin(UnboundedReceiverStream::new(
2323 self.client.notifier.subscribe(vec![chain_id]),
2324 )))
2325 }
2326
2327 #[instrument(level = "trace")]
2329 pub fn storage_client(&self) -> &Env::Storage {
2330 self.client.storage_client()
2331 }
2332
2333 #[instrument(level = "trace")]
2335 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
2336 let query = ChainInfoQuery::new(self.chain_id);
2337 let response = self
2338 .client
2339 .local_node
2340 .handle_chain_info_query(query)
2341 .await?;
2342 self.client.update_from_info(&response.info);
2343 Ok(response.info)
2344 }
2345
2346 #[instrument(level = "trace")]
2348 pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
2349 let query = ChainInfoQuery::new(self.chain_id)
2350 .with_manager_values()
2351 .with_committees();
2352 let response = self
2353 .client
2354 .local_node
2355 .handle_chain_info_query(query)
2356 .await?;
2357 self.client.update_from_info(&response.info);
2358 Ok(response.info)
2359 }
2360
2361 pub async fn get_chain_description(&self) -> Result<ChainDescription, ChainClientError> {
2363 self.client.get_chain_description(self.chain_id).await
2364 }
2365
2366 pub async fn prepare_for_owner(
2372 &self,
2373 owner: AccountOwner,
2374 ) -> Result<Box<ChainInfo>, ChainClientError> {
2375 ensure!(
2376 self.client.has_key_for(&owner).await?,
2377 ChainClientError::CannotFindKeyForChain(self.chain_id)
2378 );
2379 self.client
2381 .get_chain_description_blob(self.chain_id)
2382 .await?;
2383
2384 let info = self.chain_info().await?;
2386
2387 ensure!(
2389 info.manager
2390 .ownership
2391 .can_propose_in_multi_leader_round(&owner),
2392 ChainClientError::NotAnOwner(self.chain_id)
2393 );
2394
2395 Ok(info)
2396 }
2397
2398 #[instrument(level = "trace")]
2401 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
2402 if self.options.message_policy.is_ignore() {
2403 return Ok(Vec::new());
2405 }
2406
2407 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
2408 let info = self
2409 .client
2410 .local_node
2411 .handle_chain_info_query(query)
2412 .await?
2413 .info;
2414 if self.preferred_owner.is_some_and(|owner| {
2415 info.manager
2416 .ownership
2417 .is_super_owner_no_regular_owners(&owner)
2418 }) {
2419 ensure!(
2421 info.next_block_height >= self.initial_next_block_height,
2422 ChainClientError::WalletSynchronizationError
2423 );
2424 }
2425
2426 Ok(info
2427 .requested_pending_message_bundles
2428 .into_iter()
2429 .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
2430 .take(self.options.max_pending_message_bundles)
2431 .collect())
2432 }
2433
2434 #[instrument(level = "trace")]
2438 async fn collect_stream_updates(&self) -> Result<Option<Operation>, ChainClientError> {
2439 let subscription_map = self
2441 .client
2442 .local_node
2443 .get_event_subscriptions(self.chain_id)
2444 .await?;
2445 let futures = subscription_map
2447 .into_iter()
2448 .filter(|((chain_id, _), _)| {
2449 self.options
2450 .message_policy
2451 .restrict_chain_ids_to
2452 .as_ref()
2453 .is_none_or(|chain_set| chain_set.contains(chain_id))
2454 })
2455 .map(|((chain_id, stream_id), subscriptions)| {
2456 let client = self.client.clone();
2457 let previous_index = subscriptions.next_index;
2458 async move {
2459 let next_index = client
2460 .local_node
2461 .get_stream_event_count(chain_id, stream_id.clone())
2462 .await?;
2463 if let Some(next_index) =
2464 next_index.filter(|next_index| *next_index > previous_index)
2465 {
2466 Ok(Some((chain_id, stream_id, previous_index, next_index)))
2467 } else {
2468 Ok::<_, ChainClientError>(None)
2469 }
2470 }
2471 });
2472 let all_updates = futures::stream::iter(futures)
2473 .buffer_unordered(self.options.max_joined_tasks)
2474 .try_collect::<Vec<_>>()
2475 .await?
2476 .into_iter()
2477 .flatten()
2478 .collect::<Vec<_>>();
2479 let max_events = self.options.max_new_events_per_block;
2481 let mut total_events: usize = 0;
2482 let mut updates = Vec::new();
2483 for (chain_id, stream_id, previous_index, next_index) in all_updates {
2484 let new_events = (next_index - previous_index) as usize;
2485 if total_events + new_events <= max_events {
2486 total_events += new_events;
2487 updates.push((chain_id, stream_id, next_index));
2488 } else {
2489 let remaining = max_events.saturating_sub(total_events);
2490 if remaining > 0 {
2491 updates.push((chain_id, stream_id, previous_index + remaining as u32));
2492 }
2493 break;
2494 }
2495 }
2496 if updates.is_empty() {
2497 return Ok(None);
2498 }
2499 Ok(Some(SystemOperation::UpdateStreams(updates).into()))
2500 }
2501
2502 #[instrument(level = "trace")]
2503 async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
2504 self.client.chain_info_with_committees(self.chain_id).await
2505 }
2506
2507 #[instrument(level = "trace")]
2509 async fn epoch_and_committees(
2510 &self,
2511 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
2512 let info = self.chain_info_with_committees().await?;
2513 let epoch = info.epoch;
2514 let committees = info.into_committees()?;
2515 Ok((epoch, committees))
2516 }
2517
2518 #[instrument(level = "trace")]
2520 pub async fn local_committee(&self) -> Result<Committee, ChainClientError> {
2521 let info = match self.chain_info_with_committees().await {
2522 Ok(info) => info,
2523 Err(LocalNodeError::BlobsNotFound(_)) => {
2524 self.synchronize_chain_state(self.chain_id).await?;
2525 self.chain_info_with_committees().await?
2526 }
2527 Err(err) => return Err(err.into()),
2528 };
2529 Ok(info.into_current_committee()?)
2530 }
2531
2532 #[instrument(level = "trace")]
2534 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
2535 self.client.admin_committee().await
2536 }
2537
2538 #[instrument(level = "trace")]
2542 pub async fn identity(&self) -> Result<AccountOwner, ChainClientError> {
2543 let Some(preferred_owner) = self.preferred_owner else {
2544 return Err(ChainClientError::NoAccountKeyConfigured(self.chain_id));
2545 };
2546 let manager = self.chain_info().await?.manager;
2547 ensure!(
2548 manager.ownership.is_active(),
2549 LocalNodeError::InactiveChain(self.chain_id)
2550 );
2551
2552 let is_owner = manager
2555 .ownership
2556 .can_propose_in_multi_leader_round(&preferred_owner);
2557
2558 if !is_owner {
2559 let accepted_owners = manager
2560 .ownership
2561 .all_owners()
2562 .chain(&manager.leader)
2563 .collect::<Vec<_>>();
2564 warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
2565 "The preferred owner is not configured as an owner of this chain",
2566 );
2567 return Err(ChainClientError::NotAnOwner(self.chain_id));
2568 }
2569
2570 let has_signer = self
2571 .signer()
2572 .contains_key(&preferred_owner)
2573 .await
2574 .map_err(ChainClientError::signer_failure)?;
2575
2576 if !has_signer {
2577 warn!(%self.chain_id, ?preferred_owner,
2578 "Chain is one of the owners but its Signer instance doesn't contain the key",
2579 );
2580 return Err(ChainClientError::CannotFindKeyForChain(self.chain_id));
2581 }
2582
2583 Ok(preferred_owner)
2584 }
2585
2586 #[instrument(level = "trace")]
2589 pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2590 #[cfg(with_metrics)]
2591 let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();
2592
2593 let mut info = self.synchronize_to_known_height().await?;
2594
2595 if self.preferred_owner.is_none_or(|owner| {
2596 !info
2597 .manager
2598 .ownership
2599 .is_super_owner_no_regular_owners(&owner)
2600 }) {
2601 info = self.client.synchronize_chain_state(self.chain_id).await?;
2605 }
2606
2607 if info.epoch > self.client.admin_committees().await?.0 {
2608 self.client
2609 .synchronize_chain_state(self.client.admin_chain_id)
2610 .await?;
2611 }
2612
2613 self.client.update_from_info(&info);
2614 Ok(info)
2615 }
2616
2617 async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2622 let info = self
2623 .client
2624 .download_certificates(self.chain_id, self.initial_next_block_height)
2625 .await?;
2626 if info.next_block_height == self.initial_next_block_height {
2627 ensure!(
2629 self.initial_block_hash == info.block_hash,
2630 ChainClientError::InternalError("Invalid chain of blocks in local node")
2631 );
2632 }
2633 Ok(info)
2634 }
2635
2636 #[instrument(level = "trace", skip(old_committee, latest_certificate))]
2638 pub async fn update_validators(
2639 &self,
2640 old_committee: Option<&Committee>,
2641 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
2642 ) -> Result<(), ChainClientError> {
2643 let update_validators_start = linera_base::time::Instant::now();
2644 if let Some(old_committee) = old_committee {
2646 let old_committee_start = linera_base::time::Instant::now();
2647 self.communicate_chain_updates(old_committee, latest_certificate.clone())
2648 .await?;
2649 tracing::debug!(
2650 old_committee_ms = old_committee_start.elapsed().as_millis(),
2651 "communicated chain updates to old committee"
2652 );
2653 };
2654 if let Ok(new_committee) = self.local_committee().await {
2655 if Some(&new_committee) != old_committee {
2656 let new_committee_start = linera_base::time::Instant::now();
2659 self.communicate_chain_updates(&new_committee, latest_certificate)
2660 .await?;
2661 tracing::debug!(
2662 new_committee_ms = new_committee_start.elapsed().as_millis(),
2663 "communicated chain updates to new committee"
2664 );
2665 }
2666 }
2667 self.send_timing(update_validators_start, TimingType::UpdateValidators);
2668 Ok(())
2669 }
2670
2671 #[instrument(level = "trace", skip(committee, latest_certificate))]
2673 pub async fn communicate_chain_updates(
2674 &self,
2675 committee: &Committee,
2676 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
2677 ) -> Result<(), ChainClientError> {
2678 let delivery = self.options.cross_chain_message_delivery;
2679 let height = self.chain_info().await?.next_block_height;
2680 self.client
2681 .communicate_chain_updates(
2682 committee,
2683 self.chain_id,
2684 height,
2685 delivery,
2686 latest_certificate,
2687 )
2688 .await
2689 }
2690
2691 async fn synchronize_publisher_chains(&self) -> Result<(), ChainClientError> {
2694 let subscriptions = self
2695 .client
2696 .local_node
2697 .get_event_subscriptions(self.chain_id)
2698 .await?;
2699 let chain_ids: BTreeSet<_> = subscriptions
2700 .iter()
2701 .map(|((chain_id, _), _)| *chain_id)
2702 .chain(iter::once(self.client.admin_chain_id))
2703 .filter(|chain_id| *chain_id != self.chain_id)
2704 .collect();
2705 stream::iter(
2706 chain_ids
2707 .into_iter()
2708 .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
2709 )
2710 .buffer_unordered(self.options.max_joined_tasks)
2711 .collect::<Vec<_>>()
2712 .await
2713 .into_iter()
2714 .collect::<Result<Vec<_>, _>>()?;
2715 Ok(())
2716 }
2717
2718 #[instrument(level = "trace")]
2727 pub async fn find_received_certificates(&self) -> Result<(), ChainClientError> {
2728 debug!(chain_id = %self.chain_id, "starting find_received_certificates");
2729 #[cfg(with_metrics)]
2730 let _latency = metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
2731 let chain_id = self.chain_id;
2733 let (_, committee) = self.admin_committee().await?;
2734 let nodes = self.client.make_nodes(&committee)?;
2735
2736 let trackers = self
2737 .client
2738 .local_node
2739 .get_received_certificate_trackers(chain_id)
2740 .await?;
2741
2742 trace!("find_received_certificates: read trackers");
2743
2744 let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
2745 let result = communicate_with_quorum(
2747 &nodes,
2748 &committee,
2749 |_| (),
2750 |remote_node| {
2751 let client = &self.client;
2752 let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
2753 let received_log_batches = Arc::clone(&received_log_batches);
2754 Box::pin(async move {
2755 let batch = client
2756 .get_received_log_from_validator(chain_id, &remote_node, tracker)
2757 .await?;
2758 let mut batches = received_log_batches.lock().unwrap();
2759 batches.push((remote_node.public_key, batch));
2760 Ok(())
2761 })
2762 },
2763 self.options.quorum_grace_period,
2764 )
2765 .await;
2766
2767 if let Err(error) = result {
2768 error!(
2769 %error,
2770 "Failed to synchronize received_logs from at least a quorum of validators",
2771 );
2772 }
2773
2774 let received_logs: Vec<_> = {
2775 let mut received_log_batches = received_log_batches.lock().unwrap();
2776 std::mem::take(received_log_batches.as_mut())
2777 };
2778
2779 debug!(
2780 received_logs_len = %received_logs.len(),
2781 received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
2782 "collected received logs"
2783 );
2784
2785 let (received_logs, mut validator_trackers) = {
2786 (
2787 ReceivedLogs::from_received_result(received_logs.clone()),
2788 ValidatorTrackers::new(received_logs, &trackers),
2789 )
2790 };
2791
2792 debug!(
2793 num_chains = %received_logs.num_chains(),
2794 num_certs = %received_logs.num_certs(),
2795 "find_received_certificates: total number of chains and certificates to sync",
2796 );
2797
2798 let max_blocks_per_chain =
2799 self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
2800 for received_log in received_logs.into_batches(
2801 self.options.sender_certificate_download_batch_size,
2802 max_blocks_per_chain,
2803 ) {
2804 validator_trackers = self
2805 .receive_sender_certificates(received_log, validator_trackers, &nodes)
2806 .await?;
2807
2808 self.update_received_certificate_trackers(&validator_trackers)
2809 .await;
2810 }
2811
2812 info!("find_received_certificates finished");
2813
2814 Ok(())
2815 }
2816
2817 async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
2818 let updated_trackers = trackers.to_map();
2819 trace!(?updated_trackers, "updated tracker values");
2820
2821 if let Err(error) = self
2823 .client
2824 .local_node
2825 .update_received_certificate_trackers(self.chain_id, updated_trackers)
2826 .await
2827 {
2828 error!(
2829 chain_id = %self.chain_id,
2830 %error,
2831 "Failed to update the certificate trackers for chain",
2832 );
2833 }
2834 }
2835
2836 async fn receive_sender_certificates(
2839 &self,
2840 mut received_logs: ReceivedLogs,
2841 mut validator_trackers: ValidatorTrackers,
2842 nodes: &[RemoteNode<Env::ValidatorNode>],
2843 ) -> Result<ValidatorTrackers, ChainClientError> {
2844 debug!(
2845 num_chains = %received_logs.num_chains(),
2846 num_certs = %received_logs.num_certs(),
2847 "receive_sender_certificates: number of chains and certificates to sync",
2848 );
2849
2850 let local_next_heights = self
2852 .client
2853 .local_node
2854 .next_outbox_heights(received_logs.chains(), self.chain_id)
2855 .await?;
2856
2857 validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);
2858
2859 debug!(
2860 remaining_total_certificates = %received_logs.num_certs(),
2861 "receive_sender_certificates: computed remote_heights"
2862 );
2863
2864 let mut other_sender_chains = Vec::new();
2865 let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();
2866
2867 let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
2868 let received_logs = &received_logs;
2869 let other_sender_chains = &mut other_sender_chains;
2870
2871 move |(sender_chain_id, remote_heights)| {
2872 if remote_heights.is_empty() {
2873 other_sender_chains.push(sender_chain_id);
2877 return None;
2878 };
2879 let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
2880 let sender = sender.clone();
2881 let client = self.client.clone();
2882 let mut nodes = nodes.to_vec();
2883 nodes.shuffle(&mut rand::thread_rng());
2884 Some(async move {
2885 client
2886 .download_and_process_sender_chain(
2887 sender_chain_id,
2888 &nodes,
2889 received_logs,
2890 remote_heights,
2891 sender,
2892 )
2893 .await
2894 })
2895 }
2896 });
2897
2898 future::join(
2899 stream::iter(cert_futures)
2900 .buffer_unordered(self.options.max_joined_tasks)
2901 .collect::<()>(),
2902 async {
2903 while let Some(chain_and_height) = receiver.recv().await {
2904 validator_trackers.downloaded_cert(chain_and_height);
2905 }
2906 },
2907 )
2908 .await;
2909
2910 debug!(
2911 num_other_chains = %other_sender_chains.len(),
2912 "receive_sender_certificates: processing certificates finished"
2913 );
2914
2915 self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
2919 .await;
2920
2921 debug!("receive_sender_certificates: finished processing other_sender_chains");
2922
2923 Ok(validator_trackers)
2924 }
2925
2926 async fn retry_pending_cross_chain_requests(
2929 &self,
2930 nodes: &[RemoteNode<Env::ValidatorNode>],
2931 other_sender_chains: Vec<ChainId>,
2932 ) {
2933 let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
2934 let local_node = self.client.local_node.clone();
2935 async move {
2936 if let Err(error) = match local_node
2937 .retry_pending_cross_chain_requests(chain_id)
2938 .await
2939 {
2940 Ok(()) => Ok(()),
2941 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
2942 if let Err(error) = self
2943 .client
2944 .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
2945 .await
2946 {
2947 error!(
2948 ?blob_ids,
2949 %error,
2950 "Error while attempting to download blobs during retrying outgoing \
2951 messages"
2952 );
2953 }
2954 local_node
2955 .retry_pending_cross_chain_requests(chain_id)
2956 .await
2957 }
2958 err => err,
2959 } {
2960 error!(
2961 %chain_id,
2962 %error,
2963 "Failed to retry outgoing messages from chain"
2964 );
2965 }
2966 }
2967 }));
2968 stream.for_each(future::ready).await;
2969 }
2970
2971 #[instrument(level = "trace")]
2973 pub async fn transfer(
2974 &self,
2975 owner: AccountOwner,
2976 amount: Amount,
2977 recipient: Account,
2978 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2979 Box::pin(self.execute_operation(SystemOperation::Transfer {
2981 owner,
2982 recipient,
2983 amount,
2984 }))
2985 .await
2986 }
2987
2988 #[instrument(level = "trace")]
2991 pub async fn read_data_blob(
2992 &self,
2993 hash: CryptoHash,
2994 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2995 let blob_id = BlobId {
2996 hash,
2997 blob_type: BlobType::Data,
2998 };
2999 Box::pin(self.execute_operation(SystemOperation::VerifyBlob { blob_id })).await
3000 }
3001
3002 #[instrument(level = "trace")]
3004 pub async fn claim(
3005 &self,
3006 owner: AccountOwner,
3007 target_id: ChainId,
3008 recipient: Account,
3009 amount: Amount,
3010 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3011 Box::pin(self.execute_operation(SystemOperation::Claim {
3012 owner,
3013 target_id,
3014 recipient,
3015 amount,
3016 }))
3017 .await
3018 }
3019
3020 #[instrument(level = "trace")]
3023 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, ChainClientError> {
3024 let chain_id = self.chain_id;
3025 let info = self.chain_info_with_committees().await?;
3026 let committee = info.current_committee()?;
3027 let height = info.next_block_height;
3028 let round = info.manager.current_round;
3029 let action = CommunicateAction::RequestTimeout {
3030 height,
3031 round,
3032 chain_id,
3033 };
3034 let value = Timeout::new(chain_id, height, info.epoch);
3035 let certificate = Box::new(
3036 self.client
3037 .communicate_chain_action(committee, action, value)
3038 .await?,
3039 );
3040 self.client.process_certificate(certificate.clone()).await?;
3041 self.client
3043 .communicate_chain_updates(
3044 committee,
3045 chain_id,
3046 height,
3047 CrossChainMessageDelivery::NonBlocking,
3048 None,
3049 )
3050 .await?;
3051 Ok(*certificate)
3052 }
3053
3054 #[instrument(level = "trace", skip_all)]
3056 pub async fn synchronize_chain_state(
3057 &self,
3058 chain_id: ChainId,
3059 ) -> Result<Box<ChainInfo>, ChainClientError> {
3060 self.client.synchronize_chain_state(chain_id).await
3061 }
3062
3063 #[instrument(level = "trace", skip_all)]
3066 pub async fn synchronize_chain_state_from_committee(
3067 &self,
3068 committee: Committee,
3069 ) -> Result<Box<ChainInfo>, ChainClientError> {
3070 Box::pin(
3071 self.client
3072 .synchronize_chain_state_from_committee(self.chain_id, committee),
3073 )
3074 .await
3075 }
3076
3077 #[instrument(level = "trace", skip(operations, blobs))]
3079 pub async fn execute_operations(
3080 &self,
3081 operations: Vec<Operation>,
3082 blobs: Vec<Blob>,
3083 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3084 let timing_start = linera_base::time::Instant::now();
3085 tracing::debug!("execute_operations started");
3086
3087 let result = loop {
3088 let execute_block_start = linera_base::time::Instant::now();
3089 tracing::debug!("calling execute_block");
3091 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
3092 Ok(ClientOutcome::Committed(certificate)) => {
3093 tracing::debug!(
3094 execute_block_ms = execute_block_start.elapsed().as_millis(),
3095 "execute_block succeeded"
3096 );
3097 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
3098 break Ok(ClientOutcome::Committed(certificate));
3099 }
3100 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
3101 break Ok(ClientOutcome::WaitForTimeout(timeout));
3102 }
3103 Ok(ClientOutcome::Conflict(certificate)) => {
3104 info!(
3105 height = %certificate.block().header.height,
3106 "Another block was committed."
3107 );
3108 break Ok(ClientOutcome::Conflict(certificate));
3109 }
3110 Err(ChainClientError::CommunicationError(CommunicationError::Trusted(
3111 NodeError::UnexpectedBlockHeight {
3112 expected_block_height,
3113 found_block_height,
3114 },
3115 ))) if expected_block_height > found_block_height => {
3116 tracing::info!(
3117 "Local state is outdated; synchronizing chain {:.8}",
3118 self.chain_id
3119 );
3120 self.synchronize_chain_state(self.chain_id).await?;
3121 }
3122 Err(err) => return Err(err),
3123 };
3124 };
3125
3126 self.send_timing(timing_start, TimingType::ExecuteOperations);
3127 tracing::debug!(
3128 total_execute_operations_ms = timing_start.elapsed().as_millis(),
3129 "execute_operations returning"
3130 );
3131
3132 result
3133 }
3134
3135 pub async fn execute_operation(
3137 &self,
3138 operation: impl Into<Operation>,
3139 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3140 self.execute_operations(vec![operation.into()], vec![])
3141 .await
3142 }
3143
3144 #[instrument(level = "trace", skip(operations, blobs))]
3148 async fn execute_block(
3149 &self,
3150 operations: Vec<Operation>,
3151 blobs: Vec<Blob>,
3152 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3153 #[cfg(with_metrics)]
3154 let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
3155
3156 let mutex = self.client_mutex();
3157 let lock_start = linera_base::time::Instant::now();
3158 let _guard = mutex.lock_owned().await;
3159 tracing::debug!(
3160 lock_wait_ms = lock_start.elapsed().as_millis(),
3161 "acquired client_mutex in execute_block"
3162 );
3163 match self.process_pending_block_without_prepare().await? {
3165 ClientOutcome::Committed(Some(certificate)) => {
3166 return Ok(ClientOutcome::Conflict(Box::new(certificate)))
3167 }
3168 ClientOutcome::WaitForTimeout(timeout) => {
3169 return Ok(ClientOutcome::WaitForTimeout(timeout))
3170 }
3171 ClientOutcome::Conflict(certificate) => {
3172 return Ok(ClientOutcome::Conflict(certificate))
3173 }
3174 ClientOutcome::Committed(None) => {}
3175 }
3176
3177 let transactions = self.prepend_epochs_messages_and_events(operations).await?;
3181
3182 if transactions.is_empty() {
3183 return Err(ChainClientError::LocalNodeError(
3184 LocalNodeError::WorkerError(WorkerError::ChainError(Box::new(
3185 ChainError::EmptyBlock,
3186 ))),
3187 ));
3188 }
3189
3190 let block = self.new_pending_block(transactions, blobs).await?;
3191
3192 match self.process_pending_block_without_prepare().await? {
3193 ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
3194 Ok(ClientOutcome::Committed(certificate))
3195 }
3196 ClientOutcome::Committed(Some(certificate)) => {
3197 Ok(ClientOutcome::Conflict(Box::new(certificate)))
3198 }
3199 ClientOutcome::Committed(None) => Err(ChainClientError::BlockProposalError(
3201 "Unexpected block proposal error",
3202 )),
3203 ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
3204 ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
3205 }
3206 }
3207
3208 #[instrument(level = "trace", skip(operations))]
3214 async fn prepend_epochs_messages_and_events(
3215 &self,
3216 operations: Vec<Operation>,
3217 ) -> Result<Vec<Transaction>, ChainClientError> {
3218 let incoming_bundles = self.pending_message_bundles().await?;
3219 let stream_updates = self.collect_stream_updates().await?;
3220 Ok(self
3221 .collect_epoch_changes()
3222 .await?
3223 .into_iter()
3224 .map(Transaction::ExecuteOperation)
3225 .chain(
3226 incoming_bundles
3227 .into_iter()
3228 .map(Transaction::ReceiveMessages),
3229 )
3230 .chain(
3231 stream_updates
3232 .into_iter()
3233 .map(Transaction::ExecuteOperation),
3234 )
3235 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
3236 .collect::<Vec<_>>())
3237 }
3238
3239 #[instrument(level = "trace", skip(transactions, blobs))]
3243 async fn new_pending_block(
3244 &self,
3245 transactions: Vec<Transaction>,
3246 blobs: Vec<Blob>,
3247 ) -> Result<Block, ChainClientError> {
3248 let identity = self.identity().await?;
3249
3250 ensure!(
3251 self.pending_proposal().is_none(),
3252 ChainClientError::BlockProposalError(
3253 "Client state already has a pending block; \
3254 use the `linera retry-pending-block` command to commit that first"
3255 )
3256 );
3257 let info = self.chain_info_with_committees().await?;
3258 let timestamp = self.next_timestamp(&transactions, info.timestamp);
3259 let proposed_block = ProposedBlock {
3260 epoch: info.epoch,
3261 chain_id: self.chain_id,
3262 transactions,
3263 previous_block_hash: info.block_hash,
3264 height: info.next_block_height,
3265 authenticated_signer: Some(identity),
3266 timestamp,
3267 };
3268
3269 let round = self.round_for_oracle(&info, &identity).await?;
3270 let (block, _) = Box::pin(self.client.stage_block_execution_with_policy(
3273 proposed_block,
3274 round,
3275 blobs.clone(),
3276 self.options.bundle_execution_policy(),
3277 ))
3278 .await?;
3279 let (proposed_block, _) = block.clone().into_proposal();
3280 self.update_state(|state| {
3281 state.set_pending_proposal(proposed_block.clone(), blobs.clone())
3282 });
3283 Ok(block)
3284 }
3285
3286 #[instrument(level = "trace", skip(transactions))]
3291 fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
3292 let local_time = self.storage_client().clock().current_time();
3293 transactions
3294 .iter()
3295 .filter_map(Transaction::incoming_bundle)
3296 .map(|msg| msg.bundle.timestamp)
3297 .max()
3298 .map_or(local_time, |timestamp| timestamp.max(local_time))
3299 .max(block_time)
3300 }
3301
3302 #[instrument(level = "trace", skip(query))]
3304 pub async fn query_application(
3305 &self,
3306 query: Query,
3307 block_hash: Option<CryptoHash>,
3308 ) -> Result<(QueryOutcome, BlockHeight), ChainClientError> {
3309 loop {
3310 let result = self
3311 .client
3312 .local_node
3313 .query_application(self.chain_id, query.clone(), block_hash)
3314 .await;
3315 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
3316 let validators = self.client.validator_nodes().await?;
3317 self.client
3318 .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
3319 .await?;
3320 continue; }
3322 return Ok(result?);
3323 }
3324 }
3325
3326 #[instrument(level = "trace", skip(query))]
3328 pub async fn query_system_application(
3329 &self,
3330 query: SystemQuery,
3331 ) -> Result<QueryOutcome<SystemResponse>, ChainClientError> {
3332 let (
3333 QueryOutcome {
3334 response,
3335 operations,
3336 },
3337 _,
3338 ) = self.query_application(Query::System(query), None).await?;
3339 match response {
3340 QueryResponse::System(response) => Ok(QueryOutcome {
3341 response,
3342 operations,
3343 }),
3344 _ => Err(ChainClientError::InternalError(
3345 "Unexpected response for system query",
3346 )),
3347 }
3348 }
3349
3350 #[instrument(level = "trace", skip(application_id, query))]
3352 #[cfg(with_testing)]
3353 pub async fn query_user_application<A: Abi>(
3354 &self,
3355 application_id: ApplicationId<A>,
3356 query: &A::Query,
3357 ) -> Result<QueryOutcome<A::QueryResponse>, ChainClientError> {
3358 let query = Query::user(application_id, query)?;
3359 let (
3360 QueryOutcome {
3361 response,
3362 operations,
3363 },
3364 _,
3365 ) = self.query_application(query, None).await?;
3366 match response {
3367 QueryResponse::User(response_bytes) => {
3368 let response = serde_json::from_slice(&response_bytes)?;
3369 Ok(QueryOutcome {
3370 response,
3371 operations,
3372 })
3373 }
3374 _ => Err(ChainClientError::InternalError(
3375 "Unexpected response for user query",
3376 )),
3377 }
3378 }
3379
3380 #[instrument(level = "trace")]
3387 pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
3388 let (balance, _) = Box::pin(self.query_balances_with_owner(AccountOwner::CHAIN)).await?;
3389 Ok(balance)
3390 }
3391
3392 #[instrument(level = "trace", skip(owner))]
3399 pub async fn query_owner_balance(
3400 &self,
3401 owner: AccountOwner,
3402 ) -> Result<Amount, ChainClientError> {
3403 if owner.is_chain() {
3404 Box::pin(self.query_balance()).await
3405 } else {
3406 Ok(Box::pin(self.query_balances_with_owner(owner))
3407 .await?
3408 .1
3409 .unwrap_or(Amount::ZERO))
3410 }
3411 }
3412
3413 #[instrument(level = "trace", skip(owner))]
3420 async fn query_balances_with_owner(
3421 &self,
3422 owner: AccountOwner,
3423 ) -> Result<(Amount, Option<Amount>), ChainClientError> {
3424 let incoming_bundles = self.pending_message_bundles().await?;
3425 if incoming_bundles.is_empty() {
3428 let chain_balance = self.local_balance().await?;
3429 let owner_balance = self.local_owner_balance(owner).await?;
3430 return Ok((chain_balance, Some(owner_balance)));
3431 }
3432 let info = self.chain_info().await?;
3433 let transactions = incoming_bundles
3434 .into_iter()
3435 .map(Transaction::ReceiveMessages)
3436 .collect::<Vec<_>>();
3437 let timestamp = self.next_timestamp(&transactions, info.timestamp);
3438 let block = ProposedBlock {
3439 epoch: info.epoch,
3440 chain_id: self.chain_id,
3441 transactions,
3442 previous_block_hash: info.block_hash,
3443 height: info.next_block_height,
3444 authenticated_signer: if owner == AccountOwner::CHAIN {
3445 None
3446 } else {
3447 Some(owner)
3448 },
3449 timestamp,
3450 };
3451 match Box::pin(self.client.stage_block_execution_with_policy(
3452 block,
3453 None,
3454 Vec::new(),
3455 self.options.bundle_execution_policy(),
3456 ))
3457 .await
3458 {
3459 Ok((_, response)) => Ok((
3460 response.info.chain_balance,
3461 response.info.requested_owner_balance,
3462 )),
3463 Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
3464 WorkerError::ChainError(error),
3465 ))) if matches!(
3466 &*error,
3467 ChainError::ExecutionError(
3468 execution_error,
3469 ChainExecutionContext::Block
3470 ) if matches!(
3471 **execution_error,
3472 ExecutionError::FeesExceedFunding { .. }
3473 )
3474 ) =>
3475 {
3476 Ok((Amount::ZERO, Some(Amount::ZERO)))
3478 }
3479 Err(error) => Err(error),
3480 }
3481 }
3482
3483 #[instrument(level = "trace")]
3487 pub async fn local_balance(&self) -> Result<Amount, ChainClientError> {
3488 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
3489 Ok(balance)
3490 }
3491
3492 #[instrument(level = "trace", skip(owner))]
3496 pub async fn local_owner_balance(
3497 &self,
3498 owner: AccountOwner,
3499 ) -> Result<Amount, ChainClientError> {
3500 if owner.is_chain() {
3501 self.local_balance().await
3502 } else {
3503 Ok(self
3504 .local_balances_with_owner(owner)
3505 .await?
3506 .1
3507 .unwrap_or(Amount::ZERO))
3508 }
3509 }
3510
3511 #[instrument(level = "trace", skip(owner))]
3515 async fn local_balances_with_owner(
3516 &self,
3517 owner: AccountOwner,
3518 ) -> Result<(Amount, Option<Amount>), ChainClientError> {
3519 ensure!(
3520 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
3521 ChainClientError::WalletSynchronizationError
3522 );
3523 let mut query = ChainInfoQuery::new(self.chain_id);
3524 query.request_owner_balance = owner;
3525 let response = self
3526 .client
3527 .local_node
3528 .handle_chain_info_query(query)
3529 .await?;
3530 Ok((
3531 response.info.chain_balance,
3532 response.info.requested_owner_balance,
3533 ))
3534 }
3535
3536 #[instrument(level = "trace")]
3538 pub async fn transfer_to_account(
3539 &self,
3540 from: AccountOwner,
3541 amount: Amount,
3542 account: Account,
3543 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3544 self.transfer(from, amount, account).await
3545 }
3546
3547 #[cfg(with_testing)]
3549 #[instrument(level = "trace")]
3550 pub async fn burn(
3551 &self,
3552 owner: AccountOwner,
3553 amount: Amount,
3554 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3555 let recipient = Account::burn_address(self.chain_id);
3556 self.transfer(owner, amount, recipient).await
3557 }
3558
3559 #[instrument(level = "trace")]
3560 pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3561 let validators = self.client.validator_nodes().await?;
3562 self.client
3563 .fetch_chain_info(self.chain_id, &validators)
3564 .await
3565 }
3566
3567 #[instrument(level = "trace")]
3576 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3577 if self.preferred_owner.is_none() {
3578 return self.client.synchronize_chain_state(self.chain_id).await;
3579 }
3580 let info = self.prepare_chain().await?;
3581 self.synchronize_publisher_chains().await?;
3582 self.find_received_certificates().await?;
3583 Ok(info)
3584 }
3585
3586 #[instrument(level = "trace")]
3588 pub async fn process_pending_block(
3589 &self,
3590 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3591 self.prepare_chain().await?;
3592 self.process_pending_block_without_prepare().await
3593 }
3594
3595 #[instrument(level = "trace")]
3597 async fn process_pending_block_without_prepare(
3598 &self,
3599 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3600 let process_start = linera_base::time::Instant::now();
3601 tracing::debug!("process_pending_block_without_prepare started");
3602 let info = self.request_leader_timeout_if_needed().await?;
3603
3604 if info.manager.has_locking_block_in_current_round()
3606 && !info.manager.current_round.is_fast()
3607 {
3608 return Box::pin(self.finalize_locking_block(info)).await;
3609 }
3610 let owner = self.identity().await?;
3611
3612 let local_node = &self.client.local_node;
3613 let pending_proposal = self.pending_proposal();
3615 let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
3616 match &**locking {
3617 LockingBlock::Regular(certificate) => {
3618 let blob_ids = certificate.block().required_blob_ids();
3619 let blobs = local_node
3620 .get_locking_blobs(&blob_ids, self.chain_id)
3621 .await?
3622 .ok_or_else(|| {
3623 ChainClientError::InternalError("Missing local locking blobs")
3624 })?;
3625 debug!("Retrying locking block from round {}", certificate.round);
3626 (certificate.block().clone(), blobs)
3627 }
3628 LockingBlock::Fast(proposal) => {
3629 let proposed_block = proposal.content.block.clone();
3630 let blob_ids = proposed_block.published_blob_ids();
3631 let blobs = local_node
3632 .get_locking_blobs(&blob_ids, self.chain_id)
3633 .await?
3634 .ok_or_else(|| {
3635 ChainClientError::InternalError("Missing local locking blobs")
3636 })?;
3637 let block = self
3638 .client
3639 .stage_block_execution(proposed_block, None, blobs.clone())
3640 .await?
3641 .0;
3642 debug!("Retrying locking block from fast round.");
3643 (block, blobs)
3644 }
3645 }
3646 } else if let Some(pending_proposal) = pending_proposal {
3647 let proposed_block = pending_proposal.block;
3649 let round = self.round_for_oracle(&info, &owner).await?;
3650 let (block, _) = self
3651 .client
3652 .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
3653 .await?;
3654 debug!("Proposing the local pending block.");
3655 (block, pending_proposal.blobs)
3656 } else {
3657 return Ok(ClientOutcome::Committed(None)); };
3659
3660 let has_oracle_responses = block.has_oracle_responses();
3661 let (proposed_block, outcome) = block.into_proposal();
3662 let round = match self
3663 .round_for_new_proposal(&info, &owner, has_oracle_responses)
3664 .await?
3665 {
3666 Either::Left(round) => round,
3667 Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
3668 };
3669 debug!("Proposing block for round {}", round);
3670
3671 let already_handled_locally = info
3672 .manager
3673 .already_handled_proposal(round, &proposed_block);
3674 let proposal = if let Some(locking) = info.manager.requested_locking {
3676 Box::new(match *locking {
3677 LockingBlock::Regular(cert) => {
3678 BlockProposal::new_retry_regular(owner, round, cert, self.signer())
3679 .await
3680 .map_err(ChainClientError::signer_failure)?
3681 }
3682 LockingBlock::Fast(proposal) => {
3683 BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
3684 .await
3685 .map_err(ChainClientError::signer_failure)?
3686 }
3687 })
3688 } else {
3689 Box::new(
3690 BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
3691 .await
3692 .map_err(ChainClientError::signer_failure)?,
3693 )
3694 };
3695 if !already_handled_locally {
3696 if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
3698 match err {
3699 LocalNodeError::BlobsNotFound(_) => {
3700 local_node
3701 .handle_pending_blobs(self.chain_id, blobs)
3702 .await?;
3703 local_node.handle_block_proposal(*proposal.clone()).await?;
3704 }
3705 err => return Err(err.into()),
3706 }
3707 }
3708 }
3709 let committee = self.local_committee().await?;
3710 let block = Block::new(proposed_block, outcome);
3711 let submit_block_proposal_start = linera_base::time::Instant::now();
3713 let certificate = if round.is_fast() {
3714 let hashed_value = ConfirmedBlock::new(block);
3715 Box::pin(
3716 self.client
3717 .submit_block_proposal(&committee, proposal, hashed_value),
3718 )
3719 .await?
3720 } else {
3721 let hashed_value = ValidatedBlock::new(block);
3722 let certificate = Box::pin(self.client.submit_block_proposal(
3723 &committee,
3724 proposal,
3725 hashed_value.clone(),
3726 ))
3727 .await?;
3728 Box::pin(self.client.finalize_block(&committee, certificate)).await?
3729 };
3730 self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
3731 debug!(round = %certificate.round, "Sending confirmed block to validators");
3732 let update_start = linera_base::time::Instant::now();
3733 Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
3734 tracing::debug!(
3735 update_validators_ms = update_start.elapsed().as_millis(),
3736 total_process_ms = process_start.elapsed().as_millis(),
3737 "process_pending_block_without_prepare completing"
3738 );
3739 Ok(ClientOutcome::Committed(Some(certificate)))
3740 }
3741
3742 fn send_timing(&self, start: Instant, timing_type: TimingType) {
3743 let Some(sender) = &self.timing_sender else {
3744 return;
3745 };
3746 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
3747 tracing::warn!(%err, "Failed to send timing info");
3748 }
3749 }
3750
3751 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3754 let mut info = self.chain_info_with_manager_values().await?;
3755 if let Some(round_timeout) = info.manager.round_timeout {
3758 if round_timeout <= self.storage_client().clock().current_time() {
3759 if let Err(e) = self.request_leader_timeout().await {
3760 debug!("Failed to obtain a timeout certificate: {}", e);
3761 } else {
3762 info = self.chain_info_with_manager_values().await?;
3763 }
3764 }
3765 }
3766 Ok(info)
3767 }
3768
3769 async fn finalize_locking_block(
3773 &self,
3774 info: Box<ChainInfo>,
3775 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3776 let locking = info
3777 .manager
3778 .requested_locking
3779 .expect("Should have a locking block");
3780 let LockingBlock::Regular(certificate) = *locking else {
3781 panic!("Should have a locking validated block");
3782 };
3783 debug!(
3784 round = %certificate.round,
3785 "Finalizing locking block"
3786 );
3787 let committee = self.local_committee().await?;
3788 let certificate =
3789 Box::pin(self.client.finalize_block(&committee, certificate.clone())).await?;
3790 Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
3791 Ok(ClientOutcome::Committed(Some(certificate)))
3792 }
3793
3794 async fn round_for_oracle(
3796 &self,
3797 info: &ChainInfo,
3798 identity: &AccountOwner,
3799 ) -> Result<Option<u32>, ChainClientError> {
3800 match self.round_for_new_proposal(info, identity, true).await {
3802 Ok(Either::Left(round)) => Ok(round.multi_leader()),
3804 Err(ChainClientError::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
3808 Err(err) => Err(err),
3809 }
3810 }
3811
3812 async fn round_for_new_proposal(
3814 &self,
3815 info: &ChainInfo,
3816 identity: &AccountOwner,
3817 has_oracle_responses: bool,
3818 ) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
3819 let manager = &info.manager;
3820 let seed = self
3821 .client
3822 .local_node
3823 .get_manager_seed(self.chain_id)
3824 .await?;
3825 let skip_fast = manager.current_round.is_fast()
3830 && (has_oracle_responses || !self.options.allow_fast_blocks);
3831 let conflict = manager
3832 .requested_signed_proposal
3833 .as_ref()
3834 .into_iter()
3835 .chain(&manager.requested_proposed)
3836 .any(|proposal| proposal.content.round == manager.current_round)
3837 || skip_fast;
3838 let round = if !conflict {
3839 manager.current_round
3840 } else if let Some(round) = manager
3841 .ownership
3842 .next_round(manager.current_round)
3843 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
3844 {
3845 round
3846 } else if let Some(timeout) = info.round_timeout() {
3847 return Ok(Either::Right(timeout));
3848 } else {
3849 return Err(ChainClientError::BlockProposalError(
3850 "Conflicting proposal in the current round",
3851 ));
3852 };
3853 let current_committee = info
3854 .current_committee()?
3855 .validators
3856 .values()
3857 .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
3858 .collect();
3859 if manager.should_propose(identity, round, seed, ¤t_committee) {
3860 return Ok(Either::Left(round));
3861 }
3862 if let Some(timeout) = info.round_timeout() {
3863 return Ok(Either::Right(timeout));
3864 }
3865 Err(ChainClientError::BlockProposalError(
3866 "Not a leader in the current round",
3867 ))
3868 }
3869
3870 #[cfg(with_testing)]
3872 #[instrument(level = "trace")]
3873 pub fn clear_pending_proposal(&self) {
3874 self.update_state(|state| state.clear_pending_proposal());
3875 }
3876
3877 #[instrument(level = "trace")]
3881 pub async fn rotate_key_pair(
3882 &self,
3883 public_key: AccountPublicKey,
3884 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3885 Box::pin(self.transfer_ownership(public_key.into())).await
3886 }
3887
3888 #[instrument(level = "trace")]
3890 pub async fn transfer_ownership(
3891 &self,
3892 new_owner: AccountOwner,
3893 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3894 Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
3895 super_owners: vec![new_owner],
3896 owners: Vec::new(),
3897 multi_leader_rounds: 2,
3898 open_multi_leader_rounds: false,
3899 timeout_config: TimeoutConfig::default(),
3900 }))
3901 .await
3902 }
3903
3904 #[instrument(level = "trace")]
3906 pub async fn share_ownership(
3907 &self,
3908 new_owner: AccountOwner,
3909 new_weight: u64,
3910 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3911 let ownership = self.prepare_chain().await?.manager.ownership;
3912 ensure!(
3913 ownership.is_active(),
3914 ChainError::InactiveChain(self.chain_id)
3915 );
3916 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
3917 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
3918 owners.push((new_owner, new_weight));
3919 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
3920 super_owners: Vec::new(),
3921 owners,
3922 multi_leader_rounds: ownership.multi_leader_rounds,
3923 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3924 timeout_config: ownership.timeout_config,
3925 })];
3926 match self.execute_block(operations, vec![]).await? {
3927 ClientOutcome::Committed(certificate) => Ok(ClientOutcome::Committed(certificate)),
3928 ClientOutcome::Conflict(certificate) => {
3929 info!(
3930 height = %certificate.block().header.height,
3931 "Another block was committed."
3932 );
3933 Ok(ClientOutcome::Conflict(certificate))
3934 }
3935 ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
3936 }
3937 }
3938
3939 #[instrument(level = "trace")]
3941 pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, ChainClientError> {
3942 Ok(self
3943 .client
3944 .local_node
3945 .chain_state_view(self.chain_id)
3946 .await?
3947 .execution_state
3948 .system
3949 .ownership
3950 .get()
3951 .clone())
3952 }
3953
3954 #[instrument(level = "trace")]
3957 pub async fn change_ownership(
3958 &self,
3959 ownership: ChainOwnership,
3960 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3961 Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
3962 super_owners: ownership.super_owners.into_iter().collect(),
3963 owners: ownership.owners.into_iter().collect(),
3964 multi_leader_rounds: ownership.multi_leader_rounds,
3965 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3966 timeout_config: ownership.timeout_config.clone(),
3967 }))
3968 .await
3969 }
3970
3971 #[instrument(level = "trace")]
3973 pub async fn query_application_permissions(
3974 &self,
3975 ) -> Result<ApplicationPermissions, ChainClientError> {
3976 Ok(self
3977 .client
3978 .local_node
3979 .chain_state_view(self.chain_id)
3980 .await?
3981 .execution_state
3982 .system
3983 .application_permissions
3984 .get()
3985 .clone())
3986 }
3987
3988 #[instrument(level = "trace", skip(application_permissions))]
3990 pub async fn change_application_permissions(
3991 &self,
3992 application_permissions: ApplicationPermissions,
3993 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3994 Box::pin(
3995 self.execute_operation(SystemOperation::ChangeApplicationPermissions(
3996 application_permissions,
3997 )),
3998 )
3999 .await
4000 }
4001
4002 #[instrument(level = "trace", skip(self))]
4004 pub async fn open_chain(
4005 &self,
4006 ownership: ChainOwnership,
4007 application_permissions: ApplicationPermissions,
4008 balance: Amount,
4009 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, ChainClientError>
4010 {
4011 let config = OpenChainConfig {
4012 ownership: ownership.clone(),
4013 balance,
4014 application_permissions: application_permissions.clone(),
4015 };
4016 let operation = Operation::system(SystemOperation::OpenChain(config));
4017 let certificate = match self.execute_block(vec![operation], vec![]).await? {
4018 ClientOutcome::Committed(certificate) => certificate,
4019 ClientOutcome::Conflict(certificate) => {
4020 return Ok(ClientOutcome::Conflict(certificate));
4021 }
4022 ClientOutcome::WaitForTimeout(timeout) => {
4023 return Ok(ClientOutcome::WaitForTimeout(timeout));
4024 }
4025 };
4026 let chain_blob = certificate
4028 .block()
4029 .body
4030 .blobs
4031 .last()
4032 .and_then(|blobs| blobs.last())
4033 .ok_or_else(|| ChainClientError::InternalError("Failed to create a new chain"))?;
4034 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
4035 for owner in ownership.all_owners() {
4037 if self.client.has_key_for(owner).await? {
4038 self.client
4039 .extend_chain_mode(description.id(), ListeningMode::FullChain);
4040 break;
4041 }
4042 }
4043 self.client
4044 .local_node
4045 .retry_pending_cross_chain_requests(self.chain_id)
4046 .await?;
4047 Ok(ClientOutcome::Committed((description, certificate)))
4048 }
4049
4050 #[instrument(level = "trace")]
4053 pub async fn close_chain(
4054 &self,
4055 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
4056 match Box::pin(self.execute_operation(SystemOperation::CloseChain)).await {
4057 Ok(outcome) => Ok(outcome.map(Some)),
4058 Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
4059 WorkerError::ChainError(chain_error),
4060 ))) if matches!(*chain_error, ChainError::ClosedChain) => {
4061 Ok(ClientOutcome::Committed(None)) }
4063 Err(error) => Err(error),
4064 }
4065 }
4066
4067 #[cfg(not(target_arch = "wasm32"))]
4069 #[instrument(level = "trace", skip(contract, service))]
4070 pub async fn publish_module(
4071 &self,
4072 contract: Bytecode,
4073 service: Bytecode,
4074 vm_runtime: VmRuntime,
4075 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
4076 let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
4077 Box::pin(self.publish_module_blobs(blobs, module_id)).await
4078 }
4079
4080 #[cfg(not(target_arch = "wasm32"))]
4082 #[instrument(level = "trace", skip(blobs, module_id))]
4083 pub async fn publish_module_blobs(
4084 &self,
4085 blobs: Vec<Blob>,
4086 module_id: ModuleId,
4087 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
4088 self.execute_operations(
4089 vec![Operation::system(SystemOperation::PublishModule {
4090 module_id,
4091 })],
4092 blobs,
4093 )
4094 .await?
4095 .try_map(|certificate| Ok((module_id, certificate)))
4096 }
4097
4098 #[instrument(level = "trace", skip(bytes))]
4100 pub async fn publish_data_blobs(
4101 &self,
4102 bytes: Vec<Vec<u8>>,
4103 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4104 let blobs = bytes.into_iter().map(Blob::new_data);
4105 let publish_blob_operations = blobs
4106 .clone()
4107 .map(|blob| {
4108 Operation::system(SystemOperation::PublishDataBlob {
4109 blob_hash: blob.id().hash,
4110 })
4111 })
4112 .collect();
4113 self.execute_operations(publish_blob_operations, blobs.collect())
4114 .await
4115 }
4116
4117 #[instrument(level = "trace", skip(bytes))]
4119 pub async fn publish_data_blob(
4120 &self,
4121 bytes: Vec<u8>,
4122 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4123 Box::pin(self.publish_data_blobs(vec![bytes])).await
4124 }
4125
4126 #[instrument(
4128 level = "trace",
4129 skip(self, parameters, instantiation_argument, required_application_ids)
4130 )]
4131 pub async fn create_application<
4132 A: Abi,
4133 Parameters: Serialize,
4134 InstantiationArgument: Serialize,
4135 >(
4136 &self,
4137 module_id: ModuleId<A, Parameters, InstantiationArgument>,
4138 parameters: &Parameters,
4139 instantiation_argument: &InstantiationArgument,
4140 required_application_ids: Vec<ApplicationId>,
4141 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, ChainClientError>
4142 {
4143 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
4144 let parameters = serde_json::to_vec(parameters)?;
4145 Ok(Box::pin(self.create_application_untyped(
4146 module_id.forget_abi(),
4147 parameters,
4148 instantiation_argument,
4149 required_application_ids,
4150 ))
4151 .await?
4152 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
4153 }
4154
4155 #[instrument(
4157 level = "trace",
4158 skip(
4159 self,
4160 module_id,
4161 parameters,
4162 instantiation_argument,
4163 required_application_ids
4164 )
4165 )]
4166 pub async fn create_application_untyped(
4167 &self,
4168 module_id: ModuleId,
4169 parameters: Vec<u8>,
4170 instantiation_argument: Vec<u8>,
4171 required_application_ids: Vec<ApplicationId>,
4172 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, ChainClientError> {
4173 Box::pin(self.execute_operation(SystemOperation::CreateApplication {
4174 module_id,
4175 parameters,
4176 instantiation_argument,
4177 required_application_ids,
4178 }))
4179 .await?
4180 .try_map(|certificate| {
4181 let mut creation: Vec<_> = certificate
4183 .block()
4184 .created_blob_ids()
4185 .into_iter()
4186 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
4187 .collect();
4188 if creation.len() > 1 {
4189 return Err(ChainClientError::InternalError(
4190 "Unexpected number of application descriptions published",
4191 ));
4192 }
4193 let blob_id = creation.pop().ok_or(ChainClientError::InternalError(
4194 "ApplicationDescription blob not found.",
4195 ))?;
4196 let id = ApplicationId::new(blob_id.hash);
4197 Ok((id, certificate))
4198 })
4199 }
4200
4201 #[instrument(level = "trace", skip(committee))]
4203 pub async fn stage_new_committee(
4204 &self,
4205 committee: Committee,
4206 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4207 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
4208 let blob_hash = blob.id().hash;
4209 match self
4210 .execute_operations(
4211 vec![Operation::system(SystemOperation::Admin(
4212 AdminOperation::PublishCommitteeBlob { blob_hash },
4213 ))],
4214 vec![blob],
4215 )
4216 .await?
4217 {
4218 ClientOutcome::Committed(_) => {}
4219 outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
4220 outcome @ ClientOutcome::Conflict(_) => return Ok(outcome),
4221 }
4222 let epoch = Box::pin(self.chain_info()).await?.epoch.try_add_one()?;
4223 Box::pin(
4224 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
4225 epoch,
4226 blob_hash,
4227 })),
4228 )
4229 .await
4230 }
4231
4232 #[instrument(level = "trace")]
4238 pub async fn process_inbox(
4239 &self,
4240 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
4241 self.prepare_chain().await?;
4242 self.process_inbox_without_prepare().await
4243 }
4244
4245 #[instrument(level = "trace")]
4251 pub async fn process_inbox_without_prepare(
4252 &self,
4253 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
4254 #[cfg(with_metrics)]
4255 let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
4256
4257 let mut certificates = Vec::new();
4258 loop {
4259 match self.execute_block(vec![], vec![]).await {
4263 Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
4264 Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
4265 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
4266 return Ok((certificates, Some(timeout)));
4267 }
4268 Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
4270 WorkerError::ChainError(chain_error),
4271 ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
4272 return Ok((certificates, None));
4273 }
4274 Err(error) => return Err(error),
4275 };
4276 }
4277 }
4278
4279 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, ChainClientError> {
4282 let (mut min_epoch, mut next_epoch) = {
4283 let (epoch, committees) = self.epoch_and_committees().await?;
4284 let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
4285 (min_epoch, epoch.try_add_one()?)
4286 };
4287 let mut epoch_change_ops = Vec::new();
4288 while self
4289 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
4290 .await?
4291 {
4292 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
4293 next_epoch,
4294 )));
4295 next_epoch.try_add_assign_one()?;
4296 }
4297 while self
4298 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
4299 .await?
4300 {
4301 epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
4302 min_epoch,
4303 )));
4304 min_epoch.try_add_assign_one()?;
4305 }
4306 Ok(epoch_change_ops)
4307 }
4308
4309 async fn has_admin_event(
4312 &self,
4313 stream_name: &[u8],
4314 index: u32,
4315 ) -> Result<bool, ChainClientError> {
4316 let event_id = EventId {
4317 chain_id: self.client.admin_chain_id,
4318 stream_id: StreamId::system(stream_name),
4319 index,
4320 };
4321 Ok(self
4322 .client
4323 .storage_client()
4324 .read_event(event_id)
4325 .await?
4326 .is_some())
4327 }
4328
4329 pub async fn events_from_index(
4331 &self,
4332 stream_id: StreamId,
4333 start_index: u32,
4334 ) -> Result<Vec<IndexAndEvent>, ChainClientError> {
4335 Ok(self
4336 .client
4337 .storage_client()
4338 .read_events_from_index(&self.chain_id, &stream_id, start_index)
4339 .await?)
4340 }
4341
4342 #[instrument(level = "trace")]
4347 pub async fn revoke_epochs(
4348 &self,
4349 revoked_epoch: Epoch,
4350 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4351 self.prepare_chain().await?;
4352 let (current_epoch, committees) = self.epoch_and_committees().await?;
4353 ensure!(
4354 revoked_epoch < current_epoch,
4355 ChainClientError::CannotRevokeCurrentEpoch(current_epoch)
4356 );
4357 ensure!(
4358 committees.contains_key(&revoked_epoch),
4359 ChainClientError::EpochAlreadyRevoked
4360 );
4361 let operations = committees
4362 .keys()
4363 .filter_map(|epoch| {
4364 if *epoch <= revoked_epoch {
4365 Some(Operation::system(SystemOperation::Admin(
4366 AdminOperation::RemoveCommittee { epoch: *epoch },
4367 )))
4368 } else {
4369 None
4370 }
4371 })
4372 .collect();
4373 self.execute_operations(operations, vec![]).await
4374 }
4375
4376 #[instrument(level = "trace")]
4380 pub async fn transfer_to_account_unsafe_unconfirmed(
4381 &self,
4382 owner: AccountOwner,
4383 amount: Amount,
4384 recipient: Account,
4385 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
4386 Box::pin(self.execute_operation(SystemOperation::Transfer {
4387 owner,
4388 recipient,
4389 amount,
4390 }))
4391 .await
4392 }
4393
4394 #[instrument(level = "trace", skip(hash))]
4395 pub async fn read_confirmed_block(
4396 &self,
4397 hash: CryptoHash,
4398 ) -> Result<ConfirmedBlock, ChainClientError> {
4399 let block = self
4400 .client
4401 .storage_client()
4402 .read_confirmed_block(hash)
4403 .await?;
4404 block.ok_or(ChainClientError::MissingConfirmedBlock(hash))
4405 }
4406
4407 #[instrument(level = "trace", skip(hash))]
4408 pub async fn read_certificate(
4409 &self,
4410 hash: CryptoHash,
4411 ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
4412 let certificate = self.client.storage_client().read_certificate(hash).await?;
4413 certificate.ok_or(ChainClientError::ReadCertificatesError(vec![hash]))
4414 }
4415
4416 #[instrument(level = "trace")]
4418 pub async fn retry_pending_outgoing_messages(&self) -> Result<(), ChainClientError> {
4419 self.client
4420 .local_node
4421 .retry_pending_cross_chain_requests(self.chain_id)
4422 .await?;
4423 Ok(())
4424 }
4425
4426 #[instrument(level = "trace", skip(local_node))]
4427 async fn local_chain_info(
4428 &self,
4429 chain_id: ChainId,
4430 local_node: &mut LocalNodeClient<Env::Storage>,
4431 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
4432 match local_node.chain_info(chain_id).await {
4433 Ok(info) => {
4434 self.client.update_from_info(&info);
4436 Ok(Some(info))
4437 }
4438 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
4439 Err(err) => Err(err.into()),
4440 }
4441 }
4442
4443 #[instrument(level = "trace", skip(chain_id, local_node))]
4444 async fn local_next_block_height(
4445 &self,
4446 chain_id: ChainId,
4447 local_node: &mut LocalNodeClient<Env::Storage>,
4448 ) -> Result<BlockHeight, ChainClientError> {
4449 Ok(self
4450 .local_chain_info(chain_id, local_node)
4451 .await?
4452 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
4453 }
4454
4455 #[instrument(level = "trace")]
4458 async fn local_next_height_to_receive(
4459 &self,
4460 origin: ChainId,
4461 ) -> Result<BlockHeight, ChainClientError> {
4462 Ok(self
4463 .client
4464 .local_node
4465 .get_inbox_next_height(self.chain_id, origin)
4466 .await?)
4467 }
4468
4469 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
4470 async fn process_notification(
4471 &self,
4472 remote_node: RemoteNode<Env::ValidatorNode>,
4473 mut local_node: LocalNodeClient<Env::Storage>,
4474 notification: Notification,
4475 ) -> Result<(), ChainClientError> {
4476 let mode = self.client.chain_mode(notification.chain_id);
4477 let dominated = mode
4478 .as_ref()
4479 .is_none_or(|mode| !mode.is_relevant(¬ification.reason));
4480 if dominated {
4481 debug!(
4482 chain_id = %notification.chain_id,
4483 reason = ?notification.reason,
4484 listening_mode = ?mode,
4485 "Ignoring notification due to listening mode"
4486 );
4487 return Ok(());
4488 }
4489 match notification.reason {
4490 Reason::NewIncomingBundle { origin, height } => {
4491 if self.local_next_height_to_receive(origin).await? > height {
4492 debug!(
4493 chain_id = %self.chain_id,
4494 "Accepting redundant notification for new message"
4495 );
4496 return Ok(());
4497 }
4498 self.client
4499 .download_sender_block_with_sending_ancestors(
4500 self.chain_id,
4501 origin,
4502 height,
4503 &remote_node,
4504 )
4505 .await?;
4506 if self.local_next_height_to_receive(origin).await? <= height {
4507 info!(
4508 chain_id = %self.chain_id,
4509 "NewIncomingBundle: Fail to synchronize new message after notification"
4510 );
4511 }
4512 }
4513 Reason::NewBlock {
4514 height,
4515 hash,
4516 event_streams,
4517 ..
4518 } => {
4519 let chain_id = notification.chain_id;
4520 let local_height = self
4521 .local_next_block_height(chain_id, &mut local_node)
4522 .await?;
4523 if local_height > height {
4524 debug!(
4525 chain_id = %self.chain_id,
4526 "Accepting redundant notification for new block"
4527 );
4528 return Ok(());
4529 }
4530 if let Some(ListeningMode::EventsOnly(subscribed)) =
4534 self.client.chain_mode(chain_id)
4535 {
4536 if !event_streams.is_empty() {
4537 self.client
4538 .download_event_bearing_blocks(
4539 chain_id,
4540 height,
4541 hash,
4542 local_height,
4543 &subscribed,
4544 &remote_node,
4545 )
4546 .await?;
4547 }
4548 } else {
4549 self.client
4550 .synchronize_chain_state_from(&remote_node, chain_id)
4551 .await?;
4552 if self
4553 .local_next_block_height(chain_id, &mut local_node)
4554 .await?
4555 <= height
4556 {
4557 error!("NewBlock: Fail to synchronize new block after notification");
4558 }
4559 }
4560 }
4561 Reason::NewEvents { height, hash, .. } => {
4562 let chain_id = notification.chain_id;
4563 let local_height = self
4564 .local_next_block_height(chain_id, &mut local_node)
4565 .await?;
4566 if local_height > height {
4567 debug!(
4568 chain_id = %self.chain_id,
4569 "Accepting redundant notification for new events"
4570 );
4571 return Ok(());
4572 }
4573 let subscribed = match self.client.chain_mode(chain_id) {
4574 Some(ListeningMode::EventsOnly(streams)) => streams,
4575 _ => return Ok(()),
4576 };
4577 self.client
4578 .download_event_bearing_blocks(
4579 chain_id,
4580 height,
4581 hash,
4582 local_height,
4583 &subscribed,
4584 &remote_node,
4585 )
4586 .await?;
4587 }
4588 Reason::NewRound { height, round } => {
4589 let chain_id = notification.chain_id;
4590 if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
4591 if (info.next_block_height, info.manager.current_round) >= (height, round) {
4592 debug!(
4593 chain_id = %self.chain_id,
4594 "Accepting redundant notification for new round"
4595 );
4596 return Ok(());
4597 }
4598 }
4599 self.client
4600 .synchronize_chain_state_from(&remote_node, chain_id)
4601 .await?;
4602 let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
4603 error!(
4604 chain_id = %self.chain_id,
4605 "NewRound: Fail to read local chain info for {chain_id}"
4606 );
4607 return Ok(());
4608 };
4609 if (info.next_block_height, info.manager.current_round) < (height, round) {
4610 info!(
4611 chain_id = %self.chain_id,
4612 "NewRound: Fail to synchronize new block after notification"
4613 );
4614 }
4615 }
4616 Reason::BlockExecuted { .. } => {
4617 }
4619 }
4620 Ok(())
4621 }
4622
4623 pub fn is_tracked(&self) -> bool {
4625 self.client.is_tracked(self.chain_id)
4626 }
4627
4628 pub fn listening_mode(&self) -> Option<ListeningMode> {
4630 self.client.chain_mode(self.chain_id)
4631 }
4632
4633 #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
4638 pub async fn listen(
4639 &self,
4640 ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), ChainClientError> {
4641 use future::FutureExt as _;
4642
4643 async fn await_while_polling<F: FusedFuture>(
4644 future: F,
4645 background_work: impl FusedStream<Item = ()>,
4646 ) -> F::Output {
4647 tokio::pin!(future);
4648 tokio::pin!(background_work);
4649 loop {
4650 futures::select! {
4651 _ = background_work.next() => (),
4652 result = future => return result,
4653 }
4654 }
4655 }
4656
4657 let mut senders = HashMap::new(); let notifications = self.subscribe()?;
4659 let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
4660
4661 let mut process_notifications = FuturesUnordered::new();
4668
4669 match self.update_notification_streams(&mut senders).await {
4670 Ok(handler) => process_notifications.push(handler),
4671 Err(error) => error!("Failed to update committee: {error}"),
4672 };
4673
4674 let this = self.clone();
4675 let update_streams = async move {
4676 let mut abortable_notifications = abortable_notifications.fuse();
4677
4678 while let Some(notification) =
4679 await_while_polling(abortable_notifications.next(), &mut process_notifications)
4680 .await
4681 {
4682 if let Reason::NewBlock { .. } = notification.reason {
4686 let is_events_only = this
4687 .listening_mode()
4688 .is_some_and(|m| matches!(m, ListeningMode::EventsOnly(_)));
4689 if !is_events_only {
4690 match Box::pin(await_while_polling(
4691 this.update_notification_streams(&mut senders).fuse(),
4692 &mut process_notifications,
4693 ))
4694 .await
4695 {
4696 Ok(handler) => process_notifications.push(handler),
4697 Err(error) => error!("Failed to update committee: {error}"),
4698 }
4699 }
4700 }
4701 }
4702
4703 for abort in senders.into_values() {
4704 abort.abort();
4705 }
4706
4707 let () = process_notifications.collect().await;
4708 }
4709 .in_current_span();
4710
4711 Ok((update_streams, AbortOnDrop(abort), notifications))
4712 }
4713
4714 #[instrument(level = "trace", skip(senders))]
4715 async fn update_notification_streams(
4716 &self,
4717 senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
4718 ) -> Result<impl Future<Output = ()>, ChainClientError> {
4719 let events_only = self
4720 .listening_mode()
4721 .is_some_and(|m| matches!(m, ListeningMode::EventsOnly(_)));
4722 let (nodes, local_node) = {
4723 let committee = if events_only {
4727 let (_, committee) = self.admin_committee().await?;
4728 committee
4729 } else {
4730 self.local_committee().await?
4731 };
4732 let nodes: HashMap<_, _> = self
4733 .client
4734 .validator_node_provider()
4735 .make_nodes(&committee)?
4736 .collect();
4737 (nodes, self.client.local_node.clone())
4738 };
4739 senders.retain(|validator, abort| {
4741 if !nodes.contains_key(validator) {
4742 abort.abort();
4743 }
4744 !abort.is_aborted()
4745 });
4746 let validator_tasks = FuturesUnordered::new();
4748 for (public_key, node) in nodes {
4749 let address = node.address();
4750 let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
4751 continue;
4752 };
4753 let this = self.clone();
4754 let stream = stream::once({
4755 let node = node.clone();
4756 async move {
4757 let stream = node.subscribe(vec![this.chain_id]).await?;
4758 if !events_only {
4763 let remote_node = RemoteNode { public_key, node };
4764 this.client
4765 .synchronize_chain_state_from(&remote_node, this.chain_id)
4766 .await?;
4767 }
4768 Ok::<_, ChainClientError>(stream)
4769 }
4770 })
4771 .filter_map(move |result| {
4772 let address = address.clone();
4773 async move {
4774 if let Err(error) = &result {
4775 info!(?error, address, "could not connect to validator");
4776 } else {
4777 debug!(address, "connected to validator");
4778 }
4779 result.ok()
4780 }
4781 })
4782 .flatten();
4783 let (stream, abort) = stream::abortable(stream);
4784 let mut stream = Box::pin(stream);
4785 let abort_on_exit = abort.clone();
4786 let this = self.clone();
4787 let local_node = local_node.clone();
4788 let remote_node = RemoteNode { public_key, node };
4789 validator_tasks.push(async move {
4790 while let Some(notification) = stream.next().await {
4791 if let Err(error) = this
4792 .process_notification(
4793 remote_node.clone(),
4794 local_node.clone(),
4795 notification.clone(),
4796 )
4797 .await
4798 {
4799 tracing::info!(
4800 chain_id = %this.chain_id,
4801 address = remote_node.address(),
4802 ?notification,
4803 %error,
4804 "failed to process notification",
4805 );
4806 }
4807 }
4808 warn!(
4809 chain_id = %this.chain_id,
4810 address = remote_node.address(),
4811 "Validator notification stream ended; will reconnect on next update"
4812 );
4813 abort_on_exit.abort();
4814 });
4815 entry.insert(abort);
4816 }
4817 Ok(validator_tasks.collect())
4818 }
4819
4820 #[instrument(level = "trace", skip(remote_node))]
4822 pub async fn sync_validator(
4823 &self,
4824 remote_node: Env::ValidatorNode,
4825 ) -> Result<(), ChainClientError> {
4826 let validator_next_block_height = match remote_node
4827 .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
4828 .await
4829 {
4830 Ok(info) => info.info.next_block_height,
4831 Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
4832 Err(err) => return Err(err.into()),
4833 };
4834 let local_next_block_height = self.chain_info().await?.next_block_height;
4835
4836 if validator_next_block_height >= local_next_block_height {
4837 debug!("Validator is up-to-date with local state");
4838 return Ok(());
4839 }
4840
4841 let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
4842 .map(BlockHeight)
4843 .collect();
4844
4845 let certificates = self
4846 .client
4847 .storage_client()
4848 .read_certificates_by_heights(self.chain_id, &heights)
4849 .await?
4850 .into_iter()
4851 .flatten()
4852 .collect::<Vec<_>>();
4853
4854 for certificate in certificates {
4855 match remote_node
4856 .handle_confirmed_certificate(
4857 certificate.clone(),
4858 CrossChainMessageDelivery::NonBlocking,
4859 )
4860 .await
4861 {
4862 Ok(_) => (),
4863 Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
4864 let missing_blobs: Vec<_> = self
4866 .client
4867 .storage_client()
4868 .read_blobs(&missing_blob_ids)
4869 .await?
4870 .into_iter()
4871 .flatten()
4872 .collect();
4873 remote_node.upload_blobs(missing_blobs).await?;
4874 remote_node
4875 .handle_confirmed_certificate(
4876 certificate,
4877 CrossChainMessageDelivery::NonBlocking,
4878 )
4879 .await?;
4880 }
4881 Err(err) => return Err(err.into()),
4882 }
4883 }
4884
4885 Ok(())
4886 }
4887}
4888
4889async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
4892 nodes: &[RemoteNode<A>],
4893 f: F,
4894 err: G,
4895 timeout: Duration,
4896) -> Result<V, E2>
4897where
4898 F: Clone + FnOnce(RemoteNode<A>) -> R,
4899 RemoteNode<A>: Clone,
4900 G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
4901 R: Future<Output = Result<V, E1>> + 'a,
4902{
4903 let mut stream = nodes
4904 .iter()
4905 .zip(0..)
4906 .map(|(remote_node, i)| {
4907 let fun = f.clone();
4908 let node = remote_node.clone();
4909 async move {
4910 linera_base::time::timer::sleep(timeout * i * i).await;
4911 fun(node).await.map_err(|err| (remote_node.public_key, err))
4912 }
4913 })
4914 .collect::<FuturesUnordered<_>>();
4915 let mut errors = vec![];
4916 while let Some(maybe_result) = stream.next().await {
4917 match maybe_result {
4918 Ok(result) => return Ok(result),
4919 Err(error) => errors.push(error),
4920 };
4921 }
4922 Err(err(errors))
4923}
4924
4925#[cfg(with_testing)]
4926impl<Env: Environment> ChainClient<Env> {
4927 pub async fn process_notification_from(
4928 &self,
4929 notification: Notification,
4930 validator: (ValidatorPublicKey, &str),
4931 ) {
4932 let mut node_list = self
4933 .client
4934 .validator_node_provider()
4935 .make_nodes_from_list(vec![validator])
4936 .unwrap();
4937 let (public_key, node) = node_list.next().unwrap();
4938 let remote_node = RemoteNode { node, public_key };
4939 let local_node = self.client.local_node.clone();
4940 self.process_notification(remote_node, local_node, notification)
4941 .await
4942 .unwrap();
4943 }
4944}
4945
4946#[must_use]
4948pub struct AbortOnDrop(pub AbortHandle);
4949
4950impl Drop for AbortOnDrop {
4951 #[instrument(level = "trace", skip(self))]
4952 fn drop(&mut self) {
4953 self.0.abort();
4954 }
4955}
4956
4957#[derive(Clone, Serialize, Deserialize)]
4959pub struct PendingProposal {
4960 pub block: ProposedBlock,
4961 pub blobs: Vec<Blob>,
4962}
4963
4964enum ReceiveCertificateMode {
4965 NeedsCheck,
4966 AlreadyChecked,
4967}
4968
4969enum CheckCertificateResult {
4970 OldEpoch,
4971 New,
4972 FutureEpoch,
4973}
4974
4975impl CheckCertificateResult {
4976 fn into_result(self) -> Result<(), ChainClientError> {
4977 match self {
4978 Self::OldEpoch => Err(ChainClientError::CommitteeDeprecationError),
4979 Self::New => Ok(()),
4980 Self::FutureEpoch => Err(ChainClientError::CommitteeSynchronizationError),
4981 }
4982 }
4983}
4984
4985#[cfg(not(target_arch = "wasm32"))]
4987pub async fn create_bytecode_blobs(
4988 contract: Bytecode,
4989 service: Bytecode,
4990 vm_runtime: VmRuntime,
4991) -> (Vec<Blob>, ModuleId) {
4992 match vm_runtime {
4993 VmRuntime::Wasm => {
4994 let (compressed_contract, compressed_service) =
4995 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
4996 .await
4997 .expect("Compression should not panic");
4998 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
4999 let service_blob = Blob::new_service_bytecode(compressed_service);
5000 let module_id =
5001 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
5002 (vec![contract_blob, service_blob], module_id)
5003 }
5004 VmRuntime::Evm => {
5005 let compressed_contract = contract.compress();
5006 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
5007 let module_id = ModuleId::new(
5008 evm_contract_blob.id().hash,
5009 evm_contract_blob.id().hash,
5010 vm_runtime,
5011 );
5012 (vec![evm_contract_blob], module_id)
5013 }
5014 }
5015}