1use crate::{
17 Gateway,
18 MAX_FETCH_TIMEOUT_IN_MS,
19 Transport,
20 events::DataBlocks,
21 helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22 spawn_blocking,
23};
24use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkos_node_network::PeerPoolHandling;
27use snarkos_node_sync::{
28 BLOCK_REQUEST_BATCH_DELAY,
29 BlockSync,
30 InsertBlockResponseError,
31 Ping,
32 PrepareSyncRequest,
33 locators::BlockLocators,
34};
35
36use snarkvm::{
37 console::{
38 network::{ConsensusVersion, Network},
39 types::Field,
40 },
41 ledger::{PendingBlock, authority::Authority, block::Block, narwhal::BatchCertificate},
42 utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
43};
44
45use anyhow::{Context, Result, anyhow, bail, ensure};
46use indexmap::IndexMap;
47#[cfg(feature = "locktick")]
48use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
49#[cfg(not(feature = "locktick"))]
50use parking_lot::Mutex;
51#[cfg(not(feature = "serial"))]
52use rayon::prelude::*;
53use std::{
54 collections::{HashMap, VecDeque},
55 future::Future,
56 net::SocketAddr,
57 sync::Arc,
58 time::Duration,
59};
60#[cfg(not(feature = "locktick"))]
61use tokio::sync::Mutex as TMutex;
62use tokio::{
63 sync::{OnceCell, oneshot},
64 task::JoinHandle,
65};
66
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway, used to send events to peers (e.g. certificate requests/responses).
    gateway: Gateway<N>,
    /// The storage of rounds and batch certificates.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization state machine.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificate requests, keyed by certificate ID.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender; set at most once during `initialize` when running with the BFT.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The handles of the spawned background tasks (aborted on shutdown).
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Serializes processing of block responses (see `try_advancing_block_synchronization_inner`).
    response_lock: Arc<TMutex<()>>,
    /// Serializes storage/ledger synchronization operations.
    sync_lock: Arc<TMutex<()>>,
    /// Blocks whose subdag was checked but that still await the availability
    /// threshold before being committed to the ledger; kept in ascending height order.
    pending_blocks: Arc<TMutex<VecDeque<PendingBlock<N>>>>,
}
108
109impl<N: Network> Sync<N> {
    /// The maximum interval to wait between two iterations of the background sync loops.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Initializes a new sync module with the given gateway, storage, ledger, and block-sync state.
    ///
    /// All internal state (pending requests, locks, task handles) starts out empty.
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            bft_sender: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            pending_blocks: Default::default(),
        }
    }
135
136 pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
138 if let Some(bft_sender) = bft_sender {
140 self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
141 }
142
143 info!("Syncing storage with the ledger...");
144
145 self.sync_storage_with_ledger_at_bootup()
147 .await
148 .with_context(|| "Syncing storage with the ledger at bootup failed")?;
149
150 debug!("Finished initial block synchronization at startup");
151 Ok(())
152 }
153
154 #[inline]
158 async fn send_block_requests(
159 &self,
160
161 block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
162 sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
163 ) {
164 trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());
165
166 for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
168 if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
169 break;
171 }
172
173 tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
175 }
176 }
177
    /// Starts the sync module: spawns the background loops that issue block requests,
    /// advance block synchronization, expire pending callbacks, and service the
    /// channels carried by the given `SyncReceiver`.
    ///
    /// All spawned tasks are tracked in `self.handles` and aborted on `shut_down`.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Loop 1: issue block requests whenever a peer update arrives,
        // or at least every `MAX_SYNC_INTERVAL`.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;

                self_.try_issuing_block_requests().await;

            }
        });

        // Loop 2: advance block synchronization whenever block responses arrive,
        // or at least every `MAX_SYNC_INTERVAL`. The advancement runs on its own
        // task so a panic can be caught and logged without killing this loop.
        let self_ = self.clone();
        let ping = ping.clone();
        self.spawn(async move {
            loop {
                let _ =
                    tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;

                let ping = ping.clone();
                let self_ = self_.clone();
                let hdl = tokio::spawn(async move {
                    self_.try_advancing_block_synchronization(&ping).await;
                });

                if let Err(err) = hdl.await
                    && let Ok(panic) = err.try_into_panic()
                {
                    error!("Sync block advancement panicked: {panic:?}");
                }

            }
        });

        // Loop 3: periodically clear expired certificate-request callbacks.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        // Retrieve the individual channel receivers.
        let SyncReceiver {
            mut rx_block_sync_insert_block_response,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process incoming block responses, reporting the result via the callback.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
                rx_block_sync_insert_block_response.recv().await
            {
                let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
                if let Err(err) = &result {
                    warn!("Failed to insert block response from '{peer_ip}' - {err}");
                }

                callback.send(result).ok();
            }
        });

        // Process peer-disconnect notifications.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Process block-locator updates; each update runs on its own task so a
        // slow update cannot stall the channel.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process certificate requests from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process certificate responses from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }
323
    /// Attempts to issue new block requests: first retries any timed-out requests,
    /// and otherwise prepares and sends a fresh batch of requests when syncing is allowed.
    async fn try_issuing_block_requests(&self) {
        // Keep the block-sync height in step with the ledger.
        self.block_sync.set_sync_height(self.ledger.latest_block_height());

        // Handle request timeouts first; if any requests were re-prepared, send
        // only those and return (do not also prepare fresh requests this round).
        match self.block_sync.handle_block_request_timeouts(&self.gateway) {
            Ok(Some((requests, sync_peers))) => {
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                error!("{}", &flatten_error(err));
                return;
            }
        }

        // Bail out if block syncing is currently not permitted.
        if !self.block_sync.can_block_sync() {
            return;
        }

        // Prepare a fresh batch of block requests and the peers to send them to.
        let (requests, sync_peers) = self.block_sync.prepare_block_requests();

        // Nothing to do if no requests were prepared.
        if requests.is_empty() {
            return;
        }

        self.send_block_requests(requests, sync_peers).await;
    }
368
    /// Test-only helper that runs one full sync iteration:
    /// issues block requests, then attempts to advance block synchronization.
    #[cfg(test)]
    pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
        self.try_issuing_block_requests().await;

        self.try_advancing_block_synchronization(&None).await;
    }
379}
380
impl<N: Network> Sync<N> {
    /// Inserts the block responses received from the given peer into the
    /// block-sync state machine.
    async fn insert_block_response(
        &self,
        peer_ip: SocketAddr,
        blocks: Vec<Block<N>>,
        latest_consensus_version: Option<ConsensusVersion>,
    ) -> Result<(), InsertBlockResponseError> {
        self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)

    }

    /// Updates the block locators associated with the given peer.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.block_sync.update_peer_locators(peer_ip, &locators)
    }

    /// Removes the given peer from the block-sync state machine.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    /// Test-only wrapper around `update_peer_locators`.
    #[cfg(test)]
    pub fn testing_only_update_peer_locators_testing_only(
        &self,
        peer_ip: SocketAddr,
        locators: BlockLocators<N>,
    ) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}
415
416impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup: replays the most recent
    /// blocks (back to roughly half the GC-round window) into storage, and
    /// rebuilds the BFT DAG from their certificates when a BFT sender is set.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        let latest_block = self.ledger.latest_block();

        let block_height = latest_block.height();
        // Half the GC rounds, expressed in blocks (one leader round per two rounds).
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // The earliest block height to replay into storage.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Hold the sync lock for the remainder of the bootup sync.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        // Align storage height/round with the latest block, then garbage collect.
        self.storage.sync_height_with_block(latest_block.height());
        self.storage.sync_round_with_block(latest_block.round());
        self.storage.garbage_collect_certificates(latest_block.round());
        // Replay each quorum block's certificates into storage.
        for block in &blocks {
            if let Authority::Quorum(subdag) = block.authority() {
                // Index the block's unconfirmed transactions by transaction ID.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        // Collect all certificates from the replayed quorum blocks, in block order.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    Authority::Beacon(_) => None,
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If the BFT is running, rebuild its DAG from the collected certificates.
        if let Some(bft_sender) = self.bft_sender.get() {
            bft_sender
                .tx_sync_bft_dag_at_bootup
                .send(certificates)
                .await
                .with_context(|| "Failed to update the BFT DAG from sync")?;
        }

        self.block_sync.set_sync_height(block_height);

        Ok(())
    }
509
510 async fn compute_sync_height(&self) -> u32 {
513 let ledger_height = self.ledger.latest_block_height();
514 let mut pending_blocks = self.pending_blocks.lock().await;
515
516 while let Some(b) = pending_blocks.front()
518 && b.height() <= ledger_height
519 {
520 pending_blocks.pop_front();
521 }
522
523 pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
525 }
526
527 async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
529 let new_blocks = match self
531 .try_advancing_block_synchronization_inner()
532 .await
533 .with_context(|| "Block synchronization failed")
534 {
535 Ok(new_blocks) => new_blocks,
536 Err(err) => {
537 error!("{}", &flatten_error(err));
538 false
539 }
540 };
541
542 if let Some(ping) = &ping
543 && new_blocks
544 {
545 match self.get_block_locators() {
546 Ok(locators) => ping.update_block_locators(locators),
547 Err(err) => error!("Failed to update block locators: {err}"),
548 }
549 }
550 }
551
    /// Advances block synchronization using the buffered block responses.
    ///
    /// Within the GC window of the network tip, blocks are synced through the BFT
    /// (`sync_storage_with_block`); far behind the tip, blocks are applied directly
    /// to the ledger without the BFT, switching back to BFT sync once caught up.
    /// Returns `Ok(true)` if at least one block was processed.
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Serialize response processing.
        let _lock = self.response_lock.lock().await;

        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // The highest height advertised by any sync peer (0 if none).
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        // Half the GC rounds, expressed in blocks.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Shared epilogue: record the new sync height (if we advanced) and
        // propagate any error while still reporting the progress made.
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        // We are "within GC" when the next block is inside the tip's GC window.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // BFT path: start from the effective sync height (ledger + queued pending blocks).
            let start_height = self.compute_sync_height().await;

            let mut current_height = start_height;
            trace!(
                "Try advancing blocks responses with BFT (starting at block {current_height}, current sync speed is {})",
                self.block_sync.get_sync_speed()
            );

            // Consume consecutive buffered responses until one is missing or fails.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Drop the failing response so it can be re-requested.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            // Non-BFT path: apply blocks directly to the ledger.
            let start_height = ledger_height;
            let mut current_height = start_height;

            trace!("Try advancing block responses without BFT (starting at block {current_height})");

            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        // Drop the failing response so it can be re-requested.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // If we re-entered the GC window, re-run the bootup routine to
            // rebuild BFT state before switching back to BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                self.sync_storage_with_ledger_at_bootup()
                    .await
                    .with_context(|| "BFT sync (with bootup routine) failed")?;
            }

            cleanup(start_height, current_height, None)
        }
    }
685
    /// Applies the given block directly to the ledger (bypassing the BFT),
    /// keeping the storage height/round and block-sync responses consistent.
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Hold the sync lock while the ledger and storage are mutated.
        let _lock = self.sync_lock.lock().await;

        // The check + advance is CPU-heavy, so run it on a blocking thread.
        let self_ = self.clone();
        spawn_blocking!({
            self_.ledger.check_next_block(&block)?;
            self_.ledger.advance_to_next_block(&block)?;

            // Keep storage in step with the new ledger tip, and drop the
            // now-consumed block response.
            self_.storage.sync_height_with_block(block.height());
            self_.storage.sync_round_with_block(block.round());
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
    }
710
711 async fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
717 let Authority::Quorum(subdag) = block.authority() else {
719 return Ok(());
720 };
721
722 let unconfirmed_transactions = cfg_iter!(block.transactions())
724 .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
725 .collect::<HashMap<_, _>>();
726
727 for certificates in subdag.values().cloned() {
729 cfg_into_iter!(certificates.clone()).for_each(|certificate| {
730 self.storage.sync_certificate_with_block(block, certificate.clone(), &unconfirmed_transactions);
732 });
733
734 for certificate in certificates {
736 if let Some(bft_sender) = self.bft_sender.get() {
739 let (callback_tx, callback_rx) = oneshot::channel();
740 bft_sender
741 .tx_sync_bft
742 .send((certificate, callback_tx))
743 .await
744 .with_context(|| "Failed to sync certificate")?;
745 callback_rx.await?.with_context(|| "Failed to sync certificate")?;
746 }
747 }
748 }
749 Ok(())
750 }
751
752 fn is_block_availability_threshold_reached(&self, block: &PendingBlock<N>) -> Result<bool> {
757 let leader_certificate = match block.authority() {
759 Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
760 _ => bail!("Received a block with an unexpected authority type."),
761 };
762 let commit_round = leader_certificate.round();
763 let certificate_round =
764 commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;
765
766 let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
768 let certificates = self.storage.get_certificates_for_round(certificate_round);
770 let authors = certificates
773 .iter()
774 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
775 true => Some(c.author()),
776 false => None,
777 })
778 .collect();
779
780 if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
782 trace!(
783 "Block {hash} at height {height} has reached availability threshold",
784 hash = block.hash(),
785 height = block.height()
786 );
787 Ok(true)
788 } else {
789 Ok(false)
790 }
791 }
792
793 async fn sync_storage_with_block(&self, new_block: Block<N>) -> Result<()> {
808 let _lock = self.sync_lock.lock().await;
810 let new_block_height = new_block.height();
811
812 if self.ledger.contains_block_height(new_block.height()) {
815 debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
816 return Ok(());
817 }
818
819 let mut pending_blocks = self.pending_blocks.lock().await;
821
822 self.add_block_subdag_to_bft(&new_block).await?;
824
825 let ledger_block_height = self.ledger.latest_block_height();
827
828 while let Some(pending_block) = pending_blocks.front() {
831 if pending_block.height() > ledger_block_height {
832 break;
833 }
834
835 pending_blocks.pop_front();
836 }
837
838 if let Some(tail) = pending_blocks.back() {
839 if tail.height() >= new_block.height() {
840 debug!(
841 "A unconfirmed block is queued already for height {height}. \
842 Will not sync.",
843 height = new_block.height()
844 );
845 return Ok(());
846 }
847
848 ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
849 }
850
851 let ledger_block_height = self.ledger.latest_block_height();
853
854 while let Some(pending_block) = pending_blocks.front() {
857 if pending_block.height() > ledger_block_height {
858 break;
859 }
860
861 trace!(
862 "Pending block {hash} at height {height} became obsolete",
863 hash = pending_block.hash(),
864 height = pending_block.height()
865 );
866 pending_blocks.pop_front();
867 }
868
869 let new_block = match self.ledger.check_block_subdag(new_block, pending_blocks.make_contiguous()) {
871 Ok(new_block) => new_block,
872 Err(err) => {
873 if err.to_string().contains("already in the ledger") {
875 debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
876
877 return Ok(());
878 } else {
879 return Err(err.into());
880 }
881 }
882 };
883
884 trace!(
885 "Adding new pending block {hash} at height {height}",
886 hash = new_block.hash(),
887 height = new_block.height()
888 );
889 pending_blocks.push_back(new_block);
890
891 let mut commit_height = None;
898 for block in pending_blocks.iter().rev() {
899 if self
904 .is_block_availability_threshold_reached(block)
905 .with_context(|| "Availability threshold check failed")?
906 {
907 commit_height = Some(block.height());
908 break;
909 }
910 }
911
912 if let Some(commit_height) = commit_height {
913 let start_height = ledger_block_height + 1;
914 ensure!(commit_height >= start_height, "Invalid commit height");
915 let num_blocks = (commit_height - start_height + 1) as usize;
916
917 if num_blocks > 1 {
919 trace!(
920 "Attempting to commit {chain_length} pending block(s) starting at height {start_height}.",
921 chain_length = pending_blocks.len(),
922 );
923 }
924
925 for pending_block in pending_blocks.drain(0..num_blocks) {
926 let hash = pending_block.hash();
927 let height = pending_block.height();
928 let ledger = self.ledger.clone();
929 let storage = self.storage.clone();
930
931 let leader_certificate: Option<BatchCertificate<N>> = spawn_blocking!({
932 let block = ledger.check_block_content(pending_block).with_context(|| {
933 format!("Failed to check contents of pending block {hash} at height {height}")
934 })?;
935
936 trace!("Adding pending block {hash} at height {height} to the ledger");
937 ledger.advance_to_next_block(&block)?;
938 storage.sync_height_with_block(block.height());
940 storage.sync_round_with_block(block.round());
942
943 if let Authority::Quorum(subdag) = block.authority() {
944 Ok(Some(subdag.leader_certificate().clone()))
945 } else {
946 Ok(None)
947 }
948 })?;
949
950 if let Some(leader_certificate) = leader_certificate
953 && let Some(bft_sender) = self.bft_sender.get()
954 {
955 let (callback_tx, callback_rx) = oneshot::channel();
956 bft_sender
957 .tx_sync_block_committed
958 .send((leader_certificate, callback_tx))
959 .await
960 .with_context(|| "Failed to mark leader certificate as committed")?;
961 callback_rx.await?.with_context(|| "Failed to mark leader certificate as committed")?;
962 }
963 }
964 } else {
965 trace!("No pending block are ready to be committed ({} block(s) are pending)", pending_blocks.len());
966 }
967
968 Ok(())
969 }
970}
971
impl<N: Network> Sync<N> {
    /// Returns `true` if the node is connected to at least one peer and the
    /// block-sync state machine reports the node as block-synced.
    pub fn is_synced(&self) -> bool {
        // Without any connected peers, the node cannot claim to be synced.
        if self.gateway.number_of_connected_peers() == 0 {
            return false;
        }

        self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks the node is behind, as reported by block-sync.
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.block_sync.num_blocks_behind()
    }

    /// Returns the current block locators from block-sync.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }
}
995
996impl<N: Network> Sync<N> {
    /// Requests the given certificate from the given peer, waiting (up to
    /// `MAX_FETCH_TIMEOUT_IN_MS`) for the corresponding response.
    ///
    /// A network request is only sent if the redundancy limit has not been
    /// reached and this peer has not already been asked; otherwise the call
    /// just registers a callback that is resolved by any in-flight request.
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        let (callback_sender, callback_receiver) = oneshot::channel();
        // How many requests for this certificate are already in flight.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        // Whether this specific peer was already asked for this certificate.
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        // The redundancy limit for the current round.
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Register the callback before (possibly) sending, so the response cannot be missed.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        if should_send_request {
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the callback to resolve, bounded by the fetch timeout.
        tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
            .await
            .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
            .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
    }
1038
1039 fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
1041 if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
1043 let self_ = self.clone();
1045 tokio::spawn(async move {
1046 let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
1047 });
1048 }
1049 }
1050
1051 fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
1054 let certificate = response.certificate;
1055 let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
1057 if exists {
1059 self.pending.remove(certificate.id(), Some(certificate));
1062 }
1063 }
1064}
1065
impl<N: Network> Sync<N> {
    /// Spawns the given future as a tracked background task.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the sync module: waits for in-flight response processing and
    /// sync operations to finish (by acquiring both locks), then aborts all
    /// spawned background tasks.
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        let _lock = self.response_lock.lock().await;
        let _lock = self.sync_lock.lock().await;
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}
1083
1084#[cfg(test)]
1085mod tests {
1086 use super::*;
1087
1088 use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
1089
1090 use snarkos_account::Account;
1091 use snarkos_node_sync::BlockSync;
1092 use snarkos_utilities::{NodeDataDir, SimpleStoppable};
1093
1094 use snarkvm::{
1095 console::{
1096 account::{Address, PrivateKey},
1097 network::MainnetV0,
1098 },
1099 ledger::{
1100 narwhal::{BatchCertificate, BatchHeader, Subdag},
1101 store::{ConsensusStore, helpers::memory::ConsensusMemory},
1102 },
1103 prelude::{Ledger, VM},
1104 utilities::TestRng,
1105 };
1106
1107 use aleo_std::StorageMode;
1108 use indexmap::IndexSet;
1109 use rand::Rng;
1110 use std::collections::BTreeMap;
1111
1112 type CurrentNetwork = MainnetV0;
1113 type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1114 type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1115
    /// Builds a 3-block quorum chain whose availability threshold is only met by
    /// the final block, and asserts that committing is deferred until then.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_chain() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;

        let first_round = 1;
        let num_blocks = 3;
        let num_rounds = first_round + num_blocks * 2 + 1;
        let first_committed_round = num_rounds - 1;

        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Use a fixed seed so the genesis block and derived keys are reproducible.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        let genesis_clone = genesis.clone();
        let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));

        // Build certificates for every round; certificate rounds skip the previous
        // leader for non-leader authors so the availability threshold stays unmet
        // until the final round.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in first_round..=first_committed_round {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };

                let committee_id = committee.id();
                let prev_leader = committee.get_leader(round - 1).unwrap();

                for (i, private_key) in private_keys.iter().enumerate() {
                    let leader_index = addresses.iter().position(|&address| address == prev_leader).unwrap();
                    let is_certificate_round = round % 2 == 1;
                    let is_leader = i == leader_index;

                    // Non-leaders in certificate rounds omit the previous leader's
                    // certificate (except in the final round), deferring availability.
                    let previous_certs = if round < first_committed_round && is_certificate_round && !is_leader {
                        previous_certificate_ids
                            .iter()
                            .cloned()
                            .enumerate()
                            .filter(|(idx, _)| *idx != leader_index)
                            .map(|(_, id)| id)
                            .collect()
                    } else {
                        previous_certificate_ids.clone()
                    };

                    let batch_header = BatchHeader::new(
                        private_key,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certs,
                        rng,
                    )
                    .unwrap();

                    // Sign the batch with every other validator.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates;
            }
            (round_to_certificates_map, committee)
        };

        // Populate storage with every generated certificate.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in first_round..=first_committed_round {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Produce the quorum blocks on the source ledger.
        let mut previous_leader_cert = None;
        let mut blocks = vec![];

        for block_height in 1..=num_blocks {
            let leader_round = block_height * 2;

            let leader = committee.get_leader(leader_round).unwrap();
            let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();

            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());

            let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);

            subdag_map.insert(leader_round, leader_cert_map.clone());
            subdag_map.insert(leader_round - 1, previous_cert_map.clone());

            if leader_round > 2 {
                // Exclude the previously committed leader certificate from the subdag.
                let previous_commit_cert_map: IndexSet<_> = storage
                    .get_certificates_for_round(leader_round - 2)
                    .into_iter()
                    .filter(|cert| {
                        if let Some(previous_leader_cert) = &previous_leader_cert {
                            cert != previous_leader_cert
                        } else {
                            true
                        }
                    })
                    .collect();
                subdag_map.insert(leader_round - 2, previous_commit_cert_map);
            }

            let subdag = Subdag::from(subdag_map.clone())?;
            previous_leader_cert = Some(leader_certificate);

            let core_ledger = core_ledger.clone();
            let block = spawn_blocking!({
                let block = core_ledger.clone().prepare_advance_to_next_quorum_block(subdag, Default::default())?;
                core_ledger.advance_to_next_block(&block)?;
                Ok(block)
            })?;

            blocks.push(block);
        }

        let storage_mode = StorageMode::new_test(None);

        // A fresh ledger that will be synced from the produced blocks.
        let syncing_ledger = {
            let storage_mode = storage_mode.clone();
            Arc::new(CoreLedgerService::new(
                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
                SimpleStoppable::new(),
            ))
        };

        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            syncing_ledger.clone(),
            None,
            &[],
            false,
            NodeDataDir::new_test(None),
            None,
        )?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);

        let mut block_iter = blocks.into_iter();

        // The first `num_blocks - 1` blocks never reach the availability
        // threshold, so the syncing ledger must not advance.
        for _ in 0..num_blocks - 1 {
            let block = block_iter.next().unwrap();
            sync.sync_storage_with_block(block).await?;

            assert_eq!(syncing_ledger.latest_block_height(), 0);
        }

        // The final block reaches the threshold and commits the whole chain.
        sync.sync_storage_with_block(block_iter.next().unwrap()).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);

        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }
1343
1344 #[tokio::test]
1345 #[tracing_test::traced_test]
1346 async fn test_pending_certificates() -> anyhow::Result<()> {
1347 let rng = &mut TestRng::default();
1348 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1350 let commit_round = 2;
1351
1352 let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1354 let account: Account<CurrentNetwork> = Account::new(rng)?;
1355
1356 let seed: u64 = rng.r#gen();
1358 let vm = VM::from(store).unwrap();
1359 let genesis_pk = *account.private_key();
1360 let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1361
1362 let genesis_rng = &mut TestRng::from_seed(seed);
1364 let private_keys = [
1365 *account.private_key(),
1366 PrivateKey::new(genesis_rng)?,
1367 PrivateKey::new(genesis_rng)?,
1368 PrivateKey::new(genesis_rng)?,
1369 ];
1370 let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
1372 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
1374 let (round_to_certificates_map, committee) = {
1376 let committee = core_ledger.current_committee().unwrap();
1378 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1380 HashMap::new();
1381 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1382
1383 for round in 0..=commit_round + 8 {
1384 let mut current_certificates = IndexSet::new();
1385 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1386 IndexSet::new()
1387 } else {
1388 previous_certificates.iter().map(|c| c.id()).collect()
1389 };
1390 let committee_id = committee.id();
1391 for (i, private_key_1) in private_keys.iter().enumerate() {
1393 let batch_header = BatchHeader::new(
1394 private_key_1,
1395 round,
1396 now(),
1397 committee_id,
1398 Default::default(),
1399 previous_certificate_ids.clone(),
1400 rng,
1401 )
1402 .unwrap();
1403 let mut signatures = IndexSet::with_capacity(4);
1405 for (j, private_key_2) in private_keys.iter().enumerate() {
1406 if i != j {
1407 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1408 }
1409 }
1410 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1411 }
1412
1413 round_to_certificates_map.insert(round, current_certificates.clone());
1415 previous_certificates = current_certificates.clone();
1416 }
1417 (round_to_certificates_map, committee)
1418 };
1419
1420 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1422 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1424 for i in 1..=commit_round + 8 {
1425 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1426 certificates.extend(c);
1427 }
1428 for certificate in certificates.clone().iter() {
1429 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1430 }
1431 let leader_round_1 = commit_round;
1433 let leader_1 = committee.get_leader(leader_round_1).unwrap();
1434 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1435 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1436 let block_1 = {
1437 let mut leader_cert_map = IndexSet::new();
1438 leader_cert_map.insert(leader_certificate.clone());
1439 let mut previous_cert_map = IndexSet::new();
1440 for cert in storage.get_certificates_for_round(commit_round - 1) {
1441 previous_cert_map.insert(cert);
1442 }
1443 subdag_map.insert(commit_round, leader_cert_map.clone());
1444 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1445 let subdag = Subdag::from(subdag_map.clone())?;
1446 let ledger = core_ledger.clone();
1447 spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag, Default::default()))?
1448 };
1449 let ledger = core_ledger.clone();
1451 let block = block_1.clone();
1452 spawn_blocking!(ledger.advance_to_next_block(&block))?;
1453
1454 let leader_round_2 = commit_round + 2;
1456 let leader_2 = committee.get_leader(leader_round_2).unwrap();
1457 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1458 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1459 let block_2 = {
1460 let mut leader_cert_map_2 = IndexSet::new();
1461 leader_cert_map_2.insert(leader_certificate_2.clone());
1462 let mut previous_cert_map_2 = IndexSet::new();
1463 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1464 previous_cert_map_2.insert(cert);
1465 }
1466 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1467 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1468 let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1469 let ledger = core_ledger.clone();
1470 spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default()))?
1471 };
1472 let ledger = core_ledger.clone();
1474 let block = block_2.clone();
1475 spawn_blocking!(ledger.advance_to_next_block(&block))?;
1476
1477 let leader_round_3 = commit_round + 4;
1479 let leader_3 = committee.get_leader(leader_round_3).unwrap();
1480 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1481 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1482 let block_3 = {
1483 let mut leader_cert_map_3 = IndexSet::new();
1484 leader_cert_map_3.insert(leader_certificate_3.clone());
1485 let mut previous_cert_map_3 = IndexSet::new();
1486 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1487 previous_cert_map_3.insert(cert);
1488 }
1489 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1490 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1491 let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1492 let ledger = core_ledger.clone();
1493 spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default()))?
1494 };
1495 let ledger = core_ledger.clone();
1497 let block = block_3.clone();
1498 spawn_blocking!(ledger.advance_to_next_block(&block))?;
1499
1500 let pending_certificates = storage.get_pending_certificates();
1506 for certificate in pending_certificates.clone() {
1508 assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1509 }
1510 let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1512 {
1513 let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1514 for subdag in subdag_maps.iter() {
1515 for subdag_certificates in subdag.values() {
1516 committed_certificates.extend(subdag_certificates.iter().cloned());
1517 }
1518 }
1519 };
1520 let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1522 for certificate in certificates.clone() {
1523 if !committed_certificates.contains(&certificate) {
1524 candidate_pending_certificates.insert(certificate);
1525 }
1526 }
1527 assert_eq!(pending_certificates, candidate_pending_certificates);
1529
1530 Ok(())
1531 }
1532}