1use std::{
6 collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7 convert::Infallible,
8 iter,
9 sync::{Arc, RwLock},
10};
11
12use chain_client_state::ChainClientState;
13use custom_debug_derive::Debug;
14use futures::{
15 future::{self, Either, FusedFuture, Future, FutureExt},
16 select,
17 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
18};
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{
22 abi::Abi,
23 crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
24 data_types::{
25 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
26 ChainDescription, Epoch, MessagePolicy, Round, TimeDelta, Timestamp,
27 },
28 ensure,
29 identifiers::{
30 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
31 ModuleId, StreamId,
32 },
33 ownership::{ChainOwnership, TimeoutConfig},
34 time::{Duration, Instant},
35};
36#[cfg(not(target_arch = "wasm32"))]
37use linera_base::{data_types::Bytecode, vm::VmRuntime};
38use linera_chain::{
39 data_types::{
40 BlockProposal, BundleExecutionPolicy, ChainAndHeight, IncomingBundle, LiteVote,
41 ProposedBlock, Transaction,
42 },
43 manager::LockingBlock,
44 types::{
45 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
46 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
47 },
48 ChainError, ChainExecutionContext, ChainStateView,
49};
50use linera_execution::{
51 committee::Committee,
52 system::{
53 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
54 REMOVED_EPOCH_STREAM_NAME,
55 },
56 ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
57};
58use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
59use linera_views::ViewError;
60use rand::prelude::SliceRandom as _;
61use received_log::ReceivedLogs;
62use serde::{Deserialize, Serialize};
63use thiserror::Error;
64use tokio::sync::{mpsc, OwnedRwLockReadGuard};
65use tokio_stream::wrappers::UnboundedReceiverStream;
66use tokio_util::sync::CancellationToken;
67use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
68use validator_trackers::ValidatorTrackers;
69
70use crate::{
71 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout},
72 environment::{wallet::Wallet as _, Environment},
73 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
74 node::{
75 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
76 ValidatorNodeProvider as _,
77 },
78 notifier::{ChannelNotifier, Notifier as _},
79 remote_node::RemoteNode,
80 updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
81 worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
82 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
83};
84
85mod chain_client_state;
86#[cfg(test)]
87#[path = "../unit_tests/client_tests.rs"]
88mod client_tests;
89pub mod requests_scheduler;
90
91pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
92mod received_log;
93mod validator_trackers;
94
#[cfg(with_metrics)]
mod metrics {
    //! Latency histograms recorded by the client. Only compiled when `with_metrics` is set.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels, using the bucket configuration shared by every
    /// latency metric in this module.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(500.0))
    }

    /// Histogram registered under the name `process_inbox_latency`.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("process_inbox_latency", "process_inbox latency"));

    /// Histogram registered under the name `prepare_chain_latency`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("prepare_chain_latency", "prepare_chain latency"));

    /// Histogram registered under the name `synchronize_chain_state_latency`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    /// Histogram registered under the name `execute_block_latency`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("execute_block_latency", "execute_block latency"));

    /// Histogram registered under the name `find_received_certificates_latency`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
148
/// How much of a chain's activity the client reacts to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Every notification reason is considered relevant for the chain.
    FullChain,
    /// Only new-block notifications are considered relevant for the chain.
    FollowChain,
}

impl PartialOrd for ListeningMode {
    /// Orders the modes by how much they cover: `FollowChain < FullChain`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Note that this is the reverse of the declaration order, so it cannot be derived.
        fn rank(mode: &ListeningMode) -> u8 {
            match mode {
                ListeningMode::FollowChain => 0,
                ListeningMode::FullChain => 1,
            }
        }
        Some(rank(self).cmp(&rank(other)))
    }
}
174
175impl ListeningMode {
176 pub fn is_relevant(&self, reason: &Reason) -> bool {
179 match (reason, self) {
180 (_, ListeningMode::FullChain) => true,
182 (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
185 (_, ListeningMode::FollowChain) => false,
186 }
187 }
188
189 pub fn extend(&mut self, other: Option<ListeningMode>) {
190 match (self, other) {
191 (_, None) => (),
192 (ListeningMode::FullChain, _) => (),
193 (mode, Some(ListeningMode::FullChain)) => {
194 *mode = ListeningMode::FullChain;
195 }
196 (ListeningMode::FollowChain, _) => (),
197 }
198 }
199
200 pub fn is_follow_only(&self) -> bool {
203 !matches!(self, ListeningMode::FullChain)
204 }
205
206 pub fn is_full(&self) -> bool {
209 matches!(self, ListeningMode::FullChain)
210 }
211}
212
/// Client state shared by the chain clients created through `create_chain_client`.
pub struct Client<Env: Environment> {
    /// Provides access to the storage, network, signer and wallet.
    environment: Env,
    /// Local node client wrapping the worker state that `new` builds from the storage.
    pub local_node: LocalNodeClient<Env::Storage>,
    /// Scheduler through which certificates and blobs are downloaded from validators.
    requests_scheduler: RequestsScheduler<Env>,
    /// ID of the admin chain; its chain info supplies the committees used by this client.
    admin_chain_id: ChainId,
    /// Listening mode per chain. The same map is handed to the worker state in `new`.
    chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
    /// Notifier passed to the local node whenever a certificate is handled.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Per-chain client state, inserted on first use by `create_chain_client`.
    chains: papaya::HashMap<ChainId, ChainClientState>,
    /// Options copied into every chain client created from this client.
    options: ChainClientOptions,
}
233
234impl<Env: Environment> Client<Env> {
235 #[expect(clippy::too_many_arguments)]
237 #[instrument(level = "trace", skip_all)]
238 pub fn new(
239 environment: Env,
240 admin_chain_id: ChainId,
241 long_lived_services: bool,
242 chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
243 name: impl Into<String>,
244 chain_worker_ttl: Duration,
245 sender_chain_worker_ttl: Duration,
246 options: ChainClientOptions,
247 requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
248 ) -> Self {
249 let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect()));
250 let state = WorkerState::new_for_client(
251 name.into(),
252 environment.storage().clone(),
253 chain_modes.clone(),
254 )
255 .with_long_lived_services(long_lived_services)
256 .with_allow_inactive_chains(true)
257 .with_allow_messages_from_deprecated_epochs(true)
258 .with_chain_worker_ttl(chain_worker_ttl)
259 .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
260 let local_node = LocalNodeClient::new(state);
261 let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
262
263 Self {
264 environment,
265 local_node,
266 requests_scheduler,
267 chains: papaya::HashMap::new(),
268 admin_chain_id,
269 chain_modes,
270 notifier: Arc::new(ChannelNotifier::default()),
271 options,
272 }
273 }
274
    /// Returns the ID of the admin chain.
    pub fn admin_chain_id(&self) -> ChainId {
        self.admin_chain_id
    }
279
    /// Returns the storage client of the environment.
    pub fn storage_client(&self) -> &Env::Storage {
        self.environment.storage()
    }
284
    /// Returns the environment's network, which is used to create validator nodes.
    pub fn validator_node_provider(&self) -> &Env::Network {
        self.environment.network()
    }
288
    /// Returns the signer of the environment.
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &Env::Signer {
        self.environment.signer()
    }
294
295 pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, ChainClientError> {
297 self.signer()
298 .contains_key(owner)
299 .await
300 .map_err(ChainClientError::signer_failure)
301 }
302
    /// Returns the wallet of the environment.
    pub fn wallet(&self) -> &Env::Wallet {
        self.environment.wallet()
    }
307
308 async fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
313 match self.wallet().get(chain_id).await {
314 Ok(Some(chain)) => chain.owner.is_none(),
315 Ok(None) | Err(_) => true,
317 }
318 }
319
320 #[instrument(level = "trace", skip(self))]
323 pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
324 let mut chain_modes = self
325 .chain_modes
326 .write()
327 .expect("Panics should not happen while holding a lock to `chain_modes`");
328 let entry = chain_modes.entry(chain_id).or_insert(mode.clone());
329 entry.extend(Some(mode));
330 entry.clone()
331 }
332
333 pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
335 self.chain_modes
336 .read()
337 .expect("Panics should not happen while holding a lock to `chain_modes`")
338 .get(&chain_id)
339 .cloned()
340 }
341
342 pub fn is_tracked(&self, chain_id: ChainId) -> bool {
344 self.chain_modes
345 .read()
346 .expect("Panics should not happen while holding a lock to `chain_modes`")
347 .get(&chain_id)
348 .is_some_and(ListeningMode::is_full)
349 }
350
351 #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
353 pub fn create_chain_client(
354 self: &Arc<Self>,
355 chain_id: ChainId,
356 block_hash: Option<CryptoHash>,
357 next_block_height: BlockHeight,
358 pending_proposal: Option<PendingProposal>,
359 preferred_owner: Option<AccountOwner>,
360 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
361 ) -> ChainClient<Env> {
362 self.chains
365 .pin()
366 .get_or_insert_with(chain_id, || ChainClientState::new(pending_proposal.clone()));
367
368 ChainClient {
369 client: self.clone(),
370 chain_id,
371 options: self.options.clone(),
372 preferred_owner,
373 initial_block_hash: block_hash,
374 initial_next_block_height: next_block_height,
375 timing_sender,
376 }
377 }
378
379 async fn fetch_chain_info(
381 &self,
382 chain_id: ChainId,
383 validators: &[RemoteNode<Env::ValidatorNode>],
384 ) -> Result<Box<ChainInfo>, ChainClientError> {
385 match self.local_node.chain_info(chain_id).await {
386 Ok(info) => Ok(info),
387 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
388 Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
390 self.update_local_node_with_blobs_from(blob_ids, validators)
393 .await?;
394 Ok(self.local_node.chain_info(chain_id).await?)
395 }
396 Err(err) => Err(err.into()),
397 }
398 }
399
400 #[instrument(level = "trace", skip(self))]
402 async fn download_certificates(
403 &self,
404 chain_id: ChainId,
405 target_next_block_height: BlockHeight,
406 ) -> Result<Box<ChainInfo>, ChainClientError> {
407 let mut validators = self.validator_nodes().await?;
408 validators.shuffle(&mut rand::thread_rng());
410 let mut info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
411 for remote_node in validators {
412 if target_next_block_height <= info.next_block_height {
413 return Ok(info);
414 }
415 match self
416 .download_certificates_from(&remote_node, chain_id, target_next_block_height)
417 .await
418 {
419 Err(error) => info!(
420 remote_node = remote_node.address(),
421 %error,
422 "failed to download certificates from validator",
423 ),
424 Ok(Some(new_info)) => info = new_info,
425 Ok(None) => {}
426 }
427 }
428 ensure!(
429 target_next_block_height <= info.next_block_height,
430 ChainClientError::CannotDownloadCertificates {
431 chain_id,
432 target_next_block_height,
433 }
434 );
435 Ok(info)
436 }
437
438 #[instrument(level = "trace", skip_all)]
441 async fn download_certificates_from(
442 &self,
443 remote_node: &RemoteNode<Env::ValidatorNode>,
444 chain_id: ChainId,
445 stop: BlockHeight,
446 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
447 let mut last_info = None;
448 let chain_info = self.local_node.chain_info(chain_id).await?;
450 let mut next_height = chain_info.next_block_height;
451 let hashes = self
452 .local_node
453 .get_preprocessed_block_hashes(chain_id, next_height, stop)
454 .await?;
455 let certificates = self.storage_client().read_certificates(&hashes).await?;
456 let certificates = match ResultReadCertificates::new(certificates, hashes) {
457 ResultReadCertificates::Certificates(certificates) => certificates,
458 ResultReadCertificates::InvalidHashes(hashes) => {
459 return Err(ChainClientError::ReadCertificatesError(hashes))
460 }
461 };
462 for certificate in certificates {
463 last_info = Some(self.handle_certificate(certificate).await?.info);
464 }
465 while next_height < stop {
467 let limit = u64::from(stop)
469 .checked_sub(u64::from(next_height))
470 .ok_or(ArithmeticError::Overflow)?
471 .min(self.options.certificate_download_batch_size);
472
473 let certificates = self
474 .requests_scheduler
475 .download_certificates(remote_node, chain_id, next_height, limit)
476 .await?;
477 let Some(info) = self.process_certificates(remote_node, certificates).await? else {
478 break;
479 };
480 assert!(info.next_block_height > next_height);
481 next_height = info.next_block_height;
482 last_info = Some(info);
483 }
484 Ok(last_info)
485 }
486
487 async fn download_blobs(
488 &self,
489 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
490 blob_ids: &[BlobId],
491 ) -> Result<(), ChainClientError> {
492 let blobs = &self
493 .requests_scheduler
494 .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
495 .await?
496 .ok_or_else(|| {
497 ChainClientError::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
498 })?;
499 self.local_node.store_blobs(blobs).await.map_err(Into::into)
500 }
501
502 #[instrument(level = "trace", skip_all)]
505 async fn process_certificates(
506 &self,
507 remote_node: &RemoteNode<Env::ValidatorNode>,
508 certificates: Vec<ConfirmedBlockCertificate>,
509 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
510 let mut info = None;
511 let required_blob_ids: Vec<_> = certificates
512 .iter()
513 .flat_map(|certificate| certificate.value().required_blob_ids())
514 .collect();
515
516 match self
517 .local_node
518 .read_blob_states_from_storage(&required_blob_ids)
519 .await
520 {
521 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
522 self.download_blobs(&[remote_node.clone()], &blob_ids)
523 .await?;
524 }
525 x => {
526 x?;
527 }
528 }
529
530 for certificate in certificates {
531 info = Some(
532 match self.handle_certificate(certificate.clone()).await {
533 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
534 self.download_blobs(&[remote_node.clone()], &blob_ids)
535 .await?;
536 self.handle_certificate(certificate).await?
537 }
538 x => x?,
539 }
540 .info,
541 );
542 }
543
544 Ok(info)
546 }
547
    /// Passes `certificate` to the local node together with this client's notifier.
    async fn handle_certificate<T: ProcessableCertificate>(
        &self,
        certificate: GenericCertificate<T>,
    ) -> Result<ChainInfoResponse, LocalNodeError> {
        self.local_node
            .handle_certificate(certificate, &self.notifier)
            .await
    }
556
557 async fn chain_info_with_committees(
558 &self,
559 chain_id: ChainId,
560 ) -> Result<Box<ChainInfo>, LocalNodeError> {
561 let query = ChainInfoQuery::new(chain_id).with_committees();
562 let info = self.local_node.handle_chain_info_query(query).await?.info;
563 Ok(info)
564 }
565
566 #[instrument(level = "trace", skip_all)]
569 async fn admin_committees(
570 &self,
571 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
572 let info = self.chain_info_with_committees(self.admin_chain_id).await?;
573 Ok((info.epoch, info.into_committees()?))
574 }
575
576 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
578 let info = self.chain_info_with_committees(self.admin_chain_id).await?;
579 Ok((info.epoch, info.into_current_committee()?))
580 }
581
582 async fn validator_nodes(
584 &self,
585 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, ChainClientError> {
586 let (_, committee) = self.admin_committee().await?;
587 Ok(self.make_nodes(&committee)?)
588 }
589
590 fn make_nodes(
592 &self,
593 committee: &Committee,
594 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
595 Ok(self
596 .validator_node_provider()
597 .make_nodes(committee)?
598 .map(|(public_key, node)| RemoteNode { public_key, node })
599 .collect())
600 }
601
602 pub async fn get_chain_description_blob(
605 &self,
606 chain_id: ChainId,
607 ) -> Result<Blob, ChainClientError> {
608 let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
609 let blob = self
610 .local_node
611 .storage_client()
612 .read_blob(chain_desc_id)
613 .await?;
614 if let Some(blob) = blob {
615 return Ok(blob);
617 }
618 Box::pin(self.synchronize_chain_state(self.admin_chain_id)).await?;
620 let nodes = self.validator_nodes().await?;
621 Ok(self
622 .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
623 .await?
624 .pop()
625 .unwrap()) }
627
628 pub async fn get_chain_description(
631 &self,
632 chain_id: ChainId,
633 ) -> Result<ChainDescription, ChainClientError> {
634 let blob = self.get_chain_description_blob(chain_id).await?;
635 Ok(bcs::from_bytes(blob.bytes())?)
636 }
637
638 #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
640 fn update_from_info(&self, info: &ChainInfo) {
641 self.chains.pin().update(info.chain_id, |state| {
642 let mut state = state.clone_for_update_unchecked();
643 state.update_from_info(info);
644 state
645 });
646 }
647
648 #[instrument(level = "trace", skip_all)]
650 async fn process_certificate<T: ProcessableCertificate>(
651 &self,
652 certificate: Box<GenericCertificate<T>>,
653 ) -> Result<(), LocalNodeError> {
654 let info = self.handle_certificate(*certificate).await?.info;
655 self.update_from_info(&info);
656 Ok(())
657 }
658
659 #[instrument(level = "trace", skip_all)]
661 async fn finalize_block(
662 self: &Arc<Self>,
663 committee: &Committee,
664 certificate: ValidatedBlockCertificate,
665 ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
666 debug!(round = %certificate.round, "Submitting block for confirmation");
667 let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
668 let finalize_action = CommunicateAction::FinalizeBlock {
669 certificate: Box::new(certificate),
670 delivery: self.options.cross_chain_message_delivery,
671 };
672 let certificate = self
673 .communicate_chain_action(committee, finalize_action, hashed_value)
674 .await?;
675 self.receive_certificate_with_checked_signatures(certificate.clone())
676 .await?;
677 Ok(certificate)
678 }
679
    /// Submits `proposal` to the validators of `committee` and returns the certificate for
    /// `value` built from their votes.
    ///
    /// The certificate is also processed by the local node before it is returned. While the
    /// proposal is being submitted, a background task collects the clock skew reports sent
    /// by validators and logs a warning once the reporting validators' weight reaches the
    /// committee's validity threshold.
    #[instrument(level = "trace", skip_all)]
    async fn submit_block_proposal<T: ProcessableCertificate>(
        self: &Arc<Self>,
        committee: &Committee,
        proposal: Box<BlockProposal>,
        value: T,
    ) -> Result<GenericCertificate<T>, ChainClientError> {
        debug!(
            round = %proposal.content.round,
            "Submitting block proposal to validators"
        );

        // Only logs; this function itself does not delay the submission.
        let block_timestamp = proposal.content.block.timestamp;
        let local_time = self.local_node.storage_client().clock().current_time();
        if block_timestamp > local_time {
            info!(
                chain_id = %proposal.content.block.chain_id,
                %block_timestamp,
                %local_time,
                "Block timestamp is in the future; waiting until it can be proposed",
            );
        }

        // Channel on which clock skew reports arrive; the sender travels with the action.
        let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
        let submit_action = CommunicateAction::SubmitBlock {
            proposal,
            blob_ids: value.required_blob_ids().into_iter().collect(),
            clock_skew_sender,
        };

        let validity_threshold = committee.validity_threshold();
        // The spawned task needs its own copy of the committee.
        let committee_clone = committee.clone();
        let clock_skew_check_handle = linera_base::task::spawn(async move {
            // Total committee weight of the validators that reported a positive skew.
            let mut skew_weight = 0u64;
            let mut min_skew = TimeDelta::MAX;
            let mut max_skew = TimeDelta::ZERO;
            // The loop ends when the channel is closed or the threshold is reached.
            while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
                if clock_skew.as_micros() > 0 {
                    skew_weight += committee_clone.weight(&public_key);
                    min_skew = min_skew.min(clock_skew);
                    max_skew = max_skew.max(clock_skew);
                    if skew_weight >= validity_threshold {
                        warn!(
                            skew_weight,
                            validity_threshold,
                            min_skew_ms = min_skew.as_micros() / 1000,
                            max_skew_ms = max_skew.as_micros() / 1000,
                            "A validity threshold of validators reported clock skew; \
                             consider checking your system clock",
                        );
                        return;
                    }
                }
            }
        });

        // NOTE: on error this returns early, without awaiting the skew-check task.
        let certificate = self
            .communicate_chain_action(committee, submit_action, value)
            .await?;

        clock_skew_check_handle.await;

        self.process_certificate(Box::new(certificate.clone()))
            .await?;
        Ok(certificate)
    }
750
751 #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
753 async fn communicate_chain_updates(
754 self: &Arc<Self>,
755 committee: &Committee,
756 chain_id: ChainId,
757 height: BlockHeight,
758 delivery: CrossChainMessageDelivery,
759 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
760 ) -> Result<(), ChainClientError> {
761 let nodes = self.make_nodes(committee)?;
762 communicate_with_quorum(
763 &nodes,
764 committee,
765 |_: &()| (),
766 |remote_node| {
767 let mut updater = ValidatorUpdater {
768 remote_node,
769 client: self.clone(),
770 admin_chain_id: self.admin_chain_id,
771 };
772 let certificate = latest_certificate.clone();
773 Box::pin(async move {
774 updater
775 .send_chain_information(chain_id, height, delivery, certificate)
776 .await
777 })
778 },
779 self.options.quorum_grace_period,
780 )
781 .await?;
782 Ok(())
783 }
784
785 #[instrument(level = "trace", skip_all)]
791 async fn communicate_chain_action<T: CertificateValue>(
792 self: &Arc<Self>,
793 committee: &Committee,
794 action: CommunicateAction,
795 value: T,
796 ) -> Result<GenericCertificate<T>, ChainClientError> {
797 let nodes = self.make_nodes(committee)?;
798 let ((votes_hash, votes_round), votes) = communicate_with_quorum(
799 &nodes,
800 committee,
801 |vote: &LiteVote| (vote.value.value_hash, vote.round),
802 |remote_node| {
803 let mut updater = ValidatorUpdater {
804 remote_node,
805 client: self.clone(),
806 admin_chain_id: self.admin_chain_id,
807 };
808 let action = action.clone();
809 Box::pin(async move { updater.send_chain_update(action).await })
810 },
811 self.options.quorum_grace_period,
812 )
813 .await?;
814 ensure!(
815 (votes_hash, votes_round) == (value.hash(), action.round()),
816 ChainClientError::UnexpectedQuorum {
817 hash: votes_hash,
818 round: votes_round,
819 expected_hash: value.hash(),
820 expected_round: action.round(),
821 }
822 );
823 let certificate = LiteCertificate::try_from_votes(votes)
828 .ok_or_else(|| {
829 ChainClientError::InternalError("Vote values or rounds don't match; this is a bug")
830 })?
831 .with_value(value)
832 .ok_or_else(|| {
833 ChainClientError::ProtocolError("A quorum voted for an unexpected value")
834 })?;
835 Ok(certificate)
836 }
837
838 #[instrument(level = "trace", skip_all)]
841 async fn receive_certificate_with_checked_signatures(
842 &self,
843 certificate: ConfirmedBlockCertificate,
844 ) -> Result<(), ChainClientError> {
845 let certificate = Box::new(certificate);
846 let block = certificate.block();
847 self.download_certificates(block.header.chain_id, block.header.height)
849 .await?;
850 if let Err(err) = self.process_certificate(certificate.clone()).await {
853 match &err {
854 LocalNodeError::BlobsNotFound(blob_ids) => {
855 self.download_blobs(&self.validator_nodes().await?, blob_ids)
856 .await
857 .map_err(|_| err)?;
858 self.process_certificate(certificate).await?;
859 }
860 _ => {
861 warn!("Failed to process network hashed certificate value");
863 return Err(err.into());
864 }
865 }
866 }
867
868 Ok(())
869 }
870
871 #[instrument(level = "trace", skip_all)]
873 #[allow(dead_code)] async fn receive_sender_certificate(
875 &self,
876 certificate: ConfirmedBlockCertificate,
877 mode: ReceiveCertificateMode,
878 nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
879 ) -> Result<(), ChainClientError> {
880 let (max_epoch, committees) = self.admin_committees().await?;
882 if let ReceiveCertificateMode::NeedsCheck = mode {
883 Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
884 }
885 let nodes = if let Some(nodes) = nodes {
887 nodes
888 } else {
889 self.validator_nodes().await?
890 };
891 if let Err(err) = self.handle_certificate(certificate.clone()).await {
892 match &err {
893 LocalNodeError::BlobsNotFound(blob_ids) => {
894 self.download_blobs(&nodes, blob_ids).await?;
895 self.handle_certificate(certificate.clone()).await?;
896 }
897 _ => {
898 warn!("Failed to process network hashed certificate value");
900 return Err(err.into());
901 }
902 }
903 }
904
905 Ok(())
906 }
907
    /// Downloads the blocks of `sender_chain_id` at `remote_heights` from `nodes` and
    /// handles them in the local node.
    ///
    /// Every height whose certificate was handled successfully is reported on `sender`.
    /// The loop runs until no heights are left, or gives up once every node has been
    /// dropped as faulty. Failures are logged rather than returned.
    #[instrument(level = "trace", skip_all)]
    async fn download_and_process_sender_chain(
        &self,
        sender_chain_id: ChainId,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        received_log: &ReceivedLogs,
        mut remote_heights: Vec<BlockHeight>,
        sender: mpsc::UnboundedSender<ChainAndHeight>,
    ) {
        let (max_epoch, committees) = match self.admin_committees().await {
            Ok(result) => result,
            Err(error) => {
                error!(%error, %sender_chain_id, "could not read admin committees");
                return;
            }
        };
        // Borrowed up front so that the `async move` closure below can copy the reference.
        let committees_ref = &committees;
        // Local copy, because faulty nodes get removed from it.
        let mut nodes = nodes.to_vec();
        while !remote_heights.is_empty() {
            let remote_heights_ref = &remote_heights;
            nodes.shuffle(&mut rand::thread_rng());
            let certificates = match communicate_concurrently(
                &nodes,
                async move |remote_node| {
                    // Only request the heights that, according to the received log, this
                    // validator has.
                    let mut remote_heights = remote_heights_ref.clone();
                    remote_heights.retain(|height| {
                        received_log.validator_has_block(
                            &remote_node.public_key,
                            sender_chain_id,
                            *height,
                        )
                    });
                    if remote_heights.is_empty() {
                        // Nothing to request from this validator: report it as a failure.
                        return Err(());
                    }
                    let certificates = self
                        .requests_scheduler
                        .download_certificates_by_heights(
                            &remote_node,
                            sender_chain_id,
                            remote_heights,
                        )
                        .await
                        .map_err(|_| ())?;
                    // Pair every certificate with whether its check result is `Ok`.
                    let mut certificates_with_check_results = vec![];
                    for cert in certificates {
                        if let Ok(check_result) =
                            Self::check_certificate(max_epoch, committees_ref, &cert)
                        {
                            certificates_with_check_results
                                .push((cert, check_result.into_result().is_ok()));
                        } else {
                            // The check itself failed: treat the whole response as bad.
                            return Err(());
                        }
                    }
                    Ok(certificates_with_check_results)
                },
                // Reduces the per-validator errors to the set of failing validators.
                |errors| {
                    errors
                        .into_iter()
                        .map(|(validator, _error)| validator)
                        .collect::<BTreeSet<_>>()
                },
                self.options.certificate_batch_download_timeout,
            )
            .await
            {
                Ok(certificates_with_check_results) => certificates_with_check_results,
                Err(faulty_validators) => {
                    // Drop the validators that failed and retry with the remaining ones.
                    nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                    if nodes.is_empty() {
                        info!(
                            chain_id = %sender_chain_id,
                            "could not download certificates for chain - no more correct validators left"
                        );
                        return;
                    }
                    continue;
                }
            };

            trace!(
                chain_id = %sender_chain_id,
                num_certificates = %certificates.len(),
                "received certificates",
            );

            // Heights that must not be requested again in the next iteration.
            let mut to_remove_from_queue = BTreeSet::new();

            for (certificate, check_result) in certificates {
                let hash = certificate.hash();
                let chain_id = certificate.block().header.chain_id;
                let height = certificate.block().header.height;
                if !check_result {
                    // The check result was not `Ok`: skip the certificate and do not
                    // download it again.
                    to_remove_from_queue.insert(height);
                    continue;
                }
                // The certificate was checked in the download closure above.
                let mode = ReceiveCertificateMode::AlreadyChecked;
                if let Err(error) = self
                    .receive_sender_certificate(certificate, mode, None)
                    .await
                {
                    // The height stays queued, so it is requested again next iteration.
                    warn!(%error, %hash, "Received invalid certificate");
                } else {
                    to_remove_from_queue.insert(height);
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id,
                            %height,
                            %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                }
            }

            remote_heights.retain(|height| !to_remove_from_queue.contains(height));
        }
        trace!(
            chain_id = %sender_chain_id,
            "find_received_certificates: finished processing chain",
        );
    }
1042
1043 #[instrument(level = "trace", skip(self))]
1045 async fn get_received_log_from_validator(
1046 &self,
1047 chain_id: ChainId,
1048 remote_node: &RemoteNode<Env::ValidatorNode>,
1049 tracker: u64,
1050 ) -> Result<Vec<ChainAndHeight>, ChainClientError> {
1051 let mut offset = tracker;
1052
1053 let mut remote_log = Vec::new();
1055 loop {
1056 trace!("get_received_log_from_validator: looping");
1057 let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1058 let info = remote_node.handle_chain_info_query(query).await?;
1059 let received_entries = info.requested_received_log.len();
1060 offset += received_entries as u64;
1061 remote_log.extend(info.requested_received_log);
1062 trace!(
1063 remote_node = remote_node.address(),
1064 %received_entries,
1065 "get_received_log_from_validator: received log batch",
1066 );
1067 if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1068 break;
1069 }
1070 }
1071
1072 trace!(
1073 remote_node = remote_node.address(),
1074 num_entries = remote_log.len(),
1075 "get_received_log_from_validator: returning downloaded log",
1076 );
1077
1078 Ok(remote_log)
1079 }
1080
1081 async fn download_sender_block_with_sending_ancestors(
1087 &self,
1088 receiver_chain_id: ChainId,
1089 sender_chain_id: ChainId,
1090 height: BlockHeight,
1091 remote_node: &RemoteNode<Env::ValidatorNode>,
1092 ) -> Result<(), ChainClientError> {
1093 let next_outbox_height = self
1094 .local_node
1095 .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1096 .await?
1097 .get(&sender_chain_id)
1098 .copied()
1099 .unwrap_or(BlockHeight::ZERO);
1100 let (max_epoch, committees) = self.admin_committees().await?;
1101
1102 let mut certificates = BTreeMap::new();
1105 let mut current_height = height;
1106
1107 while current_height >= next_outbox_height {
1109 let downloaded = self
1111 .requests_scheduler
1112 .download_certificates_by_heights(
1113 remote_node,
1114 sender_chain_id,
1115 vec![current_height],
1116 )
1117 .await?;
1118 let Some(certificate) = downloaded.into_iter().next() else {
1119 return Err(ChainClientError::CannotDownloadMissingSenderBlock {
1120 chain_id: sender_chain_id,
1121 height: current_height,
1122 });
1123 };
1124
1125 Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1127 .into_result()?;
1128
1129 let block = certificate.block();
1131 let next_height = block
1132 .body
1133 .previous_message_blocks
1134 .get(&receiver_chain_id)
1135 .map(|(_prev_hash, prev_height)| *prev_height);
1136
1137 certificates.insert(current_height, certificate);
1139
1140 if let Some(prev_height) = next_height {
1141 current_height = prev_height;
1143 } else {
1144 break;
1146 }
1147 }
1148
1149 if certificates.is_empty() {
1150 self.local_node
1151 .retry_pending_cross_chain_requests(sender_chain_id)
1152 .await?;
1153 }
1154
1155 for certificate in certificates.into_values() {
1157 self.receive_sender_certificate(
1158 certificate,
1159 ReceiveCertificateMode::AlreadyChecked,
1160 Some(vec![remote_node.clone()]),
1161 )
1162 .await?;
1163 }
1164
1165 Ok(())
1166 }
1167
1168 #[instrument(
1169 level = "trace", skip_all,
1170 fields(certificate_hash = ?incoming_certificate.hash()),
1171 )]
1172 fn check_certificate(
1173 highest_known_epoch: Epoch,
1174 committees: &BTreeMap<Epoch, Committee>,
1175 incoming_certificate: &ConfirmedBlockCertificate,
1176 ) -> Result<CheckCertificateResult, NodeError> {
1177 let block = incoming_certificate.block();
1178 if block.header.epoch > highest_known_epoch {
1180 return Ok(CheckCertificateResult::FutureEpoch);
1181 }
1182 if let Some(known_committee) = committees.get(&block.header.epoch) {
1183 incoming_certificate.check(known_committee)?;
1186 Ok(CheckCertificateResult::New)
1187 } else {
1188 Ok(CheckCertificateResult::OldEpoch)
1190 }
1191 }
1192
1193 #[instrument(level = "trace", skip_all)]
1197 pub(crate) async fn synchronize_chain_state(
1198 &self,
1199 chain_id: ChainId,
1200 ) -> Result<Box<ChainInfo>, ChainClientError> {
1201 let (_, committee) = self.admin_committee().await?;
1202 Box::pin(self.synchronize_chain_state_from_committee(chain_id, committee)).await
1203 }
1204
1205 #[instrument(level = "trace", skip_all)]
1210 pub async fn synchronize_chain_state_from_committee(
1211 &self,
1212 chain_id: ChainId,
1213 committee: Committee,
1214 ) -> Result<Box<ChainInfo>, ChainClientError> {
1215 #[cfg(with_metrics)]
1216 let _latency = if !self.is_chain_follow_only(chain_id).await {
1217 Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1218 } else {
1219 None
1220 };
1221
1222 let validators = self.make_nodes(&committee)?;
1223 Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1224 communicate_with_quorum(
1225 &validators,
1226 &committee,
1227 |_: &()| (),
1228 |remote_node| async move {
1229 self.synchronize_chain_state_from(&remote_node, chain_id)
1230 .await
1231 },
1232 self.options.quorum_grace_period,
1233 )
1234 .await?;
1235
1236 self.local_node
1237 .chain_info(chain_id)
1238 .await
1239 .map_err(Into::into)
1240 }
1241
/// Downloads and processes the certificates of `chain_id` known to `remote_node` and,
/// unless the chain is follow-only, also replays the chain manager values reported by
/// that node (timeout certificate, proposals and locking block).
///
/// A proposal that still fails after the recovery attempts below is logged and skipped.
/// Errors raised by the recovery steps themselves (storing pending blobs, downloading
/// blob certificates or sender blocks) are propagated.
#[instrument(level = "trace", skip(self, remote_node, chain_id))]
pub(crate) async fn synchronize_chain_state_from(
    &self,
    remote_node: &RemoteNode<Env::ValidatorNode>,
    chain_id: ChainId,
) -> Result<(), ChainClientError> {
    // Manager values are only requested for chains that are not follow-only.
    let with_manager_values = !self.is_chain_follow_only(chain_id).await;
    let query = if with_manager_values {
        ChainInfoQuery::new(chain_id).with_manager_values()
    } else {
        ChainInfoQuery::new(chain_id)
    };
    let remote_info = remote_node.handle_chain_info_query(query).await?;
    // Catch up with the certificates up to the height reported by the remote node.
    let local_info = self
        .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
        .await?;

    if !with_manager_values {
        return Ok(());
    }

    // If the download did not return chain info, read it from the local node instead.
    let local_height = match local_info {
        Some(info) => info.next_block_height,
        None => {
            self.local_node
                .chain_info(chain_id)
                .await?
                .next_block_height
        }
    };
    // The manager values are only replayed when both sides are at the same height.
    if local_height != remote_info.next_block_height {
        debug!(
            remote_node = remote_node.address(),
            remote_height = %remote_info.next_block_height,
            local_height = %local_height,
            "synced from validator, but remote height and local height are different",
        );
        return Ok(());
    };

    if let Some(timeout) = remote_info.manager.timeout {
        self.handle_certificate(*timeout).await?;
    }
    // Collect every proposal the remote manager reported.
    let mut proposals = Vec::new();
    if let Some(proposal) = remote_info.manager.requested_signed_proposal {
        proposals.push(*proposal);
    }
    if let Some(proposal) = remote_info.manager.requested_proposed {
        proposals.push(*proposal);
    }
    if let Some(locking) = remote_info.manager.requested_locking {
        match *locking {
            // A fast locking block is a proposal and is handled with the others.
            LockingBlock::Fast(proposal) => {
                proposals.push(proposal);
            }
            // A regular locking block is a certificate; failures are only logged.
            LockingBlock::Regular(cert) => {
                let hash = cert.hash();
                if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                {
                    debug!(
                        remote_node = remote_node.address(),
                        %hash,
                        height = %local_height,
                        %error,
                        "skipping locked block from validator",
                    );
                }
            }
        }
    }
    'proposal_loop: for proposal in proposals {
        let owner: AccountOwner = proposal.owner();
        if let Err(mut err) =
            Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
        {
            if let LocalNodeError::BlobsNotFound(_) = &err {
                // First recovery attempt: fetch the proposal's required blobs as pending
                // blobs from the same remote node.
                let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                if !required_blob_ids.is_empty() {
                    let mut blobs = Vec::new();
                    for blob_id in required_blob_ids {
                        let blob_content = match self
                            .requests_scheduler
                            .download_pending_blob(remote_node, chain_id, blob_id)
                            .await
                        {
                            Ok(content) => content,
                            Err(error) => {
                                info!(
                                    remote_node = remote_node.address(),
                                    height = %local_height,
                                    proposer = %owner,
                                    %blob_id,
                                    %error,
                                    "skipping proposal from validator; failed to download blob",
                                );
                                continue 'proposal_loop;
                            }
                        };
                        blobs.push(Blob::new(blob_content));
                    }
                    self.local_node
                        .handle_pending_blobs(chain_id, blobs)
                        .await?;
                    // Retry now that the pending blobs are stored; on success move on
                    // to the next proposal.
                    if let Err(new_err) =
                        Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                    {
                        err = new_err;
                    } else {
                        continue;
                    }
                }
                // Second recovery attempt: blobs are still missing, so download the
                // certificates for those blobs from the remote node and retry.
                if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                    self.update_local_node_with_blobs_from(
                        blob_ids.clone(),
                        &[remote_node.clone()],
                    )
                    .await?;
                    if let Err(new_err) =
                        Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                    {
                        err = new_err;
                    } else {
                        continue;
                    }
                }
            }
            // As long as the failure is a missing cross-chain update, download the
            // missing sender block (with its sending ancestors) and retry.
            while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                if let ChainError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                } = &**chain_err
                {
                    self.download_sender_block_with_sending_ancestors(
                        *chain_id,
                        *origin,
                        *height,
                        remote_node,
                    )
                    .await?;
                    if let Err(new_err) =
                        Box::pin(self.local_node.handle_block_proposal(proposal.clone())).await
                    {
                        err = new_err;
                    } else {
                        continue 'proposal_loop;
                    }
                } else {
                    break;
                }
            }

            // Nothing more to try for this proposal: log and move on.
            debug!(
                remote_node = remote_node.address(),
                proposer = %owner,
                height = %local_height,
                error = %err,
                "skipping proposal from validator",
            );
        }
    }
    Ok(())
}
1414
1415 async fn try_process_locking_block_from(
1416 &self,
1417 remote_node: &RemoteNode<Env::ValidatorNode>,
1418 certificate: GenericCertificate<ValidatedBlock>,
1419 ) -> Result<(), ChainClientError> {
1420 let chain_id = certificate.inner().chain_id();
1421 let certificate = Box::new(certificate);
1422 match self.process_certificate(certificate.clone()).await {
1423 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1424 let mut blobs = Vec::new();
1425 for blob_id in blob_ids {
1426 let blob_content = self
1427 .requests_scheduler
1428 .download_pending_blob(remote_node, chain_id, blob_id)
1429 .await?;
1430 blobs.push(Blob::new(blob_content));
1431 }
1432 self.local_node
1433 .handle_pending_blobs(chain_id, blobs)
1434 .await?;
1435 self.process_certificate(certificate).await?;
1436 Ok(())
1437 }
1438 Err(err) => Err(err.into()),
1439 Ok(()) => Ok(()),
1440 }
1441 }
1442
1443 async fn update_local_node_with_blobs_from(
1446 &self,
1447 blob_ids: Vec<BlobId>,
1448 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1449 ) -> Result<Vec<Blob>, ChainClientError> {
1450 let timeout = self.options.blob_download_timeout;
1451 let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1453 stream::iter(blob_ids.into_iter().map(|blob_id| {
1454 communicate_concurrently(
1455 remote_nodes,
1456 async move |remote_node| {
1457 let certificate = self
1458 .requests_scheduler
1459 .download_certificate_for_blob(&remote_node, blob_id)
1460 .await?;
1461 self.receive_sender_certificate(
1462 certificate,
1463 ReceiveCertificateMode::NeedsCheck,
1464 Some(vec![remote_node.clone()]),
1465 )
1466 .await?;
1467 let blob = self
1468 .local_node
1469 .storage_client()
1470 .read_blob(blob_id)
1471 .await?
1472 .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1473 Result::<_, ChainClientError>::Ok(blob)
1474 },
1475 move |_| ChainClientError::from(NodeError::BlobsNotFound(vec![blob_id])),
1476 timeout,
1477 )
1478 }))
1479 .buffer_unordered(self.options.max_joined_tasks)
1480 .collect::<Vec<_>>()
1481 .await
1482 .into_iter()
1483 .collect()
1484 }
1485
1486 #[instrument(level = "trace", skip(self, block))]
1496 async fn stage_block_execution_with_policy(
1497 &self,
1498 block: ProposedBlock,
1499 round: Option<u32>,
1500 published_blobs: Vec<Blob>,
1501 policy: BundleExecutionPolicy,
1502 ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1503 loop {
1504 let result = self
1505 .local_node
1506 .stage_block_execution_with_policy(
1507 block.clone(),
1508 round,
1509 published_blobs.clone(),
1510 policy,
1511 )
1512 .await;
1513 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1514 let validators = self.validator_nodes().await?;
1515 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1516 .await?;
1517 continue; }
1519 if let Ok((_, executed_block, _, _)) = &result {
1520 let hash = CryptoHash::new(executed_block);
1521 let notification = Notification {
1522 chain_id: executed_block.header.chain_id,
1523 reason: Reason::BlockExecuted {
1524 height: executed_block.header.height,
1525 hash,
1526 },
1527 };
1528 self.notifier.notify(&[notification]);
1529 }
1530 let (_modified_block, executed_block, response, _resource_tracker) = result?;
1531 return Ok((executed_block, response));
1532 }
1533 }
1534
1535 #[instrument(level = "trace", skip(self, block))]
1538 async fn stage_block_execution(
1539 &self,
1540 block: ProposedBlock,
1541 round: Option<u32>,
1542 published_blobs: Vec<Blob>,
1543 ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1544 loop {
1545 let result = self
1546 .local_node
1547 .stage_block_execution(block.clone(), round, published_blobs.clone())
1548 .await;
1549 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1550 let validators = self.validator_nodes().await?;
1551 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1552 .await?;
1553 continue; }
1555 if let Ok((block, _, _)) = &result {
1556 let hash = CryptoHash::new(block);
1557 let notification = Notification {
1558 chain_id: block.header.chain_id,
1559 reason: Reason::BlockExecuted {
1560 height: block.header.height,
1561 hash,
1562 },
1563 };
1564 self.notifier.notify(&[notification]);
1565 }
1566 let (block, response, _resource_tracker) = result?;
1567 return Ok((block, response));
1568 }
1569 }
1570}
1571
/// Labels a timing measurement reported through a chain client's timing channel.
///
/// Values of this type are paired with a `u64` in the `timing_sender` channel of
/// `ChainClient`.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    /// NOTE(review): presumably covers the execution of operations; confirm at the
    /// reporting call site.
    ExecuteOperations,
    /// NOTE(review): presumably covers the execution of a block; confirm at the
    /// reporting call site.
    ExecuteBlock,
    /// NOTE(review): presumably covers the submission of a block proposal; confirm at
    /// the reporting call site.
    SubmitBlockProposal,
    /// Reported at the end of a successful `ChainClient::update_validators` call.
    UpdateValidators,
}
1579
/// Tunable options of a chain client.
#[derive(Debug, Clone)]
pub struct ChainClientOptions {
    /// Maximum number of pending message bundles returned by
    /// `ChainClient::pending_message_bundles`, applied after the message policy filter.
    pub max_pending_message_bundles: usize,
    /// NOTE(review): not used in this part of the file; presumably bounds how many
    /// block-limit errors are tolerated. Confirm against the block proposal code.
    pub max_block_limit_errors: u32,
    /// Cap on the total number of new stream events covered by the `UpdateStreams`
    /// operation built in `ChainClient::collect_stream_updates`.
    pub max_new_events_per_block: usize,
    /// Policy applied to incoming message bundles. Its `restrict_chain_ids_to` set also
    /// limits which publisher chains are considered for stream updates.
    pub message_policy: MessagePolicy,
    /// Delivery mode forwarded when communicating chain updates to validators.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// Grace period passed to `communicate_with_quorum`.
    pub quorum_grace_period: f64,
    /// Timeout passed to `communicate_concurrently` when downloading blobs.
    pub blob_download_timeout: Duration,
    /// NOTE(review): not used in this part of the file; presumably the timeout for
    /// downloading a batch of certificates. Confirm against the download code.
    pub certificate_batch_download_timeout: Duration,
    /// NOTE(review): not used in this part of the file; presumably the number of
    /// certificates requested per download batch. Confirm against the download code.
    pub certificate_download_batch_size: u64,
    /// Batch size used when splitting the received logs into batches in
    /// `ChainClient::find_received_certificates`.
    pub sender_certificate_download_batch_size: usize,
    /// Concurrency limit passed to `buffer_unordered` when running joined tasks.
    pub max_joined_tasks: usize,
    /// NOTE(review): not used in this part of the file; presumably enables proposing
    /// blocks in the fast round. Confirm against the block proposal code.
    pub allow_fast_blocks: bool,
}
1614
/// Default value for `ChainClientOptions::certificate_download_batch_size`, as used by
/// `ChainClientOptions::test_default`.
pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default value for `ChainClientOptions::sender_certificate_download_batch_size`, as used
/// by `ChainClientOptions::test_default`.
pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
1617
#[cfg(with_testing)]
impl ChainClientOptions {
    /// Returns the options used by tests: small limits, one-second download timeouts,
    /// an accept-all message policy and non-blocking cross-chain delivery.
    pub fn test_default() -> Self {
        // Both download timeouts share the same value.
        let download_timeout = Duration::from_secs(1);

        Self {
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            max_new_events_per_block: 10,
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            quorum_grace_period: crate::DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout: download_timeout,
            certificate_batch_download_timeout: download_timeout,
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_joined_tasks: 100,
            allow_fast_blocks: false,
        }
    }
}
1639
/// Client-side handle for a single chain, backed by a shared [`Client`].
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    /// The shared client that holds the local node, storage, signer and per-chain state.
    #[debug(skip)]
    client: Arc<Client<Env>>,
    /// The chain this handle operates on.
    chain_id: ChainId,
    /// Options of this particular chain client.
    #[debug(skip)]
    options: ChainClientOptions,
    /// The owner used as this client's identity on the chain, if any.
    preferred_owner: Option<AccountOwner>,
    /// The next block height known when this client was created; used as the target of
    /// `synchronize_to_known_height` and as a lower bound in `pending_message_bundles`.
    initial_next_block_height: BlockHeight,
    /// The block hash expected when the local chain is at `initial_next_block_height`.
    initial_block_hash: Option<CryptoHash>,
    /// Optional channel on which `(u64, TimingType)` measurements are sent.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
}
1665
1666impl<Env: Environment> Clone for ChainClient<Env> {
1667 fn clone(&self) -> Self {
1668 Self {
1669 client: self.client.clone(),
1670 chain_id: self.chain_id,
1671 options: self.options.clone(),
1672 preferred_owner: self.preferred_owner,
1673 initial_next_block_height: self.initial_next_block_height,
1674 initial_block_hash: self.initial_block_hash,
1675 timing_sender: self.timing_sender.clone(),
1676 }
1677 }
1678}
1679
/// Errors returned by the chain client.
#[derive(Debug, Error)]
pub enum ChainClientError {
    /// An operation on the local node failed.
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),

    /// An operation on a remote node failed.
    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),

    /// An arithmetic error, reported transparently.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// Certificates with the listed hashes are missing.
    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// The confirmed block with the given hash is missing.
    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),

    /// JSON serialization or deserialization failed.
    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),

    /// A chain operation failed.
    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),

    /// A communication error with the validators, reported transparently.
    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),

    /// An internal inconsistency within the chain client.
    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),

    /// A certificate comes from a committee in a future epoch that is not known locally.
    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
        Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,

    /// The local node is behind the state recorded in the wallet.
    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
    WalletSynchronizationError,

    /// The client's state is incompatible with the proposed block.
    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),

    /// A certificate comes from a committee that was retired.
    #[error(
        "Cannot accept a certificate from a committee that was retired. \
        Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,

    /// A protocol violation was detected by the chain client.
    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),

    /// The signer has no key to sign for the given chain.
    #[error("Signer doesn't have key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),

    /// No owner is configured for proposing on the given chain.
    #[error("client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),

    /// The configured owner is not an owner of the given chain.
    #[error("The chain client isn't owner on chain {0}")]
    NotAnOwner(ChainId),

    /// A view error, reported transparently.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// Certificates could not be downloaded up to the requested next block height.
    #[error(
        "Failed to download certificates and update local node to the next height \
        {target_next_block_height} of chain {chain_id}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },

    /// A BCS error, reported transparently.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The validators reached a quorum on a different block hash or round than expected.
    #[error(
        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
        expected block hash {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },

    /// The signer reported an error; see [`ChainClientError::signer_failure`].
    #[error("signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),

    /// The given epoch is the current one and cannot be revoked.
    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),

    /// The epoch was already revoked.
    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,

    /// A missing sender block could not be downloaded at the given chain and height.
    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
    CannotDownloadMissingSenderBlock {
        chain_id: ChainId,
        height: BlockHeight,
    },

    /// A different block, with the given certificate hash, was already committed at this
    /// height.
    #[error(
        "A different block was already committed at this height. \
        The committed certificate hash is {0}"
    )]
    Conflict(CryptoHash),
}
1787
1788impl From<Infallible> for ChainClientError {
1789 fn from(infallible: Infallible) -> Self {
1790 match infallible {}
1791 }
1792}
1793
1794impl ChainClientError {
1795 pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
1796 Self::Signer(Box::new(err))
1797 }
1798}
1799
1800impl<Env: Environment> ChainClient<Env> {
1801 #[instrument(level = "trace", skip(self))]
1803 fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
1804 self.client
1805 .chains
1806 .pin()
1807 .get(&self.chain_id)
1808 .expect("Chain client constructed for invalid chain")
1809 .client_mutex()
1810 }
1811
1812 #[instrument(level = "trace", skip(self))]
1814 pub fn pending_proposal(&self) -> Option<PendingProposal> {
1815 self.client
1816 .chains
1817 .pin()
1818 .get(&self.chain_id)
1819 .expect("Chain client constructed for invalid chain")
1820 .pending_proposal()
1821 .clone()
1822 }
1823
1824 #[instrument(level = "trace", skip(self, f))]
1826 fn update_state<F>(&self, f: F)
1827 where
1828 F: Fn(&mut ChainClientState),
1829 {
1830 let chains = self.client.chains.pin();
1831 chains
1832 .update(self.chain_id, |state| {
1833 let mut state = state.clone_for_update_unchecked();
1834 f(&mut state);
1835 state
1836 })
1837 .expect("Chain client constructed for invalid chain");
1838 }
1839
/// Returns the signer held by the underlying client.
#[instrument(level = "trace", skip(self))]
pub fn signer(&self) -> &impl Signer {
    self.client.signer()
}
1845
/// Returns a mutable reference to this chain client's options.
#[instrument(level = "trace", skip(self))]
pub fn options_mut(&mut self) -> &mut ChainClientOptions {
    &mut self.options
}
1851
/// Returns this chain client's options.
#[instrument(level = "trace", skip(self))]
pub fn options(&self) -> &ChainClientOptions {
    &self.options
}
1857
/// Returns the ID of the chain this client operates on.
#[instrument(level = "trace", skip(self))]
pub fn chain_id(&self) -> ChainId {
    self.chain_id
}
1863
/// Returns a clone of the timing channel sender, if one is configured.
pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
    self.timing_sender.clone()
}
1868
/// Returns the ID of the admin chain known to the underlying client.
#[instrument(level = "trace", skip(self))]
pub fn admin_chain_id(&self) -> ChainId {
    self.client.admin_chain_id
}
1874
/// Returns the preferred owner configured for this client, if any.
#[instrument(level = "trace", skip(self))]
pub fn preferred_owner(&self) -> Option<AccountOwner> {
    self.preferred_owner
}
1880
/// Sets the preferred owner of this client, replacing any previous one.
#[instrument(level = "trace", skip(self))]
pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
    self.preferred_owner = Some(preferred_owner);
}
1886
/// Clears the preferred owner of this client.
#[instrument(level = "trace", skip(self))]
pub fn unset_preferred_owner(&mut self) {
    self.preferred_owner = None;
}
1892
/// Returns a read guard on the local node's state view of this chain.
#[instrument(level = "trace")]
pub async fn chain_state_view(
    &self,
) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
    self.client.local_node.chain_state_view(self.chain_id).await
}
1900
1901 #[instrument(level = "trace", skip(self))]
1903 pub async fn event_stream_publishers(&self) -> Result<BTreeSet<ChainId>, LocalNodeError> {
1904 let subscriptions = self
1905 .client
1906 .local_node
1907 .get_event_subscriptions(self.chain_id)
1908 .await?;
1909 let mut publishers = subscriptions
1910 .into_iter()
1911 .map(|((chain_id, _), _)| chain_id)
1912 .collect::<BTreeSet<_>>();
1913 if self.chain_id != self.client.admin_chain_id {
1914 publishers.insert(self.client.admin_chain_id);
1915 }
1916 Ok(publishers)
1917 }
1918
/// Subscribes to the notifications of this client's own chain.
#[instrument(level = "trace")]
pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
    self.subscribe_to(self.chain_id)
}
1924
1925 #[instrument(level = "trace")]
1927 pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
1928 Ok(Box::pin(UnboundedReceiverStream::new(
1929 self.client.notifier.subscribe(vec![chain_id]),
1930 )))
1931 }
1932
/// Returns the storage client of the underlying client.
#[instrument(level = "trace")]
pub fn storage_client(&self) -> &Env::Storage {
    self.client.storage_client()
}
1938
1939 #[instrument(level = "trace")]
1941 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1942 let query = ChainInfoQuery::new(self.chain_id);
1943 let response = self
1944 .client
1945 .local_node
1946 .handle_chain_info_query(query)
1947 .await?;
1948 self.client.update_from_info(&response.info);
1949 Ok(response.info)
1950 }
1951
1952 #[instrument(level = "trace")]
1954 pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1955 let query = ChainInfoQuery::new(self.chain_id)
1956 .with_manager_values()
1957 .with_committees();
1958 let response = self
1959 .client
1960 .local_node
1961 .handle_chain_info_query(query)
1962 .await?;
1963 self.client.update_from_info(&response.info);
1964 Ok(response.info)
1965 }
1966
/// Returns the description of this chain, as provided by the underlying client.
pub async fn get_chain_description(&self) -> Result<ChainDescription, ChainClientError> {
    self.client.get_chain_description(self.chain_id).await
}
1971
1972 pub async fn prepare_for_owner(
1978 &self,
1979 owner: AccountOwner,
1980 ) -> Result<Box<ChainInfo>, ChainClientError> {
1981 ensure!(
1982 self.client.has_key_for(&owner).await?,
1983 ChainClientError::CannotFindKeyForChain(self.chain_id)
1984 );
1985 self.client
1987 .get_chain_description_blob(self.chain_id)
1988 .await?;
1989
1990 let info = self.chain_info().await?;
1992
1993 ensure!(
1995 info.manager
1996 .ownership
1997 .can_propose_in_multi_leader_round(&owner),
1998 ChainClientError::NotAnOwner(self.chain_id)
1999 );
2000
2001 Ok(info)
2002 }
2003
2004 #[instrument(level = "trace")]
2007 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
2008 if self.options.message_policy.is_ignore() {
2009 return Ok(Vec::new());
2011 }
2012
2013 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
2014 let info = self
2015 .client
2016 .local_node
2017 .handle_chain_info_query(query)
2018 .await?
2019 .info;
2020 if self.preferred_owner.is_some_and(|owner| {
2021 info.manager
2022 .ownership
2023 .is_super_owner_no_regular_owners(&owner)
2024 }) {
2025 ensure!(
2027 info.next_block_height >= self.initial_next_block_height,
2028 ChainClientError::WalletSynchronizationError
2029 );
2030 }
2031
2032 Ok(info
2033 .requested_pending_message_bundles
2034 .into_iter()
2035 .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
2036 .take(self.options.max_pending_message_bundles)
2037 .collect())
2038 }
2039
2040 #[instrument(level = "trace")]
2044 async fn collect_stream_updates(&self) -> Result<Option<Operation>, ChainClientError> {
2045 let subscription_map = self
2047 .client
2048 .local_node
2049 .get_event_subscriptions(self.chain_id)
2050 .await?;
2051 let futures = subscription_map
2053 .into_iter()
2054 .filter(|((chain_id, _), _)| {
2055 self.options
2056 .message_policy
2057 .restrict_chain_ids_to
2058 .as_ref()
2059 .is_none_or(|chain_set| chain_set.contains(chain_id))
2060 })
2061 .map(|((chain_id, stream_id), subscriptions)| {
2062 let client = self.client.clone();
2063 let previous_index = subscriptions.next_index;
2064 async move {
2065 let next_index = client
2066 .local_node
2067 .get_stream_event_count(chain_id, stream_id.clone())
2068 .await?;
2069 if let Some(next_index) =
2070 next_index.filter(|next_index| *next_index > previous_index)
2071 {
2072 Ok(Some((chain_id, stream_id, previous_index, next_index)))
2073 } else {
2074 Ok::<_, ChainClientError>(None)
2075 }
2076 }
2077 });
2078 let all_updates = futures::stream::iter(futures)
2079 .buffer_unordered(self.options.max_joined_tasks)
2080 .try_collect::<Vec<_>>()
2081 .await?
2082 .into_iter()
2083 .flatten()
2084 .collect::<Vec<_>>();
2085 let max_events = self.options.max_new_events_per_block;
2087 let mut total_events: usize = 0;
2088 let mut updates = Vec::new();
2089 for (chain_id, stream_id, previous_index, next_index) in all_updates {
2090 let new_events = (next_index - previous_index) as usize;
2091 if total_events + new_events <= max_events {
2092 total_events += new_events;
2093 updates.push((chain_id, stream_id, next_index));
2094 } else {
2095 let remaining = max_events.saturating_sub(total_events);
2096 if remaining > 0 {
2097 updates.push((chain_id, stream_id, previous_index + remaining as u32));
2098 }
2099 break;
2100 }
2101 }
2102 if updates.is_empty() {
2103 return Ok(None);
2104 }
2105 Ok(Some(SystemOperation::UpdateStreams(updates).into()))
2106 }
2107
/// Returns this chain's info as produced by the underlying client's
/// `chain_info_with_committees`.
#[instrument(level = "trace")]
async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
    self.client.chain_info_with_committees(self.chain_id).await
}
2112
2113 #[instrument(level = "trace")]
2115 async fn epoch_and_committees(
2116 &self,
2117 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
2118 let info = self.chain_info_with_committees().await?;
2119 let epoch = info.epoch;
2120 let committees = info.into_committees()?;
2121 Ok((epoch, committees))
2122 }
2123
2124 #[instrument(level = "trace")]
2126 pub async fn local_committee(&self) -> Result<Committee, ChainClientError> {
2127 let info = match self.chain_info_with_committees().await {
2128 Ok(info) => info,
2129 Err(LocalNodeError::BlobsNotFound(_)) => {
2130 self.synchronize_chain_state(self.chain_id).await?;
2131 self.chain_info_with_committees().await?
2132 }
2133 Err(err) => return Err(err.into()),
2134 };
2135 Ok(info.into_current_committee()?)
2136 }
2137
/// Returns the epoch and committee reported by the underlying client's
/// `admin_committee`.
#[instrument(level = "trace")]
pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
    self.client.admin_committee().await
}
2143
2144 #[instrument(level = "trace")]
2148 pub async fn identity(&self) -> Result<AccountOwner, ChainClientError> {
2149 let Some(preferred_owner) = self.preferred_owner else {
2150 return Err(ChainClientError::NoAccountKeyConfigured(self.chain_id));
2151 };
2152 let manager = self.chain_info().await?.manager;
2153 ensure!(
2154 manager.ownership.is_active(),
2155 LocalNodeError::InactiveChain(self.chain_id)
2156 );
2157
2158 let is_owner = manager
2161 .ownership
2162 .can_propose_in_multi_leader_round(&preferred_owner);
2163
2164 if !is_owner {
2165 let accepted_owners = manager
2166 .ownership
2167 .all_owners()
2168 .chain(&manager.leader)
2169 .collect::<Vec<_>>();
2170 warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
2171 "The preferred owner is not configured as an owner of this chain",
2172 );
2173 return Err(ChainClientError::NotAnOwner(self.chain_id));
2174 }
2175
2176 let has_signer = self
2177 .signer()
2178 .contains_key(&preferred_owner)
2179 .await
2180 .map_err(ChainClientError::signer_failure)?;
2181
2182 if !has_signer {
2183 warn!(%self.chain_id, ?preferred_owner,
2184 "Chain is one of the owners but its Signer instance doesn't contain the key",
2185 );
2186 return Err(ChainClientError::CannotFindKeyForChain(self.chain_id));
2187 }
2188
2189 Ok(preferred_owner)
2190 }
2191
2192 #[instrument(level = "trace")]
2195 pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2196 #[cfg(with_metrics)]
2197 let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();
2198
2199 let mut info = self.synchronize_to_known_height().await?;
2200
2201 if self.preferred_owner.is_none_or(|owner| {
2202 !info
2203 .manager
2204 .ownership
2205 .is_super_owner_no_regular_owners(&owner)
2206 }) {
2207 info = self.client.synchronize_chain_state(self.chain_id).await?;
2211 }
2212
2213 if info.epoch > self.client.admin_committees().await?.0 {
2214 self.client
2215 .synchronize_chain_state(self.client.admin_chain_id)
2216 .await?;
2217 }
2218
2219 self.client.update_from_info(&info);
2220 Ok(info)
2221 }
2222
2223 async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2228 let info = self
2229 .client
2230 .download_certificates(self.chain_id, self.initial_next_block_height)
2231 .await?;
2232 if info.next_block_height == self.initial_next_block_height {
2233 ensure!(
2235 self.initial_block_hash == info.block_hash,
2236 ChainClientError::InternalError("Invalid chain of blocks in local node")
2237 );
2238 }
2239 Ok(info)
2240 }
2241
2242 #[instrument(level = "trace", skip(old_committee, latest_certificate))]
2244 pub async fn update_validators(
2245 &self,
2246 old_committee: Option<&Committee>,
2247 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
2248 ) -> Result<(), ChainClientError> {
2249 let update_validators_start = linera_base::time::Instant::now();
2250 if let Some(old_committee) = old_committee {
2252 let old_committee_start = linera_base::time::Instant::now();
2253 self.communicate_chain_updates(old_committee, latest_certificate.clone())
2254 .await?;
2255 tracing::debug!(
2256 old_committee_ms = old_committee_start.elapsed().as_millis(),
2257 "communicated chain updates to old committee"
2258 );
2259 };
2260 if let Ok(new_committee) = self.local_committee().await {
2261 if Some(&new_committee) != old_committee {
2262 let new_committee_start = linera_base::time::Instant::now();
2265 self.communicate_chain_updates(&new_committee, latest_certificate)
2266 .await?;
2267 tracing::debug!(
2268 new_committee_ms = new_committee_start.elapsed().as_millis(),
2269 "communicated chain updates to new committee"
2270 );
2271 }
2272 }
2273 self.send_timing(update_validators_start, TimingType::UpdateValidators);
2274 Ok(())
2275 }
2276
2277 #[instrument(level = "trace", skip(committee, latest_certificate))]
2279 pub async fn communicate_chain_updates(
2280 &self,
2281 committee: &Committee,
2282 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
2283 ) -> Result<(), ChainClientError> {
2284 let delivery = self.options.cross_chain_message_delivery;
2285 let height = self.chain_info().await?.next_block_height;
2286 self.client
2287 .communicate_chain_updates(
2288 committee,
2289 self.chain_id,
2290 height,
2291 delivery,
2292 latest_certificate,
2293 )
2294 .await
2295 }
2296
2297 async fn synchronize_publisher_chains(&self) -> Result<(), ChainClientError> {
2300 let subscriptions = self
2301 .client
2302 .local_node
2303 .get_event_subscriptions(self.chain_id)
2304 .await?;
2305 let chain_ids = subscriptions
2306 .iter()
2307 .map(|((chain_id, _), _)| *chain_id)
2308 .chain(iter::once(self.client.admin_chain_id))
2309 .filter(|chain_id| *chain_id != self.chain_id)
2310 .collect::<BTreeSet<_>>();
2311 stream::iter(
2312 chain_ids
2313 .into_iter()
2314 .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
2315 )
2316 .buffer_unordered(self.options.max_joined_tasks)
2317 .collect::<Vec<_>>()
2318 .await
2319 .into_iter()
2320 .collect::<Result<Vec<_>, _>>()?;
2321 Ok(())
2322 }
2323
/// Collects the received logs of this chain from the validators of the admin committee,
/// then downloads and processes the corresponding sender certificates in batches.
///
/// A failure to reach a quorum of validators is logged, and processing continues with the
/// logs that were collected. After every batch the per-validator trackers are updated.
/// `cancellation_token` is forwarded to the batch processing.
///
/// # Errors
///
/// Fails if the committee, the nodes or the trackers cannot be obtained, or if
/// processing a batch of sender certificates fails.
#[instrument(level = "trace")]
pub async fn find_received_certificates(
    &self,
    cancellation_token: Option<CancellationToken>,
) -> Result<(), ChainClientError> {
    debug!(chain_id = %self.chain_id, "starting find_received_certificates");
    #[cfg(with_metrics)]
    let _latency = metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
    let chain_id = self.chain_id;
    let (_, committee) = self.admin_committee().await?;
    let nodes = self.client.make_nodes(&committee)?;

    // The locally stored trackers, looked up below by validator public key.
    let trackers = self
        .client
        .local_node
        .get_received_certificate_trackers(chain_id)
        .await?;

    trace!("find_received_certificates: read trackers");

    // Shared buffer that each per-validator task appends its batch to.
    let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
    let result = communicate_with_quorum(
        &nodes,
        &committee,
        |_| (),
        |remote_node| {
            let client = &self.client;
            // Validators without a stored tracker start from 0.
            let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
            let received_log_batches = Arc::clone(&received_log_batches);
            Box::pin(async move {
                let batch = client
                    .get_received_log_from_validator(chain_id, &remote_node, tracker)
                    .await?;
                // The lock is taken only after the await and released when the task ends.
                let mut batches = received_log_batches.lock().unwrap();
                batches.push((remote_node.public_key, batch));
                Ok(())
            })
        },
        self.options.quorum_grace_period,
    )
    .await;

    // Not reaching a quorum is not fatal: continue with what was collected.
    if let Err(error) = result {
        error!(
            %error,
            "Failed to synchronize received_logs from at least a quorum of validators",
        );
    }

    // Move the collected batches out of the shared buffer.
    let received_logs: Vec<_> = {
        let mut received_log_batches = received_log_batches.lock().unwrap();
        std::mem::take(received_log_batches.as_mut())
    };

    debug!(
        received_logs_len = %received_logs.len(),
        received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
        "collected received logs"
    );

    // Both constructors take the batches by value, hence the clone.
    let (received_logs, mut validator_trackers) = {
        (
            ReceivedLogs::from_received_result(received_logs.clone()),
            ValidatorTrackers::new(received_logs, &trackers),
        )
    };

    debug!(
        num_chains = %received_logs.num_chains(),
        num_certs = %received_logs.num_certs(),
        "find_received_certificates: total number of chains and certificates to sync",
    );

    // NOTE(review): this integer division panics if `max_joined_tasks` is 0.
    let max_blocks_per_chain =
        self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
    for received_log in received_logs.into_batches(
        self.options.sender_certificate_download_batch_size,
        max_blocks_per_chain,
    ) {
        validator_trackers = self
            .receive_sender_certificates(
                received_log,
                validator_trackers,
                &nodes,
                cancellation_token.clone(),
            )
            .await?;

        // Persist the progress after each batch.
        self.update_received_certificate_trackers(&validator_trackers)
            .await;
    }

    info!("find_received_certificates finished");

    Ok(())
}
2430
2431 async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
2432 let updated_trackers = trackers.to_map();
2433 trace!(?updated_trackers, "updated tracker values");
2434
2435 if let Err(error) = self
2437 .client
2438 .local_node
2439 .update_received_certificate_trackers(self.chain_id, updated_trackers)
2440 .await
2441 {
2442 error!(
2443 chain_id = %self.chain_id,
2444 %error,
2445 "Failed to update the certificate trackers for chain",
2446 );
2447 }
2448 }
2449
/// Downloads and processes the sender-chain certificates listed in `received_logs`,
/// and returns the updated `validator_trackers`.
///
/// Sender chains for which no heights remain after filtering are not downloaded;
/// instead their pending cross-chain requests are retried at the end.
///
/// If `cancellation_token` is `Some` and gets cancelled, the downloads are abandoned
/// and the function continues with the trackers updated so far. With `None`, the
/// downloads always run to completion.
async fn receive_sender_certificates(
    &self,
    mut received_logs: ReceivedLogs,
    mut validator_trackers: ValidatorTrackers,
    nodes: &[RemoteNode<Env::ValidatorNode>],
    cancellation_token: Option<CancellationToken>,
) -> Result<ValidatorTrackers, ChainClientError> {
    debug!(
        num_chains = %received_logs.num_chains(),
        num_certs = %received_logs.num_certs(),
        "receive_sender_certificates: number of chains and certificates to sync",
    );

    // Ask the local node for its next outbox heights towards this chain, for every
    // sender chain mentioned in the logs.
    let local_next_heights = self
        .client
        .local_node
        .next_outbox_heights(received_logs.chains(), self.chain_id)
        .await?;

    // NOTE(review): presumably drops log entries the local node already has, based on
    // the heights fetched above — confirm against `filter_out_already_known`.
    validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);

    debug!(
        remaining_total_certificates = %received_logs.num_certs(),
        "receive_sender_certificates: computed remote_heights"
    );

    // Sender chains with nothing left to download.
    let mut other_sender_chains = Vec::new();
    // Channel on which the download futures report `ChainAndHeight` values.
    let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

    // One lazily created download future per sender chain that still has heights to fetch.
    let cert_futures = received_logs.heights_per_chain().into_iter().filter_map(
        |(sender_chain_id, remote_heights)| {
            if remote_heights.is_empty() {
                other_sender_chains.push(sender_chain_id);
                return None;
            };
            let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
            let sender = sender.clone();
            let client = self.client.clone();
            // Each chain gets its own randomly ordered copy of the validator list.
            let mut nodes = nodes.to_vec();
            nodes.shuffle(&mut rand::thread_rng());
            let received_logs_ref = &received_logs;
            Some(async move {
                client
                    .download_and_process_sender_chain(
                        sender_chain_id,
                        &nodes,
                        received_logs_ref,
                        remote_heights,
                        sender,
                    )
                    .await
            })
        },
    );

    // Background task that owns the trackers: it records every value received on the
    // channel and hands the trackers back once the channel is closed.
    let update_trackers = linera_base::task::spawn(async move {
        while let Some(chain_and_height) = receiver.recv().await {
            validator_trackers.downloaded_cert(chain_and_height);
        }
        validator_trackers
    });

    // Resolves when the token is cancelled; never resolves if no token was given.
    let mut cancellation_future = Box::pin(
        async move {
            if let Some(token) = cancellation_token {
                token.cancelled().await
            } else {
                future::pending().await
            }
        }
        .fuse(),
    );

    // Drive the downloads, at most `max_joined_tasks` at a time, until they all finish
    // or the cancellation future resolves, whichever happens first.
    select! {
        _ = stream::iter(cert_futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .for_each(future::ready)
            => (),
        _ = cancellation_future => ()
    };

    // Release this function's own sender so that the receiving loop in the spawned
    // task can terminate once no senders remain.
    drop(sender);

    let validator_trackers = update_trackers.await;

    debug!(
        num_other_chains = %other_sender_chains.len(),
        "receive_sender_certificates: processing certificates finished"
    );

    self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
        .await;

    debug!("receive_sender_certificates: finished processing other_sender_chains");

    Ok(validator_trackers)
}
2556
2557 async fn retry_pending_cross_chain_requests(
2560 &self,
2561 nodes: &[RemoteNode<Env::ValidatorNode>],
2562 other_sender_chains: Vec<ChainId>,
2563 ) {
2564 let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
2565 let local_node = self.client.local_node.clone();
2566 async move {
2567 if let Err(error) = match local_node
2568 .retry_pending_cross_chain_requests(chain_id)
2569 .await
2570 {
2571 Ok(()) => Ok(()),
2572 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
2573 if let Err(error) = self
2574 .client
2575 .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
2576 .await
2577 {
2578 error!(
2579 ?blob_ids,
2580 %error,
2581 "Error while attempting to download blobs during retrying outgoing \
2582 messages"
2583 );
2584 }
2585 local_node
2586 .retry_pending_cross_chain_requests(chain_id)
2587 .await
2588 }
2589 err => err,
2590 } {
2591 error!(
2592 %chain_id,
2593 %error,
2594 "Failed to retry outgoing messages from chain"
2595 );
2596 }
2597 }
2598 }));
2599 stream.for_each(future::ready).await;
2600 }
2601
2602 #[instrument(level = "trace")]
2604 pub async fn transfer(
2605 &self,
2606 owner: AccountOwner,
2607 amount: Amount,
2608 recipient: Account,
2609 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2610 Box::pin(self.execute_operation(SystemOperation::Transfer {
2612 owner,
2613 recipient,
2614 amount,
2615 }))
2616 .await
2617 }
2618
2619 #[instrument(level = "trace")]
2622 pub async fn read_data_blob(
2623 &self,
2624 hash: CryptoHash,
2625 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2626 let blob_id = BlobId {
2627 hash,
2628 blob_type: BlobType::Data,
2629 };
2630 Box::pin(self.execute_operation(SystemOperation::VerifyBlob { blob_id })).await
2631 }
2632
2633 #[instrument(level = "trace")]
2635 pub async fn claim(
2636 &self,
2637 owner: AccountOwner,
2638 target_id: ChainId,
2639 recipient: Account,
2640 amount: Amount,
2641 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2642 Box::pin(self.execute_operation(SystemOperation::Claim {
2643 owner,
2644 target_id,
2645 recipient,
2646 amount,
2647 }))
2648 .await
2649 }
2650
2651 #[instrument(level = "trace")]
2654 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, ChainClientError> {
2655 let chain_id = self.chain_id;
2656 let info = self.chain_info_with_committees().await?;
2657 let committee = info.current_committee()?;
2658 let height = info.next_block_height;
2659 let round = info.manager.current_round;
2660 let action = CommunicateAction::RequestTimeout {
2661 height,
2662 round,
2663 chain_id,
2664 };
2665 let value = Timeout::new(chain_id, height, info.epoch);
2666 let certificate = Box::new(
2667 self.client
2668 .communicate_chain_action(committee, action, value)
2669 .await?,
2670 );
2671 self.client.process_certificate(certificate.clone()).await?;
2672 self.client
2674 .communicate_chain_updates(
2675 committee,
2676 chain_id,
2677 height,
2678 CrossChainMessageDelivery::NonBlocking,
2679 None,
2680 )
2681 .await?;
2682 Ok(*certificate)
2683 }
2684
2685 #[instrument(level = "trace", skip_all)]
2687 pub async fn synchronize_chain_state(
2688 &self,
2689 chain_id: ChainId,
2690 ) -> Result<Box<ChainInfo>, ChainClientError> {
2691 self.client.synchronize_chain_state(chain_id).await
2692 }
2693
2694 #[instrument(level = "trace", skip_all)]
2697 pub async fn synchronize_chain_state_from_committee(
2698 &self,
2699 committee: Committee,
2700 ) -> Result<Box<ChainInfo>, ChainClientError> {
2701 Box::pin(
2702 self.client
2703 .synchronize_chain_state_from_committee(self.chain_id, committee),
2704 )
2705 .await
2706 }
2707
2708 #[instrument(level = "trace", skip(operations, blobs))]
2710 pub async fn execute_operations(
2711 &self,
2712 operations: Vec<Operation>,
2713 blobs: Vec<Blob>,
2714 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2715 let timing_start = linera_base::time::Instant::now();
2716 tracing::debug!("execute_operations started");
2717
2718 let result = loop {
2719 let execute_block_start = linera_base::time::Instant::now();
2720 tracing::debug!("calling execute_block");
2722 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
2723 Ok(ClientOutcome::Committed(certificate)) => {
2724 tracing::debug!(
2725 execute_block_ms = execute_block_start.elapsed().as_millis(),
2726 "execute_block succeeded"
2727 );
2728 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
2729 break Ok(ClientOutcome::Committed(certificate));
2730 }
2731 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2732 break Ok(ClientOutcome::WaitForTimeout(timeout));
2733 }
2734 Ok(ClientOutcome::Conflict(certificate)) => {
2735 info!(
2736 height = %certificate.block().header.height,
2737 "Another block was committed."
2738 );
2739 break Ok(ClientOutcome::Conflict(certificate));
2740 }
2741 Err(ChainClientError::CommunicationError(CommunicationError::Trusted(
2742 NodeError::UnexpectedBlockHeight {
2743 expected_block_height,
2744 found_block_height,
2745 },
2746 ))) if expected_block_height > found_block_height => {
2747 tracing::info!(
2748 "Local state is outdated; synchronizing chain {:.8}",
2749 self.chain_id
2750 );
2751 self.synchronize_chain_state(self.chain_id).await?;
2752 }
2753 Err(err) => return Err(err),
2754 };
2755 };
2756
2757 self.send_timing(timing_start, TimingType::ExecuteOperations);
2758 tracing::debug!(
2759 total_execute_operations_ms = timing_start.elapsed().as_millis(),
2760 "execute_operations returning"
2761 );
2762
2763 result
2764 }
2765
2766 pub async fn execute_operation(
2768 &self,
2769 operation: impl Into<Operation>,
2770 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2771 self.execute_operations(vec![operation.into()], vec![])
2772 .await
2773 }
2774
2775 #[instrument(level = "trace", skip(operations, blobs))]
2779 async fn execute_block(
2780 &self,
2781 operations: Vec<Operation>,
2782 blobs: Vec<Blob>,
2783 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2784 #[cfg(with_metrics)]
2785 let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
2786
2787 let mutex = self.client_mutex();
2788 let lock_start = linera_base::time::Instant::now();
2789 let _guard = mutex.lock_owned().await;
2790 tracing::debug!(
2791 lock_wait_ms = lock_start.elapsed().as_millis(),
2792 "acquired client_mutex in execute_block"
2793 );
2794 match self.process_pending_block_without_prepare().await? {
2796 ClientOutcome::Committed(Some(certificate)) => {
2797 return Ok(ClientOutcome::Conflict(Box::new(certificate)))
2798 }
2799 ClientOutcome::WaitForTimeout(timeout) => {
2800 return Ok(ClientOutcome::WaitForTimeout(timeout))
2801 }
2802 ClientOutcome::Conflict(certificate) => {
2803 return Ok(ClientOutcome::Conflict(certificate))
2804 }
2805 ClientOutcome::Committed(None) => {}
2806 }
2807
2808 let transactions = self.prepend_epochs_messages_and_events(operations).await?;
2812
2813 if transactions.is_empty() {
2814 return Err(ChainClientError::LocalNodeError(
2815 LocalNodeError::WorkerError(WorkerError::ChainError(Box::new(
2816 ChainError::EmptyBlock,
2817 ))),
2818 ));
2819 }
2820
2821 let block = self.new_pending_block(transactions, blobs).await?;
2822
2823 match self.process_pending_block_without_prepare().await? {
2824 ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
2825 Ok(ClientOutcome::Committed(certificate))
2826 }
2827 ClientOutcome::Committed(Some(certificate)) => {
2828 Ok(ClientOutcome::Conflict(Box::new(certificate)))
2829 }
2830 ClientOutcome::Committed(None) => Err(ChainClientError::BlockProposalError(
2832 "Unexpected block proposal error",
2833 )),
2834 ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
2835 ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
2836 }
2837 }
2838
2839 #[instrument(level = "trace", skip(operations))]
2845 async fn prepend_epochs_messages_and_events(
2846 &self,
2847 operations: Vec<Operation>,
2848 ) -> Result<Vec<Transaction>, ChainClientError> {
2849 let incoming_bundles = self.pending_message_bundles().await?;
2850 let stream_updates = self.collect_stream_updates().await?;
2851 Ok(self
2852 .collect_epoch_changes()
2853 .await?
2854 .into_iter()
2855 .map(Transaction::ExecuteOperation)
2856 .chain(
2857 incoming_bundles
2858 .into_iter()
2859 .map(Transaction::ReceiveMessages),
2860 )
2861 .chain(
2862 stream_updates
2863 .into_iter()
2864 .map(Transaction::ExecuteOperation),
2865 )
2866 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
2867 .collect::<Vec<_>>())
2868 }
2869
2870 #[instrument(level = "trace", skip(transactions, blobs))]
2874 async fn new_pending_block(
2875 &self,
2876 transactions: Vec<Transaction>,
2877 blobs: Vec<Blob>,
2878 ) -> Result<Block, ChainClientError> {
2879 let identity = self.identity().await?;
2880
2881 ensure!(
2882 self.pending_proposal().is_none(),
2883 ChainClientError::BlockProposalError(
2884 "Client state already has a pending block; \
2885 use the `linera retry-pending-block` command to commit that first"
2886 )
2887 );
2888 let info = self.chain_info_with_committees().await?;
2889 let timestamp = self.next_timestamp(&transactions, info.timestamp);
2890 let proposed_block = ProposedBlock {
2891 epoch: info.epoch,
2892 chain_id: self.chain_id,
2893 transactions,
2894 previous_block_hash: info.block_hash,
2895 height: info.next_block_height,
2896 authenticated_signer: Some(identity),
2897 timestamp,
2898 };
2899
2900 let round = self.round_for_oracle(&info, &identity).await?;
2901 let (block, _) = Box::pin(self.client.stage_block_execution_with_policy(
2904 proposed_block,
2905 round,
2906 blobs.clone(),
2907 BundleExecutionPolicy::AutoRetry {
2908 max_failures: self.options.max_block_limit_errors,
2909 },
2910 ))
2911 .await?;
2912 let (proposed_block, _) = block.clone().into_proposal();
2913 self.update_state(|state| {
2914 state.set_pending_proposal(proposed_block.clone(), blobs.clone())
2915 });
2916 Ok(block)
2917 }
2918
2919 #[instrument(level = "trace", skip(transactions))]
2924 fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
2925 let local_time = self.storage_client().clock().current_time();
2926 transactions
2927 .iter()
2928 .filter_map(Transaction::incoming_bundle)
2929 .map(|msg| msg.bundle.timestamp)
2930 .max()
2931 .map_or(local_time, |timestamp| timestamp.max(local_time))
2932 .max(block_time)
2933 }
2934
2935 #[instrument(level = "trace", skip(query))]
2937 pub async fn query_application(
2938 &self,
2939 query: Query,
2940 block_hash: Option<CryptoHash>,
2941 ) -> Result<QueryOutcome, ChainClientError> {
2942 loop {
2943 let result = self
2944 .client
2945 .local_node
2946 .query_application(self.chain_id, query.clone(), block_hash)
2947 .await;
2948 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2949 let validators = self.client.validator_nodes().await?;
2950 self.client
2951 .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
2952 .await?;
2953 continue; }
2955 return Ok(result?);
2956 }
2957 }
2958
2959 #[instrument(level = "trace", skip(query))]
2961 pub async fn query_system_application(
2962 &self,
2963 query: SystemQuery,
2964 ) -> Result<QueryOutcome<SystemResponse>, ChainClientError> {
2965 let QueryOutcome {
2966 response,
2967 operations,
2968 } = self.query_application(Query::System(query), None).await?;
2969 match response {
2970 QueryResponse::System(response) => Ok(QueryOutcome {
2971 response,
2972 operations,
2973 }),
2974 _ => Err(ChainClientError::InternalError(
2975 "Unexpected response for system query",
2976 )),
2977 }
2978 }
2979
/// Runs a user-application query on this chain and deserializes the JSON response
/// bytes into `A::QueryResponse`.
///
/// Returns an `InternalError` if the response is not a user response.
#[instrument(level = "trace", skip(application_id, query))]
#[cfg(with_testing)]
pub async fn query_user_application<A: Abi>(
    &self,
    application_id: ApplicationId<A>,
    query: &A::Query,
) -> Result<QueryOutcome<A::QueryResponse>, ChainClientError> {
    let user_query = Query::user(application_id, query)?;
    let outcome = self.query_application(user_query, None).await?;
    let QueryResponse::User(response_bytes) = outcome.response else {
        return Err(ChainClientError::InternalError(
            "Unexpected response for user query",
        ));
    };
    let response = serde_json::from_slice(&response_bytes)?;
    Ok(QueryOutcome {
        response,
        operations: outcome.operations,
    })
}
3006
3007 #[instrument(level = "trace")]
3014 pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
3015 let (balance, _) = Box::pin(self.query_balances_with_owner(AccountOwner::CHAIN)).await?;
3016 Ok(balance)
3017 }
3018
3019 #[instrument(level = "trace", skip(owner))]
3026 pub async fn query_owner_balance(
3027 &self,
3028 owner: AccountOwner,
3029 ) -> Result<Amount, ChainClientError> {
3030 if owner.is_chain() {
3031 Box::pin(self.query_balance()).await
3032 } else {
3033 Ok(Box::pin(self.query_balances_with_owner(owner))
3034 .await?
3035 .1
3036 .unwrap_or(Amount::ZERO))
3037 }
3038 }
3039
3040 #[instrument(level = "trace", skip(owner))]
3047 async fn query_balances_with_owner(
3048 &self,
3049 owner: AccountOwner,
3050 ) -> Result<(Amount, Option<Amount>), ChainClientError> {
3051 let incoming_bundles = self.pending_message_bundles().await?;
3052 if incoming_bundles.is_empty() {
3055 let chain_balance = self.local_balance().await?;
3056 let owner_balance = self.local_owner_balance(owner).await?;
3057 return Ok((chain_balance, Some(owner_balance)));
3058 }
3059 let info = self.chain_info().await?;
3060 let transactions = incoming_bundles
3061 .into_iter()
3062 .map(Transaction::ReceiveMessages)
3063 .collect::<Vec<_>>();
3064 let timestamp = self.next_timestamp(&transactions, info.timestamp);
3065 let block = ProposedBlock {
3066 epoch: info.epoch,
3067 chain_id: self.chain_id,
3068 transactions,
3069 previous_block_hash: info.block_hash,
3070 height: info.next_block_height,
3071 authenticated_signer: if owner == AccountOwner::CHAIN {
3072 None
3073 } else {
3074 Some(owner)
3075 },
3076 timestamp,
3077 };
3078 match Box::pin(self.client.stage_block_execution_with_policy(
3079 block,
3080 None,
3081 Vec::new(),
3082 BundleExecutionPolicy::AutoRetry {
3083 max_failures: self.options.max_block_limit_errors,
3084 },
3085 ))
3086 .await
3087 {
3088 Ok((_, response)) => Ok((
3089 response.info.chain_balance,
3090 response.info.requested_owner_balance,
3091 )),
3092 Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
3093 WorkerError::ChainError(error),
3094 ))) if matches!(
3095 &*error,
3096 ChainError::ExecutionError(
3097 execution_error,
3098 ChainExecutionContext::Block
3099 ) if matches!(
3100 **execution_error,
3101 ExecutionError::FeesExceedFunding { .. }
3102 )
3103 ) =>
3104 {
3105 Ok((Amount::ZERO, Some(Amount::ZERO)))
3107 }
3108 Err(error) => Err(error),
3109 }
3110 }
3111
3112 #[instrument(level = "trace")]
3116 pub async fn local_balance(&self) -> Result<Amount, ChainClientError> {
3117 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
3118 Ok(balance)
3119 }
3120
3121 #[instrument(level = "trace", skip(owner))]
3125 pub async fn local_owner_balance(
3126 &self,
3127 owner: AccountOwner,
3128 ) -> Result<Amount, ChainClientError> {
3129 if owner.is_chain() {
3130 self.local_balance().await
3131 } else {
3132 Ok(self
3133 .local_balances_with_owner(owner)
3134 .await?
3135 .1
3136 .unwrap_or(Amount::ZERO))
3137 }
3138 }
3139
3140 #[instrument(level = "trace", skip(owner))]
3144 async fn local_balances_with_owner(
3145 &self,
3146 owner: AccountOwner,
3147 ) -> Result<(Amount, Option<Amount>), ChainClientError> {
3148 ensure!(
3149 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
3150 ChainClientError::WalletSynchronizationError
3151 );
3152 let mut query = ChainInfoQuery::new(self.chain_id);
3153 query.request_owner_balance = owner;
3154 let response = self
3155 .client
3156 .local_node
3157 .handle_chain_info_query(query)
3158 .await?;
3159 Ok((
3160 response.info.chain_balance,
3161 response.info.requested_owner_balance,
3162 ))
3163 }
3164
3165 #[instrument(level = "trace")]
3167 pub async fn transfer_to_account(
3168 &self,
3169 from: AccountOwner,
3170 amount: Amount,
3171 account: Account,
3172 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3173 self.transfer(from, amount, account).await
3174 }
3175
/// Transfers `amount` from `owner` to this chain's burn address (testing only).
#[cfg(with_testing)]
#[instrument(level = "trace")]
pub async fn burn(
    &self,
    owner: AccountOwner,
    amount: Amount,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
    let burn_address = Account::burn_address(self.chain_id);
    let transfer = self.transfer(owner, amount, burn_address);
    transfer.await
}
3187
3188 #[instrument(level = "trace")]
3189 pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3190 let validators = self.client.validator_nodes().await?;
3191 self.client
3192 .fetch_chain_info(self.chain_id, &validators)
3193 .await
3194 }
3195
3196 #[instrument(level = "trace")]
3205 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3206 if self.preferred_owner.is_none() {
3207 return self.client.synchronize_chain_state(self.chain_id).await;
3208 }
3209 let info = self.prepare_chain().await?;
3210 self.synchronize_publisher_chains().await?;
3211 self.find_received_certificates(None).await?;
3212 Ok(info)
3213 }
3214
3215 #[instrument(level = "trace")]
3217 pub async fn process_pending_block(
3218 &self,
3219 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3220 self.prepare_chain().await?;
3221 self.process_pending_block_without_prepare().await
3222 }
3223
/// Tries to get a block committed without first preparing the chain.
///
/// The block is chosen in this order of priority: the locking block reported by the
/// chain manager, then the client's own pending proposal. If neither exists,
/// `ClientOutcome::Committed(None)` is returned.
///
/// Returns `ClientOutcome::WaitForTimeout` if no proposal can be made before the
/// current round times out, and `ClientOutcome::Committed(Some(certificate))` once the
/// block is confirmed and the validators have been updated.
#[instrument(level = "trace")]
async fn process_pending_block_without_prepare(
    &self,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
    let process_start = linera_base::time::Instant::now();
    tracing::debug!("process_pending_block_without_prepare started");
    // Request a leader timeout first if the current round has already timed out.
    let info = self.request_leader_timeout_if_needed().await?;

    // A locking block in the current, non-fast round only needs to be finalized.
    if info.manager.has_locking_block_in_current_round()
        && !info.manager.current_round.is_fast()
    {
        return Box::pin(self.finalize_locking_block(info)).await;
    }
    let owner = self.identity().await?;

    let local_node = &self.client.local_node;
    let pending_proposal = self.pending_proposal();
    // Pick the block to propose, together with its blobs.
    let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
        match &**locking {
            // Locking validated block: reuse the certified block as is.
            LockingBlock::Regular(certificate) => {
                let blob_ids = certificate.block().required_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| {
                        ChainClientError::InternalError("Missing local locking blobs")
                    })?;
                debug!("Retrying locking block from round {}", certificate.round);
                (certificate.block().clone(), blobs)
            }
            // Locking fast-round proposal: only the proposed block is available, so its
            // execution is staged again to obtain the full block.
            LockingBlock::Fast(proposal) => {
                let proposed_block = proposal.content.block.clone();
                let blob_ids = proposed_block.published_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| {
                        ChainClientError::InternalError("Missing local locking blobs")
                    })?;
                let block = self
                    .client
                    .stage_block_execution(proposed_block, None, blobs.clone())
                    .await?
                    .0;
                debug!("Retrying locking block from fast round.");
                (block, blobs)
            }
        }
    } else if let Some(pending_proposal) = pending_proposal {
        // No locking block: stage the client's own pending proposal.
        let proposed_block = pending_proposal.block;
        let round = self.round_for_oracle(&info, &owner).await?;
        let (block, _) = self
            .client
            .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
            .await?;
        debug!("Proposing the local pending block.");
        (block, pending_proposal.blobs)
    } else {
        // Nothing to propose.
        return Ok(ClientOutcome::Committed(None));
    };

    let has_oracle_responses = block.has_oracle_responses();
    let (proposed_block, outcome) = block.into_proposal();
    // Either a round we may propose in, or a timeout the caller has to wait for.
    let round = match self
        .round_for_new_proposal(&info, &owner, has_oracle_responses)
        .await?
    {
        Either::Left(round) => round,
        Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
    };
    debug!("Proposing block for round {}", round);

    let already_handled_locally = info
        .manager
        .already_handled_proposal(round, &proposed_block);
    // A locking block is re-proposed as a retry; otherwise this is an initial proposal.
    let proposal = if let Some(locking) = info.manager.requested_locking {
        Box::new(match *locking {
            LockingBlock::Regular(cert) => {
                BlockProposal::new_retry_regular(owner, round, cert, self.signer())
                    .await
                    .map_err(ChainClientError::signer_failure)?
            }
            LockingBlock::Fast(proposal) => {
                BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
                    .await
                    .map_err(ChainClientError::signer_failure)?
            }
        })
    } else {
        Box::new(
            BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
                .await
                .map_err(ChainClientError::signer_failure)?,
        )
    };
    // Let the local node handle the proposal first, unless it already did.
    if !already_handled_locally {
        if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
            match err {
                // The local node lacks blobs: hand it the pending blobs and retry once.
                LocalNodeError::BlobsNotFound(_) => {
                    local_node
                        .handle_pending_blobs(self.chain_id, blobs)
                        .await?;
                    local_node.handle_block_proposal(*proposal.clone()).await?;
                }
                err => return Err(err.into()),
            }
        }
    }
    let committee = self.local_committee().await?;
    let block = Block::new(proposed_block, outcome);
    let submit_block_proposal_start = linera_base::time::Instant::now();
    // In the fast round the proposal is submitted as a confirmed block directly; in any
    // other round it is submitted as a validated block and then finalized.
    let certificate = if round.is_fast() {
        let hashed_value = ConfirmedBlock::new(block);
        Box::pin(
            self.client
                .submit_block_proposal(&committee, proposal, hashed_value),
        )
        .await?
    } else {
        let hashed_value = ValidatedBlock::new(block);
        let certificate = Box::pin(self.client.submit_block_proposal(
            &committee,
            proposal,
            hashed_value.clone(),
        ))
        .await?;
        Box::pin(self.client.finalize_block(&committee, certificate)).await?
    };
    self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
    debug!(round = %certificate.round, "Sending confirmed block to validators");
    let update_start = linera_base::time::Instant::now();
    Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
    tracing::debug!(
        update_validators_ms = update_start.elapsed().as_millis(),
        total_process_ms = process_start.elapsed().as_millis(),
        "process_pending_block_without_prepare completing"
    );
    Ok(ClientOutcome::Committed(Some(certificate)))
}
3370
3371 fn send_timing(&self, start: Instant, timing_type: TimingType) {
3372 let Some(sender) = &self.timing_sender else {
3373 return;
3374 };
3375 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
3376 tracing::warn!(%err, "Failed to send timing info");
3377 }
3378 }
3379
3380 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, ChainClientError> {
3383 let mut info = self.chain_info_with_manager_values().await?;
3384 if let Some(round_timeout) = info.manager.round_timeout {
3387 if round_timeout <= self.storage_client().clock().current_time() {
3388 if let Err(e) = self.request_leader_timeout().await {
3389 debug!("Failed to obtain a timeout certificate: {}", e);
3390 } else {
3391 info = self.chain_info_with_manager_values().await?;
3392 }
3393 }
3394 }
3395 Ok(info)
3396 }
3397
3398 async fn finalize_locking_block(
3402 &self,
3403 info: Box<ChainInfo>,
3404 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3405 let locking = info
3406 .manager
3407 .requested_locking
3408 .expect("Should have a locking block");
3409 let LockingBlock::Regular(certificate) = *locking else {
3410 panic!("Should have a locking validated block");
3411 };
3412 debug!(
3413 round = %certificate.round,
3414 "Finalizing locking block"
3415 );
3416 let committee = self.local_committee().await?;
3417 let certificate =
3418 Box::pin(self.client.finalize_block(&committee, certificate.clone())).await?;
3419 Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
3420 Ok(ClientOutcome::Committed(Some(certificate)))
3421 }
3422
3423 async fn round_for_oracle(
3425 &self,
3426 info: &ChainInfo,
3427 identity: &AccountOwner,
3428 ) -> Result<Option<u32>, ChainClientError> {
3429 match self.round_for_new_proposal(info, identity, true).await {
3431 Ok(Either::Left(round)) => Ok(round.multi_leader()),
3433 Err(ChainClientError::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
3437 Err(err) => Err(err),
3438 }
3439 }
3440
3441 async fn round_for_new_proposal(
3443 &self,
3444 info: &ChainInfo,
3445 identity: &AccountOwner,
3446 has_oracle_responses: bool,
3447 ) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
3448 let manager = &info.manager;
3449 let seed = self
3450 .client
3451 .local_node
3452 .get_manager_seed(self.chain_id)
3453 .await?;
3454 let skip_fast = manager.current_round.is_fast()
3459 && (has_oracle_responses || !self.options.allow_fast_blocks);
3460 let conflict = manager
3461 .requested_signed_proposal
3462 .as_ref()
3463 .into_iter()
3464 .chain(&manager.requested_proposed)
3465 .any(|proposal| proposal.content.round == manager.current_round)
3466 || skip_fast;
3467 let round = if !conflict {
3468 manager.current_round
3469 } else if let Some(round) = manager
3470 .ownership
3471 .next_round(manager.current_round)
3472 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
3473 {
3474 round
3475 } else if let Some(timeout) = info.round_timeout() {
3476 return Ok(Either::Right(timeout));
3477 } else {
3478 return Err(ChainClientError::BlockProposalError(
3479 "Conflicting proposal in the current round",
3480 ));
3481 };
3482 let current_committee = info
3483 .current_committee()?
3484 .validators
3485 .values()
3486 .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
3487 .collect();
3488 if manager.should_propose(identity, round, seed, ¤t_committee) {
3489 return Ok(Either::Left(round));
3490 }
3491 if let Some(timeout) = info.round_timeout() {
3492 return Ok(Either::Right(timeout));
3493 }
3494 Err(ChainClientError::BlockProposalError(
3495 "Not a leader in the current round",
3496 ))
3497 }
3498
    /// Clears the pending proposal held in this client's chain state.
    ///
    /// Only compiled when the `with_testing` configuration flag is set.
    #[cfg(with_testing)]
    #[instrument(level = "trace")]
    pub fn clear_pending_proposal(&self) {
        self.update_state(|state| state.clear_pending_proposal());
    }
3505
    /// Hands the chain over to the owner derived from `public_key`.
    ///
    /// Converts `public_key` into an [`AccountOwner`] and delegates to
    /// [`Self::transfer_ownership`].
    #[instrument(level = "trace")]
    pub async fn rotate_key_pair(
        &self,
        public_key: AccountPublicKey,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
        Box::pin(self.transfer_ownership(public_key.into())).await
    }
3516
    /// Executes a `ChangeOwnership` system operation that makes `new_owner` the only super
    /// owner of the chain.
    ///
    /// The new ownership has no regular owners, two multi-leader rounds, closed multi-leader
    /// rounds and the default timeout configuration.
    #[instrument(level = "trace")]
    pub async fn transfer_ownership(
        &self,
        new_owner: AccountOwner,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
        Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
            super_owners: vec![new_owner],
            owners: Vec::new(),
            multi_leader_rounds: 2,
            open_multi_leader_rounds: false,
            timeout_config: TimeoutConfig::default(),
        }))
        .await
    }
3532
3533 #[instrument(level = "trace")]
3535 pub async fn share_ownership(
3536 &self,
3537 new_owner: AccountOwner,
3538 new_weight: u64,
3539 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3540 let ownership = self.prepare_chain().await?.manager.ownership;
3541 ensure!(
3542 ownership.is_active(),
3543 ChainError::InactiveChain(self.chain_id)
3544 );
3545 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
3546 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
3547 owners.push((new_owner, new_weight));
3548 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
3549 super_owners: Vec::new(),
3550 owners,
3551 multi_leader_rounds: ownership.multi_leader_rounds,
3552 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3553 timeout_config: ownership.timeout_config,
3554 })];
3555 match self.execute_block(operations, vec![]).await? {
3556 ClientOutcome::Committed(certificate) => Ok(ClientOutcome::Committed(certificate)),
3557 ClientOutcome::Conflict(certificate) => {
3558 info!(
3559 height = %certificate.block().header.height,
3560 "Another block was committed."
3561 );
3562 Ok(ClientOutcome::Conflict(certificate))
3563 }
3564 ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
3565 }
3566 }
3567
3568 #[instrument(level = "trace")]
3570 pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, ChainClientError> {
3571 Ok(self
3572 .client
3573 .local_node
3574 .chain_state_view(self.chain_id)
3575 .await?
3576 .execution_state
3577 .system
3578 .ownership
3579 .get()
3580 .clone())
3581 }
3582
3583 #[instrument(level = "trace")]
3586 pub async fn change_ownership(
3587 &self,
3588 ownership: ChainOwnership,
3589 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3590 Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
3591 super_owners: ownership.super_owners.into_iter().collect(),
3592 owners: ownership.owners.into_iter().collect(),
3593 multi_leader_rounds: ownership.multi_leader_rounds,
3594 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3595 timeout_config: ownership.timeout_config.clone(),
3596 }))
3597 .await
3598 }
3599
3600 #[instrument(level = "trace")]
3602 pub async fn query_application_permissions(
3603 &self,
3604 ) -> Result<ApplicationPermissions, ChainClientError> {
3605 Ok(self
3606 .client
3607 .local_node
3608 .chain_state_view(self.chain_id)
3609 .await?
3610 .execution_state
3611 .system
3612 .application_permissions
3613 .get()
3614 .clone())
3615 }
3616
    /// Executes a `ChangeApplicationPermissions` system operation carrying
    /// `application_permissions`.
    #[instrument(level = "trace", skip(application_permissions))]
    pub async fn change_application_permissions(
        &self,
        application_permissions: ApplicationPermissions,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
        Box::pin(
            self.execute_operation(SystemOperation::ChangeApplicationPermissions(
                application_permissions,
            )),
        )
        .await
    }
3630
3631 #[instrument(level = "trace", skip(self))]
3633 pub async fn open_chain(
3634 &self,
3635 ownership: ChainOwnership,
3636 application_permissions: ApplicationPermissions,
3637 balance: Amount,
3638 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, ChainClientError>
3639 {
3640 let config = OpenChainConfig {
3641 ownership: ownership.clone(),
3642 balance,
3643 application_permissions: application_permissions.clone(),
3644 };
3645 let operation = Operation::system(SystemOperation::OpenChain(config));
3646 let certificate = match self.execute_block(vec![operation], vec![]).await? {
3647 ClientOutcome::Committed(certificate) => certificate,
3648 ClientOutcome::Conflict(certificate) => {
3649 return Ok(ClientOutcome::Conflict(certificate));
3650 }
3651 ClientOutcome::WaitForTimeout(timeout) => {
3652 return Ok(ClientOutcome::WaitForTimeout(timeout));
3653 }
3654 };
3655 let chain_blob = certificate
3657 .block()
3658 .body
3659 .blobs
3660 .last()
3661 .and_then(|blobs| blobs.last())
3662 .ok_or_else(|| ChainClientError::InternalError("Failed to create a new chain"))?;
3663 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
3664 for owner in ownership.all_owners() {
3666 if self.client.has_key_for(owner).await? {
3667 self.client
3668 .extend_chain_mode(description.id(), ListeningMode::FullChain);
3669 break;
3670 }
3671 }
3672 self.client
3673 .local_node
3674 .retry_pending_cross_chain_requests(self.chain_id)
3675 .await?;
3676 Ok(ClientOutcome::Committed((description, certificate)))
3677 }
3678
    /// Closes the chain by executing a `CloseChain` system operation.
    ///
    /// On success the outcome's certificate is wrapped in `Some`. If the local worker reports
    /// `ChainError::ClosedChain`, this is not treated as a failure and
    /// `ClientOutcome::Committed(None)` is returned. Any other error is propagated.
    #[instrument(level = "trace")]
    pub async fn close_chain(
        &self,
    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
        match Box::pin(self.execute_operation(SystemOperation::CloseChain)).await {
            Ok(outcome) => Ok(outcome.map(Some)),
            Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
                WorkerError::ChainError(chain_error),
            ))) if matches!(*chain_error, ChainError::ClosedChain) => {
                // There is no new certificate to report in this case.
                Ok(ClientOutcome::Committed(None))
            }
            Err(error) => Err(error),
        }
    }
3695
    /// Publishes a module made of the given contract and service bytecode.
    ///
    /// Builds the blobs and the module ID with `create_bytecode_blobs`, then delegates to
    /// [`Self::publish_module_blobs`]. Not available on `wasm32` targets.
    #[cfg(not(target_arch = "wasm32"))]
    #[instrument(level = "trace", skip(contract, service))]
    pub async fn publish_module(
        &self,
        contract: Bytecode,
        service: Bytecode,
        vm_runtime: VmRuntime,
    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
        let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
        Box::pin(self.publish_module_blobs(blobs, module_id)).await
    }
3708
3709 #[cfg(not(target_arch = "wasm32"))]
3711 #[instrument(level = "trace", skip(blobs, module_id))]
3712 pub async fn publish_module_blobs(
3713 &self,
3714 blobs: Vec<Blob>,
3715 module_id: ModuleId,
3716 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3717 self.execute_operations(
3718 vec![Operation::system(SystemOperation::PublishModule {
3719 module_id,
3720 })],
3721 blobs,
3722 )
3723 .await?
3724 .try_map(|certificate| Ok((module_id, certificate)))
3725 }
3726
3727 #[instrument(level = "trace", skip(bytes))]
3729 pub async fn publish_data_blobs(
3730 &self,
3731 bytes: Vec<Vec<u8>>,
3732 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3733 let blobs = bytes.into_iter().map(Blob::new_data);
3734 let publish_blob_operations = blobs
3735 .clone()
3736 .map(|blob| {
3737 Operation::system(SystemOperation::PublishDataBlob {
3738 blob_hash: blob.id().hash,
3739 })
3740 })
3741 .collect();
3742 self.execute_operations(publish_blob_operations, blobs.collect())
3743 .await
3744 }
3745
    /// Publishes `bytes` as a single data blob.
    ///
    /// Convenience wrapper around [`Self::publish_data_blobs`] with a one-element list.
    #[instrument(level = "trace", skip(bytes))]
    pub async fn publish_data_blob(
        &self,
        bytes: Vec<u8>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
        Box::pin(self.publish_data_blobs(vec![bytes])).await
    }
3754
3755 #[instrument(
3757 level = "trace",
3758 skip(self, parameters, instantiation_argument, required_application_ids)
3759 )]
3760 pub async fn create_application<
3761 A: Abi,
3762 Parameters: Serialize,
3763 InstantiationArgument: Serialize,
3764 >(
3765 &self,
3766 module_id: ModuleId<A, Parameters, InstantiationArgument>,
3767 parameters: &Parameters,
3768 instantiation_argument: &InstantiationArgument,
3769 required_application_ids: Vec<ApplicationId>,
3770 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, ChainClientError>
3771 {
3772 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
3773 let parameters = serde_json::to_vec(parameters)?;
3774 Ok(Box::pin(self.create_application_untyped(
3775 module_id.forget_abi(),
3776 parameters,
3777 instantiation_argument,
3778 required_application_ids,
3779 ))
3780 .await?
3781 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
3782 }
3783
3784 #[instrument(
3786 level = "trace",
3787 skip(
3788 self,
3789 module_id,
3790 parameters,
3791 instantiation_argument,
3792 required_application_ids
3793 )
3794 )]
3795 pub async fn create_application_untyped(
3796 &self,
3797 module_id: ModuleId,
3798 parameters: Vec<u8>,
3799 instantiation_argument: Vec<u8>,
3800 required_application_ids: Vec<ApplicationId>,
3801 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, ChainClientError> {
3802 Box::pin(self.execute_operation(SystemOperation::CreateApplication {
3803 module_id,
3804 parameters,
3805 instantiation_argument,
3806 required_application_ids,
3807 }))
3808 .await?
3809 .try_map(|certificate| {
3810 let mut creation: Vec<_> = certificate
3812 .block()
3813 .created_blob_ids()
3814 .into_iter()
3815 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
3816 .collect();
3817 if creation.len() > 1 {
3818 return Err(ChainClientError::InternalError(
3819 "Unexpected number of application descriptions published",
3820 ));
3821 }
3822 let blob_id = creation.pop().ok_or(ChainClientError::InternalError(
3823 "ApplicationDescription blob not found.",
3824 ))?;
3825 let id = ApplicationId::new(blob_id.hash);
3826 Ok((id, certificate))
3827 })
3828 }
3829
3830 #[instrument(level = "trace", skip(committee))]
3832 pub async fn stage_new_committee(
3833 &self,
3834 committee: Committee,
3835 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3836 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
3837 let blob_hash = blob.id().hash;
3838 match self
3839 .execute_operations(
3840 vec![Operation::system(SystemOperation::Admin(
3841 AdminOperation::PublishCommitteeBlob { blob_hash },
3842 ))],
3843 vec![blob],
3844 )
3845 .await?
3846 {
3847 ClientOutcome::Committed(_) => {}
3848 outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
3849 outcome @ ClientOutcome::Conflict(_) => return Ok(outcome),
3850 }
3851 let epoch = Box::pin(self.chain_info()).await?.epoch.try_add_one()?;
3852 Box::pin(
3853 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
3854 epoch,
3855 blob_hash,
3856 })),
3857 )
3858 .await
3859 }
3860
    /// Prepares the chain and then processes its inbox.
    ///
    /// Calls `prepare_chain` followed by [`Self::process_inbox_without_prepare`]; see the
    /// latter for the meaning of the returned tuple.
    #[instrument(level = "trace")]
    pub async fn process_inbox(
        &self,
    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
        self.prepare_chain().await?;
        self.process_inbox_without_prepare().await
    }
3873
    /// Repeatedly executes blocks without explicit operations or blobs until nothing is left
    /// to do.
    ///
    /// Returns the certificates collected along the way, both for committed blocks and for
    /// conflicting ones. The second element is `Some(timeout)` if a block execution asked to
    /// wait for a round timeout, and `None` if the loop ended because the worker reported
    /// `ChainError::EmptyBlock`. Any other error is propagated.
    #[instrument(level = "trace")]
    pub async fn process_inbox_without_prepare(
        &self,
    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
        #[cfg(with_metrics)]
        let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();

        let mut certificates = Vec::new();
        loop {
            match self.execute_block(vec![], vec![]).await {
                Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
                Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
                    return Ok((certificates, Some(timeout)));
                }
                // An empty block means there was nothing to include, so we are done.
                Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
                    WorkerError::ChainError(chain_error),
                ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
                    return Ok((certificates, None));
                }
                Err(error) => return Err(error),
            };
        }
    }
3907
3908 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, ChainClientError> {
3911 let (mut min_epoch, mut next_epoch) = {
3912 let (epoch, committees) = self.epoch_and_committees().await?;
3913 let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
3914 (min_epoch, epoch.try_add_one()?)
3915 };
3916 let mut epoch_change_ops = Vec::new();
3917 while self
3918 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
3919 .await?
3920 {
3921 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
3922 next_epoch,
3923 )));
3924 next_epoch.try_add_assign_one()?;
3925 }
3926 while self
3927 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
3928 .await?
3929 {
3930 epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
3931 min_epoch,
3932 )));
3933 min_epoch.try_add_assign_one()?;
3934 }
3935 Ok(epoch_change_ops)
3936 }
3937
3938 async fn has_admin_event(
3941 &self,
3942 stream_name: &[u8],
3943 index: u32,
3944 ) -> Result<bool, ChainClientError> {
3945 let event_id = EventId {
3946 chain_id: self.client.admin_chain_id,
3947 stream_id: StreamId::system(stream_name),
3948 index,
3949 };
3950 Ok(self
3951 .client
3952 .storage_client()
3953 .read_event(event_id)
3954 .await?
3955 .is_some())
3956 }
3957
    /// Reads from storage the events of this chain's stream `stream_id`, starting at
    /// `start_index`.
    pub async fn events_from_index(
        &self,
        stream_id: StreamId,
        start_index: u32,
    ) -> Result<Vec<IndexAndEvent>, ChainClientError> {
        Ok(self
            .client
            .storage_client()
            .read_events_from_index(&self.chain_id, &stream_id, start_index)
            .await?)
    }
3970
3971 #[instrument(level = "trace")]
3976 pub async fn revoke_epochs(
3977 &self,
3978 revoked_epoch: Epoch,
3979 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3980 self.prepare_chain().await?;
3981 let (current_epoch, committees) = self.epoch_and_committees().await?;
3982 ensure!(
3983 revoked_epoch < current_epoch,
3984 ChainClientError::CannotRevokeCurrentEpoch(current_epoch)
3985 );
3986 ensure!(
3987 committees.contains_key(&revoked_epoch),
3988 ChainClientError::EpochAlreadyRevoked
3989 );
3990 let operations = committees
3991 .keys()
3992 .filter_map(|epoch| {
3993 if *epoch <= revoked_epoch {
3994 Some(Operation::system(SystemOperation::Admin(
3995 AdminOperation::RemoveCommittee { epoch: *epoch },
3996 )))
3997 } else {
3998 None
3999 }
4000 })
4001 .collect();
4002 self.execute_operations(operations, vec![]).await
4003 }
4004
    /// Executes a `Transfer` system operation moving `amount` from `owner` to `recipient`.
    ///
    /// NOTE(review): the `unsafe_unconfirmed` suffix presumably means that no confirmation
    /// from the recipient side is awaited; confirm against the callers.
    #[instrument(level = "trace")]
    pub async fn transfer_to_account_unsafe_unconfirmed(
        &self,
        owner: AccountOwner,
        amount: Amount,
        recipient: Account,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
        Box::pin(self.execute_operation(SystemOperation::Transfer {
            owner,
            recipient,
            amount,
        }))
        .await
    }
4022
4023 #[instrument(level = "trace", skip(hash))]
4024 pub async fn read_confirmed_block(
4025 &self,
4026 hash: CryptoHash,
4027 ) -> Result<ConfirmedBlock, ChainClientError> {
4028 let block = self
4029 .client
4030 .storage_client()
4031 .read_confirmed_block(hash)
4032 .await?;
4033 block.ok_or(ChainClientError::MissingConfirmedBlock(hash))
4034 }
4035
4036 #[instrument(level = "trace", skip(hash))]
4037 pub async fn read_certificate(
4038 &self,
4039 hash: CryptoHash,
4040 ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
4041 let certificate = self.client.storage_client().read_certificate(hash).await?;
4042 certificate.ok_or(ChainClientError::ReadCertificatesError(vec![hash]))
4043 }
4044
    /// Asks the local node to retry the pending cross-chain requests of this chain.
    #[instrument(level = "trace")]
    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), ChainClientError> {
        self.client
            .local_node
            .retry_pending_cross_chain_requests(self.chain_id)
            .await?;
        Ok(())
    }
4054
    /// Fetches the info of `chain_id` from `local_node`.
    ///
    /// On success the client is updated from the info before it is returned. Returns
    /// `Ok(None)` if the local node reports missing blobs or an inactive chain; other errors
    /// are propagated.
    #[instrument(level = "trace", skip(local_node))]
    async fn local_chain_info(
        &self,
        chain_id: ChainId,
        local_node: &mut LocalNodeClient<Env::Storage>,
    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
        match local_node.chain_info(chain_id).await {
            Ok(info) => {
                self.client.update_from_info(&info);
                Ok(Some(info))
            }
            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
            Err(err) => Err(err.into()),
        }
    }
4071
4072 #[instrument(level = "trace", skip(chain_id, local_node))]
4073 async fn local_next_block_height(
4074 &self,
4075 chain_id: ChainId,
4076 local_node: &mut LocalNodeClient<Env::Storage>,
4077 ) -> Result<BlockHeight, ChainClientError> {
4078 Ok(self
4079 .local_chain_info(chain_id, local_node)
4080 .await?
4081 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
4082 }
4083
    /// Returns the next height the local node reports for this chain's inbox from `origin`.
    #[instrument(level = "trace")]
    async fn local_next_height_to_receive(
        &self,
        origin: ChainId,
    ) -> Result<BlockHeight, ChainClientError> {
        Ok(self
            .client
            .local_node
            .get_inbox_next_height(self.chain_id, origin)
            .await?)
    }
4097
4098 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
4099 async fn process_notification(
4100 &self,
4101 remote_node: RemoteNode<Env::ValidatorNode>,
4102 mut local_node: LocalNodeClient<Env::Storage>,
4103 notification: Notification,
4104 ) -> Result<(), ChainClientError> {
4105 let dominated = self
4106 .listening_mode()
4107 .is_none_or(|mode| !mode.is_relevant(¬ification.reason));
4108 if dominated {
4109 debug!(
4110 chain_id = %self.chain_id,
4111 reason = ?notification.reason,
4112 listening_mode = ?self.listening_mode(),
4113 "Ignoring notification due to listening mode"
4114 );
4115 return Ok(());
4116 }
4117 match notification.reason {
4118 Reason::NewIncomingBundle { origin, height } => {
4119 if self.local_next_height_to_receive(origin).await? > height {
4120 debug!(
4121 chain_id = %self.chain_id,
4122 "Accepting redundant notification for new message"
4123 );
4124 return Ok(());
4125 }
4126 self.client
4127 .download_sender_block_with_sending_ancestors(
4128 self.chain_id,
4129 origin,
4130 height,
4131 &remote_node,
4132 )
4133 .await?;
4134 if self.local_next_height_to_receive(origin).await? <= height {
4135 info!(
4136 chain_id = %self.chain_id,
4137 "NewIncomingBundle: Fail to synchronize new message after notification"
4138 );
4139 }
4140 }
4141 Reason::NewBlock { height, .. } => {
4142 let chain_id = notification.chain_id;
4143 if self
4144 .local_next_block_height(chain_id, &mut local_node)
4145 .await?
4146 > height
4147 {
4148 debug!(
4149 chain_id = %self.chain_id,
4150 "Accepting redundant notification for new block"
4151 );
4152 return Ok(());
4153 }
4154 self.client
4155 .synchronize_chain_state_from(&remote_node, chain_id)
4156 .await?;
4157 if self
4158 .local_next_block_height(chain_id, &mut local_node)
4159 .await?
4160 <= height
4161 {
4162 error!("NewBlock: Fail to synchronize new block after notification");
4163 }
4164 }
4165 Reason::NewRound { height, round } => {
4166 let chain_id = notification.chain_id;
4167 if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
4168 if (info.next_block_height, info.manager.current_round) >= (height, round) {
4169 debug!(
4170 chain_id = %self.chain_id,
4171 "Accepting redundant notification for new round"
4172 );
4173 return Ok(());
4174 }
4175 }
4176 self.client
4177 .synchronize_chain_state_from(&remote_node, chain_id)
4178 .await?;
4179 let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
4180 error!(
4181 chain_id = %self.chain_id,
4182 "NewRound: Fail to read local chain info for {chain_id}"
4183 );
4184 return Ok(());
4185 };
4186 if (info.next_block_height, info.manager.current_round) < (height, round) {
4187 info!(
4188 chain_id = %self.chain_id,
4189 "NewRound: Fail to synchronize new block after notification"
4190 );
4191 }
4192 }
4193 Reason::BlockExecuted { .. } => {
4194 }
4196 }
4197 Ok(())
4198 }
4199
    /// Returns whether the underlying client tracks this chain.
    pub fn is_tracked(&self) -> bool {
        self.client.is_tracked(self.chain_id)
    }
4204
    /// Returns the listening mode the underlying client has for this chain, if any.
    pub fn listening_mode(&self) -> Option<ListeningMode> {
        self.client.chain_mode(self.chain_id)
    }
4209
    /// Sets up listening for notifications about this chain.
    ///
    /// Returns a tuple of:
    ///
    /// * a future that keeps the per-validator notification streams up to date and drives the
    ///   processing of their notifications; nothing happens unless it is polled,
    /// * an [`AbortOnDrop`] guard that aborts the internal notification subscription feeding
    ///   that future when dropped,
    /// * a separate notification stream obtained from `subscribe`, for the caller.
    ///
    /// A failure to set up the initial validator streams is logged, not returned.
    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
    pub async fn listen(
        &self,
    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), ChainClientError> {
        use future::FutureExt as _;

        // Drives `future` to completion while also polling `background_work`, discarding the
        // items it yields.
        async fn await_while_polling<F: FusedFuture>(
            future: F,
            background_work: impl FusedStream<Item = ()>,
        ) -> F::Output {
            tokio::pin!(future);
            tokio::pin!(background_work);
            loop {
                futures::select! {
                    _ = background_work.next() => (),
                    result = future => return result,
                }
            }
        }

        // Abort handles of the per-validator notification streams, keyed by validator.
        let mut senders = HashMap::new();
        // Two independent subscriptions: one is handed to the caller, the other one (made
        // abortable) triggers the stream updates below.
        let notifications = self.subscribe()?;
        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);

        // Futures that process the notifications coming from the validators.
        let mut process_notifications = FuturesUnordered::new();

        match self.update_notification_streams(&mut senders).await {
            Ok(handler) => process_notifications.push(handler),
            Err(error) => error!("Failed to update committee: {error}"),
        };

        let this = self.clone();
        let update_streams = async move {
            let mut abortable_notifications = abortable_notifications.fuse();

            while let Some(notification) =
                await_while_polling(abortable_notifications.next(), &mut process_notifications)
                    .await
            {
                // Refresh the validator streams whenever a new block is reported.
                if let Reason::NewBlock { .. } = notification.reason {
                    match Box::pin(await_while_polling(
                        this.update_notification_streams(&mut senders).fuse(),
                        &mut process_notifications,
                    ))
                    .await
                    {
                        Ok(handler) => process_notifications.push(handler),
                        Err(error) => error!("Failed to update committee: {error}"),
                    }
                }
            }

            // The subscription has ended: abort all validator streams...
            for abort in senders.into_values() {
                abort.abort();
            }

            // ...and wait for the remaining processing futures to finish.
            let () = process_notifications.collect().await;
        }
        .in_current_span();

        Ok((update_streams, AbortOnDrop(abort), notifications))
    }
4282
    /// Brings the set of per-validator notification streams in line with the local committee.
    ///
    /// `senders` maps each validator to the abort handle of its stream. Streams of
    /// validators that are no longer among the committee's nodes are aborted, and aborted
    /// entries are removed. For every node without an entry, a new abortable stream is
    /// created and its handle is inserted into `senders`.
    ///
    /// Returns a future that processes the notifications of the newly created streams and
    /// completes once all of them have ended.
    #[instrument(level = "trace", skip(senders))]
    async fn update_notification_streams(
        &self,
        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
    ) -> Result<impl Future<Output = ()>, ChainClientError> {
        let (nodes, local_node) = {
            let committee = self.local_committee().await?;
            let nodes: HashMap<_, _> = self
                .client
                .validator_node_provider()
                .make_nodes(&committee)?
                .collect();
            (nodes, self.client.local_node.clone())
        };
        // Abort the streams of validators that are gone and drop every aborted entry.
        senders.retain(|validator, abort| {
            if !nodes.contains_key(validator) {
                abort.abort();
            }
            !abort.is_aborted()
        });
        let validator_tasks = FuturesUnordered::new();
        for (public_key, node) in nodes {
            let address = node.address();
            // Validators that already have a stream are left alone.
            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
                continue;
            };
            let this = self.clone();
            let stream = stream::once({
                let node = node.clone();
                async move {
                    let stream = node.subscribe(vec![this.chain_id]).await?;
                    let remote_node = RemoteNode { public_key, node };
                    // Synchronize from the validator once, right after subscribing.
                    this.client
                        .synchronize_chain_state_from(&remote_node, this.chain_id)
                        .await?;
                    Ok::<_, ChainClientError>(stream)
                }
            })
            // A failed connection is logged and results in an empty stream.
            .filter_map(move |result| {
                let address = address.clone();
                async move {
                    if let Err(error) = &result {
                        info!(?error, address, "could not connect to validator");
                    } else {
                        debug!(address, "connected to validator");
                    }
                    result.ok()
                }
            })
            .flatten();
            let (stream, abort) = stream::abortable(stream);
            let mut stream = Box::pin(stream);
            let this = self.clone();
            let local_node = local_node.clone();
            let remote_node = RemoteNode { public_key, node };
            validator_tasks.push(async move {
                while let Some(notification) = stream.next().await {
                    // Processing failures are logged; the stream keeps being consumed.
                    if let Err(error) = this
                        .process_notification(
                            remote_node.clone(),
                            local_node.clone(),
                            notification.clone(),
                        )
                        .await
                    {
                        tracing::info!(
                            chain_id = %this.chain_id,
                            address = remote_node.address(),
                            ?notification,
                            %error,
                            "failed to process notification",
                        );
                    }
                }
            });
            entry.insert(abort);
        }
        Ok(validator_tasks.collect())
    }
4366
    /// Sends to the validator `remote_node` the certificates of this chain it is missing.
    ///
    /// The validator's next block height is compared with the local one; a `BlobsNotFound`
    /// answer to the chain info query is treated as height zero. If the validator is behind,
    /// the certificates for the missing heights are read from local storage and sent one by
    /// one. When the validator answers `BlobsNotFound`, the missing blobs are read from local
    /// storage, uploaded, and the certificate is sent once more.
    #[instrument(level = "trace", skip(remote_node))]
    pub async fn sync_validator(
        &self,
        remote_node: Env::ValidatorNode,
    ) -> Result<(), ChainClientError> {
        let validator_next_block_height = match remote_node
            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
            .await
        {
            Ok(info) => info.info.next_block_height,
            Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
            Err(err) => return Err(err.into()),
        };
        let local_next_block_height = self.chain_info().await?.next_block_height;

        if validator_next_block_height >= local_next_block_height {
            debug!("Validator is up-to-date with local state");
            return Ok(());
        }

        // The heights the validator is missing, in increasing order.
        let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
            .map(BlockHeight)
            .collect();

        // Entries without a certificate are dropped by `flatten`.
        let certificates = self
            .client
            .storage_client()
            .read_certificates_by_heights(self.chain_id, &heights)
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        for certificate in certificates {
            match remote_node
                .handle_confirmed_certificate(
                    certificate.clone(),
                    CrossChainMessageDelivery::NonBlocking,
                )
                .await
            {
                Ok(_) => (),
                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
                    // Upload the blobs we have locally, then retry this certificate once.
                    let missing_blobs: Vec<_> = self
                        .client
                        .storage_client()
                        .read_blobs(&missing_blob_ids)
                        .await?
                        .into_iter()
                        .flatten()
                        .collect();
                    remote_node.upload_blobs(missing_blobs).await?;
                    remote_node
                        .handle_confirmed_certificate(
                            certificate,
                            CrossChainMessageDelivery::NonBlocking,
                        )
                        .await?;
                }
                Err(err) => return Err(err.into()),
            }
        }

        Ok(())
    }
4434}
4435
/// Runs `f` against all `nodes` concurrently, with staggered starts, and returns the first
/// successful result.
///
/// The call for the node at position `i` is delayed by `timeout * i * i`, so the first node
/// starts immediately. If every call fails, the collected `(public key, error)` pairs are
/// passed to `err` and its result is returned as the error.
async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
    nodes: &[RemoteNode<A>],
    f: F,
    err: G,
    timeout: Duration,
) -> Result<V, E2>
where
    F: Clone + FnOnce(RemoteNode<A>) -> R,
    RemoteNode<A>: Clone,
    G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
    R: Future<Output = Result<V, E1>> + 'a,
{
    let mut stream = nodes
        .iter()
        .zip(0..)
        .map(|(remote_node, i)| {
            let fun = f.clone();
            let node = remote_node.clone();
            async move {
                // Quadratic back-off: later nodes are only contacted if earlier ones are slow.
                linera_base::time::timer::sleep(timeout * i * i).await;
                fun(node).await.map_err(|err| (remote_node.public_key, err))
            }
        })
        .collect::<FuturesUnordered<_>>();
    let mut errors = vec![];
    while let Some(maybe_result) = stream.next().await {
        match maybe_result {
            // The first success wins; the remaining futures are dropped.
            Ok(result) => return Ok(result),
            Err(error) => errors.push(error),
        };
    }
    Err(err(errors))
}
4471
/// Helpers that are only compiled when the `with_testing` configuration flag is set.
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Processes `notification` as if it had been received from `validator`, given as a
    /// public key and an address.
    ///
    /// # Panics
    ///
    /// Panics if the node for `validator` cannot be created or if processing the
    /// notification fails.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let mut node_list = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap();
        let (public_key, node) = node_list.next().unwrap();
        let remote_node = RemoteNode { node, public_key };
        let local_node = self.client.local_node.clone();
        self.process_notification(remote_node, local_node, notification)
            .await
            .unwrap();
    }
}
4492
/// Wraps an [`AbortHandle`] so that the associated task or stream is aborted when this value
/// is dropped.
#[must_use]
pub struct AbortOnDrop(pub AbortHandle);
4496
impl Drop for AbortOnDrop {
    /// Aborts through the wrapped handle.
    #[instrument(level = "trace", skip(self))]
    fn drop(&mut self) {
        self.0.abort();
    }
}
4503
/// A proposed block together with the blobs that belong to it.
#[derive(Clone, Serialize, Deserialize)]
pub struct PendingProposal {
    /// The proposed block.
    pub block: ProposedBlock,
    /// The blobs kept alongside the block.
    pub blobs: Vec<Blob>,
}
4510
/// Selects how a received certificate is handled.
///
/// NOTE(review): the users of this enum are outside this chunk; the variant descriptions
/// below are inferred from the names and should be confirmed.
enum ReceiveCertificateMode {
    /// Presumably: the certificate still has to be checked.
    NeedsCheck,
    /// Presumably: the certificate was already checked and the check can be skipped.
    AlreadyChecked,
}
4515
/// The result of checking a certificate; see `into_result` for how each variant maps to a
/// `Result`.
enum CheckCertificateResult {
    /// Maps to `ChainClientError::CommitteeDeprecationError`.
    OldEpoch,
    /// Maps to `Ok(())`.
    New,
    /// Maps to `ChainClientError::CommitteeSynchronizationError`.
    FutureEpoch,
}
4521
4522impl CheckCertificateResult {
4523 fn into_result(self) -> Result<(), ChainClientError> {
4524 match self {
4525 Self::OldEpoch => Err(ChainClientError::CommitteeDeprecationError),
4526 Self::New => Ok(()),
4527 Self::FutureEpoch => Err(ChainClientError::CommitteeSynchronizationError),
4528 }
4529 }
4530}
4531
4532#[cfg(not(target_arch = "wasm32"))]
4534pub async fn create_bytecode_blobs(
4535 contract: Bytecode,
4536 service: Bytecode,
4537 vm_runtime: VmRuntime,
4538) -> (Vec<Blob>, ModuleId) {
4539 match vm_runtime {
4540 VmRuntime::Wasm => {
4541 let (compressed_contract, compressed_service) =
4542 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
4543 .await
4544 .expect("Compression should not panic");
4545 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
4546 let service_blob = Blob::new_service_bytecode(compressed_service);
4547 let module_id =
4548 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
4549 (vec![contract_blob, service_blob], module_id)
4550 }
4551 VmRuntime::Evm => {
4552 let compressed_contract = contract.compress();
4553 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
4554 let module_id = ModuleId::new(
4555 evm_contract_blob.id().hash,
4556 evm_contract_blob.id().hash,
4557 vm_runtime,
4558 );
4559 (vec![evm_contract_blob], module_id)
4560 }
4561 }
4562}