1use std::{
6 collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
7 convert::Infallible,
8 iter,
9 ops::{Deref, DerefMut},
10 sync::{Arc, RwLock},
11};
12
13use chain_client_state::ChainClientState;
14use custom_debug_derive::Debug;
15use dashmap::{
16 mapref::one::{MappedRef as DashMapMappedRef, Ref as DashMapRef, RefMut as DashMapRefMut},
17 DashMap,
18};
19use futures::{
20 future::{self, Either, FusedFuture, Future},
21 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt},
22};
23#[cfg(with_metrics)]
24use linera_base::prometheus_util::MeasureLatency as _;
25use linera_base::{
26 abi::Abi,
27 crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
28 data_types::{
29 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
30 ChainDescription, Epoch, Round, Timestamp,
31 },
32 ensure,
33 identifiers::{
34 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
35 ModuleId, StreamId,
36 },
37 ownership::{ChainOwnership, TimeoutConfig},
38 time::{Duration, Instant},
39};
40#[cfg(not(target_arch = "wasm32"))]
41use linera_base::{data_types::Bytecode, vm::VmRuntime};
42use linera_chain::{
43 data_types::{
44 BlockProposal, ChainAndHeight, IncomingBundle, LiteVote, MessageAction, ProposedBlock,
45 Transaction,
46 },
47 manager::LockingBlock,
48 types::{
49 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
50 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
51 },
52 ChainError, ChainExecutionContext, ChainStateView,
53};
54use linera_execution::{
55 committee::Committee,
56 system::{
57 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
58 REMOVED_EPOCH_STREAM_NAME,
59 },
60 ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
61};
62use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
63use linera_views::ViewError;
64use rand::prelude::SliceRandom as _;
65use serde::{Deserialize, Serialize};
66use thiserror::Error;
67use tokio::sync::{mpsc, OwnedRwLockReadGuard};
68use tokio_stream::wrappers::UnboundedReceiverStream;
69use tracing::{debug, error, info, instrument, warn, Instrument as _};
70
71use crate::{
72 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout},
73 environment::Environment,
74 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
75 node::{
76 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
77 ValidatorNodeProvider as _,
78 },
79 notifier::ChannelNotifier,
80 remote_node::RemoteNode,
81 updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
82 worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
83};
84
85mod chain_client_state;
86#[cfg(test)]
87#[path = "../unit_tests/client_tests.rs"]
88mod client_tests;
89
90#[cfg(with_metrics)]
91mod metrics {
92 use std::sync::LazyLock;
93
94 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
95 use prometheus::HistogramVec;
96
97 pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
98 LazyLock::new(|| {
99 register_histogram_vec(
100 "process_inbox_latency",
101 "process_inbox latency",
102 &[],
103 exponential_bucket_latencies(500.0),
104 )
105 });
106
107 pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
108 register_histogram_vec(
109 "prepare_chain_latency",
110 "prepare_chain latency",
111 &[],
112 exponential_bucket_latencies(500.0),
113 )
114 });
115
116 pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
117 register_histogram_vec(
118 "synchronize_chain_state_latency",
119 "synchronize_chain_state latency",
120 &[],
121 exponential_bucket_latencies(500.0),
122 )
123 });
124
125 pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
126 register_histogram_vec(
127 "execute_block_latency",
128 "execute_block latency",
129 &[],
130 exponential_bucket_latencies(500.0),
131 )
132 });
133
134 pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
135 register_histogram_vec(
136 "find_received_certificates_latency",
137 "find_received_certificates latency",
138 &[],
139 exponential_bucket_latencies(500.0),
140 )
141 });
142}
143
144pub struct Client<Env: Environment> {
146 environment: Env,
147 local_node: LocalNodeClient<Env::Storage>,
150 admin_id: ChainId,
152 tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
155 notifier: Arc<ChannelNotifier<Notification>>,
157 chains: DashMap<ChainId, ChainClientState>,
159 options: ChainClientOptions,
161}
162
163impl<Env: Environment> Client<Env> {
164 #[instrument(level = "trace", skip_all)]
166 pub fn new(
167 environment: Env,
168 admin_id: ChainId,
169 long_lived_services: bool,
170 tracked_chains: impl IntoIterator<Item = ChainId>,
171 name: impl Into<String>,
172 chain_worker_ttl: Duration,
173 options: ChainClientOptions,
174 ) -> Self {
175 let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
176 let state = WorkerState::new_for_client(
177 name.into(),
178 environment.storage().clone(),
179 tracked_chains.clone(),
180 )
181 .with_long_lived_services(long_lived_services)
182 .with_allow_inactive_chains(true)
183 .with_allow_messages_from_deprecated_epochs(true)
184 .with_chain_worker_ttl(chain_worker_ttl);
185 let local_node = LocalNodeClient::new(state);
186
187 Self {
188 environment,
189 local_node,
190 chains: DashMap::new(),
191 admin_id,
192 tracked_chains,
193 notifier: Arc::new(ChannelNotifier::default()),
194 options,
195 }
196 }
197
198 pub fn storage_client(&self) -> &Env::Storage {
200 self.environment.storage()
201 }
202
203 pub fn validator_node_provider(&self) -> &Env::Network {
204 self.environment.network()
205 }
206
207 #[instrument(level = "trace", skip(self))]
209 pub fn signer(&self) -> &impl Signer {
210 self.environment.signer()
211 }
212
213 #[instrument(level = "trace", skip(self))]
215 pub fn track_chain(&self, chain_id: ChainId) {
216 self.tracked_chains
217 .write()
218 .expect("Panics should not happen while holding a lock to `tracked_chains`")
219 .insert(chain_id);
220 }
221
222 #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
224 pub fn create_chain_client(
225 self: &Arc<Self>,
226 chain_id: ChainId,
227 block_hash: Option<CryptoHash>,
228 next_block_height: BlockHeight,
229 pending_proposal: Option<PendingProposal>,
230 preferred_owner: Option<AccountOwner>,
231 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
232 ) -> ChainClient<Env> {
233 if let dashmap::mapref::entry::Entry::Vacant(e) = self.chains.entry(chain_id) {
236 e.insert(ChainClientState::new(pending_proposal));
237 }
238
239 ChainClient {
240 client: self.clone(),
241 chain_id,
242 options: self.options.clone(),
243 preferred_owner,
244 initial_block_hash: block_hash,
245 initial_next_block_height: next_block_height,
246 timing_sender,
247 }
248 }
249
250 async fn fetch_chain_info(
252 &self,
253 chain_id: ChainId,
254 validators: &[RemoteNode<Env::ValidatorNode>],
255 ) -> Result<Box<ChainInfo>, ChainClientError> {
256 match self.local_node.chain_info(chain_id).await {
257 Ok(info) => Ok(info),
258 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
259 self.synchronize_chain_state(self.admin_id).await?;
261 self.update_local_node_with_blobs_from(blob_ids, validators)
264 .await?;
265 Ok(self.local_node.chain_info(chain_id).await?)
266 }
267 Err(err) => Err(err.into()),
268 }
269 }
270
271 #[instrument(level = "trace", skip(self))]
273 async fn download_certificates(
274 &self,
275 chain_id: ChainId,
276 target_next_block_height: BlockHeight,
277 ) -> Result<Box<ChainInfo>, ChainClientError> {
278 let mut validators = self.validator_nodes().await?;
279 validators.shuffle(&mut rand::thread_rng());
281 let mut info = self.fetch_chain_info(chain_id, &validators).await?;
282 for remote_node in validators {
283 if target_next_block_height <= info.next_block_height {
284 return Ok(info);
285 }
286 match self
287 .download_certificates_from(&remote_node, chain_id, target_next_block_height)
288 .await
289 {
290 Err(err) => warn!(
291 "Failed to download certificates from validator {:?}: {err}",
292 remote_node.public_key
293 ),
294 Ok(Some(new_info)) => info = new_info,
295 Ok(None) => {}
296 }
297 }
298 ensure!(
299 target_next_block_height <= info.next_block_height,
300 ChainClientError::CannotDownloadCertificates {
301 chain_id,
302 target_next_block_height,
303 }
304 );
305 Ok(info)
306 }
307
308 #[instrument(level = "trace", skip_all)]
311 async fn download_certificates_from(
312 &self,
313 remote_node: &RemoteNode<Env::ValidatorNode>,
314 chain_id: ChainId,
315 stop: BlockHeight,
316 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
317 let mut last_info = None;
318 let mut hashes = Vec::new();
320 let mut next_height = BlockHeight::ZERO;
321 {
322 let chain = self.local_node.chain_state_view(chain_id).await?;
323 next_height = next_height.max(chain.tip_state.get().next_block_height);
324 while next_height < stop {
325 let Some(hash) = chain.preprocessed_blocks.get(&next_height).await? else {
326 break;
327 };
328 hashes.push(hash);
329 next_height = next_height.try_add_one()?;
330 }
331 }
332 let certificates = self
333 .storage_client()
334 .read_certificates(hashes.clone())
335 .await?;
336 let certificates = match ResultReadCertificates::new(certificates, hashes) {
337 ResultReadCertificates::Certificates(certificates) => certificates,
338 ResultReadCertificates::InvalidHashes(hashes) => {
339 return Err(ChainClientError::ReadCertificatesError(hashes))
340 }
341 };
342 for certificate in certificates {
343 last_info = Some(self.handle_certificate(Box::new(certificate)).await?.info);
344 }
345 while next_height < stop {
347 let limit = u64::from(stop)
349 .checked_sub(u64::from(next_height))
350 .ok_or(ArithmeticError::Overflow)?
351 .min(1000);
352 let certificates = remote_node
353 .query_certificates_from(chain_id, next_height, limit)
354 .await?;
355 let Some(info) = self.process_certificates(remote_node, certificates).await? else {
356 break;
357 };
358 assert!(info.next_block_height > next_height);
359 next_height = info.next_block_height;
360 last_info = Some(info);
361 }
362 Ok(last_info)
363 }
364
365 #[instrument(level = "trace", skip_all)]
368 async fn process_certificates(
369 &self,
370 remote_node: &RemoteNode<impl ValidatorNode>,
371 certificates: Vec<ConfirmedBlockCertificate>,
372 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
373 let mut info = None;
374 for certificate in certificates {
375 let certificate = Box::new(certificate);
376 let mut result = self.handle_certificate(certificate.clone()).await;
377
378 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
379 let blobs = future::join_all(blob_ids.iter().map(|blob_id| async move {
380 remote_node.try_download_blob(*blob_id).await.unwrap()
381 }))
382 .await;
383 self.local_node.store_blobs(&blobs).await?;
384 result = self.handle_certificate(certificate.clone()).await;
385 }
386
387 info = Some(result?.info);
388 }
389 Ok(info)
391 }
392
393 async fn handle_certificate<T: ProcessableCertificate>(
394 &self,
395 certificate: Box<GenericCertificate<T>>,
396 ) -> Result<ChainInfoResponse, LocalNodeError> {
397 self.local_node
398 .handle_certificate(*certificate, &self.notifier)
399 .await
400 }
401
402 async fn chain_info_with_committees(
403 &self,
404 chain_id: ChainId,
405 ) -> Result<Box<ChainInfo>, LocalNodeError> {
406 let query = ChainInfoQuery::new(chain_id).with_committees();
407 let info = self.local_node.handle_chain_info_query(query).await?.info;
408 Ok(info)
409 }
410
411 #[instrument(level = "trace", skip_all)]
414 async fn admin_committees(
415 &self,
416 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
417 let info = self.chain_info_with_committees(self.admin_id).await?;
418 Ok((info.epoch, info.into_committees()?))
419 }
420
421 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
423 let info = self.chain_info_with_committees(self.admin_id).await?;
424 Ok((info.epoch, info.into_current_committee()?))
425 }
426
427 async fn validator_nodes(
429 &self,
430 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, ChainClientError> {
431 let (_, committee) = self.admin_committee().await?;
432 Ok(self.make_nodes(&committee)?)
433 }
434
435 fn make_nodes(
437 &self,
438 committee: &Committee,
439 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
440 Ok(self
441 .validator_node_provider()
442 .make_nodes(committee)?
443 .map(|(public_key, node)| RemoteNode { public_key, node })
444 .collect())
445 }
446
447 pub async fn get_chain_description(
450 &self,
451 chain_id: ChainId,
452 ) -> Result<ChainDescription, ChainClientError> {
453 let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
454 let blob = self
455 .local_node
456 .storage_client()
457 .read_blob(chain_desc_id)
458 .await?;
459 if let Some(blob) = blob {
460 return Ok(bcs::from_bytes(blob.bytes())?);
462 };
463 self.synchronize_chain_state(self.admin_id).await?;
465 let nodes = self.validator_nodes().await?;
466 let blob = self
467 .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
468 .await?
469 .pop()
470 .unwrap(); Ok(bcs::from_bytes(blob.bytes())?)
472 }
473
474 #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
476 fn update_from_info(&self, info: &ChainInfo) {
477 if let Some(mut state) = self.chains.get_mut(&info.chain_id) {
478 state.value_mut().update_from_info(info);
479 }
480 }
481
482 #[instrument(level = "trace", skip_all)]
484 async fn process_certificate<T: ProcessableCertificate>(
485 &self,
486 certificate: Box<GenericCertificate<T>>,
487 ) -> Result<(), LocalNodeError> {
488 let info = self.handle_certificate(certificate).await?.info;
489 self.update_from_info(&info);
490 Ok(())
491 }
492
493 #[instrument(level = "trace", skip_all)]
495 async fn finalize_block(
496 &self,
497 committee: &Committee,
498 certificate: ValidatedBlockCertificate,
499 ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
500 debug!(round = %certificate.round, "Submitting block for confirmation");
501 let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
502 let finalize_action = CommunicateAction::FinalizeBlock {
503 certificate: Box::new(certificate),
504 delivery: self.options.cross_chain_message_delivery,
505 };
506 let certificate = self
507 .communicate_chain_action(committee, finalize_action, hashed_value)
508 .await?;
509 self.receive_certificate(certificate.clone(), ReceiveCertificateMode::AlreadyChecked)
510 .await?;
511 Ok(certificate)
512 }
513
514 #[instrument(level = "trace", skip_all)]
516 async fn submit_block_proposal<T: ProcessableCertificate>(
517 &self,
518 committee: &Committee,
519 proposal: Box<BlockProposal>,
520 value: T,
521 ) -> Result<GenericCertificate<T>, ChainClientError> {
522 debug!(
523 round = %proposal.content.round,
524 "Submitting block proposal to validators"
525 );
526 let submit_action = CommunicateAction::SubmitBlock {
527 proposal,
528 blob_ids: value.required_blob_ids().into_iter().collect(),
529 };
530 let certificate = self
531 .communicate_chain_action(committee, submit_action, value)
532 .await?;
533 self.process_certificate(Box::new(certificate.clone()))
534 .await?;
535 Ok(certificate)
536 }
537
538 #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
540 async fn communicate_chain_updates(
541 &self,
542 committee: &Committee,
543 chain_id: ChainId,
544 height: BlockHeight,
545 delivery: CrossChainMessageDelivery,
546 ) -> Result<(), ChainClientError> {
547 let nodes = self.make_nodes(committee)?;
548 communicate_with_quorum(
549 &nodes,
550 committee,
551 |_: &()| (),
552 |remote_node| {
553 let mut updater = ValidatorUpdater {
554 remote_node,
555 local_node: self.local_node.clone(),
556 admin_id: self.admin_id,
557 };
558 Box::pin(async move {
559 updater
560 .send_chain_information(chain_id, height, delivery)
561 .await
562 })
563 },
564 self.options.grace_period,
565 )
566 .await?;
567 Ok(())
568 }
569
570 #[instrument(level = "trace", skip_all)]
576 async fn communicate_chain_action<T: CertificateValue>(
577 &self,
578 committee: &Committee,
579 action: CommunicateAction,
580 value: T,
581 ) -> Result<GenericCertificate<T>, ChainClientError> {
582 let nodes = self.make_nodes(committee)?;
583 let ((votes_hash, votes_round), votes) = communicate_with_quorum(
584 &nodes,
585 committee,
586 |vote: &LiteVote| (vote.value.value_hash, vote.round),
587 |remote_node| {
588 let mut updater = ValidatorUpdater {
589 remote_node,
590 local_node: self.local_node.clone(),
591 admin_id: self.admin_id,
592 };
593 let action = action.clone();
594 Box::pin(async move { updater.send_chain_update(action).await })
595 },
596 self.options.grace_period,
597 )
598 .await?;
599 ensure!(
600 (votes_hash, votes_round) == (value.hash(), action.round()),
601 ChainClientError::UnexpectedQuorum {
602 hash: votes_hash,
603 round: votes_round,
604 expected_hash: value.hash(),
605 expected_round: action.round(),
606 }
607 );
608 let certificate = LiteCertificate::try_from_votes(votes)
613 .ok_or_else(|| {
614 ChainClientError::InternalError("Vote values or rounds don't match; this is a bug")
615 })?
616 .with_value(value)
617 .ok_or_else(|| {
618 ChainClientError::ProtocolError("A quorum voted for an unexpected value")
619 })?;
620 Ok(certificate)
621 }
622
623 #[instrument(level = "trace", skip_all)]
626 async fn receive_certificate_and_update_validators(
627 &self,
628 certificate: ConfirmedBlockCertificate,
629 mode: ReceiveCertificateMode,
630 ) -> Result<(), ChainClientError> {
631 let block_chain_id = certificate.block().header.chain_id;
632 let block_height = certificate.block().header.height;
633
634 self.receive_certificate(certificate, mode).await?;
635
636 let local_committee = self
639 .chain_info_with_committees(block_chain_id)
640 .await?
641 .into_current_committee()?;
642 self.communicate_chain_updates(
643 &local_committee,
644 block_chain_id,
645 block_height.try_add_one()?,
646 CrossChainMessageDelivery::Blocking,
647 )
648 .await?;
649 Ok(())
650 }
651
652 #[instrument(level = "trace", skip_all)]
655 async fn receive_certificate(
656 &self,
657 certificate: ConfirmedBlockCertificate,
658 mode: ReceiveCertificateMode,
659 ) -> Result<(), ChainClientError> {
660 let certificate = Box::new(certificate);
661 let block = certificate.block();
662
663 let (max_epoch, committees) = self.admin_committees().await?;
665 if let ReceiveCertificateMode::NeedsCheck = mode {
666 Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
667 }
668 self.download_certificates(block.header.chain_id, block.header.height)
670 .await?;
671 if let Err(err) = self.process_certificate(certificate.clone()).await {
674 match &err {
675 LocalNodeError::BlobsNotFound(blob_ids) => {
676 let blobs = RemoteNode::download_blobs(
677 blob_ids,
678 &self.validator_nodes().await?,
679 self.options.blob_download_timeout,
680 )
681 .await
682 .ok_or(err)?;
683 self.local_node.store_blobs(&blobs).await?;
684 self.process_certificate(certificate).await?;
685 }
686 _ => {
687 warn!("Failed to process network hashed certificate value");
689 return Err(err.into());
690 }
691 }
692 }
693
694 Ok(())
695 }
696
697 #[instrument(level = "trace", skip_all)]
699 #[allow(dead_code)] async fn receive_sender_certificate(
701 &self,
702 certificate: ConfirmedBlockCertificate,
703 mode: ReceiveCertificateMode,
704 nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
705 ) -> Result<(), ChainClientError> {
706 let certificate = Box::new(certificate);
707
708 let (max_epoch, committees) = self.admin_committees().await?;
710 if let ReceiveCertificateMode::NeedsCheck = mode {
711 Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
712 }
713 let nodes = if let Some(nodes) = nodes {
715 nodes
716 } else {
717 self.validator_nodes().await?
718 };
719 if let Err(err) = self.handle_certificate(certificate.clone()).await {
720 match &err {
721 LocalNodeError::BlobsNotFound(blob_ids) => {
722 let blobs = RemoteNode::download_blobs(
723 blob_ids,
724 &nodes,
725 self.options.blob_download_timeout,
726 )
727 .await
728 .ok_or(err)?;
729 self.local_node.store_blobs(&blobs).await?;
730 self.handle_certificate(certificate.clone()).await?;
731 }
732 _ => {
733 warn!("Failed to process network hashed certificate value");
735 return Err(err.into());
736 }
737 }
738 }
739
740 Ok(())
741 }
742
743 #[instrument(level = "trace", skip(self))]
746 async fn synchronize_received_certificates_from_validator(
747 &self,
748 chain_id: ChainId,
749 remote_node: &RemoteNode<Env::ValidatorNode>,
750 ) -> Result<ReceivedCertificatesFromValidator, ChainClientError> {
751 let mut tracker = self
752 .local_node
753 .chain_state_view(chain_id)
754 .await?
755 .received_certificate_trackers
756 .get()
757 .get(&remote_node.public_key)
758 .copied()
759 .unwrap_or(0);
760 let (max_epoch, committees) = self.admin_committees().await?;
761
762 let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(tracker);
764 let info = remote_node.handle_chain_info_query(query).await?;
765 let remote_log = info.requested_received_log;
766 let remote_heights = Self::heights_per_chain(&remote_log);
767
768 let local_next_heights = self
770 .local_node
771 .next_outbox_heights(remote_heights.keys(), chain_id)
772 .await?;
773
774 let mut downloaded_heights = BTreeMap::new();
776 let mut other_sender_chains = Vec::new();
779
780 let certificates = future::try_join_all(remote_heights.into_iter().filter_map(
781 |(sender_chain_id, remote_heights)| {
782 let local_next = *local_next_heights.get(&sender_chain_id)?;
783 if let Ok(height) = local_next.try_sub_one() {
784 downloaded_heights.insert(sender_chain_id, height);
785 }
786 let remote_heights = remote_heights
787 .into_iter()
788 .filter(|h| *h >= local_next)
789 .collect::<Vec<_>>();
790 if remote_heights.is_empty() {
791 other_sender_chains.push(sender_chain_id);
795 return None;
796 };
797 Some(async move {
798 let certificates = remote_node
799 .download_certificates_by_heights(sender_chain_id, remote_heights)
800 .await?;
801 Ok::<Vec<_>, ChainClientError>(certificates)
802 })
803 },
804 ))
805 .await?
806 .into_iter()
807 .flatten()
808 .collect::<Vec<_>>();
809
810 let mut certificates_by_height_by_chain = BTreeMap::new();
811
812 for confirmed_block_certificate in certificates {
814 let block_header = &confirmed_block_certificate.inner().block().header;
815 let sender_chain_id = block_header.chain_id;
816 let height = block_header.height;
817 let epoch = block_header.epoch;
818 match Self::check_certificate(max_epoch, &committees, &confirmed_block_certificate)? {
819 CheckCertificateResult::FutureEpoch => {
820 warn!(
821 "Postponing received certificate from {sender_chain_id:.8} at height \
822 {height} from future epoch {epoch}"
823 );
824 }
827 CheckCertificateResult::OldEpoch => {
828 warn!("Skipping received certificate from past epoch {epoch:?}");
833 }
834 CheckCertificateResult::New => {
835 certificates_by_height_by_chain
836 .entry(sender_chain_id)
837 .or_insert_with(BTreeMap::new)
838 .insert(height, confirmed_block_certificate);
839 }
840 }
841 }
842
843 for entry in remote_log {
845 if certificates_by_height_by_chain
846 .get(&entry.chain_id)
847 .is_some_and(|certs| certs.contains_key(&entry.height))
848 {
849 tracker += 1;
850 } else {
851 break;
852 }
853 }
854
855 for (sender_chain_id, certs) in &mut certificates_by_height_by_chain {
856 if certs
857 .values()
858 .any(|cert| !cert.block().recipients().contains(&chain_id))
859 {
860 warn!(
861 "Skipping received certificates from chain {sender_chain_id:.8}:
862 No messages for {chain_id:.8}."
863 );
864 certs.clear();
865 }
866 }
867
868 Ok(ReceivedCertificatesFromValidator {
869 public_key: remote_node.public_key,
870 tracker,
871 certificates: certificates_by_height_by_chain
872 .into_values()
873 .flat_map(BTreeMap::into_values)
874 .collect(),
875 other_sender_chains,
876 })
877 }
878
879 #[instrument(
880 level = "trace", skip_all,
881 fields(certificate_hash = ?incoming_certificate.hash()),
882 )]
883 fn check_certificate(
884 highest_known_epoch: Epoch,
885 committees: &BTreeMap<Epoch, Committee>,
886 incoming_certificate: &ConfirmedBlockCertificate,
887 ) -> Result<CheckCertificateResult, NodeError> {
888 let block = incoming_certificate.block();
889 if block.header.epoch > highest_known_epoch {
891 return Ok(CheckCertificateResult::FutureEpoch);
892 }
893 if let Some(known_committee) = committees.get(&block.header.epoch) {
894 incoming_certificate.check(known_committee)?;
897 Ok(CheckCertificateResult::New)
898 } else {
899 Ok(CheckCertificateResult::OldEpoch)
901 }
902 }
903
904 fn heights_per_chain(
907 remote_log: &[ChainAndHeight],
908 ) -> BTreeMap<ChainId, BTreeSet<BlockHeight>> {
909 remote_log.iter().fold(
910 BTreeMap::<ChainId, BTreeSet<_>>::new(),
911 |mut chain_to_info, entry| {
912 chain_to_info
913 .entry(entry.chain_id)
914 .or_default()
915 .insert(entry.height);
916 chain_to_info
917 },
918 )
919 }
920
921 #[instrument(level = "trace", skip_all)]
923 async fn synchronize_chain_state(
924 &self,
925 chain_id: ChainId,
926 ) -> Result<Box<ChainInfo>, ChainClientError> {
927 let (_, committee) = self.admin_committee().await?;
928 self.synchronize_chain_state_from_committee(chain_id, committee)
929 .await
930 }
931
932 #[instrument(level = "trace", skip_all)]
935 pub async fn synchronize_chain_state_from_committee(
936 &self,
937 chain_id: ChainId,
938 committee: Committee,
939 ) -> Result<Box<ChainInfo>, ChainClientError> {
940 #[cfg(with_metrics)]
941 let _latency = metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency();
942
943 let validators = self.make_nodes(&committee)?;
944 Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
945 communicate_with_quorum(
946 &validators,
947 &committee,
948 |_: &()| (),
949 |remote_node| async move {
950 self.synchronize_chain_state_from(&remote_node, chain_id)
951 .await
952 },
953 self.options.grace_period,
954 )
955 .await?;
956
957 self.local_node
958 .chain_info(chain_id)
959 .await
960 .map_err(Into::into)
961 }
962
963 #[instrument(level = "trace", skip(self, remote_node, chain_id))]
966 async fn synchronize_chain_state_from(
967 &self,
968 remote_node: &RemoteNode<Env::ValidatorNode>,
969 chain_id: ChainId,
970 ) -> Result<(), ChainClientError> {
971 let mut local_info = self.local_node.chain_info(chain_id).await?;
972 let query = ChainInfoQuery::new(chain_id).with_manager_values();
973 let remote_info = remote_node.handle_chain_info_query(query).await?;
974 if let Some(new_info) = self
975 .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
976 .await?
977 {
978 local_info = new_info;
979 };
980
981 if local_info.next_block_height != remote_info.next_block_height {
983 debug!(
984 "Synced from validator {}; but remote height is {} and local height is {}",
985 remote_node.public_key, remote_info.next_block_height, local_info.next_block_height
986 );
987 return Ok(());
988 };
989
990 if let Some(timeout) = remote_info.manager.timeout {
991 self.handle_certificate(Box::new(*timeout)).await?;
992 }
993 let mut proposals = Vec::new();
994 if let Some(proposal) = remote_info.manager.requested_signed_proposal {
995 proposals.push(*proposal);
996 }
997 if let Some(proposal) = remote_info.manager.requested_proposed {
998 proposals.push(*proposal);
999 }
1000 if let Some(locking) = remote_info.manager.requested_locking {
1001 match *locking {
1002 LockingBlock::Fast(proposal) => {
1003 proposals.push(proposal);
1004 }
1005 LockingBlock::Regular(cert) => {
1006 let hash = cert.hash();
1007 if let Err(err) = self.try_process_locking_block_from(remote_node, cert).await {
1008 debug!(
1009 "Skipping locked block {hash} from validator {} at height {}: {err}",
1010 remote_node.public_key, local_info.next_block_height,
1011 );
1012 }
1013 }
1014 }
1015 }
1016 'proposal_loop: for proposal in proposals {
1017 let owner: AccountOwner = proposal.owner();
1018 if let Err(mut err) = self
1019 .local_node
1020 .handle_block_proposal(proposal.clone())
1021 .await
1022 {
1023 if let LocalNodeError::BlobsNotFound(_) = &err {
1024 let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
1025 if !required_blob_ids.is_empty() {
1026 let mut blobs = Vec::new();
1027 for blob_id in required_blob_ids {
1028 let blob_content = match remote_node
1029 .node
1030 .download_pending_blob(chain_id, blob_id)
1031 .await
1032 {
1033 Ok(content) => content,
1034 Err(err) => {
1035 warn!(
1036 "Skipping proposal from {owner} and validator {} at \
1037 height {}; failed to download {blob_id}: {err}",
1038 remote_node.public_key, local_info.next_block_height
1039 );
1040 continue 'proposal_loop;
1041 }
1042 };
1043 blobs.push(Blob::new(blob_content));
1044 }
1045 self.local_node
1046 .handle_pending_blobs(chain_id, blobs)
1047 .await?;
1048 if let Err(new_err) = self
1050 .local_node
1051 .handle_block_proposal(proposal.clone())
1052 .await
1053 {
1054 err = new_err;
1055 } else {
1056 continue;
1057 }
1058 }
1059 if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
1060 self.update_local_node_with_blobs_from(
1061 blob_ids.clone(),
1062 &[remote_node.clone()],
1063 )
1064 .await?;
1065 if let Err(new_err) = self
1067 .local_node
1068 .handle_block_proposal(proposal.clone())
1069 .await
1070 {
1071 err = new_err;
1072 } else {
1073 continue;
1074 }
1075 }
1076 }
1077
1078 debug!(
1079 "Skipping proposal from {owner} and validator {} at height {}: {err}",
1080 remote_node.public_key, local_info.next_block_height
1081 );
1082 }
1083 }
1084 Ok(())
1085 }
1086
1087 async fn try_process_locking_block_from(
1088 &self,
1089 remote_node: &RemoteNode<Env::ValidatorNode>,
1090 certificate: GenericCertificate<ValidatedBlock>,
1091 ) -> Result<(), ChainClientError> {
1092 let chain_id = certificate.inner().chain_id();
1093 let certificate = Box::new(certificate);
1094 match self.process_certificate(certificate.clone()).await {
1095 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1096 let mut blobs = Vec::new();
1097 for blob_id in blob_ids {
1098 let blob_content = remote_node
1099 .node
1100 .download_pending_blob(chain_id, blob_id)
1101 .await?;
1102 blobs.push(Blob::new(blob_content));
1103 }
1104 self.local_node
1105 .handle_pending_blobs(chain_id, blobs)
1106 .await?;
1107 self.process_certificate(certificate).await?;
1108 Ok(())
1109 }
1110 Err(err) => Err(err.into()),
1111 Ok(()) => Ok(()),
1112 }
1113 }
1114
1115 async fn update_local_node_with_blobs_from(
1118 &self,
1119 blob_ids: Vec<BlobId>,
1120 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1121 ) -> Result<Vec<Blob>, ChainClientError> {
1122 let timeout = self.options.blob_download_timeout;
1123 future::try_join_all(blob_ids.into_iter().map(|blob_id| async move {
1124 let mut stream = remote_nodes
1125 .iter()
1126 .zip(0..)
1127 .map(|(remote_node, i)| async move {
1128 linera_base::time::timer::sleep(timeout * i * i).await;
1129 let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
1130 self.receive_sender_certificate(
1132 certificate,
1133 ReceiveCertificateMode::NeedsCheck,
1134 Some(vec![remote_node.clone()]),
1135 )
1136 .await?;
1137 let blob = self
1138 .local_node
1139 .storage_client()
1140 .read_blob(blob_id)
1141 .await?
1142 .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1143 Result::<_, ChainClientError>::Ok(blob)
1144 })
1145 .collect::<FuturesUnordered<_>>();
1146 while let Some(maybe_blob) = stream.next().await {
1147 if let Ok(blob) = maybe_blob {
1148 return Ok(blob);
1149 }
1150 }
1151 Err(LocalNodeError::BlobsNotFound(vec![blob_id]).into())
1152 }))
1153 .await
1154 }
1155
1156 async fn receive_certificates_for_blobs(
1159 &self,
1160 blob_ids: Vec<BlobId>,
1161 ) -> Result<(), ChainClientError> {
1162 let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1164 let validators = self.validator_nodes().await?;
1165
1166 let mut missing_blobs = Vec::new();
1167 for blob_id in blob_ids {
1168 let mut certificate_stream = validators
1169 .iter()
1170 .map(|remote_node| async move {
1171 let cert = remote_node.download_certificate_for_blob(blob_id).await?;
1172 Ok::<_, NodeError>((remote_node.clone(), cert))
1173 })
1174 .collect::<FuturesUnordered<_>>();
1175 loop {
1176 let Some(result) = certificate_stream.next().await else {
1177 missing_blobs.push(blob_id);
1178 break;
1179 };
1180 if let Ok((remote_node, cert)) = result {
1181 if self
1182 .receive_sender_certificate(
1183 cert,
1184 ReceiveCertificateMode::NeedsCheck,
1185 Some(vec![remote_node]),
1186 )
1187 .await
1188 .is_ok()
1189 {
1190 break;
1191 }
1192 }
1193 }
1194 }
1195
1196 if missing_blobs.is_empty() {
1197 Ok(())
1198 } else {
1199 Err(NodeError::BlobsNotFound(missing_blobs).into())
1200 }
1201 }
1202
1203 #[tracing::instrument(level = "trace", skip(self, block))]
1208 async fn stage_block_execution_and_discard_failing_messages(
1209 &self,
1210 mut block: ProposedBlock,
1211 round: Option<u32>,
1212 published_blobs: Vec<Blob>,
1213 ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1214 loop {
1215 let result = self
1216 .stage_block_execution(block.clone(), round, published_blobs.clone())
1217 .await;
1218 if let Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
1219 WorkerError::ChainError(chain_error),
1220 ))) = &result
1221 {
1222 if let ChainError::ExecutionError(
1223 error,
1224 ChainExecutionContext::IncomingBundle(index),
1225 ) = &**chain_error
1226 {
1227 let transaction = block
1228 .transactions
1229 .get_mut(*index as usize)
1230 .expect("Transaction at given index should exist");
1231 let Transaction::ReceiveMessages(message) = transaction else {
1232 panic!(
1233 "Expected incoming bundle at transaction index {}, found operation",
1234 index
1235 );
1236 };
1237 ensure!(
1238 !message.bundle.is_protected(),
1239 ChainClientError::BlockProposalError(
1240 "Protected incoming message failed to execute locally"
1241 )
1242 );
1243 info!(
1247 %error, origin = ?message.origin,
1248 "Message failed to execute locally and will be rejected."
1249 );
1250 message.action = MessageAction::Reject;
1251 continue;
1252 }
1253 }
1254 return result;
1255 }
1256 }
1257
1258 #[instrument(level = "trace", skip(self, block))]
1261 async fn stage_block_execution(
1262 &self,
1263 block: ProposedBlock,
1264 round: Option<u32>,
1265 published_blobs: Vec<Blob>,
1266 ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1267 loop {
1268 let result = self
1269 .local_node
1270 .stage_block_execution(block.clone(), round, published_blobs.clone())
1271 .await;
1272 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1273 self.receive_certificates_for_blobs(blob_ids.clone())
1274 .await?;
1275 continue; }
1277 return Ok(result?);
1278 }
1279 }
1280}
1281
1282#[derive(Clone, Debug)]
1284pub struct MessagePolicy {
1285 blanket: BlanketMessagePolicy,
1287 restrict_chain_ids_to: Option<HashSet<ChainId>>,
1291}
1292
1293#[derive(Copy, Clone, Debug, clap::ValueEnum)]
1294pub enum BlanketMessagePolicy {
1295 Accept,
1297 Reject,
1300 Ignore,
1303}
1304
1305impl MessagePolicy {
1306 pub fn new(
1307 blanket: BlanketMessagePolicy,
1308 restrict_chain_ids_to: Option<HashSet<ChainId>>,
1309 ) -> Self {
1310 Self {
1311 blanket,
1312 restrict_chain_ids_to,
1313 }
1314 }
1315
1316 #[cfg(with_testing)]
1317 pub fn new_accept_all() -> Self {
1318 Self {
1319 blanket: BlanketMessagePolicy::Accept,
1320 restrict_chain_ids_to: None,
1321 }
1322 }
1323
1324 #[instrument(level = "trace", skip(self))]
1325 fn must_handle(&self, bundle: &mut IncomingBundle) -> bool {
1326 if self.is_reject() {
1327 if bundle.bundle.is_skippable() {
1328 return false;
1329 } else if !bundle.bundle.is_protected() {
1330 bundle.action = MessageAction::Reject;
1331 }
1332 }
1333 match &self.restrict_chain_ids_to {
1334 None => true,
1335 Some(chains) => chains.contains(&bundle.origin),
1336 }
1337 }
1338
1339 #[instrument(level = "trace", skip(self))]
1340 fn is_ignore(&self) -> bool {
1341 matches!(self.blanket, BlanketMessagePolicy::Ignore)
1342 }
1343
1344 #[instrument(level = "trace", skip(self))]
1345 fn is_reject(&self) -> bool {
1346 matches!(self.blanket, BlanketMessagePolicy::Reject)
1347 }
1348}
1349
1350#[derive(Debug, Clone, Copy)]
1351pub enum TimingType {
1352 ExecuteOperations,
1353 ExecuteBlock,
1354 SubmitBlockProposal,
1355 UpdateValidators,
1356}
1357
1358#[derive(Debug, Clone)]
1359pub struct ChainClientOptions {
1360 pub max_pending_message_bundles: usize,
1362 pub message_policy: MessagePolicy,
1364 pub cross_chain_message_delivery: CrossChainMessageDelivery,
1366 pub grace_period: f64,
1369 pub blob_download_timeout: Duration,
1371}
1372
1373#[cfg(with_testing)]
1374impl ChainClientOptions {
1375 pub fn test_default() -> Self {
1376 use crate::DEFAULT_GRACE_PERIOD;
1377
1378 ChainClientOptions {
1379 max_pending_message_bundles: 10,
1380 message_policy: MessagePolicy::new_accept_all(),
1381 cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
1382 grace_period: DEFAULT_GRACE_PERIOD,
1383 blob_download_timeout: Duration::from_secs(1),
1384 }
1385 }
1386}
1387
1388#[derive(Debug)]
1394pub struct ChainClient<Env: Environment> {
1395 #[debug(skip)]
1397 client: Arc<Client<Env>>,
1398 chain_id: ChainId,
1400 #[debug(skip)]
1402 options: ChainClientOptions,
1403 preferred_owner: Option<AccountOwner>,
1406 initial_next_block_height: BlockHeight,
1408 initial_block_hash: Option<CryptoHash>,
1410 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
1412}
1413
1414impl<Env: Environment> Clone for ChainClient<Env> {
1415 fn clone(&self) -> Self {
1416 Self {
1417 client: self.client.clone(),
1418 chain_id: self.chain_id,
1419 options: self.options.clone(),
1420 preferred_owner: self.preferred_owner,
1421 initial_next_block_height: self.initial_next_block_height,
1422 initial_block_hash: self.initial_block_hash,
1423 timing_sender: self.timing_sender.clone(),
1424 }
1425 }
1426}
1427
1428#[derive(Debug, Error)]
1430pub enum ChainClientError {
1431 #[error("Local node operation failed: {0}")]
1432 LocalNodeError(#[from] LocalNodeError),
1433
1434 #[error("Remote node operation failed: {0}")]
1435 RemoteNodeError(#[from] NodeError),
1436
1437 #[error(transparent)]
1438 ArithmeticError(#[from] ArithmeticError),
1439
1440 #[error("Missing certificates: {0:?}")]
1441 ReadCertificatesError(Vec<CryptoHash>),
1442
1443 #[error("Missing confirmed block: {0:?}")]
1444 MissingConfirmedBlock(CryptoHash),
1445
1446 #[error("JSON (de)serialization error: {0}")]
1447 JsonError(#[from] serde_json::Error),
1448
1449 #[error("Chain operation failed: {0}")]
1450 ChainError(#[from] ChainError),
1451
1452 #[error(transparent)]
1453 CommunicationError(#[from] CommunicationError<NodeError>),
1454
1455 #[error("Internal error within chain client: {0}")]
1456 InternalError(&'static str),
1457
1458 #[error(
1459 "Cannot accept a certificate from an unknown committee in the future. \
1460 Please synchronize the local view of the admin chain"
1461 )]
1462 CommitteeSynchronizationError,
1463
1464 #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
1465 WalletSynchronizationError,
1466
1467 #[error("The state of the client is incompatible with the proposed block: {0}")]
1468 BlockProposalError(&'static str),
1469
1470 #[error(
1471 "Cannot accept a certificate from a committee that was retired. \
1472 Try a newer certificate from the same origin"
1473 )]
1474 CommitteeDeprecationError,
1475
1476 #[error("Protocol error within chain client: {0}")]
1477 ProtocolError(&'static str),
1478
1479 #[error("Signer doesn't have key to sign for chain {0}")]
1480 CannotFindKeyForChain(ChainId),
1481
1482 #[error("client is not configured to propose on chain {0}")]
1483 NoAccountKeyConfigured(ChainId),
1484
1485 #[error("The chain client isn't owner on chain {0}")]
1486 NotAnOwner(ChainId),
1487
1488 #[error(transparent)]
1489 ViewError(#[from] ViewError),
1490
1491 #[error(
1492 "Failed to download certificates and update local node to the next height \
1493 {target_next_block_height} of chain {chain_id:?}"
1494 )]
1495 CannotDownloadCertificates {
1496 chain_id: ChainId,
1497 target_next_block_height: BlockHeight,
1498 },
1499
1500 #[error(transparent)]
1501 BcsError(#[from] bcs::Error),
1502
1503 #[error(
1504 "Unexpected quorum: validators voted for block {hash} in {round}, \
1505 expected block {expected_hash} in {expected_round}"
1506 )]
1507 UnexpectedQuorum {
1508 hash: CryptoHash,
1509 round: Round,
1510 expected_hash: CryptoHash,
1511 expected_round: Round,
1512 },
1513
1514 #[error("signer error: {0:?}")]
1515 Signer(#[source] Box<dyn signer::Error>),
1516
1517 #[error("Cannot revoke the current epoch {0}")]
1518 CannotRevokeCurrentEpoch(Epoch),
1519
1520 #[error("Epoch is already revoked")]
1521 EpochAlreadyRevoked,
1522}
1523
1524impl From<Infallible> for ChainClientError {
1525 fn from(infallible: Infallible) -> Self {
1526 match infallible {}
1527 }
1528}
1529
1530impl ChainClientError {
1531 pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
1532 Self::Signer(Box::new(err))
1533 }
1534}
1535
1536pub struct Unsend<T> {
1540 inner: T,
1541 _phantom: std::marker::PhantomData<*mut u8>,
1542}
1543
1544impl<T> Unsend<T> {
1545 fn new(inner: T) -> Self {
1546 Self {
1547 inner,
1548 _phantom: Default::default(),
1549 }
1550 }
1551}
1552
1553impl<T: Deref> Deref for Unsend<T> {
1554 type Target = T::Target;
1555 fn deref(&self) -> &T::Target {
1556 self.inner.deref()
1557 }
1558}
1559
1560impl<T: DerefMut> DerefMut for Unsend<T> {
1561 fn deref_mut(&mut self) -> &mut T::Target {
1562 self.inner.deref_mut()
1563 }
1564}
1565
1566pub type ChainGuard<'a, T> = Unsend<DashMapRef<'a, ChainId, T>>;
1567pub type ChainGuardMut<'a, T> = Unsend<DashMapRefMut<'a, ChainId, T>>;
1568pub type ChainGuardMapped<'a, T> = Unsend<DashMapMappedRef<'a, ChainId, ChainClientState, T>>;
1569
1570impl<Env: Environment> ChainClient<Env> {
1571 #[instrument(level = "trace", skip(self))]
1573 pub fn state(&self) -> ChainGuard<ChainClientState> {
1574 Unsend::new(
1575 self.client
1576 .chains
1577 .get(&self.chain_id)
1578 .expect("Chain client constructed for invalid chain"),
1579 )
1580 }
1581
1582 #[instrument(level = "trace", skip(self))]
1585 fn state_mut(&self) -> ChainGuardMut<ChainClientState> {
1586 Unsend::new(
1587 self.client
1588 .chains
1589 .get_mut(&self.chain_id)
1590 .expect("Chain client constructed for invalid chain"),
1591 )
1592 }
1593
1594 #[instrument(level = "trace", skip(self))]
1596 pub fn signer(&self) -> &impl Signer {
1597 self.client.signer()
1598 }
1599
1600 #[instrument(level = "trace", skip(self))]
1602 pub fn options_mut(&mut self) -> &mut ChainClientOptions {
1603 &mut self.options
1604 }
1605
1606 #[instrument(level = "trace", skip(self))]
1608 pub fn options(&self) -> &ChainClientOptions {
1609 &self.options
1610 }
1611
1612 #[instrument(level = "trace", skip(self))]
1614 pub fn chain_id(&self) -> ChainId {
1615 self.chain_id
1616 }
1617
1618 pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
1620 self.timing_sender.clone()
1621 }
1622
1623 #[instrument(level = "trace", skip(self))]
1625 pub fn admin_id(&self) -> ChainId {
1626 self.client.admin_id
1627 }
1628
1629 #[instrument(level = "trace", skip(self))]
1631 pub fn pending_proposal(&self) -> ChainGuardMapped<Option<PendingProposal>> {
1632 Unsend::new(self.state().inner.map(|state| state.pending_proposal()))
1633 }
1634
1635 #[instrument(level = "trace", skip(self))]
1637 pub fn preferred_owner(&self) -> Option<AccountOwner> {
1638 self.preferred_owner
1639 }
1640
1641 #[instrument(level = "trace", skip(self))]
1643 pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
1644 self.preferred_owner = Some(preferred_owner);
1645 }
1646
1647 #[instrument(level = "trace", skip(self))]
1649 pub fn unset_preferred_owner(&mut self) {
1650 self.preferred_owner = None;
1651 }
1652
1653 #[instrument(level = "trace")]
1655 pub async fn chain_state_view(
1656 &self,
1657 ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
1658 self.client.local_node.chain_state_view(self.chain_id).await
1659 }
1660
1661 #[instrument(level = "trace", skip(self))]
1663 pub async fn event_stream_publishers(&self) -> Result<BTreeSet<ChainId>, LocalNodeError> {
1664 let mut publishers = self
1665 .chain_state_view()
1666 .await?
1667 .execution_state
1668 .system
1669 .event_subscriptions
1670 .indices()
1671 .await?
1672 .into_iter()
1673 .map(|(chain_id, _)| chain_id)
1674 .collect::<BTreeSet<_>>();
1675 if self.chain_id != self.client.admin_id {
1676 publishers.insert(self.client.admin_id);
1677 }
1678 Ok(publishers)
1679 }
1680
1681 #[instrument(level = "trace")]
1683 pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
1684 self.subscribe_to(self.chain_id)
1685 }
1686
1687 #[instrument(level = "trace")]
1689 pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
1690 Ok(Box::pin(UnboundedReceiverStream::new(
1691 self.client.notifier.subscribe(vec![chain_id]),
1692 )))
1693 }
1694
1695 #[instrument(level = "trace")]
1697 pub fn storage_client(&self) -> &Env::Storage {
1698 self.client.storage_client()
1699 }
1700
1701 #[instrument(level = "trace")]
1703 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1704 let query = ChainInfoQuery::new(self.chain_id);
1705 let response = self
1706 .client
1707 .local_node
1708 .handle_chain_info_query(query)
1709 .await?;
1710 self.client.update_from_info(&response.info);
1711 Ok(response.info)
1712 }
1713
1714 #[instrument(level = "trace")]
1716 async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1717 let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
1718 let response = self
1719 .client
1720 .local_node
1721 .handle_chain_info_query(query)
1722 .await?;
1723 self.client.update_from_info(&response.info);
1724 Ok(response.info)
1725 }
1726
1727 pub async fn get_chain_description(&self) -> Result<ChainDescription, ChainClientError> {
1729 self.client.get_chain_description(self.chain_id).await
1730 }
1731
1732 #[instrument(level = "trace")]
1735 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
1736 if self.options.message_policy.is_ignore() {
1737 return Ok(Vec::new());
1739 }
1740
1741 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
1742 let info = self
1743 .client
1744 .local_node
1745 .handle_chain_info_query(query)
1746 .await?
1747 .info;
1748 {
1749 ensure!(
1750 self.has_other_owners(&info.manager.ownership)
1751 || info.next_block_height >= self.initial_next_block_height,
1752 ChainClientError::WalletSynchronizationError
1753 );
1754 }
1755
1756 Ok(info
1757 .requested_pending_message_bundles
1758 .into_iter()
1759 .filter_map(|mut bundle| {
1760 self.options
1761 .message_policy
1762 .must_handle(&mut bundle)
1763 .then_some(bundle)
1764 })
1765 .take(self.options.max_pending_message_bundles)
1766 .collect())
1767 }
1768
1769 #[instrument(level = "trace")]
1773 async fn collect_stream_updates(&self) -> Result<Option<Operation>, ChainClientError> {
1774 let subscription_map = self
1776 .chain_state_view()
1777 .await?
1778 .execution_state
1779 .system
1780 .event_subscriptions
1781 .index_values()
1782 .await?;
1783 let futures = subscription_map
1785 .into_iter()
1786 .map(|((chain_id, stream_id), subscriptions)| {
1787 let client = self.client.clone();
1788 async move {
1789 let chain = client.local_node.chain_state_view(chain_id).await?;
1790 if let Some(next_index) = chain
1791 .execution_state
1792 .stream_event_counts
1793 .get(&stream_id)
1794 .await?
1795 .filter(|next_index| *next_index > subscriptions.next_index)
1796 {
1797 Ok(Some((chain_id, stream_id, next_index)))
1798 } else {
1799 Ok::<_, ChainClientError>(None)
1800 }
1801 }
1802 });
1803 let updates = future::try_join_all(futures)
1804 .await?
1805 .into_iter()
1806 .flatten()
1807 .collect::<Vec<_>>();
1808 if updates.is_empty() {
1809 return Ok(None);
1810 }
1811 Ok(Some(SystemOperation::UpdateStreams(updates).into()))
1812 }
1813
1814 #[instrument(level = "trace")]
1815 async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1816 self.client.chain_info_with_committees(self.chain_id).await
1817 }
1818
1819 #[instrument(level = "trace")]
1821 async fn epoch_and_committees(
1822 &self,
1823 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
1824 let info = self.chain_info_with_committees().await?;
1825 let epoch = info.epoch;
1826 let committees = info.into_committees()?;
1827 Ok((epoch, committees))
1828 }
1829
1830 #[instrument(level = "trace")]
1832 pub async fn local_committee(&self) -> Result<Committee, ChainClientError> {
1833 let info = match self.chain_info_with_committees().await {
1834 Ok(info) => info,
1835 Err(LocalNodeError::BlobsNotFound(_)) => {
1836 self.synchronize_chain_state(self.chain_id).await?;
1837 self.chain_info_with_committees().await?
1838 }
1839 Err(err) => return Err(err.into()),
1840 };
1841 Ok(info.into_current_committee()?)
1842 }
1843
1844 #[instrument(level = "trace")]
1846 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
1847 self.client.admin_committee().await
1848 }
1849
1850 #[instrument(level = "trace")]
1854 pub async fn identity(&self) -> Result<AccountOwner, ChainClientError> {
1855 let Some(preferred_owner) = self.preferred_owner else {
1856 return Err(ChainClientError::NoAccountKeyConfigured(self.chain_id));
1857 };
1858 let manager = self.chain_info().await?.manager;
1859 ensure!(
1860 manager.ownership.is_active(),
1861 LocalNodeError::InactiveChain(self.chain_id)
1862 );
1863
1864 let is_owner = manager
1865 .ownership
1866 .all_owners()
1867 .chain(&manager.leader)
1868 .any(|owner| *owner == preferred_owner);
1869
1870 if !is_owner {
1871 let accepted_owners = manager
1872 .ownership
1873 .all_owners()
1874 .chain(&manager.leader)
1875 .collect::<Vec<_>>();
1876 warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
1877 "Chain has multiple owners configured but none is preferred owner",
1878 );
1879 return Err(ChainClientError::NotAnOwner(self.chain_id));
1880 }
1881
1882 let has_signer = self
1883 .signer()
1884 .contains_key(&preferred_owner)
1885 .await
1886 .map_err(ChainClientError::signer_failure)?;
1887
1888 if !has_signer {
1889 warn!(%self.chain_id, ?preferred_owner,
1890 "Chain is one of the owners but its Signer instance doesn't contain the key",
1891 );
1892 return Err(ChainClientError::CannotFindKeyForChain(self.chain_id));
1893 }
1894
1895 Ok(preferred_owner)
1896 }
1897
1898 #[instrument(level = "trace")]
1901 pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
1902 #[cfg(with_metrics)]
1903 let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();
1904
1905 let mut info = self.synchronize_to_known_height().await?;
1906
1907 if self.has_other_owners(&info.manager.ownership) {
1908 info = self.client.synchronize_chain_state(self.chain_id).await?;
1912 }
1913
1914 if info.epoch > self.client.admin_committees().await?.0 {
1915 self.client
1916 .synchronize_chain_state(self.client.admin_id)
1917 .await?;
1918 }
1919
1920 let result = self
1921 .chain_state_view()
1922 .await?
1923 .validate_incoming_bundles()
1924 .await;
1925 if matches!(result, Err(ChainError::MissingCrossChainUpdate { .. })) {
1926 self.find_received_certificates().await?;
1927 }
1928 self.client.update_from_info(&info);
1929 Ok(info)
1930 }
1931
1932 async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, ChainClientError> {
1937 let info = self
1938 .client
1939 .download_certificates(self.chain_id, self.initial_next_block_height)
1940 .await?;
1941 if info.next_block_height == self.initial_next_block_height {
1942 ensure!(
1944 self.initial_block_hash == info.block_hash,
1945 ChainClientError::InternalError("Invalid chain of blocks in local node")
1946 );
1947 }
1948 Ok(info)
1949 }
1950
1951 #[instrument(level = "trace", skip(committee, operations))]
1955 pub async fn submit_fast_block_proposal(
1956 &self,
1957 committee: &Committee,
1958 operations: &[Operation],
1959 incoming_bundles: &[IncomingBundle],
1960 super_owner: AccountOwner,
1961 ) -> Result<(u64, u64, u64, u64), ChainClientError> {
1962 let creating_proposal_start = Instant::now();
1963 let info = self.chain_info().await?;
1964 let timestamp = self.next_timestamp(incoming_bundles, info.timestamp);
1965 let transactions = incoming_bundles
1966 .iter()
1967 .map(|bundle| Transaction::ReceiveMessages(bundle.clone()))
1968 .chain(
1969 operations
1970 .iter()
1971 .map(|operation| Transaction::ExecuteOperation(operation.clone())),
1972 )
1973 .collect::<Vec<_>>();
1974 let proposed_block = ProposedBlock {
1975 epoch: info.epoch,
1976 chain_id: self.chain_id,
1977 transactions,
1978 previous_block_hash: info.block_hash,
1979 height: info.next_block_height,
1980 authenticated_signer: Some(super_owner),
1981 timestamp,
1982 };
1983 let proposal = Box::new(
1984 BlockProposal::new_initial(
1985 super_owner,
1986 Round::Fast,
1987 proposed_block.clone(),
1988 self.signer(),
1989 )
1990 .await
1991 .map_err(ChainClientError::signer_failure)?,
1992 );
1993 let creating_proposal_ms = creating_proposal_start.elapsed().as_millis() as u64;
1994 let stage_block_execution_start = Instant::now();
1995 let block = self
1996 .client
1997 .local_node
1998 .stage_block_execution(proposed_block, None, Vec::new())
1999 .await?
2000 .0;
2001 let stage_block_execution_ms = stage_block_execution_start.elapsed().as_millis() as u64;
2002 let creating_confirmed_block_start = Instant::now();
2003 let value = ConfirmedBlock::new(block);
2004 let creating_confirmed_block_ms =
2005 creating_confirmed_block_start.elapsed().as_millis() as u64;
2006 let submitting_block_proposal_start = Instant::now();
2007 self.client
2008 .submit_block_proposal(committee, proposal, value)
2009 .await?;
2010 let submitting_block_proposal_ms =
2011 submitting_block_proposal_start.elapsed().as_millis() as u64;
2012 Ok((
2013 creating_proposal_ms,
2014 stage_block_execution_ms,
2015 creating_confirmed_block_ms,
2016 submitting_block_proposal_ms,
2017 ))
2018 }
2019
2020 #[instrument(level = "trace", skip(old_committee))]
2022 pub async fn update_validators(
2023 &self,
2024 old_committee: Option<&Committee>,
2025 ) -> Result<(), ChainClientError> {
2026 let update_validators_start = linera_base::time::Instant::now();
2027 if let Some(old_committee) = old_committee {
2029 self.communicate_chain_updates(old_committee).await?
2030 };
2031 if let Ok(new_committee) = self.local_committee().await {
2032 if Some(&new_committee) != old_committee {
2033 self.communicate_chain_updates(&new_committee).await?;
2036 }
2037 }
2038 self.send_timing(update_validators_start, TimingType::UpdateValidators);
2039 Ok(())
2040 }
2041
2042 #[instrument(level = "trace", skip(committee))]
2044 pub async fn communicate_chain_updates(
2045 &self,
2046 committee: &Committee,
2047 ) -> Result<(), ChainClientError> {
2048 let delivery = self.options.cross_chain_message_delivery;
2049 let height = self.chain_info().await?.next_block_height;
2050 self.client
2051 .communicate_chain_updates(committee, self.chain_id, height, delivery)
2052 .await
2053 }
2054
2055 #[tracing::instrument(level = "trace", skip(received_certificates_batches))]
2058 async fn receive_certificates_from_validators(
2059 &self,
2060 received_certificates_batches: Vec<ReceivedCertificatesFromValidator>,
2061 ) {
2062 let validator_count = received_certificates_batches.len();
2063 let mut other_sender_chains = BTreeSet::new();
2064 let mut certificates =
2065 BTreeMap::<ChainId, BTreeMap<BlockHeight, ConfirmedBlockCertificate>>::new();
2066 let mut new_trackers = BTreeMap::new();
2067 for response in received_certificates_batches {
2068 other_sender_chains.extend(response.other_sender_chains);
2069 new_trackers.insert(response.public_key, response.tracker);
2070 for certificate in response.certificates {
2071 certificates
2072 .entry(certificate.block().header.chain_id)
2073 .or_default()
2074 .insert(certificate.block().header.height, certificate);
2075 }
2076 }
2077 let certificate_count = certificates.values().map(BTreeMap::len).sum::<usize>();
2078
2079 tracing::info!(
2080 "Received {certificate_count} certificates from {validator_count} validator(s)."
2081 );
2082
2083 let stream = FuturesUnordered::from_iter(certificates.into_values().map(|certificates| {
2085 let client = self.client.clone();
2086 async move {
2087 for certificate in certificates.into_values() {
2088 let hash = certificate.hash();
2089 let mode = ReceiveCertificateMode::AlreadyChecked;
2090 if let Err(err) = client
2091 .receive_sender_certificate(certificate, mode, None)
2092 .await
2093 {
2094 error!("Received invalid certificate {hash}: {err}");
2095 }
2096 }
2097 }
2098 }));
2099 stream.for_each(future::ready).await;
2100
2101 let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
2105 let local_node = self.client.local_node.clone();
2106 async move {
2107 if let Err(error) = local_node
2108 .retry_pending_cross_chain_requests(chain_id)
2109 .await
2110 {
2111 error!("Failed to retry outgoing messages from {chain_id}: {error}");
2112 }
2113 }
2114 }));
2115 stream.for_each(future::ready).await;
2116
2117 if let Err(error) = self
2119 .client
2120 .local_node
2121 .update_received_certificate_trackers(self.chain_id, new_trackers)
2122 .await
2123 {
2124 error!(
2125 "Failed to update the certificate trackers for chain {:.8}: {error}",
2126 self.chain_id
2127 );
2128 }
2129 }
2130
2131 async fn synchronize_publisher_chains(&self) -> Result<(), ChainClientError> {
2134 let chain_ids = self
2135 .chain_state_view()
2136 .await?
2137 .execution_state
2138 .system
2139 .event_subscriptions
2140 .indices()
2141 .await?
2142 .iter()
2143 .map(|(chain_id, _)| *chain_id)
2144 .chain(iter::once(self.client.admin_id))
2145 .filter(|chain_id| *chain_id != self.chain_id)
2146 .collect::<BTreeSet<_>>();
2147 future::try_join_all(
2148 chain_ids
2149 .into_iter()
2150 .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
2151 )
2152 .await?;
2153 Ok(())
2154 }
2155
2156 #[instrument(level = "trace")]
2165 async fn find_received_certificates(&self) -> Result<(), ChainClientError> {
2166 #[cfg(with_metrics)]
2167 let _latency = metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
2168
2169 let chain_id = self.chain_id;
2171 let (_, committee) = self.admin_committee().await?;
2172 let nodes = self.client.make_nodes(&committee)?;
2173 let result = communicate_with_quorum(
2175 &nodes,
2176 &committee,
2177 |_| (),
2178 |remote_node| {
2179 let client = &self.client;
2180 Box::pin(async move {
2181 client
2182 .synchronize_received_certificates_from_validator(chain_id, &remote_node)
2183 .await
2184 })
2185 },
2186 self.options.grace_period,
2187 )
2188 .await;
2189 let received_certificate_batches = match result {
2190 Ok(((), received_certificate_batches)) => received_certificate_batches
2191 .into_iter()
2192 .map(|(_, batch)| batch)
2193 .collect(),
2194 Err(CommunicationError::Trusted(NodeError::InactiveChain(id))) if id == chain_id => {
2195 return Ok(());
2198 }
2199 Err(error) => {
2200 return Err(error.into());
2201 }
2202 };
2203 self.receive_certificates_from_validators(received_certificate_batches)
2204 .await;
2205 Ok(())
2206 }
2207
2208 #[instrument(level = "trace")]
2210 pub async fn transfer(
2211 &self,
2212 owner: AccountOwner,
2213 amount: Amount,
2214 recipient: Account,
2215 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2216 self.execute_operation(SystemOperation::Transfer {
2218 owner,
2219 recipient,
2220 amount,
2221 })
2222 .await
2223 }
2224
2225 #[instrument(level = "trace")]
2228 pub async fn read_data_blob(
2229 &self,
2230 hash: CryptoHash,
2231 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2232 let blob_id = BlobId {
2233 hash,
2234 blob_type: BlobType::Data,
2235 };
2236 self.execute_operation(SystemOperation::VerifyBlob { blob_id })
2237 .await
2238 }
2239
2240 #[instrument(level = "trace")]
2242 pub async fn claim(
2243 &self,
2244 owner: AccountOwner,
2245 target_id: ChainId,
2246 recipient: Account,
2247 amount: Amount,
2248 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2249 self.execute_operation(SystemOperation::Claim {
2250 owner,
2251 target_id,
2252 recipient,
2253 amount,
2254 })
2255 .await
2256 }
2257
2258 #[instrument(level = "trace")]
2261 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, ChainClientError> {
2262 let chain_id = self.chain_id;
2263 let info = self.chain_info_with_committees().await?;
2264 let committee = info.current_committee()?;
2265 let height = info.next_block_height;
2266 let round = info.manager.current_round;
2267 let action = CommunicateAction::RequestTimeout {
2268 height,
2269 round,
2270 chain_id,
2271 };
2272 let value = Timeout::new(chain_id, height, info.epoch);
2273 let certificate = Box::new(
2274 self.client
2275 .communicate_chain_action(committee, action, value)
2276 .await?,
2277 );
2278 self.client.process_certificate(certificate.clone()).await?;
2279 self.client
2281 .communicate_chain_updates(
2282 committee,
2283 chain_id,
2284 height,
2285 CrossChainMessageDelivery::NonBlocking,
2286 )
2287 .await?;
2288 Ok(*certificate)
2289 }
2290
2291 #[instrument(level = "trace", skip_all)]
2293 pub async fn synchronize_chain_state(
2294 &self,
2295 chain_id: ChainId,
2296 ) -> Result<Box<ChainInfo>, ChainClientError> {
2297 self.client.synchronize_chain_state(chain_id).await
2298 }
2299
2300 #[instrument(level = "trace", skip_all)]
2303 pub async fn synchronize_chain_state_from_committee(
2304 &self,
2305 committee: Committee,
2306 ) -> Result<Box<ChainInfo>, ChainClientError> {
2307 self.client
2308 .synchronize_chain_state_from_committee(self.chain_id, committee)
2309 .await
2310 }
2311
2312 #[instrument(level = "trace", skip(operations, blobs))]
2314 pub async fn execute_operations(
2315 &self,
2316 operations: Vec<Operation>,
2317 blobs: Vec<Blob>,
2318 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2319 let timing_start = linera_base::time::Instant::now();
2320
2321 let result = loop {
2322 let execute_block_start = linera_base::time::Instant::now();
2323 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
2325 Ok(ExecuteBlockOutcome::Executed(certificate)) => {
2326 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
2327 break Ok(ClientOutcome::Committed(certificate));
2328 }
2329 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
2330 break Ok(ClientOutcome::WaitForTimeout(timeout));
2331 }
2332 Ok(ExecuteBlockOutcome::Conflict(certificate)) => {
2333 info!(
2334 height = %certificate.block().header.height,
2335 "Another block was committed; retrying."
2336 );
2337 }
2338 Err(ChainClientError::CommunicationError(CommunicationError::Trusted(
2339 NodeError::UnexpectedBlockHeight {
2340 expected_block_height,
2341 found_block_height,
2342 },
2343 ))) if expected_block_height > found_block_height => {
2344 tracing::info!(
2345 "Local state is outdated; synchronizing chain {:.8}",
2346 self.chain_id
2347 );
2348 self.synchronize_chain_state(self.chain_id).await?;
2349 }
2350 Err(err) => return Err(err),
2351 };
2352 };
2353
2354 self.send_timing(timing_start, TimingType::ExecuteOperations);
2355
2356 result
2357 }
2358
2359 pub async fn execute_operation(
2361 &self,
2362 operation: impl Into<Operation>,
2363 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2364 self.execute_operations(vec![operation.into()], vec![])
2365 .await
2366 }
2367
2368 #[instrument(level = "trace", skip(operations, blobs))]
2372 async fn execute_block(
2373 &self,
2374 operations: Vec<Operation>,
2375 blobs: Vec<Blob>,
2376 ) -> Result<ExecuteBlockOutcome, ChainClientError> {
2377 #[cfg(with_metrics)]
2378 let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
2379
2380 let mutex = self.state().client_mutex();
2381 let _guard = mutex.lock_owned().await;
2382 match self.process_pending_block_without_prepare().await? {
2384 ClientOutcome::Committed(Some(certificate)) => {
2385 return Ok(ExecuteBlockOutcome::Conflict(certificate))
2386 }
2387 ClientOutcome::WaitForTimeout(timeout) => {
2388 return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
2389 }
2390 ClientOutcome::Committed(None) => {}
2391 }
2392
2393 let incoming_bundles = self.pending_message_bundles().await?;
2394 let identity = self.identity().await?;
2395 let confirmed_value = self
2396 .new_pending_block(incoming_bundles, operations, blobs, identity)
2397 .await?;
2398
2399 match self.process_pending_block_without_prepare().await? {
2400 ClientOutcome::Committed(Some(certificate))
2401 if certificate.block() == confirmed_value.block() =>
2402 {
2403 Ok(ExecuteBlockOutcome::Executed(certificate))
2404 }
2405 ClientOutcome::Committed(Some(certificate)) => {
2406 Ok(ExecuteBlockOutcome::Conflict(certificate))
2407 }
2408 ClientOutcome::Committed(None) => Err(ChainClientError::BlockProposalError(
2410 "Unexpected block proposal error",
2411 )),
2412 ClientOutcome::WaitForTimeout(timeout) => {
2413 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
2414 }
2415 }
2416 }
2417
2418 #[instrument(level = "trace", skip(incoming_bundles, operations, blobs))]
2422 async fn new_pending_block(
2423 &self,
2424 incoming_bundles: Vec<IncomingBundle>,
2425 operations: Vec<Operation>,
2426 blobs: Vec<Blob>,
2427 identity: AccountOwner,
2428 ) -> Result<ConfirmedBlock, ChainClientError> {
2429 ensure!(
2430 self.state().pending_proposal().is_none(),
2431 ChainClientError::BlockProposalError(
2432 "Client state already has a pending block; \
2433 use the `linera retry-pending-block` command to commit that first"
2434 )
2435 );
2436 let info = self.chain_info().await?;
2437 let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2438 let transactions = incoming_bundles
2439 .into_iter()
2440 .map(Transaction::ReceiveMessages)
2441 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
2442 .collect::<Vec<_>>();
2443 let proposed_block = ProposedBlock {
2444 epoch: info.epoch,
2445 chain_id: self.chain_id,
2446 transactions,
2447 previous_block_hash: info.block_hash,
2448 height: info.next_block_height,
2449 authenticated_signer: Some(identity),
2450 timestamp,
2451 };
2452
2453 let round = match Self::round_for_new_proposal(&info, &identity, true)? {
2458 Either::Left(round) => round.multi_leader(),
2459 Either::Right(_) => None,
2460 };
2461 let (block, _) = self
2464 .client
2465 .stage_block_execution_and_discard_failing_messages(
2466 proposed_block,
2467 round,
2468 blobs.clone(),
2469 )
2470 .await?;
2471 let (proposed_block, _) = block.clone().into_proposal();
2472 self.state_mut().set_pending_proposal(proposed_block, blobs);
2473 Ok(ConfirmedBlock::new(block))
2474 }
2475
2476 #[instrument(level = "trace", skip(incoming_bundles))]
2481 fn next_timestamp(
2482 &self,
2483 incoming_bundles: &[IncomingBundle],
2484 block_time: Timestamp,
2485 ) -> Timestamp {
2486 let local_time = self.storage_client().clock().current_time();
2487 incoming_bundles
2488 .iter()
2489 .map(|msg| msg.bundle.timestamp)
2490 .max()
2491 .map_or(local_time, |timestamp| timestamp.max(local_time))
2492 .max(block_time)
2493 }
2494
2495 #[instrument(level = "trace", skip(query))]
2497 pub async fn query_application(&self, query: Query) -> Result<QueryOutcome, ChainClientError> {
2498 loop {
2499 let result = self
2500 .client
2501 .local_node
2502 .query_application(self.chain_id, query.clone())
2503 .await;
2504 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2505 self.client
2506 .receive_certificates_for_blobs(blob_ids.clone())
2507 .await?;
2508 continue; }
2510 return Ok(result?);
2511 }
2512 }
2513
2514 #[instrument(level = "trace", skip(query))]
2516 pub async fn query_system_application(
2517 &self,
2518 query: SystemQuery,
2519 ) -> Result<QueryOutcome<SystemResponse>, ChainClientError> {
2520 let QueryOutcome {
2521 response,
2522 operations,
2523 } = self.query_application(Query::System(query)).await?;
2524 match response {
2525 QueryResponse::System(response) => Ok(QueryOutcome {
2526 response,
2527 operations,
2528 }),
2529 _ => Err(ChainClientError::InternalError(
2530 "Unexpected response for system query",
2531 )),
2532 }
2533 }
2534
2535 #[instrument(level = "trace", skip(application_id, query))]
2537 pub async fn query_user_application<A: Abi>(
2538 &self,
2539 application_id: ApplicationId<A>,
2540 query: &A::Query,
2541 ) -> Result<QueryOutcome<A::QueryResponse>, ChainClientError> {
2542 let query = Query::user(application_id, query)?;
2543 let QueryOutcome {
2544 response,
2545 operations,
2546 } = self.query_application(query).await?;
2547 match response {
2548 QueryResponse::User(response_bytes) => {
2549 let response = serde_json::from_slice(&response_bytes)?;
2550 Ok(QueryOutcome {
2551 response,
2552 operations,
2553 })
2554 }
2555 _ => Err(ChainClientError::InternalError(
2556 "Unexpected response for user query",
2557 )),
2558 }
2559 }
2560
2561 #[instrument(level = "trace")]
2568 pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
2569 let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
2570 Ok(balance)
2571 }
2572
2573 #[instrument(level = "trace", skip(owner))]
2580 pub async fn query_owner_balance(
2581 &self,
2582 owner: AccountOwner,
2583 ) -> Result<Amount, ChainClientError> {
2584 if owner.is_chain() {
2585 self.query_balance().await
2586 } else {
2587 Ok(self
2588 .query_balances_with_owner(owner)
2589 .await?
2590 .1
2591 .unwrap_or(Amount::ZERO))
2592 }
2593 }
2594
2595 #[instrument(level = "trace", skip(owner))]
2602 async fn query_balances_with_owner(
2603 &self,
2604 owner: AccountOwner,
2605 ) -> Result<(Amount, Option<Amount>), ChainClientError> {
2606 let incoming_bundles = self.pending_message_bundles().await?;
2607 if incoming_bundles.is_empty() {
2610 let chain_balance = self.local_balance().await?;
2611 let owner_balance = self.local_owner_balance(owner).await?;
2612 return Ok((chain_balance, Some(owner_balance)));
2613 }
2614 let info = self.chain_info().await?;
2615 let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2616 let transactions = incoming_bundles
2617 .into_iter()
2618 .map(Transaction::ReceiveMessages)
2619 .collect::<Vec<_>>();
2620 let block = ProposedBlock {
2621 epoch: info.epoch,
2622 chain_id: self.chain_id,
2623 transactions,
2624 previous_block_hash: info.block_hash,
2625 height: info.next_block_height,
2626 authenticated_signer: if owner == AccountOwner::CHAIN {
2627 None
2628 } else {
2629 Some(owner)
2630 },
2631 timestamp,
2632 };
2633 match self
2634 .client
2635 .stage_block_execution_and_discard_failing_messages(block, None, Vec::new())
2636 .await
2637 {
2638 Ok((_, response)) => Ok((
2639 response.info.chain_balance,
2640 response.info.requested_owner_balance,
2641 )),
2642 Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
2643 WorkerError::ChainError(error),
2644 ))) if matches!(
2645 &*error,
2646 ChainError::ExecutionError(
2647 execution_error,
2648 ChainExecutionContext::Block
2649 ) if matches!(
2650 **execution_error,
2651 ExecutionError::FeesExceedFunding { .. }
2652 )
2653 ) =>
2654 {
2655 Ok((Amount::ZERO, Some(Amount::ZERO)))
2657 }
2658 Err(error) => Err(error),
2659 }
2660 }
2661
2662 #[instrument(level = "trace")]
2666 pub async fn local_balance(&self) -> Result<Amount, ChainClientError> {
2667 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
2668 Ok(balance)
2669 }
2670
2671 #[instrument(level = "trace", skip(owner))]
2675 pub async fn local_owner_balance(
2676 &self,
2677 owner: AccountOwner,
2678 ) -> Result<Amount, ChainClientError> {
2679 if owner.is_chain() {
2680 self.local_balance().await
2681 } else {
2682 Ok(self
2683 .local_balances_with_owner(owner)
2684 .await?
2685 .1
2686 .unwrap_or(Amount::ZERO))
2687 }
2688 }
2689
2690 #[instrument(level = "trace", skip(owner))]
2694 async fn local_balances_with_owner(
2695 &self,
2696 owner: AccountOwner,
2697 ) -> Result<(Amount, Option<Amount>), ChainClientError> {
2698 ensure!(
2699 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
2700 ChainClientError::WalletSynchronizationError
2701 );
2702 let mut query = ChainInfoQuery::new(self.chain_id);
2703 query.request_owner_balance = owner;
2704 let response = self
2705 .client
2706 .local_node
2707 .handle_chain_info_query(query)
2708 .await?;
2709 Ok((
2710 response.info.chain_balance,
2711 response.info.requested_owner_balance,
2712 ))
2713 }
2714
2715 #[instrument(level = "trace")]
2717 pub async fn transfer_to_account(
2718 &self,
2719 from: AccountOwner,
2720 amount: Amount,
2721 account: Account,
2722 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2723 self.transfer(from, amount, account).await
2724 }
2725
2726 #[cfg(with_testing)]
2728 #[instrument(level = "trace")]
2729 pub async fn burn(
2730 &self,
2731 owner: AccountOwner,
2732 amount: Amount,
2733 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2734 let recipient = Account::burn_address(self.chain_id);
2735 self.transfer(owner, amount, recipient).await
2736 }
2737
2738 #[instrument(level = "trace")]
2739 pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2740 let validators = self.client.validator_nodes().await?;
2741 self.client
2742 .fetch_chain_info(self.chain_id, &validators)
2743 .await
2744 }
2745
2746 #[instrument(level = "trace")]
2752 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2753 let info = self.prepare_chain().await?;
2754 self.synchronize_publisher_chains().await?;
2755 self.find_received_certificates().await?;
2756 Ok(info)
2757 }
2758
2759 #[instrument(level = "trace")]
2761 pub async fn process_pending_block(
2762 &self,
2763 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2764 self.synchronize_from_validators().await?;
2765 self.process_pending_block_without_prepare().await
2766 }
2767
2768 #[instrument(level = "trace")]
2770 async fn process_pending_block_without_prepare(
2771 &self,
2772 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2773 let info = self.request_leader_timeout_if_needed().await?;
2774
2775 if info.manager.has_locking_block_in_current_round()
2777 && !info.manager.current_round.is_fast()
2778 {
2779 return self.finalize_locking_block(info).await;
2780 }
2781 let owner = self.identity().await?;
2782
2783 let local_node = &self.client.local_node;
2784 let pending_proposal = self.state().pending_proposal().clone();
2786 let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
2787 match &**locking {
2788 LockingBlock::Regular(certificate) => {
2789 let blob_ids = certificate.block().required_blob_ids();
2790 let blobs = local_node
2791 .get_locking_blobs(&blob_ids, self.chain_id)
2792 .await?
2793 .ok_or_else(|| {
2794 ChainClientError::InternalError("Missing local locking blobs")
2795 })?;
2796 debug!("Retrying locking block from round {}", certificate.round);
2797 (certificate.block().clone(), blobs)
2798 }
2799 LockingBlock::Fast(proposal) => {
2800 let proposed_block = proposal.content.block.clone();
2801 let blob_ids = proposed_block.published_blob_ids();
2802 let blobs = local_node
2803 .get_locking_blobs(&blob_ids, self.chain_id)
2804 .await?
2805 .ok_or_else(|| {
2806 ChainClientError::InternalError("Missing local locking blobs")
2807 })?;
2808 let block = self
2809 .client
2810 .stage_block_execution(proposed_block, None, blobs.clone())
2811 .await?
2812 .0;
2813 debug!("Retrying locking block from fast round.");
2814 (block, blobs)
2815 }
2816 }
2817 } else if let Some(pending_proposal) = pending_proposal {
2818 let proposed_block = pending_proposal.block;
2822 let round = match Self::round_for_new_proposal(&info, &owner, true)? {
2823 Either::Left(round) => round.multi_leader(),
2824 Either::Right(_) => None,
2825 };
2826 let (block, _) = self
2827 .client
2828 .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
2829 .await?;
2830 debug!("Proposing the local pending block.");
2831 (block, pending_proposal.blobs)
2832 } else {
2833 return Ok(ClientOutcome::Committed(None)); };
2835
2836 let has_oracle_responses = block.has_oracle_responses();
2837 let (proposed_block, outcome) = block.into_proposal();
2838 let round = match Self::round_for_new_proposal(&info, &owner, has_oracle_responses)? {
2839 Either::Left(round) => round,
2840 Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
2841 };
2842 debug!("Proposing block for round {}", round);
2843
2844 let already_handled_locally = info
2845 .manager
2846 .already_handled_proposal(round, &proposed_block);
2847 let proposal = if let Some(locking) = info.manager.requested_locking {
2849 Box::new(match *locking {
2850 LockingBlock::Regular(cert) => {
2851 BlockProposal::new_retry_regular(owner, round, cert, self.signer())
2852 .await
2853 .map_err(ChainClientError::signer_failure)?
2854 }
2855 LockingBlock::Fast(proposal) => {
2856 BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
2857 .await
2858 .map_err(ChainClientError::signer_failure)?
2859 }
2860 })
2861 } else {
2862 Box::new(
2863 BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
2864 .await
2865 .map_err(ChainClientError::signer_failure)?,
2866 )
2867 };
2868 if !already_handled_locally {
2869 if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
2871 match err {
2872 LocalNodeError::BlobsNotFound(_) => {
2873 local_node
2874 .handle_pending_blobs(self.chain_id, blobs)
2875 .await?;
2876 local_node.handle_block_proposal(*proposal.clone()).await?;
2877 }
2878 err => return Err(err.into()),
2879 }
2880 }
2881 }
2882 let committee = self.local_committee().await?;
2883 let block = Block::new(proposed_block, outcome);
2884 let submit_block_proposal_start = linera_base::time::Instant::now();
2886 let certificate = if round.is_fast() {
2887 let hashed_value = ConfirmedBlock::new(block);
2888 self.client
2889 .submit_block_proposal(&committee, proposal, hashed_value)
2890 .await?
2891 } else {
2892 let hashed_value = ValidatedBlock::new(block);
2893 let certificate = self
2894 .client
2895 .submit_block_proposal(&committee, proposal, hashed_value.clone())
2896 .await?;
2897 self.client.finalize_block(&committee, certificate).await?
2898 };
2899 self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
2900 debug!(round = %certificate.round, "Sending confirmed block to validators");
2901 self.update_validators(Some(&committee)).await?;
2902 Ok(ClientOutcome::Committed(Some(certificate)))
2903 }
2904
2905 fn send_timing(&self, start: Instant, timing_type: TimingType) {
2906 let Some(sender) = &self.timing_sender else {
2907 return;
2908 };
2909 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2910 tracing::warn!(%err, "Failed to send timing info");
2911 }
2912 }
2913
2914 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2917 let mut info = self.chain_info_with_manager_values().await?;
2918 if let Some(round_timeout) = info.manager.round_timeout {
2921 if round_timeout <= self.storage_client().clock().current_time() {
2922 self.request_leader_timeout().await?;
2923 info = self.chain_info_with_manager_values().await?;
2924 }
2925 }
2926 Ok(info)
2927 }
2928
2929 async fn finalize_locking_block(
2933 &self,
2934 info: Box<ChainInfo>,
2935 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2936 let locking = info
2937 .manager
2938 .requested_locking
2939 .expect("Should have a locking block");
2940 let LockingBlock::Regular(certificate) = *locking else {
2941 panic!("Should have a locking validated block");
2942 };
2943 debug!(
2944 round = %certificate.round,
2945 "Finalizing locking block"
2946 );
2947 let committee = self.local_committee().await?;
2948 match self
2949 .client
2950 .finalize_block(&committee, certificate.clone())
2951 .await
2952 {
2953 Ok(certificate) => {
2954 self.update_validators(Some(&committee)).await?;
2955 Ok(ClientOutcome::Committed(Some(certificate)))
2956 }
2957 Err(ChainClientError::CommunicationError(error)) => {
2958 let timestamp = info.manager.round_timeout.ok_or(error)?;
2961 Ok(ClientOutcome::WaitForTimeout(RoundTimeout {
2962 timestamp,
2963 current_round: info.manager.current_round,
2964 next_block_height: info.next_block_height,
2965 }))
2966 }
2967 Err(error) => Err(error),
2968 }
2969 }
2970
2971 fn round_for_new_proposal(
2973 info: &ChainInfo,
2974 identity: &AccountOwner,
2975 has_oracle_responses: bool,
2976 ) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
2977 let manager = &info.manager;
2978 let conflict = manager
2982 .requested_signed_proposal
2983 .as_ref()
2984 .into_iter()
2985 .chain(&manager.requested_proposed)
2986 .any(|proposal| proposal.content.round == manager.current_round)
2987 || (manager.current_round.is_fast() && has_oracle_responses);
2988 let round = if !conflict {
2989 manager.current_round
2990 } else if let Some(round) = manager
2991 .ownership
2992 .next_round(manager.current_round)
2993 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
2994 {
2995 round
2996 } else if let Some(timeout) = info.round_timeout() {
2997 return Ok(Either::Right(timeout));
2998 } else {
2999 return Err(ChainClientError::BlockProposalError(
3000 "Conflicting proposal in the current round",
3001 ));
3002 };
3003 if manager.can_propose(identity, round) {
3004 return Ok(Either::Left(round));
3005 }
3006 if let Some(timeout) = info.round_timeout() {
3007 return Ok(Either::Right(timeout));
3008 }
3009 Err(ChainClientError::BlockProposalError(
3010 "Not a leader in the current round",
3011 ))
3012 }
3013
3014 #[instrument(level = "trace")]
3016 pub fn clear_pending_proposal(&self) {
3017 self.state_mut().clear_pending_proposal();
3018 }
3019
3020 #[instrument(
3022 level = "trace",
3023 skip(certificate),
3024 fields(certificate_hash = ?certificate.hash()),
3025 )]
3026 pub async fn receive_certificate_and_update_validators(
3027 &self,
3028 certificate: ConfirmedBlockCertificate,
3029 ) -> Result<(), ChainClientError> {
3030 self.client
3031 .receive_certificate_and_update_validators(
3032 certificate,
3033 ReceiveCertificateMode::NeedsCheck,
3034 )
3035 .await
3036 }
3037
3038 #[instrument(level = "trace")]
3042 pub async fn rotate_key_pair(
3043 &self,
3044 public_key: AccountPublicKey,
3045 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3046 self.transfer_ownership(public_key.into()).await
3047 }
3048
3049 #[instrument(level = "trace")]
3051 pub async fn transfer_ownership(
3052 &self,
3053 new_owner: AccountOwner,
3054 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3055 self.execute_operation(SystemOperation::ChangeOwnership {
3056 super_owners: vec![new_owner],
3057 owners: Vec::new(),
3058 multi_leader_rounds: 2,
3059 open_multi_leader_rounds: false,
3060 timeout_config: TimeoutConfig::default(),
3061 })
3062 .await
3063 }
3064
3065 #[instrument(level = "trace")]
3067 pub async fn share_ownership(
3068 &self,
3069 new_owner: AccountOwner,
3070 new_weight: u64,
3071 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3072 loop {
3073 let ownership = self.prepare_chain().await?.manager.ownership;
3074 ensure!(
3075 ownership.is_active(),
3076 ChainError::InactiveChain(self.chain_id)
3077 );
3078 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
3079 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
3080 owners.push((new_owner, new_weight));
3081 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
3082 super_owners: Vec::new(),
3083 owners,
3084 multi_leader_rounds: ownership.multi_leader_rounds,
3085 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3086 timeout_config: ownership.timeout_config,
3087 })];
3088 match self.execute_block(operations, vec![]).await? {
3089 ExecuteBlockOutcome::Executed(certificate) => {
3090 return Ok(ClientOutcome::Committed(certificate));
3091 }
3092 ExecuteBlockOutcome::Conflict(certificate) => {
3093 info!(
3094 height = %certificate.block().header.height,
3095 "Another block was committed; retrying."
3096 );
3097 }
3098 ExecuteBlockOutcome::WaitForTimeout(timeout) => {
3099 return Ok(ClientOutcome::WaitForTimeout(timeout));
3100 }
3101 };
3102 }
3103 }
3104
3105 #[instrument(level = "trace")]
3108 pub async fn change_ownership(
3109 &self,
3110 ownership: ChainOwnership,
3111 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3112 self.execute_operation(SystemOperation::ChangeOwnership {
3113 super_owners: ownership.super_owners.into_iter().collect(),
3114 owners: ownership.owners.into_iter().collect(),
3115 multi_leader_rounds: ownership.multi_leader_rounds,
3116 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3117 timeout_config: ownership.timeout_config.clone(),
3118 })
3119 .await
3120 }
3121
3122 #[instrument(level = "trace", skip(application_permissions))]
3124 pub async fn change_application_permissions(
3125 &self,
3126 application_permissions: ApplicationPermissions,
3127 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3128 self.execute_operation(SystemOperation::ChangeApplicationPermissions(
3129 application_permissions,
3130 ))
3131 .await
3132 }
3133
3134 #[instrument(level = "trace", skip(self))]
3136 pub async fn open_chain(
3137 &self,
3138 ownership: ChainOwnership,
3139 application_permissions: ApplicationPermissions,
3140 balance: Amount,
3141 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, ChainClientError>
3142 {
3143 loop {
3144 let config = OpenChainConfig {
3145 ownership: ownership.clone(),
3146 balance,
3147 application_permissions: application_permissions.clone(),
3148 };
3149 let operation = Operation::system(SystemOperation::OpenChain(config));
3150 let certificate = match self.execute_block(vec![operation], vec![]).await? {
3151 ExecuteBlockOutcome::Executed(certificate) => certificate,
3152 ExecuteBlockOutcome::Conflict(_) => continue,
3153 ExecuteBlockOutcome::WaitForTimeout(timeout) => {
3154 return Ok(ClientOutcome::WaitForTimeout(timeout));
3155 }
3156 };
3157 let chain_blob = certificate
3159 .block()
3160 .body
3161 .blobs
3162 .last()
3163 .and_then(|blobs| blobs.last())
3164 .ok_or_else(|| ChainClientError::InternalError("Failed to create a new chain"))?;
3165 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
3166 self.client.track_chain(description.id());
3168 self.client
3169 .local_node
3170 .retry_pending_cross_chain_requests(self.chain_id)
3171 .await?;
3172 return Ok(ClientOutcome::Committed((description, certificate)));
3173 }
3174 }
3175
3176 #[instrument(level = "trace")]
3179 pub async fn close_chain(
3180 &self,
3181 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3182 match self.execute_operation(SystemOperation::CloseChain).await {
3183 Ok(outcome) => Ok(outcome.map(Some)),
3184 Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
3185 WorkerError::ChainError(chain_error),
3186 ))) if matches!(*chain_error, ChainError::ClosedChain) => {
3187 Ok(ClientOutcome::Committed(None)) }
3189 Err(error) => Err(error),
3190 }
3191 }
3192
3193 #[cfg(not(target_arch = "wasm32"))]
3195 #[instrument(level = "trace", skip(contract, service))]
3196 pub async fn publish_module(
3197 &self,
3198 contract: Bytecode,
3199 service: Bytecode,
3200 vm_runtime: VmRuntime,
3201 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3202 let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
3203 self.publish_module_blobs(blobs, module_id).await
3204 }
3205
3206 #[cfg(not(target_arch = "wasm32"))]
3208 #[instrument(level = "trace", skip(blobs, module_id))]
3209 pub async fn publish_module_blobs(
3210 &self,
3211 blobs: Vec<Blob>,
3212 module_id: ModuleId,
3213 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3214 self.execute_operations(
3215 vec![Operation::system(SystemOperation::PublishModule {
3216 module_id,
3217 })],
3218 blobs,
3219 )
3220 .await?
3221 .try_map(|certificate| Ok((module_id, certificate)))
3222 }
3223
3224 #[instrument(level = "trace", skip(bytes))]
3226 pub async fn publish_data_blobs(
3227 &self,
3228 bytes: Vec<Vec<u8>>,
3229 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3230 let blobs = bytes.into_iter().map(Blob::new_data);
3231 let publish_blob_operations = blobs
3232 .clone()
3233 .map(|blob| {
3234 Operation::system(SystemOperation::PublishDataBlob {
3235 blob_hash: blob.id().hash,
3236 })
3237 })
3238 .collect();
3239 self.execute_operations(publish_blob_operations, blobs.collect())
3240 .await
3241 }
3242
3243 #[instrument(level = "trace", skip(bytes))]
3245 pub async fn publish_data_blob(
3246 &self,
3247 bytes: Vec<u8>,
3248 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3249 self.publish_data_blobs(vec![bytes]).await
3250 }
3251
3252 #[instrument(
3254 level = "trace",
3255 skip(self, parameters, instantiation_argument, required_application_ids)
3256 )]
3257 pub async fn create_application<
3258 A: Abi,
3259 Parameters: Serialize,
3260 InstantiationArgument: Serialize,
3261 >(
3262 &self,
3263 module_id: ModuleId<A, Parameters, InstantiationArgument>,
3264 parameters: &Parameters,
3265 instantiation_argument: &InstantiationArgument,
3266 required_application_ids: Vec<ApplicationId>,
3267 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, ChainClientError>
3268 {
3269 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
3270 let parameters = serde_json::to_vec(parameters)?;
3271 Ok(self
3272 .create_application_untyped(
3273 module_id.forget_abi(),
3274 parameters,
3275 instantiation_argument,
3276 required_application_ids,
3277 )
3278 .await?
3279 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
3280 }
3281
3282 #[instrument(
3284 level = "trace",
3285 skip(
3286 self,
3287 module_id,
3288 parameters,
3289 instantiation_argument,
3290 required_application_ids
3291 )
3292 )]
3293 pub async fn create_application_untyped(
3294 &self,
3295 module_id: ModuleId,
3296 parameters: Vec<u8>,
3297 instantiation_argument: Vec<u8>,
3298 required_application_ids: Vec<ApplicationId>,
3299 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, ChainClientError> {
3300 self.execute_operation(SystemOperation::CreateApplication {
3301 module_id,
3302 parameters,
3303 instantiation_argument,
3304 required_application_ids,
3305 })
3306 .await?
3307 .try_map(|certificate| {
3308 let mut creation: Vec<_> = certificate
3310 .block()
3311 .created_blob_ids()
3312 .into_iter()
3313 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
3314 .collect();
3315 if creation.len() > 1 {
3316 return Err(ChainClientError::InternalError(
3317 "Unexpected number of application descriptions published",
3318 ));
3319 }
3320 let blob_id = creation.pop().ok_or(ChainClientError::InternalError(
3321 "ApplicationDescription blob not found.",
3322 ))?;
3323 let id = ApplicationId::new(blob_id.hash);
3324 Ok((id, certificate))
3325 })
3326 }
3327
3328 #[instrument(level = "trace", skip(committee))]
3330 pub async fn stage_new_committee(
3331 &self,
3332 committee: Committee,
3333 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3334 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
3335 let blob_hash = blob.id().hash;
3336 match self
3337 .execute_operations(
3338 vec![Operation::system(SystemOperation::Admin(
3339 AdminOperation::PublishCommitteeBlob { blob_hash },
3340 ))],
3341 vec![blob],
3342 )
3343 .await?
3344 {
3345 ClientOutcome::Committed(_) => {}
3346 outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
3347 }
3348 let epoch = self.chain_info().await?.epoch.try_add_one()?;
3349 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
3350 epoch,
3351 blob_hash,
3352 }))
3353 .await
3354 }
3355
3356 #[instrument(level = "trace")]
3362 pub async fn process_inbox(
3363 &self,
3364 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3365 self.prepare_chain().await?;
3366 self.process_inbox_without_prepare().await
3367 }
3368
3369 #[instrument(level = "trace")]
3375 pub async fn process_inbox_without_prepare(
3376 &self,
3377 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3378 #[cfg(with_metrics)]
3379 let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
3380
3381 let mut epoch_change_ops = self.collect_epoch_changes().await?.into_iter();
3382
3383 let mut certificates = Vec::new();
3384 loop {
3385 let incoming_bundles = self.pending_message_bundles().await?;
3386 let stream_updates = self.collect_stream_updates().await?;
3387 let block_operations = stream_updates
3388 .into_iter()
3389 .chain(epoch_change_ops.next())
3390 .collect::<Vec<_>>();
3391 if incoming_bundles.is_empty() && block_operations.is_empty() {
3392 return Ok((certificates, None));
3393 }
3394 match self.execute_block(block_operations, vec![]).await {
3395 Ok(ExecuteBlockOutcome::Executed(certificate))
3396 | Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
3397 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
3398 return Ok((certificates, Some(timeout)));
3399 }
3400 Err(error) => return Err(error),
3401 };
3402 }
3403 }
3404
3405 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, ChainClientError> {
3408 let (mut min_epoch, mut next_epoch) = {
3409 let (epoch, committees) = self.epoch_and_committees().await?;
3410 let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
3411 (min_epoch, epoch.try_add_one()?)
3412 };
3413 let mut epoch_change_ops = Vec::new();
3414 while self
3415 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
3416 .await?
3417 {
3418 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
3419 next_epoch,
3420 )));
3421 next_epoch.try_add_assign_one()?;
3422 }
3423 while self
3424 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
3425 .await?
3426 {
3427 epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
3428 min_epoch,
3429 )));
3430 min_epoch.try_add_assign_one()?;
3431 }
3432 Ok(epoch_change_ops)
3433 }
3434
3435 async fn has_admin_event(
3438 &self,
3439 stream_name: &[u8],
3440 index: u32,
3441 ) -> Result<bool, ChainClientError> {
3442 let event_id = EventId {
3443 chain_id: self.client.admin_id,
3444 stream_id: StreamId::system(stream_name),
3445 index,
3446 };
3447 Ok(self
3448 .client
3449 .storage_client()
3450 .read_event(event_id)
3451 .await?
3452 .is_some())
3453 }
3454
3455 pub async fn events_from_index(
3457 &self,
3458 stream_id: StreamId,
3459 start_index: u32,
3460 ) -> Result<Vec<IndexAndEvent>, ChainClientError> {
3461 Ok(self
3462 .client
3463 .storage_client()
3464 .read_events_from_index(&self.chain_id, &stream_id, start_index)
3465 .await?)
3466 }
3467
3468 #[instrument(level = "trace")]
3473 pub async fn revoke_epochs(
3474 &self,
3475 revoked_epoch: Epoch,
3476 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3477 self.prepare_chain().await?;
3478 let (current_epoch, committees) = self.epoch_and_committees().await?;
3479 ensure!(
3480 revoked_epoch < current_epoch,
3481 ChainClientError::CannotRevokeCurrentEpoch(current_epoch)
3482 );
3483 ensure!(
3484 committees.contains_key(&revoked_epoch),
3485 ChainClientError::EpochAlreadyRevoked
3486 );
3487 let operations = committees
3488 .keys()
3489 .filter_map(|epoch| {
3490 if *epoch <= revoked_epoch {
3491 Some(Operation::system(SystemOperation::Admin(
3492 AdminOperation::RemoveCommittee { epoch: *epoch },
3493 )))
3494 } else {
3495 None
3496 }
3497 })
3498 .collect();
3499 self.execute_operations(operations, vec![]).await
3500 }
3501
3502 #[instrument(level = "trace")]
3506 pub async fn transfer_to_account_unsafe_unconfirmed(
3507 &self,
3508 owner: AccountOwner,
3509 amount: Amount,
3510 recipient: Account,
3511 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3512 self.execute_operation(SystemOperation::Transfer {
3513 owner,
3514 recipient,
3515 amount,
3516 })
3517 .await
3518 }
3519
3520 #[instrument(level = "trace", skip(hash))]
3521 pub async fn read_confirmed_block(
3522 &self,
3523 hash: CryptoHash,
3524 ) -> Result<ConfirmedBlock, ChainClientError> {
3525 let block = self
3526 .client
3527 .storage_client()
3528 .read_confirmed_block(hash)
3529 .await?;
3530 block.ok_or(ChainClientError::MissingConfirmedBlock(hash))
3531 }
3532
3533 #[instrument(level = "trace")]
3535 pub async fn retry_pending_outgoing_messages(&self) -> Result<(), ChainClientError> {
3536 self.client
3537 .local_node
3538 .retry_pending_cross_chain_requests(self.chain_id)
3539 .await?;
3540 Ok(())
3541 }
3542
3543 #[instrument(level = "trace", skip(local_node))]
3544 async fn local_chain_info(
3545 &self,
3546 chain_id: ChainId,
3547 local_node: &mut LocalNodeClient<Env::Storage>,
3548 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
3549 match local_node.chain_info(chain_id).await {
3550 Ok(info) => {
3551 self.client.update_from_info(&info);
3553 Ok(Some(info))
3554 }
3555 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
3556 Err(err) => Err(err.into()),
3557 }
3558 }
3559
3560 #[instrument(level = "trace", skip(chain_id, local_node))]
3561 async fn local_next_block_height(
3562 &self,
3563 chain_id: ChainId,
3564 local_node: &mut LocalNodeClient<Env::Storage>,
3565 ) -> Result<BlockHeight, ChainClientError> {
3566 Ok(self
3567 .local_chain_info(chain_id, local_node)
3568 .await?
3569 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
3570 }
3571
3572 #[instrument(level = "trace")]
3575 async fn local_next_height_to_receive(
3576 &self,
3577 origin: ChainId,
3578 ) -> Result<BlockHeight, ChainClientError> {
3579 let chain = self.chain_state_view().await?;
3580 Ok(match chain.inboxes.try_load_entry(&origin).await? {
3581 Some(inbox) => inbox.next_block_height_to_receive()?,
3582 None => BlockHeight::ZERO,
3583 })
3584 }
3585
3586 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
3587 async fn process_notification(
3588 &self,
3589 remote_node: RemoteNode<Env::ValidatorNode>,
3590 mut local_node: LocalNodeClient<Env::Storage>,
3591 notification: Notification,
3592 ) -> Result<(), ChainClientError> {
3593 match notification.reason {
3594 Reason::NewIncomingBundle { origin, height } => {
3595 if self.local_next_height_to_receive(origin).await? > height {
3596 debug!(
3597 chain_id = %self.chain_id,
3598 "Accepting redundant notification for new message"
3599 );
3600 return Ok(());
3601 }
3602 self.find_received_certificates_from_validator(remote_node)
3603 .await?;
3604 if self.local_next_height_to_receive(origin).await? <= height {
3605 warn!(
3606 chain_id = %self.chain_id,
3607 "NewIncomingBundle: Fail to synchronize new message after notification"
3608 );
3609 }
3610 }
3611 Reason::NewBlock { height, .. } => {
3612 let chain_id = notification.chain_id;
3613 if self
3614 .local_next_block_height(chain_id, &mut local_node)
3615 .await?
3616 > height
3617 {
3618 debug!(
3619 chain_id = %self.chain_id,
3620 "Accepting redundant notification for new block"
3621 );
3622 return Ok(());
3623 }
3624 self.client
3625 .synchronize_chain_state_from(&remote_node, chain_id)
3626 .await?;
3627 if self
3628 .local_next_block_height(chain_id, &mut local_node)
3629 .await?
3630 <= height
3631 {
3632 error!("NewBlock: Fail to synchronize new block after notification");
3633 }
3634 }
3635 Reason::NewRound { height, round } => {
3636 let chain_id = notification.chain_id;
3637 if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
3638 if (info.next_block_height, info.manager.current_round) >= (height, round) {
3639 debug!(
3640 chain_id = %self.chain_id,
3641 "Accepting redundant notification for new round"
3642 );
3643 return Ok(());
3644 }
3645 }
3646 self.client
3647 .synchronize_chain_state_from(&remote_node, chain_id)
3648 .await?;
3649 let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
3650 error!(
3651 chain_id = %self.chain_id,
3652 "NewRound: Fail to read local chain info for {chain_id}"
3653 );
3654 return Ok(());
3655 };
3656 if (info.next_block_height, info.manager.current_round) < (height, round) {
3657 error!(
3658 chain_id = %self.chain_id,
3659 "NewRound: Fail to synchronize new block after notification"
3660 );
3661 }
3662 }
3663 }
3664 Ok(())
3665 }
3666
3667 pub fn is_tracked(&self) -> bool {
3669 self.client
3670 .tracked_chains
3671 .read()
3672 .unwrap()
3673 .contains(&self.chain_id)
3674 }
3675
3676 #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
3679 pub async fn listen(
3680 &self,
3681 ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), ChainClientError> {
3682 use future::FutureExt as _;
3683
3684 async fn await_while_polling<F: FusedFuture>(
3685 future: F,
3686 background_work: impl FusedStream<Item = ()>,
3687 ) -> F::Output {
3688 tokio::pin!(future);
3689 tokio::pin!(background_work);
3690 loop {
3691 futures::select! {
3692 _ = background_work.next() => (),
3693 result = future => return result,
3694 }
3695 }
3696 }
3697
3698 let mut senders = HashMap::new(); let notifications = self.subscribe()?;
3700 let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
3701
3702 let mut process_notifications = FuturesUnordered::new();
3709
3710 match self.update_notification_streams(&mut senders).await {
3711 Ok(handler) => process_notifications.push(handler),
3712 Err(error) => error!("Failed to update committee: {error}"),
3713 };
3714
3715 let this = self.clone();
3716 let update_streams = async move {
3717 let mut abortable_notifications = abortable_notifications.fuse();
3718
3719 while let Some(notification) =
3720 await_while_polling(abortable_notifications.next(), &mut process_notifications)
3721 .await
3722 {
3723 if let Reason::NewBlock { .. } = notification.reason {
3724 match Box::pin(await_while_polling(
3725 this.update_notification_streams(&mut senders).fuse(),
3726 &mut process_notifications,
3727 ))
3728 .await
3729 {
3730 Ok(handler) => process_notifications.push(handler),
3731 Err(error) => error!("Failed to update committee: {error}"),
3732 }
3733 }
3734 }
3735
3736 for abort in senders.into_values() {
3737 abort.abort();
3738 }
3739
3740 let () = process_notifications.collect().await;
3741 }
3742 .in_current_span();
3743
3744 Ok((update_streams, AbortOnDrop(abort), notifications))
3745 }
3746
3747 #[instrument(level = "trace", skip(senders))]
3748 async fn update_notification_streams(
3749 &self,
3750 senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
3751 ) -> Result<impl Future<Output = ()>, ChainClientError> {
3752 let (nodes, local_node) = {
3753 let committee = self.local_committee().await?;
3754 let nodes: HashMap<_, _> = self
3755 .client
3756 .validator_node_provider()
3757 .make_nodes(&committee)?
3758 .collect();
3759 (nodes, self.client.local_node.clone())
3760 };
3761 senders.retain(|validator, abort| {
3763 if !nodes.contains_key(validator) {
3764 abort.abort();
3765 }
3766 !abort.is_aborted()
3767 });
3768 let validator_tasks = FuturesUnordered::new();
3770 for (public_key, node) in nodes {
3771 let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
3772 continue;
3773 };
3774 let this = self.clone();
3775 let stream = stream::once({
3776 let node = node.clone();
3777 async move {
3778 let stream = node.subscribe(vec![this.chain_id]).await?;
3779 let remote_node = RemoteNode { public_key, node };
3782 this.client
3783 .synchronize_chain_state_from(&remote_node, this.chain_id)
3784 .await?;
3785 Ok::<_, ChainClientError>(stream)
3786 }
3787 })
3788 .filter_map(move |result| async move {
3789 if let Err(error) = &result {
3790 warn!(?error, "Could not connect to validator {public_key}");
3791 } else {
3792 info!("Connected to validator {public_key}");
3793 }
3794 result.ok()
3795 })
3796 .flatten();
3797 let (stream, abort) = stream::abortable(stream);
3798 let mut stream = Box::pin(stream);
3799 let this = self.clone();
3800 let local_node = local_node.clone();
3801 let remote_node = RemoteNode { public_key, node };
3802 validator_tasks.push(async move {
3803 while let Some(notification) = stream.next().await {
3804 if let Err(err) = this
3805 .process_notification(
3806 remote_node.clone(),
3807 local_node.clone(),
3808 notification.clone(),
3809 )
3810 .await
3811 {
3812 tracing::warn!(
3813 chain_id = %this.chain_id,
3814 validator_public_key = ?remote_node.public_key,
3815 ?notification,
3816 "Failed to process notification: {err}",
3817 );
3818 }
3819 }
3820 });
3821 entry.insert(abort);
3822 }
3823 Ok(validator_tasks.collect())
3824 }
3825
3826 #[instrument(level = "trace")]
3831 async fn find_received_certificates_from_validator(
3832 &self,
3833 remote_node: RemoteNode<Env::ValidatorNode>,
3834 ) -> Result<(), ChainClientError> {
3835 let chain_id = self.chain_id;
3836 let received_certificates = self
3838 .client
3839 .synchronize_received_certificates_from_validator(chain_id, &remote_node)
3840 .await?;
3841 self.receive_certificates_from_validators(vec![received_certificates])
3844 .await;
3845 Ok(())
3846 }
3847
3848 #[instrument(level = "trace", skip(remote_node))]
3850 pub async fn sync_validator(
3851 &self,
3852 remote_node: Env::ValidatorNode,
3853 ) -> Result<(), ChainClientError> {
3854 let validator_next_block_height = match remote_node
3855 .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
3856 .await
3857 {
3858 Ok(info) => info.info.next_block_height.0,
3859 Err(NodeError::BlobsNotFound(_)) => 0,
3860 Err(err) => return Err(err.into()),
3861 };
3862 let local_chain_state = self.chain_info().await?;
3863
3864 let Some(missing_certificate_count) = local_chain_state
3865 .next_block_height
3866 .0
3867 .checked_sub(validator_next_block_height)
3868 .filter(|count| *count > 0)
3869 else {
3870 debug!("Validator is up-to-date with local state");
3871 return Ok(());
3872 };
3873
3874 let missing_certificates_end = usize::try_from(local_chain_state.next_block_height.0)
3875 .expect("`usize` should be at least `u64`");
3876 let missing_certificates_start = missing_certificates_end
3877 - usize::try_from(missing_certificate_count).expect("`usize` should be at least `u64`");
3878
3879 let missing_certificate_hashes = self
3880 .chain_state_view()
3881 .await?
3882 .confirmed_log
3883 .read(missing_certificates_start..missing_certificates_end)
3884 .await?;
3885
3886 let certificates = self
3887 .client
3888 .storage_client()
3889 .read_certificates(missing_certificate_hashes.clone())
3890 .await?;
3891 let certificates =
3892 match ResultReadCertificates::new(certificates, missing_certificate_hashes) {
3893 ResultReadCertificates::Certificates(certificates) => certificates,
3894 ResultReadCertificates::InvalidHashes(hashes) => {
3895 return Err(ChainClientError::ReadCertificatesError(hashes))
3896 }
3897 };
3898 for certificate in certificates {
3899 match remote_node
3900 .handle_confirmed_certificate(
3901 certificate.clone(),
3902 CrossChainMessageDelivery::NonBlocking,
3903 )
3904 .await
3905 {
3906 Ok(_) => (),
3907 Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
3908 let missing_blobs: Vec<_> = self
3910 .client
3911 .storage_client()
3912 .read_blobs(&missing_blob_ids)
3913 .await?
3914 .into_iter()
3915 .flatten()
3916 .collect();
3917 remote_node.upload_blobs(missing_blobs).await?;
3918 remote_node
3919 .handle_confirmed_certificate(
3920 certificate,
3921 CrossChainMessageDelivery::NonBlocking,
3922 )
3923 .await?;
3924 }
3925 Err(err) => return Err(err.into()),
3926 }
3927 }
3928
3929 Ok(())
3930 }
3931
3932 fn has_other_owners(&self, ownership: &ChainOwnership) -> bool {
3934 ownership
3935 .all_owners()
3936 .any(|owner| Some(owner) != self.preferred_owner.as_ref())
3937 }
3938}
3939
3940#[cfg(with_testing)]
3941impl<Env: Environment> ChainClient<Env> {
3942 pub async fn process_notification_from(
3943 &self,
3944 notification: Notification,
3945 validator: (ValidatorPublicKey, &str),
3946 ) {
3947 let mut node_list = self
3948 .client
3949 .validator_node_provider()
3950 .make_nodes_from_list(vec![validator])
3951 .unwrap();
3952 let (public_key, node) = node_list.next().unwrap();
3953 let remote_node = RemoteNode { node, public_key };
3954 let local_node = self.client.local_node.clone();
3955 self.process_notification(remote_node, local_node, notification)
3956 .await
3957 .unwrap();
3958 }
3959}
3960
3961#[derive(Debug)]
3963enum ExecuteBlockOutcome {
3964 Executed(ConfirmedBlockCertificate),
3966 Conflict(ConfirmedBlockCertificate),
3969 WaitForTimeout(RoundTimeout),
3972}
3973
3974#[must_use]
3976pub struct AbortOnDrop(pub AbortHandle);
3977
3978impl Drop for AbortOnDrop {
3979 #[instrument(level = "trace", skip(self))]
3980 fn drop(&mut self) {
3981 self.0.abort();
3982 }
3983}
3984
3985struct ReceivedCertificatesFromValidator {
3987 public_key: ValidatorPublicKey,
3989 tracker: u64,
3991 certificates: Vec<ConfirmedBlockCertificate>,
3994 other_sender_chains: Vec<ChainId>,
3997}
3998
3999#[derive(Clone, Serialize, Deserialize)]
4001pub struct PendingProposal {
4002 pub block: ProposedBlock,
4003 pub blobs: Vec<Blob>,
4004}
4005
4006enum ReceiveCertificateMode {
4007 NeedsCheck,
4008 AlreadyChecked,
4009}
4010
4011enum CheckCertificateResult {
4012 OldEpoch,
4013 New,
4014 FutureEpoch,
4015}
4016
4017impl CheckCertificateResult {
4018 fn into_result(self) -> Result<(), ChainClientError> {
4019 match self {
4020 Self::OldEpoch => Err(ChainClientError::CommitteeDeprecationError),
4021 Self::New => Ok(()),
4022 Self::FutureEpoch => Err(ChainClientError::CommitteeSynchronizationError),
4023 }
4024 }
4025}
4026
4027#[cfg(not(target_arch = "wasm32"))]
4029pub async fn create_bytecode_blobs(
4030 contract: Bytecode,
4031 service: Bytecode,
4032 vm_runtime: VmRuntime,
4033) -> (Vec<Blob>, ModuleId) {
4034 match vm_runtime {
4035 VmRuntime::Wasm => {
4036 let (compressed_contract, compressed_service) =
4037 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
4038 .await
4039 .expect("Compression should not panic");
4040 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
4041 let service_blob = Blob::new_service_bytecode(compressed_service);
4042 let module_id =
4043 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
4044 (vec![contract_blob, service_blob], module_id)
4045 }
4046 VmRuntime::Evm => {
4047 let compressed_contract = contract.compress();
4048 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
4049 let module_id = ModuleId::new(
4050 evm_contract_blob.id().hash,
4051 evm_contract_blob.id().hash,
4052 vm_runtime,
4053 );
4054 (vec![evm_contract_blob], module_id)
4055 }
4056 }
4057}