1use crate::{
17 Gateway,
18 MAX_FETCH_TIMEOUT_IN_MS,
19 PRIMARY_PING_IN_MS,
20 Transport,
21 events::DataBlocks,
22 helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
23 spawn_blocking,
24};
25use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
26use snarkos_node_bft_ledger_service::LedgerService;
27use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
28use snarkvm::{
29 console::{network::Network, types::Field},
30 ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
31 prelude::{cfg_into_iter, cfg_iter},
32};
33
34use anyhow::{Result, anyhow, bail};
35use indexmap::IndexMap;
36#[cfg(feature = "locktick")]
37use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
38#[cfg(not(feature = "locktick"))]
39use parking_lot::Mutex;
40use rayon::prelude::*;
41use std::{
42 collections::{BTreeMap, HashMap},
43 future::Future,
44 net::SocketAddr,
45 sync::Arc,
46 time::Duration,
47};
48#[cfg(not(feature = "locktick"))]
49use tokio::sync::Mutex as TMutex;
50use tokio::{
51 sync::{OnceCell, oneshot},
52 task::JoinHandle,
53};
54
/// Block synchronization module of the BFT.
///
/// It requests blocks from peers, feeds the responses into storage, the ledger and
/// (when connected) the BFT, and serves/fetches batch certificates.
///
/// The type derives `Clone`; clones are handed to the background tasks spawned in `run`.
/// Every `Arc`-wrapped field is shared between clones.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// Used to send events to peers and to query the number of connected peers.
    gateway: Gateway<N>,
    /// Certificate storage whose height, round and certificates are synced with blocks.
    storage: Storage<N>,
    /// Ledger service used to read the latest block/height and to check and append blocks.
    ledger: Arc<dyn LedgerService<N>>,
    /// Tracks block requests, block responses and peer locators.
    block_sync: Arc<BlockSync<N>>,
    /// Pending certificate requests, keyed by certificate ID.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// Sender towards the BFT; set at most once, in `initialize`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// Join handles of the tasks started through `spawn`; aborted in `shut_down`.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Held for the whole duration of `try_advancing_block_synchronization`.
    response_lock: Arc<TMutex<()>>,
    /// Held while storage and/or the ledger are being synced with blocks.
    sync_lock: Arc<TMutex<()>>,
    /// Blocks, by height, that were received but not yet added to the ledger.
    /// Populated and drained in `sync_storage_with_block`.
    latest_block_responses: Arc<TMutex<BTreeMap<u32, Block<N>>>>,
}
96
97impl<N: Network> Sync<N> {
98 pub fn new(
100 gateway: Gateway<N>,
101 storage: Storage<N>,
102 ledger: Arc<dyn LedgerService<N>>,
103 block_sync: Arc<BlockSync<N>>,
104 ) -> Self {
105 Self {
107 gateway,
108 storage,
109 ledger,
110 block_sync,
111 pending: Default::default(),
112 bft_sender: Default::default(),
113 handles: Default::default(),
114 response_lock: Default::default(),
115 sync_lock: Default::default(),
116 latest_block_responses: Default::default(),
117 }
118 }
119
120 pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
122 if let Some(bft_sender) = bft_sender {
124 self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
125 }
126
127 info!("Syncing storage with the ledger...");
128
129 self.sync_storage_with_ledger_at_bootup().await?;
131
132 debug!("Finished initial block synchronization at startup");
133 Ok(())
134 }
135
136 #[inline]
140 async fn send_block_requests(
141 &self,
142
143 block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
144 sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
145 ) {
146 trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());
147
148 for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
150 if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
151 break;
153 }
154
155 tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
157 }
158 }
159
160 pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
165 info!("Starting the sync module...");
166
167 let self_ = self.clone();
169 self.spawn(async move {
170 tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
175 loop {
176 tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
178
179 let new_blocks = self_.try_block_sync().await;
180 if new_blocks {
181 if let Some(ping) = &ping {
182 match self_.get_block_locators() {
183 Ok(locators) => ping.update_block_locators(locators),
184 Err(err) => error!("Failed to update block locators: {err}"),
185 }
186 }
187 }
188 }
189 });
190
191 let self_ = self.clone();
193 self.spawn(async move {
194 loop {
195 tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
197
198 let self__ = self_.clone();
200 let _ = spawn_blocking!({
201 self__.pending.clear_expired_callbacks();
202 Ok(())
203 });
204 }
205 });
206
207 let SyncReceiver {
211 mut rx_block_sync_advance_with_sync_blocks,
212 mut rx_block_sync_remove_peer,
213 mut rx_block_sync_update_peer_locators,
214 mut rx_certificate_request,
215 mut rx_certificate_response,
216 } = sync_receiver;
217
218 let self_ = self.clone();
225 self.spawn(async move {
226 while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
227 callback.send(self_.advance_with_sync_blocks(peer_ip, blocks).await).ok();
228 }
229 });
230
231 let self_ = self.clone();
233 self.spawn(async move {
234 while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
235 self_.remove_peer(peer_ip);
236 }
237 });
238
239 let self_ = self.clone();
246 self.spawn(async move {
247 while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
248 let self_clone = self_.clone();
249 tokio::spawn(async move {
250 callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
251 });
252 }
253 });
254
255 let self_ = self.clone();
261 self.spawn(async move {
262 while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
263 self_.send_certificate_response(peer_ip, certificate_request);
264 }
265 });
266
267 let self_ = self.clone();
273 self.spawn(async move {
274 while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
275 self_.finish_certificate_request(peer_ip, certificate_response);
276 }
277 });
278
279 Ok(())
280 }
281
282 pub(crate) async fn try_block_sync(&self) -> bool {
289 let new_requests = self.block_sync.handle_block_request_timeouts(&self.gateway);
293 if let Some((sync_peers, requests)) = new_requests {
294 self.send_block_requests(sync_peers, requests).await;
295 }
296
297 if !self.block_sync.can_block_sync() {
300 trace!("No blocks to sync");
301 return false;
302 }
303
304 let (sync_peers, requests) = self.block_sync.prepare_block_requests();
307 self.send_block_requests(sync_peers, requests).await;
308
309 match self.try_advancing_block_synchronization().await {
311 Ok(new_blocks) => new_blocks,
312 Err(err) => {
313 error!("Block synchronization failed - {err}");
314 false
315 }
316 }
317 }
318}
319
320impl<N: Network> Sync<N> {
322 async fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
324 self.block_sync.insert_block_responses(peer_ip, blocks)?;
326
327 self.try_advancing_block_synchronization().await?;
332
333 Ok(())
334 }
335
336 fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
338 self.block_sync.update_peer_locators(peer_ip, locators)
339 }
340
341 fn remove_peer(&self, peer_ip: SocketAddr) {
343 self.block_sync.remove_peer(&peer_ip);
344 }
345
346 #[cfg(test)]
347 pub fn test_update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
348 self.update_peer_locators(peer_ip, locators)
349 }
350}
351
352impl<N: Network> Sync<N> {
354 async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
358 let latest_block = self.ledger.latest_block();
360
361 let block_height = latest_block.height();
363 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
368 let gc_height = block_height.saturating_sub(max_gc_blocks);
372 let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
374
375 let _lock = self.sync_lock.lock().await;
377
378 debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));
379
380 self.storage.sync_height_with_block(latest_block.height());
384 self.storage.sync_round_with_block(latest_block.round());
386 self.storage.garbage_collect_certificates(latest_block.round());
388 for block in &blocks {
390 if let Authority::Quorum(subdag) = block.authority() {
395 let unconfirmed_transactions = cfg_iter!(block.transactions())
397 .filter_map(|tx| {
398 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
399 })
400 .collect::<HashMap<_, _>>();
401
402 for certificates in subdag.values().cloned() {
404 cfg_into_iter!(certificates).for_each(|certificate| {
405 self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
406 });
407 }
408
409 #[cfg(feature = "telemetry")]
411 self.gateway.validator_telemetry().insert_subdag(subdag);
412 }
413 }
414
415 let certificates = blocks
419 .iter()
420 .flat_map(|block| {
421 match block.authority() {
422 Authority::Beacon(_) => None,
424 Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
426 }
427 })
428 .flatten()
429 .collect::<Vec<_>>();
430
431 if let Some(bft_sender) = self.bft_sender.get() {
433 if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
435 bail!("Failed to update the BFT DAG from sync: {e}");
436 }
437 }
438
439 self.block_sync.set_sync_height(block_height);
440
441 Ok(())
442 }
443
444 async fn compute_sync_height(&self) -> u32 {
447 let ledger_height = self.ledger.latest_block_height();
448 let mut responses = self.latest_block_responses.lock().await;
449
450 responses.retain(|height, _| *height > ledger_height);
452
453 responses.last_key_value().map(|(height, _)| *height).unwrap_or(0).max(ledger_height)
455 }
456
457 async fn try_advancing_block_synchronization(&self) -> Result<bool> {
472 let _lock = self.response_lock.lock().await;
474
475 let ledger_height = self.ledger.latest_block_height();
478 self.block_sync.set_sync_height(ledger_height);
479
480 let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
482
483 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
488
489 let cleanup = |start_height, current_height, error| {
491 let new_blocks = current_height > start_height;
492
493 if new_blocks {
495 self.block_sync.set_sync_height(current_height);
496 }
497
498 if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
499 };
500
501 let max_gc_height = tip.saturating_sub(max_gc_blocks);
505 let within_gc = (ledger_height + 1) > max_gc_height;
506
507 if within_gc {
508 let start_height = self.compute_sync_height().await;
511
512 self.block_sync.set_sync_height(start_height);
515
516 let mut current_height = start_height;
518 trace!("Try advancing with block responses (at block {current_height})");
519
520 loop {
522 let next_height = current_height + 1;
523 let Some(block) = self.block_sync.peek_next_block(next_height) else {
524 break;
525 };
526 info!("Syncing the BFT to block {}...", block.height());
527 match self.sync_storage_with_block(block).await {
529 Ok(_) => {
530 current_height = next_height;
532 }
533 Err(err) => {
534 self.block_sync.remove_block_response(next_height);
536 return cleanup(start_height, current_height, Some(err));
537 }
538 }
539 }
540
541 cleanup(start_height, current_height, None)
542 } else {
543 info!("Block sync is too far behind other validators. Syncing without BFT.");
544
545 let start_height = ledger_height;
548 let mut current_height = start_height;
549
550 self.block_sync.set_sync_height(start_height);
553
554 loop {
557 let next_height = current_height + 1;
558
559 let Some(block) = self.block_sync.peek_next_block(next_height) else {
560 break;
561 };
562 info!("Syncing the ledger to block {}...", block.height());
563
564 match self.sync_ledger_with_block_without_bft(block).await {
566 Ok(_) => {
567 current_height = next_height;
569 }
570 Err(err) => {
571 self.block_sync.remove_block_response(next_height);
573 return cleanup(start_height, current_height, Some(err));
574 }
575 }
576 }
577
578 let within_gc = (current_height + 1) > max_gc_height;
580 if within_gc {
581 info!("Finished catching up with the network. Switching back to BFT sync.");
582 if let Err(err) = self.sync_storage_with_ledger_at_bootup().await {
583 error!("BFT sync (with bootup routine) failed - {err}");
584 }
585 }
586
587 cleanup(start_height, current_height, None)
588 }
589 }
590
591 async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
595 let _lock = self.sync_lock.lock().await;
597
598 let self_ = self.clone();
599 tokio::task::spawn_blocking(move || {
600 self_.ledger.check_next_block(&block)?;
602 self_.ledger.advance_to_next_block(&block)?;
604
605 self_.storage.sync_height_with_block(block.height());
607 self_.storage.sync_round_with_block(block.round());
609 self_.block_sync.remove_block_response(block.height());
611
612 Ok(())
613 })
614 .await?
615 }
616
617 async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
628 let _lock = self.sync_lock.lock().await;
630
631 if self.ledger.contains_block_height(block.height()) {
634 debug!("Ledger is already synced with block at height {}. Will not sync.", block.height());
635 return Ok(());
636 }
637
638 let mut latest_block_responses = self.latest_block_responses.lock().await;
640
641 if latest_block_responses.contains_key(&block.height()) {
642 debug!("An unconfirmed block is queued already for height {}. Will not sync.", block.height());
643 return Ok(());
644 }
645
646 if let Authority::Quorum(subdag) = block.authority() {
651 let unconfirmed_transactions = cfg_iter!(block.transactions())
653 .filter_map(|tx| {
654 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
655 })
656 .collect::<HashMap<_, _>>();
657
658 for certificates in subdag.values().cloned() {
660 cfg_into_iter!(certificates.clone()).for_each(|certificate| {
661 self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
663 });
664
665 for certificate in certificates {
667 if let Some(bft_sender) = self.bft_sender.get() {
670 if let Err(err) = bft_sender.send_sync_bft(certificate).await {
672 bail!("Failed to sync certificate - {err}");
673 };
674 }
675 }
676 }
677 }
678
679 let ledger_block_height = self.ledger.latest_block_height();
681
682 latest_block_responses.insert(block.height(), block);
684 latest_block_responses.retain(|height, _| *height > ledger_block_height);
686
687 let contiguous_blocks: Vec<Block<N>> = (ledger_block_height.saturating_add(1)..)
689 .take_while(|&k| latest_block_responses.contains_key(&k))
690 .filter_map(|k| latest_block_responses.get(&k).cloned())
691 .collect();
692
693 for next_block in contiguous_blocks.into_iter() {
705 let next_block_height = next_block.height();
707
708 let leader_certificate = match next_block.authority() {
710 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
711 _ => bail!("Received a block with an unexpected authority type."),
712 };
713 let commit_round = leader_certificate.round();
714 let certificate_round =
715 commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;
716
717 let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
719 let certificates = self.storage.get_certificates_for_round(certificate_round);
721 let authors = certificates
724 .iter()
725 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
726 true => Some(c.author()),
727 false => None,
728 })
729 .collect();
730
731 debug!("Validating sync block {next_block_height} at round {commit_round}...");
732 if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
734 let mut current_certificate = leader_certificate;
736 let mut blocks_to_add = vec![next_block];
738
739 for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
741 let Some(previous_block) = latest_block_responses.get(&height) else {
743 bail!("Block {height} is missing from the latest block responses.");
744 };
745 let previous_certificate = match previous_block.authority() {
747 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
748 _ => bail!("Received a block with an unexpected authority type."),
749 };
750 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
752 debug!("Previous sync block {height} is linked to the current block {next_block_height}");
753 blocks_to_add.insert(0, previous_block.clone());
755 current_certificate = previous_certificate;
757 }
758 }
759
760 for block in blocks_to_add {
762 let block_height = block.height();
764 if block_height != self.ledger.latest_block_height().saturating_add(1) {
765 warn!("Skipping block {block_height} from the latest block responses - not sequential.");
766 continue;
767 }
768 #[cfg(feature = "telemetry")]
769 let block_authority = block.authority().clone();
770
771 let self_ = self.clone();
772 tokio::task::spawn_blocking(move || {
773 self_.ledger.check_next_block(&block)?;
775 self_.ledger.advance_to_next_block(&block)?;
777
778 self_.storage.sync_height_with_block(block.height());
780 self_.storage.sync_round_with_block(block.round());
782
783 Ok::<(), anyhow::Error>(())
784 })
785 .await??;
786 latest_block_responses.remove(&block_height);
788
789 #[cfg(feature = "telemetry")]
791 if let Authority::Quorum(subdag) = block_authority {
792 self_.gateway.validator_telemetry().insert_subdag(&subdag);
793 }
794 }
795 } else {
796 debug!(
797 "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
798 );
799 }
800
801 }
803
804 Ok(())
805 }
806
807 fn is_linked(
809 &self,
810 previous_certificate: BatchCertificate<N>,
811 current_certificate: BatchCertificate<N>,
812 ) -> Result<bool> {
813 let mut traversal = vec![current_certificate.clone()];
815 for round in (previous_certificate.round()..current_certificate.round()).rev() {
817 let certificates = self.storage.get_certificates_for_round(round);
819 traversal = certificates
821 .into_iter()
822 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
823 .collect();
824 }
825 Ok(traversal.contains(&previous_certificate))
826 }
827}
828
829impl<N: Network> Sync<N> {
831 pub fn is_synced(&self) -> bool {
833 if self.gateway.number_of_connected_peers() == 0 {
836 return false;
837 }
838
839 self.block_sync.is_block_synced()
840 }
841
842 pub fn num_blocks_behind(&self) -> Option<u32> {
844 self.block_sync.num_blocks_behind()
845 }
846
847 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
849 self.block_sync.get_block_locators()
850 }
851}
852
853impl<N: Network> Sync<N> {
855 pub async fn send_certificate_request(
857 &self,
858 peer_ip: SocketAddr,
859 certificate_id: Field<N>,
860 ) -> Result<BatchCertificate<N>> {
861 let (callback_sender, callback_receiver) = oneshot::channel();
863 let num_sent_requests = self.pending.num_sent_requests(certificate_id);
865 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
867 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
869 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
872
873 self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
875
876 if should_send_request {
878 if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
880 bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
881 }
882 } else {
883 debug!(
884 "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
885 fmt_id(certificate_id)
886 );
887 }
888 match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
891 Ok(result) => Ok(result?),
893 Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
895 }
896 }
897
898 fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
900 if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
902 let self_ = self.clone();
904 tokio::spawn(async move {
905 let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
906 });
907 }
908 }
909
910 fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
913 let certificate = response.certificate;
914 let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
916 if exists {
918 self.pending.remove(certificate.id(), Some(certificate));
921 }
922 }
923}
924
925impl<N: Network> Sync<N> {
926 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
928 self.handles.lock().push(tokio::spawn(future));
929 }
930
931 pub async fn shut_down(&self) {
933 info!("Shutting down the sync module...");
934 let _lock = self.response_lock.lock().await;
936 let _lock = self.sync_lock.lock().await;
938 self.handles.lock().iter().for_each(|handle| handle.abort());
940 }
941}
942
943#[cfg(test)]
944mod tests {
945 use super::*;
946
947 use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
948
949 use snarkos_account::Account;
950 use snarkos_node_sync::BlockSync;
951 use snarkvm::{
952 console::{
953 account::{Address, PrivateKey},
954 network::MainnetV0,
955 },
956 ledger::{
957 narwhal::{BatchCertificate, BatchHeader, Subdag},
958 store::{ConsensusStore, helpers::memory::ConsensusMemory},
959 },
960 prelude::{Ledger, VM},
961 utilities::TestRng,
962 };
963
964 use aleo_std::StorageMode;
965 use indexmap::IndexSet;
966 use rand::Rng;
967 use std::collections::BTreeMap;
968
    // The network used by all tests in this module.
    type CurrentNetwork = MainnetV0;
    // A ledger for the test network, backed by the in-memory consensus storage.
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    // The in-memory consensus store used to create the genesis block.
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
972
    // Builds three quorum blocks on a "source" ledger from a hand-made certificate DAG,
    // then feeds them one by one into `Sync::sync_storage_with_block` of a second,
    // fresh ledger and checks the height of that ledger after each step.
    //
    // NOTE: the order of the RNG draws and of the ledger operations below matters;
    // do not reorder statements.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_via_is_linked() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        // The round of the first leader certificate to commit.
        let commit_round = 2;

        // Create the account and the genesis block.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        let seed: u64 = rng.gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Sample three more private keys from an RNG seeded exactly like the one used
        // for the genesis block.
        // NOTE(review): presumably this reproduces the keys of the genesis committee - confirm.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // The "source" ledger, on which the blocks are produced.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));

        // Build the certificates for rounds 0 through `commit_round + 8`.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            // Maps each round to the certificates created for it.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                // Rounds 0 and 1 reference no previous certificates; every later round
                // references all certificates of the round before it.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();

                // Up to round 5, only the leader and one non-leader create a certificate.
                if round <= 5 {
                    let leader = committee.get_leader(round).unwrap();
                    let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
                    let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
                    for i in [leader_index, non_leader_index].into_iter() {
                        let batch_header = BatchHeader::new(
                            &private_keys[i],
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        let mut signatures = IndexSet::with_capacity(4);
                        // Every key other than the author's signs the batch.
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }

                // After round 5, all four keys create a certificate.
                if round > 5 {
                    for (i, private_key_1) in private_keys.iter().enumerate() {
                        let batch_header = BatchHeader::new(
                            private_key_1,
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        let mut signatures = IndexSet::with_capacity(4);
                        // Every key other than the author's signs the batch.
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }
                // Record the certificates of this round.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Create the storage and insert the certificates of rounds 1 and later.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Block 1: the leader certificate of `commit_round`, plus all certificates of
        // the round before it.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let block_1 = {
            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
        };
        // Add block 1 to the source ledger.
        core_ledger.advance_to_next_block(&block_1)?;

        // Block 2: the leader certificate of `commit_round + 2`, all certificates of the
        // round before it, and the certificates two rounds before it except for the
        // leader certificate of block 1.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let block_2 = {
            let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            let mut prev_commit_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
                if cert != leader_certificate {
                    prev_commit_cert_map_2.insert(cert);
                }
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        // Add block 2 to the source ledger.
        core_ledger.advance_to_next_block(&block_2)?;

        // Block 3: built like block 2, two rounds later, leaving out the leader
        // certificate of block 2.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let block_3 = {
            let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            let mut prev_commit_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
                if cert != leader_certificate_2 {
                    prev_commit_cert_map_3.insert(cert);
                }
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        // Add block 3 to the source ledger.
        core_ledger.advance_to_next_block(&block_3)?;

        // Create a second ledger from the same genesis block, and a sync module on top
        // of it. The sync module shares the storage populated above.
        let syncing_ledger = Arc::new(CoreLedgerService::new(
            CurrentLedger::load(genesis, StorageMode::new_test(None)).unwrap(),
            Default::default(),
        ));
        let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);
        // Sync block 1; the syncing ledger is expected to reach height 1.
        sync.sync_storage_with_block(block_1).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 1);
        // Sync block 2; the syncing ledger is expected to reach height 2.
        sync.sync_storage_with_block(block_2).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 2);
        // Sync block 3; the syncing ledger is expected to reach height 3.
        sync.sync_storage_with_block(block_3).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);
        // Blocks 1 and 2 must be part of the syncing ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }
1198
1199 #[tokio::test]
1200 #[tracing_test::traced_test]
1201 async fn test_pending_certificates() -> anyhow::Result<()> {
1202 let rng = &mut TestRng::default();
1203 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1205 let commit_round = 2;
1206
1207 let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1209 let account: Account<CurrentNetwork> = Account::new(rng)?;
1210
1211 let seed: u64 = rng.gen();
1213 let genesis_rng = &mut TestRng::from_seed(seed);
1214 let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
1215
1216 let genesis_rng = &mut TestRng::from_seed(seed);
1218 let private_keys = [
1219 *account.private_key(),
1220 PrivateKey::new(genesis_rng)?,
1221 PrivateKey::new(genesis_rng)?,
1222 PrivateKey::new(genesis_rng)?,
1223 ];
1224 let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
1226 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
1228 let (round_to_certificates_map, committee) = {
1230 let committee = ledger.latest_committee().unwrap();
1232 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1234 HashMap::new();
1235 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1236
1237 for round in 0..=commit_round + 8 {
1238 let mut current_certificates = IndexSet::new();
1239 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1240 IndexSet::new()
1241 } else {
1242 previous_certificates.iter().map(|c| c.id()).collect()
1243 };
1244 let committee_id = committee.id();
1245 for (i, private_key_1) in private_keys.iter().enumerate() {
1247 let batch_header = BatchHeader::new(
1248 private_key_1,
1249 round,
1250 now(),
1251 committee_id,
1252 Default::default(),
1253 previous_certificate_ids.clone(),
1254 rng,
1255 )
1256 .unwrap();
1257 let mut signatures = IndexSet::with_capacity(4);
1259 for (j, private_key_2) in private_keys.iter().enumerate() {
1260 if i != j {
1261 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1262 }
1263 }
1264 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1265 }
1266
1267 round_to_certificates_map.insert(round, current_certificates.clone());
1269 previous_certificates = current_certificates.clone();
1270 }
1271 (round_to_certificates_map, committee)
1272 };
1273
1274 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1276 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1278 for i in 1..=commit_round + 8 {
1279 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1280 certificates.extend(c);
1281 }
1282 for certificate in certificates.clone().iter() {
1283 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1284 }
1285 let leader_round_1 = commit_round;
1287 let leader_1 = committee.get_leader(leader_round_1).unwrap();
1288 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1289 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1290 let block_1 = {
1291 let mut leader_cert_map = IndexSet::new();
1292 leader_cert_map.insert(leader_certificate.clone());
1293 let mut previous_cert_map = IndexSet::new();
1294 for cert in storage.get_certificates_for_round(commit_round - 1) {
1295 previous_cert_map.insert(cert);
1296 }
1297 subdag_map.insert(commit_round, leader_cert_map.clone());
1298 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1299 let subdag = Subdag::from(subdag_map.clone())?;
1300 core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1301 };
1302 core_ledger.advance_to_next_block(&block_1)?;
1304
1305 let leader_round_2 = commit_round + 2;
1307 let leader_2 = committee.get_leader(leader_round_2).unwrap();
1308 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1309 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1310 let block_2 = {
1311 let mut leader_cert_map_2 = IndexSet::new();
1312 leader_cert_map_2.insert(leader_certificate_2.clone());
1313 let mut previous_cert_map_2 = IndexSet::new();
1314 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1315 previous_cert_map_2.insert(cert);
1316 }
1317 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1318 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1319 let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1320 core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
1321 };
1322 core_ledger.advance_to_next_block(&block_2)?;
1324
1325 let leader_round_3 = commit_round + 4;
1327 let leader_3 = committee.get_leader(leader_round_3).unwrap();
1328 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1329 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1330 let block_3 = {
1331 let mut leader_cert_map_3 = IndexSet::new();
1332 leader_cert_map_3.insert(leader_certificate_3.clone());
1333 let mut previous_cert_map_3 = IndexSet::new();
1334 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1335 previous_cert_map_3.insert(cert);
1336 }
1337 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1338 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1339 let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1340 core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
1341 };
1342 core_ledger.advance_to_next_block(&block_3)?;
1344
1345 let pending_certificates = storage.get_pending_certificates();
1351 for certificate in pending_certificates.clone() {
1353 assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1354 }
1355 let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1357 {
1358 let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1359 for subdag in subdag_maps.iter() {
1360 for subdag_certificates in subdag.values() {
1361 committed_certificates.extend(subdag_certificates.iter().cloned());
1362 }
1363 }
1364 };
1365 let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1367 for certificate in certificates.clone() {
1368 if !committed_certificates.contains(&certificate) {
1369 candidate_pending_certificates.insert(certificate);
1370 }
1371 }
1372 assert_eq!(pending_certificates, candidate_pending_certificates);
1374 Ok(())
1375 }
1376}