1use crate::{
17 Gateway,
18 MAX_FETCH_TIMEOUT_IN_MS,
19 PRIMARY_PING_IN_MS,
20 Transport,
21 helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22 spawn_blocking,
23};
24use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
27use snarkos_node_tcp::P2P;
28use snarkvm::{
29 console::{network::Network, types::Field},
30 ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
31 prelude::{cfg_into_iter, cfg_iter},
32};
33
34use anyhow::{Result, bail};
35use parking_lot::Mutex;
36use rayon::prelude::*;
37use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration};
38use tokio::{
39 sync::{Mutex as TMutex, OnceCell, oneshot},
40 task::JoinHandle,
41};
42
43#[derive(Clone)]
44pub struct Sync<N: Network> {
45 gateway: Gateway<N>,
47 storage: Storage<N>,
49 ledger: Arc<dyn LedgerService<N>>,
51 block_sync: BlockSync<N>,
53 pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
55 bft_sender: Arc<OnceCell<BFTSender<N>>>,
57 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
59 response_lock: Arc<TMutex<()>>,
61 sync_lock: Arc<TMutex<()>>,
63 latest_block_responses: Arc<TMutex<HashMap<u32, Block<N>>>>,
65}
66
67impl<N: Network> Sync<N> {
68 pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
70 let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone());
72 Self {
74 gateway,
75 storage,
76 ledger,
77 block_sync,
78 pending: Default::default(),
79 bft_sender: Default::default(),
80 handles: Default::default(),
81 response_lock: Default::default(),
82 sync_lock: Default::default(),
83 latest_block_responses: Default::default(),
84 }
85 }
86
87 pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
89 if let Some(bft_sender) = bft_sender {
91 self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
92 }
93
94 info!("Syncing storage with the ledger...");
95
96 self.sync_storage_with_ledger_at_bootup().await
98 }
99
100 pub async fn run(&self, sync_receiver: SyncReceiver<N>) -> Result<()> {
102 info!("Starting the sync module...");
103
104 let self_ = self.clone();
106 self.handles.lock().push(tokio::spawn(async move {
107 tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
112 loop {
113 tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
115 let communication = &self_.gateway;
117 self_.block_sync.try_block_sync(communication).await;
119
120 if let Err(e) = self_.sync_storage_with_blocks().await {
122 error!("Unable to sync storage with blocks - {e}");
123 }
124
125 if self_.is_synced() {
127 self_.latest_block_responses.lock().await.clear();
128 }
129 }
130 }));
131
132 let self_ = self.clone();
134 self.spawn(async move {
135 loop {
136 tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
138
139 let self__ = self_.clone();
141 let _ = spawn_blocking!({
142 self__.pending.clear_expired_callbacks();
143 Ok(())
144 });
145 }
146 });
147
148 let SyncReceiver {
150 mut rx_block_sync_advance_with_sync_blocks,
151 mut rx_block_sync_remove_peer,
152 mut rx_block_sync_update_peer_locators,
153 mut rx_certificate_request,
154 mut rx_certificate_response,
155 } = sync_receiver;
156
157 let self_ = self.clone();
164 self.spawn(async move {
165 while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
166 if let Err(e) = self_.block_sync.process_block_response(peer_ip, blocks) {
168 callback.send(Err(e)).ok();
170 continue;
171 }
172
173 if let Err(e) = self_.sync_storage_with_blocks().await {
175 callback.send(Err(e)).ok();
177 continue;
178 }
179
180 callback.send(Ok(())).ok();
182 }
183 });
184
185 let self_ = self.clone();
187 self.spawn(async move {
188 while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
189 self_.block_sync.remove_peer(&peer_ip);
190 }
191 });
192
193 let self_ = self.clone();
200 self.spawn(async move {
201 while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
202 let self_clone = self_.clone();
203 tokio::spawn(async move {
204 let result = self_clone.block_sync.update_peer_locators(peer_ip, locators);
206 callback.send(result).ok();
208 });
209 }
210 });
211
212 let self_ = self.clone();
218 self.spawn(async move {
219 while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
220 self_.send_certificate_response(peer_ip, certificate_request);
221 }
222 });
223
224 let self_ = self.clone();
230 self.spawn(async move {
231 while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
232 self_.finish_certificate_request(peer_ip, certificate_response)
233 }
234 });
235
236 Ok(())
237 }
238}
239
240impl<N: Network> Sync<N> {
242 pub async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
244 let latest_block = self.ledger.latest_block();
246
247 let block_height = latest_block.height();
249 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
254 let gc_height = block_height.saturating_sub(max_gc_blocks);
258 let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
260
261 let _lock = self.sync_lock.lock().await;
263
264 debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));
265
266 self.storage.sync_height_with_block(latest_block.height());
270 self.storage.sync_round_with_block(latest_block.round());
272 self.storage.garbage_collect_certificates(latest_block.round());
274 for block in &blocks {
276 if let Authority::Quorum(subdag) = block.authority() {
281 let unconfirmed_transactions = cfg_iter!(block.transactions())
283 .filter_map(|tx| {
284 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
285 })
286 .collect::<HashMap<_, _>>();
287
288 for certificates in subdag.values().cloned() {
290 cfg_into_iter!(certificates).for_each(|certificate| {
291 self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
292 });
293 }
294 }
295 }
296
297 let certificates = blocks
301 .iter()
302 .flat_map(|block| {
303 match block.authority() {
304 Authority::Beacon(_) => None,
306 Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
308 }
309 })
310 .flatten()
311 .collect::<Vec<_>>();
312
313 if let Some(bft_sender) = self.bft_sender.get() {
315 if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
317 bail!("Failed to update the BFT DAG from sync: {e}");
318 }
319 }
320
321 Ok(())
322 }
323
324 pub async fn sync_storage_with_blocks(&self) -> Result<()> {
326 let _lock = self.response_lock.lock().await;
328
329 let mut current_height = self.ledger.latest_block_height() + 1;
334
335 let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
337 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
342 let max_gc_height = tip.saturating_sub(max_gc_blocks);
346
347 if current_height <= max_gc_height {
349 while let Some(block) = self.block_sync.peek_next_block(current_height) {
351 info!("Syncing the ledger to block {}...", block.height());
352 match self.sync_ledger_with_block_without_bft(block).await {
354 Ok(_) => {
355 current_height += 1;
357 }
358 Err(e) => {
359 self.block_sync.remove_block_response(current_height);
361 return Err(e);
362 }
363 }
364 }
365 if current_height > max_gc_height {
367 if let Err(e) = self.sync_storage_with_ledger_at_bootup().await {
368 error!("BFT sync (with bootup routine) failed - {e}");
369 }
370 }
371 }
372
373 while let Some(block) = self.block_sync.peek_next_block(current_height) {
375 info!("Syncing the BFT to block {}...", block.height());
376 match self.sync_storage_with_block(block).await {
378 Ok(_) => {
379 current_height += 1;
381 }
382 Err(e) => {
383 self.block_sync.remove_block_response(current_height);
385 return Err(e);
386 }
387 }
388 }
389 Ok(())
390 }
391
392 async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
394 let _lock = self.sync_lock.lock().await;
396
397 let self_ = self.clone();
398 tokio::task::spawn_blocking(move || {
399 self_.ledger.check_next_block(&block)?;
401 self_.ledger.advance_to_next_block(&block)?;
403
404 self_.storage.sync_height_with_block(block.height());
406 self_.storage.sync_round_with_block(block.round());
408 self_.block_sync.remove_block_response(block.height());
410
411 Ok(())
412 })
413 .await?
414 }
415
416 pub async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
418 let _lock = self.sync_lock.lock().await;
420 let mut latest_block_responses = self.latest_block_responses.lock().await;
422
423 if self.ledger.contains_block_height(block.height()) || latest_block_responses.contains_key(&block.height()) {
425 return Ok(());
426 }
427
428 if let Authority::Quorum(subdag) = block.authority() {
433 let unconfirmed_transactions = cfg_iter!(block.transactions())
435 .filter_map(|tx| {
436 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
437 })
438 .collect::<HashMap<_, _>>();
439
440 for certificates in subdag.values().cloned() {
442 cfg_into_iter!(certificates.clone()).for_each(|certificate| {
443 self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
445 });
446
447 for certificate in certificates {
449 if let Some(bft_sender) = self.bft_sender.get() {
451 if let Err(e) = bft_sender.send_sync_bft(certificate).await {
453 bail!("Sync - {e}");
454 };
455 }
456 }
457 }
458 }
459
460 let latest_block_height = self.ledger.latest_block_height();
462
463 latest_block_responses.insert(block.height(), block);
465 latest_block_responses.retain(|height, _| *height > latest_block_height);
467
468 let contiguous_blocks: Vec<Block<N>> = (latest_block_height.saturating_add(1)..)
470 .take_while(|&k| latest_block_responses.contains_key(&k))
471 .filter_map(|k| latest_block_responses.get(&k).cloned())
472 .collect();
473
474 for next_block in contiguous_blocks.into_iter() {
481 let next_block_height = next_block.height();
483
484 let leader_certificate = match next_block.authority() {
486 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
487 _ => bail!("Received a block with an unexpected authority type."),
488 };
489 let commit_round = leader_certificate.round();
490 let certificate_round = commit_round.saturating_add(1);
491
492 let committee_lookback = self.ledger.get_committee_lookback_for_round(commit_round)?;
494 let certificates = self.storage.get_certificates_for_round(certificate_round);
496 let authors = certificates
498 .iter()
499 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
500 true => Some(c.author()),
501 false => None,
502 })
503 .collect();
504
505 debug!("Validating sync block {next_block_height} at round {commit_round}...");
506 if committee_lookback.is_availability_threshold_reached(&authors) {
508 let mut current_certificate = leader_certificate;
510 let mut blocks_to_add = vec![next_block];
512
513 for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
515 let Some(previous_block) = latest_block_responses.get(&height) else {
517 bail!("Block {height} is missing from the latest block responses.");
518 };
519 let previous_certificate = match previous_block.authority() {
521 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
522 _ => bail!("Received a block with an unexpected authority type."),
523 };
524 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
526 debug!("Previous sync block {height} is linked to the current block {next_block_height}");
527 blocks_to_add.insert(0, previous_block.clone());
529 current_certificate = previous_certificate;
531 }
532 }
533
534 for block in blocks_to_add {
536 let block_height = block.height();
538 if block_height != self.ledger.latest_block_height().saturating_add(1) {
539 warn!("Skipping block {block_height} from the latest block responses - not sequential.");
540 continue;
541 }
542
543 let self_ = self.clone();
544 tokio::task::spawn_blocking(move || {
545 self_.ledger.check_next_block(&block)?;
547 self_.ledger.advance_to_next_block(&block)?;
549
550 self_.storage.sync_height_with_block(block.height());
552 self_.storage.sync_round_with_block(block.round());
554
555 Ok::<(), anyhow::Error>(())
556 })
557 .await??;
558 latest_block_responses.remove(&block_height);
560 self.block_sync.remove_block_response(block_height);
562 }
563 } else {
564 debug!(
565 "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
566 );
567 }
568 }
569
570 Ok(())
571 }
572
573 fn is_linked(
575 &self,
576 previous_certificate: BatchCertificate<N>,
577 current_certificate: BatchCertificate<N>,
578 ) -> Result<bool> {
579 let mut traversal = vec![current_certificate.clone()];
581 for round in (previous_certificate.round()..current_certificate.round()).rev() {
583 let certificates = self.storage.get_certificates_for_round(round);
585 traversal = certificates
587 .into_iter()
588 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
589 .collect();
590 }
591 Ok(traversal.contains(&previous_certificate))
592 }
593}
594
595impl<N: Network> Sync<N> {
597 pub fn is_synced(&self) -> bool {
599 if self.gateway.number_of_connected_peers() == 0 {
600 return false;
601 }
602 self.block_sync.is_block_synced()
603 }
604
605 pub fn num_blocks_behind(&self) -> u32 {
607 self.block_sync.num_blocks_behind()
608 }
609
610 pub const fn is_gateway_mode(&self) -> bool {
612 self.block_sync.mode().is_gateway()
613 }
614
615 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
617 self.block_sync.get_block_locators()
618 }
619
620 #[cfg(test)]
622 #[doc(hidden)]
623 pub(super) fn block_sync(&self) -> &BlockSync<N> {
624 &self.block_sync
625 }
626}
627
628impl<N: Network> Sync<N> {
630 pub async fn send_certificate_request(
632 &self,
633 peer_ip: SocketAddr,
634 certificate_id: Field<N>,
635 ) -> Result<BatchCertificate<N>> {
636 let (callback_sender, callback_receiver) = oneshot::channel();
638 let num_sent_requests = self.pending.num_sent_requests(certificate_id);
640 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
642 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
644 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
647
648 self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
650
651 if should_send_request {
653 if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
655 bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
656 }
657 } else {
658 debug!(
659 "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
660 fmt_id(certificate_id)
661 );
662 }
663 match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
666 Ok(result) => Ok(result?),
668 Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
670 }
671 }
672
673 fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
675 if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
677 let self_ = self.clone();
679 tokio::spawn(async move {
680 let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
681 });
682 }
683 }
684
685 fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
688 let certificate = response.certificate;
689 let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
691 if exists {
693 self.pending.remove(certificate.id(), Some(certificate));
696 }
697 }
698}
699
700impl<N: Network> Sync<N> {
701 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
703 self.handles.lock().push(tokio::spawn(future));
704 }
705
706 pub async fn shut_down(&self) {
708 info!("Shutting down the sync module...");
709 let _lock = self.response_lock.lock().await;
711 let _lock = self.sync_lock.lock().await;
713 self.handles.lock().iter().for_each(|handle| handle.abort());
715 }
716}
717#[cfg(test)]
718mod tests {
719 use super::*;
720
721 use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
722 use snarkos_account::Account;
723 use snarkvm::{
724 console::{
725 account::{Address, PrivateKey},
726 network::MainnetV0,
727 },
728 ledger::{
729 narwhal::{BatchCertificate, BatchHeader, Subdag},
730 store::{ConsensusStore, helpers::memory::ConsensusMemory},
731 },
732 prelude::{Ledger, VM},
733 utilities::TestRng,
734 };
735
736 use aleo_std::StorageMode;
737 use indexmap::IndexSet;
738 use rand::Rng;
739 use std::collections::BTreeMap;
740
741 type CurrentNetwork = MainnetV0;
742 type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
743 type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
744
745 #[tokio::test]
746 #[tracing_test::traced_test]
747 async fn test_commit_via_is_linked() -> anyhow::Result<()> {
748 let rng = &mut TestRng::default();
749 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
751 let commit_round = 2;
752
753 let store = CurrentConsensusStore::open(None).unwrap();
755 let account: Account<CurrentNetwork> = Account::new(rng)?;
756
757 let seed: u64 = rng.gen();
759 let genesis_rng = &mut TestRng::from_seed(seed);
760 let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
761
762 let genesis_rng = &mut TestRng::from_seed(seed);
764 let private_keys = [
765 *account.private_key(),
766 PrivateKey::new(genesis_rng)?,
767 PrivateKey::new(genesis_rng)?,
768 PrivateKey::new(genesis_rng)?,
769 ];
770
771 let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
773 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
775
776 let (round_to_certificates_map, committee) = {
778 let addresses = vec![
779 Address::try_from(private_keys[0])?,
780 Address::try_from(private_keys[1])?,
781 Address::try_from(private_keys[2])?,
782 Address::try_from(private_keys[3])?,
783 ];
784
785 let committee = ledger.latest_committee().unwrap();
786
787 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
789 HashMap::new();
790 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
791
792 for round in 0..=commit_round + 8 {
793 let mut current_certificates = IndexSet::new();
794 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
795 IndexSet::new()
796 } else {
797 previous_certificates.iter().map(|c| c.id()).collect()
798 };
799 let committee_id = committee.id();
800
801 if round <= 5 {
803 let leader = committee.get_leader(round).unwrap();
804 let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
805 let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
806 for i in [leader_index, non_leader_index].into_iter() {
807 let batch_header = BatchHeader::new(
808 &private_keys[i],
809 round,
810 now(),
811 committee_id,
812 Default::default(),
813 previous_certificate_ids.clone(),
814 rng,
815 )
816 .unwrap();
817 let mut signatures = IndexSet::with_capacity(4);
819 for (j, private_key_2) in private_keys.iter().enumerate() {
820 if i != j {
821 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
822 }
823 }
824 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
825 }
826 }
827
828 if round > 5 {
830 for (i, private_key_1) in private_keys.iter().enumerate() {
831 let batch_header = BatchHeader::new(
832 private_key_1,
833 round,
834 now(),
835 committee_id,
836 Default::default(),
837 previous_certificate_ids.clone(),
838 rng,
839 )
840 .unwrap();
841 let mut signatures = IndexSet::with_capacity(4);
843 for (j, private_key_2) in private_keys.iter().enumerate() {
844 if i != j {
845 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
846 }
847 }
848 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
849 }
850 }
851 round_to_certificates_map.insert(round, current_certificates.clone());
853 previous_certificates = current_certificates.clone();
854 }
855 (round_to_certificates_map, committee)
856 };
857
858 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
860 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
862 for i in 1..=commit_round + 8 {
863 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
864 certificates.extend(c);
865 }
866 for certificate in certificates.clone().iter() {
867 storage.testing_only_insert_certificate_testing_only(certificate.clone());
868 }
869
870 let leader_round_1 = commit_round;
872 let leader_1 = committee.get_leader(leader_round_1).unwrap();
873 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
874 let block_1 = {
875 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
876 let mut leader_cert_map = IndexSet::new();
877 leader_cert_map.insert(leader_certificate.clone());
878 let mut previous_cert_map = IndexSet::new();
879 for cert in storage.get_certificates_for_round(commit_round - 1) {
880 previous_cert_map.insert(cert);
881 }
882 subdag_map.insert(commit_round, leader_cert_map.clone());
883 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
884 let subdag = Subdag::from(subdag_map.clone())?;
885 core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
886 };
887 core_ledger.advance_to_next_block(&block_1)?;
889
890 let leader_round_2 = commit_round + 2;
892 let leader_2 = committee.get_leader(leader_round_2).unwrap();
893 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
894 let block_2 = {
895 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
896 let mut leader_cert_map_2 = IndexSet::new();
897 leader_cert_map_2.insert(leader_certificate_2.clone());
898 let mut previous_cert_map_2 = IndexSet::new();
899 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
900 previous_cert_map_2.insert(cert);
901 }
902 let mut prev_commit_cert_map_2 = IndexSet::new();
903 for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
904 if cert != leader_certificate {
905 prev_commit_cert_map_2.insert(cert);
906 }
907 }
908 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
909 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
910 subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
911 let subdag_2 = Subdag::from(subdag_map_2.clone())?;
912 core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
913 };
914 core_ledger.advance_to_next_block(&block_2)?;
916
917 let leader_round_3 = commit_round + 4;
919 let leader_3 = committee.get_leader(leader_round_3).unwrap();
920 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
921 let block_3 = {
922 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
923 let mut leader_cert_map_3 = IndexSet::new();
924 leader_cert_map_3.insert(leader_certificate_3.clone());
925 let mut previous_cert_map_3 = IndexSet::new();
926 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
927 previous_cert_map_3.insert(cert);
928 }
929 let mut prev_commit_cert_map_3 = IndexSet::new();
930 for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
931 if cert != leader_certificate_2 {
932 prev_commit_cert_map_3.insert(cert);
933 }
934 }
935 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
936 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
937 subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
938 let subdag_3 = Subdag::from(subdag_map_3.clone())?;
939 core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
940 };
941 core_ledger.advance_to_next_block(&block_3)?;
943
944 let syncing_ledger = Arc::new(CoreLedgerService::new(
946 CurrentLedger::load(genesis, StorageMode::Production).unwrap(),
947 Default::default(),
948 ));
949 let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
951 let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone());
953 sync.sync_storage_with_block(block_1).await?;
955 assert_eq!(syncing_ledger.latest_block_height(), 1);
956 sync.sync_storage_with_block(block_2).await?;
958 assert_eq!(syncing_ledger.latest_block_height(), 2);
959 sync.sync_storage_with_block(block_3).await?;
961 assert_eq!(syncing_ledger.latest_block_height(), 3);
962 assert!(syncing_ledger.contains_block_height(1));
964 assert!(syncing_ledger.contains_block_height(2));
965
966 Ok(())
967 }
968
969 #[tokio::test]
970 #[tracing_test::traced_test]
971 async fn test_pending_certificates() -> anyhow::Result<()> {
972 let rng = &mut TestRng::default();
973 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
975 let commit_round = 2;
976
977 let store = CurrentConsensusStore::open(None).unwrap();
979 let account: Account<CurrentNetwork> = Account::new(rng)?;
980
981 let seed: u64 = rng.gen();
983 let genesis_rng = &mut TestRng::from_seed(seed);
984 let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
985
986 let genesis_rng = &mut TestRng::from_seed(seed);
988 let private_keys = [
989 *account.private_key(),
990 PrivateKey::new(genesis_rng)?,
991 PrivateKey::new(genesis_rng)?,
992 PrivateKey::new(genesis_rng)?,
993 ];
994 let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
996 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
998 let (round_to_certificates_map, committee) = {
1000 let committee = ledger.latest_committee().unwrap();
1002 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1004 HashMap::new();
1005 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1006
1007 for round in 0..=commit_round + 8 {
1008 let mut current_certificates = IndexSet::new();
1009 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1010 IndexSet::new()
1011 } else {
1012 previous_certificates.iter().map(|c| c.id()).collect()
1013 };
1014 let committee_id = committee.id();
1015 for (i, private_key_1) in private_keys.iter().enumerate() {
1017 let batch_header = BatchHeader::new(
1018 private_key_1,
1019 round,
1020 now(),
1021 committee_id,
1022 Default::default(),
1023 previous_certificate_ids.clone(),
1024 rng,
1025 )
1026 .unwrap();
1027 let mut signatures = IndexSet::with_capacity(4);
1029 for (j, private_key_2) in private_keys.iter().enumerate() {
1030 if i != j {
1031 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1032 }
1033 }
1034 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1035 }
1036
1037 round_to_certificates_map.insert(round, current_certificates.clone());
1039 previous_certificates = current_certificates.clone();
1040 }
1041 (round_to_certificates_map, committee)
1042 };
1043
1044 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1046 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1048 for i in 1..=commit_round + 8 {
1049 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1050 certificates.extend(c);
1051 }
1052 for certificate in certificates.clone().iter() {
1053 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1054 }
1055 let leader_round_1 = commit_round;
1057 let leader_1 = committee.get_leader(leader_round_1).unwrap();
1058 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1059 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1060 let block_1 = {
1061 let mut leader_cert_map = IndexSet::new();
1062 leader_cert_map.insert(leader_certificate.clone());
1063 let mut previous_cert_map = IndexSet::new();
1064 for cert in storage.get_certificates_for_round(commit_round - 1) {
1065 previous_cert_map.insert(cert);
1066 }
1067 subdag_map.insert(commit_round, leader_cert_map.clone());
1068 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1069 let subdag = Subdag::from(subdag_map.clone())?;
1070 core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1071 };
1072 core_ledger.advance_to_next_block(&block_1)?;
1074
1075 let leader_round_2 = commit_round + 2;
1077 let leader_2 = committee.get_leader(leader_round_2).unwrap();
1078 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1079 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1080 let block_2 = {
1081 let mut leader_cert_map_2 = IndexSet::new();
1082 leader_cert_map_2.insert(leader_certificate_2.clone());
1083 let mut previous_cert_map_2 = IndexSet::new();
1084 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1085 previous_cert_map_2.insert(cert);
1086 }
1087 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1088 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1089 let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1090 core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
1091 };
1092 core_ledger.advance_to_next_block(&block_2)?;
1094
1095 let leader_round_3 = commit_round + 4;
1097 let leader_3 = committee.get_leader(leader_round_3).unwrap();
1098 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1099 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1100 let block_3 = {
1101 let mut leader_cert_map_3 = IndexSet::new();
1102 leader_cert_map_3.insert(leader_certificate_3.clone());
1103 let mut previous_cert_map_3 = IndexSet::new();
1104 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1105 previous_cert_map_3.insert(cert);
1106 }
1107 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1108 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1109 let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1110 core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
1111 };
1112 core_ledger.advance_to_next_block(&block_3)?;
1114
1115 let pending_certificates = storage.get_pending_certificates();
1121 for certificate in pending_certificates.clone() {
1123 assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1124 }
1125 let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1127 {
1128 let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1129 for subdag in subdag_maps.iter() {
1130 for subdag_certificates in subdag.values() {
1131 committed_certificates.extend(subdag_certificates.iter().cloned());
1132 }
1133 }
1134 };
1135 let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1137 for certificate in certificates.clone() {
1138 if !committed_certificates.contains(&certificate) {
1139 candidate_pending_certificates.insert(certificate);
1140 }
1141 }
1142 assert_eq!(pending_certificates, candidate_pending_certificates);
1144 Ok(())
1145 }
1146}