snarkos_node_bft/sync/
mod.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    Gateway,
    MAX_FETCH_TIMEOUT_IN_MS,
    PRIMARY_PING_IN_MS,
    Transport,
    events::DataBlocks,
    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
    spawn_blocking,
};
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkvm::{
    console::{network::Network, types::Field},
    ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
    prelude::{cfg_into_iter, cfg_iter},
};

use anyhow::{Result, anyhow, bail};
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{BTreeMap, HashMap},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
    sync::{OnceCell, oneshot},
    task::JoinHandle,
};

/// Block synchronization logic for validators.
///
/// Synchronization works differently for nodes that act as validators in AleoBFT.
/// In the common case, validators generate blocks after receiving an anchor block that has been accepted
/// by a supermajority of the committee, instead of fetching entire blocks from other nodes.
/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
///
/// This struct also manages fetching certificates from other validators during normal operation,
/// and blocks when falling behind.
///
/// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway enables communication with other validators.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization logic.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificates queue.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// Handles to the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock. Ensures that only one task syncs the ledger at a time.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses.
    ///
    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks whose addition to the ledger is
    /// deferred until certain checks pass.
    /// Blocks need to be processed in order, hence a `BTreeMap`.
    ///
    /// Whenever a new block is added to this map, `BlockSync::set_sync_height` needs to be called.
    latest_block_responses: Arc<TMutex<BTreeMap<u32, Block<N>>>>,
}

impl<N: Network> Sync<N> {
    /// Initializes a new sync instance.
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        // Return the sync instance.
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            bft_sender: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            latest_block_responses: Default::default(),
        }
    }

    /// Initializes the sync module and syncs the storage with the ledger at bootup.
    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
        // If a BFT sender was provided, set it.
        if let Some(bft_sender) = bft_sender {
            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
        }

        info!("Syncing storage with the ledger...");

        // Sync the storage with the ledger.
        self.sync_storage_with_ledger_at_bootup().await?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

    /// Sends the given batch of block requests to peers.
    ///
    /// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`.
    #[inline]
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        // Sends the block requests to the sync peers.
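        // Each batch is capped at the maximum number of blocks a peer may return in a single block response.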
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Starts the sync module.
    ///
    /// When this function returns successfully, the sync module will have spawned background tasks
    /// that fetch blocks from other validators.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Start the block sync loop.
        let self_ = self.clone();
        self.spawn(async move {
            // Sleep briefly to allow an initial primary ping to come in prior to entering the loop.
            // Ideally, a node does not consider itself synced when it has not received
            // any block locators from peers. However, in the initial bootup of validators,
            // this needs to happen, so we use this additional sleep as a grace period.
            tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
            loop {
                // Sleep briefly to avoid triggering spam detection.
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                let new_blocks = self_.try_block_sync().await;
                if new_blocks {
                    if let Some(ping) = &ping {
                        match self_.get_block_locators() {
                            Ok(locators) => ping.update_block_locators(locators),
                            Err(err) => error!("Failed to update block locators: {err}"),
                        }
                    }
                }
            }
        });

        // Start the pending queue expiration loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                // Remove the expired pending transmission requests.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        /* Set up callbacks for events from the Gateway */

        // Retrieve the sync receiver.
        let SyncReceiver {
            mut rx_block_sync_advance_with_sync_blocks,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block sync request to advance with sync blocks.
        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::advance_with_sync_blocks()`],
        // which calls [`tx_block_sync_advance_with_sync_blocks.send()`],
        // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
                callback.send(self_.advance_with_sync_blocks(peer_ip, blocks).await).ok();
            }
        });

        // Process the block sync request to remove the peer.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Process each block sync request to update peer locators.
        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::update_peer_locators()`],
        // which calls [`tx_block_sync_update_peer_locators.send()`],
        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process each certificate request.
        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_request.send()`],
        // which causes the `rx_certificate_request.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process each certificate response.
        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_response.send()`],
        // which causes the `rx_certificate_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

    /// Executes one iteration of block synchronization.
    ///
    /// Returns `true` if we made progress. Returning `true` does *not* imply being fully synced.
    ///
    /// This is called periodically by a tokio background task spawned in `Self::run`.
    /// Some unit tests also call this function directly to manually trigger block synchronization.
    pub(crate) async fn try_block_sync(&self) -> bool {
        // Check if any existing requests can be removed.
        // We should do this even if we cannot block sync, to ensure
        // there are no dangling block requests.
        let new_requests = self.block_sync.handle_block_request_timeouts(&self.gateway);
        if let Some((block_requests, sync_peers)) = new_requests {
            self.send_block_requests(block_requests, sync_peers).await;
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and performing unnecessary computation.
        if !self.block_sync.can_block_sync() {
            trace!("No blocks to sync");
            return false;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (block_requests, sync_peers) = self.block_sync.prepare_block_requests();
        self.send_block_requests(block_requests, sync_peers).await;

        // Sync the storage with the blocks.
        match self.try_advancing_block_synchronization().await {
            Ok(new_blocks) => new_blocks,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                false
            }
        }
    }
}

// Callbacks used when receiving messages from the Gateway
impl<N: Network> Sync<N> {
    /// We received a block response and can (possibly) advance synchronization.
    async fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
        // Verify that the response is valid and add it to block sync.
        self.block_sync.insert_block_responses(peer_ip, blocks)?;

        // Try to process responses stored in BlockSync.
        // Note: Do not call `self.block_sync.try_advancing_block_synchronization` here, as it will process
        // and remove any completed requests, which means the call to `sync_storage_with_block` will not process
        // them as expected.
        self.try_advancing_block_synchronization().await?;

        Ok(())
    }

    /// We received new peer locators during a Ping.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.block_sync.update_peer_locators(peer_ip, locators)
    }

    /// A peer disconnected.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    #[cfg(test)]
    pub fn test_update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}

// Methods to manage storage.
impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// This is called when starting the validator and after finishing a sync without BFT.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
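        // For example, with `max_gc_rounds` = 100, at most 50 blocks are reloaded:
        // for a ledger at height 1_000, that is blocks 950 through 1_000.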
        // Retrieve the blocks.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {gc_height} to {block_height}...");

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
            // Note that the block authority is always a sub-DAG in production;
            // beacon signatures are only used for testing,
            // and as placeholder (irrelevant) block authority in the genesis block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates.
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                // Update the validator telemetry.
                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        /* Sync the BFT DAG */

        // Construct a list of the certificates.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
                bail!("Failed to update the BFT DAG from sync: {e}");
            }
        }

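        // Make the underlying `BlockSync` instance aware of the height we just restored from the ledger.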
        self.block_sync.set_sync_height(block_height);

        Ok(())
    }

    /// Returns which height we are synchronized to.
    /// If there are queued block responses, this might be higher than the latest block in the ledger.
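    /// For example, with a ledger at height 5 and queued responses for heights 6 and 7, this returns 7.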
    async fn compute_sync_height(&self) -> u32 {
        let ledger_height = self.ledger.latest_block_height();
        let mut responses = self.latest_block_responses.lock().await;

        // Remove any old responses.
        responses.retain(|height, _| *height > ledger_height);

        // Ensure the returned value is always greater than or equal to the ledger height.
        responses.last_key_value().map(|(height, _)| *height).unwrap_or(0).max(ledger_height)
    }

    /// Aims to advance synchronization using any recent block responses received from peers.
    ///
    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
    /// and is called periodically at runtime.
    ///
    /// This returns `Ok(true)` if we successfully advanced the ledger by at least one new block.
    ///
    /// A key difference to `BlockSync`'s version is that it will only add blocks to the ledger
    /// once they have been confirmed by the network.
    /// If blocks are not confirmed yet, they will be kept in [`Self::latest_block_responses`].
    /// It will also pass certificates from synced blocks to the BFT module so that consensus can progress as expected
    /// (see [`Self::sync_storage_with_block`] for more details).
    ///
    /// If the node falls behind more than GC rounds, this function calls
    /// [`Self::sync_ledger_with_block_without_bft`] instead, which syncs without updating the BFT state.
    async fn try_advancing_block_synchronization(&self) -> Result<bool> {
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;

        // For sanity, set the sync height again.
        // (if the sync height is already larger or equal, this is a noop)
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Retrieve the maximum block height of the peers.
        let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);

        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Updates sync state and returns the error (if any).
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            // Make the underlying `BlockSync` instance aware of the new sync height.
            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
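        // We are "within GC" if the next block to add (ledger height + 1) still falls
        // within the range of rounds that peers keep in storage.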
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Retrieve the current height, based on the ledger height and the
            // (unconfirmed) blocks that are already queued up.
            let start_height = self.compute_sync_height().await;

            // For sanity, update the sync height before starting.
            // (if this is lower or equal to the current sync height, this is a noop)
            self.block_sync.set_sync_height(start_height);

            // The height is incremented as blocks are added.
            let mut current_height = start_height;
            trace!("Try advancing with block responses (at block {current_height})");

            // If we already were within GC or successfully caught up with GC, try to advance BFT normally again.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                // Sync the storage with the block.
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            info!("Block sync is too far behind other validators. Syncing without BFT.");

            // For non-BFT sync we need to start at the current height of the ledger, as blocks are immediately
            // added to it and do not queue up in `latest_block_responses`.
            let start_height = ledger_height;
            let mut current_height = start_height;

            // For sanity, update the sync height before starting.
            // (if this is lower or equal to the current sync height, this is a noop)
            self.block_sync.set_sync_height(start_height);

            // Try to advance the ledger *to tip* without updating the BFT.
            // TODO(kaimast): why to tip and not to tip-GC?
            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // Sync the storage with the ledger if we should transition to the BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                if let Err(err) = self.sync_storage_with_ledger_at_bootup().await {
                    error!("BFT sync (with bootup routine) failed - {err}");
                }
            }

            cleanup(start_height, current_height, None)
        }
    }

    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// This is only used by [`Self::try_advancing_block_synchronization`].
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        let self_ = self.clone();
        tokio::task::spawn_blocking(move || {
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
        .await?
    }

    /// Advances the ledger by the given block and updates the storage accordingly.
    ///
    /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
    /// meets the voter availability threshold (i.e. > f voting stake)
    /// or is reachable via a DAG path from a later leader certificate that does.
    /// Since performing this check requires DAG certificates from later blocks,
    /// the block is stored in `Sync::latest_block_responses`,
    /// and its addition to the ledger is deferred until the check passes.
    /// Several blocks may be stored in `Sync::latest_block_responses`
    /// before they can be all checked and added to the ledger.
    async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        // If this block has already been processed, return early.
        // TODO(kaimast): Should we remove the response here?
        if self.ledger.contains_block_height(block.height()) {
            debug!("Ledger is already synced with block at height {}. Will not sync.", block.height());
            return Ok(());
        }

        // Acquire the latest block responses lock.
        let mut latest_block_responses = self.latest_block_responses.lock().await;

        if latest_block_responses.contains_key(&block.height()) {
            debug!("An unconfirmed block is queued already for height {}. Will not sync.", block.height());
            return Ok(());
        }

        // If the block authority is a sub-DAG, then sync the batch certificates with the block.
        // Note that the block authority is always a sub-DAG in production;
        // beacon signatures are only used for testing,
        // and as placeholder (irrelevant) block authority in the genesis block.
        if let Authority::Quorum(subdag) = block.authority() {
            // Reconstruct the unconfirmed transactions.
            let unconfirmed_transactions = cfg_iter!(block.transactions())
                .filter_map(|tx| {
                    tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                })
                .collect::<HashMap<_, _>>();

            // Iterate over the certificates.
            for certificates in subdag.values().cloned() {
                cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                    // Sync the batch certificate with the block.
                    self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
                });

                // Sync the BFT DAG with the certificates.
                for certificate in certificates {
                    // If a BFT sender was provided, send the certificate to the BFT.
                    // For validators, BFT spawns a receiver task in `BFT::start_handlers`.
                    if let Some(bft_sender) = self.bft_sender.get() {
                        // Await the callback to continue.
                        if let Err(err) = bft_sender.send_sync_bft(certificate).await {
                            bail!("Failed to sync certificate - {err}");
                        };
                    }
                }
            }
        }

        // Fetch the latest block height.
        let ledger_block_height = self.ledger.latest_block_height();

        // Insert the latest block response.
        latest_block_responses.insert(block.height(), block);
        // Clear the latest block responses of older blocks.
        latest_block_responses.retain(|height, _| *height > ledger_block_height);

        // Get a list of contiguous blocks from the latest block responses.
        let contiguous_blocks: Vec<Block<N>> = (ledger_block_height.saturating_add(1)..)
            .take_while(|&k| latest_block_responses.contains_key(&k))
            .filter_map(|k| latest_block_responses.get(&k).cloned())
            .collect();
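        // For example, with a ledger height of 10 and responses for heights {11, 12, 14},
        // this yields blocks 11 and 12; block 14 stays queued until block 13 arrives.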

        // Check if each block response, from the contiguous sequence just constructed,
        // is ready to be added to the ledger.
        // Ensure that the block's leader certificate meets the availability threshold
        // based on the certificates in the DAG just after the block's round.
        // If the availability threshold is not met,
        // process the next block and check if it is linked to the current block,
        // in the sense that there is a path in the DAG
        // from the next block's leader certificate
        // to the current block's leader certificate.
        // Note: We do not advance to the most recent block response because we would be unable to
        // validate if the leader certificate in the block has been certified properly.
        for next_block in contiguous_blocks.into_iter() {
            // Retrieve the height of the next block.
            let next_block_height = next_block.height();

            // Fetch the leader certificate and the relevant rounds.
            let leader_certificate = match next_block.authority() {
                Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                _ => bail!("Received a block with an unexpected authority type."),
            };
            let commit_round = leader_certificate.round();
            let certificate_round =
                commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;

            // Get the committee lookback for the round just after the leader.
            let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
            // Retrieve all of the certificates for the round just after the leader.
            let certificates = self.storage.get_certificates_for_round(certificate_round);
            // Construct a set over the authors, at the round just after the leader,
            // who included the leader's certificate in their previous certificate IDs.
            let authors = certificates
                .iter()
                .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                    true => Some(c.author()),
                    false => None,
                })
                .collect();

            debug!("Validating sync block {next_block_height} at round {commit_round}...");
            // Check if the leader is ready to be committed.
            if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
                // Initialize the current certificate.
                let mut current_certificate = leader_certificate;
                // Check if there are any linked blocks that need to be added.
                let mut blocks_to_add = vec![next_block];

                // Check if there are other blocks to process based on `is_linked`.
                for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
                    // Retrieve the previous block.
                    let Some(previous_block) = latest_block_responses.get(&height) else {
                        bail!("Block {height} is missing from the latest block responses.");
                    };
                    // Retrieve the previous block's leader certificate.
                    let previous_certificate = match previous_block.authority() {
                        Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                        _ => bail!("Received a block with an unexpected authority type."),
                    };
                    // Determine if there is a path between the previous certificate and the current certificate.
                    if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                        debug!("Previous sync block {height} is linked to the current block {next_block_height}");
                        // Add the previous leader certificate to the list of certificates to commit.
                        blocks_to_add.insert(0, previous_block.clone());
                        // Update the current certificate to the previous leader certificate.
                        current_certificate = previous_certificate;
                    }
                }

                // Add the blocks to the ledger.
                for block in blocks_to_add {
                    // Check that the blocks are sequential and can be added to the ledger.
                    let block_height = block.height();
                    if block_height != self.ledger.latest_block_height().saturating_add(1) {
                        warn!("Skipping block {block_height} from the latest block responses - not sequential.");
                        continue;
                    }
                    #[cfg(feature = "telemetry")]
                    let block_authority = block.authority().clone();

                    let self_ = self.clone();
                    tokio::task::spawn_blocking(move || {
                        // Check the next block.
                        self_.ledger.check_next_block(&block)?;
                        // Attempt to advance to the next block.
                        self_.ledger.advance_to_next_block(&block)?;

                        // Sync the height with the block.
                        self_.storage.sync_height_with_block(block.height());
                        // Sync the round with the block.
                        self_.storage.sync_round_with_block(block.round());

                        Ok::<(), anyhow::Error>(())
                    })
                    .await??;
                    // Remove the block height from the latest block responses.
                    latest_block_responses.remove(&block_height);

                    // Update the validator telemetry.
                    #[cfg(feature = "telemetry")]
                    if let Authority::Quorum(subdag) = block_authority {
                        self.gateway.validator_telemetry().insert_subdag(&subdag);
                    }
                }
            } else {
                debug!(
                    "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
                );
            }

            // Don't remove the response from BlockSync yet, as we might fall back to non-BFT sync.
        }

        Ok(())
    }

    /// Returns `true` if there is a path from the previous certificate to the current certificate.
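    ///
    /// The traversal walks the DAG backwards one round at a time: starting from the current
    /// certificate, it keeps only the certificates in each earlier round that are referenced
    /// by the surviving set, until it reaches the previous certificate's round.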
    fn is_linked(
        &self,
        previous_certificate: BatchCertificate<N>,
        current_certificate: BatchCertificate<N>,
    ) -> Result<bool> {
        // Initialize the list containing the traversal.
        let mut traversal = vec![current_certificate.clone()];
        // Iterate over the rounds from the current certificate to the previous certificate.
        for round in (previous_certificate.round()..current_certificate.round()).rev() {
            // Retrieve all of the certificates for this past round.
            let certificates = self.storage.get_certificates_for_round(round);
            // Filter the certificates to only include those that are in the traversal.
            traversal = certificates
                .into_iter()
                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
                .collect();
        }
        Ok(traversal.contains(&previous_certificate))
    }
}

// Methods to assist with the block sync module.
impl<N: Network> Sync<N> {
    /// Returns `true` if the node is synced and has connected peers.
    pub fn is_synced(&self) -> bool {
        // Ensure the validator is connected to other validators,
        // not just clients.
        if self.gateway.number_of_connected_peers() == 0 {
            return false;
        }

        self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks the node is behind the greatest peer height.
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.block_sync.num_blocks_behind()
    }

    /// Returns the current block locators of the node.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }
}

// Methods to assist with fetching batch certificates from peers.
impl<N: Network> Sync<N> {
    /// Sends a certificate request to the specified peer.
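    ///
    /// A minimal usage sketch (the peer address and certificate ID are hypothetical,
    /// and an initialized `Sync` instance is assumed):
    /// ```ignore
    /// let peer_ip: SocketAddr = "127.0.0.1:5000".parse()?; // hypothetical peer
    /// let certificate = sync.send_certificate_request(peer_ip, certificate_id).await?;
    /// ```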
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Initialize a oneshot channel.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine how many sent requests are pending.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        // Determine if we've already sent a request to the peer.
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        // Determine the maximum number of redundant requests.
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        // Determine if we should send a certificate request to the peer.
        // We send at most `num_redundant_requests` requests, and each peer can only receive one request at a time.
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Insert the certificate ID into the pending queue.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        // If the number of sent requests is below the redundancy limit, send the certificate request to the peer.
        if should_send_request {
            // Send the certificate request to the peer.
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the certificate to be fetched.
        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
        match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
            // If the certificate was fetched, return it.
            Ok(result) => Ok(result?),
            // If the certificate was not fetched, return an error.
            Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
        }
    }

    /// Handles the incoming certificate request.
    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
        // Attempt to retrieve the certificate.
        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
            // Send the certificate response to the peer.
            let self_ = self.clone();
            tokio::spawn(async move {
                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
            });
        }
    }

    /// Handles the incoming certificate response.
    /// This method ensures the certificate response is well-formed and matches the certificate ID.
    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
        let certificate = response.certificate;
        // Check if the peer IP exists in the pending queue for the given certificate ID.
        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
        // If the peer IP exists, finish the pending request.
        if exists {
            // TODO: Validate the certificate.
            // Remove the certificate ID from the pending queue.
            self.pending.remove(certificate.id(), Some(certificate));
        }
    }
}

impl<N: Network> Sync<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the sync module.
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};

    use snarkos_account::Account;
    use snarkos_node_sync::BlockSync;
    use snarkvm::{
        console::{
            account::{Address, PrivateKey},
            network::MainnetV0,
        },
        ledger::{
            narwhal::{BatchCertificate, BatchHeader, Subdag},
            store::{ConsensusStore, helpers::memory::ConsensusMemory},
        },
        prelude::{Ledger, VM},
        utilities::TestRng,
    };

    use aleo_std::StorageMode;
    use indexmap::IndexSet;
    use rand::Rng;
    use std::collections::BTreeMap;

    type CurrentNetwork = MainnetV0;
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_via_is_linked() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        // Initialize the core ledger service.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));

        // Sample several rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1006        let (round_to_certificates_map, committee) = {
1007            let addresses = vec![
1008                Address::try_from(private_keys[0])?,
1009                Address::try_from(private_keys[1])?,
1010                Address::try_from(private_keys[2])?,
1011                Address::try_from(private_keys[3])?,
1012            ];
1013
1014            let committee = ledger.latest_committee().unwrap();
1015
1016            // Initialize a mapping from the round number to the set of batch certificates in the round.
1017            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1018                HashMap::new();
1019            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1020
1021            for round in 0..=commit_round + 8 {
1022                let mut current_certificates = IndexSet::new();
1023                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1024                    IndexSet::new()
1025                } else {
1026                    previous_certificates.iter().map(|c| c.id()).collect()
1027                };
1028                let committee_id = committee.id();
1029
1030                // Create a certificate for the leader.
1031                if round <= 5 {
1032                    let leader = committee.get_leader(round).unwrap();
1033                    let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
1034                    let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
1035                    for i in [leader_index, non_leader_index].into_iter() {
1036                        let batch_header = BatchHeader::new(
1037                            &private_keys[i],
1038                            round,
1039                            now(),
1040                            committee_id,
1041                            Default::default(),
1042                            previous_certificate_ids.clone(),
1043                            rng,
1044                        )
1045                        .unwrap();
1046                        // Sign the batch header.
1047                        let mut signatures = IndexSet::with_capacity(4);
1048                        for (j, private_key_2) in private_keys.iter().enumerate() {
1049                            if i != j {
1050                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1051                            }
1052                        }
1053                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1054                    }
1055                }
1056
1057                // Create a certificate for each validator.
1058                if round > 5 {
1059                    for (i, private_key_1) in private_keys.iter().enumerate() {
1060                        let batch_header = BatchHeader::new(
1061                            private_key_1,
1062                            round,
1063                            now(),
1064                            committee_id,
1065                            Default::default(),
1066                            previous_certificate_ids.clone(),
1067                            rng,
1068                        )
1069                        .unwrap();
1070                        // Sign the batch header.
1071                        let mut signatures = IndexSet::with_capacity(4);
1072                        for (j, private_key_2) in private_keys.iter().enumerate() {
1073                            if i != j {
1074                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1075                            }
1076                        }
1077                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1078                    }
1079                }
1080                // Update the map of certificates.
1081                round_to_certificates_map.insert(round, current_certificates.clone());
1082                previous_certificates = current_certificates.clone();
1083            }
1084            (round_to_certificates_map, committee)
1085        };
1086
1087        // Initialize the storage.
1088        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1089        // Insert certificates into storage.
1090        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1091        for i in 1..=commit_round + 8 {
1092            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1093            certificates.extend(c);
1094        }
1095        for certificate in certificates.clone().iter() {
1096            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1097        }
1098
1099        // Create block 1.
1100        let leader_round_1 = commit_round;
1101        let leader_1 = committee.get_leader(leader_round_1).unwrap();
1102        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1103        let block_1 = {
1104            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1105            let mut leader_cert_map = IndexSet::new();
1106            leader_cert_map.insert(leader_certificate.clone());
1107            let mut previous_cert_map = IndexSet::new();
1108            for cert in storage.get_certificates_for_round(commit_round - 1) {
1109                previous_cert_map.insert(cert);
1110            }
1111            subdag_map.insert(commit_round, leader_cert_map.clone());
1112            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1113            let subdag = Subdag::from(subdag_map.clone())?;
1114            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1115        };
1116        // Insert block 1.
1117        core_ledger.advance_to_next_block(&block_1)?;

        // Create block 2.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let block_2 = {
            let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            let mut prev_commit_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
                if cert != leader_certificate {
                    prev_commit_cert_map_2.insert(cert);
                }
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        // Insert block 2.
        core_ledger.advance_to_next_block(&block_2)?;
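        // Block 2's subdag also reaches down to `leader_round_2 - 2`, but excludes the leader
        // certificate that block 1 already committed, so no certificate is committed twice.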

        // Create block 3.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let block_3 = {
            let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            let mut prev_commit_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
                if cert != leader_certificate_2 {
                    prev_commit_cert_map_3.insert(cert);
                }
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        // Insert block 3.
        core_ledger.advance_to_next_block(&block_3)?;
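        // The core ledger now sits at height 3; these three blocks are the source of truth that the
        // fresh syncing ledger below is expected to replay.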

        // Initialize the syncing ledger.
        let syncing_ledger = Arc::new(CoreLedgerService::new(
            CurrentLedger::load(genesis, StorageMode::new_test(None)).unwrap(),
            Default::default(),
        ));
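        // The syncing ledger starts at genesis and is advanced only through `sync_storage_with_block`,
        // never by running consensus itself.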
        // Initialize the gateway.
        let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
        // Initialize the block synchronization logic.
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);
        // Try to sync block 1.
        sync.sync_storage_with_block(block_1).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 1);
        // Try to sync block 2.
        sync.sync_storage_with_block(block_2).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 2);
        // Try to sync block 3.
        sync.sync_storage_with_block(block_3).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);
        // Ensure blocks 1, 2, and 3 were added to the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));
        assert!(syncing_ledger.contains_block_height(3));

        Ok(())
    }

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_pending_certificates() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Recover the genesis committee's private keys by re-seeding an identical RNG and sampling keys in the same order.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];
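        // This recovery works because `genesis_beacon` above draws the remaining committee members'
        // keys from the supplied RNG in a deterministic order; re-seeding reproduces the same keys.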
        // Initialize the ledger with the genesis block.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        // Wrap the ledger in the core ledger service.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            // Initialize the committee.
            let committee = ledger.latest_committee().unwrap();
            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();
                // Create a certificate for each validator.
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = BatchHeader::new(
                        private_key_1,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Sign the batch header.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };
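        // From round 2 onward each batch header references all four certificates of the previous
        // round, so the sampled DAG is fully connected between consecutive rounds.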

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = round_to_certificates_map.get(&i).unwrap().clone();
            certificates.extend(c);
        }
        for certificate in &certificates {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Create block 1.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_1 = {
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
        };
        // Insert block 1.
        core_ledger.advance_to_next_block(&block_1)?;
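        // Unlike the previous test, `subdag_map` (and `subdag_map_2`/`subdag_map_3` below) are kept
        // in scope so the pending-certificate check at the end can recompute the committed set.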

        // Create block 2.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_2 = {
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        // Insert block 2.
        core_ledger.advance_to_next_block(&block_2)?;

        // Create block 3.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_3 = {
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        // Insert block 3.
        core_ledger.advance_to_next_block(&block_3)?;

        /*
            Check that the pending certificates are computed correctly.
        */
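        // A certificate is pending when it is present in BFT storage but not committed by any block
        // subdag. The check below rebuilds that set by hand: every certificate inserted into storage,
        // minus everything committed by the subdags of blocks 1 through 3.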

        // Retrieve the pending certificates.
        let pending_certificates = storage.get_pending_certificates();
        // Check that none of the pending certificates are contained in the ledger.
        for certificate in &pending_certificates {
            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
        }
        // Initialize an empty set to be populated with the committed certificates in the block subdags.
        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        {
            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
            for subdag in subdag_maps.iter() {
                for subdag_certificates in subdag.values() {
                    committed_certificates.extend(subdag_certificates.iter().cloned());
                }
            }
        }
        // Create the set of candidate pending certificates as the set of all certificates minus the committed certificates.
        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        for certificate in certificates {
            if !committed_certificates.contains(&certificate) {
                candidate_pending_certificates.insert(certificate);
            }
        }
        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
        assert_eq!(pending_certificates, candidate_pending_certificates);
        Ok(())
    }
}