1use crate::{
17 MAX_FETCH_TIMEOUT_IN_MS,
18 events::{CertificateRequest, CertificateResponse, DataBlocks, Event},
19 gateway::{Gateway, Transport},
20 helpers::{Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
21 ledger_service::LedgerService,
22 spawn_blocking,
23};
24use snarkos_node_network::PeerPoolHandling;
25use snarkos_node_sync::{
26 BLOCK_REQUEST_BATCH_DELAY,
27 BlockSync,
28 InsertBlockResponseError,
29 Ping,
30 PrepareSyncRequest,
31 locators::BlockLocators,
32};
33use snarkos_utilities::CallbackHandle;
34
35use snarkvm::{
36 console::{
37 network::{ConsensusVersion, Network},
38 types::Field,
39 },
40 ledger::{PendingBlock, authority::Authority, block::Block, narwhal::BatchCertificate},
41 utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
42};
43
44use anyhow::{Context, Result, anyhow, bail, ensure};
45use indexmap::IndexMap;
46#[cfg(feature = "locktick")]
47use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
48#[cfg(not(feature = "locktick"))]
49use parking_lot::Mutex;
50#[cfg(not(feature = "serial"))]
51use rayon::prelude::*;
52use std::{
53 collections::{HashMap, VecDeque},
54 future::Future,
55 net::SocketAddr,
56 sync::Arc,
57 time::Duration,
58};
59#[cfg(not(feature = "locktick"))]
60use tokio::sync::Mutex as TMutex;
61use tokio::{sync::oneshot, task::JoinHandle};
62
/// Callbacks through which the sync module feeds synced certificates back
/// into the BFT layer.
#[async_trait::async_trait]
pub trait SyncCallback<N: Network>: Send + std::marker::Sync {
    /// Invoked once with the certificates recovered from the ledger's recent
    /// blocks (during the bootup sync routine), so the DAG can be rebuilt.
    async fn sync_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>);

    /// Invoked for each certificate encountered while syncing a block's
    /// subdag into storage; an error here aborts that block's sync.
    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()>;
}
72
/// The sync module of the BFT node.
///
/// Bridges the block-sync state machine with the BFT storage, the ledger,
/// and the gateway: it issues block requests to peers, ingests block
/// responses, keeps `Storage` consistent with the ledger, and serves and
/// fetches batch certificates.
///
/// Cloning is cheap: all mutable state is behind `Arc`s shared across clones.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway, used to send events (requests/responses) to peers.
    gateway: Gateway<N>,
    /// The BFT storage, kept in sync with the ledger.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block-sync state machine (peer locators, block requests/responses).
    block_sync: Arc<BlockSync<N>>,
    /// In-flight certificate requests, keyed by certificate ID.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// Optional callback into the BFT, set in `initialize` and cleared on shutdown.
    sync_callback: Arc<CallbackHandle<Arc<dyn SyncCallback<N>>>>,
    /// Handles of the spawned background tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Serializes the processing of buffered block responses.
    response_lock: Arc<TMutex<()>>,
    /// Serializes mutations of the storage and ledger during sync.
    sync_lock: Arc<TMutex<()>>,
    /// Blocks whose subdag has been checked but which still await the
    /// availability threshold before being committed to the ledger.
    pending_blocks: Arc<TMutex<VecDeque<PendingBlock<N>>>>,
}
114
impl<N: Network> Sync<N> {
    /// The maximum time a background loop waits for its wake-up signal before
    /// re-checking its work anyway.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Initializes a new sync module from its collaborators.
    /// All remaining state (pending requests, locks, task handles) starts empty.
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            sync_callback: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            pending_blocks: Default::default(),
        }
    }

    /// Initializes the sync module: registers the optional sync callback and
    /// performs the initial storage-with-ledger synchronization.
    ///
    /// # Errors
    /// Fails if the callback was already set, or if the bootup sync fails.
    pub async fn initialize(&self, sync_callback: Option<Arc<dyn SyncCallback<N>>>) -> Result<()> {
        if let Some(callback) = sync_callback {
            self.sync_callback.set(callback).with_context(|| "Failed to set sync callback")?;
        }

        info!("Syncing storage with the ledger...");

        self.sync_storage_with_ledger_at_bootup()
            .await
            .with_context(|| "Syncing storage with the ledger at bootup failed")?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

    /// Sends the prepared block requests to the sync peers, in chunks of at
    /// most `DataBlocks::MAXIMUM_NUMBER_OF_BLOCKS` requests, sleeping
    /// `BLOCK_REQUEST_BATCH_DELAY` between chunks. Stops early if a chunk
    /// fails to send.
    #[inline]
    async fn send_block_requests(
        &self,

        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            // Abort the remaining chunks if this batch could not be sent.
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                break;
            }

            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Starts the sync module's background tasks:
    /// - a loop issuing block requests on peer updates (or at least every
    ///   `MAX_SYNC_INTERVAL`),
    /// - a loop advancing block synchronization on block responses (or at
    ///   least every `MAX_SYNC_INTERVAL`),
    /// - a loop periodically clearing expired certificate-request callbacks,
    /// - one dispatcher task per `SyncReceiver` channel.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Task: issue block requests whenever peers update, or on timeout.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;

                self_.try_issuing_block_requests().await;

            }
        });

        // Task: advance block synchronization whenever responses arrive.
        let self_ = self.clone();
        let ping = ping.clone();
        self.spawn(async move {
            loop {
                let _ =
                    tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;

                // Run the advancement in its own task so a panic can be
                // caught and logged without killing this loop.
                let ping = ping.clone();
                let self_ = self_.clone();
                let hdl = tokio::spawn(async move {
                    self_.try_advancing_block_synchronization(&ping).await;
                });

                if let Err(err) = hdl.await
                    && let Ok(panic) = err.try_into_panic()
                {
                    error!("Sync block advancement panicked: {panic:?}");
                }

            }
        });

        // Task: periodically clear expired certificate-request callbacks.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        // Destructure the receiver into its individual channels.
        let SyncReceiver {
            mut rx_block_sync_insert_block_response,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Task: ingest block responses from peers and report the result back.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
                rx_block_sync_insert_block_response.recv().await
            {
                let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
                if let Err(err) = &result {
                    warn!("Failed to insert block response from '{peer_ip}' - {err}");
                }

                callback.send(result).ok();
            }
        });

        // Task: remove disconnected peers from the block-sync module.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Task: update peer block locators (each update in its own task).
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Task: serve incoming certificate requests.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Task: resolve incoming certificate responses.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

    /// Attempts to issue block requests: first re-sends any timed-out
    /// requests; otherwise, if the node is not yet synced, prepares and sends
    /// a fresh batch of requests. Errors are logged, not propagated.
    async fn try_issuing_block_requests(&self) {
        // Keep the block-sync module's notion of our height current.
        self.block_sync.set_sync_height(self.ledger.latest_block_height());

        match self.block_sync.handle_block_request_timeouts(&self.gateway) {
            // Timed-out requests take priority; re-send them and stop here.
            Ok(Some((requests, sync_peers))) => {
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                error!("{}", &flatten_error(err));
                return;
            }
        }

        // Nothing to request if we are already synced.
        if self.is_synced() {
            return;
        }

        let (requests, sync_peers) = self.block_sync.prepare_block_requests();

        if requests.is_empty() {
            return;
        }

        self.send_block_requests(requests, sync_peers).await;
    }

    /// Test-only driver: one round of request issuance plus one round of
    /// block-synchronization advancement.
    #[cfg(test)]
    pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
        self.try_issuing_block_requests().await;

        self.try_advancing_block_synchronization(&None).await;
    }
}
385
386impl<N: Network> Sync<N> {
388 async fn insert_block_response(
390 &self,
391 peer_ip: SocketAddr,
392 blocks: Vec<Block<N>>,
393 latest_consensus_version: Option<ConsensusVersion>,
394 ) -> Result<(), InsertBlockResponseError<N>> {
395 self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)
396
397 }
400
401 fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
403 self.block_sync.update_peer_locators(peer_ip, &locators)
404 }
405
406 fn remove_peer(&self, peer_ip: SocketAddr) {
408 self.block_sync.remove_peer(&peer_ip);
409 }
410
411 #[cfg(test)]
412 pub fn testing_only_update_peer_locators_testing_only(
413 &self,
414 peer_ip: SocketAddr,
415 locators: BlockLocators<N>,
416 ) -> Result<()> {
417 self.update_peer_locators(peer_ip, locators)
418 }
419}
420
impl<N: Network> Sync<N> {
    /// Rebuilds the BFT storage from the ledger's recent blocks.
    ///
    /// Loads the blocks from the GC horizon (half of `max_gc_rounds`, taken
    /// as a block count, below the latest height) up to the latest block,
    /// fast-forwards the storage height and round, garbage-collects old
    /// certificates, re-inserts every certificate from each quorum block's
    /// subdag, and finally hands all those certificates to the sync callback
    /// (if set). Also used when switching back from non-BFT to BFT sync.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        let latest_block = self.ledger.latest_block();

        let block_height = latest_block.height();
        // Half the GC rounds, used as the number of blocks to reload.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Fetch the blocks *before* taking the sync lock.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Hold the sync lock while mutating storage.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        // Fast-forward storage to the latest block, then GC old certificates.
        self.storage.sync_height_with_block(latest_block.height());
        self.storage.sync_round_with_block(latest_block.round());
        self.storage.garbage_collect_certificates(latest_block.round());
        // Re-insert the certificates of every quorum block's subdag.
        for block in &blocks {
            if let Authority::Quorum(subdag) = block.authority() {
                // Index the block's unconfirmed transactions by ID.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        // Collect every certificate across all quorum blocks, in block order.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    Authority::Beacon(_) => None,
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // Let the BFT layer rebuild its DAG from these certificates.
        if let Some(cb) = self.sync_callback.get() {
            cb.sync_dag_at_bootup(certificates).await;
        }

        self.block_sync.set_sync_height(block_height);

        Ok(())
    }

    /// Computes the effective sync height: the height of the newest queued
    /// pending block, or the ledger height if that is higher. As a side
    /// effect, drops queued pending blocks at or below the ledger height.
    async fn compute_sync_height(&self) -> u32 {
        let ledger_height = self.ledger.latest_block_height();
        let mut pending_blocks = self.pending_blocks.lock().await;

        // Prune pending blocks the ledger has already caught up to.
        while let Some(b) = pending_blocks.front()
            && b.height() <= ledger_height
        {
            pending_blocks.pop_front();
        }

        pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
    }

    /// Attempts to advance block synchronization; if new blocks were applied
    /// and a `Ping` handle is available, pushes refreshed block locators out
    /// to peers. Errors are logged, not propagated.
    async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
        let new_blocks = match self
            .try_advancing_block_synchronization_inner()
            .await
            .with_context(|| "Block synchronization failed")
        {
            Ok(new_blocks) => new_blocks,
            Err(err) => {
                error!("{}", &flatten_error(err));
                false
            }
        };

        if let Some(ping) = &ping
            && new_blocks
        {
            match self.get_block_locators() {
                Ok(locators) => ping.update_block_locators(locators),
                Err(err) => error!("Failed to update block locators: {err}"),
            }
        }
    }

    /// Attempts to advance synchronization using the buffered block responses.
    ///
    /// If the next block is within the GC horizon of the best peer tip, blocks
    /// are applied through the BFT path (`sync_storage_with_block`); otherwise
    /// they are applied directly to the ledger (`sync_ledger_with_block_without_bft`),
    /// switching back to BFT sync once caught up. Returns `Ok(true)` if at
    /// least one block was applied.
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Serialize response processing.
        let _lock = self.response_lock.lock().await;

        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // The highest height reported by any sync peer (0 if none).
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // On exit: record the new sync height if any blocks were applied, and
        // propagate the error (if any), else report whether progress was made.
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        // The next block to apply (ledger_height + 1) is at/above the GC horizon.
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // BFT path: apply blocks through storage + pending-block queue.
            let start_height = self.compute_sync_height().await;

            let mut current_height = start_height;
            trace!(
                "Try advancing blocks responses with BFT (starting at block {current_height}, current sync speed is {})",
                self.block_sync.get_sync_speed()
            );

            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Drop the bad response so it can be re-requested.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            // Non-BFT path: apply blocks directly to the ledger.
            let start_height = ledger_height;
            let mut current_height = start_height;

            trace!("Try advancing block responses without BFT (starting at block {current_height})");

            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // If we caught up into the GC horizon, rebuild BFT storage so
            // subsequent rounds use the BFT path again.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                self.sync_storage_with_ledger_at_bootup()
                    .await
                    .with_context(|| "BFT sync (with bootup routine) failed")?;
            }

            cleanup(start_height, current_height, None)
        }
    }

    /// Applies `block` directly to the ledger (bypassing the BFT), then
    /// fast-forwards the storage height/round and clears the corresponding
    /// block response. The blocking ledger work runs on a blocking thread.
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Hold the sync lock while mutating the ledger and storage.
        let _lock = self.sync_lock.lock().await;

        let self_ = self.clone();
        spawn_blocking!({
            self_.ledger.check_next_block(&block)?;
            self_.ledger.advance_to_next_block(&block)?;

            self_.storage.sync_height_with_block(block.height());
            self_.storage.sync_round_with_block(block.round());
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
    }

    /// Inserts the subdag certificates of a quorum block into storage and
    /// forwards each certificate to the sync callback (if set). Non-quorum
    /// (beacon) blocks are a no-op.
    async fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
        let Authority::Quorum(subdag) = block.authority() else {
            return Ok(());
        };

        // Index the block's unconfirmed transactions by ID.
        let unconfirmed_transactions = cfg_iter!(block.transactions())
            .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
            .collect::<HashMap<_, _>>();

        for certificates in subdag.values().cloned() {
            // Certificates are cloned because the (possibly parallel) closure
            // cannot move them out while also handing them to the callback below.
            cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                self.storage.sync_certificate_with_block(block, certificate.clone(), &unconfirmed_transactions);
            });

            if let Some(cb) = self.sync_callback.get() {
                for certificate in certificates {
                    cb.add_new_certificate(certificate).await.with_context(|| "Failed to sync certificate")?;
                }
            }
        }

        Ok(())
    }

    /// Returns `true` if the availability threshold is reached for the given
    /// pending block: enough committee members (looked back for the round
    /// after the leader's commit round) have produced certificates that
    /// reference the block's leader certificate.
    ///
    /// # Errors
    /// Fails if the block is not a quorum block, the round number overflows,
    /// or the committee lookback cannot be retrieved.
    fn is_block_availability_threshold_reached(&self, block: &PendingBlock<N>) -> Result<bool> {
        let leader_certificate = match block.authority() {
            Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
            _ => bail!("Received a block with an unexpected authority type."),
        };
        let commit_round = leader_certificate.round();
        let certificate_round =
            commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;

        let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
        let certificates = self.storage.get_certificates_for_round(certificate_round);
        // Authors whose certificates link back to the leader certificate.
        let authors = certificates
            .iter()
            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                true => Some(c.author()),
                false => None,
            })
            .collect();

        if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
            trace!(
                "Block {hash} at height {height} has reached availability threshold",
                hash = block.hash(),
                height = block.height()
            );
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Syncs the storage (and, once the availability threshold is reached,
    /// the ledger) with the given block via the BFT path.
    ///
    /// The block's subdag is inserted into storage, the block is checked and
    /// queued as a pending block, and then the longest committable prefix of
    /// the pending-block queue is advanced into the ledger.
    async fn sync_storage_with_block(&self, new_block: Block<N>) -> Result<()> {
        // Hold the sync lock for the entire operation.
        let _lock = self.sync_lock.lock().await;
        let new_block_height = new_block.height();

        // Skip blocks the ledger already contains.
        if self.ledger.contains_block_height(new_block.height()) {
            debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
            return Ok(());
        }

        let mut pending_blocks = self.pending_blocks.lock().await;

        self.add_block_subdag_to_bft(&new_block).await?;

        let ledger_block_height = self.ledger.latest_block_height();

        // Prune pending blocks the ledger has already caught up to.
        while let Some(pending_block) = pending_blocks.front() {
            if pending_block.height() > ledger_block_height {
                break;
            }

            pending_blocks.pop_front();
        }

        // The new block must extend the queue by exactly one height.
        if let Some(tail) = pending_blocks.back() {
            if tail.height() >= new_block.height() {
                debug!(
                    "A unconfirmed block is queued already for height {height}. \
                    Will not sync.",
                    height = new_block.height()
                );
                return Ok(());
            }

            ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
        }

        let ledger_block_height = self.ledger.latest_block_height();

        // NOTE(review): this pruning loop repeats the one above; the ledger
        // height is re-read, but nothing between the two loops advances the
        // ledger, so this second pass appears redundant — confirm before removing.
        while let Some(pending_block) = pending_blocks.front() {
            if pending_block.height() > ledger_block_height {
                break;
            }

            trace!(
                "Pending block {hash} at height {height} became obsolete",
                hash = pending_block.hash(),
                height = pending_block.height()
            );
            pending_blocks.pop_front();
        }

        // Check the block's subdag against the queued predecessors.
        let new_block = match self.ledger.check_block_subdag(new_block, pending_blocks.make_contiguous()) {
            Ok(new_block) => new_block,
            Err(err) => {
                // Treat "already in the ledger" as a benign no-op; any other
                // error is propagated. NOTE(review): string-matching on the
                // error message is brittle — a typed error would be safer.
                if err.to_string().contains("already in the ledger") {
                    debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);

                    return Ok(());
                } else {
                    return Err(err.into());
                }
            }
        };

        trace!(
            "Adding new pending block {hash} at height {height}",
            hash = new_block.hash(),
            height = new_block.height()
        );
        pending_blocks.push_back(new_block);

        // Find the newest queued block that has reached the availability
        // threshold; everything up to (and including) it can be committed.
        let mut commit_height = None;
        for block in pending_blocks.iter().rev() {
            if self
                .is_block_availability_threshold_reached(block)
                .with_context(|| "Availability threshold check failed")?
            {
                commit_height = Some(block.height());
                break;
            }
        }

        if let Some(commit_height) = commit_height {
            let start_height = ledger_block_height + 1;
            ensure!(commit_height >= start_height, "Invalid commit height");
            let num_blocks = (commit_height - start_height + 1) as usize;

            if num_blocks > 1 {
                // NOTE(review): `chain_length` reports the total queue length,
                // not `num_blocks` (the count actually committed below) —
                // likely should log `num_blocks` instead.
                trace!(
                    "Attempting to commit {chain_length} pending block(s) starting at height {start_height}.",
                    chain_length = pending_blocks.len(),
                );
            }

            // Commit the committable prefix, block by block, on blocking threads.
            for pending_block in pending_blocks.drain(0..num_blocks) {
                let hash = pending_block.hash();
                let height = pending_block.height();
                let ledger = self.ledger.clone();
                let storage = self.storage.clone();

                spawn_blocking!({
                    let block = ledger.check_block_content(pending_block).with_context(|| {
                        format!("Failed to check contents of pending block {hash} at height {height}")
                    })?;

                    trace!("Adding pending block {hash} at height {height} to the ledger");
                    ledger.advance_to_next_block(&block)?;
                    storage.sync_height_with_block(block.height());
                    storage.sync_round_with_block(block.round());

                    Ok(())
                })?
            }
        } else {
            trace!("No pending block are ready to be committed ({} block(s) are pending)", pending_blocks.len());
        }

        Ok(())
    }
}
949
950impl<N: Network> Sync<N> {
952 pub fn is_synced(&self) -> bool {
954 if self.gateway.number_of_connected_peers() == 0 {
957 return false;
958 }
959
960 self.block_sync.is_block_synced()
961 }
962
963 pub fn num_blocks_behind(&self) -> Option<u32> {
965 self.block_sync.num_blocks_behind()
966 }
967
968 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
970 self.block_sync.get_block_locators()
971 }
972}
973
974impl<N: Network> Sync<N> {
976 pub async fn send_certificate_request(
978 &self,
979 peer_ip: SocketAddr,
980 certificate_id: Field<N>,
981 ) -> Result<BatchCertificate<N>> {
982 let (callback_sender, callback_receiver) = oneshot::channel();
984 let num_sent_requests = self.pending.num_sent_requests(certificate_id);
986 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
988 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
990 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
993
994 self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
996
997 if should_send_request {
999 if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
1001 bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
1002 }
1003 } else {
1004 debug!(
1005 "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
1006 fmt_id(certificate_id)
1007 );
1008 }
1009 tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
1012 .await
1013 .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
1014 .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
1015 }
1016
1017 fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
1019 if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
1021 let self_ = self.clone();
1023 tokio::spawn(async move {
1024 let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
1025 });
1026 }
1027 }
1028
1029 fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
1032 let certificate = response.certificate;
1033 let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
1035 if exists {
1037 self.pending.remove(certificate.id(), Some(certificate));
1040 }
1041 }
1042}
1043
1044impl<N: Network> Sync<N> {
1045 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1047 self.handles.lock().push(tokio::spawn(future));
1048 }
1049
1050 pub async fn shut_down(&self) {
1052 info!("Shutting down the sync module...");
1053 self.sync_callback.clear();
1055 let _lock = self.response_lock.lock().await;
1057 let _lock = self.sync_lock.lock().await;
1059 self.handles.lock().iter().for_each(|handle| handle.abort());
1061 }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use super::*;
1067
1068 use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
1069
1070 use snarkos_account::Account;
1071 use snarkos_node_sync::BlockSync;
1072 use snarkos_utilities::{NodeDataDir, SimpleStoppable};
1073
1074 use snarkvm::{
1075 console::{
1076 account::{Address, PrivateKey},
1077 network::MainnetV0,
1078 },
1079 ledger::{
1080 narwhal::{BatchCertificate, BatchHeader, Subdag},
1081 store::{ConsensusStore, helpers::memory::ConsensusMemory},
1082 },
1083 prelude::{Ledger, VM},
1084 utilities::TestRng,
1085 };
1086
1087 use aleo_std::StorageMode;
1088 use indexmap::IndexSet;
1089 use rand::Rng;
1090 use std::collections::BTreeMap;
1091
1092 type CurrentNetwork = MainnetV0;
1093 type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1094 type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1095
1096 #[tokio::test]
1098 #[tracing_test::traced_test]
1099 async fn test_commit_chain() -> anyhow::Result<()> {
1100 let rng = &mut TestRng::default();
1101 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1103
1104 let first_round = 1;
1106 let num_blocks = 3;
1108 let num_rounds = first_round + num_blocks * 2 + 1;
1111 let first_committed_round = num_rounds - 1;
1114
1115 let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1117 let account: Account<CurrentNetwork> = Account::new(rng)?;
1118
1119 let seed: u64 = rng.r#gen();
1121 let vm = VM::from(store).unwrap();
1122 let genesis_pk = *account.private_key();
1123 let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1124
1125 let genesis_rng = &mut TestRng::from_seed(seed);
1127 let private_keys = [
1128 *account.private_key(),
1129 PrivateKey::new(genesis_rng)?,
1130 PrivateKey::new(genesis_rng)?,
1131 PrivateKey::new(genesis_rng)?,
1132 ];
1133
1134 let genesis_clone = genesis.clone();
1136 let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
1137 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
1139
1140 let (round_to_certificates_map, committee) = {
1142 let addresses = [
1143 Address::try_from(private_keys[0])?,
1144 Address::try_from(private_keys[1])?,
1145 Address::try_from(private_keys[2])?,
1146 Address::try_from(private_keys[3])?,
1147 ];
1148
1149 let committee = ledger.latest_committee().unwrap();
1150
1151 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1153 HashMap::new();
1154 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1155
1156 for round in first_round..=first_committed_round {
1157 let mut current_certificates = IndexSet::new();
1158 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1159 IndexSet::new()
1160 } else {
1161 previous_certificates.iter().map(|c| c.id()).collect()
1162 };
1163
1164 let committee_id = committee.id();
1165 let prev_leader = committee.get_leader(round - 1).unwrap();
1166
1167 for (i, private_key) in private_keys.iter().enumerate() {
1171 let leader_index = addresses.iter().position(|&address| address == prev_leader).unwrap();
1172 let is_certificate_round = round % 2 == 1;
1173 let is_leader = i == leader_index;
1174
1175 let previous_certs = if round < first_committed_round && is_certificate_round && !is_leader {
1176 previous_certificate_ids
1177 .iter()
1178 .cloned()
1179 .enumerate()
1180 .filter(|(idx, _)| *idx != leader_index)
1181 .map(|(_, id)| id)
1182 .collect()
1183 } else {
1184 previous_certificate_ids.clone()
1185 };
1186
1187 let batch_header = BatchHeader::new(
1188 private_key,
1189 round,
1190 now(),
1191 committee_id,
1192 Default::default(),
1193 previous_certs,
1194 rng,
1195 )
1196 .unwrap();
1197
1198 let mut signatures = IndexSet::with_capacity(4);
1200 for (j, private_key_2) in private_keys.iter().enumerate() {
1201 if i != j {
1202 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1203 }
1204 }
1205 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1206 }
1207
1208 round_to_certificates_map.insert(round, current_certificates.clone());
1210 previous_certificates = current_certificates;
1211 }
1212 (round_to_certificates_map, committee)
1213 };
1214
1215 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1217 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1219 for i in first_round..=first_committed_round {
1220 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1221 certificates.extend(c);
1222 }
1223 for certificate in certificates.clone().iter() {
1224 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1225 }
1226
1227 let mut previous_leader_cert = None;
1229 let mut blocks = vec![];
1230
1231 for block_height in 1..=num_blocks {
1232 let leader_round = block_height * 2;
1233
1234 let leader = committee.get_leader(leader_round).unwrap();
1235 let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();
1236
1237 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1238 let mut leader_cert_map = IndexSet::new();
1239 leader_cert_map.insert(leader_certificate.clone());
1240
1241 let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);
1242
1243 subdag_map.insert(leader_round, leader_cert_map.clone());
1244 subdag_map.insert(leader_round - 1, previous_cert_map.clone());
1245
1246 if leader_round > 2 {
1247 let previous_commit_cert_map: IndexSet<_> = storage
1248 .get_certificates_for_round(leader_round - 2)
1249 .into_iter()
1250 .filter(|cert| {
1251 if let Some(previous_leader_cert) = &previous_leader_cert {
1252 cert != previous_leader_cert
1253 } else {
1254 true
1255 }
1256 })
1257 .collect();
1258 subdag_map.insert(leader_round - 2, previous_commit_cert_map);
1259 }
1260
1261 let subdag = Subdag::from(subdag_map.clone())?;
1262 previous_leader_cert = Some(leader_certificate);
1263
1264 let core_ledger = core_ledger.clone();
1265 let block = spawn_blocking!({
1266 let block = core_ledger.clone().prepare_advance_to_next_quorum_block(subdag, Default::default())?;
1267 core_ledger.advance_to_next_block(&block)?;
1268 Ok(block)
1269 })?;
1270
1271 blocks.push(block);
1272 }
1273
1274 let storage_mode = StorageMode::new_test(None);
1276
1277 let syncing_ledger = {
1280 let storage_mode = storage_mode.clone();
1281 Arc::new(CoreLedgerService::new(
1282 spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1283 SimpleStoppable::new(),
1284 ))
1285 };
1286
1287 let gateway = Gateway::new(
1289 account.clone(),
1290 storage.clone(),
1291 syncing_ledger.clone(),
1292 None,
1293 &[],
1294 false,
1295 NodeDataDir::new_test(None),
1296 None,
1297 )?;
1298 let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
1299 let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);
1300
1301 let mut block_iter = blocks.into_iter();
1302
1303 for _ in 0..num_blocks - 1 {
1305 let block = block_iter.next().unwrap();
1306 sync.sync_storage_with_block(block).await?;
1307
1308 assert_eq!(syncing_ledger.latest_block_height(), 0);
1310 }
1311
1312 sync.sync_storage_with_block(block_iter.next().unwrap()).await?;
1315 assert_eq!(syncing_ledger.latest_block_height(), 3);
1316
1317 assert!(syncing_ledger.contains_block_height(1));
1319 assert!(syncing_ledger.contains_block_height(2));
1320
1321 Ok(())
1322 }
1323
1324 #[tokio::test]
1325 #[tracing_test::traced_test]
1326 async fn test_pending_certificates() -> anyhow::Result<()> {
1327 let rng = &mut TestRng::default();
1328 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1330 let commit_round = 2;
1331
1332 let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1334 let account: Account<CurrentNetwork> = Account::new(rng)?;
1335
1336 let seed: u64 = rng.r#gen();
1338 let vm = VM::from(store).unwrap();
1339 let genesis_pk = *account.private_key();
1340 let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1341
1342 let genesis_rng = &mut TestRng::from_seed(seed);
1344 let private_keys = [
1345 *account.private_key(),
1346 PrivateKey::new(genesis_rng)?,
1347 PrivateKey::new(genesis_rng)?,
1348 PrivateKey::new(genesis_rng)?,
1349 ];
1350 let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
1352 let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
1354 let (round_to_certificates_map, committee) = {
1356 let committee = core_ledger.current_committee().unwrap();
1358 let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1360 HashMap::new();
1361 let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1362
1363 for round in 0..=commit_round + 8 {
1364 let mut current_certificates = IndexSet::new();
1365 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1366 IndexSet::new()
1367 } else {
1368 previous_certificates.iter().map(|c| c.id()).collect()
1369 };
1370 let committee_id = committee.id();
1371 for (i, private_key_1) in private_keys.iter().enumerate() {
1373 let batch_header = BatchHeader::new(
1374 private_key_1,
1375 round,
1376 now(),
1377 committee_id,
1378 Default::default(),
1379 previous_certificate_ids.clone(),
1380 rng,
1381 )
1382 .unwrap();
1383 let mut signatures = IndexSet::with_capacity(4);
1385 for (j, private_key_2) in private_keys.iter().enumerate() {
1386 if i != j {
1387 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1388 }
1389 }
1390 current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1391 }
1392
1393 round_to_certificates_map.insert(round, current_certificates.clone());
1395 previous_certificates = current_certificates.clone();
1396 }
1397 (round_to_certificates_map, committee)
1398 };
1399
1400 let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1402 let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1404 for i in 1..=commit_round + 8 {
1405 let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1406 certificates.extend(c);
1407 }
1408 for certificate in certificates.clone().iter() {
1409 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1410 }
1411 let leader_round_1 = commit_round;
1413 let leader_1 = committee.get_leader(leader_round_1).unwrap();
1414 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1415 let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1416 let block_1 = {
1417 let mut leader_cert_map = IndexSet::new();
1418 leader_cert_map.insert(leader_certificate.clone());
1419 let mut previous_cert_map = IndexSet::new();
1420 for cert in storage.get_certificates_for_round(commit_round - 1) {
1421 previous_cert_map.insert(cert);
1422 }
1423 subdag_map.insert(commit_round, leader_cert_map.clone());
1424 subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1425 let subdag = Subdag::from(subdag_map.clone())?;
1426 let ledger = core_ledger.clone();
1427 spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag, Default::default()))?
1428 };
1429 let ledger = core_ledger.clone();
1431 let block = block_1.clone();
1432 spawn_blocking!(ledger.advance_to_next_block(&block))?;
1433
1434 let leader_round_2 = commit_round + 2;
1436 let leader_2 = committee.get_leader(leader_round_2).unwrap();
1437 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1438 let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1439 let block_2 = {
1440 let mut leader_cert_map_2 = IndexSet::new();
1441 leader_cert_map_2.insert(leader_certificate_2.clone());
1442 let mut previous_cert_map_2 = IndexSet::new();
1443 for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1444 previous_cert_map_2.insert(cert);
1445 }
1446 subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1447 subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1448 let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1449 let ledger = core_ledger.clone();
1450 spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default()))?
1451 };
1452 let ledger = core_ledger.clone();
1454 let block = block_2.clone();
1455 spawn_blocking!(ledger.advance_to_next_block(&block))?;
1456
1457 let leader_round_3 = commit_round + 4;
1459 let leader_3 = committee.get_leader(leader_round_3).unwrap();
1460 let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1461 let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1462 let block_3 = {
1463 let mut leader_cert_map_3 = IndexSet::new();
1464 leader_cert_map_3.insert(leader_certificate_3.clone());
1465 let mut previous_cert_map_3 = IndexSet::new();
1466 for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1467 previous_cert_map_3.insert(cert);
1468 }
1469 subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1470 subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1471 let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1472 let ledger = core_ledger.clone();
1473 spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default()))?
1474 };
1475 let ledger = core_ledger.clone();
1477 let block = block_3.clone();
1478 spawn_blocking!(ledger.advance_to_next_block(&block))?;
1479
1480 let pending_certificates = storage.get_pending_certificates();
1486 for certificate in pending_certificates.clone() {
1488 assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1489 }
1490 let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1492 {
1493 let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1494 for subdag in subdag_maps.iter() {
1495 for subdag_certificates in subdag.values() {
1496 committed_certificates.extend(subdag_certificates.iter().cloned());
1497 }
1498 }
1499 };
1500 let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1502 for certificate in certificates.clone() {
1503 if !committed_certificates.contains(&certificate) {
1504 candidate_pending_certificates.insert(certificate);
1505 }
1506 }
1507 assert_eq!(pending_certificates, candidate_pending_certificates);
1509
1510 Ok(())
1511 }
1512}