use crate::{
    Gateway,
    MAX_FETCH_TIMEOUT_IN_MS,
    Transport,
    events::DataBlocks,
    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
    spawn_blocking,
};
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_router::PeerPoolHandling;
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkvm::{
    console::{network::Network, types::Field},
    ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
    prelude::{cfg_into_iter, cfg_iter},
};

use anyhow::{Result, anyhow, bail};
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{BTreeMap, HashMap},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
    sync::{OnceCell, oneshot},
    task::JoinHandle,
};

/// The sync module, which synchronizes the BFT storage and the ledger with the rest of the network.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block sync module.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificate requests.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The handles of the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock guarding the processing of block responses.
    response_lock: Arc<TMutex<()>>,
    /// The lock guarding synchronization of the storage and ledger.
    sync_lock: Arc<TMutex<()>>,
    /// The queued block responses, keyed by block height, that have not yet been advanced into the ledger.
    latest_block_responses: Arc<TMutex<BTreeMap<u32, Block<N>>>>,
}

impl<N: Network> Sync<N> {
    /// The maximum interval to wait before re-checking for sync progress.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

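    /// Initializes a new sync module with the given gateway, storage, ledger service, and block sync module.
    ///
    /// A minimal usage sketch (illustrative only; the `gateway`, `storage`, `ledger`, `block_sync`,
    /// `bft_sender`, and `sync_receiver` values are assumed to be constructed elsewhere by the caller):
    ///
    /// ```ignore
    /// let sync = Sync::new(gateway, storage, ledger, block_sync);
    /// // Register the BFT sender and perform the initial storage sync.
    /// sync.initialize(Some(bft_sender)).await?;
    /// // Spawn the background sync tasks.
    /// sync.run(None, sync_receiver).await?;
    /// ```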
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            bft_sender: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            latest_block_responses: Default::default(),
        }
    }

    /// Initializes the sync module: registers the BFT sender (if any) and syncs the storage with the ledger.
    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
        // If a BFT sender was provided, set it.
        if let Some(bft_sender) = bft_sender {
            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
        }

        info!("Syncing storage with the ledger...");

        // Sync the storage with the ledger.
        self.sync_storage_with_ledger_at_bootup().await?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

    /// Sends the given block requests to the sync peers, in batches of at most
    /// `DataBlocks::MAXIMUM_NUMBER_OF_BLOCKS` requests, pausing briefly between batches.
    #[inline]
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            // Stop if sending a batch of requests fails.
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                break;
            }

            // Pause between batches to avoid overwhelming the peers.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Starts the sync module, spawning its background tasks and the sync receiver handlers.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Periodically issue block requests, waking up early when the peer set changes.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;

                self_.try_issuing_block_requests().await;
            }
        });

        // Periodically try to advance the ledger with the received block responses.
        let self_ = self.clone();
        let ping = ping.clone();
        self.spawn(async move {
            loop {
                let _ =
                    tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;

                self_.try_advancing_block_synchronization(&ping).await;
            }
        });

        // Periodically clear the callbacks for expired certificate requests.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        // Retrieve the sync receiver channels.
        let SyncReceiver {
            mut rx_block_sync_insert_block_response,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block responses received from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, callback)) = rx_block_sync_insert_block_response.recv().await {
                callback.send(self_.insert_block_response(peer_ip, blocks).await).ok();
            }
        });

        // Process the peers to remove from block sync.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Process the block locator updates received from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process the certificate requests received from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process the certificate responses received from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

    /// Issues new block requests, if block sync is ready to do so.
    async fn try_issuing_block_requests(&self) {
        // First, re-issue any block requests that have timed out.
        let timeout_requests = self.block_sync.handle_block_request_timeouts(&self.gateway);
        if let Some((requests, sync_peers)) = timeout_requests {
            self.send_block_requests(requests, sync_peers).await;
            return;
        }

        // Update the sync height to the latest ledger height.
        self.block_sync.set_sync_height(self.ledger.latest_block_height());

        // Do not issue new requests if block sync is not ready.
        if !self.block_sync.can_block_sync() {
            return;
        }

        // Prepare the next batch of block requests.
        let (requests, sync_peers) = self.block_sync.prepare_block_requests();

        // If there are no requests to send, return early.
        if requests.is_empty() {
            return;
        }

        self.send_block_requests(requests, sync_peers).await;
    }

    #[cfg(test)]
    pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
        self.try_issuing_block_requests().await;

        self.try_advancing_block_synchronization(&None).await;
    }
}

impl<N: Network> Sync<N> {
    /// Inserts the block responses received from the given peer into block sync.
    async fn insert_block_response(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
        self.block_sync.insert_block_responses(peer_ip, blocks)
    }

    /// Updates the block locators for the given peer.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.block_sync.update_peer_locators(peer_ip, &locators)
    }

    /// Removes the given peer from block sync.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    #[cfg(test)]
    pub fn testing_only_update_peer_locators_testing_only(
        &self,
        peer_ip: SocketAddr,
        locators: BlockLocators<N>,
    ) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}

impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup, replaying the most recent blocks within the GC window
    /// into storage and, if a BFT sender is present, into the BFT DAG.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        let block_height = latest_block.height();
        // Determine the number of blocks within the GC window (half the GC rounds).
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the earliest block height to replay into storage.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks from the GC height up to and including the latest block.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        // Sync the height and round with the latest block, and garbage collect stale certificates.
        self.storage.sync_height_with_block(latest_block.height());
        self.storage.sync_round_with_block(latest_block.round());
        self.storage.garbage_collect_certificates(latest_block.round());
        // Sync the certificates from each block into storage.
        for block in &blocks {
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions for this block.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        // Collect the certificates from the quorum blocks.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    Authority::Beacon(_) => None,
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender is set, rebuild the BFT DAG from the collected certificates.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
                bail!("Failed to update the BFT DAG from sync: {e}");
            }
        }

        // Update the sync height to the latest block height.
        self.block_sync.set_sync_height(block_height);

        Ok(())
    }

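    /// Computes the current sync height as the highest queued block response, bounded below by the
    /// latest ledger height. Responses at or below the ledger height are pruned first.
    ///
    /// A minimal sketch of the pruning rule on plain values (hypothetical heights, not the real
    /// `latest_block_responses` map):
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// let ledger_height = 10u32;
    /// let mut responses = BTreeMap::from([(9u32, "block 9"), (11, "block 11"), (12, "block 12")]);
    /// // Drop responses that the ledger has already passed.
    /// responses.retain(|height, _| *height > ledger_height);
    /// // The sync height is the highest remaining response, or the ledger height if none remain.
    /// let sync_height = responses.last_key_value().map(|(h, _)| *h).unwrap_or(0).max(ledger_height);
    /// assert_eq!(sync_height, 12);
    /// ```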
    async fn compute_sync_height(&self) -> u32 {
        let ledger_height = self.ledger.latest_block_height();
        let mut responses = self.latest_block_responses.lock().await;

        responses.retain(|height, _| *height > ledger_height);

        responses.last_key_value().map(|(height, _)| *height).unwrap_or(0).max(ledger_height)
    }

    /// Attempts to advance block synchronization, and updates the block locators on the ping service
    /// if any new blocks were advanced.
    async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
        let new_blocks = match self.try_advancing_block_synchronization_inner().await {
            Ok(new_blocks) => new_blocks,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                false
            }
        };

        // If new blocks were advanced, broadcast the updated block locators.
        if let Some(ping) = &ping
            && new_blocks
        {
            match self.get_block_locators() {
                Ok(locators) => ping.update_block_locators(locators),
                Err(err) => error!("Failed to update block locators: {err}"),
            }
        }
    }

    /// Attempts to advance the ledger (and BFT storage) with the queued block responses.
    /// Returns `true` if at least one new block was advanced into the ledger.
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;

        // Update the sync height to the latest ledger height.
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Determine the highest block height advertised by the sync peers.
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        // Determine the number of blocks within the GC window (half the GC rounds).
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // A helper that records the new sync height (if any blocks were advanced) and returns
        // whether new blocks were advanced, or the encountered error.
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

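        // Determine whether the node is close enough to the network tip to sync through the BFT.
        // Illustrative example (hypothetical numbers): with a peer tip of 1_000 and `max_gc_blocks`
        // of 50, `max_gc_height` is 950; if the next block to advance (ledger height + 1) is above
        // 950, the node syncs through the BFT, otherwise it syncs the ledger without the BFT.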
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Sync through the BFT, starting from the current sync height.
            let start_height = self.compute_sync_height().await;

            self.block_sync.set_sync_height(start_height);

            let mut current_height = start_height;
            trace!("Try advancing with block responses (at block {current_height})");

            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Remove the faulty block response and abort with the error.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            info!("Block sync is too far behind other validators. Syncing without BFT.");

            let start_height = ledger_height;
            let mut current_height = start_height;

            self.block_sync.set_sync_height(start_height);

            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        // Remove the faulty block response and abort with the error.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // If the node has caught back up to the GC window, re-sync the BFT storage with the ledger.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                if let Err(err) = self.sync_storage_with_ledger_at_bootup().await {
                    error!("BFT sync (with bootup routine) failed - {err}");
                }
            }

            cleanup(start_height, current_height, None)
        }
    }

    /// Checks and advances the ledger with the given block, without updating the BFT DAG.
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        let self_ = self.clone();
        tokio::task::spawn_blocking(move || {
            // Check and advance the ledger to the next block.
            self_.ledger.check_next_block(&block)?;
            self_.ledger.advance_to_next_block(&block)?;

            // Keep the storage and block sync in step with the newly advanced block.
            self_.storage.sync_height_with_block(block.height());
            self_.storage.sync_round_with_block(block.round());
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
        .await?
    }

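    /// Syncs the BFT storage (and eventually the ledger) with the given block.
    ///
    /// In outline (as implemented below): the block's certificates are inserted into storage and
    /// forwarded to the BFT, the block is queued in `latest_block_responses`, and any contiguous
    /// run of queued blocks above the ledger height is validated (availability threshold on the
    /// committee lookback, plus `is_linked` checks on leader certificates) before being advanced
    /// into the ledger.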
    async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        // If the ledger already contains this block, there is nothing to do.
        if self.ledger.contains_block_height(block.height()) {
            debug!("Ledger is already synced with block at height {}. Will not sync.", block.height());
            return Ok(());
        }

        // Acquire the lock on the queued block responses.
        let mut latest_block_responses = self.latest_block_responses.lock().await;

        // If a block is already queued at this height, there is nothing to do.
        if latest_block_responses.contains_key(&block.height()) {
            debug!("An unconfirmed block is queued already for height {}. Will not sync.", block.height());
            return Ok(());
        }

        // Sync the certificates of the block into storage and forward them to the BFT.
        if let Authority::Quorum(subdag) = block.authority() {
            // Reconstruct the unconfirmed transactions for this block.
            let unconfirmed_transactions = cfg_iter!(block.transactions())
                .filter_map(|tx| {
                    tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                })
                .collect::<HashMap<_, _>>();

            for certificates in subdag.values().cloned() {
                cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                    self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
                });

                for certificate in certificates {
                    // If a BFT sender is set, forward the certificate to the BFT.
                    if let Some(bft_sender) = self.bft_sender.get() {
                        if let Err(err) = bft_sender.send_sync_bft(certificate).await {
                            bail!("Failed to sync certificate - {err}");
                        };
                    }
                }
            }
        }

        // Queue the block response and prune any responses the ledger has already passed.
        let ledger_block_height = self.ledger.latest_block_height();

        latest_block_responses.insert(block.height(), block);
        latest_block_responses.retain(|height, _| *height > ledger_block_height);

        // Collect the contiguous run of queued blocks directly above the ledger height.
        let contiguous_blocks: Vec<Block<N>> = (ledger_block_height.saturating_add(1)..)
            .take_while(|&k| latest_block_responses.contains_key(&k))
            .filter_map(|k| latest_block_responses.get(&k).cloned())
            .collect();

        for next_block in contiguous_blocks.into_iter() {
            let next_block_height = next_block.height();

            // Retrieve the leader certificate of the block's subdag.
            let leader_certificate = match next_block.authority() {
                Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                _ => bail!("Received a block with an unexpected authority type."),
            };
            let commit_round = leader_certificate.round();
            let certificate_round =
                commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;

            // Determine which authors in the next round vouch for the leader certificate.
            let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
            let certificates = self.storage.get_certificates_for_round(certificate_round);
            let authors = certificates
                .iter()
                .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                    true => Some(c.author()),
                    false => None,
                })
                .collect();

            debug!("Validating sync block {next_block_height} at round {commit_round}...");
            if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
                let mut current_certificate = leader_certificate;
                let mut blocks_to_add = vec![next_block];

                // Walk backwards over the queued blocks, adding any block whose leader certificate
                // is linked to the current one.
                for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
                    let Some(previous_block) = latest_block_responses.get(&height) else {
                        bail!("Block {height} is missing from the latest block responses.");
                    };
                    let previous_certificate = match previous_block.authority() {
                        Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                        _ => bail!("Received a block with an unexpected authority type."),
                    };
                    if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                        debug!("Previous sync block {height} is linked to the current block {next_block_height}");
                        blocks_to_add.insert(0, previous_block.clone());
                        current_certificate = previous_certificate;
                    }
                }

                // Advance the ledger with the validated blocks, in order.
                for block in blocks_to_add {
                    let block_height = block.height();
                    if block_height != self.ledger.latest_block_height().saturating_add(1) {
                        warn!("Skipping block {block_height} from the latest block responses - not sequential.");
                        continue;
                    }
                    #[cfg(feature = "telemetry")]
                    let block_authority = block.authority().clone();

                    let self_ = self.clone();
                    tokio::task::spawn_blocking(move || {
                        // Check and advance the ledger to the next block.
                        self_.ledger.check_next_block(&block)?;
                        self_.ledger.advance_to_next_block(&block)?;

                        // Keep the storage in step with the newly advanced block.
                        self_.storage.sync_height_with_block(block.height());
                        self_.storage.sync_round_with_block(block.round());

                        Ok::<(), anyhow::Error>(())
                    })
                    .await??;
                    latest_block_responses.remove(&block_height);

                    #[cfg(feature = "telemetry")]
                    if let Authority::Quorum(subdag) = block_authority {
                        self.gateway.validator_telemetry().insert_subdag(&subdag);
                    }
                }
            } else {
                debug!(
                    "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
                );
            }
        }

        Ok(())
    }

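    /// Returns `true` if the `previous_certificate` is reachable from the `current_certificate`
    /// by walking the previous-certificate references backwards through storage, round by round.
    ///
    /// For example (hypothetical certificates): with `A` at round `r`, `B` at round `r + 1`, and
    /// `C` at round `r + 2`, `is_linked(A, C)` is `true` if `C` references `B` and `B` references `A`:
    ///
    /// ```text
    /// round:   r        r + 1      r + 2
    ///          A   <---   B   <---   C
    /// ```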
    fn is_linked(
        &self,
        previous_certificate: BatchCertificate<N>,
        current_certificate: BatchCertificate<N>,
    ) -> Result<bool> {
        // Walk backwards from the current certificate's round towards the previous certificate's round.
        let mut traversal = vec![current_certificate.clone()];
        for round in (previous_certificate.round()..current_certificate.round()).rev() {
            let certificates = self.storage.get_certificates_for_round(round);
            // Keep only the certificates referenced by the current traversal frontier.
            traversal = certificates
                .into_iter()
                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
                .collect();
        }
        Ok(traversal.contains(&previous_certificate))
    }
}

impl<N: Network> Sync<N> {
    /// Returns `true` if the node is connected to peers and block sync reports being synced.
    pub fn is_synced(&self) -> bool {
        if self.gateway.number_of_connected_peers() == 0 {
            return false;
        }

        self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks the node is behind the greatest peer height, if known.
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.block_sync.num_blocks_behind()
    }

    /// Returns the current block locators of the node.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }
}

impl<N: Network> Sync<N> {
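    /// Sends a certificate request for `certificate_id` to `peer_ip`, and waits (up to
    /// `MAX_FETCH_TIMEOUT_IN_MS`) for the corresponding certificate response.
    ///
    /// A minimal usage sketch (illustrative only; `sync`, `peer_ip`, and `certificate_id` are
    /// assumed to be available in the caller's context):
    ///
    /// ```ignore
    /// let certificate = sync.send_certificate_request(peer_ip, certificate_id).await?;
    /// ```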
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Initialize a oneshot channel to receive the certificate.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Only send a request if the redundancy limit has not been reached and this peer
        // has not already been asked for this certificate.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Register the callback, regardless of whether a request is sent.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        if should_send_request {
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the certificate to arrive, up to the fetch timeout.
        match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
            Ok(result) => Ok(result?),
            Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
        }
    }

    /// Handles a certificate request from a peer, responding with the certificate if it is in storage.
    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
            let self_ = self.clone();
            tokio::spawn(async move {
                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
            });
        }
    }

    /// Handles a certificate response from a peer, completing the pending request if one exists.
    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
        let certificate = response.certificate;
        // Only accept the response if this peer was actually asked for the certificate.
        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
        if exists {
            self.pending.remove(certificate.id(), Some(certificate));
        }
    }
}

impl<N: Network> Sync<N> {
    /// Spawns the given future as a background task and records its handle.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the sync module, waiting for in-flight sync work to finish before aborting the tasks.
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        let _lock = self.response_lock.lock().await;
        let _lock = self.sync_lock.lock().await;
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};

    use snarkos_account::Account;
    use snarkos_node_sync::BlockSync;
    use snarkvm::{
        console::{
            account::{Address, PrivateKey},
            network::MainnetV0,
        },
        ledger::{
            narwhal::{BatchCertificate, BatchHeader, Subdag},
            store::{ConsensusStore, helpers::memory::ConsensusMemory},
        },
        prelude::{Ledger, VM},
        utilities::TestRng,
    };

    use aleo_std::StorageMode;
    use indexmap::IndexSet;
    use rand::Rng;
    use std::collections::BTreeMap;

    type CurrentNetwork = MainnetV0;
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_via_is_linked() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the consensus store and the test account.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Generate the genesis block from a seeded RNG, so the committee is reproducible.
        let seed: u64 = rng.r#gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Derive the validator private keys from the same seed.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));

        // Build certificates for each round: only two authors sign in rounds 0..=5, all four after.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();

                if round <= 5 {
                    let leader = committee.get_leader(round).unwrap();
                    let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
                    let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
                    for i in [leader_index, non_leader_index].into_iter() {
                        let batch_header = BatchHeader::new(
                            &private_keys[i],
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        let mut signatures = IndexSet::with_capacity(4);
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }

                if round > 5 {
                    for (i, private_key_1) in private_keys.iter().enumerate() {
                        let batch_header = BatchHeader::new(
                            private_key_1,
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        let mut signatures = IndexSet::with_capacity(4);
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Insert the certificates into storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create the first block from the subdag at the commit round.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let block_1 = {
            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_1)?;

        // Create the second block from the subdag two rounds later.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let block_2 = {
            let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            let mut prev_commit_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
                if cert != leader_certificate {
                    prev_commit_cert_map_2.insert(cert);
                }
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_2)?;

        // Create the third block from the subdag another two rounds later.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let block_3 = {
            let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            let mut prev_commit_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
                if cert != leader_certificate_2 {
                    prev_commit_cert_map_3.insert(cert);
                }
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_3)?;

        // Initialize a fresh syncing ledger and a sync module, then sync it with the three blocks.
        let storage_mode = StorageMode::new_test(None);
        let syncing_ledger = Arc::new(CoreLedgerService::new(
            CurrentLedger::load(genesis, storage_mode.clone()).unwrap(),
            Default::default(),
        ));
        let gateway =
            Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], storage_mode, None)?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);
        sync.sync_storage_with_block(block_1).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 1);
        sync.sync_storage_with_block(block_2).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 2);
        sync.sync_storage_with_block(block_3).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);
        // Ensure the earlier blocks remain in the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_pending_certificates() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the consensus store and the test account.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Generate the genesis block from a seeded RNG, so the committee is reproducible.
        let seed: u64 = rng.r#gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Derive the validator private keys from the same seed.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
        // Build certificates for each round, signed by all four validators.
        let (round_to_certificates_map, committee) = {
            let committee = ledger.latest_committee().unwrap();
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = BatchHeader::new(
                        private_key_1,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Insert the certificates into storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Create and advance three blocks, each committing a subdag of certificates.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_1 = {
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_1)?;

        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_2 = {
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_2)?;

        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_3 = {
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        core_ledger.advance_to_next_block(&block_3)?;

        // The pending certificates must not include anything the ledger has already committed.
        let pending_certificates = storage.get_pending_certificates();
        for certificate in pending_certificates.clone() {
            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
        }
        // Collect all certificates committed by the three subdags.
        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        {
            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
            for subdag in subdag_maps.iter() {
                for subdag_certificates in subdag.values() {
                    committed_certificates.extend(subdag_certificates.iter().cloned());
                }
            }
        };
        // The expected pending set is every created certificate that was not committed.
        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        for certificate in certificates.clone() {
            if !committed_certificates.contains(&certificate) {
                candidate_pending_certificates.insert(certificate);
            }
        }
        assert_eq!(pending_certificates, candidate_pending_certificates);
        Ok(())
    }
}