1mod state;
5use std::{
6 collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7 convert::Infallible,
8 iter,
9 sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14 future::{self, Either, FusedFuture, Future},
15 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20 abi::Abi,
21 crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
22 data_types::{
23 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
24 ChainDescription, Epoch, MessagePolicy, Round, Timestamp,
25 },
26 ensure,
27 identifiers::{
28 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
29 ModuleId, StreamId,
30 },
31 ownership::{ChainOwnership, TimeoutConfig},
32 time::{Duration, Instant},
33};
34#[cfg(not(target_arch = "wasm32"))]
35use linera_base::{data_types::Bytecode, vm::VmRuntime};
36use linera_chain::{
37 data_types::{
38 BlockProposal, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight, IncomingBundle,
39 ProposedBlock, Transaction,
40 },
41 manager::LockingBlock,
42 types::{
43 Block, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, Timeout,
44 TimeoutCertificate, ValidatedBlock,
45 },
46 ChainError, ChainExecutionContext,
47};
48use linera_execution::{
49 committee::Committee,
50 system::{
51 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
52 REMOVED_EPOCH_STREAM_NAME,
53 },
54 ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
55};
56use linera_storage::{Clock as _, Storage as _};
57use linera_views::ViewError;
58use serde::Serialize;
59pub(crate) use state::State;
60use thiserror::Error;
61use tokio::sync::mpsc;
62use tokio_stream::wrappers::UnboundedReceiverStream;
63use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
64
65#[cfg(not(target_arch = "wasm32"))]
66use super::create_bytecode_blobs;
67use super::{
68 received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
69 ListeningMode, PendingProposal, TimingType,
70};
71use crate::{
72 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
73 environment::Environment,
74 local_node::{LocalNodeClient, LocalNodeError},
75 node::{
76 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
77 ValidatorNodeProvider as _,
78 },
79 remote_node::RemoteNode,
80 updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
81 worker::{Notification, Reason, WorkerError},
82};
83
/// Tunable settings that govern how a [`ChainClient`] behaves.
///
/// NOTE(review): several fields are not read in this part of the file; their
/// descriptions are inferred from names and types and are marked as such.
#[derive(Debug, Clone)]
pub struct Options {
    /// Maximum number of incoming message bundles returned when the pending
    /// bundles of the chain are queried.
    pub max_pending_message_bundles: usize,
    /// Used as `max_failures` of the `AutoRetry` bundle failure policy.
    pub max_block_limit_errors: u32,
    /// Upper bound on the total number of new stream events covered by one
    /// `UpdateStreams` operation.
    pub max_new_events_per_block: usize,
    /// Optional time budget forwarded to the bundle execution policy.
    pub staging_bundles_time_budget: Option<Duration>,
    /// Policy used to filter incoming message bundles and event streams.
    pub message_policy: MessagePolicy,
    /// Delivery mode used when communicating chain updates to validators.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// Grace period passed to `communicate_with_quorum`.
    pub quorum_grace_period: f64,
    /// Presumably the timeout for downloading a blob — TODO confirm.
    pub blob_download_timeout: Duration,
    /// Presumably the timeout for downloading one batch of certificates — TODO confirm.
    pub certificate_batch_download_timeout: Duration,
    /// Presumably the number of certificates requested per download batch — TODO confirm.
    pub certificate_download_batch_size: u64,
    /// Presumably the number of certificates sent per upload batch — TODO confirm.
    pub certificate_upload_batch_size: u64,
    /// Batch size used when splitting received logs into sender-certificate
    /// download batches.
    pub sender_certificate_download_batch_size: usize,
    /// Concurrency limit given to `buffer_unordered` when running joined tasks.
    pub max_joined_tasks: usize,
    /// Presumably enables proposing fast blocks — TODO confirm.
    pub allow_fast_blocks: bool,
    /// Presumably the first probe interval of the notification circuit breaker — TODO confirm.
    pub notification_circuit_breaker_initial_probe_interval: Duration,
    /// Presumably the largest probe interval of the notification circuit breaker — TODO confirm.
    pub notification_circuit_breaker_max_probe_interval: Duration,
    /// Presumably a cap on concurrent event stream queries — TODO confirm.
    pub max_event_stream_queries: usize,
}
134
/// Probe-scheduling state of a circuit breaker.
///
/// NOTE(review): nothing in this part of the file reads or writes this type;
/// the field descriptions are inferred from their names — confirm against the
/// code that uses it.
struct CircuitBreakerState {
    /// Presumably the earliest instant at which the next probe may run — TODO confirm.
    next_probe_at: Instant,
    /// Presumably the current delay between probes — TODO confirm.
    probe_interval: Duration,
}
139
#[cfg(with_testing)]
impl Options {
    /// Returns the option set used in tests.
    ///
    /// Batch sizes, the quorum grace period and the event-stream query limit
    /// come from the crate-level `DEFAULT_*` constants; the remaining values
    /// are fixed literals.
    pub fn test_default() -> Self {
        use super::{
            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            DEFAULT_MAX_EVENT_STREAM_QUERIES, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        };
        use crate::DEFAULT_QUORUM_GRACE_PERIOD;

        // Both download timeouts use the same one-second value.
        let download_timeout = Duration::from_secs(1);
        let initial_probe_interval = Duration::from_secs(300);
        let max_probe_interval = Duration::from_secs(3600);

        Self {
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            max_new_events_per_block: 10,
            staging_bundles_time_budget: None,
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout: download_timeout,
            certificate_batch_download_timeout: download_timeout,
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            certificate_upload_batch_size: DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_joined_tasks: 100,
            allow_fast_blocks: false,
            notification_circuit_breaker_initial_probe_interval: initial_probe_interval,
            notification_circuit_breaker_max_probe_interval: max_probe_interval,
            max_event_stream_queries: DEFAULT_MAX_EVENT_STREAM_QUERIES,
        }
    }
}
170
171impl Options {
172 pub fn bundle_execution_policy(&self) -> BundleExecutionPolicy {
174 BundleExecutionPolicy {
175 on_failure: BundleFailurePolicy::AutoRetry {
176 max_failures: self.max_block_limit_errors,
177 },
178 time_budget: self.staging_bundles_time_budget,
179 }
180 }
181}
182
/// Client handle for operating on a single chain through a shared [`Client`].
///
/// Cloning is cheap with respect to the shared client: the `Arc` is cloned,
/// not the underlying `Client`.
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    /// The shared client; omitted from the `Debug` output.
    #[debug(skip)]
    pub(crate) client: Arc<Client<Env>>,
    /// The chain this handle operates on.
    chain_id: ChainId,
    /// Per-handle options; omitted from the `Debug` output.
    #[debug(skip)]
    options: Options,
    /// The owner this client prefers to act as, if any.
    preferred_owner: Option<AccountOwner>,
    /// Next block height known when this handle was created; the local node is
    /// synchronized at least up to this height before use.
    initial_next_block_height: BlockHeight,
    /// Block hash known when this handle was created; checked against the
    /// local node when it is exactly at `initial_next_block_height`.
    initial_block_hash: Option<CryptoHash>,
    /// Optional channel on which timing measurements are reported.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
}
208
209impl<Env: Environment> Clone for ChainClient<Env> {
210 fn clone(&self) -> Self {
211 Self {
212 client: self.client.clone(),
213 chain_id: self.chain_id,
214 options: self.options.clone(),
215 preferred_owner: self.preferred_owner,
216 initial_next_block_height: self.initial_next_block_height,
217 initial_block_hash: self.initial_block_hash,
218 timing_sender: self.timing_sender.clone(),
219 }
220 }
221}
222
/// Errors returned by [`ChainClient`] operations.
#[derive(Debug, Error)]
pub enum Error {
    /// An operation on the local node failed.
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),

    /// An operation on a remote (validator) node failed.
    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),

    /// An arithmetic operation failed.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// Certificates with the listed hashes are missing.
    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// The confirmed block with the given hash is missing.
    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),

    /// JSON serialization or deserialization failed.
    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),

    /// A chain-level operation failed.
    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),

    /// Communication with the validators failed.
    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),

    /// An internal invariant of the chain client was violated.
    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),

    /// A certificate refers to a committee the local node does not know yet.
    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
         Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,

    /// The local node is behind the height recorded in the wallet.
    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
    WalletSynchronizationError,

    /// The client state does not allow the block to be proposed.
    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),

    /// A certificate refers to a committee that has been retired.
    #[error(
        "Cannot accept a certificate from a committee that was retired. \
         Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,

    /// The protocol was violated.
    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),

    /// The signer does not hold the key needed for the given chain.
    #[error("Signer doesn't have key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),

    /// No preferred owner is configured for the given chain.
    #[error("client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),

    /// The client's owner is not an owner of the given chain.
    #[error("The chain client isn't owner on chain {0}")]
    NotAnOwner(ChainId),

    /// A view (storage) operation failed.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// Certificates needed to reach the target height could not be downloaded.
    #[error(
        "Failed to download certificates and update local node to the next height \
         {target_next_block_height} of chain {chain_id}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },

    /// BCS serialization or deserialization failed.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// A quorum formed for a different block or round than the expected one.
    #[error(
        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
         expected block hash {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },

    /// The signer reported an error; see [`Error::signer_failure`].
    #[error("signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),

    /// The given epoch is the current one and cannot be revoked.
    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),

    /// The epoch has already been revoked.
    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,

    /// A sender block required by this chain could not be downloaded.
    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
    CannotDownloadMissingSenderBlock {
        chain_id: ChainId,
        height: BlockHeight,
    },

    /// Another block was already committed at the same height.
    #[error(
        "A different block was already committed at this height. \
         The committed certificate hash is {0}"
    )]
    Conflict(CryptoHash),
}
330
331impl From<Infallible> for Error {
332 fn from(infallible: Infallible) -> Self {
333 match infallible {}
334 }
335}
336
337impl Error {
338 pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
339 Self::Signer(Box::new(err))
340 }
341}
342
343impl<Env: Environment> ChainClient<Env> {
344 #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
345 pub(crate) fn new(
346 client: Arc<Client<Env>>,
347 chain_id: ChainId,
348 options: Options,
349 initial_block_hash: Option<CryptoHash>,
350 initial_next_block_height: BlockHeight,
351 preferred_owner: Option<AccountOwner>,
352 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
353 ) -> Self {
354 ChainClient {
355 client,
356 chain_id,
357 options,
358 preferred_owner,
359 initial_block_hash,
360 initial_next_block_height,
361 timing_sender,
362 }
363 }
364
365 #[instrument(level = "trace", skip(self))]
367 pub fn is_follow_only(&self) -> bool {
368 self.client
369 .chain_mode(self.chain_id)
370 .is_none_or(|mode| mode.is_follow_only())
371 }
372
373 #[instrument(level = "trace", skip(self))]
377 fn proposal_mutex(&self) -> Arc<tokio::sync::Mutex<Option<PendingProposal>>> {
378 self.client
379 .chains
380 .pin()
381 .get(&self.chain_id)
382 .expect("Chain client constructed for invalid chain")
383 .proposal_mutex()
384 }
385
386 #[instrument(level = "trace", skip(self))]
388 pub async fn pending_proposal(&self) -> Option<PendingProposal> {
389 self.proposal_mutex().lock().await.clone()
390 }
391
392 #[instrument(level = "trace", skip(self))]
394 pub fn signer(&self) -> &impl Signer {
395 self.client.signer()
396 }
397
398 pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, Error> {
400 self.signer()
401 .contains_key(owner)
402 .await
403 .map_err(Error::signer_failure)
404 }
405
406 #[instrument(level = "trace", skip(self))]
408 pub fn options_mut(&mut self) -> &mut Options {
409 &mut self.options
410 }
411
412 #[instrument(level = "trace", skip(self))]
414 pub fn options(&self) -> &Options {
415 &self.options
416 }
417
418 #[instrument(level = "trace", skip(self))]
420 pub fn chain_id(&self) -> ChainId {
421 self.chain_id
422 }
423
424 pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
426 self.timing_sender.clone()
427 }
428
429 #[instrument(level = "trace", skip(self))]
431 pub fn admin_chain_id(&self) -> ChainId {
432 self.client.admin_chain_id
433 }
434
435 #[instrument(level = "trace", skip(self))]
437 pub fn preferred_owner(&self) -> Option<AccountOwner> {
438 self.preferred_owner
439 }
440
441 #[instrument(level = "trace", skip(self))]
443 pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
444 self.preferred_owner = Some(preferred_owner);
445 }
446
447 #[instrument(level = "trace", skip(self))]
449 pub fn unset_preferred_owner(&mut self) {
450 self.preferred_owner = None;
451 }
452
453 #[instrument(level = "trace")]
455 pub async fn chain_state_view(
456 &self,
457 ) -> Result<crate::worker::ChainStateViewReadGuard<Env::Storage>, LocalNodeError> {
458 self.client.local_node.chain_state_view(self.chain_id).await
459 }
460
461 #[instrument(level = "trace", skip(self))]
464 pub async fn event_stream_publishers(
465 &self,
466 ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
467 let subscriptions = self
468 .client
469 .local_node
470 .get_event_subscriptions(self.chain_id)
471 .await?;
472 let mut publishers = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
473 for ((chain_id, stream_name), _) in subscriptions {
474 publishers.entry(chain_id).or_default().insert(stream_name);
475 }
476 if self.chain_id != self.client.admin_chain_id {
477 publishers.entry(self.client.admin_chain_id).or_default();
479 }
480 Ok(publishers)
481 }
482
483 #[instrument(level = "trace")]
485 pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
486 self.subscribe_to(self.chain_id)
487 }
488
489 #[instrument(level = "trace")]
491 pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
492 Ok(Box::pin(UnboundedReceiverStream::new(
493 self.client.notifier.subscribe(vec![chain_id]),
494 )))
495 }
496
497 #[instrument(level = "trace")]
499 pub fn storage_client(&self) -> &Env::Storage {
500 self.client.storage_client()
501 }
502
503 #[instrument(level = "trace")]
505 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
506 let query = ChainInfoQuery::new(self.chain_id);
507 let response = self
508 .client
509 .local_node
510 .handle_chain_info_query(query)
511 .await?;
512 Ok(response.info)
513 }
514
515 #[instrument(level = "trace")]
517 pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
518 let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
519 let response = self
520 .client
521 .local_node
522 .handle_chain_info_query(query)
523 .await?;
524 Ok(response.info)
525 }
526
527 pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
529 self.client.get_chain_description(self.chain_id).await
530 }
531
532 #[instrument(level = "trace")]
535 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
536 if self.options.message_policy.is_ignore() {
537 return Ok(Vec::new());
539 }
540
541 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
542 let info = self
543 .client
544 .local_node
545 .handle_chain_info_query(query)
546 .await?
547 .info;
548 if self.preferred_owner.is_some_and(|owner| {
549 info.manager
550 .ownership
551 .is_super_owner_no_regular_owners(&owner)
552 }) {
553 ensure!(
555 info.next_block_height >= self.initial_next_block_height,
556 Error::WalletSynchronizationError
557 );
558 }
559
560 Ok(info
561 .requested_pending_message_bundles
562 .into_iter()
563 .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
564 .take(self.options.max_pending_message_bundles)
565 .collect())
566 }
567
568 #[instrument(level = "trace")]
572 async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
573 let subscription_map = self
575 .client
576 .local_node
577 .get_event_subscriptions(self.chain_id)
578 .await?;
579 let futures = subscription_map
581 .into_iter()
582 .filter(|((chain_id, _), _)| {
583 self.options
584 .message_policy
585 .restrict_chain_ids_to
586 .as_ref()
587 .is_none_or(|chain_set| chain_set.contains(chain_id))
588 })
589 .filter(|((_, stream_id), _)| {
590 self.options
591 .message_policy
592 .process_events_from_application_ids
593 .as_ref()
594 .is_none_or(|app_set| app_set.contains(&stream_id.application_id))
595 })
596 .map(|((chain_id, stream_id), subscriptions)| {
597 let client = self.client.clone();
598 let previous_index = subscriptions.next_index;
599 async move {
600 let next_index = client
601 .local_node
602 .get_stream_event_count(chain_id, stream_id.clone())
603 .await?;
604 if let Some(next_index) =
605 next_index.filter(|next_index| *next_index > previous_index)
606 {
607 Ok(Some((chain_id, stream_id, previous_index, next_index)))
608 } else {
609 Ok::<_, Error>(None)
610 }
611 }
612 });
613 let all_updates = futures::stream::iter(futures)
614 .buffer_unordered(self.options.max_joined_tasks)
615 .try_collect::<Vec<_>>()
616 .await?
617 .into_iter()
618 .flatten()
619 .collect::<Vec<_>>();
620 let max_events = self.options.max_new_events_per_block;
622 let mut total_events: usize = 0;
623 let mut updates = Vec::new();
624 for (chain_id, stream_id, previous_index, next_index) in all_updates {
625 let new_events = (next_index - previous_index) as usize;
626 if total_events + new_events <= max_events {
627 total_events += new_events;
628 updates.push((chain_id, stream_id, next_index));
629 } else {
630 let remaining = max_events.saturating_sub(total_events);
631 if remaining > 0 {
632 updates.push((chain_id, stream_id, previous_index + remaining as u32));
633 }
634 break;
635 }
636 }
637 if updates.is_empty() {
638 return Ok(None);
639 }
640 Ok(Some(SystemOperation::UpdateStreams(updates).into()))
641 }
642
643 #[instrument(level = "trace")]
644 async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
645 self.client.chain_info_with_committees(self.chain_id).await
646 }
647
648 #[instrument(level = "trace")]
650 async fn epoch_and_committees(
651 &self,
652 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
653 let info = self.chain_info_with_committees().await?;
654 let committees = info
655 .requested_committees
656 .ok_or(LocalNodeError::InvalidChainInfoResponse)?;
657 Ok((info.epoch, committees))
658 }
659
660 #[instrument(level = "trace")]
662 pub async fn local_committee(&self) -> Result<Arc<Committee>, Error> {
663 let info = match self.chain_info().await {
664 Ok(info) => info,
665 Err(LocalNodeError::BlobsNotFound(_)) => {
666 self.synchronize_chain_state(self.chain_id).await?;
667 self.chain_info().await?
668 }
669 Err(LocalNodeError::EventsNotFound(event_ids))
670 if event_ids
671 .iter()
672 .all(|event_id| event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)) =>
673 {
674 self.synchronize_chain_state(self.client.admin_chain_id)
677 .await?;
678 self.chain_info().await?
679 }
680 Err(err) => return Err(err.into()),
681 };
682 let committee = self
683 .client
684 .storage_client()
685 .get_or_load_committee(info.epoch)
686 .await?
687 .ok_or_else(|| LocalNodeError::InactiveChain(self.chain_id))?;
688 Ok(committee)
689 }
690
691 #[instrument(level = "trace")]
693 pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
694 self.client.admin_committee().await
695 }
696
697 #[instrument(level = "trace")]
701 pub async fn identity(&self) -> Result<AccountOwner, Error> {
702 let Some(preferred_owner) = self.preferred_owner else {
703 return Err(Error::NoAccountKeyConfigured(self.chain_id));
704 };
705 let manager = self.chain_info().await?.manager;
706 ensure!(
707 manager.ownership.is_active(),
708 LocalNodeError::InactiveChain(self.chain_id)
709 );
710
711 let is_owner = manager
714 .ownership
715 .can_propose_in_multi_leader_round(&preferred_owner);
716
717 if !is_owner {
718 let accepted_owners = manager
719 .ownership
720 .all_owners()
721 .chain(&manager.leader)
722 .collect::<Vec<_>>();
723 warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
724 "The preferred owner is not configured as an owner of this chain",
725 );
726 return Err(Error::NotAnOwner(self.chain_id));
727 }
728
729 let has_signer = self
730 .signer()
731 .contains_key(&preferred_owner)
732 .await
733 .map_err(Error::signer_failure)?;
734
735 if !has_signer {
736 warn!(%self.chain_id, ?preferred_owner,
737 "Chain is one of the owners but its Signer instance doesn't contain the key",
738 );
739 return Err(Error::CannotFindKeyForChain(self.chain_id));
740 }
741
742 Ok(preferred_owner)
743 }
744
745 #[instrument(level = "trace")]
753 pub async fn prepare_for_owner(&self, owner: AccountOwner) -> Result<Box<ChainInfo>, Error> {
754 ensure!(
755 self.client.has_key_for(&owner).await?,
756 Error::CannotFindKeyForChain(self.chain_id)
757 );
758 self.client
760 .get_chain_description_blob(self.chain_id)
761 .await?;
762
763 let info = self.chain_info().await?;
765
766 ensure!(
768 info.manager
769 .ownership
770 .can_propose_in_multi_leader_round(&owner),
771 Error::NotAnOwner(self.chain_id)
772 );
773
774 Ok(info)
775 }
776
777 #[instrument(level = "trace")]
780 pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
781 #[cfg(with_metrics)]
782 let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
783
784 let mut info = self.synchronize_to_known_height().await?;
785
786 if self.preferred_owner.is_none_or(|owner| {
787 !info
788 .manager
789 .ownership
790 .is_super_owner_no_regular_owners(&owner)
791 }) {
792 info = self.client.synchronize_chain_state(self.chain_id).await?;
796 }
797
798 if info.epoch > self.client.admin_committees().await?.0 {
799 self.client
800 .synchronize_chain_state(self.client.admin_chain_id)
801 .await?;
802 }
803
804 Ok(info)
805 }
806
807 async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
812 let info = self
813 .client
814 .download_certificates(self.chain_id, self.initial_next_block_height)
815 .await?;
816 if info.next_block_height == self.initial_next_block_height {
817 ensure!(
819 self.initial_block_hash == info.block_hash,
820 Error::InternalError("Invalid chain of blocks in local node")
821 );
822 }
823 Ok(info)
824 }
825
826 #[instrument(level = "trace", skip(old_committee, latest_certificate))]
828 pub async fn update_validators(
829 &self,
830 old_committee: Option<&Committee>,
831 latest_certificate: Option<ConfirmedBlockCertificate>,
832 ) -> Result<(), Error> {
833 let update_validators_start = linera_base::time::Instant::now();
834 if let Some(old_committee) = old_committee {
836 let old_committee_start = linera_base::time::Instant::now();
837 self.communicate_chain_updates(old_committee, latest_certificate.clone())
838 .await?;
839 tracing::debug!(
840 old_committee_ms = old_committee_start.elapsed().as_millis(),
841 "communicated chain updates to old committee"
842 );
843 };
844 if let Ok(new_committee) = self.local_committee().await {
845 if Some(&*new_committee) != old_committee {
846 let new_committee_start = linera_base::time::Instant::now();
849 self.communicate_chain_updates(&new_committee, latest_certificate)
850 .await?;
851 tracing::debug!(
852 new_committee_ms = new_committee_start.elapsed().as_millis(),
853 "communicated chain updates to new committee"
854 );
855 }
856 }
857 self.send_timing(update_validators_start, TimingType::UpdateValidators);
858 Ok(())
859 }
860
861 #[instrument(level = "trace", skip(committee, latest_certificate))]
863 pub async fn communicate_chain_updates(
864 &self,
865 committee: &Committee,
866 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
867 ) -> Result<(), Error> {
868 let delivery = self.options.cross_chain_message_delivery;
869 let height = self.chain_info().await?.next_block_height;
870 self.client
871 .communicate_chain_updates(
872 committee,
873 self.chain_id,
874 height,
875 delivery,
876 latest_certificate,
877 )
878 .await
879 }
880
881 async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
885 let subscriptions = self
886 .client
887 .local_node
888 .get_event_subscriptions(self.chain_id)
889 .await?;
890 let mut streams_by_chain = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
892 for ((chain_id, stream_id), _) in &subscriptions {
893 if *chain_id != self.chain_id {
894 streams_by_chain
895 .entry(*chain_id)
896 .or_default()
897 .insert(stream_id.clone());
898 }
899 }
900 let admin_chain_id = self.client.admin_chain_id;
902 if admin_chain_id != self.chain_id {
903 self.client.synchronize_chain_state(admin_chain_id).await?;
904 }
905 let (_, committee) = self.admin_committee().await?;
907 let nodes = self.client.make_nodes(&committee)?;
908 let tasks = streams_by_chain
909 .into_iter()
910 .filter(|(chain_id, _)| *chain_id != admin_chain_id)
911 .map(|(chain_id, stream_ids)| {
912 self.sync_publisher_chain_events(chain_id, stream_ids, &nodes, &committee)
913 })
914 .collect::<Vec<_>>();
915 stream::iter(tasks)
916 .buffer_unordered(self.options.max_joined_tasks)
917 .collect::<Vec<_>>()
918 .await
919 .into_iter()
920 .collect::<Result<Vec<_>, _>>()?;
921 Ok(())
922 }
923
924 async fn sync_publisher_chain_events(
931 &self,
932 publisher_chain_id: ChainId,
933 stream_ids: BTreeSet<StreamId>,
934 nodes: &[RemoteNode<Env::ValidatorNode>],
935 committee: &Committee,
936 ) -> Result<(), Error> {
937 let stream_ids_ref = &stream_ids;
938 communicate_with_quorum(
939 nodes,
940 committee,
941 |_: &()| (),
942 |remote_node| async move {
943 self.client
944 .sync_events_from_node(publisher_chain_id, stream_ids_ref, &remote_node)
945 .await
946 },
947 self.options.quorum_grace_period,
948 )
949 .await?;
950 Ok(())
951 }
952
953 #[instrument(level = "debug", skip(self), fields(chain_id = %self.chain_id))]
962 pub async fn find_received_certificates(&self) -> Result<(), Error> {
963 debug!("starting find_received_certificates");
964 #[cfg(with_metrics)]
965 let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
966 let chain_id = self.chain_id;
968 let (_, committee) = self.admin_committee().await?;
969 let nodes = self.client.make_nodes(&committee)?;
970
971 let trackers = self
972 .client
973 .local_node
974 .get_received_certificate_trackers(chain_id)
975 .await?;
976
977 trace!("find_received_certificates: read trackers");
978
979 let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
980 let result = communicate_with_quorum(
982 &nodes,
983 &committee,
984 |_| (),
985 |remote_node| {
986 let client = &self.client;
987 let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
988 let received_log_batches = Arc::clone(&received_log_batches);
989 Box::pin(async move {
990 let batch = client
991 .get_received_log_from_validator(chain_id, &remote_node, tracker)
992 .await?;
993 let mut batches = received_log_batches.lock().unwrap();
994 batches.push((remote_node.public_key, batch));
995 Ok(())
996 })
997 },
998 self.options.quorum_grace_period,
999 )
1000 .await;
1001
1002 if let Err(error) = result {
1003 error!(
1004 %error,
1005 "Failed to synchronize received_logs from at least a quorum of validators",
1006 );
1007 }
1008
1009 let received_logs: Vec<_> = {
1010 let mut received_log_batches = received_log_batches.lock().unwrap();
1011 std::mem::take(received_log_batches.as_mut())
1012 };
1013
1014 debug!(
1015 received_logs_len = %received_logs.len(),
1016 received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
1017 "collected received logs"
1018 );
1019
1020 let (received_logs, mut validator_trackers) = {
1021 (
1022 ReceivedLogs::from_received_result(received_logs.clone()),
1023 ValidatorTrackers::new(received_logs, &trackers),
1024 )
1025 };
1026
1027 debug!(
1028 num_chains = %received_logs.num_chains(),
1029 num_certs = %received_logs.num_certs(),
1030 "find_received_certificates: total number of chains and certificates to sync",
1031 );
1032
1033 let max_blocks_per_chain =
1034 self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
1035 for received_log in received_logs.into_batches(
1036 self.options.sender_certificate_download_batch_size,
1037 max_blocks_per_chain,
1038 ) {
1039 validator_trackers = self
1040 .receive_sender_certificates(received_log, validator_trackers, &nodes)
1041 .await?;
1042
1043 self.update_received_certificate_trackers(&validator_trackers)
1044 .await;
1045 }
1046
1047 info!("find_received_certificates finished");
1048
1049 Ok(())
1050 }
1051
1052 async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
1053 let updated_trackers = trackers.to_map();
1054 trace!(?updated_trackers, "updated tracker values");
1055
1056 if let Err(error) = self
1058 .client
1059 .local_node
1060 .update_received_certificate_trackers(self.chain_id, updated_trackers)
1061 .await
1062 {
1063 error!(
1064 chain_id = %self.chain_id,
1065 %error,
1066 "Failed to update the certificate trackers",
1067 );
1068 }
1069 }
1070
/// Downloads and processes the sender certificates listed in `received_logs`
/// and returns the updated validator trackers.
///
/// Entries already covered by the local node's outbox heights are filtered
/// out first. Sender chains left with no heights to download are collected
/// separately; for those, only the pending cross-chain requests are retried
/// at the end.
async fn receive_sender_certificates(
    &self,
    mut received_logs: ReceivedLogs,
    mut validator_trackers: ValidatorTrackers,
    nodes: &[RemoteNode<Env::ValidatorNode>],
) -> Result<ValidatorTrackers, Error> {
    debug!(
        num_chains = %received_logs.num_chains(),
        num_certs = %received_logs.num_certs(),
        "receive_sender_certificates: number of chains and certificates to sync",
    );

    // Heights the local node already has for each sender chain.
    let local_next_heights = self
        .client
        .local_node
        .next_outbox_heights(received_logs.chains(), self.chain_id)
        .await?;

    validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);

    debug!(
        remaining_total_certificates = %received_logs.num_certs(),
        "receive_sender_certificates: computed remote_heights"
    );

    let mut other_sender_chains = Vec::new();
    // Channel on which the download tasks report each processed certificate.
    let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

    // The closure takes ownership of `sender`. The receiving loop below can
    // only finish once every sender clone has been dropped, so where `sender`
    // lives is significant.
    let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
        let received_logs = &received_logs;
        let other_sender_chains = &mut other_sender_chains;

        move |(sender_chain_id, remote_heights)| {
            if remote_heights.is_empty() {
                // Nothing to download; remember the chain for the retry step.
                other_sender_chains.push(sender_chain_id);
                return None;
            };
            let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
            let sender = sender.clone();
            let client = self.client.clone();
            let nodes = nodes.to_vec();
            Some(async move {
                client
                    .download_and_process_sender_chain(
                        sender_chain_id,
                        &nodes,
                        received_logs,
                        remote_heights,
                        sender,
                    )
                    .await
            })
        }
    });

    // Run the downloads (at most `max_joined_tasks` at a time) while
    // concurrently recording each reported certificate in the trackers.
    future::join(
        stream::iter(cert_futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .collect::<()>(),
        async {
            while let Some(chain_and_height) = receiver.recv().await {
                validator_trackers.downloaded_cert(chain_and_height);
            }
        },
    )
    .await;

    debug!(
        num_other_chains = %other_sender_chains.len(),
        "receive_sender_certificates: processing certificates finished"
    );

    self.retry_pending_cross_chain_requests_from_sender_chains(nodes, other_sender_chains)
        .await;

    debug!("receive_sender_certificates: finished processing other_sender_chains");

    Ok(validator_trackers)
}
1159
1160 async fn retry_pending_cross_chain_requests_from_sender_chains(
1164 &self,
1165 nodes: &[RemoteNode<Env::ValidatorNode>],
1166 other_sender_chains: Vec<ChainId>,
1167 ) {
1168 let stream = other_sender_chains
1169 .into_iter()
1170 .map(|chain_id| async move {
1171 if let Err(error) = match self
1172 .client
1173 .retry_pending_cross_chain_requests(chain_id)
1174 .await
1175 {
1176 Ok(()) => Ok(()),
1177 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1178 if let Err(error) = self
1179 .client
1180 .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
1181 .await
1182 {
1183 error!(
1184 ?blob_ids,
1185 %error,
1186 "Error while attempting to download blobs during retrying outgoing \
1187 messages"
1188 );
1189 }
1190 self.client
1191 .retry_pending_cross_chain_requests(chain_id)
1192 .await
1193 }
1194 err => err,
1195 } {
1196 error!(
1197 %chain_id,
1198 %error,
1199 "Failed to retry outgoing messages from chain"
1200 );
1201 }
1202 })
1203 .collect::<FuturesUnordered<_>>();
1204 stream.for_each(future::ready).await;
1205 }
1206
1207 #[instrument(level = "trace")]
1209 pub async fn transfer(
1210 &self,
1211 owner: AccountOwner,
1212 amount: Amount,
1213 recipient: Account,
1214 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1215 Box::pin(self.execute_operation(SystemOperation::Transfer {
1217 owner,
1218 recipient,
1219 amount,
1220 }))
1221 .await
1222 }
1223
1224 #[instrument(level = "trace")]
1227 pub async fn read_data_blob(
1228 &self,
1229 hash: CryptoHash,
1230 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1231 let blob_id = BlobId {
1232 hash,
1233 blob_type: BlobType::Data,
1234 };
1235 Box::pin(self.execute_operation(SystemOperation::VerifyBlob { blob_id })).await
1236 }
1237
1238 #[instrument(level = "trace")]
1240 pub async fn claim(
1241 &self,
1242 owner: AccountOwner,
1243 target_id: ChainId,
1244 recipient: Account,
1245 amount: Amount,
1246 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1247 Box::pin(self.execute_operation(SystemOperation::Claim {
1248 owner,
1249 target_id,
1250 recipient,
1251 amount,
1252 }))
1253 .await
1254 }
1255
1256 #[instrument(level = "trace")]
1259 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1260 let chain_id = self.chain_id;
1261 let committee = self.local_committee().await?;
1262 let info = self.chain_info().await?;
1263 let committee = &committee;
1264 let height = info.next_block_height;
1265 let round = info.manager.current_round;
1266 let action = CommunicateAction::RequestTimeout {
1267 height,
1268 round,
1269 chain_id,
1270 };
1271 let value = Timeout::new(chain_id, height, info.epoch);
1272 let certificate = Box::new(
1273 self.client
1274 .communicate_chain_action(committee, action, value)
1275 .await?,
1276 );
1277 self.client.handle_certificate(*certificate.clone()).await?;
1278 self.client
1280 .communicate_chain_updates(
1281 committee,
1282 chain_id,
1283 height,
1284 CrossChainMessageDelivery::NonBlocking,
1285 None,
1286 )
1287 .await?;
1288 Ok(*certificate)
1289 }
1290
1291 #[instrument(level = "trace", skip_all)]
1293 pub async fn synchronize_chain_state(
1294 &self,
1295 chain_id: ChainId,
1296 ) -> Result<Box<ChainInfo>, Error> {
1297 self.client.synchronize_chain_state(chain_id).await
1298 }
1299
1300 #[instrument(level = "trace", skip_all)]
1303 pub async fn synchronize_chain_state_from_committee(
1304 &self,
1305 committee: Arc<Committee>,
1306 ) -> Result<Box<ChainInfo>, Error> {
1307 Box::pin(
1308 self.client
1309 .synchronize_chain_state_from_committee(self.chain_id, committee),
1310 )
1311 .await
1312 }
1313
1314 #[instrument(level = "trace", skip(operations, blobs))]
1316 pub async fn execute_operations(
1317 &self,
1318 operations: Vec<Operation>,
1319 blobs: Vec<Blob>,
1320 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1321 let timing_start = linera_base::time::Instant::now();
1322 tracing::debug!("execute_operations started");
1323
1324 let result = loop {
1325 let execute_block_start = linera_base::time::Instant::now();
1326 tracing::debug!("calling execute_block");
1328 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1329 Ok(ClientOutcome::Committed(certificate)) => {
1330 tracing::debug!(
1331 execute_block_ms = execute_block_start.elapsed().as_millis(),
1332 "execute_block succeeded"
1333 );
1334 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1335 break Ok(ClientOutcome::Committed(certificate));
1336 }
1337 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
1338 break Ok(ClientOutcome::WaitForTimeout(timeout));
1339 }
1340 Ok(ClientOutcome::Conflict(certificate)) => {
1341 info!(
1342 height = %certificate.block().header.height,
1343 "Another block was committed."
1344 );
1345 break Ok(ClientOutcome::Conflict(certificate));
1346 }
1347 Err(Error::CommunicationError(CommunicationError::Trusted(
1348 NodeError::UnexpectedBlockHeight {
1349 expected_block_height,
1350 found_block_height,
1351 },
1352 ))) if expected_block_height > found_block_height => {
1353 tracing::info!(
1354 chain_id = %self.chain_id,
1355 "Local state is outdated; synchronizing chain"
1356 );
1357 self.synchronize_chain_state(self.chain_id).await?;
1358 }
1359 Err(err) => return Err(err),
1360 };
1361 };
1362
1363 self.send_timing(timing_start, TimingType::ExecuteOperations);
1364 tracing::debug!(
1365 total_execute_operations_ms = timing_start.elapsed().as_millis(),
1366 "execute_operations returning"
1367 );
1368
1369 result
1370 }
1371
1372 pub async fn execute_operation(
1374 &self,
1375 operation: impl Into<Operation>,
1376 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1377 self.execute_operations(vec![operation.into()], vec![])
1378 .await
1379 }
1380
1381 #[instrument(level = "trace", skip(operations, blobs))]
1385 async fn execute_block(
1386 &self,
1387 operations: Vec<Operation>,
1388 blobs: Vec<Blob>,
1389 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1390 #[cfg(with_metrics)]
1391 let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
1392
1393 let mutex = self.proposal_mutex();
1394 let lock_start = linera_base::time::Instant::now();
1395 let mut proposal_guard = mutex.lock_owned().await;
1396 tracing::debug!(
1397 chain_id = %self.chain_id,
1398 lock_wait_ms = lock_start.elapsed().as_millis(),
1399 "acquired proposal_mutex in execute_block"
1400 );
1401 match self
1407 .process_pending_block_without_prepare(&mut proposal_guard)
1408 .await?
1409 {
1410 ClientOutcome::Committed(Some(certificate)) => {
1411 return Ok(ClientOutcome::Conflict(Box::new(certificate)))
1412 }
1413 ClientOutcome::WaitForTimeout(timeout) => {
1414 return Ok(ClientOutcome::WaitForTimeout(timeout))
1415 }
1416 ClientOutcome::Conflict(certificate) => {
1417 return Ok(ClientOutcome::Conflict(certificate))
1418 }
1419 ClientOutcome::Committed(None) => {}
1420 }
1421
1422 let transactions = self.prepend_epochs_messages_and_events(operations).await?;
1426
1427 if transactions.is_empty() {
1428 return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
1429 WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
1430 )));
1431 }
1432
1433 let block = self
1434 .new_pending_block(transactions, blobs, &mut proposal_guard)
1435 .await?;
1436
1437 match self
1438 .process_pending_block_without_prepare(&mut proposal_guard)
1439 .await?
1440 {
1441 ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
1442 Ok(ClientOutcome::Committed(certificate))
1443 }
1444 ClientOutcome::Committed(Some(certificate)) => {
1445 Ok(ClientOutcome::Conflict(Box::new(certificate)))
1446 }
1447 ClientOutcome::Committed(None) => {
1449 Err(Error::BlockProposalError("Unexpected block proposal error"))
1450 }
1451 ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
1452 ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
1453 }
1454 }
1455
1456 #[instrument(level = "trace", skip(operations))]
1462 async fn prepend_epochs_messages_and_events(
1463 &self,
1464 operations: Vec<Operation>,
1465 ) -> Result<Vec<Transaction>, Error> {
1466 let incoming_bundles = self.pending_message_bundles().await?;
1467 let stream_updates = self.collect_stream_updates().await?;
1468 Ok(self
1469 .collect_epoch_changes()
1470 .await?
1471 .into_iter()
1472 .map(Transaction::ExecuteOperation)
1473 .chain(
1474 incoming_bundles
1475 .into_iter()
1476 .map(Transaction::ReceiveMessages),
1477 )
1478 .chain(
1479 stream_updates
1480 .into_iter()
1481 .map(Transaction::ExecuteOperation),
1482 )
1483 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1484 .collect::<Vec<_>>())
1485 }
1486
1487 #[instrument(level = "trace", skip(transactions, blobs, proposal_guard))]
1492 async fn new_pending_block(
1493 &self,
1494 transactions: Vec<Transaction>,
1495 blobs: Vec<Blob>,
1496 proposal_guard: &mut Option<PendingProposal>,
1497 ) -> Result<Block, Error> {
1498 let identity = self.identity().await?;
1499
1500 ensure!(
1501 proposal_guard.is_none(),
1502 Error::BlockProposalError(
1503 "Client state already has a pending block; \
1504 use the `linera retry-pending-block` command to commit that first"
1505 )
1506 );
1507 let info = self.chain_info_with_manager_values().await?;
1508 let timestamp = self.next_timestamp(&transactions, info.timestamp);
1509 let proposed_block = ProposedBlock {
1510 epoch: info.epoch,
1511 chain_id: self.chain_id,
1512 transactions,
1513 previous_block_hash: info.block_hash,
1514 height: info.next_block_height,
1515 authenticated_signer: Some(identity),
1516 timestamp,
1517 };
1518
1519 let round = self.round_for_oracle(&info, &identity).await?;
1520 let (block, _) = Box::pin(self.client.stage_block_execution(
1523 proposed_block,
1524 round,
1525 blobs.clone(),
1526 self.options.bundle_execution_policy(),
1527 ))
1528 .await?;
1529 let (proposed_block, _) = block.clone().into_proposal();
1530 *proposal_guard = Some(PendingProposal {
1531 block: proposed_block,
1532 blobs,
1533 });
1534 Ok(block)
1535 }
1536
1537 #[instrument(level = "trace", skip(transactions))]
1542 fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1543 let local_time = self.storage_client().clock().current_time();
1544 transactions
1545 .iter()
1546 .filter_map(Transaction::incoming_bundle)
1547 .map(|msg| msg.bundle.timestamp)
1548 .max()
1549 .map_or(local_time, |timestamp| timestamp.max(local_time))
1550 .max(block_time)
1551 }
1552
1553 #[instrument(level = "trace", skip(query))]
1555 pub async fn query_application(
1556 &self,
1557 query: Query,
1558 block_hash: Option<CryptoHash>,
1559 ) -> Result<(QueryOutcome, BlockHeight), Error> {
1560 loop {
1561 let result = self
1562 .client
1563 .local_node
1564 .query_application(self.chain_id, query.clone(), block_hash)
1565 .await;
1566 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1567 let validators = self.client.validator_nodes().await?;
1568 self.client
1569 .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1570 .await?;
1571 continue; }
1573 return Ok(result?);
1574 }
1575 }
1576
1577 #[instrument(level = "trace", skip(query))]
1579 pub async fn query_system_application(
1580 &self,
1581 query: SystemQuery,
1582 ) -> Result<QueryOutcome<SystemResponse>, Error> {
1583 let (
1584 QueryOutcome {
1585 response,
1586 operations,
1587 },
1588 _,
1589 ) = self.query_application(Query::System(query), None).await?;
1590 match response {
1591 QueryResponse::System(response) => Ok(QueryOutcome {
1592 response,
1593 operations,
1594 }),
1595 _ => Err(Error::InternalError("Unexpected response for system query")),
1596 }
1597 }
1598
/// Runs a user-application `query` through `query_application` and deserializes the
/// JSON response bytes into `A::QueryResponse`.
///
/// Returns `Error::InternalError` if the response is not `QueryResponse::User`.
#[instrument(level = "trace", skip(application_id, query))]
#[cfg(with_testing)]
pub async fn query_user_application<A: Abi>(
    &self,
    application_id: ApplicationId<A>,
    query: &A::Query,
) -> Result<QueryOutcome<A::QueryResponse>, Error> {
    let query = Query::user(application_id, query)?;
    let (outcome, _) = self.query_application(query, None).await?;
    let QueryOutcome {
        response,
        operations,
    } = outcome;
    let QueryResponse::User(response_bytes) = response else {
        return Err(Error::InternalError("Unexpected response for user query"));
    };
    let response = serde_json::from_slice(&response_bytes)?;
    Ok(QueryOutcome {
        response,
        operations,
    })
}
1626
1627 #[instrument(level = "trace")]
1634 pub async fn query_balance(&self) -> Result<Amount, Error> {
1635 let (balance, _) = Box::pin(self.query_balances_with_owner(AccountOwner::CHAIN)).await?;
1636 Ok(balance)
1637 }
1638
1639 #[instrument(level = "trace", skip(owner))]
1646 pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1647 if owner.is_chain() {
1648 Box::pin(self.query_balance()).await
1649 } else {
1650 Ok(Box::pin(self.query_balances_with_owner(owner))
1651 .await?
1652 .1
1653 .unwrap_or(Amount::ZERO))
1654 }
1655 }
1656
1657 #[instrument(level = "trace", skip(owner))]
1664 pub(crate) async fn query_balances_with_owner(
1665 &self,
1666 owner: AccountOwner,
1667 ) -> Result<(Amount, Option<Amount>), Error> {
1668 let incoming_bundles = self.pending_message_bundles().await?;
1669 if incoming_bundles.is_empty() {
1672 let chain_balance = self.local_balance().await?;
1673 let owner_balance = self.local_owner_balance(owner).await?;
1674 return Ok((chain_balance, Some(owner_balance)));
1675 }
1676 let info = self.chain_info().await?;
1677 let transactions = incoming_bundles
1678 .into_iter()
1679 .map(Transaction::ReceiveMessages)
1680 .collect::<Vec<_>>();
1681 let timestamp = self.next_timestamp(&transactions, info.timestamp);
1682 let block = ProposedBlock {
1683 epoch: info.epoch,
1684 chain_id: self.chain_id,
1685 transactions,
1686 previous_block_hash: info.block_hash,
1687 height: info.next_block_height,
1688 authenticated_signer: if owner == AccountOwner::CHAIN {
1689 None
1690 } else {
1691 Some(owner)
1692 },
1693 timestamp,
1694 };
1695 match Box::pin(self.client.stage_block_execution(
1696 block,
1697 None,
1698 Vec::new(),
1699 self.options.bundle_execution_policy(),
1700 ))
1701 .await
1702 {
1703 Ok((_, response)) => Ok((
1704 response.info.chain_balance,
1705 response.info.requested_owner_balance,
1706 )),
1707 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
1708 error,
1709 )))) if matches!(
1710 &*error,
1711 ChainError::ExecutionError(
1712 execution_error,
1713 ChainExecutionContext::Block
1714 ) if matches!(
1715 **execution_error,
1716 ExecutionError::FeesExceedFunding { .. }
1717 )
1718 ) =>
1719 {
1720 Ok((Amount::ZERO, Some(Amount::ZERO)))
1722 }
1723 Err(error) => Err(error),
1724 }
1725 }
1726
1727 #[instrument(level = "trace")]
1731 pub async fn local_balance(&self) -> Result<Amount, Error> {
1732 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1733 Ok(balance)
1734 }
1735
1736 #[instrument(level = "trace", skip(owner))]
1740 pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1741 if owner.is_chain() {
1742 self.local_balance().await
1743 } else {
1744 Ok(self
1745 .local_balances_with_owner(owner)
1746 .await?
1747 .1
1748 .unwrap_or(Amount::ZERO))
1749 }
1750 }
1751
1752 #[instrument(level = "trace", skip(owner))]
1756 pub(crate) async fn local_balances_with_owner(
1757 &self,
1758 owner: AccountOwner,
1759 ) -> Result<(Amount, Option<Amount>), Error> {
1760 ensure!(
1761 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1762 Error::WalletSynchronizationError
1763 );
1764 let mut query = ChainInfoQuery::new(self.chain_id);
1765 query.request_owner_balance = owner;
1766 let response = self
1767 .client
1768 .local_node
1769 .handle_chain_info_query(query)
1770 .await?;
1771 Ok((
1772 response.info.chain_balance,
1773 response.info.requested_owner_balance,
1774 ))
1775 }
1776
1777 #[instrument(level = "trace")]
1779 pub async fn transfer_to_account(
1780 &self,
1781 from: AccountOwner,
1782 amount: Amount,
1783 account: Account,
1784 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1785 self.transfer(from, amount, account).await
1786 }
1787
/// Transfers `amount` from `owner` to the burn address of this chain.
#[cfg(with_testing)]
#[instrument(level = "trace")]
pub async fn burn(
    &self,
    owner: AccountOwner,
    amount: Amount,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    self.transfer(owner, amount, Account::burn_address(self.chain_id))
        .await
}
1799
1800 #[instrument(level = "trace")]
1801 pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1802 let validators = self.client.validator_nodes().await?;
1803 self.client
1804 .fetch_chain_info(self.chain_id, &validators)
1805 .await
1806 }
1807
1808 #[instrument(level = "trace")]
1823 pub async fn synchronize_up_to(
1824 &self,
1825 next_height: Option<BlockHeight>,
1826 until_block_time: Option<Timestamp>,
1827 ) -> Result<Box<ChainInfo>, Error> {
1828 let (_, committee) = self.client.admin_committee().await?;
1829 let validators = self.client.make_nodes(&committee)?;
1830 Box::pin(self.client.fetch_chain_info(self.chain_id, &validators)).await?;
1831 communicate_with_quorum(
1832 &validators,
1833 &committee,
1834 |_: &()| (),
1835 |remote_node| async move {
1836 self.client
1837 .download_certificates_from(
1838 &remote_node,
1839 self.chain_id,
1840 next_height.unwrap_or(BlockHeight::MAX),
1841 until_block_time,
1842 )
1843 .await?;
1844 Ok(())
1845 },
1846 self.client.options.quorum_grace_period,
1847 )
1848 .await?;
1849 self.client
1850 .local_node
1851 .chain_info(self.chain_id)
1852 .await
1853 .map_err(Into::into)
1854 }
1855
1856 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1857 if self.preferred_owner.is_none() {
1858 return self.client.synchronize_chain_state(self.chain_id).await;
1859 }
1860 let info = self.prepare_chain().await?;
1861 self.synchronize_publisher_chains().await?;
1862 self.find_received_certificates().await?;
1863 Ok(info)
1864 }
1865
1866 #[instrument(level = "trace")]
1868 pub async fn process_pending_block(
1869 &self,
1870 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1871 self.prepare_chain().await?;
1872 let mutex = self.proposal_mutex();
1873 let mut proposal_guard = mutex.lock_owned().await;
1874 self.process_pending_block_without_prepare(&mut proposal_guard)
1875 .await
1876 }
1877
/// Processes the locking block or the pending proposal, if there is one, without
/// calling `prepare_chain` first.
///
/// `proposal_guard` is the (already locked) pending-proposal slot. It is cleared when
/// the pending block's height is below the chain's next block height, and again after
/// a block was successfully committed.
///
/// Returns:
/// * `ClientOutcome::Committed(None)` if there is neither a locking block nor a
///   pending proposal;
/// * `ClientOutcome::WaitForTimeout` if `round_for_new_proposal` says to wait;
/// * `ClientOutcome::Committed(Some(certificate))` once a block was confirmed and the
///   validators were updated;
/// * whatever `finalize_locking_block` returns when the current (non-fast) round
///   already has a locking block.
#[instrument(level = "debug", skip(self, proposal_guard), fields(chain_id = %self.chain_id))]
async fn process_pending_block_without_prepare(
    &self,
    proposal_guard: &mut Option<PendingProposal>,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
    let process_start = linera_base::time::Instant::now();
    tracing::debug!("process_pending_block_without_prepare started");
    // Also requests a leader timeout first if the round timeout has already elapsed.
    let info = self.request_leader_timeout_if_needed().await?;

    // Drop a pending proposal whose height is already below the next block height.
    if let Some(pending) = &*proposal_guard {
        if pending.block.height < info.next_block_height {
            tracing::debug!(
                "Clearing pending proposal: a block was committed at height {}",
                pending.block.height
            );
            *proposal_guard = None;
        }
    }

    // A locking block in the current non-fast round only needs to be finalized.
    if info.manager.has_locking_block_in_current_round()
        && !info.manager.current_round.is_fast()
    {
        return Box::pin(self.finalize_locking_block(info)).await;
    }
    let owner = self.identity().await?;

    let local_node = &self.client.local_node;
    // Pick the block to propose: the locking block takes precedence over the local
    // pending proposal.
    let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
        match &**locking {
            LockingBlock::Regular(certificate) => {
                let blob_ids = certificate.block().required_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                debug!("Retrying locking block from round {}", certificate.round);
                (certificate.block().clone(), blobs)
            }
            LockingBlock::Fast(proposal) => {
                let proposed_block = proposal.content.block.clone();
                let blob_ids = proposed_block.published_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                // A fast-round proposal only carries the proposed block, so it is
                // staged again here to obtain the full block.
                let block = self
                    .client
                    .stage_block_execution(
                        proposed_block,
                        None,
                        blobs.clone(),
                        BundleExecutionPolicy::committed(),
                    )
                    .await?
                    .0;
                debug!("Retrying locking block from fast round.");
                (block, blobs)
            }
        }
    } else if let Some(pending) = proposal_guard.as_ref() {
        let proposed_block = pending.block.clone();
        let blobs = pending.blobs.clone();
        let round = self.round_for_oracle(&info, &owner).await?;
        let (block, _) = self
            .client
            .stage_block_execution(
                proposed_block,
                round,
                blobs.clone(),
                BundleExecutionPolicy::committed(),
            )
            .await?;
        debug!("Proposing the local pending block.");
        (block, blobs)
    } else {
        // Nothing to do.
        return Ok(ClientOutcome::Committed(None));
    };

    let has_oracle_responses = block.has_oracle_responses();
    let (proposed_block, outcome) = block.into_proposal();
    let round = match self
        .round_for_new_proposal(&info, &owner, has_oracle_responses)
        .await?
    {
        Either::Left(round) => round,
        Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
    };
    debug!("Proposing block for round {}", round);

    // Computed before `info.manager.requested_locking` is moved out below.
    let already_handled_locally = info
        .manager
        .already_handled_proposal(round, &proposed_block);
    // A locking block is re-proposed as a retry; otherwise this is an initial proposal.
    let proposal = if let Some(locking) = info.manager.requested_locking {
        Box::new(match *locking {
            LockingBlock::Regular(cert) => {
                BlockProposal::new_retry_regular(owner, round, cert, self.signer())
                    .await
                    .map_err(Error::signer_failure)?
            }
            LockingBlock::Fast(proposal) => {
                BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
                    .await
                    .map_err(Error::signer_failure)?
            }
        })
    } else {
        Box::new(
            BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
                .await
                .map_err(Error::signer_failure)?,
        )
    };
    if !already_handled_locally {
        // Check the proposal against the local node before contacting validators.
        if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
            match err {
                LocalNodeError::BlobsNotFound(_) => {
                    // Hand the blobs to the local node as pending blobs, then retry once.
                    local_node
                        .handle_pending_blobs(self.chain_id, blobs)
                        .await?;
                    local_node.handle_block_proposal(*proposal.clone()).await?;
                }
                err => return Err(err.into()),
            }
        }
    }
    let committee = self.local_committee().await?;
    let block = Block::new(proposed_block, outcome);
    let submit_block_proposal_start = linera_base::time::Instant::now();
    // In the fast round the proposal is submitted as a confirmed block directly; in
    // other rounds it is submitted as a validated block and finalized afterwards.
    let certificate = if round.is_fast() {
        let hashed_value = ConfirmedBlock::new(block);
        Box::pin(
            self.client
                .submit_block_proposal(&committee, proposal, hashed_value),
        )
        .await?
    } else {
        let hashed_value = ValidatedBlock::new(block);
        let certificate = Box::pin(self.client.submit_block_proposal(
            &committee,
            proposal,
            hashed_value.clone(),
        ))
        .await?;
        Box::pin(self.client.finalize_block(&committee, certificate)).await?
    };
    self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
    debug!(round = %certificate.round, "Sending confirmed block to validators");
    let update_start = linera_base::time::Instant::now();
    Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
    tracing::debug!(
        update_validators_ms = update_start.elapsed().as_millis(),
        total_process_ms = process_start.elapsed().as_millis(),
        "process_pending_block_without_prepare completing"
    );
    // The block is committed, so the pending proposal is no longer needed.
    *proposal_guard = None;
    Ok(ClientOutcome::Committed(Some(certificate)))
}
2047
2048 fn send_timing(&self, start: Instant, timing_type: TimingType) {
2049 let Some(sender) = &self.timing_sender else {
2050 return;
2051 };
2052 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2053 tracing::warn!(%err, "Failed to send timing info");
2054 }
2055 }
2056
2057 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
2060 let mut info = self.chain_info_with_manager_values().await?;
2061 if let Some(round_timeout) = info.manager.round_timeout {
2064 if round_timeout <= self.storage_client().clock().current_time() {
2065 if let Err(e) = self.request_leader_timeout().await {
2066 debug!("Failed to obtain a timeout certificate: {}", e);
2067 } else {
2068 info = self.chain_info_with_manager_values().await?;
2069 }
2070 }
2071 }
2072 Ok(info)
2073 }
2074
2075 async fn finalize_locking_block(
2079 &self,
2080 info: Box<ChainInfo>,
2081 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2082 let locking = info
2083 .manager
2084 .requested_locking
2085 .expect("Should have a locking block");
2086 let LockingBlock::Regular(certificate) = *locking else {
2087 panic!("Should have a locking validated block");
2088 };
2089 debug!(
2090 round = %certificate.round,
2091 "Finalizing locking block"
2092 );
2093 let committee = self.local_committee().await?;
2094 let certificate =
2095 Box::pin(self.client.finalize_block(&committee, certificate.clone())).await?;
2096 Box::pin(self.update_validators(Some(&committee), Some(certificate.clone()))).await?;
2097 Ok(ClientOutcome::Committed(Some(certificate)))
2098 }
2099
2100 async fn round_for_oracle(
2102 &self,
2103 info: &ChainInfo,
2104 identity: &AccountOwner,
2105 ) -> Result<Option<u32>, Error> {
2106 match self.round_for_new_proposal(info, identity, true).await {
2108 Ok(Either::Left(round)) => Ok(round.multi_leader()),
2110 Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
2114 Err(err) => Err(err),
2115 }
2116 }
2117
2118 async fn round_for_new_proposal(
2120 &self,
2121 info: &ChainInfo,
2122 identity: &AccountOwner,
2123 has_oracle_responses: bool,
2124 ) -> Result<Either<Round, RoundTimeout>, Error> {
2125 let manager = &info.manager;
2126 let seed = self
2127 .client
2128 .local_node
2129 .get_manager_seed(self.chain_id)
2130 .await?;
2131 let skip_fast = manager.current_round.is_fast()
2136 && (has_oracle_responses || !self.options.allow_fast_blocks);
2137 let conflict = manager
2138 .requested_signed_proposal
2139 .as_ref()
2140 .into_iter()
2141 .chain(&manager.requested_proposed)
2142 .any(|proposal| proposal.content.round == manager.current_round)
2143 || skip_fast;
2144 let round = if !conflict {
2145 manager.current_round
2146 } else if let Some(round) = manager
2147 .ownership
2148 .next_round(manager.current_round)
2149 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
2150 {
2151 round
2152 } else if let Some(timeout) = info.round_timeout() {
2153 return Ok(Either::Right(timeout));
2154 } else {
2155 return Err(Error::BlockProposalError(
2156 "Conflicting proposal in the current round",
2157 ));
2158 };
2159 let current_committee = self
2160 .local_committee()
2161 .await?
2162 .validators
2163 .values()
2164 .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
2165 .collect();
2166 if manager.should_propose(identity, round, seed, ¤t_committee) {
2167 return Ok(Either::Left(round));
2168 }
2169 if let Some(timeout) = info.round_timeout() {
2170 return Ok(Either::Right(timeout));
2171 }
2172 Err(Error::BlockProposalError(
2173 "Not a leader in the current round",
2174 ))
2175 }
2176
/// Discards the pending proposal, if any, by resetting the proposal slot to `None`.
#[cfg(with_testing)]
#[instrument(level = "trace")]
pub async fn clear_pending_proposal(&self) {
    let mutex = self.proposal_mutex();
    let mut pending = mutex.lock().await;
    *pending = None;
}
2183
2184 #[instrument(level = "trace")]
2188 pub async fn rotate_key_pair(
2189 &self,
2190 public_key: AccountPublicKey,
2191 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2192 Box::pin(self.transfer_ownership(public_key.into())).await
2193 }
2194
2195 #[instrument(level = "trace")]
2197 pub async fn transfer_ownership(
2198 &self,
2199 new_owner: AccountOwner,
2200 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2201 Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
2202 super_owners: vec![new_owner],
2203 owners: Vec::new(),
2204 multi_leader_rounds: 2,
2205 open_multi_leader_rounds: false,
2206 timeout_config: TimeoutConfig::default(),
2207 }))
2208 .await
2209 }
2210
2211 #[instrument(level = "trace")]
2213 pub async fn share_ownership(
2214 &self,
2215 new_owner: AccountOwner,
2216 new_weight: u64,
2217 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2218 let ownership = self.prepare_chain().await?.manager.ownership;
2219 ensure!(
2220 ownership.is_active(),
2221 ChainError::InactiveChain(self.chain_id)
2222 );
2223 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
2224 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
2225 owners.push((new_owner, new_weight));
2226 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
2227 super_owners: Vec::new(),
2228 owners,
2229 multi_leader_rounds: ownership.multi_leader_rounds,
2230 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2231 timeout_config: ownership.timeout_config,
2232 })];
2233 match self.execute_block(operations, vec![]).await? {
2234 ClientOutcome::Committed(certificate) => Ok(ClientOutcome::Committed(certificate)),
2235 ClientOutcome::Conflict(certificate) => {
2236 info!(
2237 height = %certificate.block().header.height,
2238 "Another block was committed."
2239 );
2240 Ok(ClientOutcome::Conflict(certificate))
2241 }
2242 ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
2243 }
2244 }
2245
2246 #[instrument(level = "trace")]
2248 pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, Error> {
2249 Ok(self
2250 .client
2251 .local_node
2252 .chain_state_view(self.chain_id)
2253 .await?
2254 .execution_state
2255 .system
2256 .ownership
2257 .get()
2258 .await?
2259 .clone())
2260 }
2261
2262 #[instrument(level = "trace")]
2265 pub async fn change_ownership(
2266 &self,
2267 ownership: ChainOwnership,
2268 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2269 Box::pin(self.execute_operation(SystemOperation::ChangeOwnership {
2270 super_owners: ownership.super_owners.into_iter().collect(),
2271 owners: ownership.owners.into_iter().collect(),
2272 multi_leader_rounds: ownership.multi_leader_rounds,
2273 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2274 timeout_config: ownership.timeout_config.clone(),
2275 }))
2276 .await
2277 }
2278
2279 #[instrument(level = "trace")]
2281 pub async fn query_application_permissions(&self) -> Result<ApplicationPermissions, Error> {
2282 Ok(self
2283 .client
2284 .local_node
2285 .chain_state_view(self.chain_id)
2286 .await?
2287 .execution_state
2288 .system
2289 .application_permissions
2290 .get()
2291 .await?
2292 .clone())
2293 }
2294
2295 #[instrument(level = "trace", skip(application_permissions))]
2297 pub async fn change_application_permissions(
2298 &self,
2299 application_permissions: ApplicationPermissions,
2300 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2301 Box::pin(
2302 self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2303 application_permissions,
2304 )),
2305 )
2306 .await
2307 }
2308
2309 #[instrument(level = "trace", skip(self))]
2311 pub async fn open_chain(
2312 &self,
2313 ownership: ChainOwnership,
2314 application_permissions: ApplicationPermissions,
2315 balance: Amount,
2316 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2317 let config = OpenChainConfig {
2318 ownership: ownership.clone(),
2319 balance,
2320 application_permissions: application_permissions.clone(),
2321 };
2322 let operation = Operation::system(SystemOperation::OpenChain(config));
2323 let certificate = match self.execute_block(vec![operation], vec![]).await? {
2324 ClientOutcome::Committed(certificate) => certificate,
2325 ClientOutcome::Conflict(certificate) => {
2326 return Ok(ClientOutcome::Conflict(certificate));
2327 }
2328 ClientOutcome::WaitForTimeout(timeout) => {
2329 return Ok(ClientOutcome::WaitForTimeout(timeout));
2330 }
2331 };
2332 let chain_blob = certificate
2334 .block()
2335 .body
2336 .blobs
2337 .last()
2338 .and_then(|blobs| blobs.last())
2339 .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2340 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2341 for owner in ownership.all_owners() {
2343 if self.client.has_key_for(owner).await? {
2344 self.client
2345 .extend_chain_mode(description.id(), ListeningMode::FullChain);
2346 break;
2347 }
2348 }
2349 self.client
2350 .retry_pending_cross_chain_requests(self.chain_id)
2351 .await?;
2352 Ok(ClientOutcome::Committed((description, certificate)))
2353 }
2354
2355 #[instrument(level = "trace")]
2358 pub async fn close_chain(
2359 &self,
2360 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2361 match Box::pin(self.execute_operation(SystemOperation::CloseChain)).await {
2362 Ok(outcome) => Ok(outcome.map(Some)),
2363 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2364 chain_error,
2365 )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2366 Ok(ClientOutcome::Committed(None)) }
2368 Err(error) => Err(error),
2369 }
2370 }
2371
2372 #[cfg(not(target_arch = "wasm32"))]
2374 #[instrument(level = "trace", skip(contract, service))]
2375 pub async fn publish_module(
2376 &self,
2377 contract: Bytecode,
2378 service: Bytecode,
2379 vm_runtime: VmRuntime,
2380 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2381 let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
2382 Box::pin(self.publish_module_blobs(blobs, module_id)).await
2383 }
2384
2385 #[cfg(not(target_arch = "wasm32"))]
2387 #[instrument(level = "trace", skip(blobs, module_id))]
2388 pub async fn publish_module_blobs(
2389 &self,
2390 blobs: Vec<Blob>,
2391 module_id: ModuleId,
2392 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2393 self.execute_operations(
2394 vec![Operation::system(SystemOperation::PublishModule {
2395 module_id,
2396 })],
2397 blobs,
2398 )
2399 .await?
2400 .try_map(|certificate| Ok((module_id, certificate)))
2401 }
2402
2403 #[instrument(level = "trace", skip(bytes))]
2405 pub async fn publish_data_blobs(
2406 &self,
2407 bytes: Vec<Vec<u8>>,
2408 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2409 let blobs = bytes.into_iter().map(Blob::new_data);
2410 let publish_blob_operations = blobs
2411 .clone()
2412 .map(|blob| {
2413 Operation::system(SystemOperation::PublishDataBlob {
2414 blob_hash: blob.id().hash,
2415 })
2416 })
2417 .collect();
2418 self.execute_operations(publish_blob_operations, blobs.collect())
2419 .await
2420 }
2421
2422 #[instrument(level = "trace", skip(bytes))]
2424 pub async fn publish_data_blob(
2425 &self,
2426 bytes: Vec<u8>,
2427 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2428 Box::pin(self.publish_data_blobs(vec![bytes])).await
2429 }
2430
2431 #[instrument(
2433 level = "trace",
2434 skip(self, parameters, instantiation_argument, required_application_ids)
2435 )]
2436 pub async fn create_application<
2437 A: Abi,
2438 Parameters: Serialize,
2439 InstantiationArgument: Serialize,
2440 >(
2441 &self,
2442 module_id: ModuleId<A, Parameters, InstantiationArgument>,
2443 parameters: &Parameters,
2444 instantiation_argument: &InstantiationArgument,
2445 required_application_ids: Vec<ApplicationId>,
2446 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2447 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2448 let parameters = serde_json::to_vec(parameters)?;
2449 Ok(Box::pin(self.create_application_untyped(
2450 module_id.forget_abi(),
2451 parameters,
2452 instantiation_argument,
2453 required_application_ids,
2454 ))
2455 .await?
2456 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2457 }
2458
2459 #[instrument(
2461 level = "trace",
2462 skip(
2463 self,
2464 module_id,
2465 parameters,
2466 instantiation_argument,
2467 required_application_ids
2468 )
2469 )]
2470 pub async fn create_application_untyped(
2471 &self,
2472 module_id: ModuleId,
2473 parameters: Vec<u8>,
2474 instantiation_argument: Vec<u8>,
2475 required_application_ids: Vec<ApplicationId>,
2476 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2477 Box::pin(self.execute_operation(SystemOperation::CreateApplication {
2478 module_id,
2479 parameters,
2480 instantiation_argument,
2481 required_application_ids,
2482 }))
2483 .await?
2484 .try_map(|certificate| {
2485 let mut creation: Vec<_> = certificate
2487 .block()
2488 .created_blob_ids()
2489 .into_iter()
2490 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2491 .collect();
2492 if creation.len() > 1 {
2493 return Err(Error::InternalError(
2494 "Unexpected number of application descriptions published",
2495 ));
2496 }
2497 let blob_id = creation.pop().ok_or(Error::InternalError(
2498 "ApplicationDescription blob not found.",
2499 ))?;
2500 let id = ApplicationId::new(blob_id.hash);
2501 Ok((id, certificate))
2502 })
2503 }
2504
2505 #[instrument(level = "trace", skip(committee))]
2507 pub async fn stage_new_committee(
2508 &self,
2509 committee: Committee,
2510 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2511 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2512 let blob_hash = blob.id().hash;
2513 match self
2514 .execute_operations(
2515 vec![Operation::system(SystemOperation::Admin(
2516 AdminOperation::PublishCommitteeBlob { blob_hash },
2517 ))],
2518 vec![blob],
2519 )
2520 .await?
2521 {
2522 ClientOutcome::Committed(_) => {}
2523 outcome @ ClientOutcome::WaitForTimeout(_) | outcome @ ClientOutcome::Conflict(_) => {
2524 return Ok(outcome)
2525 }
2526 }
2527 let epoch = Box::pin(self.chain_info()).await?.epoch.try_add_one()?;
2528 Box::pin(
2529 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2530 epoch,
2531 blob_hash,
2532 })),
2533 )
2534 .await
2535 }
2536
2537 #[instrument(level = "trace")]
2543 pub async fn process_inbox(
2544 &self,
2545 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2546 self.prepare_chain().await?;
2547 self.process_inbox_without_prepare().await
2548 }
2549
2550 #[instrument(level = "trace")]
2556 pub async fn process_inbox_without_prepare(
2557 &self,
2558 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2559 #[cfg(with_metrics)]
2560 let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2561
2562 let mut certificates = Vec::new();
2563 loop {
2564 match self.execute_block(vec![], vec![]).await {
2568 Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
2569 Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
2570 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2571 return Ok((certificates, Some(timeout)));
2572 }
2573 Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2575 WorkerError::ChainError(chain_error),
2576 ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2577 return Ok((certificates, None));
2578 }
2579 Err(error) => return Err(error),
2580 };
2581 }
2582 }
2583
2584 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2587 let (mut min_epoch, mut next_epoch) = {
2588 let (epoch, committees) = self.epoch_and_committees().await?;
2589 let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2590 (min_epoch, epoch.try_add_one()?)
2591 };
2592 let mut epoch_change_ops = Vec::new();
2593 while self
2594 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2595 .await?
2596 {
2597 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2598 next_epoch,
2599 )));
2600 next_epoch.try_add_assign_one()?;
2601 }
2602 while self
2603 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2604 .await?
2605 {
2606 epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2607 min_epoch,
2608 )));
2609 min_epoch.try_add_assign_one()?;
2610 }
2611 Ok(epoch_change_ops)
2612 }
2613
2614 async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2617 let event_id = EventId {
2618 chain_id: self.client.admin_chain_id,
2619 stream_id: StreamId::system(stream_name),
2620 index,
2621 };
2622 Ok(self
2623 .client
2624 .storage_client()
2625 .read_event(event_id)
2626 .await?
2627 .is_some())
2628 }
2629
2630 pub async fn events_from_index(
2632 &self,
2633 stream_id: StreamId,
2634 start_index: u32,
2635 ) -> Result<Vec<IndexAndEvent>, Error> {
2636 Ok(self
2637 .client
2638 .storage_client()
2639 .read_events_from_index(&self.chain_id, &stream_id, start_index)
2640 .await?)
2641 }
2642
2643 #[instrument(level = "trace")]
2648 pub async fn revoke_epochs(
2649 &self,
2650 revoked_epoch: Epoch,
2651 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2652 self.prepare_chain().await?;
2653 let (current_epoch, committees) = self.epoch_and_committees().await?;
2654 ensure!(
2655 revoked_epoch < current_epoch,
2656 Error::CannotRevokeCurrentEpoch(current_epoch)
2657 );
2658 ensure!(
2659 committees.contains_key(&revoked_epoch),
2660 Error::EpochAlreadyRevoked
2661 );
2662 let operations = committees
2663 .keys()
2664 .filter_map(|epoch| {
2665 if *epoch <= revoked_epoch {
2666 Some(Operation::system(SystemOperation::Admin(
2667 AdminOperation::RemoveCommittee { epoch: *epoch },
2668 )))
2669 } else {
2670 None
2671 }
2672 })
2673 .collect();
2674 self.execute_operations(operations, vec![]).await
2675 }
2676
2677 #[instrument(level = "trace")]
2681 pub async fn transfer_to_account_unsafe_unconfirmed(
2682 &self,
2683 owner: AccountOwner,
2684 amount: Amount,
2685 recipient: Account,
2686 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2687 Box::pin(self.execute_operation(SystemOperation::Transfer {
2688 owner,
2689 recipient,
2690 amount,
2691 }))
2692 .await
2693 }
2694
2695 #[instrument(level = "trace", skip(hash))]
2696 pub async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, Error> {
2697 let block = self
2698 .client
2699 .storage_client()
2700 .read_confirmed_block(hash)
2701 .await?;
2702 block
2703 .map(Arc::unwrap_or_clone)
2704 .ok_or(Error::MissingConfirmedBlock(hash))
2705 }
2706
2707 #[instrument(level = "trace", skip(hash))]
2708 pub async fn read_certificate(
2709 &self,
2710 hash: CryptoHash,
2711 ) -> Result<ConfirmedBlockCertificate, Error> {
2712 let certificate = self.client.storage_client().read_certificate(hash).await?;
2713 certificate
2714 .map(Arc::unwrap_or_clone)
2715 .ok_or(Error::ReadCertificatesError(vec![hash]))
2716 }
2717
2718 #[instrument(level = "trace")]
2720 pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2721 self.client
2722 .retry_pending_cross_chain_requests(self.chain_id)
2723 .await?;
2724 Ok(())
2725 }
2726
2727 #[instrument(level = "trace", skip(local_node))]
2728 async fn local_chain_info(
2729 &self,
2730 chain_id: ChainId,
2731 local_node: &mut LocalNodeClient<Env::Storage>,
2732 ) -> Result<Option<Box<ChainInfo>>, Error> {
2733 match local_node.chain_info(chain_id).await {
2734 Ok(info) => Ok(Some(info)),
2735 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
2736 Err(err) => Err(err.into()),
2737 }
2738 }
2739
2740 #[instrument(level = "trace", skip(chain_id, local_node))]
2741 async fn local_next_block_height(
2742 &self,
2743 chain_id: ChainId,
2744 local_node: &mut LocalNodeClient<Env::Storage>,
2745 ) -> Result<BlockHeight, Error> {
2746 Ok(self
2747 .local_chain_info(chain_id, local_node)
2748 .await?
2749 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
2750 }
2751
2752 #[instrument(level = "trace")]
2755 async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
2756 Ok(self
2757 .client
2758 .local_node
2759 .get_inbox_next_height(self.chain_id, origin)
2760 .await?)
2761 }
2762
2763 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
2764 async fn process_notification(
2765 &self,
2766 remote_node: RemoteNode<Env::ValidatorNode>,
2767 mut local_node: LocalNodeClient<Env::Storage>,
2768 notification: Notification,
2769 ) -> Result<(), Error> {
2770 let listening_mode = self.client.chain_mode(notification.chain_id);
2771 let is_relevant = listening_mode
2772 .as_ref()
2773 .is_some_and(|mode| mode.is_relevant(¬ification.reason));
2774 if !is_relevant {
2775 tracing::trace!(
2776 chain_id = %notification.chain_id,
2777 reason = ?notification.reason,
2778 ?listening_mode,
2779 "Ignoring notification due to listening mode"
2780 );
2781 return Ok(());
2782 }
2783 match notification.reason {
2784 Reason::NewIncomingBundle { origin, height } => {
2785 if self.local_next_height_to_receive(origin).await? > height {
2786 debug!(
2787 chain_id = %self.chain_id,
2788 "Accepting redundant notification for new message"
2789 );
2790 return Ok(());
2791 }
2792 self.client
2793 .download_sender_block_with_sending_ancestors(
2794 self.chain_id,
2795 origin,
2796 height,
2797 &remote_node,
2798 )
2799 .await?;
2800 if self.local_next_height_to_receive(origin).await? <= height {
2801 info!(
2802 chain_id = %self.chain_id,
2803 "NewIncomingBundle: Fail to synchronize new message after notification"
2804 );
2805 }
2806 }
2807 Reason::NewBlock {
2808 height,
2809 hash,
2810 event_streams,
2811 ..
2812 } => {
2813 let chain_id = notification.chain_id;
2814 let local_height = self
2815 .local_next_block_height(chain_id, &mut local_node)
2816 .await?;
2817 if local_height > height {
2818 debug!(
2819 chain_id = %self.chain_id,
2820 "Accepting redundant notification for new block"
2821 );
2822 return Ok(());
2823 }
2824 if let Some(ListeningMode::EventsOnly(subscribed)) =
2828 self.client.chain_mode(chain_id)
2829 {
2830 if !event_streams.is_empty() {
2831 self.client
2832 .download_event_bearing_blocks(
2833 chain_id,
2834 BTreeSet::from([(height, hash)]),
2835 local_height,
2836 &subscribed,
2837 &remote_node,
2838 )
2839 .await?;
2840 }
2841 } else {
2842 self.client
2843 .synchronize_chain_state_from(&remote_node, chain_id)
2844 .await?;
2845 if self
2846 .local_next_block_height(chain_id, &mut local_node)
2847 .await?
2848 <= height
2849 {
2850 error!("NewBlock: Fail to synchronize new block after notification");
2851 }
2852 }
2853 }
2854 Reason::NewEvents { height, hash, .. } => {
2855 let chain_id = notification.chain_id;
2856 let local_height = self
2857 .local_next_block_height(chain_id, &mut local_node)
2858 .await?;
2859 if local_height > height {
2860 debug!(
2861 chain_id = %self.chain_id,
2862 "Accepting redundant notification for new events"
2863 );
2864 return Ok(());
2865 }
2866 let subscribed = match self.client.chain_mode(chain_id) {
2867 Some(ListeningMode::EventsOnly(streams)) => streams,
2868 _ => return Ok(()),
2869 };
2870 self.client
2871 .download_event_bearing_blocks(
2872 chain_id,
2873 BTreeSet::from([(height, hash)]),
2874 local_height,
2875 &subscribed,
2876 &remote_node,
2877 )
2878 .await?;
2879 }
2880 Reason::NewRound { height, round } => {
2881 let chain_id = notification.chain_id;
2882 if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
2883 if (info.next_block_height, info.manager.current_round) >= (height, round) {
2884 debug!(
2885 chain_id = %self.chain_id,
2886 "Accepting redundant notification for new round"
2887 );
2888 return Ok(());
2889 }
2890 }
2891 self.client
2892 .synchronize_chain_state_from(&remote_node, chain_id)
2893 .await?;
2894 let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
2895 error!(
2896 chain_id = %self.chain_id,
2897 "NewRound: Fail to read local chain info for {chain_id}"
2898 );
2899 return Ok(());
2900 };
2901 if (info.next_block_height, info.manager.current_round) < (height, round) {
2902 info!(
2903 chain_id = %self.chain_id,
2904 "NewRound: Fail to synchronize new block after notification"
2905 );
2906 }
2907 }
2908 Reason::BlockExecuted { .. } => {
2909 }
2911 }
2912 Ok(())
2913 }
2914
2915 pub fn is_tracked(&self) -> bool {
2917 self.client.is_tracked(self.chain_id)
2918 }
2919
2920 pub fn listening_mode(&self) -> Option<ListeningMode> {
2922 self.client.chain_mode(self.chain_id)
2923 }
2924
/// Sets up listening for notifications about this chain.
///
/// Returns a triple:
/// * a future that must be polled to drive the listening; it reacts to local
///   notifications and runs the per-validator notification handlers,
/// * an `AbortOnDrop` wrapping the abort handle of the internal notification
///   stream (presumably dropping it stops the future — confirm against
///   `AbortOnDrop`),
/// * a separate notification stream obtained from `self.subscribe()`.
///
/// # Errors
///
/// Fails if either call to `self.subscribe()` fails. A failure of the initial
/// `update_notification_streams` call is only logged.
#[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
pub async fn listen(
    &self,
) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
    use future::FutureExt as _;

    // Awaits `future` while also polling `background_work`, so that the
    // background stream keeps making progress in the meantime. Items produced by
    // the background stream are discarded.
    async fn await_while_polling<F: FusedFuture>(
        future: F,
        background_work: impl FusedStream<Item = ()>,
    ) -> F::Output {
        tokio::pin!(future);
        tokio::pin!(background_work);
        loop {
            futures::select! {
                _ = background_work.next() => (),
                result = future => return result,
            }
        }
    }

    // Abort handles of the active per-validator streams, keyed by validator.
    let mut senders = HashMap::new();
    // Probe bookkeeping for validators whose stream has ended.
    let mut circuit_breakers: HashMap<ValidatorPublicKey, CircuitBreakerState> = HashMap::new();
    // Two independent subscriptions: one is returned to the caller, the other
    // one is consumed internally and can be aborted through `abort`.
    let notifications = self.subscribe()?;
    let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);

    // Futures returned by `update_notification_streams`; each one drives a batch
    // of validator tasks.
    let mut process_notifications = FuturesUnordered::new();

    match self
        .update_notification_streams(&mut senders, &mut circuit_breakers)
        .await
    {
        Ok(handler) => process_notifications.push(handler),
        Err(error) => error!("Failed to update committee: {error}"),
    };

    let this = self.clone();
    let update_streams = async move {
        let mut abortable_notifications = abortable_notifications.fuse();

        // Runs until the internal notification stream ends or is aborted.
        while let Some(notification) =
            await_while_polling(abortable_notifications.next(), &mut process_notifications)
                .await
        {
            // Only new blocks trigger a refresh of the validator streams, and
            // only when the chain is not in `EventsOnly` mode.
            if let Reason::NewBlock { .. } = notification.reason {
                let is_events_only = this
                    .listening_mode()
                    .is_some_and(|m| matches!(m, ListeningMode::EventsOnly(_)));
                if !is_events_only {
                    match Box::pin(await_while_polling(
                        this.update_notification_streams(&mut senders, &mut circuit_breakers)
                            .fuse(),
                        &mut process_notifications,
                    ))
                    .await
                    {
                        Ok(handler) => process_notifications.push(handler),
                        Err(error) => error!("Failed to update committee: {error}"),
                    }
                }
            }
        }

        // Abort every remaining validator stream so that the handlers can finish.
        for abort in senders.into_values() {
            abort.abort();
        }

        // Drain the remaining handlers before completing.
        let () = process_notifications.collect().await;
    }
    .in_current_span();

    Ok((update_streams, AbortOnDrop(abort), notifications))
}
3010
/// Reconciles the per-validator notification streams with the relevant
/// committee and returns a future that drives the newly created validator tasks.
///
/// * `senders` maps each validator with a notification stream to the abort
///   handle of that stream. Entries are removed for validators that left the
///   committee and for streams that have been aborted; entries are added for
///   newly connected validators.
/// * `circuit_breakers` tracks validators whose stream ended, together with the
///   time of the next reconnection attempt and the current probe interval.
///
/// The committee is the admin committee when the chain has no listening mode or
/// is in `EventsOnly` mode, and the local committee otherwise.
///
/// # Errors
///
/// Fails if the committee cannot be obtained or the validator nodes cannot be
/// created from it.
#[instrument(level = "trace", skip(senders, circuit_breakers))]
async fn update_notification_streams(
    &self,
    senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
    circuit_breakers: &mut HashMap<ValidatorPublicKey, CircuitBreakerState>,
) -> Result<impl Future<Output = ()>, Error> {
    let initial_probe_interval = self
        .options
        .notification_circuit_breaker_initial_probe_interval;
    let max_probe_interval = self.options.notification_circuit_breaker_max_probe_interval;

    // A chain without a registered listening mode is treated like `EventsOnly`.
    let events_only = self
        .listening_mode()
        .is_none_or(|m| matches!(m, ListeningMode::EventsOnly(_)));
    let (nodes, local_node) = {
        let committee = if events_only {
            let (_, committee) = self.admin_committee().await?;
            committee
        } else {
            self.local_committee().await?
        };
        let nodes: HashMap<_, _> = self
            .client
            .validator_node_provider()
            .make_nodes(&committee)?
            .collect();
        (nodes, self.client.local_node.clone())
    };

    // Update the circuit breaker state from the streams created earlier.
    for (validator, abort) in senders.iter() {
        // An aborted handle for a validator still in the committee means its
        // stream has ended (validator tasks abort their own handle on exit).
        if abort.is_aborted() && nodes.contains_key(validator) {
            if let Some(state) = circuit_breakers.get_mut(validator) {
                // Already in the circuit breaker: double the interval, up to the
                // configured maximum.
                state.probe_interval = (state.probe_interval * 2).min(max_probe_interval);
                state.next_probe_at = Instant::now() + state.probe_interval;
                warn!(
                    %validator,
                    chain_id = %self.chain_id,
                    next_probe_in = ?state.probe_interval,
                    "Validator still unhealthy after probe; increasing probe interval"
                );
            } else {
                // First failure: start with the initial probe interval.
                circuit_breakers.insert(
                    *validator,
                    CircuitBreakerState {
                        next_probe_at: Instant::now() + initial_probe_interval,
                        probe_interval: initial_probe_interval,
                    },
                );
                error!(
                    %validator,
                    chain_id = %self.chain_id,
                    next_probe_in = ?initial_probe_interval,
                    "Validator notification stream ended; entering circuit breaker"
                );
            }
        } else if !abort.is_aborted() && circuit_breakers.contains_key(validator) {
            // The stream is still alive, so the validator is considered healthy.
            info!(
                %validator,
                chain_id = %self.chain_id,
                "Validator recovered from circuit breaker"
            );
            circuit_breakers.remove(validator);
        }
    }

    // Abort streams of validators that left the committee, and forget every
    // aborted stream so that a new one may be created below.
    senders.retain(|validator, abort| {
        if !nodes.contains_key(validator) {
            abort.abort();
        }
        !abort.is_aborted()
    });
    circuit_breakers.retain(|validator, _| nodes.contains_key(validator));

    let validator_tasks = FuturesUnordered::new();
    for (public_key, node) in nodes {
        // Validators that still have a live stream are left alone.
        let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
            continue;
        };

        // Unhealthy validators are only retried once their probe time is reached.
        if let Some(state) = circuit_breakers.get(&public_key) {
            if Instant::now() < state.next_probe_at {
                continue;
            }
            debug!(
                validator = %public_key,
                chain_id = %self.chain_id,
                "Probing unhealthy validator"
            );
        }

        let address = node.address();
        let this = self.clone();
        // A one-shot stream that connects to the validator and yields its
        // notification stream, or an error.
        let stream = stream::once({
            let node = node.clone();
            async move {
                // Subscribe first, then catch up with the validator.
                let stream = node.subscribe(vec![this.chain_id]).await?;
                if !events_only {
                    let remote_node = RemoteNode { public_key, node };
                    this.client
                        .synchronize_chain_state_from(&remote_node, this.chain_id)
                        .await?;
                } else {
                    let remote_node = RemoteNode { public_key, node };
                    // A failed event sync is logged but does not fail the
                    // connection.
                    if let Some(ListeningMode::EventsOnly(subscribed)) = this.listening_mode() {
                        if let Err(error) = this
                            .client
                            .sync_events_from_node(this.chain_id, &subscribed, &remote_node)
                            .await
                        {
                            debug!(
                                chain_id = %this.chain_id,
                                %error,
                                "Failed initial sparse sync for EventsOnly chain"
                            );
                        }
                    }
                }
                Ok::<_, Error>(stream)
            }
        })
        // Log the connection result and drop connection errors.
        .filter_map(move |result| {
            let address = address.clone();
            async move {
                if let Err(error) = &result {
                    info!(?error, address, "could not connect to validator");
                } else {
                    debug!(address, "connected to validator");
                }
                result.ok()
            }
        })
        // Flatten the stream of streams into a plain stream of notifications.
        .flatten();
        let (stream, abort) = stream::abortable(stream);
        // Cloned so that the task can mark its own stream as ended.
        let abort_on_exit = abort.clone();
        let mut stream = Box::pin(stream);
        let this = self.clone();
        let local_node = local_node.clone();
        let remote_node = RemoteNode { public_key, node };
        validator_tasks.push(async move {
            // Notifications are processed one at a time; failures are only logged.
            while let Some(notification) = stream.next().await {
                if let Err(error) = this
                    .process_notification(
                        remote_node.clone(),
                        local_node.clone(),
                        notification.clone(),
                    )
                    .await
                {
                    tracing::info!(
                        chain_id = %this.chain_id,
                        address = remote_node.address(),
                        ?notification,
                        %error,
                        "failed to process notification",
                    );
                }
            }
            warn!(
                chain_id = %this.chain_id,
                address = remote_node.address(),
                "Validator notification stream ended"
            );
            // Mark the handle as aborted so that the next call to this method
            // notices that the stream has ended.
            abort_on_exit.abort();
        });
        entry.insert(abort);
    }
    // The returned future completes once all validator tasks have finished.
    Ok(validator_tasks.collect())
}
3192
3193 #[instrument(level = "trace", skip(remote_node))]
3195 pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
3196 let validator_next_block_height = match remote_node
3197 .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
3198 .await
3199 {
3200 Ok(info) => info.info.next_block_height,
3201 Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
3202 Err(err) => return Err(err.into()),
3203 };
3204 let local_next_block_height = self.chain_info().await?.next_block_height;
3205
3206 if validator_next_block_height >= local_next_block_height {
3207 debug!("Validator is up-to-date with local state");
3208 return Ok(());
3209 }
3210
3211 let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
3212 .map(BlockHeight)
3213 .collect();
3214
3215 let certificates = self
3216 .client
3217 .storage_client()
3218 .read_certificates_by_heights(self.chain_id, &heights)
3219 .await?
3220 .into_iter()
3221 .flatten()
3222 .map(Arc::unwrap_or_clone)
3223 .collect::<Vec<_>>();
3224
3225 for certificate in certificates {
3226 match remote_node
3227 .handle_confirmed_certificate(
3228 certificate.clone(),
3229 CrossChainMessageDelivery::NonBlocking,
3230 )
3231 .await
3232 {
3233 Ok(_) => (),
3234 Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
3235 let missing_blobs: Vec<_> = self
3237 .client
3238 .storage_client()
3239 .read_blobs(&missing_blob_ids)
3240 .await?
3241 .into_iter()
3242 .flatten()
3243 .map(Arc::unwrap_or_clone)
3244 .collect();
3245 remote_node.upload_blobs(missing_blobs).await?;
3246 remote_node
3247 .handle_confirmed_certificate(
3248 certificate,
3249 CrossChainMessageDelivery::NonBlocking,
3250 )
3251 .await?;
3252 }
3253 Err(err) => return Err(err.into()),
3254 }
3255 }
3256
3257 Ok(())
3258 }
3259}
3260
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Test helper: processes `notification` as if it had been received from the
    /// given `validator`, identified by its public key and address.
    ///
    /// # Panics
    ///
    /// Panics if the validator node cannot be created or if processing the
    /// notification fails.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let local_node = self.client.local_node.clone();
        let remote_node = RemoteNode { node, public_key };
        let outcome = self
            .process_notification(remote_node, local_node, notification)
            .await;
        outcome.unwrap();
    }
}