
snarkos_node_bft/sync/mod.rs

// Copyright (c) 2019-2026 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    MAX_FETCH_TIMEOUT_IN_MS,
    events::{CertificateRequest, CertificateResponse, DataBlocks, Event},
    gateway::{Gateway, Transport},
    helpers::{Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
    ledger_service::LedgerService,
    spawn_blocking,
};
use snarkos_node_network::PeerPoolHandling;
use snarkos_node_sync::{
    BLOCK_REQUEST_BATCH_DELAY,
    BlockSync,
    InsertBlockResponseError,
    Ping,
    PrepareSyncRequest,
    locators::BlockLocators,
};
use snarkos_utilities::CallbackHandle;

use snarkvm::{
    console::{
        network::{ConsensusVersion, Network},
        types::Field,
    },
    ledger::{PendingBlock, authority::Authority, block::Block, narwhal::BatchCertificate},
    utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
};

use anyhow::{Context, Result, anyhow, bail, ensure};
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, VecDeque},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::oneshot, task::JoinHandle};
/// This callback trait allows listening to synchronization updates, such as discovering new `BatchCertificate`s.
/// This is currently used by BFT.
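///
/// # Example
///
/// A minimal no-op implementation sketch (e.g. for tests); the name `NoopCallback`
/// is hypothetical and not part of this crate:
///
/// ```ignore
/// struct NoopCallback;
///
/// #[async_trait::async_trait]
/// impl<N: Network> SyncCallback<N> for NoopCallback {
///     async fn sync_dag_at_bootup(&self, _certificates: Vec<BatchCertificate<N>>) {}
///
///     async fn add_new_certificate(&self, _certificate: BatchCertificate<N>) -> Result<()> {
///         Ok(())
///     }
/// }
/// ```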
#[async_trait::async_trait]
pub trait SyncCallback<N: Network>: Send + std::marker::Sync {
    async fn sync_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>);

    /// Sends a new certificate.
    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()>;
}

/// Block synchronization logic for validators.
///
/// Synchronization works differently for nodes that act as validators in AleoBFT:
/// in the common case, validators generate blocks after receiving an anchor block that has been accepted
/// by a supermajority of the committee, instead of fetching entire blocks from other nodes.
/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
///
/// This struct also manages fetching certificates from other validators during normal operation,
/// and blocks when falling behind.
///
/// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway enables communication with other validators.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization logic.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificates queue.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The sync callback (used by [`BFT`]).
    sync_callback: Arc<CallbackHandle<Arc<dyn SyncCallback<N>>>>,
    /// Handles to the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock. Ensures that only one task syncs the ledger at a time.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses.
    ///
    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks
    /// whose addition to the ledger is deferred until certain checks pass.
    /// Blocks need to be processed in order, hence an ordered queue.
    ///
    /// Whenever a new block is added to this queue, [`BlockSync::set_sync_height`] needs to be called.
    pending_blocks: Arc<TMutex<VecDeque<PendingBlock<N>>>>,
}

impl<N: Network> Sync<N> {
    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Initializes a new sync instance.
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        // Return the sync instance.
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            sync_callback: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            pending_blocks: Default::default(),
        }
    }

    /// Initializes the sync module and syncs the storage with the ledger at bootup.
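    ///
    /// # Example
    ///
    /// A hedged sketch of typical startup wiring; `bft_callback`, `ping`, and
    /// `sync_receiver` stand in for values constructed elsewhere in the node:
    ///
    /// ```ignore
    /// let sync = Sync::new(gateway, storage, ledger, block_sync);
    /// // Register the callback and replay recent ledger blocks into storage.
    /// sync.initialize(Some(bft_callback)).await?;
    /// // Spawn the background tasks that request and process blocks.
    /// sync.run(Some(ping), sync_receiver).await?;
    /// ```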
    pub async fn initialize(&self, sync_callback: Option<Arc<dyn SyncCallback<N>>>) -> Result<()> {
        // If a callback was provided, set it.
        if let Some(callback) = sync_callback {
            self.sync_callback.set(callback).with_context(|| "Failed to set sync callback")?;
        }

        info!("Syncing storage with the ledger...");

        // Sync the storage with the ledger.
        self.sync_storage_with_ledger_at_bootup()
            .await
            .with_context(|| "Syncing storage with the ledger at bootup failed")?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

    /// Sends the given batch of block requests to peers.
    ///
    /// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`.
    #[inline]
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        // Sends the block requests to the sync peers.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Starts the sync module.
    ///
    /// When this function returns successfully, the sync module will have spawned background tasks
    /// that fetch blocks from other validators.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Start the block request generation loop (outgoing).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Wait for peer updates or timeout.
                let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;

                // Issue block requests to peers.
                self_.try_issuing_block_requests().await;

                // Rate limiting happens in [`Self::send_block_requests`] and no additional sleeps are needed here.
            }
        });

        // Start the block response processing loop (incoming).
        let self_ = self.clone();
        let ping = ping.clone();
        self.spawn(async move {
            loop {
                // Wait until there is something to do or until the timeout.
                let _ =
                    tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;

                let ping = ping.clone();
                let self_ = self_.clone();
                let hdl = tokio::spawn(async move {
                    self_.try_advancing_block_synchronization(&ping).await;
                });

                if let Err(err) = hdl.await
                    && let Ok(panic) = err.try_into_panic()
                {
                    error!("Sync block advancement panicked: {panic:?}");
                }

                // We perform no additional rate limiting here as
                // requests are already rate-limited.
            }
        });

        // Start the pending queue expiration loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                // Remove the expired pending transmission requests.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        /* Set up callbacks for events from the Gateway */

        // Retrieve the sync receiver.
        let SyncReceiver {
            mut rx_block_sync_insert_block_response,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block sync request to advance with sync blocks.
        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::advance_with_sync_blocks()`],
        // which calls [`tx_block_sync_advance_with_sync_blocks.send()`],
        // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
                rx_block_sync_insert_block_response.recv().await
            {
                let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
                // TODO: remove this once channels are gone.
                if let Err(err) = &result {
                    warn!("Failed to insert block response from '{peer_ip}' - {err}");
                }

                callback.send(result).ok();
            }
        });

        // Process the block sync request to remove the peer.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Process each block sync request to update peer locators.
        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::update_peer_locators()`],
        // which calls [`tx_block_sync_update_peer_locators.send()`],
        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process each certificate request.
        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_request.send()`],
        // which causes the `rx_certificate_request.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process each certificate response.
        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_response.send()`],
        // which causes the `rx_certificate_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

    /// BFT-specific version of `Client::try_issuing_block_requests()`.
    ///
    /// This method handles timeout removal, checks if block sync is possible,
    /// and issues block requests to peers.
    async fn try_issuing_block_requests(&self) {
        // Update the sync height to the latest ledger height.
        // (if the ledger height is lower than or equal to the current sync height, this is a no-op)
        self.block_sync.set_sync_height(self.ledger.latest_block_height());

        // Check if any existing requests can be removed.
        // We should do this even if we cannot block sync, to ensure
        // there are no dangling block requests.
        match self.block_sync.handle_block_request_timeouts(&self.gateway) {
            Ok(Some((requests, sync_peers))) => {
                // Re-request blocks instead of performing regular block sync.
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                // Abort and retry later.
                error!("{}", &flatten_error(err));
                return;
            }
        }

        // Do not attempt to sync if there are no blocks to sync, or we are too close to the tip.
        if self.is_synced() {
            return;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (requests, sync_peers) = self.block_sync.prepare_block_requests();

        // If there are no block requests, return early.
        if requests.is_empty() {
            return;
        }

        // Send the block requests to peers.
        self.send_block_requests(requests, sync_peers).await;
    }

    /// Test-only method to manually trigger block synchronization.
    /// This combines both request generation and response processing for testing purposes.
    #[cfg(test)]
    pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
        // First, try issuing block requests.
        self.try_issuing_block_requests().await;

        // Then, try advancing with any available responses.
        self.try_advancing_block_synchronization(&None).await;
    }
}

// Callbacks used when receiving messages from the Gateway.
impl<N: Network> Sync<N> {
    /// We received a block response and can (possibly) advance synchronization.
    async fn insert_block_response(
        &self,
        peer_ip: SocketAddr,
        blocks: Vec<Block<N>>,
        latest_consensus_version: Option<ConsensusVersion>,
    ) -> Result<(), InsertBlockResponseError<N>> {
        self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)

        // No need to advance block sync here, as the new response will
        // notify the incoming task.
    }

    /// We received new peer locators during a Ping.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.block_sync.update_peer_locators(peer_ip, &locators)
    }

    /// A peer disconnected.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    #[cfg(test)]
    pub fn testing_only_update_peer_locators_testing_only(
        &self,
        peer_ip: SocketAddr,
        locators: BlockLocators<N>,
    ) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}

// Methods to manage storage.
impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// This is called when starting the validator and after finishing a sync without BFT.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
            // Note that the block authority is always a sub-DAG in production;
            // beacon signatures are only used for testing,
            // and as placeholder (irrelevant) block authority in the genesis block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates.
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                // Update the validator telemetry.
                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        /* Sync the BFT DAG */

        // Construct a list of the certificates.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a sync callback was provided, send the certificates to the BFT.
        if let Some(cb) = self.sync_callback.get() {
            // Await the callback to continue.
            cb.sync_dag_at_bootup(certificates).await;
        }

        self.block_sync.set_sync_height(block_height);

        Ok(())
    }

    /// Returns the height up to which we are synchronized.
    /// If there are queued block responses, this might be higher than the latest block in the ledger.
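    ///
    /// For illustration (hypothetical numbers): with the ledger at height 10 and
    /// pending blocks queued for heights 11 and 12, this returns 12; with no
    /// pending blocks queued, it simply returns the ledger height.
    ///
    /// ```ignore
    /// let height = sync.compute_sync_height().await;
    /// assert!(height >= sync.ledger.latest_block_height());
    /// ```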
    async fn compute_sync_height(&self) -> u32 {
        let ledger_height = self.ledger.latest_block_height();
        let mut pending_blocks = self.pending_blocks.lock().await;

        // Remove any old responses.
        while let Some(b) = pending_blocks.front()
            && b.height() <= ledger_height
        {
            pending_blocks.pop_front();
        }

        // Ensure the returned value is always greater than or equal to the ledger height.
        pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
    }

    /// BFT version of [`snarkos_node_client::Client::try_advancing_block_synchronization`].
    async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
        // Process block responses and advance the ledger.
        let new_blocks = match self
            .try_advancing_block_synchronization_inner()
            .await
            .with_context(|| "Block synchronization failed")
        {
            Ok(new_blocks) => new_blocks,
            Err(err) => {
                error!("{}", &flatten_error(err));
                false
            }
        };

        if let Some(ping) = &ping
            && new_blocks
        {
            match self.get_block_locators() {
                Ok(locators) => ping.update_block_locators(locators),
                Err(err) => error!("Failed to update block locators: {err}"),
            }
        }
    }

    /// Aims to advance synchronization using any recent block responses received from peers.
    ///
    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
    /// and is called periodically at runtime.
    ///
    /// This returns `Ok(true)` if we successfully advanced the ledger by at least one new block.
    ///
    /// A key difference from `BlockSync`'s version is that it only adds blocks to the ledger once they have been confirmed by the network.
    /// If blocks are not confirmed yet, they are kept in [`Self::pending_blocks`].
    /// It also passes certificates from synced blocks to the BFT module so that consensus can progress as expected
    /// (see [`Self::sync_storage_with_block`] for more details).
    ///
    /// If the node falls behind more than GC rounds, this function calls [`Self::sync_ledger_with_block_without_bft`] instead,
    /// which syncs without updating the BFT state.
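    ///
    /// For illustration, a sketch of the GC-window check below, with hypothetical
    /// numbers (`max_gc_rounds = 100`, peer tip = 1000):
    ///
    /// ```ignore
    /// let max_gc_blocks = 100u32 / 2;           // half of the GC rounds
    /// let max_gc_height = 1000 - max_gc_blocks; // peer tip minus the window
    /// // A node at ledger height 960 is within the GC window and syncs with BFT...
    /// assert!((960 + 1) > max_gc_height);
    /// // ...while a node at height 900 falls back to syncing without BFT.
    /// assert!((900 + 1) <= max_gc_height);
    /// ```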
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;

        // For sanity, set the sync height again.
        // (if the sync height is already greater than or equal, this is a no-op)
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Retrieve the maximum block height of the peers.
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Updates sync state and returns the error (if any).
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            // Make the underlying `BlockSync` instance aware of the new sync height.
            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Retrieve the current height, based on the ledger height and the
            // (unconfirmed) blocks that are already queued up.
            let start_height = self.compute_sync_height().await;

            // The height is incremented as blocks are added.
            let mut current_height = start_height;
            trace!(
                "Try advancing block responses with BFT (starting at block {current_height}, current sync speed is {})",
                self.block_sync.get_sync_speed()
            );

            // If we already were within GC or successfully caught up with GC, try to advance BFT normally again.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                // Sync the storage with the block.
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            // For non-BFT sync, we need to start at the current height of the ledger, as blocks are immediately
            // added to it and do not queue up in `pending_blocks`.
            let start_height = ledger_height;
            let mut current_height = start_height;

            trace!("Try advancing block responses without BFT (starting at block {current_height})");

            // Try to advance the ledger *to tip* without updating the BFT.
            // TODO(kaimast): why to tip and not to tip-GC?
            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // Sync the storage with the ledger if we should transition to the BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                self.sync_storage_with_ledger_at_bootup()
                    .await
                    .with_context(|| "BFT sync (with bootup routine) failed")?;
            }

            cleanup(start_height, current_height, None)
        }
    }

    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// This is only used by [`Self::try_advancing_block_synchronization`].
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        let self_ = self.clone();
        spawn_blocking!({
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
    }

    /// Helper function for [`Self::sync_storage_with_block`].
    /// It syncs the batch certificates with the BFT, if the block's authority is a sub-DAG.
    ///
    /// Note that the block authority is always a sub-DAG in production; beacon signatures are only used for testing,
    /// and as placeholder (irrelevant) block authority in the genesis block.
    async fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
        // Nothing to do if this is a beacon block.
        let Authority::Quorum(subdag) = block.authority() else {
            return Ok(());
        };

        // Reconstruct the unconfirmed transactions.
        let unconfirmed_transactions = cfg_iter!(block.transactions())
            .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
            .collect::<HashMap<_, _>>();

        // Iterate over the certificates.
        for certificates in subdag.values().cloned() {
            cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                // Sync the batch certificate with the block.
                self.storage.sync_certificate_with_block(block, certificate.clone(), &unconfirmed_transactions);
            });

            // Sync the BFT DAG with the certificates.
            if let Some(cb) = self.sync_callback.get() {
                for certificate in certificates {
                    // If a sync callback was provided, send the certificate to the BFT.
                    // For validators, the BFT spawns a receiver task in `BFT::start_handlers`.
                    cb.add_new_certificate(certificate).await.with_context(|| "Failed to sync certificate")?;
                }
            }
        }

        Ok(())
    }

    /// Helper function for [`Self::sync_storage_with_block`].
    ///
    /// It checks that the successor round of a given block contains enough votes to commit it.
    /// This can only return `Ok(true)` if the certificates of the block's successor round were added to the storage.
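    ///
    /// For intuition, a hedged sketch with a hypothetical committee of 4 equally-staked
    /// validators (so `f = 1`): authors holding more than `f` stake, i.e. at least
    /// 2 of the 4 validators, must reference the leader certificate of round `r`
    /// in their round `r + 1` certificates for this to return `Ok(true)`:
    ///
    /// ```ignore
    /// let ready = sync.is_block_availability_threshold_reached(&pending_block)?;
    /// ```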
    fn is_block_availability_threshold_reached(&self, block: &PendingBlock<N>) -> Result<bool> {
        // Fetch the leader certificate and the relevant rounds.
        let leader_certificate = match block.authority() {
            Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
            _ => bail!("Received a block with an unexpected authority type."),
        };
        let commit_round = leader_certificate.round();
        let certificate_round =
            commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;

        // Get the committee lookback for the round just after the leader.
        let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
        // Retrieve all of the certificates for the round just after the leader.
        let certificates = self.storage.get_certificates_for_round(certificate_round);
        // Construct a set over the authors, at the round just after the leader,
        // who included the leader's certificate in their previous certificate IDs.
        let authors = certificates
            .iter()
            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                true => Some(c.author()),
                false => None,
            })
            .collect();

        // Check if the leader is ready to be committed.
        if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
            trace!(
                "Block {hash} at height {height} has reached availability threshold",
                hash = block.hash(),
                height = block.height()
            );
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Advances the ledger by the given block and updates the storage accordingly.
    ///
    /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
    /// meets the voter availability threshold (i.e. > f voting stake)
    /// or is reachable via a DAG path from a later leader certificate that does.
    /// Since performing this check requires DAG certificates from later blocks,
    /// the block is stored in `Sync::pending_blocks`,
    /// and its addition to the ledger is deferred until the check passes.
    /// Several blocks may be stored in `Sync::pending_blocks`
    /// before they can all be checked and added to the ledger.
    ///
    /// # Usage
    /// This function assumes that blocks are passed in order, i.e.,
    /// that the given block is a direct successor of the block that was last passed to this function.
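    ///
    /// A hedged sketch of the expected call pattern, where `block_n` and
    /// `block_n_plus_1` are hypothetical consecutive blocks received from peers:
    ///
    /// ```ignore
    /// sync.sync_storage_with_block(block_n).await?;
    /// // The next call must pass the direct successor of `block_n`.
    /// sync.sync_storage_with_block(block_n_plus_1).await?;
    /// ```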
    async fn sync_storage_with_block(&self, new_block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;
        let new_block_height = new_block.height();

        // If this block has already been processed, return early.
        // TODO(kaimast): Should we remove the response here?
        if self.ledger.contains_block_height(new_block.height()) {
            debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.");
            return Ok(());
        }

        // Acquire the pending blocks lock.
        let mut pending_blocks = self.pending_blocks.lock().await;

        // Append the certificates to the storage.
        self.add_block_subdag_to_bft(&new_block).await?;

        // Fetch the latest block height.
        let ledger_block_height = self.ledger.latest_block_height();

        // First, clear any pending blocks that the ledger has already advanced past.
        // TODO(kaimast): ensure there are no dangling block requests
        while let Some(pending_block) = pending_blocks.front() {
            if pending_block.height() > ledger_block_height {
                break;
            }

            trace!(
                "Pending block {hash} at height {height} became obsolete",
                hash = pending_block.hash(),
                height = pending_block.height()
            );
            pending_blocks.pop_front();
        }

        if let Some(tail) = pending_blocks.back() {
            if tail.height() >= new_block.height() {
                debug!(
                    "An unconfirmed block is already queued for height {height}. Will not sync.",
                    height = new_block.height()
                );
                return Ok(());
            }

            ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
        }

        // Check the block against the chain of pending blocks and append it on success.
        let new_block = match self.ledger.check_block_subdag(new_block, pending_blocks.make_contiguous()) {
            Ok(new_block) => new_block,
            Err(err) => {
                // TODO(kaimast): this should not return an error on the snarkVM side.
                if err.to_string().contains("already in the ledger") {
                    debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.");

                    return Ok(());
                } else {
                    return Err(err.into());
                }
            }
        };

        trace!(
            "Adding new pending block {hash} at height {height}",
            hash = new_block.hash(),
            height = new_block.height()
        );
        pending_blocks.push_back(new_block);

        // Now, figure out if (and which) pending blocks we can commit.
        // To do this effectively, and because commits are transitive,
        // we iterate in reverse so that we can stop at the first successful check.
        //
        // Note that, if the storage already contains certificates for the round after the new block,
        // the availability threshold for the new block could also be reached.
        let mut commit_height = None;
        for block in pending_blocks.iter().rev() {
            // This check assumes that the pending blocks are properly linked together, based on the fact that,
            // to generate the sequence of `PendingBlock`s, each block needs to be successfully processed by `Ledger::check_block_subdag`.
            // As a result, the safety of this piece of code relies on the correctness of `Ledger::check_block_subdag`,
            // which is tested in `snarkvm/ledger/tests/pending_block.rs`.
            if self
                .is_block_availability_threshold_reached(block)
                .with_context(|| "Availability threshold check failed")?
            {
                commit_height = Some(block.height());
                break;
            }
        }

        if let Some(commit_height) = commit_height {
            let start_height = ledger_block_height + 1;
            ensure!(commit_height >= start_height, "Invalid commit height");
            let num_blocks = (commit_height - start_height + 1) as usize;

            // Create a more detailed log message if we are committing more than one block at a time.
            if num_blocks > 1 {
                trace!("Attempting to commit {num_blocks} pending blocks starting at height {start_height}.");
            }

            for pending_block in pending_blocks.drain(0..num_blocks) {
                let hash = pending_block.hash();
                let height = pending_block.height();
                let ledger = self.ledger.clone();
                let storage = self.storage.clone();

                spawn_blocking!({
                    let block = ledger.check_block_content(pending_block).with_context(|| {
                        format!("Failed to check contents of pending block {hash} at height {height}")
                    })?;

                    trace!("Adding pending block {hash} at height {height} to the ledger");
                    ledger.advance_to_next_block(&block)?;
                    // Sync the height with the block.
                    storage.sync_height_with_block(block.height());
                    // Sync the round with the block.
                    storage.sync_round_with_block(block.round());

                    Ok(())
                })?
            }
        } else {
            trace!("No pending blocks are ready to be committed ({} block(s) are pending)", pending_blocks.len());
        }

        Ok(())
    }
}

// Methods to assist with the block sync module.
impl<N: Network> Sync<N> {
    /// Returns `true` if the node is synced and has connected peers.
    pub fn is_synced(&self) -> bool {
        // Ensure the validator is connected to other validators,
        // not just clients.
        if self.gateway.number_of_connected_peers() == 0 {
            return false;
        }

        self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks the node is behind the greatest peer height.
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.block_sync.num_blocks_behind()
    }

    /// Returns the current block locators of the node.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }
}

// Methods to assist with fetching batch certificates from peers.
impl<N: Network> Sync<N> {
    /// Sends a certificate request to the specified peer.
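    ///
    /// # Example
    ///
    /// A hedged sketch; `peer_ip` and `certificate_id` are assumed to come from the
    /// gateway and a batch header's previous-certificate IDs, respectively:
    ///
    /// ```ignore
    /// let certificate = sync.send_certificate_request(peer_ip, certificate_id).await?;
    /// assert_eq!(certificate.id(), certificate_id);
    /// ```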
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Initialize a oneshot channel.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine how many sent requests are pending.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        // Determine if we've already sent a request to the peer.
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        // Determine the maximum number of redundant requests.
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        // Determine if we should send a certificate request to the peer.
        // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Insert the certificate ID into the pending queue.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        // If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
        if should_send_request {
            // Send the certificate request to the peer.
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the certificate to be fetched.
        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
        tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
            .await
            .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
            .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
    }

    /// Handles the incoming certificate request.
    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
        // Attempt to retrieve the certificate.
        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
            // Send the certificate response to the peer.
            let self_ = self.clone();
            tokio::spawn(async move {
                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
            });
        }
    }

    /// Handles the incoming certificate response.
    /// This method ensures the certificate response is well-formed and matches the certificate ID.
    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
        let certificate = response.certificate;
        // Check if the peer IP exists in the pending queue for the given certificate ID.
        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
        // If the peer IP exists, finish the pending request.
        if exists {
            // TODO: Validate the certificate.
            // Remove the certificate ID from the pending queue.
            self.pending.remove(certificate.id(), Some(certificate));
        }
    }
}

impl<N: Network> Sync<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the sync module.
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        // Remove the callback.
        self.sync_callback.clear();
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};

    use snarkos_account::Account;
    use snarkos_node_sync::BlockSync;
    use snarkos_utilities::{NodeDataDir, SimpleStoppable};

    use snarkvm::{
        console::{
            account::{Address, PrivateKey},
            network::MainnetV0,
        },
        ledger::{
            narwhal::{BatchCertificate, BatchHeader, Subdag},
            store::{ConsensusStore, helpers::memory::ConsensusMemory},
        },
        prelude::{Ledger, VM},
        utilities::TestRng,
    };

    use aleo_std::StorageMode;
    use indexmap::IndexSet;
    use rand::Rng;
    use std::collections::BTreeMap;

    type CurrentNetwork = MainnetV0;
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

    /// Tests that commits work as expected when some anchors are not committed immediately.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_chain() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;

        // The first round of the first block.
        let first_round = 1;
        // The total number of blocks we test.
        let num_blocks = 3;
        // The number of certificate rounds needed.
        // There is one additional round to provide availability for the final block.
        let num_rounds = first_round + num_blocks * 2 + 1;
        // The first round that has at least N-f certificates referencing the anchor from the previous round.
        // This is also the last round we use in the test.
        let first_committed_round = num_rounds - 1;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let genesis_clone = genesis.clone();
        let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
        // Initialize the ledger.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));

        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = [
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in first_round..=first_committed_round {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };

                let committee_id = committee.id();
                let prev_leader = committee.get_leader(round - 1).unwrap();

                // For the first two blocks, non-leaders will not reference the leader certificate.
                // This means that, while there is an anchor, it isn't committed
                // until later.
1170                for (i, private_key) in private_keys.iter().enumerate() {
1171                    let leader_index = addresses.iter().position(|&address| address == prev_leader).unwrap();
1172                    let is_certificate_round = round % 2 == 1;
1173                    let is_leader = i == leader_index;
1174
                    let previous_certs = if round < first_committed_round && is_certificate_round && !is_leader {
                        previous_certificate_ids
                            .iter()
                            .cloned()
                            .enumerate()
                            .filter(|(idx, _)| *idx != leader_index)
                            .map(|(_, id)| id)
                            .collect()
                    } else {
                        previous_certificate_ids.clone()
                    };

                    let batch_header = BatchHeader::new(
                        private_key,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certs,
                        rng,
                    )
                    .unwrap();

                    // Sign the batch header.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates;
            }
            (round_to_certificates_map, committee)
        };
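
        // For illustration (hypothetical numbers): with four validators and, say, six generated
        // rounds, the map above would hold 6 * 4 = 24 certificates.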

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert all certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in first_round..=first_committed_round {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create the blocks.
        let mut previous_leader_cert = None;
        let mut blocks = vec![];

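        // Each block at height `h` commits the leader of round `2 * h`: its subdag holds the
        // leader certificate, all certificates of the preceding round, and, beyond the first
        // commit, the not-yet-committed certificates from two rounds back.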
        for block_height in 1..=num_blocks {
            let leader_round = block_height * 2;

            let leader = committee.get_leader(leader_round).unwrap();
            let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();

            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());

            let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);

            subdag_map.insert(leader_round, leader_cert_map.clone());
            subdag_map.insert(leader_round - 1, previous_cert_map.clone());

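            // Exclude the previously committed leader certificate from the round two back:
            // it already belongs to the preceding block's subdag.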
            if leader_round > 2 {
                let previous_commit_cert_map: IndexSet<_> = storage
                    .get_certificates_for_round(leader_round - 2)
                    .into_iter()
                    .filter(|cert| {
                        if let Some(previous_leader_cert) = &previous_leader_cert {
                            cert != previous_leader_cert
                        } else {
                            true
                        }
                    })
                    .collect();
                subdag_map.insert(leader_round - 2, previous_commit_cert_map);
            }

            let subdag = Subdag::from(subdag_map.clone())?;
            previous_leader_cert = Some(leader_certificate);

            let core_ledger = core_ledger.clone();
            let block = spawn_blocking!({
                let block = core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?;
                core_ledger.advance_to_next_block(&block)?;
                Ok(block)
            })?;

            blocks.push(block);
        }

        // ### Test that sync works as expected ###
        let storage_mode = StorageMode::new_test(None);

        // Create a new ledger to test with, but reuse the existing BFT `storage`
        // so that the certificates are available.
        let syncing_ledger = {
            let storage_mode = storage_mode.clone();
            Arc::new(CoreLedgerService::new(
                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
                SimpleStoppable::new(),
            ))
        };

        // Set up sync and its dependencies.
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            syncing_ledger.clone(),
            None,
            &[],
            false,
            NodeDataDir::new_test(None),
            None,
        )?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);
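        // Note: `sync` shares the pre-populated BFT `storage`; whether a synced block can be
        // advanced depends on which certificates are already present there.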

        let mut block_iter = blocks.into_iter();

        // Insert the blocks into the new sync module.
        for _ in 0..num_blocks - 1 {
            let block = block_iter.next().unwrap();
            sync.sync_storage_with_block(block).await?;

            // The availability threshold is not met, so the ledger should not advance yet.
            assert_eq!(syncing_ledger.latest_block_height(), 0);
        }

        // Only for the final block is the availability threshold met,
        // because certificates for the subsequent round are already in storage.
        sync.sync_storage_with_block(block_iter.next().unwrap()).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);

        // Ensure blocks 1 and 2 were added to the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_pending_certificates() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        // Extract the private keys of the genesis committee by sampling from the same seeded RNG.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];
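        // Note: the first key belongs to `account` itself; the remaining three are re-derived
        // from the seeded RNG so they match the other genesis committee members.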
        // Initialize the ledger with the genesis block.
        let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
        // Initialize the core ledger service.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            // Initialize the committee.
            let committee = core_ledger.current_committee().unwrap();
            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();
                // Create a certificate for each validator.
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = BatchHeader::new(
                        private_key_1,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Sign the batch header.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates;
            }
            (round_to_certificates_map, committee)
        };
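        // With `commit_round = 2`, the loop above covers rounds 0 through 10,
        // producing four certificates per round.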

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
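        // Blocks 1 through 3 below commit the leader certificates of rounds 2, 4, and 6;
        // every certificate left out of their subdags remains pending.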
        // Create block 1.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_1 = {
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            let ledger = core_ledger.clone();
            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag, Default::default()))?
        };
        // Insert block 1.
        let ledger = core_ledger.clone();
        let block = block_1.clone();
        spawn_blocking!(ledger.advance_to_next_block(&block))?;

        // Create block 2.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_2 = {
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            let ledger = core_ledger.clone();
            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default()))?
        };
        // Insert block 2.
        let ledger = core_ledger.clone();
        let block = block_2.clone();
        spawn_blocking!(ledger.advance_to_next_block(&block))?;

        // Create block 3.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_3 = {
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            let ledger = core_ledger.clone();
            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default()))?
        };
        // Insert block 3.
        let ledger = core_ledger.clone();
        let block = block_3.clone();
        spawn_blocking!(ledger.advance_to_next_block(&block))?;

        /*
            Check that the pending certificates are computed correctly.
        */
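        // A certificate is "pending" when it exists in storage but has not been committed
        // to the ledger through any block's subdag.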

        // Retrieve the pending certificates.
        let pending_certificates = storage.get_pending_certificates();
        // Check that none of the pending certificates are contained in the ledger.
        for certificate in &pending_certificates {
            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
        }
        // Initialize an empty set to be populated with the committed certificates in the block subdags.
        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        {
            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
            for subdag in subdag_maps.iter() {
                for subdag_certificates in subdag.values() {
                    committed_certificates.extend(subdag_certificates.iter().cloned());
                }
            }
        };
        // Compute the candidate pending certificates: all certificates minus the committed ones.
        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        for certificate in certificates {
            if !committed_certificates.contains(&certificate) {
                candidate_pending_certificates.insert(certificate);
            }
        }
        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
        assert_eq!(pending_certificates, candidate_pending_certificates);

        Ok(())
    }
}