1use crate::{
17 Gateway,
18 MAX_FETCH_TIMEOUT,
19 Transport,
20 events::{CertificateRequest, CertificateResponse, Event},
21 helpers::{Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22 ledger_service::{BeginLedgerUpdateError, LedgerService},
23 spawn_blocking,
24};
25
26use snarkos_node_sync::{BftSyncMode, BlockSync, InsertBlockResponseError, Ping, locators::BlockLocators};
27use snarkos_utilities::CallbackHandle;
28
29use snarkvm::{
30 console::{
31 network::{ConsensusVersion, Network},
32 types::Field,
33 },
34 ledger::{CheckBlockError, PendingBlock, authority::Authority, block::Block, narwhal::BatchCertificate},
35 utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
36};
37
38use anyhow::{Context, Result, anyhow, bail, ensure};
39#[cfg(feature = "locktick")]
40use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
41#[cfg(not(feature = "locktick"))]
42use parking_lot::Mutex;
43#[cfg(not(feature = "serial"))]
44use rayon::prelude::*;
45use std::{
46 collections::{HashMap, HashSet, VecDeque},
47 future::Future,
48 net::SocketAddr,
49 ops::Deref,
50 sync::Arc,
51 time::Duration,
52};
53#[cfg(not(feature = "locktick"))]
54use tokio::sync::Mutex as TMutex;
55use tokio::{sync::oneshot, task::JoinHandle};
56
57#[async_trait::async_trait]
60pub trait SyncCallback<N: Network>: Send + std::marker::Sync {
61 fn add_certificate_from_sync(&self, certificate: BatchCertificate<N>);
63
64 fn commit_certificate_from_sync(&self, certificate: &BatchCertificate<N>);
66}
67
68#[derive(Clone)]
81pub struct Sync<N: Network> {
82 gateway: Gateway<N>,
84 storage: Storage<N>,
86 ledger: Arc<dyn LedgerService<N>>,
88 block_sync: Arc<BlockSync<N>>,
90 pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
92 sync_callback: Arc<CallbackHandle<Arc<dyn SyncCallback<N>>>>,
94 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
96 response_lock: Arc<TMutex<()>>,
98
99 pending_blocks: Arc<Mutex<VecDeque<PendingBlock<N>>>>,
107}
108
109impl<N: Network> Sync<N> {
110 const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);
113
114 pub fn new(
116 gateway: Gateway<N>,
117 storage: Storage<N>,
118 ledger: Arc<dyn LedgerService<N>>,
119 block_sync: Arc<BlockSync<N>>,
120 ) -> Self {
121 block_sync.set_bft_sync_mode(BftSyncMode::Fast);
123
124 Self {
126 gateway,
127 storage,
128 ledger,
129 block_sync,
130 pending: Default::default(),
131 sync_callback: Default::default(),
132 handles: Default::default(),
133 response_lock: Default::default(),
134 pending_blocks: Default::default(),
135 }
136 }
137
138 pub async fn wait_for_synced(&self) {
141 self.block_sync.wait_for_synced().await;
142 }
143
144 pub fn wait_for_synced_if_syncing(&self) -> Option<futures::future::BoxFuture<()>> {
147 self.block_sync.wait_for_synced_if_syncing()
148 }
149
150 pub fn initialize(&self, sync_callback: Option<Arc<dyn SyncCallback<N>>>) -> Result<()> {
152 if let Some(callback) = sync_callback {
154 self.sync_callback.set(callback).with_context(|| "Failed to set sync callback")?;
155 }
156
157 info!("Syncing storage with the ledger...");
158
159 self.sync_storage_with_ledger_at_bootup()
161 .with_context(|| "Syncing storage with the ledger at bootup failed")?;
162
163 debug!("Finished initial block synchronization at startup");
164 Ok(())
165 }
166
167 pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
172 info!("Starting the sync module...");
173
174 let self_ = self.clone();
176 self.spawn(async move {
177 loop {
178 let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;
180
181 self_.try_issuing_block_requests().await;
183
184 }
186 });
187
188 let self_ = self.clone();
190 let ping = ping.clone();
191 self.spawn(async move {
192 loop {
193 let _ =
195 tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;
196
197 let ping = ping.clone();
198 let self_ = self_.clone();
199 let hdl = tokio::spawn(async move {
200 self_.try_advancing_block_synchronization(&ping).await;
201 });
202
203 if let Err(err) = hdl.await
204 && let Ok(panic) = err.try_into_panic()
205 {
206 error!("Sync block advancement panicked: {panic:?}");
207 }
208
209 }
212 });
213
214 let self_ = self.clone();
216 self.spawn(async move {
217 loop {
218 tokio::time::sleep(MAX_FETCH_TIMEOUT).await;
220
221 let self__ = self_.clone();
223 let _ = spawn_blocking!({
224 self__.pending.clear_expired_callbacks();
225 Ok(())
226 });
227 }
228 });
229
230 let SyncReceiver {
234 mut rx_block_sync_insert_block_response,
235 mut rx_block_sync_remove_peer,
236 mut rx_block_sync_update_peer_locators,
237 mut rx_certificate_request,
238 mut rx_certificate_response,
239 } = sync_receiver;
240
241 let self_ = self.clone();
248 self.spawn(async move {
249 while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
250 rx_block_sync_insert_block_response.recv().await
251 {
252 let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
253
254 if let Err(err) = &result {
256 if err.is_benign() {
257 trace!("Failed to insert block response from '{peer_ip}' - {err}");
258 } else {
259 warn!("Failed to insert block response from '{peer_ip}' - {err}");
260 }
261 }
262
263 callback.send(result).ok();
264 }
265 });
266
267 let self_ = self.clone();
269 self.spawn(async move {
270 while let Some((peer_ip, tx)) = rx_block_sync_remove_peer.recv().await {
271 self_.remove_peer(peer_ip);
272 tx.send(()).ok();
273 }
274 });
275
276 let self_ = self.clone();
283 self.spawn(async move {
284 while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
285 let self_clone = self_.clone();
286 tokio::spawn(async move {
287 callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
288 });
289 }
290 });
291
292 let self_ = self.clone();
298 self.spawn(async move {
299 while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
300 self_.send_certificate_response(peer_ip, certificate_request);
301 }
302 });
303
304 let self_ = self.clone();
310 self.spawn(async move {
311 while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
312 self_.finish_certificate_request(peer_ip, certificate_response);
313 }
314 });
315
316 Ok(())
317 }
318
319 async fn try_issuing_block_requests(&self) {
324 self.block_sync.try_issuing_block_requests(&self.gateway).await;
325 }
326
327 #[cfg(test)]
329 pub(crate) fn testing_only_set_sync_height_testing_only(&self, height: u32) {
330 self.block_sync.set_sync_height(height);
331 }
332}
333
334impl<N: Network> Sync<N> {
336 async fn insert_block_response(
338 &self,
339 peer_ip: SocketAddr,
340 blocks: Vec<Block<N>>,
341 latest_consensus_version: Option<ConsensusVersion>,
342 ) -> Result<(), InsertBlockResponseError<N>> {
343 self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)
344
345 }
348
349 fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
351 self.block_sync.update_peer_locators(peer_ip, &locators)
352 }
353
354 fn remove_peer(&self, peer_ip: SocketAddr) {
356 self.block_sync.remove_peer(&peer_ip)
357 }
358
359 #[cfg(test)]
360 pub fn testing_only_update_peer_locators_testing_only(
361 &self,
362 peer_ip: SocketAddr,
363 locators: BlockLocators<N>,
364 ) -> Result<()> {
365 self.update_peer_locators(peer_ip, locators)
366 }
367}
368
369impl<N: Network> Sync<N> {
371 fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
375 let mut pending_blocks = self.pending_blocks.lock();
376 let latest_ledger_block = self.ledger.latest_block();
377
378 while let Some(block) = pending_blocks.front()
380 && block.height() <= latest_ledger_block.height()
381 {
382 pending_blocks.pop_front();
383 }
384
385 let latest_block: &Block<N> = pending_blocks.back().map(|block| block.deref()).unwrap_or(&latest_ledger_block);
386 let max_height = latest_block.height();
387
388 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
393
394 let gc_height = max_height.saturating_sub(max_gc_blocks);
398
399 let ledger_blocks = self.ledger.get_blocks(gc_height..(latest_ledger_block.height() + 1))?;
401
402 let blocks = ledger_blocks.iter().chain(pending_blocks.iter().map(|block| block.deref()));
403 debug!("Syncing storage with ledger and pending blocks from height {gc_height} to {max_height}...");
404
405 self.storage.sync_height_with_block(latest_block.height());
409 self.storage.sync_round_with_block(latest_block.round());
411 self.storage
413 .garbage_collect_certificates(latest_block.round())
414 .with_context(|| "Failed to garbage collect certificates")?;
415
416 for block in blocks {
418 if let Authority::Quorum(subdag) = block.authority() {
419 let unconfirmed_transactions = cfg_iter!(block.transactions())
425 .filter_map(|tx| {
426 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
427 })
428 .collect::<HashMap<_, _>>();
429
430 for certificates in subdag.values().cloned() {
432 cfg_into_iter!(certificates).try_for_each(|certificate| {
433 let trusted_ledger_certificate = true;
436 self.storage
437 .sync_certificate_with_block(
438 block,
439 certificate,
440 &unconfirmed_transactions,
441 trusted_ledger_certificate,
442 )
443 .with_context(|| format!("Failed to sync certificate with block {}", block.height()))
444 })?;
445 }
446
447 #[cfg(feature = "telemetry")]
449 self.gateway.validator_telemetry().insert_subdag(subdag);
450 }
451 }
452
453 if let Some(cb) = self.sync_callback.get() {
455 for block in ledger_blocks.into_iter() {
456 if let Authority::Quorum(subdag) = block.authority() {
457 for round in subdag.values() {
458 for cert in round {
459 cb.add_certificate_from_sync(cert.clone());
460 cb.commit_certificate_from_sync(cert);
461 }
462 }
463 }
464 }
465
466 for block in pending_blocks.iter() {
468 if let Authority::Quorum(subdag) = block.authority() {
469 for round in subdag.values() {
470 for cert in round {
471 cb.add_certificate_from_sync(cert.clone());
472 }
473 }
474 }
475 }
476 }
477
478 self.block_sync.set_sync_height(max_height);
479
480 Ok(())
481 }
482
483 fn compute_sync_height(&self) -> u32 {
486 let ledger_height = self.ledger.latest_block_height();
487 let mut pending_blocks = self.pending_blocks.lock();
488
489 while let Some(b) = pending_blocks.front()
491 && b.height() <= ledger_height
492 {
493 pending_blocks.pop_front();
494 }
495
496 pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
498 }
499
500 async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
502 let new_blocks = match self
504 .try_advancing_block_synchronization_inner()
505 .await
506 .with_context(|| "Block synchronization failed")
507 {
508 Ok(new_blocks) => new_blocks,
509 Err(err) => {
510 error!("{}", &flatten_error(err));
511 false
512 }
513 };
514
515 if let Some(ping) = &ping
516 && new_blocks
517 {
518 match self.get_block_locators() {
519 Ok(locators) => ping.update_block_locators(locators),
520 Err(err) => error!("Failed to update block locators: {err}"),
521 }
522 }
523 }
524
525 async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
537 let _lock = self.response_lock.lock().await;
539
540 let ledger_height = self.ledger.latest_block_height();
543 self.block_sync.set_sync_height(ledger_height);
544
545 let tip = self
547 .block_sync
548 .find_sync_peers()
549 .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
550 .unwrap_or(0);
551
552 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
557
558 let cleanup = |start_height, current_height, error| {
560 let new_blocks = current_height > start_height;
561
562 if new_blocks {
564 self.block_sync.set_sync_height(current_height);
565 }
566
567 if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
568 };
569
570 let max_gc_height = tip.saturating_sub(max_gc_blocks);
574
575 let start_height = self.compute_sync_height();
578
579 let within_gc = start_height >= max_gc_height;
584
585 if within_gc {
586 let previous = self.block_sync.set_bft_sync_mode(BftSyncMode::Dag);
588 let was_in_fast_sync = previous == Some(BftSyncMode::Fast);
589
590 if was_in_fast_sync {
591 debug!("Finished catching up with the network. Switching to DAG sync.");
592 self.sync_storage_with_ledger_at_bootup()?;
593 }
594
595 let mut current_height = start_height;
597 trace!(
598 "Try advancing blocks responses with DAG updates (starting at block {next_height}, current sync speed is {speed})",
599 next_height = current_height + 1,
600 speed = self.block_sync.get_sync_speed(),
601 );
602
603 loop {
605 let next_height = current_height + 1;
606 let Some(block) = self.block_sync.peek_next_block(next_height) else {
607 break;
608 };
609 info!("Trying to sync next block at height {} with the BFT...", block.height());
610 match self.sync_storage_with_block(block, true).await {
612 Ok(_) => {
613 current_height = next_height;
615 }
616 Err(err) => {
617 self.block_sync.remove_block_response(next_height);
619 return cleanup(start_height, current_height, Some(err));
620 }
621 }
622 }
623
624 cleanup(start_height, current_height, None)
625 } else {
626 let previous = self.block_sync.set_bft_sync_mode(BftSyncMode::Fast);
627 let was_in_dag_sync = previous == Some(BftSyncMode::Dag);
628 if was_in_dag_sync {
629 warn!(
631 "Node is switching from DAG sync back to fast sync. The network tip may have advanced faster than this node is syncing."
632 );
633 }
634
635 let mut current_height = start_height;
638
639 trace!(
640 "Try advancing block responses without updating the DAG (starting at block {next_height})",
641 next_height = current_height + 1
642 );
643
644 loop {
647 let next_height = current_height + 1;
648
649 let Some(block) = self.block_sync.peek_next_block(next_height) else {
650 break;
651 };
652 info!("Syncing the ledger to block {}...", block.height());
653
654 match self.sync_storage_with_block(block, false).await {
656 Ok(_) => {
657 current_height = next_height;
659 self.block_sync.count_request_completed();
660 }
661 Err(err) => {
662 self.block_sync.remove_block_response(next_height);
664 return cleanup(start_height, current_height, Some(err));
665 }
666 }
667 }
668
669 let within_gc = current_height >= max_gc_height;
671 if within_gc {
672 info!("Finished catching up with the network. Switching back to DAG sync.");
673 self.block_sync.set_bft_sync_mode(BftSyncMode::Dag);
674 self.sync_storage_with_ledger_at_bootup().with_context(|| "BFT sync (with bootup routine) failed")?;
675 }
676
677 cleanup(start_height, current_height, None)
678 }
679 }
680
681 fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
687 let Authority::Quorum(subdag) = block.authority() else {
689 return Ok(());
690 };
691
692 let unconfirmed_transactions = cfg_iter!(block.transactions())
694 .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
695 .collect::<HashMap<_, _>>();
696
697 for certificates in subdag.values() {
699 cfg_into_iter!(certificates.clone()).try_for_each(|certificate| -> Result<()> {
700 let trusted_ledger_certificate = false;
703 self.storage
704 .sync_certificate_with_block(
705 block,
706 certificate.clone(),
707 &unconfirmed_transactions,
708 trusted_ledger_certificate,
709 )
710 .with_context(|| format!("Failed to sync certificate with block {}", block.height()))
711 })?;
712 }
713
714 if let Some(cb) = self.sync_callback.get() {
716 for round in subdag.values() {
717 for certificate in round {
718 cb.add_certificate_from_sync(certificate.clone());
719 }
720 }
721 }
722
723 Ok(())
724 }
725
726 fn is_block_availability_threshold_reached(
731 &self,
732 block: &PendingBlock<N>,
733 successors: &[PendingBlock<N>],
734 ) -> Result<bool> {
735 let leader_certificate = match block.authority() {
737 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
738 _ => bail!("Received a block with an unexpected authority type."),
739 };
740 let commit_round = leader_certificate.round();
741 let certificate_round =
742 commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;
743
744 let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
746
747 let authors = successors
750 .iter()
751 .filter_map(|successor| {
752 let Authority::Quorum(subdag) = successor.authority() else {
753 return None;
754 };
755
756 subdag.get(&certificate_round)
757 })
758 .flatten()
759 .filter_map(|certificate| {
760 if certificate.previous_certificate_ids().contains(&leader_certificate.id()) {
761 Some(certificate.author())
762 } else {
763 None
764 }
765 })
766 .collect::<HashSet<_>>();
767
768 if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
770 trace!(
771 "Block {hash} at height {height} has reached availability threshold",
772 hash = block.hash(),
773 height = block.height()
774 );
775 Ok(true)
776 } else {
777 Ok(false)
778 }
779 }
780
781 async fn sync_storage_with_block(&self, new_block: Block<N>, within_gc_range: bool) -> Result<()> {
796 let new_block_height = new_block.height();
797
798 if self.ledger.contains_block_height(new_block.height()) {
801 debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
802 return Ok(());
803 }
804
805 if within_gc_range {
807 self.add_block_subdag_to_bft(&new_block)?;
808 }
809
810 let _self = self.clone();
812
813 spawn_blocking!({
814 while !_self.try_sync_storage_with_block(&new_block, within_gc_range)? {
815 trace!("Retrying to sync storage with block at height {new_block_height}");
816 }
817
818 Ok(())
819 })
820 }
821
822 fn try_sync_storage_with_block(&self, new_block: &Block<N>, within_gc_range: bool) -> Result<bool> {
833 let mut pending_blocks = self.pending_blocks.lock();
835
836 if let Some(tail) = pending_blocks.back() {
837 if tail.height() >= new_block.height() {
838 debug!(
839 "A unconfirmed block is queued already for height {height}. \
840 Will not sync.",
841 height = new_block.height()
842 );
843 return Ok(true);
844 }
845
846 ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
847 }
848
849 let ledger_block_height = self.ledger.latest_block_height();
851 let new_block_height = new_block.height();
852
853 while let Some(pending_block) = pending_blocks.front() {
856 if pending_block.height() > ledger_block_height {
857 break;
858 }
859
860 trace!(
861 "Pending block {hash} at height {height} became obsolete",
862 hash = pending_block.hash(),
863 height = pending_block.height()
864 );
865 pending_blocks.pop_front();
866 }
867
868 let new_block = match self.ledger.check_block_subdag(new_block.clone(), pending_blocks.make_contiguous()) {
870 Ok(new_block) => new_block,
871 Err(CheckBlockError::InvalidPrefix { index, .. }) => {
873 let height = pending_blocks.get(index).with_context(|| "Invalid prefix index")?.height();
874 debug!("Pending block at height {height} became obsolete. Will retry with updated prefix.",);
875
876 while let Some(pending_block) = pending_blocks.front()
877 && pending_block.height() <= height
878 {
879 trace!("Removing obsolete pending block at height {}.", pending_block.height());
880 pending_blocks.pop_front();
881 }
882
883 return Ok(false);
884 }
885 Err(CheckBlockError::BlockAlreadyExists { .. })
887 | Err(CheckBlockError::InvalidHeight { .. })
888 | Err(CheckBlockError::InvalidRound { .. }) => {
889 debug!(
890 "Tried to sync storage with block at height {new_block_height}, but it was already in the ledger."
891 );
892 return Ok(true);
893 }
894 Err(err) => return Err(err.into_anyhow()),
896 };
897
898 trace!(
899 "Adding new pending block {hash} at height {height}",
900 hash = new_block.hash(),
901 height = new_block.height()
902 );
903 pending_blocks.push_back(new_block);
904
905 let ledger_block_height = self.ledger.latest_block_height();
907
908 let Some(penultimate_index) = pending_blocks.len().checked_sub(1) else {
910 return Ok(true);
911 };
912
913 let commit_height = 'outer: {
920 let pending_blocks = pending_blocks.make_contiguous();
921 for index in (0..penultimate_index).rev() {
922 let block = &pending_blocks[index];
923 let successors = &pending_blocks[index + 1..];
924
925 if self
930 .is_block_availability_threshold_reached(block, successors)
931 .with_context(|| "Availability threshold check failed")?
932 {
933 break 'outer block.height();
934 }
935 }
936
937 trace!("No pending block are ready to be committed ({} block(s) are pending)", pending_blocks.len());
938 return Ok(true);
939 };
940
941 let ledger_update = match self.ledger.begin_ledger_update() {
942 Ok(update) => update,
943 Err(BeginLedgerUpdateError::ShuttingDown) => {
944 info!("BlockSync cannot advance the ledger any more. The node is shutting down.");
945 return Ok(true);
946 }
947 Err(err) => {
948 return Err(anyhow!("Unexpected error when beginning ledger update: {err}"));
949 }
950 };
951
952 let start_height = ledger_block_height + 1;
953 ensure!(commit_height >= start_height, "Invalid commit height");
954 let num_blocks = (commit_height - start_height + 1) as usize;
955
956 if num_blocks > 1 {
958 trace!(
959 "Attempting to commit {chain_length} pending block(s) starting at height {start_height}.",
960 chain_length = pending_blocks.len(),
961 );
962 }
963
964 for pending_block in pending_blocks.drain(0..num_blocks) {
965 let hash = pending_block.hash();
966 let height = pending_block.height();
967 let storage = self.storage.clone();
968
969 let block = match ledger_update.check_block_content(pending_block) {
970 Ok(block) => block,
971 Err(CheckBlockError::InvalidHeight { .. })
972 | Err(CheckBlockError::BlockAlreadyExists { .. })
973 | Err(CheckBlockError::InvalidRound { .. }) => {
974 debug!("Pending block at height {height} became obsolete. Will retry with updated prefix.");
977 return Ok(false);
978 }
979 Err(err) => {
980 return Err(err
981 .into_anyhow()
982 .context(format!("Failed to check contents of pending block {hash} at height {height}")));
983 }
984 };
985
986 trace!("Adding pending block {hash} at height {height} to the ledger");
987 ledger_update.advance_to_next_block(&block)?;
988 storage.sync_height_with_block(block.height());
990 storage.sync_round_with_block(block.round());
992
993 if within_gc_range
994 && let Some(cb) = self.sync_callback.get()
995 && let Authority::Quorum(subdag) = block.authority()
996 {
997 for round in subdag.values() {
998 for certificate in round {
999 cb.commit_certificate_from_sync(certificate);
1000 }
1001 }
1002 }
1003 }
1004
1005 Ok(true)
1006 }
1007}
1008
1009impl<N: Network> Sync<N> {
1011 pub fn is_synced(&self) -> bool {
1013 self.block_sync.is_block_synced()
1014 }
1015
1016 pub fn num_blocks_behind(&self) -> Option<u32> {
1018 self.block_sync.num_blocks_behind()
1019 }
1020
1021 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
1023 self.block_sync.get_block_locators()
1024 }
1025}
1026
1027impl<N: Network> Sync<N> {
1029 pub async fn send_certificate_request(
1031 &self,
1032 peer_ip: SocketAddr,
1033 certificate_id: Field<N>,
1034 ) -> Result<BatchCertificate<N>> {
1035 let (callback_sender, callback_receiver) = oneshot::channel();
1037 let num_sent_requests = self.pending.num_sent_requests(certificate_id);
1039 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
1041 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
1043 let stake_redundancy_reached = || self.pending.request_stake_redundancy_reached(&self.gateway, certificate_id);
1045 let should_send_request = !contains_peer_with_sent_request
1049 && (num_sent_requests < num_redundant_requests || !stake_redundancy_reached()?);
1050
1051 self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
1053
1054 if should_send_request {
1056 if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
1058 bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
1059 }
1060 } else {
1061 debug!(
1062 "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
1063 fmt_id(certificate_id)
1064 );
1065 }
1066 tokio::time::timeout(MAX_FETCH_TIMEOUT, callback_receiver)
1069 .await
1070 .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
1071 .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
1072 }
1073
1074 fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
1076 if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
1078 let self_ = self.clone();
1080 tokio::spawn(async move {
1081 let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
1082 });
1083 }
1084 }
1085
1086 fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
1089 let certificate = response.certificate;
1090 let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
1092 if exists {
1094 self.pending.remove(certificate.id(), Some(certificate));
1097 }
1098 }
1099}
1100
1101impl<N: Network> Sync<N> {
1102 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1104 self.handles.lock().push(tokio::spawn(future));
1105 }
1106
1107 pub async fn shut_down(&self) {
1109 info!("Shutting down the sync module...");
1110 self.sync_callback.clear();
1112 let _lock = self.response_lock.lock().await;
1114 self.handles.lock().iter().for_each(|handle| handle.abort());
1116 }
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121 use super::*;
1122
1123 use crate::{BFT, helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
1124
1125 use snarkos_account::Account;
1126 use snarkos_node_network::ConnectionMode;
1127 use snarkos_node_sync::BlockSync;
1128 use snarkos_utilities::{NodeDataDir, SimpleStoppable};
1129
1130 use snarkvm::{
1131 console::{
1132 account::{Address, PrivateKey},
1133 network::MainnetV0,
1134 },
1135 ledger::{
1136 narwhal::{BatchCertificate, BatchHeader, Subdag},
1137 store::{ConsensusStore, helpers::memory::ConsensusMemory},
1138 },
1139 prelude::{Ledger, VM},
1140 utilities::TestRng,
1141 };
1142
1143 use aleo_std::StorageMode;
1144 use indexmap::IndexSet;
1145 use rand::RngExt;
1146 use std::{collections::BTreeMap, sync::OnceLock};
1147
1148 type CurrentNetwork = MainnetV0;
1149 type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1150 type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1151
1152 async fn setup_commit_chain(rng: &mut TestRng) -> (Block<CurrentNetwork>, Vec<Block<CurrentNetwork>>) {
1154 static CHAIN_CACHE: OnceLock<(Block<CurrentNetwork>, Vec<Block<CurrentNetwork>>)> = OnceLock::new();
1155
1156 if let Some((genesis, blocks)) = CHAIN_CACHE.get() {
1158 return (genesis.clone(), blocks.clone());
1159 }
1160
1161 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1163
1164 let first_round: u64 = 1;
1166 let num_blocks = 3;
1168 let last_round = first_round + num_blocks * 2;
1170 let first_threshold_round = 5;
1173
1174 let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1176 let account: Account<CurrentNetwork> = Account::new(rng).unwrap();
1177
1178 let seed: u64 = rng.random();
1180 let vm = VM::from(store).unwrap();
1181 let genesis_pk = *account.private_key();
1182 let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1183
1184 let genesis_rng = &mut TestRng::from_seed(seed);
1186 let private_keys = [
1187 *account.private_key(),
1188 PrivateKey::new(genesis_rng).unwrap(),
1189 PrivateKey::new(genesis_rng).unwrap(),
1190 PrivateKey::new(genesis_rng).unwrap(),
1191 ];
1192
1193 let genesis_clone = genesis.clone();
1195 let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
1196 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
1198
1199 let (round_to_certificates_map, committee) = {
1201 let addresses = vec![
1202 Address::try_from(private_keys[0]).unwrap(),
1203 Address::try_from(private_keys[1]).unwrap(),
1204 Address::try_from(private_keys[2]).unwrap(),
1205 Address::try_from(private_keys[3]).unwrap(),
1206 ];
1207
1208 let committee = ledger.latest_committee().unwrap();
1209
1210 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1212 HashMap::new();
1213 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1214
1215 for round in first_round..=last_round {
1216 let mut current_certificates = IndexSet::new();
1217 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1218 IndexSet::new()
1219 } else {
1220 previous_certificates.iter().map(|c| c.id()).collect()
1221 };
1222
1223 let committee_id = committee.id();
1224
1225 let is_certificate_round = !round.is_multiple_of(2);
1227 let prev_leader = if is_certificate_round && let Some(prev_round) = round.checked_sub(1) {
1228 Some(committee.get_leader(prev_round).unwrap())
1229 } else {
1230 None
1231 };
1232
1233 for (i, private_key) in private_keys.iter().enumerate() {
1235 let previous_leader_index =
1236 addresses.iter().position(|&addr| prev_leader.is_some_and(|prev_leader| addr == prev_leader));
1237
1238 let previous_certs = if let Some(previous_leader_index) = previous_leader_index
1241 && round < first_threshold_round
1242 && i != previous_leader_index
1243 {
1244 previous_certificate_ids
1246 .iter()
1247 .cloned()
1248 .enumerate()
1249 .filter(|(idx, _)| *idx != previous_leader_index)
1250 .map(|(_, id)| id)
1251 .collect()
1252 } else {
1253 previous_certificate_ids.clone()
1254 };
1255
1256 let batch_header = BatchHeader::new(
1257 private_key,
1258 round,
1259 now(),
1260 committee_id,
1261 Default::default(),
1262 previous_certs,
1263 rng,
1264 )
1265 .unwrap();
1266
1267 let mut signatures = IndexSet::with_capacity(4);
1269 for (j, private_key_2) in private_keys.iter().enumerate() {
1270 if i != j {
1271 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1272 }
1273 }
1274 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1275 }
1276
1277 round_to_certificates_map.insert(round, current_certificates.clone());
1279 previous_certificates = current_certificates;
1280 }
1281 (round_to_certificates_map, committee)
1282 };
1283
1284 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1286
1287 let certificates: Vec<_> =
1289 round_to_certificates_map.into_iter().flat_map(|(_, certificates)| certificates.into_iter()).collect();
1290
1291 for certificate in certificates.iter() {
1293 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1294 }
1295
1296 let mut previous_leader_cert = None;
1298 let mut blocks = vec![];
1299
1300 for block_height in 1..=num_blocks {
1301 let leader_round = block_height * 2;
1302
1303 let leader = committee.get_leader(leader_round).unwrap();
1304 let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();
1305
1306 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1307 let mut leader_cert_map = IndexSet::new();
1308 leader_cert_map.insert(leader_certificate.clone());
1309
1310 let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);
1311
1312 subdag_map.insert(leader_round, leader_cert_map.clone());
1313 subdag_map.insert(leader_round - 1, previous_cert_map.clone());
1314
1315 if leader_round > 2 {
1316 let previous_commit_cert_map: IndexSet<_> = storage
1317 .get_certificates_for_round(leader_round - 2)
1318 .into_iter()
1319 .filter(|cert| {
1320 if let Some(previous_leader_cert) = &previous_leader_cert {
1321 cert != previous_leader_cert
1322 } else {
1323 true
1324 }
1325 })
1326 .collect();
1327 subdag_map.insert(leader_round - 2, previous_commit_cert_map);
1328 }
1329
1330 let subdag = Subdag::from(subdag_map.clone()).unwrap();
1331 previous_leader_cert = Some(leader_certificate);
1332
1333 let core_ledger = core_ledger.clone();
1334 let block = spawn_blocking!({
1335 let ledger_update = core_ledger.begin_ledger_update()?;
1336 let block = ledger_update.prepare_advance_to_next_quorum_block(subdag, Default::default())?;
1337 ledger_update.advance_to_next_block(&block)?;
1338 Ok(block)
1339 })
1340 .unwrap();
1341
1342 blocks.push(block);
1343 }
1344
1345 CHAIN_CACHE.get_or_init(|| (genesis, blocks)).clone()
1346 }
1347
1348 #[tokio::test]
1349 #[tracing_test::traced_test]
1350 async fn test_commit_chain_with_bft() {
1351 let rng = &mut TestRng::default();
1352
1353 let (genesis, mut blocks) = setup_commit_chain(rng).await;
1354 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1355
1356 let storage_mode = StorageMode::new_test(None);
1358
1359 let syncing_ledger = {
1362 let storage_mode = storage_mode.clone();
1363 Arc::new(CoreLedgerService::new(
1364 spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1365 SimpleStoppable::new(),
1366 ))
1367 };
1368
1369 let account = Account::new(rng).unwrap();
1370 let syncing_storage =
1371 Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1372 let gateway = Gateway::new(
1373 account.clone(),
1374 syncing_storage.clone(),
1375 syncing_ledger.clone(),
1376 None,
1377 &[],
1378 false,
1379 NodeDataDir::new_test(None),
1380 None,
1381 )
1382 .unwrap();
1383
1384 let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1385 let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1386
1387 let syncing_bft = BFT::new(
1388 account.clone(),
1389 syncing_storage.clone(),
1390 syncing_ledger.clone(),
1391 block_sync,
1392 None,
1393 &[],
1394 false,
1395 NodeDataDir::new_test(None),
1396 None,
1397 )
1398 .unwrap();
1399
1400 sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1401
1402 let last_block = blocks.pop().unwrap();
1405
1406 for block in blocks {
1408 sync.sync_storage_with_block(block, true).await.unwrap();
1409 assert_eq!(syncing_bft.testing_only_latest_committed_round(), 0);
1411 }
1412
1413 sync.sync_storage_with_block(last_block, true).await.unwrap();
1416
1417 assert_eq!(syncing_bft.testing_only_latest_committed_round(), 4);
1420 }
1421
1422 #[tokio::test]
1425 #[tracing_test::traced_test]
1426 async fn test_sync_updates_storage_with_block_certificates() {
1427 let rng = &mut TestRng::default();
1428
1429 let (genesis, blocks) = setup_commit_chain(rng).await;
1430 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1431 let storage_mode = StorageMode::new_test(None);
1432
1433 let syncing_ledger = Arc::new(CoreLedgerService::new(
1434 spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1435 SimpleStoppable::new(),
1436 ));
1437
1438 let account = Account::new(rng).unwrap();
1439 let syncing_storage =
1440 Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1441 let gateway = Gateway::new(
1442 account.clone(),
1443 syncing_storage.clone(),
1444 syncing_ledger.clone(),
1445 None,
1446 &[],
1447 false,
1448 NodeDataDir::new_test(None),
1449 None,
1450 )
1451 .unwrap();
1452
1453 let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1454 let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1455
1456 let syncing_bft = BFT::new(
1457 account.clone(),
1458 syncing_storage.clone(),
1459 syncing_ledger.clone(),
1460 block_sync,
1461 None,
1462 &[],
1463 false,
1464 NodeDataDir::new_test(None),
1465 None,
1466 )
1467 .unwrap();
1468
1469 sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1470
1471 for block in &blocks {
1473 sync.sync_storage_with_block(block.clone(), true).await.unwrap();
1474 }
1475
1476 let committed_blocks = &blocks[..blocks.len().saturating_sub(1)];
1479
1480 for block in committed_blocks {
1483 let Authority::Quorum(subdag) = block.authority() else {
1484 continue;
1485 };
1486 for certificates in subdag.values() {
1487 for cert in certificates {
1488 assert!(
1489 syncing_ledger.contains_certificate(&cert.id()).unwrap_or(false),
1490 "Sync should have committed block {} so certificate is in the ledger",
1491 block.height()
1492 );
1493 }
1494 }
1495 }
1496
1497 let last_committed_block = committed_blocks.last().unwrap();
1499 assert_eq!(
1500 syncing_ledger.latest_block_height(),
1501 last_committed_block.height(),
1502 "Ledger height should match last committed block"
1503 );
1504 assert_eq!(
1505 syncing_ledger.latest_block().round(),
1506 last_committed_block.round(),
1507 "Ledger round should match last committed block"
1508 );
1509 }
1510
1511 #[tokio::test]
1512 #[tracing_test::traced_test]
1513 async fn test_commit_chain_with_swich_to_bft() {
1514 let rng = &mut TestRng::default();
1515 let (genesis, mut blocks) = setup_commit_chain(rng).await;
1516 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1517 let storage_mode = StorageMode::new_test(None);
1518
1519 let syncing_ledger = {
1522 let storage_mode = storage_mode.clone();
1523 Arc::new(CoreLedgerService::new(
1524 spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1525 SimpleStoppable::new(),
1526 ))
1527 };
1528
1529 let account = Account::new(rng).unwrap();
1530 let syncing_storage =
1531 Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1532 let gateway = Gateway::new(
1533 account.clone(),
1534 syncing_storage.clone(),
1535 syncing_ledger.clone(),
1536 None,
1537 &[],
1538 false,
1539 NodeDataDir::new_test(None),
1540 None,
1541 )
1542 .unwrap();
1543
1544 let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1545 let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1546
1547 let syncing_bft = BFT::new(
1548 account.clone(),
1549 syncing_storage.clone(),
1550 syncing_ledger.clone(),
1551 block_sync,
1552 None,
1553 &[],
1554 false,
1555 NodeDataDir::new_test(None),
1556 None,
1557 )
1558 .unwrap();
1559
1560 sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1561
1562 let last_block = blocks.pop().unwrap();
1564
1565 for block in blocks {
1568 sync.sync_storage_with_block(block, false).await.unwrap();
1569
1570 assert_eq!(syncing_ledger.latest_block_height(), 0);
1572 }
1573
1574 sync.sync_storage_with_ledger_at_bootup().unwrap();
1576
1577 assert_eq!(syncing_ledger.latest_block_height(), 0);
1579 assert_eq!(syncing_bft.testing_only_latest_committed_round(), 0);
1580
1581 sync.sync_storage_with_block(last_block, true).await.unwrap();
1584
1585 assert_eq!(syncing_bft.testing_only_latest_committed_round(), 4);
1588 }
1589
1590 #[tokio::test]
1599 #[tracing_test::traced_test]
1600 async fn test_commit_chain_with_switch_to_fast_sync() {
1601 let rng = &mut TestRng::default();
1602 let (genesis, mut blocks) = setup_commit_chain(rng).await;
1603 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1604 let storage_mode = StorageMode::new_test(None);
1605
1606 let syncing_ledger = {
1607 let storage_mode = storage_mode.clone();
1608 Arc::new(CoreLedgerService::new(
1609 spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1610 SimpleStoppable::new(),
1611 ))
1612 };
1613
1614 let account = Account::new(rng).unwrap();
1615 let syncing_storage =
1616 Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1617 let gateway = Gateway::new(
1618 account.clone(),
1619 syncing_storage.clone(),
1620 syncing_ledger.clone(),
1621 None,
1622 &[],
1623 false,
1624 NodeDataDir::new_test(None),
1625 None,
1626 )
1627 .unwrap();
1628
1629 let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1630 let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1631
1632 let syncing_bft = BFT::new(
1633 account.clone(),
1634 syncing_storage.clone(),
1635 syncing_ledger.clone(),
1636 block_sync,
1637 None,
1638 &[],
1639 false,
1640 NodeDataDir::new_test(None),
1641 None,
1642 )
1643 .unwrap();
1644
1645 sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1646
1647 let last_block = blocks.pop().unwrap();
1649
1650 for block in blocks {
1654 sync.sync_storage_with_block(block, true).await.unwrap();
1655 assert_eq!(syncing_ledger.latest_block_height(), 0);
1656 }
1657
1658 sync.sync_storage_with_block(last_block, false).await.unwrap();
1666
1667 assert_eq!(syncing_ledger.latest_block_height(), 2);
1669 assert!(syncing_ledger.contains_block_height(1));
1670 assert!(syncing_ledger.contains_block_height(2));
1671
1672 assert_eq!(syncing_bft.testing_only_latest_committed_round(), 0);
1676 }
1677
1678 #[tokio::test]
1679 #[tracing_test::traced_test]
1680 async fn test_commit_chain_without_bft() {
1681 let rng = &mut TestRng::default();
1682 let (genesis, mut blocks) = setup_commit_chain(rng).await;
1683 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1684 let storage_mode = StorageMode::new_test(None);
1685
1686 let syncing_ledger = {
1689 let storage_mode = storage_mode.clone();
1690 Arc::new(CoreLedgerService::new(
1691 spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1692 SimpleStoppable::new(),
1693 ))
1694 };
1695
1696 let account = Account::new(rng).unwrap();
1697 let syncing_storage =
1698 Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1699 let gateway = Gateway::new(
1700 account.clone(),
1701 syncing_storage.clone(),
1702 syncing_ledger.clone(),
1703 None,
1704 &[],
1705 false,
1706 NodeDataDir::new_test(None),
1707 None,
1708 )
1709 .unwrap();
1710
1711 let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1712 let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1713
1714 let syncing_bft = BFT::new(
1715 account.clone(),
1716 syncing_storage.clone(),
1717 syncing_ledger.clone(),
1718 block_sync,
1719 None,
1720 &[],
1721 false,
1722 NodeDataDir::new_test(None),
1723 None,
1724 )
1725 .unwrap();
1726
1727 sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1728
1729 let last_block = blocks.pop().unwrap();
1731
1732 for block in blocks {
1734 sync.sync_storage_with_block(block, false).await.unwrap();
1735
1736 assert_eq!(syncing_ledger.latest_block_height(), 0);
1738 }
1739
1740 sync.sync_storage_with_block(last_block, false).await.unwrap();
1743 assert_eq!(syncing_ledger.latest_block_height(), 2);
1744
1745 assert!(syncing_ledger.contains_block_height(1));
1747 assert!(syncing_ledger.contains_block_height(2));
1748 }
1749
1750 #[tokio::test]
1751 #[tracing_test::traced_test]
1752 async fn test_pending_certificates() -> anyhow::Result<()> {
1753 let rng = &mut TestRng::default();
1754 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1756 let commit_round = 2;
1757
1758 let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1760 let account: Account<CurrentNetwork> = Account::new(rng)?;
1761
1762 let seed: u64 = rng.random();
1764 let vm = VM::from(store).unwrap();
1765 let genesis_pk = *account.private_key();
1766 let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1767
1768 let genesis_rng = &mut TestRng::from_seed(seed);
1770 let private_keys = [
1771 *account.private_key(),
1772 PrivateKey::new(genesis_rng)?,
1773 PrivateKey::new(genesis_rng)?,
1774 PrivateKey::new(genesis_rng)?,
1775 ];
1776
1777 let core_ledger = {
1779 let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
1780 Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()))
1781 };
1782
1783 let (round_to_certificates_map, committee) = {
1785 let committee = core_ledger.current_committee().unwrap();
1787 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1789 HashMap::new();
1790 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1791
1792 for round in 0..=commit_round + 8 {
1793 let mut current_certificates = IndexSet::new();
1794 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1795 IndexSet::new()
1796 } else {
1797 previous_certificates.iter().map(|c| c.id()).collect()
1798 };
1799 let committee_id = committee.id();
1800 for (i, private_key_1) in private_keys.iter().enumerate() {
1802 let batch_header = BatchHeader::new(
1803 private_key_1,
1804 round,
1805 now(),
1806 committee_id,
1807 Default::default(),
1808 previous_certificate_ids.clone(),
1809 rng,
1810 )
1811 .unwrap();
1812 let mut signatures = IndexSet::with_capacity(4);
1814 for (j, private_key_2) in private_keys.iter().enumerate() {
1815 if i != j {
1816 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1817 }
1818 }
1819 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1820 }
1821
1822 round_to_certificates_map.insert(round, current_certificates.clone());
1824 previous_certificates = current_certificates.clone();
1825 }
1826 (round_to_certificates_map, committee)
1827 };
1828
1829 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1831 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1833 for i in 1..=commit_round + 8 {
1834 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1835 certificates.extend(c);
1836 }
1837 for certificate in certificates.clone().iter() {
1838 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1839 }
1840
1841 let leader_round_1 = commit_round;
1842 let leader_1 = committee.get_leader(leader_round_1).unwrap();
1843 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1844 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1845
1846 let subdag_1 = {
1848 let mut leader_cert_map = IndexSet::new();
1849 leader_cert_map.insert(leader_certificate.clone());
1850 let mut previous_cert_map = IndexSet::new();
1851 for cert in storage.get_certificates_for_round(commit_round - 1) {
1852 previous_cert_map.insert(cert);
1853 }
1854 subdag_map.insert(commit_round, leader_cert_map.clone());
1855 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1856 Subdag::from(subdag_map.clone())?
1857 };
1858
1859 let core_ledger_cpy = core_ledger.clone();
1860 spawn_blocking!({
1861 let update1 = core_ledger_cpy.begin_ledger_update()?;
1863 let block_1 = update1.prepare_advance_to_next_quorum_block(subdag_1, Default::default())?;
1864
1865 update1.advance_to_next_block(&block_1)?;
1867
1868 Ok(())
1869 })?;
1870
1871 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1873 let subdag_2 = {
1874 let leader_round_2 = commit_round + 2;
1875 let leader_2 = committee.get_leader(leader_round_2).unwrap();
1876 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1877 let mut leader_cert_map_2 = IndexSet::new();
1878 leader_cert_map_2.insert(leader_certificate_2.clone());
1879 let mut previous_cert_map_2 = IndexSet::new();
1880 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1881 previous_cert_map_2.insert(cert);
1882 }
1883 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1884 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1885 Subdag::from(subdag_map_2.clone())?
1886 };
1887
1888 let core_ledger_cpy = core_ledger.clone();
1889 spawn_blocking!({
1890 let update2 = core_ledger_cpy.begin_ledger_update()?;
1891
1892 let block_2 = update2.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?;
1894
1895 update2.advance_to_next_block(&block_2)?;
1897
1898 Ok(())
1899 })?;
1900
1901 let leader_round_3 = commit_round + 4;
1903 let leader_3 = committee.get_leader(leader_round_3).unwrap();
1904 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1905
1906 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1908 let subdag_3 = {
1909 let mut leader_cert_map_3 = IndexSet::new();
1910 leader_cert_map_3.insert(leader_certificate_3.clone());
1911 let mut previous_cert_map_3 = IndexSet::new();
1912 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1913 previous_cert_map_3.insert(cert);
1914 }
1915 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1916 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1917 Subdag::from(subdag_map_3.clone())?
1918 };
1919
1920 let core_ledger_cpy = core_ledger.clone();
1921 spawn_blocking!({
1922 let update3 = core_ledger_cpy.begin_ledger_update()?;
1923
1924 let block_3 = update3.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?;
1926
1927 update3.advance_to_next_block(&block_3)?;
1929
1930 Ok(())
1931 })?;
1932
1933 let pending_certificates = storage.get_pending_certificates();
1939 for certificate in pending_certificates.clone() {
1941 assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1942 }
1943 let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1945 {
1946 let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1947 for subdag in subdag_maps.iter() {
1948 for subdag_certificates in subdag.values() {
1949 committed_certificates.extend(subdag_certificates.iter().cloned());
1950 }
1951 }
1952 };
1953 let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1955 for certificate in certificates.clone() {
1956 if !committed_certificates.contains(&certificate) {
1957 candidate_pending_certificates.insert(certificate);
1958 }
1959 }
1960 assert_eq!(pending_certificates, candidate_pending_certificates);
1962
1963 Ok(())
1964 }
1965}