1use crate::{
17 Gateway,
18 MAX_FETCH_TIMEOUT_IN_MS,
19 PRIMARY_PING_IN_MS,
20 Transport,
21 helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22 spawn_blocking,
23};
24use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
27use snarkos_node_tcp::P2P;
28use snarkvm::{
29 console::{network::Network, types::Field},
30 ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
31 prelude::{cfg_into_iter, cfg_iter},
32};
33
34use anyhow::{Result, bail};
35use parking_lot::Mutex;
36use rayon::prelude::*;
37use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration};
38use tokio::{
39 sync::{Mutex as TMutex, OnceCell, oneshot},
40 task::JoinHandle,
41};
42
43#[derive(Clone)]
44pub struct Sync<N: Network> {
45 gateway: Gateway<N>,
47 storage: Storage<N>,
49 ledger: Arc<dyn LedgerService<N>>,
51 block_sync: BlockSync<N>,
53 pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
55 bft_sender: Arc<OnceCell<BFTSender<N>>>,
57 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
59 response_lock: Arc<TMutex<()>>,
61 sync_lock: Arc<TMutex<()>>,
63 latest_block_responses: Arc<TMutex<HashMap<u32, Block<N>>>>,
65}
66
67impl<N: Network> Sync<N> {
68 pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
70 let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone());
72 Self {
74 gateway,
75 storage,
76 ledger,
77 block_sync,
78 pending: Default::default(),
79 bft_sender: Default::default(),
80 handles: Default::default(),
81 response_lock: Default::default(),
82 sync_lock: Default::default(),
83 latest_block_responses: Default::default(),
84 }
85 }
86
87 pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
89 if let Some(bft_sender) = bft_sender {
91 self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
92 }
93
94 info!("Syncing storage with the ledger...");
95
96 self.sync_storage_with_ledger_at_bootup().await
98 }
99
100 pub async fn run(&self, sync_receiver: SyncReceiver<N>) -> Result<()> {
102 info!("Starting the sync module...");
103
104 let self_ = self.clone();
106 self.handles.lock().push(tokio::spawn(async move {
107 tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
112 loop {
113 tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
115 let communication = &self_.gateway;
117 self_.block_sync.try_block_sync(communication).await;
119
120 if let Err(e) = self_.sync_storage_with_blocks().await {
122 error!("Unable to sync storage with blocks - {e}");
123 }
124
125 if self_.is_synced() {
127 self_.latest_block_responses.lock().await.clear();
128 }
129 }
130 }));
131
132 let self_ = self.clone();
134 self.spawn(async move {
135 loop {
136 tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
138
139 let self__ = self_.clone();
141 let _ = spawn_blocking!({
142 self__.pending.clear_expired_callbacks();
143 Ok(())
144 });
145 }
146 });
147
148 let SyncReceiver {
150 mut rx_block_sync_advance_with_sync_blocks,
151 mut rx_block_sync_remove_peer,
152 mut rx_block_sync_update_peer_locators,
153 mut rx_certificate_request,
154 mut rx_certificate_response,
155 } = sync_receiver;
156
157 let self_ = self.clone();
159 self.spawn(async move {
160 while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
161 if let Err(e) = self_.block_sync.process_block_response(peer_ip, blocks) {
163 callback.send(Err(e)).ok();
165 continue;
166 }
167
168 if let Err(e) = self_.sync_storage_with_blocks().await {
170 callback.send(Err(e)).ok();
172 continue;
173 }
174
175 callback.send(Ok(())).ok();
177 }
178 });
179
180 let self_ = self.clone();
182 self.spawn(async move {
183 while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
184 self_.block_sync.remove_peer(&peer_ip);
185 }
186 });
187
188 let self_ = self.clone();
190 self.spawn(async move {
191 while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
192 let self_clone = self_.clone();
193 tokio::spawn(async move {
194 let result = self_clone.block_sync.update_peer_locators(peer_ip, locators);
196 callback.send(result).ok();
198 });
199 }
200 });
201
202 let self_ = self.clone();
204 self.spawn(async move {
205 while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
206 self_.send_certificate_response(peer_ip, certificate_request);
207 }
208 });
209
210 let self_ = self.clone();
212 self.spawn(async move {
213 while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
214 self_.finish_certificate_request(peer_ip, certificate_response)
215 }
216 });
217
218 Ok(())
219 }
220}
221
222impl<N: Network> Sync<N> {
224 pub async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
226 let latest_block = self.ledger.latest_block();
228
229 let block_height = latest_block.height();
231 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
233 let gc_height = block_height.saturating_sub(max_gc_blocks);
236 let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
238
239 let _lock = self.sync_lock.lock().await;
241
242 debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));
243
244 self.storage.sync_height_with_block(latest_block.height());
248 self.storage.sync_round_with_block(latest_block.round());
250 self.storage.garbage_collect_certificates(latest_block.round());
252 for block in &blocks {
254 if let Authority::Quorum(subdag) = block.authority() {
256 let unconfirmed_transactions = cfg_iter!(block.transactions())
258 .filter_map(|tx| {
259 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
260 })
261 .collect::<HashMap<_, _>>();
262
263 for certificates in subdag.values().cloned() {
265 cfg_into_iter!(certificates).for_each(|certificate| {
266 self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
267 });
268 }
269 }
270 }
271
272 let certificates = blocks
276 .iter()
277 .flat_map(|block| {
278 match block.authority() {
279 Authority::Beacon(_) => None,
281 Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
283 }
284 })
285 .flatten()
286 .collect::<Vec<_>>();
287
288 if let Some(bft_sender) = self.bft_sender.get() {
290 if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
292 bail!("Failed to update the BFT DAG from sync: {e}");
293 }
294 }
295
296 Ok(())
297 }
298
299 pub async fn sync_storage_with_blocks(&self) -> Result<()> {
301 let _lock = self.response_lock.lock().await;
303
304 let mut current_height = self.ledger.latest_block_height() + 1;
306
307 let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
309 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
311 let max_gc_height = tip.saturating_sub(max_gc_blocks);
313
314 if current_height <= max_gc_height {
316 while let Some(block) = self.block_sync.peek_next_block(current_height) {
318 info!("Syncing the ledger to block {}...", block.height());
319 match self.sync_ledger_with_block_without_bft(block).await {
321 Ok(_) => {
322 current_height += 1;
324 }
325 Err(e) => {
326 self.block_sync.remove_block_response(current_height);
328 return Err(e);
329 }
330 }
331 }
332 if current_height > max_gc_height {
334 if let Err(e) = self.sync_storage_with_ledger_at_bootup().await {
335 error!("BFT sync (with bootup routine) failed - {e}");
336 }
337 }
338 }
339
340 while let Some(block) = self.block_sync.peek_next_block(current_height) {
342 info!("Syncing the BFT to block {}...", block.height());
343 match self.sync_storage_with_block(block).await {
345 Ok(_) => {
346 current_height += 1;
348 }
349 Err(e) => {
350 self.block_sync.remove_block_response(current_height);
352 return Err(e);
353 }
354 }
355 }
356 Ok(())
357 }
358
359 async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
361 let _lock = self.sync_lock.lock().await;
363
364 let self_ = self.clone();
365 tokio::task::spawn_blocking(move || {
366 self_.ledger.check_next_block(&block)?;
368 self_.ledger.advance_to_next_block(&block)?;
370
371 self_.storage.sync_height_with_block(block.height());
373 self_.storage.sync_round_with_block(block.round());
375 self_.block_sync.remove_block_response(block.height());
377
378 Ok(())
379 })
380 .await?
381 }
382
383 pub async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
385 let _lock = self.sync_lock.lock().await;
387 let mut latest_block_responses = self.latest_block_responses.lock().await;
389
390 if self.ledger.contains_block_height(block.height()) || latest_block_responses.contains_key(&block.height()) {
392 return Ok(());
393 }
394
395 if let Authority::Quorum(subdag) = block.authority() {
397 let unconfirmed_transactions = cfg_iter!(block.transactions())
399 .filter_map(|tx| {
400 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
401 })
402 .collect::<HashMap<_, _>>();
403
404 for certificates in subdag.values().cloned() {
406 cfg_into_iter!(certificates.clone()).for_each(|certificate| {
407 self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
409 });
410
411 for certificate in certificates {
413 if let Some(bft_sender) = self.bft_sender.get() {
415 if let Err(e) = bft_sender.send_sync_bft(certificate).await {
417 bail!("Sync - {e}");
418 };
419 }
420 }
421 }
422 }
423
424 let latest_block_height = self.ledger.latest_block_height();
426
427 latest_block_responses.insert(block.height(), block);
429 latest_block_responses.retain(|height, _| *height > latest_block_height);
431
432 let contiguous_blocks: Vec<Block<N>> = (latest_block_height.saturating_add(1)..)
434 .take_while(|&k| latest_block_responses.contains_key(&k))
435 .filter_map(|k| latest_block_responses.get(&k).cloned())
436 .collect();
437
438 for next_block in contiguous_blocks.into_iter() {
445 let next_block_height = next_block.height();
447
448 let leader_certificate = match next_block.authority() {
450 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
451 _ => bail!("Received a block with an unexpected authority type."),
452 };
453 let commit_round = leader_certificate.round();
454 let certificate_round = commit_round.saturating_add(1);
455
456 let committee_lookback = self.ledger.get_committee_lookback_for_round(commit_round)?;
458 let certificates = self.storage.get_certificates_for_round(certificate_round);
460 let authors = certificates
462 .iter()
463 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
464 true => Some(c.author()),
465 false => None,
466 })
467 .collect();
468
469 debug!("Validating sync block {next_block_height} at round {commit_round}...");
470 if committee_lookback.is_availability_threshold_reached(&authors) {
472 let mut current_certificate = leader_certificate;
474 let mut blocks_to_add = vec![next_block];
476
477 for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
479 let Some(previous_block) = latest_block_responses.get(&height) else {
481 bail!("Block {height} is missing from the latest block responses.");
482 };
483 let previous_certificate = match previous_block.authority() {
485 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
486 _ => bail!("Received a block with an unexpected authority type."),
487 };
488 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
490 debug!("Previous sync block {height} is linked to the current block {next_block_height}");
491 blocks_to_add.insert(0, previous_block.clone());
493 current_certificate = previous_certificate;
495 }
496 }
497
498 for block in blocks_to_add {
500 let block_height = block.height();
502 if block_height != self.ledger.latest_block_height().saturating_add(1) {
503 warn!("Skipping block {block_height} from the latest block responses - not sequential.");
504 continue;
505 }
506
507 let self_ = self.clone();
508 tokio::task::spawn_blocking(move || {
509 self_.ledger.check_next_block(&block)?;
511 self_.ledger.advance_to_next_block(&block)?;
513
514 self_.storage.sync_height_with_block(block.height());
516 self_.storage.sync_round_with_block(block.round());
518
519 Ok::<(), anyhow::Error>(())
520 })
521 .await??;
522 latest_block_responses.remove(&block_height);
524 self.block_sync.remove_block_response(block_height);
526 }
527 } else {
528 debug!(
529 "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
530 );
531 }
532 }
533
534 Ok(())
535 }
536
537 fn is_linked(
539 &self,
540 previous_certificate: BatchCertificate<N>,
541 current_certificate: BatchCertificate<N>,
542 ) -> Result<bool> {
543 let mut traversal = vec![current_certificate.clone()];
545 for round in (previous_certificate.round()..current_certificate.round()).rev() {
547 let certificates = self.storage.get_certificates_for_round(round);
549 traversal = certificates
551 .into_iter()
552 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
553 .collect();
554 }
555 Ok(traversal.contains(&previous_certificate))
556 }
557}
558
559impl<N: Network> Sync<N> {
561 pub fn is_synced(&self) -> bool {
563 if self.gateway.number_of_connected_peers() == 0 {
564 return false;
565 }
566 self.block_sync.is_block_synced()
567 }
568
569 pub fn num_blocks_behind(&self) -> u32 {
571 self.block_sync.num_blocks_behind()
572 }
573
574 pub const fn is_gateway_mode(&self) -> bool {
576 self.block_sync.mode().is_gateway()
577 }
578
579 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
581 self.block_sync.get_block_locators()
582 }
583
584 #[cfg(test)]
586 #[doc(hidden)]
587 pub(super) fn block_sync(&self) -> &BlockSync<N> {
588 &self.block_sync
589 }
590}
591
592impl<N: Network> Sync<N> {
594 pub async fn send_certificate_request(
596 &self,
597 peer_ip: SocketAddr,
598 certificate_id: Field<N>,
599 ) -> Result<BatchCertificate<N>> {
600 let (callback_sender, callback_receiver) = oneshot::channel();
602 let num_sent_requests = self.pending.num_sent_requests(certificate_id);
604 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
606 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round());
608 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
611
612 self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
614
615 if should_send_request {
617 if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
619 bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
620 }
621 } else {
622 debug!(
623 "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
624 fmt_id(certificate_id)
625 );
626 }
627 match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
630 Ok(result) => Ok(result?),
632 Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
634 }
635 }
636
637 fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
639 if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
641 let self_ = self.clone();
643 tokio::spawn(async move {
644 let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
645 });
646 }
647 }
648
649 fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
652 let certificate = response.certificate;
653 let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
655 if exists {
657 self.pending.remove(certificate.id(), Some(certificate));
660 }
661 }
662}
663
664impl<N: Network> Sync<N> {
665 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
667 self.handles.lock().push(tokio::spawn(future));
668 }
669
670 pub async fn shut_down(&self) {
672 info!("Shutting down the sync module...");
673 let _lock = self.response_lock.lock().await;
675 let _lock = self.sync_lock.lock().await;
677 self.handles.lock().iter().for_each(|handle| handle.abort());
679 }
680}
681#[cfg(test)]
682mod tests {
683 use super::*;
684
685 use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
686 use snarkos_account::Account;
687 use snarkvm::{
688 console::{
689 account::{Address, PrivateKey},
690 network::MainnetV0,
691 },
692 ledger::{
693 narwhal::{BatchCertificate, BatchHeader, Subdag},
694 store::{ConsensusStore, helpers::memory::ConsensusMemory},
695 },
696 prelude::{Ledger, VM},
697 utilities::TestRng,
698 };
699
700 use aleo_std::StorageMode;
701 use indexmap::IndexSet;
702 use rand::Rng;
703 use std::collections::BTreeMap;
704
705 type CurrentNetwork = MainnetV0;
706 type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
707 type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
708
709 #[tokio::test]
710 #[tracing_test::traced_test]
711 async fn test_commit_via_is_linked() -> anyhow::Result<()> {
712 let rng = &mut TestRng::default();
713 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
715 let commit_round = 2;
716
717 let store = CurrentConsensusStore::open(None).unwrap();
719 let account: Account<CurrentNetwork> = Account::new(rng)?;
720
721 let seed: u64 = rng.gen();
723 let genesis_rng = &mut TestRng::from_seed(seed);
724 let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
725
726 let genesis_rng = &mut TestRng::from_seed(seed);
728 let private_keys = [
729 *account.private_key(),
730 PrivateKey::new(genesis_rng)?,
731 PrivateKey::new(genesis_rng)?,
732 PrivateKey::new(genesis_rng)?,
733 ];
734
735 let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
737 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
739
740 let (round_to_certificates_map, committee) = {
742 let addresses = vec![
743 Address::try_from(private_keys[0])?,
744 Address::try_from(private_keys[1])?,
745 Address::try_from(private_keys[2])?,
746 Address::try_from(private_keys[3])?,
747 ];
748
749 let committee = ledger.latest_committee().unwrap();
750
751 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
753 HashMap::new();
754 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
755
756 for round in 0..=commit_round + 8 {
757 let mut current_certificates = IndexSet::new();
758 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
759 IndexSet::new()
760 } else {
761 previous_certificates.iter().map(|c| c.id()).collect()
762 };
763 let committee_id = committee.id();
764
765 if round <= 5 {
767 let leader = committee.get_leader(round).unwrap();
768 let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
769 let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
770 for i in [leader_index, non_leader_index].into_iter() {
771 let batch_header = BatchHeader::new(
772 &private_keys[i],
773 round,
774 now(),
775 committee_id,
776 Default::default(),
777 previous_certificate_ids.clone(),
778 rng,
779 )
780 .unwrap();
781 let mut signatures = IndexSet::with_capacity(4);
783 for (j, private_key_2) in private_keys.iter().enumerate() {
784 if i != j {
785 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
786 }
787 }
788 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
789 }
790 }
791
792 if round > 5 {
794 for (i, private_key_1) in private_keys.iter().enumerate() {
795 let batch_header = BatchHeader::new(
796 private_key_1,
797 round,
798 now(),
799 committee_id,
800 Default::default(),
801 previous_certificate_ids.clone(),
802 rng,
803 )
804 .unwrap();
805 let mut signatures = IndexSet::with_capacity(4);
807 for (j, private_key_2) in private_keys.iter().enumerate() {
808 if i != j {
809 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
810 }
811 }
812 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
813 }
814 }
815 round_to_certificates_map.insert(round, current_certificates.clone());
817 previous_certificates = current_certificates.clone();
818 }
819 (round_to_certificates_map, committee)
820 };
821
822 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
824 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
826 for i in 1..=commit_round + 8 {
827 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
828 certificates.extend(c);
829 }
830 for certificate in certificates.clone().iter() {
831 storage.testing_only_insert_certificate_testing_only(certificate.clone());
832 }
833
834 let leader_round_1 = commit_round;
836 let leader_1 = committee.get_leader(leader_round_1).unwrap();
837 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
838 let block_1 = {
839 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
840 let mut leader_cert_map = IndexSet::new();
841 leader_cert_map.insert(leader_certificate.clone());
842 let mut previous_cert_map = IndexSet::new();
843 for cert in storage.get_certificates_for_round(commit_round - 1) {
844 previous_cert_map.insert(cert);
845 }
846 subdag_map.insert(commit_round, leader_cert_map.clone());
847 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
848 let subdag = Subdag::from(subdag_map.clone())?;
849 core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
850 };
851 core_ledger.advance_to_next_block(&block_1)?;
853
854 let leader_round_2 = commit_round + 2;
856 let leader_2 = committee.get_leader(leader_round_2).unwrap();
857 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
858 let block_2 = {
859 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
860 let mut leader_cert_map_2 = IndexSet::new();
861 leader_cert_map_2.insert(leader_certificate_2.clone());
862 let mut previous_cert_map_2 = IndexSet::new();
863 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
864 previous_cert_map_2.insert(cert);
865 }
866 let mut prev_commit_cert_map_2 = IndexSet::new();
867 for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
868 if cert != leader_certificate {
869 prev_commit_cert_map_2.insert(cert);
870 }
871 }
872 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
873 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
874 subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
875 let subdag_2 = Subdag::from(subdag_map_2.clone())?;
876 core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
877 };
878 core_ledger.advance_to_next_block(&block_2)?;
880
881 let leader_round_3 = commit_round + 4;
883 let leader_3 = committee.get_leader(leader_round_3).unwrap();
884 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
885 let block_3 = {
886 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
887 let mut leader_cert_map_3 = IndexSet::new();
888 leader_cert_map_3.insert(leader_certificate_3.clone());
889 let mut previous_cert_map_3 = IndexSet::new();
890 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
891 previous_cert_map_3.insert(cert);
892 }
893 let mut prev_commit_cert_map_3 = IndexSet::new();
894 for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
895 if cert != leader_certificate_2 {
896 prev_commit_cert_map_3.insert(cert);
897 }
898 }
899 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
900 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
901 subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
902 let subdag_3 = Subdag::from(subdag_map_3.clone())?;
903 core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
904 };
905 core_ledger.advance_to_next_block(&block_3)?;
907
908 let syncing_ledger = Arc::new(CoreLedgerService::new(
910 CurrentLedger::load(genesis, StorageMode::Production).unwrap(),
911 Default::default(),
912 ));
913 let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
915 let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone());
917 sync.sync_storage_with_block(block_1).await?;
919 assert_eq!(syncing_ledger.latest_block_height(), 1);
920 sync.sync_storage_with_block(block_2).await?;
922 assert_eq!(syncing_ledger.latest_block_height(), 2);
923 sync.sync_storage_with_block(block_3).await?;
925 assert_eq!(syncing_ledger.latest_block_height(), 3);
926 assert!(syncing_ledger.contains_block_height(1));
928 assert!(syncing_ledger.contains_block_height(2));
929
930 Ok(())
931 }
932
933 #[tokio::test]
934 #[tracing_test::traced_test]
935 async fn test_pending_certificates() -> anyhow::Result<()> {
936 let rng = &mut TestRng::default();
937 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
939 let commit_round = 2;
940
941 let store = CurrentConsensusStore::open(None).unwrap();
943 let account: Account<CurrentNetwork> = Account::new(rng)?;
944
945 let seed: u64 = rng.gen();
947 let genesis_rng = &mut TestRng::from_seed(seed);
948 let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
949
950 let genesis_rng = &mut TestRng::from_seed(seed);
952 let private_keys = [
953 *account.private_key(),
954 PrivateKey::new(genesis_rng)?,
955 PrivateKey::new(genesis_rng)?,
956 PrivateKey::new(genesis_rng)?,
957 ];
958 let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
960 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
962 let (round_to_certificates_map, committee) = {
964 let committee = ledger.latest_committee().unwrap();
966 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
968 HashMap::new();
969 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
970
971 for round in 0..=commit_round + 8 {
972 let mut current_certificates = IndexSet::new();
973 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
974 IndexSet::new()
975 } else {
976 previous_certificates.iter().map(|c| c.id()).collect()
977 };
978 let committee_id = committee.id();
979 for (i, private_key_1) in private_keys.iter().enumerate() {
981 let batch_header = BatchHeader::new(
982 private_key_1,
983 round,
984 now(),
985 committee_id,
986 Default::default(),
987 previous_certificate_ids.clone(),
988 rng,
989 )
990 .unwrap();
991 let mut signatures = IndexSet::with_capacity(4);
993 for (j, private_key_2) in private_keys.iter().enumerate() {
994 if i != j {
995 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
996 }
997 }
998 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
999 }
1000
1001 round_to_certificates_map.insert(round, current_certificates.clone());
1003 previous_certificates = current_certificates.clone();
1004 }
1005 (round_to_certificates_map, committee)
1006 };
1007
1008 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1010 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1012 for i in 1..=commit_round + 8 {
1013 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1014 certificates.extend(c);
1015 }
1016 for certificate in certificates.clone().iter() {
1017 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1018 }
1019 let leader_round_1 = commit_round;
1021 let leader_1 = committee.get_leader(leader_round_1).unwrap();
1022 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1023 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1024 let block_1 = {
1025 let mut leader_cert_map = IndexSet::new();
1026 leader_cert_map.insert(leader_certificate.clone());
1027 let mut previous_cert_map = IndexSet::new();
1028 for cert in storage.get_certificates_for_round(commit_round - 1) {
1029 previous_cert_map.insert(cert);
1030 }
1031 subdag_map.insert(commit_round, leader_cert_map.clone());
1032 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1033 let subdag = Subdag::from(subdag_map.clone())?;
1034 core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1035 };
1036 core_ledger.advance_to_next_block(&block_1)?;
1038
1039 let leader_round_2 = commit_round + 2;
1041 let leader_2 = committee.get_leader(leader_round_2).unwrap();
1042 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1043 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1044 let block_2 = {
1045 let mut leader_cert_map_2 = IndexSet::new();
1046 leader_cert_map_2.insert(leader_certificate_2.clone());
1047 let mut previous_cert_map_2 = IndexSet::new();
1048 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1049 previous_cert_map_2.insert(cert);
1050 }
1051 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1052 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1053 let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1054 core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
1055 };
1056 core_ledger.advance_to_next_block(&block_2)?;
1058
1059 let leader_round_3 = commit_round + 4;
1061 let leader_3 = committee.get_leader(leader_round_3).unwrap();
1062 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1063 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1064 let block_3 = {
1065 let mut leader_cert_map_3 = IndexSet::new();
1066 leader_cert_map_3.insert(leader_certificate_3.clone());
1067 let mut previous_cert_map_3 = IndexSet::new();
1068 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1069 previous_cert_map_3.insert(cert);
1070 }
1071 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1072 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1073 let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1074 core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
1075 };
1076 core_ledger.advance_to_next_block(&block_3)?;
1078
1079 let pending_certificates = storage.get_pending_certificates();
1085 for certificate in pending_certificates.clone() {
1087 assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1088 }
1089 let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1091 {
1092 let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1093 for subdag in subdag_maps.iter() {
1094 for subdag_certificates in subdag.values() {
1095 committed_certificates.extend(subdag_certificates.iter().cloned());
1096 }
1097 }
1098 };
1099 let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1101 for certificate in certificates.clone() {
1102 if !committed_certificates.contains(&certificate) {
1103 candidate_pending_certificates.insert(certificate);
1104 }
1105 }
1106 assert_eq!(pending_certificates, candidate_pending_certificates);
1108 Ok(())
1109 }
1110}