use crate::{
    Gateway,
    MAX_FETCH_TIMEOUT_IN_MS,
    PRIMARY_PING_IN_MS,
    Transport,
    events::DataBlocks,
    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
    spawn_blocking,
};
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkvm::{
    console::{network::Network, types::Field},
    ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
    prelude::{cfg_into_iter, cfg_iter},
};

use anyhow::{Result, anyhow, bail};
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{BTreeMap, HashMap},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
    sync::{OnceCell, oneshot},
    task::JoinHandle,
};

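/// The sync module, responsible for keeping the BFT storage and the ledger in step with the
/// blocks received from peers during block synchronization.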
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block sync module.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificate requests.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The spawned task handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock guarding the processing of block responses.
    response_lock: Arc<TMutex<()>>,
    /// The lock guarding updates to storage and the ledger.
    sync_lock: Arc<TMutex<()>>,
    /// The block responses received from peers that have not yet been advanced into the ledger,
    /// keyed by block height.
    latest_block_responses: Arc<TMutex<BTreeMap<u32, Block<N>>>>,
}

impl<N: Network> Sync<N> {
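    /// Initializes a new sync instance.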
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            bft_sender: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            latest_block_responses: Default::default(),
        }
    }

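    /// Initializes the sync module by storing the BFT sender (if any) and syncing storage with the ledger.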
    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
        // If a BFT sender was provided, set it.
        if let Some(bft_sender) = bft_sender {
            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
        }

        info!("Syncing storage with the ledger...");

        // Sync the storage with the ledger.
        self.sync_storage_with_ledger_at_bootup().await?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

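    /// Sends the given block requests to the given sync peers, in chunks.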
    #[inline]
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        // Send the requests in chunks, pausing between chunks to avoid flooding the peers.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            // Stop processing if sending a chunk of requests fails.
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                break;
            }

            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

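    /// Starts the sync module: spawns the block sync loop, the expired-callback cleanup loop,
    /// and the handlers for the sync receiver channels.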
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Start the block sync loop.
        let self_ = self.clone();
        self.spawn(async move {
            tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
            loop {
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Attempt to advance block synchronization; if new blocks were added, refresh the block locators.
                let new_blocks = self_.try_block_sync().await;
                if new_blocks {
                    if let Some(ping) = &ping {
                        match self_.get_block_locators() {
                            Ok(locators) => ping.update_block_locators(locators),
                            Err(err) => error!("Failed to update block locators: {err}"),
                        }
                    }
                }
            }
        });

        // Periodically clear the expired callbacks from the pending certificate requests.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        // Retrieve the sync receiver channels.
        let SyncReceiver {
            mut rx_block_sync_advance_with_sync_blocks,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block responses received from sync peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
                callback.send(self_.advance_with_sync_blocks(peer_ip, blocks).await).ok();
            }
        });

        // Process the peers that disconnect.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Process the block locator updates from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process the certificate requests from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process the certificate responses from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

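    /// Performs one iteration of block synchronization: reissues any timed-out block requests,
    /// prepares and sends new block requests, and attempts to advance with the responses received so far.
    /// Returns `true` if new blocks were added to the ledger.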
    pub(crate) async fn try_block_sync(&self) -> bool {
        // Check for any timed-out block requests and reissue them (possibly to different peers).
        // Note: the request list is passed first here, matching the signature of `send_block_requests`.
        let new_requests = self.block_sync.handle_block_request_timeouts(&self.gateway);
        if let Some((block_requests, sync_peers)) = new_requests {
            self.send_block_requests(block_requests, sync_peers).await;
        }

        if !self.block_sync.can_block_sync() {
            trace!("No blocks to sync");
            return false;
        }

        // Prepare and send the next batch of block requests.
        let (block_requests, sync_peers) = self.block_sync.prepare_block_requests();
        self.send_block_requests(block_requests, sync_peers).await;

        // Attempt to advance the ledger with the block responses received so far.
        match self.try_advancing_block_synchronization().await {
            Ok(new_blocks) => new_blocks,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                false
            }
        }
    }
}

impl<N: Network> Sync<N> {
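    /// Inserts the block responses received from the given peer and attempts to advance block synchronization.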
    async fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
        // Insert the block responses from the peer.
        self.block_sync.insert_block_responses(peer_ip, blocks)?;

        // Attempt to advance the ledger with the newly received blocks.
        self.try_advancing_block_synchronization().await?;

        Ok(())
    }

    /// Updates the block locators for the given peer.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.block_sync.update_peer_locators(peer_ip, locators)
    }

    /// Removes the given peer from the block sync module.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    /// A test-only wrapper around `update_peer_locators`.
    #[cfg(test)]
    pub fn test_update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}

impl<N: Network> Sync<N> {
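    /// Syncs the BFT storage with the ledger at bootup, replaying the certificates of the most
    /// recent blocks (up to the GC depth) into storage and rebuilding the BFT DAG.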
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Determine the height to replay from, converting the GC depth from rounds to blocks
        // (there are roughly two rounds per block).
        let block_height = latest_block.height();
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks from the GC height up to and including the latest block.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        // Sync the storage height and round with the latest block, and garbage collect old certificates.
        self.storage.sync_height_with_block(latest_block.height());
        self.storage.sync_round_with_block(latest_block.round());
        self.storage.garbage_collect_certificates(latest_block.round());
        // Replay the certificates from each quorum block into storage.
        for block in &blocks {
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions from the block.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        // Collect all certificates from the quorum blocks, in order.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    Authority::Beacon(_) => None,
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender is set, rebuild the BFT DAG from the collected certificates.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
                bail!("Failed to update the BFT DAG from sync: {e}");
            }
        }

        // Update the sync height to the latest ledger height.
        self.block_sync.set_sync_height(block_height);

        Ok(())
    }

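    /// Returns the current sync height: the highest height among the queued block responses,
    /// or the latest ledger height if no responses are queued above it.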
    async fn compute_sync_height(&self) -> u32 {
        let ledger_height = self.ledger.latest_block_height();
        let mut responses = self.latest_block_responses.lock().await;

        // Drop any queued responses at or below the current ledger height.
        responses.retain(|height, _| *height > ledger_height);

        responses.last_key_value().map(|(height, _)| *height).unwrap_or(0).max(ledger_height)
    }

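    /// Attempts to advance the ledger with the block responses received so far.
    /// If the node is within the GC range of the network tip, blocks are synced through the BFT;
    /// otherwise, blocks are advanced directly on the ledger without the BFT until the node catches up.
    /// Returns `true` if new blocks were added to the ledger.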
    async fn try_advancing_block_synchronization(&self) -> Result<bool> {
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;

        // Update the sync height to the latest ledger height.
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Determine the highest block height advertised by the sync peers.
        let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);

        // Convert the GC depth from rounds to blocks (roughly two rounds per block).
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // A helper that records the new sync height (if any) and returns whether new blocks were added.
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        // Determine whether the next block is within the GC range of the network tip.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Sync through the BFT, starting from the highest queued block response.
            let start_height = self.compute_sync_height().await;

            self.block_sync.set_sync_height(start_height);

            let mut current_height = start_height;
            trace!("Try advancing with block responses (at block {current_height})");

            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Drop the failing block response so it can be requested again, then bail out.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            info!("Block sync is too far behind other validators. Syncing without BFT.");

            let start_height = ledger_height;
            let mut current_height = start_height;

            self.block_sync.set_sync_height(start_height);

            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Drop the failing block response so it can be requested again, then bail out.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // If the node has caught back up to within the GC range, switch back to BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                if let Err(err) = self.sync_storage_with_ledger_at_bootup().await {
                    error!("BFT sync (with bootup routine) failed - {err}");
                }
            }

            cleanup(start_height, current_height, None)
        }
    }

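    /// Advances the ledger with the given block directly, without going through the BFT.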
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        let self_ = self.clone();
        tokio::task::spawn_blocking(move || {
            // Check the next block, then advance the ledger to it.
            self_.ledger.check_next_block(&block)?;
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the storage height and round with the block, and clear its block response.
            self_.storage.sync_height_with_block(block.height());
            self_.storage.sync_round_with_block(block.round());
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
        .await?
    }

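    /// Syncs the BFT storage with the given block, queues the block, and advances the ledger with
    /// any contiguous run of queued blocks whose leader certificates have reached the availability threshold.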
    async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        // If the ledger already contains this block, there is nothing to do.
        if self.ledger.contains_block_height(block.height()) {
            debug!("Ledger is already synced with block at height {}. Will not sync.", block.height());
            return Ok(());
        }

        // Acquire the lock on the queued block responses.
        let mut latest_block_responses = self.latest_block_responses.lock().await;

        // If this block is already queued, there is nothing to do.
        if latest_block_responses.contains_key(&block.height()) {
            debug!("An unconfirmed block is queued already for height {}. Will not sync.", block.height());
            return Ok(());
        }

        // Replay the block's certificates into storage and forward them to the BFT.
        if let Authority::Quorum(subdag) = block.authority() {
            // Reconstruct the unconfirmed transactions from the block.
            let unconfirmed_transactions = cfg_iter!(block.transactions())
                .filter_map(|tx| {
                    tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                })
                .collect::<HashMap<_, _>>();

            for certificates in subdag.values().cloned() {
                cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                    self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
                });

                // Forward each certificate to the BFT, if a sender is set.
                for certificate in certificates {
                    if let Some(bft_sender) = self.bft_sender.get() {
                        if let Err(err) = bft_sender.send_sync_bft(certificate).await {
                            bail!("Failed to sync certificate - {err}");
                        };
                    }
                }
            }
        }

        // Queue this block and drop any queued blocks at or below the current ledger height.
        let ledger_block_height = self.ledger.latest_block_height();

        latest_block_responses.insert(block.height(), block);
        latest_block_responses.retain(|height, _| *height > ledger_block_height);

        // Collect the queued blocks that form a contiguous chain above the ledger height.
        let contiguous_blocks: Vec<Block<N>> = (ledger_block_height.saturating_add(1)..)
            .take_while(|&k| latest_block_responses.contains_key(&k))
            .filter_map(|k| latest_block_responses.get(&k).cloned())
            .collect();

        for next_block in contiguous_blocks.into_iter() {
            let next_block_height = next_block.height();

            // Retrieve the leader certificate of the block's subdag.
            let leader_certificate = match next_block.authority() {
                Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                _ => bail!("Received a block with an unexpected authority type."),
            };
            let commit_round = leader_certificate.round();
            let certificate_round =
                commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;

            // Determine which authors in the next round certify the leader certificate.
            let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
            let certificates = self.storage.get_certificates_for_round(certificate_round);
            let authors = certificates
                .iter()
                .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                    true => Some(c.author()),
                    false => None,
                })
                .collect();

            debug!("Validating sync block {next_block_height} at round {commit_round}...");
            // Only advance the ledger once the availability threshold for the leader certificate is reached.
            if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
                let mut current_certificate = leader_certificate;
                let mut blocks_to_add = vec![next_block];

                // Walk backwards through the queued blocks, prepending any block whose leader
                // certificate is linked to the current certificate.
                for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
                    let Some(previous_block) = latest_block_responses.get(&height) else {
                        bail!("Block {height} is missing from the latest block responses.");
                    };
                    let previous_certificate = match previous_block.authority() {
                        Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                        _ => bail!("Received a block with an unexpected authority type."),
                    };
                    if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                        debug!("Previous sync block {height} is linked to the current block {next_block_height}");
                        blocks_to_add.insert(0, previous_block.clone());
                        current_certificate = previous_certificate;
                    }
                }

                // Advance the ledger with the linked blocks, in order.
                for block in blocks_to_add {
                    let block_height = block.height();
                    if block_height != self.ledger.latest_block_height().saturating_add(1) {
                        warn!("Skipping block {block_height} from the latest block responses - not sequential.");
                        continue;
                    }
                    #[cfg(feature = "telemetry")]
                    let block_authority = block.authority().clone();

                    let self_ = self.clone();
                    tokio::task::spawn_blocking(move || {
                        // Check the next block, then advance the ledger to it.
                        self_.ledger.check_next_block(&block)?;
                        self_.ledger.advance_to_next_block(&block)?;

                        // Sync the storage height and round with the block.
                        self_.storage.sync_height_with_block(block.height());
                        self_.storage.sync_round_with_block(block.round());

                        Ok::<(), anyhow::Error>(())
                    })
                    .await??;
                    latest_block_responses.remove(&block_height);

                    #[cfg(feature = "telemetry")]
                    if let Authority::Quorum(subdag) = block_authority {
                        // Note: `self` is used here because `self_` was moved into the blocking task above.
                        self.gateway.validator_telemetry().insert_subdag(&subdag);
                    }
                }
            } else {
                debug!(
                    "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
                );
            }
        }

        Ok(())
    }

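    /// Returns `true` if the `previous_certificate` is an ancestor of the `current_certificate`
    /// in the DAG, traversing the previous-certificate references round by round.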
    fn is_linked(
        &self,
        previous_certificate: BatchCertificate<N>,
        current_certificate: BatchCertificate<N>,
    ) -> Result<bool> {
        // Walk backwards from the current certificate's round, keeping only the certificates
        // that are referenced by the traversal so far.
        let mut traversal = vec![current_certificate.clone()];
        for round in (previous_certificate.round()..current_certificate.round()).rev() {
            let certificates = self.storage.get_certificates_for_round(round);
            traversal = certificates
                .into_iter()
                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
                .collect();
        }
        Ok(traversal.contains(&previous_certificate))
    }
}

impl<N: Network> Sync<N> {
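    /// Returns `true` if the node is connected to peers and is block-synced with the network.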
    pub fn is_synced(&self) -> bool {
        // A node with no connected peers cannot be considered synced.
        if self.gateway.number_of_connected_peers() == 0 {
            return false;
        }

        self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks the node is behind the greatest peer height, if known.
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.block_sync.num_blocks_behind()
    }

    /// Returns the current block locators of the node.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }
}

impl<N: Network> Sync<N> {
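    /// Sends a certificate request to the given peer (unless enough redundant requests are already
    /// in flight) and waits for the corresponding certificate response, up to the fetch timeout.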
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Prepare a oneshot callback for the response.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine whether a request should actually be sent to this peer, i.e. the redundancy
        // limit has not been reached and this peer has not already been asked.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Register the pending request and its callback.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        if should_send_request {
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the certificate response, or time out.
        match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
            Ok(result) => Ok(result?),
            Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
        }
    }

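    /// Responds to a certificate request from the given peer, if the certificate is in storage.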
    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
            let self_ = self.clone();
            tokio::spawn(async move {
                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
            });
        }
    }

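    /// Completes a pending certificate request with the certificate received from the given peer.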
    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
        let certificate = response.certificate;
        // Only accept the response if this peer was actually asked for this certificate.
        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
        if exists {
            self.pending.remove(certificate.id(), Some(certificate));
        }
    }
}

impl<N: Network> Sync<N> {
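    /// Spawns the given future as a task and stores its handle for shutdown.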
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

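    /// Shuts down the sync module: waits for any in-flight response and sync work to finish,
    /// then aborts the spawned tasks.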
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        let _lock = self.response_lock.lock().await;
        let _lock = self.sync_lock.lock().await;
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};

    use snarkos_account::Account;
    use snarkos_node_sync::BlockSync;
    use snarkvm::{
        console::{
            account::{Address, PrivateKey},
            network::MainnetV0,
        },
        ledger::{
            narwhal::{BatchCertificate, BatchHeader, Subdag},
            store::{ConsensusStore, helpers::memory::ConsensusMemory},
        },
        prelude::{Ledger, VM},
        utilities::TestRng,
    };

    use aleo_std::StorageMode;
    use indexmap::IndexSet;
    use rand::Rng;
    use std::collections::BTreeMap;

    type CurrentNetwork = MainnetV0;
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

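    // Checks that `sync_storage_with_block` advances the syncing ledger with blocks whose leader
    // certificates are linked via `is_linked`, committing them one height at a time.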
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_via_is_linked() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the consensus store and the test account.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG.
        let seed: u64 = rng.r#gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Reuse the same seed to derive the remaining committee private keys deterministically.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger and the core ledger service.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));

        // Build the certificates for each round.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();

                // For the early rounds, only the leader and one other validator produce certificates.
                if round <= 5 {
                    let leader = committee.get_leader(round).unwrap();
                    let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
                    let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
                    for i in [leader_index, non_leader_index].into_iter() {
                        let batch_header = BatchHeader::new(
                            &private_keys[i],
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        let mut signatures = IndexSet::with_capacity(4);
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }

                // For the later rounds, every validator produces a certificate.
                if round > 5 {
                    for (i, private_key_1) in private_keys.iter().enumerate() {
                        let batch_header = BatchHeader::new(
                            private_key_1,
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        let mut signatures = IndexSet::with_capacity(4);
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage and insert the certificates.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create the first block from the leader certificate at the commit round.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let block_1 = {
            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_1)?;

        // Create the second block from the leader certificate two rounds later.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let block_2 = {
            let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            let mut prev_commit_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
                if cert != leader_certificate {
                    prev_commit_cert_map_2.insert(cert);
                }
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_2)?;

        // Create the third block from the leader certificate four rounds after the commit round.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let block_3 = {
            let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            let mut prev_commit_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
                if cert != leader_certificate_2 {
                    prev_commit_cert_map_3.insert(cert);
                }
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_3)?;

        // Initialize a fresh ledger to sync, plus the gateway, block sync, and sync modules.
        let syncing_ledger = Arc::new(CoreLedgerService::new(
            CurrentLedger::load(genesis, StorageMode::new_test(None)).unwrap(),
            Default::default(),
        ));
        let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);
        // Sync each block in turn and confirm the syncing ledger advances.
        sync.sync_storage_with_block(block_1).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 1);
        sync.sync_storage_with_block(block_2).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 2);
        sync.sync_storage_with_block(block_3).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }

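    // Checks that `Storage::get_pending_certificates` returns exactly the certificates that were
    // never committed by any of the three quorum blocks.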
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_pending_certificates() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the consensus store and the test account.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG.
        let seed: u64 = rng.r#gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Reuse the same seed to derive the remaining committee private keys deterministically.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];
        // Initialize the ledger and the core ledger service.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
        // Build the certificates for each round, with every validator producing a certificate.
        let (round_to_certificates_map, committee) = {
            let committee = ledger.latest_committee().unwrap();
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = BatchHeader::new(
                        private_key_1,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage and insert the certificates.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Create the first block from the leader certificate at the commit round.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_1 = {
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_1)?;

        // Create the second block from the leader certificate two rounds later.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_2 = {
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_2)?;

        // Create the third block from the leader certificate four rounds after the commit round.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_3 = {
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_3)?;

        // The pending certificates should not have been committed to the ledger.
        let pending_certificates = storage.get_pending_certificates();
        for certificate in pending_certificates.clone() {
            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
        }
        // Collect every certificate that was committed via the three subdags.
        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        {
            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
            for subdag in subdag_maps.iter() {
                for subdag_certificates in subdag.values() {
                    committed_certificates.extend(subdag_certificates.iter().cloned());
                }
            }
        };
        // The pending certificates should be exactly the certificates that were never committed.
        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        for certificate in certificates.clone() {
            if !committed_certificates.contains(&certificate) {
                candidate_pending_certificates.insert(certificate);
            }
        }
        assert_eq!(pending_certificates, candidate_pending_certificates);
        Ok(())
    }
}