// snarkos-node-bft: src/sync/mod.rs
1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_FETCH_TIMEOUT_IN_MS,
19    Transport,
20    events::DataBlocks,
21    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22    spawn_blocking,
23};
24use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkos_node_network::PeerPoolHandling;
27use snarkos_node_sync::{
28    BLOCK_REQUEST_BATCH_DELAY,
29    BlockSync,
30    InsertBlockResponseError,
31    Ping,
32    PrepareSyncRequest,
33    locators::BlockLocators,
34};
35
36use snarkvm::{
37    console::{
38        network::{ConsensusVersion, Network},
39        types::Field,
40    },
41    ledger::{PendingBlock, authority::Authority, block::Block, narwhal::BatchCertificate},
42    utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
43};
44
45use anyhow::{Context, Result, anyhow, bail, ensure};
46use indexmap::IndexMap;
47#[cfg(feature = "locktick")]
48use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
49#[cfg(not(feature = "locktick"))]
50use parking_lot::Mutex;
51#[cfg(not(feature = "serial"))]
52use rayon::prelude::*;
53use std::{
54    collections::{HashMap, VecDeque},
55    future::Future,
56    net::SocketAddr,
57    sync::Arc,
58    time::Duration,
59};
60#[cfg(not(feature = "locktick"))]
61use tokio::sync::Mutex as TMutex;
62use tokio::{
63    sync::{OnceCell, oneshot},
64    task::JoinHandle,
65};
66
/// Block synchronization logic for validators.
///
/// Synchronization works differently for nodes that act as validators in AleoBFT;
/// In the common case, validators generate blocks after receiving an anchor block that has been accepted
/// by a supermajority of the committee instead of fetching entire blocks from other nodes.
/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
///
/// This struct also manages fetching certificates from other validators during normal operation,
/// and blocks when falling behind.
///
/// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway enables communication with other validators.
    gateway: Gateway<N>,
    /// The BFT storage; its height, round, and certificates are kept in sync with the ledger.
    storage: Storage<N>,
    /// The ledger service, used to read and advance the canonical chain.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization logic (requests, responses, peer locators).
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificates queue (outstanding certificate requests keyed by certificate ID).
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender; set at most once during `initialize`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// Handles to the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock. Ensures block responses are processed by at most one task at a time.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock. Ensures that only one task syncs the ledger at a time.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses.
    ///
    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks
    /// whose addition to the ledger is deferred until certain checks pass.
    /// Blocks need to be processed in order, hence an ordered `VecDeque`
    /// (front holds the lowest height, back the highest).
    ///
    /// Whenever a new block is added to this queue, BlockSync::set_sync_height needs to be called.
    pending_blocks: Arc<TMutex<VecDeque<PendingBlock<N>>>>,
}
108
impl<N: Network> Sync<N> {
    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Initializes a new sync instance.
    ///
    /// The instance is inert until [`Self::initialize`] and [`Self::run`] are called.
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        // Return the sync instance; all queues, locks, and task handles start empty.
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            bft_sender: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            pending_blocks: Default::default(),
        }
    }

    /// Initializes the sync module and syncs the storage with the ledger at bootup.
    ///
    /// If a `bft_sender` is provided, it is registered so that synced certificates
    /// can later be forwarded to the BFT; registration may only happen once.
    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
        // If a BFT sender was provided, set it.
        // Setting it twice is a programming error, hence the `expect`.
        if let Some(bft_sender) = bft_sender {
            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
        }

        info!("Syncing storage with the ledger...");

        // Sync the storage with the ledger.
        self.sync_storage_with_ledger_at_bootup()
            .await
            .with_context(|| "Syncing storage with the ledger at bootup failed")?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

    /// Sends the given batch of block requests to peers.
    ///
    /// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`.
    #[inline]
    async fn send_block_requests(
        &self,

        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        // Sends the block requests to the sync peers,
        // in chunks of at most `MAXIMUM_NUMBER_OF_BLOCKS` requests each.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Starts the sync module.
    ///
    /// When this function returns successfully, the sync module will have spawned background tasks
    /// that fetch blocks from other validators.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Start the block request generation loop (outgoing).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Wait for peer updates or timeout.
                // The timeout is only a fallback in case no notification arrives.
                let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;

                // Issue block requests to peers.
                self_.try_issuing_block_requests().await;

                // Rate limiting happens in [`Self::send_block_requests`] and no additional sleeps are needed here.
            }
        });

        // Start the block response processing loop (incoming).
        let self_ = self.clone();
        let ping = ping.clone();
        self.spawn(async move {
            loop {
                // Wait until there is something to do or until the timeout.
                let _ =
                    tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;

                // Run the advancement in its own task so that a panic inside it
                // can be caught below instead of killing this loop.
                let ping = ping.clone();
                let self_ = self_.clone();
                let hdl = tokio::spawn(async move {
                    self_.try_advancing_block_synchronization(&ping).await;
                });

                if let Err(err) = hdl.await
                    && let Ok(panic) = err.try_into_panic()
                {
                    error!("Sync block advancement panicked: {panic:?}");
                }

                // We perform no additional rate limiting here as
                // requests are already rate-limited.
            }
        });

        // Start the pending queue expiration loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                // Remove the expired pending transmission requests.
                // This is done on a blocking thread since it may hold a lock.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        /* Set up callbacks for events from the Gateway */

        // Retrieve the sync receiver.
        let SyncReceiver {
            mut rx_block_sync_insert_block_response,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block sync request to advance with sync blocks.
        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::advance_with_sync_blocks()`],
        // which calls [`tx_block_sync_advance_with_sync_blocks.send()`],
        // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
                rx_block_sync_insert_block_response.recv().await
            {
                let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
                //TODO remove this once channels are gone
                if let Err(err) = &result {
                    warn!("Failed to insert block response from '{peer_ip}' - {err}");
                }

                // Report the outcome back to the requester; ignore a dropped callback.
                callback.send(result).ok();
            }
        });

        // Process the block sync request to remove the peer.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Process each block sync request to update peer locators.
        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::update_peer_locators()`],
        // which calls [`tx_block_sync_update_peer_locators.send()`],
        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                // Handle each update in its own task so a slow update does not block the loop.
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process each certificate request.
        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_request.send()`],
        // which causes the `rx_certificate_request.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process each certificate response.
        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_response.send()`],
        // which causes the `rx_certificate_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

    /// BFT-specific version of `Client::try_issuing_block_requests()`.
    ///
    /// This method handles timeout removal, checks if block sync is possible,
    /// and issues block requests to peers.
    async fn try_issuing_block_requests(&self) {
        // Update the sync height to the latest ledger height.
        // (if the ledger height is lower or equal to the current sync height, this is a noop)
        self.block_sync.set_sync_height(self.ledger.latest_block_height());

        // Check if any existing requests can be removed.
        // We should do this even if we cannot block sync, to ensure
        // there are no dangling block requests.
        match self.block_sync.handle_block_request_timeouts(&self.gateway) {
            Ok(Some((requests, sync_peers))) => {
                // Re-request blocks instead of performing regular block sync.
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                // Abort and retry later.
                error!("{}", &flatten_error(err));
                return;
            }
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and performing unnecessary computation.
        if !self.block_sync.can_block_sync() {
            return;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (requests, sync_peers) = self.block_sync.prepare_block_requests();

        // If there are no block requests, return early.
        if requests.is_empty() {
            return;
        }

        // Send the block requests to peers.
        self.send_block_requests(requests, sync_peers).await;
    }

    /// Test-only method to manually trigger block synchronization.
    /// This combines both request generation and response processing for testing purposes.
    #[cfg(test)]
    pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
        // First try issuing block requests
        self.try_issuing_block_requests().await;

        // Then try advancing with any available responses
        self.try_advancing_block_synchronization(&None).await;
    }
}
380
381// Callbacks used when receiving messages from the Gateway
382impl<N: Network> Sync<N> {
383    /// We received a block response and can (possibly) advance synchronization.
384    async fn insert_block_response(
385        &self,
386        peer_ip: SocketAddr,
387        blocks: Vec<Block<N>>,
388        latest_consensus_version: Option<ConsensusVersion>,
389    ) -> Result<(), InsertBlockResponseError> {
390        self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)
391
392        // No need to advance block sync here, as the new response will
393        // notify the incoming task.
394    }
395
396    /// We received new peer locators during a Ping.
397    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
398        self.block_sync.update_peer_locators(peer_ip, &locators)
399    }
400
401    /// A peer disconnected.
402    fn remove_peer(&self, peer_ip: SocketAddr) {
403        self.block_sync.remove_peer(&peer_ip);
404    }
405
406    #[cfg(test)]
407    pub fn testing_only_update_peer_locators_testing_only(
408        &self,
409        peer_ip: SocketAddr,
410        locators: BlockLocators<N>,
411    ) -> Result<()> {
412        self.update_peer_locators(peer_ip, locators)
413    }
414}
415
416// Methods to manage storage.
417impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// This is called when starting the validator and after finishing a sync without BFT.
    ///
    /// It replays the non-garbage-collected suffix of the ledger into the BFT storage
    /// (height, round, and certificates), and forwards the recovered certificates to
    /// the BFT so its DAG can be rebuilt.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock, so only one task syncs the ledger at a time.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
            // Note that the block authority is always a sub-DAG in production;
            // beacon signatures are only used for testing,
            // and as placeholder (irrelevant) block authority in the genesis block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions, keyed by their transaction ID.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates (in parallel where the feature allows).
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                // Update the validator telemetry.
                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        /* Sync the BFT DAG */

        // Construct a list of the certificates.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            bft_sender
                .tx_sync_bft_dag_at_bootup
                .send(certificates)
                .await
                .with_context(|| "Failed to update the BFT DAG from sync")?;
        }

        // Make the underlying `BlockSync` instance aware of the restored height.
        self.block_sync.set_sync_height(block_height);

        Ok(())
    }
509
510    /// Returns which height we are synchronized to.
511    /// If there are queued block responses, this might be higher than the latest block in the ledger.
512    async fn compute_sync_height(&self) -> u32 {
513        let ledger_height = self.ledger.latest_block_height();
514        let mut pending_blocks = self.pending_blocks.lock().await;
515
516        // Remove any old responses.
517        while let Some(b) = pending_blocks.front()
518            && b.height() <= ledger_height
519        {
520            pending_blocks.pop_front();
521        }
522
523        // Ensure the returned value is always greater or equal than ledger height.
524        pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
525    }
526
527    /// BFT-version of [`snarkos_node_client::Client::try_advancing_block_synchronization`].
528    async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
529        // Process block responses and advance the ledger.
530        let new_blocks = match self
531            .try_advancing_block_synchronization_inner()
532            .await
533            .with_context(|| "Block synchronization failed")
534        {
535            Ok(new_blocks) => new_blocks,
536            Err(err) => {
537                error!("{}", &flatten_error(err));
538                false
539            }
540        };
541
542        if let Some(ping) = &ping
543            && new_blocks
544        {
545            match self.get_block_locators() {
546                Ok(locators) => ping.update_block_locators(locators),
547                Err(err) => error!("Failed to update block locators: {err}"),
548            }
549        }
550    }
551
    /// Aims to advance synchronization using any recent block responses received from peers.
    ///
    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
    /// and is called periodically at runtime.
    ///
    /// This returns Ok(true) if we successfully advanced the ledger by at least one new block.
    ///
    /// A key difference to `BlockSync`'s versions is that it will only add blocks to the ledger once they have been confirmed by the network.
    /// If blocks are not confirmed yet, they will be kept in [`Self::pending_blocks`].
    /// It will also pass certificates from synced blocks to the BFT module so that consensus can progress as expected
    /// (see [`Self::sync_storage_with_block`] for more details).
    ///
    /// If the node falls behind more than GC rounds, this function calls [`Self::sync_storage_without_bft`] instead,
    /// which syncs without updating the BFT state.
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Acquire the response lock, so only one task processes responses at a time.
        let _lock = self.response_lock.lock().await;

        // For sanity, set the sync height again.
        // (if the sync height is already larger or equal, this is a noop)
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Retrieve the maximum block height of the peers.
        // (0 if there are no sync peers)
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Updates sync state and returns the error (if any).
        // Shared epilogue for both the BFT and non-BFT sync paths below.
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            // Make the underlying `BlockSync` instance aware of the new sync height.
            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Retrieve the current height, based on the ledger height and the
            // (unconfirmed) blocks that are already queued up.
            let start_height = self.compute_sync_height().await;

            // The height is incremented as blocks are added.
            let mut current_height = start_height;
            trace!(
                "Try advancing blocks responses with BFT (starting at block {current_height}, current sync speed is {})",
                self.block_sync.get_sync_speed()
            );

            // If we already were within GC or successfully caught up with GC, try to advance BFT normally again.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    // No response available for the next height yet; stop here.
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                // Sync the storage with the block.
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            // For non-BFT sync we need to start at the current height of the ledger, as blocks are immediately
            // added to it and are not queued up in [`Self::pending_blocks`].
            let start_height = ledger_height;
            let mut current_height = start_height;

            trace!("Try advancing block responses without BFT (starting at block {current_height})");

            // Try to advance the ledger *to tip* without updating the BFT.
            // TODO(kaimast): why to tip and not to tip-GC?
            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    // No response available for the next height yet; stop here.
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // Sync the storage with the ledger if we should transition to the BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                self.sync_storage_with_ledger_at_bootup()
                    .await
                    .with_context(|| "BFT sync (with bootup routine) failed")?;
            }

            cleanup(start_height, current_height, None)
        }
    }
685
    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// This is only used by [`Self::try_advancing_block_synchronization`].
    ///
    /// Validates and appends the block to the ledger, then brings the BFT storage's
    /// height and round in line with it, and marks the response as processed.
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock, so only one task syncs the ledger at a time.
        let _lock = self.sync_lock.lock().await;

        // The ledger operations below are potentially long-running,
        // so run them on a blocking thread to avoid stalling the async executor.
        let self_ = self.clone();
        spawn_blocking!({
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
    }
710
    /// Helper function for [`Self::sync_storage_with_block`].
    /// It syncs the batch certificates with the BFT, if the block's authority is a sub-DAG.
    ///
    /// Note that the block authority is always a sub-DAG in production; beacon signatures are only used for testing,
    /// and as placeholder (irrelevant) block authority in the genesis block.
    async fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
        // Nothing to do if this is a beacon block
        let Authority::Quorum(subdag) = block.authority() else {
            return Ok(());
        };

        // Reconstruct the unconfirmed transactions, keyed by transaction ID.
        // Transactions that fail the conversion are silently skipped (`.ok()`).
        let unconfirmed_transactions = cfg_iter!(block.transactions())
            .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
            .collect::<HashMap<_, _>>();

        // Iterate over the certificates, one sub-DAG round at a time.
        for certificates in subdag.values().cloned() {
            // Sync the certificates of this round with storage (possibly in parallel; see `cfg_into_iter!`).
            cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                // Sync the batch certificate with the block.
                self.storage.sync_certificate_with_block(block, certificate.clone(), &unconfirmed_transactions);
            });

            // Sync the BFT DAG with the certificates.
            for certificate in certificates {
                // If a BFT sender was provided, send the certificate to the BFT.
                // For validators, BFT spawns a receiver task in `BFT::start_handlers`.
                if let Some(bft_sender) = self.bft_sender.get() {
                    // The oneshot callback makes each send synchronous: wait for the BFT
                    // to acknowledge this certificate before sending the next one.
                    let (callback_tx, callback_rx) = oneshot::channel();
                    bft_sender
                        .tx_sync_bft
                        .send((certificate, callback_tx))
                        .await
                        .with_context(|| "Failed to sync certificate")?;
                    callback_rx.await?.with_context(|| "Failed to sync certificate")?;
                }
            }
        }
        Ok(())
    }
751
752    /// Helper function for [`Self::sync_storage_with_block`].
753    ///
754    /// It checks that successor of a given block contains enough votes to commit it.
755    /// This can only return `Ok(true)` if the certificates of the block's successor were added to the storage.
756    fn is_block_availability_threshold_reached(&self, block: &PendingBlock<N>) -> Result<bool> {
757        // Fetch the leader certificate and the relevant rounds.
758        let leader_certificate = match block.authority() {
759            Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
760            _ => bail!("Received a block with an unexpected authority type."),
761        };
762        let commit_round = leader_certificate.round();
763        let certificate_round =
764            commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;
765
766        // Get the committee lookback for the round just after the leader.
767        let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
768        // Retrieve all of the certificates for the round just after the leader.
769        let certificates = self.storage.get_certificates_for_round(certificate_round);
770        // Construct a set over the authors, at the round just after the leader,
771        // who included the leader's certificate in their previous certificate IDs.
772        let authors = certificates
773            .iter()
774            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
775                true => Some(c.author()),
776                false => None,
777            })
778            .collect();
779
780        // Check if the leader is ready to be committed.
781        if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
782            trace!(
783                "Block {hash} at height {height} has reached availability threshold",
784                hash = block.hash(),
785                height = block.height()
786            );
787            Ok(true)
788        } else {
789            Ok(false)
790        }
791    }
792
793    /// Advances the ledger by the given block and updates the storage accordingly.
794    ///
795    /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
796    /// meets the voter availability threshold (i.e. > f voting stake)
797    /// or is reachable via a DAG path from a later leader certificate that does.
798    /// Since performing this check requires DAG certificates from later blocks,
799    /// the block is stored in `Sync::pending_blocks`,
800    /// and its addition to the ledger is deferred until the check passes.
801    /// Several blocks may be stored in `Sync::pending_blocks`
802    /// before they can be all checked and added to the ledger.
803    ///
804    /// # Usage
805    /// This function assumes that blocks are passed in order, i.e.,
806    /// that the given block is a direct successor of the block that was last passed to this function.
807    async fn sync_storage_with_block(&self, new_block: Block<N>) -> Result<()> {
808        // Acquire the sync lock.
809        let _lock = self.sync_lock.lock().await;
810        let new_block_height = new_block.height();
811
812        // If this block has already been processed, return early.
813        // TODO(kaimast): Should we remove the response here?
814        if self.ledger.contains_block_height(new_block.height()) {
815            debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
816            return Ok(());
817        }
818
819        // Acquire the pending blocks lock.
820        let mut pending_blocks = self.pending_blocks.lock().await;
821
822        // Append the certificates to the storage.
823        self.add_block_subdag_to_bft(&new_block).await?;
824
825        // Fetch the latest block height.
826        let ledger_block_height = self.ledger.latest_block_height();
827
828        // First, clear any older pending blocks.
829        // TODO(kaimast): ensure there are no dangling block requests
830        while let Some(pending_block) = pending_blocks.front() {
831            if pending_block.height() > ledger_block_height {
832                break;
833            }
834
835            pending_blocks.pop_front();
836        }
837
838        if let Some(tail) = pending_blocks.back() {
839            if tail.height() >= new_block.height() {
840                debug!(
841                    "A unconfirmed block is queued already for height {height}. \
842                    Will not sync.",
843                    height = new_block.height()
844                );
845                return Ok(());
846            }
847
848            ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
849        }
850
851        // Fetch the latest block height.
852        let ledger_block_height = self.ledger.latest_block_height();
853
854        // Clear any older pending blocks.
855        // TODO(kaimast): ensure there are no dangling block requests
856        while let Some(pending_block) = pending_blocks.front() {
857            if pending_block.height() > ledger_block_height {
858                break;
859            }
860
861            trace!(
862                "Pending block {hash} at height {height} became obsolete",
863                hash = pending_block.hash(),
864                height = pending_block.height()
865            );
866            pending_blocks.pop_front();
867        }
868
869        // Check the block against the chain of pending blocks and append it on success.
870        let new_block = match self.ledger.check_block_subdag(new_block, pending_blocks.make_contiguous()) {
871            Ok(new_block) => new_block,
872            Err(err) => {
873                // TODO(kaimast): this shoud not return an error on the snarkVM side.
874                if err.to_string().contains("already in the ledger") {
875                    debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
876
877                    return Ok(());
878                } else {
879                    return Err(err.into());
880                }
881            }
882        };
883
884        trace!(
885            "Adding new pending block {hash} at height {height}",
886            hash = new_block.hash(),
887            height = new_block.height()
888        );
889        pending_blocks.push_back(new_block);
890
891        // Now, figure out if and which pending block we can commit.
892        // To do this effectively and because commits are transitive,
893        // we iterate in reverse so that we can stop at the first successful check.
894        //
895        // Note, that if the storage already contains certificates for the round after new block,
896        // the availability threshold for the new block could also be reached.
897        let mut commit_height = None;
898        for block in pending_blocks.iter().rev() {
899            // This check assumes that the pending blocks are properly linked together, based on the fact that,
900            // to generate the sequence of `PendingBlocks`, each block needs to successfully be processed by `Ledger::check_block_subdag`.
901            // As a result, the safety of this piece of code relies on the correctness `Ledger::check_block_subdag`,
902            // which is tested in `snarkvm/ledger/tests/pending_block.rs`.
903            if self
904                .is_block_availability_threshold_reached(block)
905                .with_context(|| "Availability threshold check failed")?
906            {
907                commit_height = Some(block.height());
908                break;
909            }
910        }
911
912        if let Some(commit_height) = commit_height {
913            let start_height = ledger_block_height + 1;
914            ensure!(commit_height >= start_height, "Invalid commit height");
915            let num_blocks = (commit_height - start_height + 1) as usize;
916
917            // Create a more detailed log message if we are committing more than one block at a time.
918            if num_blocks > 1 {
919                trace!(
920                    "Attempting to commit {chain_length} pending block(s) starting at height {start_height}.",
921                    chain_length = pending_blocks.len(),
922                );
923            }
924
925            for pending_block in pending_blocks.drain(0..num_blocks) {
926                let hash = pending_block.hash();
927                let height = pending_block.height();
928                let ledger = self.ledger.clone();
929                let storage = self.storage.clone();
930
931                let leader_certificate: Option<BatchCertificate<N>> = spawn_blocking!({
932                    let block = ledger.check_block_content(pending_block).with_context(|| {
933                        format!("Failed to check contents of pending block {hash} at height {height}")
934                    })?;
935
936                    trace!("Adding pending block {hash} at height {height} to the ledger");
937                    ledger.advance_to_next_block(&block)?;
938                    // Sync the height with the block.
939                    storage.sync_height_with_block(block.height());
940                    // Sync the round with the block.
941                    storage.sync_round_with_block(block.round());
942
943                    if let Authority::Quorum(subdag) = block.authority() {
944                        Ok(Some(subdag.leader_certificate().clone()))
945                    } else {
946                        Ok(None)
947                    }
948                })?;
949
950                // If a BFT sender was provided, send the leader certificate to the BFT.
951                // This will update the primary's DAG as expected.
952                if let Some(leader_certificate) = leader_certificate
953                    && let Some(bft_sender) = self.bft_sender.get()
954                {
955                    let (callback_tx, callback_rx) = oneshot::channel();
956                    bft_sender
957                        .tx_sync_block_committed
958                        .send((leader_certificate, callback_tx))
959                        .await
960                        .with_context(|| "Failed to mark leader certificate as committed")?;
961                    callback_rx.await?.with_context(|| "Failed to mark leader certificate as committed")?;
962                }
963            }
964        } else {
965            trace!("No pending block are ready to be committed ({} block(s) are pending)", pending_blocks.len());
966        }
967
968        Ok(())
969    }
970}
971
972// Methods to assist with the block sync module.
973impl<N: Network> Sync<N> {
974    /// Returns `true` if the node is synced and has connected peers.
975    pub fn is_synced(&self) -> bool {
976        // Ensure the validator is connected to other validators,
977        // not just clients.
978        if self.gateway.number_of_connected_peers() == 0 {
979            return false;
980        }
981
982        self.block_sync.is_block_synced()
983    }
984
985    /// Returns the number of blocks the node is behind the greatest peer height.
986    pub fn num_blocks_behind(&self) -> Option<u32> {
987        self.block_sync.num_blocks_behind()
988    }
989
990    /// Returns the current block locators of the node.
991    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
992        self.block_sync.get_block_locators()
993    }
994}
995
996// Methods to assist with fetching batch certificates from peers.
997impl<N: Network> Sync<N> {
998    /// Sends a certificate request to the specified peer.
999    pub async fn send_certificate_request(
1000        &self,
1001        peer_ip: SocketAddr,
1002        certificate_id: Field<N>,
1003    ) -> Result<BatchCertificate<N>> {
1004        // Initialize a oneshot channel.
1005        let (callback_sender, callback_receiver) = oneshot::channel();
1006        // Determine how many sent requests are pending.
1007        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
1008        // Determine if we've already sent a request to the peer.
1009        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
1010        // Determine the maximum number of redundant requests.
1011        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
1012        // Determine if we should send a certificate request to the peer.
1013        // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
1014        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
1015
1016        // Insert the certificate ID into the pending queue.
1017        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
1018
1019        // If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
1020        if should_send_request {
1021            // Send the certificate request to the peer.
1022            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
1023                bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
1024            }
1025        } else {
1026            debug!(
1027                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
1028                fmt_id(certificate_id)
1029            );
1030        }
1031        // Wait for the certificate to be fetched.
1032        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
1033        tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
1034            .await
1035            .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
1036            .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
1037    }
1038
1039    /// Handles the incoming certificate request.
1040    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
1041        // Attempt to retrieve the certificate.
1042        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
1043            // Send the certificate response to the peer.
1044            let self_ = self.clone();
1045            tokio::spawn(async move {
1046                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
1047            });
1048        }
1049    }
1050
1051    /// Handles the incoming certificate response.
1052    /// This method ensures the certificate response is well-formed and matches the certificate ID.
1053    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
1054        let certificate = response.certificate;
1055        // Check if the peer IP exists in the pending queue for the given certificate ID.
1056        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
1057        // If the peer IP exists, finish the pending request.
1058        if exists {
1059            // TODO: Validate the certificate.
1060            // Remove the certificate ID from the pending queue.
1061            self.pending.remove(certificate.id(), Some(certificate));
1062        }
1063    }
1064}
1065
1066impl<N: Network> Sync<N> {
1067    /// Spawns a task with the given future; it should only be used for long-running tasks.
1068    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1069        self.handles.lock().push(tokio::spawn(future));
1070    }
1071
1072    /// Shuts down the primary.
1073    pub async fn shut_down(&self) {
1074        info!("Shutting down the sync module...");
1075        // Acquire the response lock.
1076        let _lock = self.response_lock.lock().await;
1077        // Acquire the sync lock.
1078        let _lock = self.sync_lock.lock().await;
1079        // Abort the tasks.
1080        self.handles.lock().iter().for_each(|handle| handle.abort());
1081    }
1082}
1083
1084#[cfg(test)]
1085mod tests {
1086    use super::*;
1087
1088    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
1089
1090    use snarkos_account::Account;
1091    use snarkos_node_sync::BlockSync;
1092    use snarkos_utilities::{NodeDataDir, SimpleStoppable};
1093
1094    use snarkvm::{
1095        console::{
1096            account::{Address, PrivateKey},
1097            network::MainnetV0,
1098        },
1099        ledger::{
1100            narwhal::{BatchCertificate, BatchHeader, Subdag},
1101            store::{ConsensusStore, helpers::memory::ConsensusMemory},
1102        },
1103        prelude::{Ledger, VM},
1104        utilities::TestRng,
1105    };
1106
1107    use aleo_std::StorageMode;
1108    use indexmap::IndexSet;
1109    use rand::Rng;
1110    use std::collections::BTreeMap;
1111
1112    type CurrentNetwork = MainnetV0;
1113    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1114    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1115
    /// Tests that commits work as expected when some anchors are not committed immediately.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_chain() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;

        // The first round of the first block.
        let first_round = 1;
        // The total number of blocks we test.
        let num_blocks = 3;
        // The number of certificate rounds needed.
        // There is one additional round to provide availability for the final block.
        let num_rounds = first_round + num_blocks * 2 + 1;
        // The first round that has at least N-f certificates referencing the anchor from the previous round.
        // This is also the last round we use in the test.
        let first_committed_round = num_rounds - 1;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let genesis_clone = genesis.clone();
        let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
        // Initialize the ledger.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));

        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in first_round..=first_committed_round {
                let mut current_certificates = IndexSet::new();
                // The first two rounds have no previous certificates to reference.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };

                let committee_id = committee.id();
                let prev_leader = committee.get_leader(round - 1).unwrap();

                // For the first two blocks non-leaders will not reference the leader certificate.
                // This means, while there is an anchor, it isn't committed until later.
                for (i, private_key) in private_keys.iter().enumerate() {
                    let leader_index = addresses.iter().position(|&address| address == prev_leader).unwrap();
                    let is_certificate_round = round % 2 == 1;
                    let is_leader = i == leader_index;

                    // Non-leaders in certificate rounds (except the last) omit the leader's
                    // certificate from their previous certificate IDs.
                    let previous_certs = if round < first_committed_round && is_certificate_round && !is_leader {
                        previous_certificate_ids
                            .iter()
                            .cloned()
                            .enumerate()
                            .filter(|(idx, _)| *idx != leader_index)
                            .map(|(_, id)| id)
                            .collect()
                    } else {
                        previous_certificate_ids.clone()
                    };

                    let batch_header = BatchHeader::new(
                        private_key,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certs,
                        rng,
                    )
                    .unwrap();

                    // Sign the batch header with every other validator's key.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates;
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert all certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in first_round..=first_committed_round {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create the blocks
        let mut previous_leader_cert = None;
        let mut blocks = vec![];

        for block_height in 1..=num_blocks {
            let leader_round = block_height * 2;

            let leader = committee.get_leader(leader_round).unwrap();
            let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();

            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());

            let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);

            subdag_map.insert(leader_round, leader_cert_map.clone());
            subdag_map.insert(leader_round - 1, previous_cert_map.clone());

            if leader_round > 2 {
                // Exclude the previous leader certificate; it was already part of
                // the previous block's subdag.
                let previous_commit_cert_map: IndexSet<_> = storage
                    .get_certificates_for_round(leader_round - 2)
                    .into_iter()
                    .filter(|cert| {
                        if let Some(previous_leader_cert) = &previous_leader_cert {
                            cert != previous_leader_cert
                        } else {
                            true
                        }
                    })
                    .collect();
                subdag_map.insert(leader_round - 2, previous_commit_cert_map);
            }

            let subdag = Subdag::from(subdag_map.clone())?;
            previous_leader_cert = Some(leader_certificate);

            let core_ledger = core_ledger.clone();
            let block = spawn_blocking!({
                let block = core_ledger.clone().prepare_advance_to_next_quorum_block(subdag, Default::default())?;
                core_ledger.advance_to_next_block(&block)?;
                Ok(block)
            })?;

            blocks.push(block);
        }

        // ### Test that sync works as expected ###
        let storage_mode = StorageMode::new_test(None);

        // Create a new ledger to test with, but use the existing storage
        // so that the certificates exist.
        let syncing_ledger = {
            let storage_mode = storage_mode.clone();
            Arc::new(CoreLedgerService::new(
                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
                SimpleStoppable::new(),
            ))
        };

        // Set up sync and its dependencies.
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            syncing_ledger.clone(),
            None,
            &[],
            false,
            NodeDataDir::new_test(None),
            None,
        )?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);

        let mut block_iter = blocks.into_iter();

        // Insert the blocks into the new sync module
        for _ in 0..num_blocks - 1 {
            let block = block_iter.next().unwrap();
            sync.sync_storage_with_block(block).await?;

            // Availability threshold is not met, so we should not advance yet.
            assert_eq!(syncing_ledger.latest_block_height(), 0);
        }

        // Only for the final block, the availability threshold is met,
        // because certificates for the subsequent round are already in storage.
        sync.sync_storage_with_block(block_iter.next().unwrap()).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);

        // Ensure blocks 1 and 2 were added to the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }
1343
1344    #[tokio::test]
1345    #[tracing_test::traced_test]
1346    async fn test_pending_certificates() -> anyhow::Result<()> {
1347        let rng = &mut TestRng::default();
1348        // Initialize the round parameters.
1349        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1350        let commit_round = 2;
1351
1352        // Initialize the store.
1353        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1354        let account: Account<CurrentNetwork> = Account::new(rng)?;
1355
1356        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
1357        let seed: u64 = rng.r#gen();
1358        let vm = VM::from(store).unwrap();
1359        let genesis_pk = *account.private_key();
1360        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1361
1362        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
1363        let genesis_rng = &mut TestRng::from_seed(seed);
1364        let private_keys = [
1365            *account.private_key(),
1366            PrivateKey::new(genesis_rng)?,
1367            PrivateKey::new(genesis_rng)?,
1368            PrivateKey::new(genesis_rng)?,
1369        ];
1370        // Initialize the ledger with the genesis block.
1371        let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
1372        // Initialize the ledger.
1373        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
1374        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1375        let (round_to_certificates_map, committee) = {
1376            // Initialize the committee.
1377            let committee = core_ledger.current_committee().unwrap();
1378            // Initialize a mapping from the round number to the set of batch certificates in the round.
1379            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1380                HashMap::new();
1381            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1382
1383            for round in 0..=commit_round + 8 {
1384                let mut current_certificates = IndexSet::new();
1385                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1386                    IndexSet::new()
1387                } else {
1388                    previous_certificates.iter().map(|c| c.id()).collect()
1389                };
1390                let committee_id = committee.id();
1391                // Create a certificate for each validator.
1392                for (i, private_key_1) in private_keys.iter().enumerate() {
1393                    let batch_header = BatchHeader::new(
1394                        private_key_1,
1395                        round,
1396                        now(),
1397                        committee_id,
1398                        Default::default(),
1399                        previous_certificate_ids.clone(),
1400                        rng,
1401                    )
1402                    .unwrap();
1403                    // Sign the batch header.
1404                    let mut signatures = IndexSet::with_capacity(4);
1405                    for (j, private_key_2) in private_keys.iter().enumerate() {
1406                        if i != j {
1407                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1408                        }
1409                    }
1410                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1411                }
1412
1413                // Update the map of certificates.
1414                round_to_certificates_map.insert(round, current_certificates.clone());
1415                previous_certificates = current_certificates.clone();
1416            }
1417            (round_to_certificates_map, committee)
1418        };
1419
1420        // Initialize the storage.
1421        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1422        // Insert certificates into storage.
1423        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1424        for i in 1..=commit_round + 8 {
1425            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1426            certificates.extend(c);
1427        }
1428        for certificate in certificates.clone().iter() {
1429            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1430        }
1431        // Create block 1.
1432        let leader_round_1 = commit_round;
1433        let leader_1 = committee.get_leader(leader_round_1).unwrap();
1434        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1435        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1436        let block_1 = {
1437            let mut leader_cert_map = IndexSet::new();
1438            leader_cert_map.insert(leader_certificate.clone());
1439            let mut previous_cert_map = IndexSet::new();
1440            for cert in storage.get_certificates_for_round(commit_round - 1) {
1441                previous_cert_map.insert(cert);
1442            }
1443            subdag_map.insert(commit_round, leader_cert_map.clone());
1444            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1445            let subdag = Subdag::from(subdag_map.clone())?;
1446            let ledger = core_ledger.clone();
1447            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag, Default::default()))?
1448        };
1449        // Insert block 1.
1450        let ledger = core_ledger.clone();
1451        let block = block_1.clone();
1452        spawn_blocking!(ledger.advance_to_next_block(&block))?;
1453
1454        // Create block 2.
1455        let leader_round_2 = commit_round + 2;
1456        let leader_2 = committee.get_leader(leader_round_2).unwrap();
1457        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1458        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1459        let block_2 = {
1460            let mut leader_cert_map_2 = IndexSet::new();
1461            leader_cert_map_2.insert(leader_certificate_2.clone());
1462            let mut previous_cert_map_2 = IndexSet::new();
1463            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1464                previous_cert_map_2.insert(cert);
1465            }
1466            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1467            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1468            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1469            let ledger = core_ledger.clone();
1470            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default()))?
1471        };
1472        // Insert block 2.
1473        let ledger = core_ledger.clone();
1474        let block = block_2.clone();
1475        spawn_blocking!(ledger.advance_to_next_block(&block))?;
1476
1477        // Create block 3
1478        let leader_round_3 = commit_round + 4;
1479        let leader_3 = committee.get_leader(leader_round_3).unwrap();
1480        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1481        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1482        let block_3 = {
1483            let mut leader_cert_map_3 = IndexSet::new();
1484            leader_cert_map_3.insert(leader_certificate_3.clone());
1485            let mut previous_cert_map_3 = IndexSet::new();
1486            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1487                previous_cert_map_3.insert(cert);
1488            }
1489            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1490            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1491            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1492            let ledger = core_ledger.clone();
1493            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default()))?
1494        };
1495        // Insert block 3.
1496        let ledger = core_ledger.clone();
1497        let block = block_3.clone();
1498        spawn_blocking!(ledger.advance_to_next_block(&block))?;
1499
1500        /*
1501            Check that the pending certificates are computed correctly.
1502        */
1503
1504        // Retrieve the pending certificates.
1505        let pending_certificates = storage.get_pending_certificates();
1506        // Check that all of the pending certificates are not contained in the ledger.
1507        for certificate in pending_certificates.clone() {
1508            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1509        }
1510        // Initialize an empty set to be populated with the committed certificates in the block subdags.
1511        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1512        {
1513            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1514            for subdag in subdag_maps.iter() {
1515                for subdag_certificates in subdag.values() {
1516                    committed_certificates.extend(subdag_certificates.iter().cloned());
1517                }
1518            }
1519        };
1520        // Create the set of candidate pending certificates as the set of all certificates minus the set of the committed certificates.
1521        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1522        for certificate in certificates.clone() {
1523            if !committed_certificates.contains(&certificate) {
1524                candidate_pending_certificates.insert(certificate);
1525            }
1526        }
1527        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
1528        assert_eq!(pending_certificates, candidate_pending_certificates);
1529
1530        Ok(())
1531    }
1532}