1use crate::{
17 Gateway,
18 MAX_FETCH_TIMEOUT_IN_MS,
19 PRIMARY_PING_IN_MS,
20 Transport,
21 helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22 spawn_blocking,
23};
24use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
27use snarkos_node_tcp::P2P;
28use snarkvm::{
29 console::{network::Network, types::Field},
30 ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
31 prelude::{cfg_into_iter, cfg_iter},
32};
33
34use anyhow::{Result, anyhow, bail};
35#[cfg(feature = "locktick")]
36use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
37#[cfg(not(feature = "locktick"))]
38use parking_lot::Mutex;
39use rayon::prelude::*;
40use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration};
41#[cfg(not(feature = "locktick"))]
42use tokio::sync::Mutex as TMutex;
43use tokio::{
44 sync::{OnceCell, oneshot},
45 task::JoinHandle,
46};
47
48#[derive(Clone)]
49pub struct Sync<N: Network> {
50 gateway: Gateway<N>,
52 storage: Storage<N>,
54 ledger: Arc<dyn LedgerService<N>>,
56 block_sync: BlockSync<N>,
58 pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
60 bft_sender: Arc<OnceCell<BFTSender<N>>>,
62 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
64 response_lock: Arc<TMutex<()>>,
66 sync_lock: Arc<TMutex<()>>,
68 latest_block_responses: Arc<TMutex<HashMap<u32, Block<N>>>>,
73}
74
75impl<N: Network> Sync<N> {
76 pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
78 let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone());
80 Self {
82 gateway,
83 storage,
84 ledger,
85 block_sync,
86 pending: Default::default(),
87 bft_sender: Default::default(),
88 handles: Default::default(),
89 response_lock: Default::default(),
90 sync_lock: Default::default(),
91 latest_block_responses: Default::default(),
92 }
93 }
94
95 pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
97 if let Some(bft_sender) = bft_sender {
99 self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
100 }
101
102 info!("Syncing storage with the ledger...");
103
104 self.sync_storage_with_ledger_at_bootup().await
106 }
107
108 pub async fn run(&self, sync_receiver: SyncReceiver<N>) -> Result<()> {
110 info!("Starting the sync module...");
111
112 let self_ = self.clone();
114 self.handles.lock().push(tokio::spawn(async move {
115 tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
120 loop {
121 tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
123 let communication = &self_.gateway;
125 self_.block_sync.try_block_sync(communication).await;
127
128 if let Err(e) = self_.sync_storage_with_blocks().await {
130 error!("Unable to sync storage with blocks - {e}");
131 }
132
133 if self_.is_synced() {
135 self_.latest_block_responses.lock().await.clear();
136 }
137 }
138 }));
139
140 let self_ = self.clone();
142 self.spawn(async move {
143 loop {
144 tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
146
147 let self__ = self_.clone();
149 let _ = spawn_blocking!({
150 self__.pending.clear_expired_callbacks();
151 Ok(())
152 });
153 }
154 });
155
156 let SyncReceiver {
158 mut rx_block_sync_advance_with_sync_blocks,
159 mut rx_block_sync_remove_peer,
160 mut rx_block_sync_update_peer_locators,
161 mut rx_certificate_request,
162 mut rx_certificate_response,
163 } = sync_receiver;
164
165 let self_ = self.clone();
172 self.spawn(async move {
173 while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
174 if let Err(e) = self_.block_sync.process_block_response(peer_ip, blocks) {
176 callback.send(Err(e)).ok();
178 continue;
179 }
180
181 if let Err(e) = self_.sync_storage_with_blocks().await {
183 callback.send(Err(e)).ok();
185 continue;
186 }
187
188 callback.send(Ok(())).ok();
190 }
191 });
192
193 let self_ = self.clone();
195 self.spawn(async move {
196 while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
197 self_.block_sync.remove_peer(&peer_ip);
198 }
199 });
200
201 let self_ = self.clone();
208 self.spawn(async move {
209 while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
210 let self_clone = self_.clone();
211 tokio::spawn(async move {
212 let result = self_clone.block_sync.update_peer_locators(peer_ip, locators);
214 callback.send(result).ok();
216 });
217 }
218 });
219
220 let self_ = self.clone();
226 self.spawn(async move {
227 while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
228 self_.send_certificate_response(peer_ip, certificate_request);
229 }
230 });
231
232 let self_ = self.clone();
238 self.spawn(async move {
239 while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
240 self_.finish_certificate_request(peer_ip, certificate_response)
241 }
242 });
243
244 Ok(())
245 }
246}
247
248impl<N: Network> Sync<N> {
250 pub async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
252 let latest_block = self.ledger.latest_block();
254
255 let block_height = latest_block.height();
257 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
262 let gc_height = block_height.saturating_sub(max_gc_blocks);
266 let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
268
269 let _lock = self.sync_lock.lock().await;
271
272 debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));
273
274 self.storage.sync_height_with_block(latest_block.height());
278 self.storage.sync_round_with_block(latest_block.round());
280 self.storage.garbage_collect_certificates(latest_block.round());
282 for block in &blocks {
284 if let Authority::Quorum(subdag) = block.authority() {
289 let unconfirmed_transactions = cfg_iter!(block.transactions())
291 .filter_map(|tx| {
292 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
293 })
294 .collect::<HashMap<_, _>>();
295
296 for certificates in subdag.values().cloned() {
298 cfg_into_iter!(certificates).for_each(|certificate| {
299 self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
300 });
301 }
302
303 #[cfg(feature = "telemetry")]
305 self.gateway.validator_telemetry().insert_subdag(subdag);
306 }
307 }
308
309 let certificates = blocks
313 .iter()
314 .flat_map(|block| {
315 match block.authority() {
316 Authority::Beacon(_) => None,
318 Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
320 }
321 })
322 .flatten()
323 .collect::<Vec<_>>();
324
325 if let Some(bft_sender) = self.bft_sender.get() {
327 if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
329 bail!("Failed to update the BFT DAG from sync: {e}");
330 }
331 }
332
333 Ok(())
334 }
335
336 pub async fn sync_storage_with_blocks(&self) -> Result<()> {
338 let _lock = self.response_lock.lock().await;
340
341 let mut current_height = self.ledger.latest_block_height() + 1;
346
347 let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
349 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
354 let max_gc_height = tip.saturating_sub(max_gc_blocks);
358
359 if current_height <= max_gc_height {
361 while let Some(block) = self.block_sync.peek_next_block(current_height) {
363 info!("Syncing the ledger to block {}...", block.height());
364 match self.sync_ledger_with_block_without_bft(block).await {
366 Ok(_) => {
367 current_height += 1;
369 }
370 Err(e) => {
371 self.block_sync.remove_block_response(current_height);
373 return Err(e);
374 }
375 }
376 }
377 if current_height > max_gc_height {
379 if let Err(e) = self.sync_storage_with_ledger_at_bootup().await {
380 error!("BFT sync (with bootup routine) failed - {e}");
381 }
382 }
383 }
384
385 while let Some(block) = self.block_sync.peek_next_block(current_height) {
387 info!("Syncing the BFT to block {}...", block.height());
388 match self.sync_storage_with_block(block).await {
390 Ok(_) => {
391 current_height += 1;
393 }
394 Err(e) => {
395 self.block_sync.remove_block_response(current_height);
397 return Err(e);
398 }
399 }
400 }
401 Ok(())
402 }
403
404 async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
406 let _lock = self.sync_lock.lock().await;
408
409 let self_ = self.clone();
410 tokio::task::spawn_blocking(move || {
411 self_.ledger.check_next_block(&block)?;
413 self_.ledger.advance_to_next_block(&block)?;
415
416 self_.storage.sync_height_with_block(block.height());
418 self_.storage.sync_round_with_block(block.round());
420 self_.block_sync.remove_block_response(block.height());
422
423 Ok(())
424 })
425 .await?
426 }
427
428 pub async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
439 let _lock = self.sync_lock.lock().await;
441 let mut latest_block_responses = self.latest_block_responses.lock().await;
443
444 if self.ledger.contains_block_height(block.height()) || latest_block_responses.contains_key(&block.height()) {
446 return Ok(());
447 }
448
449 if let Authority::Quorum(subdag) = block.authority() {
454 let unconfirmed_transactions = cfg_iter!(block.transactions())
456 .filter_map(|tx| {
457 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
458 })
459 .collect::<HashMap<_, _>>();
460
461 for certificates in subdag.values().cloned() {
463 cfg_into_iter!(certificates.clone()).for_each(|certificate| {
464 self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
466 });
467
468 for certificate in certificates {
470 if let Some(bft_sender) = self.bft_sender.get() {
472 if let Err(e) = bft_sender.send_sync_bft(certificate).await {
474 bail!("Sync - {e}");
475 };
476 }
477 }
478 }
479 }
480
481 let latest_block_height = self.ledger.latest_block_height();
483
484 latest_block_responses.insert(block.height(), block);
486 latest_block_responses.retain(|height, _| *height > latest_block_height);
488
489 let contiguous_blocks: Vec<Block<N>> = (latest_block_height.saturating_add(1)..)
491 .take_while(|&k| latest_block_responses.contains_key(&k))
492 .filter_map(|k| latest_block_responses.get(&k).cloned())
493 .collect();
494
495 for next_block in contiguous_blocks.into_iter() {
507 let next_block_height = next_block.height();
509
510 let leader_certificate = match next_block.authority() {
512 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
513 _ => bail!("Received a block with an unexpected authority type."),
514 };
515 let commit_round = leader_certificate.round();
516 let certificate_round =
517 commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;
518
519 let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
521 let certificates = self.storage.get_certificates_for_round(certificate_round);
523 let authors = certificates
526 .iter()
527 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
528 true => Some(c.author()),
529 false => None,
530 })
531 .collect();
532
533 debug!("Validating sync block {next_block_height} at round {commit_round}...");
534 if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
536 let mut current_certificate = leader_certificate;
538 let mut blocks_to_add = vec![next_block];
540
541 for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
543 let Some(previous_block) = latest_block_responses.get(&height) else {
545 bail!("Block {height} is missing from the latest block responses.");
546 };
547 let previous_certificate = match previous_block.authority() {
549 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
550 _ => bail!("Received a block with an unexpected authority type."),
551 };
552 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
554 debug!("Previous sync block {height} is linked to the current block {next_block_height}");
555 blocks_to_add.insert(0, previous_block.clone());
557 current_certificate = previous_certificate;
559 }
560 }
561
562 for block in blocks_to_add {
564 let block_height = block.height();
566 if block_height != self.ledger.latest_block_height().saturating_add(1) {
567 warn!("Skipping block {block_height} from the latest block responses - not sequential.");
568 continue;
569 }
570 #[cfg(feature = "telemetry")]
571 let block_authority = block.authority().clone();
572
573 let self_ = self.clone();
574 tokio::task::spawn_blocking(move || {
575 self_.ledger.check_next_block(&block)?;
577 self_.ledger.advance_to_next_block(&block)?;
579
580 self_.storage.sync_height_with_block(block.height());
582 self_.storage.sync_round_with_block(block.round());
584
585 Ok::<(), anyhow::Error>(())
586 })
587 .await??;
588 latest_block_responses.remove(&block_height);
590 self.block_sync.remove_block_response(block_height);
592
593 #[cfg(feature = "telemetry")]
595 if let Authority::Quorum(subdag) = block_authority {
596 self_.gateway.validator_telemetry().insert_subdag(&subdag);
597 }
598 }
599 } else {
600 debug!(
601 "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
602 );
603 }
604 }
605
606 Ok(())
607 }
608
609 fn is_linked(
611 &self,
612 previous_certificate: BatchCertificate<N>,
613 current_certificate: BatchCertificate<N>,
614 ) -> Result<bool> {
615 let mut traversal = vec![current_certificate.clone()];
617 for round in (previous_certificate.round()..current_certificate.round()).rev() {
619 let certificates = self.storage.get_certificates_for_round(round);
621 traversal = certificates
623 .into_iter()
624 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
625 .collect();
626 }
627 Ok(traversal.contains(&previous_certificate))
628 }
629}
630
631impl<N: Network> Sync<N> {
633 pub fn is_synced(&self) -> bool {
635 if self.gateway.number_of_connected_peers() == 0 {
636 return false;
637 }
638 self.block_sync.is_block_synced()
639 }
640
641 pub fn num_blocks_behind(&self) -> u32 {
643 self.block_sync.num_blocks_behind()
644 }
645
646 pub const fn is_gateway_mode(&self) -> bool {
648 self.block_sync.mode().is_gateway()
649 }
650
651 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
653 self.block_sync.get_block_locators()
654 }
655
656 #[cfg(test)]
658 #[doc(hidden)]
659 pub(super) fn block_sync(&self) -> &BlockSync<N> {
660 &self.block_sync
661 }
662}
663
664impl<N: Network> Sync<N> {
666 pub async fn send_certificate_request(
668 &self,
669 peer_ip: SocketAddr,
670 certificate_id: Field<N>,
671 ) -> Result<BatchCertificate<N>> {
672 let (callback_sender, callback_receiver) = oneshot::channel();
674 let num_sent_requests = self.pending.num_sent_requests(certificate_id);
676 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
678 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
680 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
683
684 self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
686
687 if should_send_request {
689 if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
691 bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
692 }
693 } else {
694 debug!(
695 "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
696 fmt_id(certificate_id)
697 );
698 }
699 match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
702 Ok(result) => Ok(result?),
704 Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
706 }
707 }
708
709 fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
711 if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
713 let self_ = self.clone();
715 tokio::spawn(async move {
716 let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
717 });
718 }
719 }
720
721 fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
724 let certificate = response.certificate;
725 let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
727 if exists {
729 self.pending.remove(certificate.id(), Some(certificate));
732 }
733 }
734}
735
736impl<N: Network> Sync<N> {
737 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
739 self.handles.lock().push(tokio::spawn(future));
740 }
741
742 pub async fn shut_down(&self) {
744 info!("Shutting down the sync module...");
745 let _lock = self.response_lock.lock().await;
747 let _lock = self.sync_lock.lock().await;
749 self.handles.lock().iter().for_each(|handle| handle.abort());
751 }
752}
753#[cfg(test)]
754mod tests {
755 use super::*;
756
757 use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
758 use snarkos_account::Account;
759 use snarkvm::{
760 console::{
761 account::{Address, PrivateKey},
762 network::MainnetV0,
763 },
764 ledger::{
765 narwhal::{BatchCertificate, BatchHeader, Subdag},
766 store::{ConsensusStore, helpers::memory::ConsensusMemory},
767 },
768 prelude::{Ledger, VM},
769 utilities::TestRng,
770 };
771
772 use aleo_std::StorageMode;
773 use indexmap::IndexSet;
774 use rand::Rng;
775 use std::collections::BTreeMap;
776
777 type CurrentNetwork = MainnetV0;
778 type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
779 type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
780
781 #[tokio::test]
782 #[tracing_test::traced_test]
783 async fn test_commit_via_is_linked() -> anyhow::Result<()> {
784 let rng = &mut TestRng::default();
785 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
787 let commit_round = 2;
788
789 let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
791 let account: Account<CurrentNetwork> = Account::new(rng)?;
792
793 let seed: u64 = rng.gen();
795 let genesis_rng = &mut TestRng::from_seed(seed);
796 let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
797
798 let genesis_rng = &mut TestRng::from_seed(seed);
800 let private_keys = [
801 *account.private_key(),
802 PrivateKey::new(genesis_rng)?,
803 PrivateKey::new(genesis_rng)?,
804 PrivateKey::new(genesis_rng)?,
805 ];
806
807 let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
809 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
811
812 let (round_to_certificates_map, committee) = {
814 let addresses = vec![
815 Address::try_from(private_keys[0])?,
816 Address::try_from(private_keys[1])?,
817 Address::try_from(private_keys[2])?,
818 Address::try_from(private_keys[3])?,
819 ];
820
821 let committee = ledger.latest_committee().unwrap();
822
823 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
825 HashMap::new();
826 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
827
828 for round in 0..=commit_round + 8 {
829 let mut current_certificates = IndexSet::new();
830 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
831 IndexSet::new()
832 } else {
833 previous_certificates.iter().map(|c| c.id()).collect()
834 };
835 let committee_id = committee.id();
836
837 if round <= 5 {
839 let leader = committee.get_leader(round).unwrap();
840 let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
841 let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
842 for i in [leader_index, non_leader_index].into_iter() {
843 let batch_header = BatchHeader::new(
844 &private_keys[i],
845 round,
846 now(),
847 committee_id,
848 Default::default(),
849 previous_certificate_ids.clone(),
850 rng,
851 )
852 .unwrap();
853 let mut signatures = IndexSet::with_capacity(4);
855 for (j, private_key_2) in private_keys.iter().enumerate() {
856 if i != j {
857 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
858 }
859 }
860 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
861 }
862 }
863
864 if round > 5 {
866 for (i, private_key_1) in private_keys.iter().enumerate() {
867 let batch_header = BatchHeader::new(
868 private_key_1,
869 round,
870 now(),
871 committee_id,
872 Default::default(),
873 previous_certificate_ids.clone(),
874 rng,
875 )
876 .unwrap();
877 let mut signatures = IndexSet::with_capacity(4);
879 for (j, private_key_2) in private_keys.iter().enumerate() {
880 if i != j {
881 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
882 }
883 }
884 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
885 }
886 }
887 round_to_certificates_map.insert(round, current_certificates.clone());
889 previous_certificates = current_certificates.clone();
890 }
891 (round_to_certificates_map, committee)
892 };
893
894 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
896 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
898 for i in 1..=commit_round + 8 {
899 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
900 certificates.extend(c);
901 }
902 for certificate in certificates.clone().iter() {
903 storage.testing_only_insert_certificate_testing_only(certificate.clone());
904 }
905
906 let leader_round_1 = commit_round;
908 let leader_1 = committee.get_leader(leader_round_1).unwrap();
909 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
910 let block_1 = {
911 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
912 let mut leader_cert_map = IndexSet::new();
913 leader_cert_map.insert(leader_certificate.clone());
914 let mut previous_cert_map = IndexSet::new();
915 for cert in storage.get_certificates_for_round(commit_round - 1) {
916 previous_cert_map.insert(cert);
917 }
918 subdag_map.insert(commit_round, leader_cert_map.clone());
919 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
920 let subdag = Subdag::from(subdag_map.clone())?;
921 core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
922 };
923 core_ledger.advance_to_next_block(&block_1)?;
925
926 let leader_round_2 = commit_round + 2;
928 let leader_2 = committee.get_leader(leader_round_2).unwrap();
929 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
930 let block_2 = {
931 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
932 let mut leader_cert_map_2 = IndexSet::new();
933 leader_cert_map_2.insert(leader_certificate_2.clone());
934 let mut previous_cert_map_2 = IndexSet::new();
935 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
936 previous_cert_map_2.insert(cert);
937 }
938 let mut prev_commit_cert_map_2 = IndexSet::new();
939 for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
940 if cert != leader_certificate {
941 prev_commit_cert_map_2.insert(cert);
942 }
943 }
944 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
945 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
946 subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
947 let subdag_2 = Subdag::from(subdag_map_2.clone())?;
948 core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
949 };
950 core_ledger.advance_to_next_block(&block_2)?;
952
953 let leader_round_3 = commit_round + 4;
955 let leader_3 = committee.get_leader(leader_round_3).unwrap();
956 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
957 let block_3 = {
958 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
959 let mut leader_cert_map_3 = IndexSet::new();
960 leader_cert_map_3.insert(leader_certificate_3.clone());
961 let mut previous_cert_map_3 = IndexSet::new();
962 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
963 previous_cert_map_3.insert(cert);
964 }
965 let mut prev_commit_cert_map_3 = IndexSet::new();
966 for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
967 if cert != leader_certificate_2 {
968 prev_commit_cert_map_3.insert(cert);
969 }
970 }
971 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
972 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
973 subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
974 let subdag_3 = Subdag::from(subdag_map_3.clone())?;
975 core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
976 };
977 core_ledger.advance_to_next_block(&block_3)?;
979
980 let syncing_ledger = Arc::new(CoreLedgerService::new(
982 CurrentLedger::load(genesis, StorageMode::new_test(None)).unwrap(),
983 Default::default(),
984 ));
985 let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
987 let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone());
989 sync.sync_storage_with_block(block_1).await?;
991 assert_eq!(syncing_ledger.latest_block_height(), 1);
992 sync.sync_storage_with_block(block_2).await?;
994 assert_eq!(syncing_ledger.latest_block_height(), 2);
995 sync.sync_storage_with_block(block_3).await?;
997 assert_eq!(syncing_ledger.latest_block_height(), 3);
998 assert!(syncing_ledger.contains_block_height(1));
1000 assert!(syncing_ledger.contains_block_height(2));
1001
1002 Ok(())
1003 }
1004
1005 #[tokio::test]
1006 #[tracing_test::traced_test]
1007 async fn test_pending_certificates() -> anyhow::Result<()> {
1008 let rng = &mut TestRng::default();
1009 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1011 let commit_round = 2;
1012
1013 let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1015 let account: Account<CurrentNetwork> = Account::new(rng)?;
1016
1017 let seed: u64 = rng.gen();
1019 let genesis_rng = &mut TestRng::from_seed(seed);
1020 let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
1021
1022 let genesis_rng = &mut TestRng::from_seed(seed);
1024 let private_keys = [
1025 *account.private_key(),
1026 PrivateKey::new(genesis_rng)?,
1027 PrivateKey::new(genesis_rng)?,
1028 PrivateKey::new(genesis_rng)?,
1029 ];
1030 let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
1032 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
1034 let (round_to_certificates_map, committee) = {
1036 let committee = ledger.latest_committee().unwrap();
1038 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1040 HashMap::new();
1041 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1042
1043 for round in 0..=commit_round + 8 {
1044 let mut current_certificates = IndexSet::new();
1045 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1046 IndexSet::new()
1047 } else {
1048 previous_certificates.iter().map(|c| c.id()).collect()
1049 };
1050 let committee_id = committee.id();
1051 for (i, private_key_1) in private_keys.iter().enumerate() {
1053 let batch_header = BatchHeader::new(
1054 private_key_1,
1055 round,
1056 now(),
1057 committee_id,
1058 Default::default(),
1059 previous_certificate_ids.clone(),
1060 rng,
1061 )
1062 .unwrap();
1063 let mut signatures = IndexSet::with_capacity(4);
1065 for (j, private_key_2) in private_keys.iter().enumerate() {
1066 if i != j {
1067 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1068 }
1069 }
1070 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1071 }
1072
1073 round_to_certificates_map.insert(round, current_certificates.clone());
1075 previous_certificates = current_certificates.clone();
1076 }
1077 (round_to_certificates_map, committee)
1078 };
1079
1080 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1082 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1084 for i in 1..=commit_round + 8 {
1085 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1086 certificates.extend(c);
1087 }
1088 for certificate in certificates.clone().iter() {
1089 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1090 }
1091 let leader_round_1 = commit_round;
1093 let leader_1 = committee.get_leader(leader_round_1).unwrap();
1094 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1095 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1096 let block_1 = {
1097 let mut leader_cert_map = IndexSet::new();
1098 leader_cert_map.insert(leader_certificate.clone());
1099 let mut previous_cert_map = IndexSet::new();
1100 for cert in storage.get_certificates_for_round(commit_round - 1) {
1101 previous_cert_map.insert(cert);
1102 }
1103 subdag_map.insert(commit_round, leader_cert_map.clone());
1104 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1105 let subdag = Subdag::from(subdag_map.clone())?;
1106 core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1107 };
1108 core_ledger.advance_to_next_block(&block_1)?;
1110
1111 let leader_round_2 = commit_round + 2;
1113 let leader_2 = committee.get_leader(leader_round_2).unwrap();
1114 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1115 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1116 let block_2 = {
1117 let mut leader_cert_map_2 = IndexSet::new();
1118 leader_cert_map_2.insert(leader_certificate_2.clone());
1119 let mut previous_cert_map_2 = IndexSet::new();
1120 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1121 previous_cert_map_2.insert(cert);
1122 }
1123 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1124 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1125 let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1126 core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
1127 };
1128 core_ledger.advance_to_next_block(&block_2)?;
1130
1131 let leader_round_3 = commit_round + 4;
1133 let leader_3 = committee.get_leader(leader_round_3).unwrap();
1134 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1135 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1136 let block_3 = {
1137 let mut leader_cert_map_3 = IndexSet::new();
1138 leader_cert_map_3.insert(leader_certificate_3.clone());
1139 let mut previous_cert_map_3 = IndexSet::new();
1140 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1141 previous_cert_map_3.insert(cert);
1142 }
1143 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1144 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1145 let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1146 core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
1147 };
1148 core_ledger.advance_to_next_block(&block_3)?;
1150
1151 let pending_certificates = storage.get_pending_certificates();
1157 for certificate in pending_certificates.clone() {
1159 assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1160 }
1161 let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1163 {
1164 let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1165 for subdag in subdag_maps.iter() {
1166 for subdag_certificates in subdag.values() {
1167 committed_certificates.extend(subdag_certificates.iter().cloned());
1168 }
1169 }
1170 };
1171 let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1173 for certificate in certificates.clone() {
1174 if !committed_certificates.contains(&certificate) {
1175 candidate_pending_certificates.insert(certificate);
1176 }
1177 }
1178 assert_eq!(pending_certificates, candidate_pending_certificates);
1180 Ok(())
1181 }
1182}