Skip to main content

snarkos_node_bft/sync/
mod.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_FETCH_TIMEOUT,
19    Transport,
20    events::{CertificateRequest, CertificateResponse, Event},
21    helpers::{Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22    ledger_service::{BeginLedgerUpdateError, LedgerService},
23    spawn_blocking,
24};
25
26use snarkos_node_sync::{BftSyncMode, BlockSync, InsertBlockResponseError, Ping, locators::BlockLocators};
27use snarkos_utilities::CallbackHandle;
28
29use snarkvm::{
30    console::{
31        network::{ConsensusVersion, Network},
32        types::Field,
33    },
34    ledger::{CheckBlockError, PendingBlock, authority::Authority, block::Block, narwhal::BatchCertificate},
35    utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
36};
37
38use anyhow::{Context, Result, anyhow, bail, ensure};
39#[cfg(feature = "locktick")]
40use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
41#[cfg(not(feature = "locktick"))]
42use parking_lot::Mutex;
43#[cfg(not(feature = "serial"))]
44use rayon::prelude::*;
45use std::{
46    collections::{HashMap, HashSet, VecDeque},
47    future::Future,
48    net::SocketAddr,
49    ops::Deref,
50    sync::Arc,
51    time::Duration,
52};
53#[cfg(not(feature = "locktick"))]
54use tokio::sync::Mutex as TMutex;
55use tokio::{sync::oneshot, task::JoinHandle};
56
57/// This callback trait allows listening to synchronization updates, such as discorvering new `BatchCertificate`s.
58/// This is currently used by BFT.
59#[async_trait::async_trait]
60pub trait SyncCallback<N: Network>: Send + std::marker::Sync {
61    // Adds a new certificate to the DAG.
62    fn add_certificate_from_sync(&self, certificate: BatchCertificate<N>);
63
64    // Commits a certificate into the DAG.
65    fn commit_certificate_from_sync(&self, certificate: &BatchCertificate<N>);
66}
67
68/// Block synchronization logic for validators.
69///
70/// Synchronization works differently for nodes that act as validators in AleoBFT;
71/// In the common case, validators generate blocks after receiving an anchor block that has been accepted
72/// by a supermajority of the committee instead of fetching entire blocks from other nodes.
73/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
74///
75/// This struct also manages fetching certificates from other validators during normal operation,
76/// and blocks when falling behind.
77///
78/// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
79/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
80#[derive(Clone)]
81pub struct Sync<N: Network> {
82    /// The gateway enables communication with other validators.
83    gateway: Gateway<N>,
84    /// The storage.
85    storage: Storage<N>,
86    /// The ledger service.
87    ledger: Arc<dyn LedgerService<N>>,
88    /// The block synchronization logic.
89    block_sync: Arc<BlockSync<N>>,
90    /// The pending certificates queue.
91    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
92    /// The sync callback (used by [`BFT`]).
93    sync_callback: Arc<CallbackHandle<Arc<dyn SyncCallback<N>>>>,
94    /// Handles to the spawned background tasks.
95    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
96    /// The response lock.
97    response_lock: Arc<TMutex<()>>,
98
99    /// The latest block responses.
100    ///
101    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks
102    /// whose addition to the ledger is deferred until certain checks pass.
103    /// Blocks need to be processed in order, hence a BTree map.
104    ///
105    /// Whenever a new block is added to this map, BlockSync::set_sync_height needs to be called.
106    pending_blocks: Arc<Mutex<VecDeque<PendingBlock<N>>>>,
107}
108
109impl<N: Network> Sync<N> {
110    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
111    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
112    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);
113
114    /// Initializes a new sync instance.
115    pub fn new(
116        gateway: Gateway<N>,
117        storage: Storage<N>,
118        ledger: Arc<dyn LedgerService<N>>,
119        block_sync: Arc<BlockSync<N>>,
120    ) -> Self {
121        // Validators start in fast-sync mode until they confirm they are within the GC range.
122        block_sync.set_bft_sync_mode(BftSyncMode::Fast);
123
124        // Return the sync instance.
125        Self {
126            gateway,
127            storage,
128            ledger,
129            block_sync,
130            pending: Default::default(),
131            sync_callback: Default::default(),
132            handles: Default::default(),
133            response_lock: Default::default(),
134            pending_blocks: Default::default(),
135        }
136    }
137
138    /// Waits until the node is synced (has connected peers and is block-synced).
139    /// Returns immediately if already synced.
140    pub async fn wait_for_synced(&self) {
141        self.block_sync.wait_for_synced().await;
142    }
143
144    /// Returns `None` if the node is already synced.
145    /// Otherwise, returns a future that completes once the node becomes synced.
146    pub fn wait_for_synced_if_syncing(&self) -> Option<futures::future::BoxFuture<()>> {
147        self.block_sync.wait_for_synced_if_syncing()
148    }
149
150    /// Initializes the sync module and sync the storage with the ledger at bootup.
151    pub fn initialize(&self, sync_callback: Option<Arc<dyn SyncCallback<N>>>) -> Result<()> {
152        // If a callback was provided, set it.
153        if let Some(callback) = sync_callback {
154            self.sync_callback.set(callback).with_context(|| "Failed to set sync callback")?;
155        }
156
157        info!("Syncing storage with the ledger...");
158
159        // Sync the storage with the ledger.
160        self.sync_storage_with_ledger_at_bootup()
161            .with_context(|| "Syncing storage with the ledger at bootup failed")?;
162
163        debug!("Finished initial block synchronization at startup");
164        Ok(())
165    }
166
167    /// Starts the sync module.
168    ///
169    /// When this function returns successfully, the sync module will have spawned background tasks
170    /// that fetch blocks from other validators.
171    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
172        info!("Starting the sync module...");
173
174        // Start the block request generation loop (outgoing).
175        let self_ = self.clone();
176        self.spawn(async move {
177            loop {
178                // Wait for peer updates or timeout
179                let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;
180
181                // Issue block requests to peers.
182                self_.try_issuing_block_requests().await;
183
184                // Rate limiting happens in [`BlockSync::try_issuing_block_requests`] and no additional sleeps are needed here.
185            }
186        });
187
188        // Start the block response processing loop (incoming).
189        let self_ = self.clone();
190        let ping = ping.clone();
191        self.spawn(async move {
192            loop {
193                // Wait until there is something to do or until the timeout.
194                let _ =
195                    tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;
196
197                let ping = ping.clone();
198                let self_ = self_.clone();
199                let hdl = tokio::spawn(async move {
200                    self_.try_advancing_block_synchronization(&ping).await;
201                });
202
203                if let Err(err) = hdl.await
204                    && let Ok(panic) = err.try_into_panic()
205                {
206                    error!("Sync block advancement panicked: {panic:?}");
207                }
208
209                // We perform no additional rate limiting here as
210                // requests are already rate-limited.
211            }
212        });
213
214        // Start the pending queue expiration loop.
215        let self_ = self.clone();
216        self.spawn(async move {
217            loop {
218                // Sleep briefly.
219                tokio::time::sleep(MAX_FETCH_TIMEOUT).await;
220
221                // Remove the expired pending transmission requests.
222                let self__ = self_.clone();
223                let _ = spawn_blocking!({
224                    self__.pending.clear_expired_callbacks();
225                    Ok(())
226                });
227            }
228        });
229
230        /* Set up callbacks for events from the Gateway */
231
232        // Retrieve the sync receiver.
233        let SyncReceiver {
234            mut rx_block_sync_insert_block_response,
235            mut rx_block_sync_remove_peer,
236            mut rx_block_sync_update_peer_locators,
237            mut rx_certificate_request,
238            mut rx_certificate_response,
239        } = sync_receiver;
240
241        // Process the block sync request to advance with sync blocks.
242        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
243        // which is initially handled by [`Gateway::inbound()`],
244        // which calls [`SyncSender::advance_with_sync_blocks()`],
245        // which calls [`tx_block_sync_advance_with_sync_blocks.send()`],
246        // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return.
247        let self_ = self.clone();
248        self.spawn(async move {
249            while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
250                rx_block_sync_insert_block_response.recv().await
251            {
252                let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
253
254                //TODO remove this once channels are gone
255                if let Err(err) = &result {
256                    if err.is_benign() {
257                        trace!("Failed to insert block response from '{peer_ip}' - {err}");
258                    } else {
259                        warn!("Failed to insert block response from '{peer_ip}' - {err}");
260                    }
261                }
262
263                callback.send(result).ok();
264            }
265        });
266
267        // Process the block sync request to remove the peer.
268        let self_ = self.clone();
269        self.spawn(async move {
270            while let Some((peer_ip, tx)) = rx_block_sync_remove_peer.recv().await {
271                self_.remove_peer(peer_ip);
272                tx.send(()).ok();
273            }
274        });
275
276        // Process each block sync request to update peer locators.
277        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
278        // which is initially handled by [`Gateway::inbound()`],
279        // which calls [`SyncSender::update_peer_locators()`],
280        // which calls [`tx_block_sync_update_peer_locators.send()`],
281        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
282        let self_ = self.clone();
283        self.spawn(async move {
284            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
285                let self_clone = self_.clone();
286                tokio::spawn(async move {
287                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
288                });
289            }
290        });
291
292        // Process each certificate request.
293        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
294        // which is initially handled by [`Gateway::inbound()`],
295        // which calls [`tx_certificate_request.send()`],
296        // which causes the `rx_certificate_request.recv()` call below to return.
297        let self_ = self.clone();
298        self.spawn(async move {
299            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
300                self_.send_certificate_response(peer_ip, certificate_request);
301            }
302        });
303
304        // Process each certificate response.
305        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
306        // which is initially handled by [`Gateway::inbound()`],
307        // which calls [`tx_certificate_response.send()`],
308        // which causes the `rx_certificate_response.recv()` call below to return.
309        let self_ = self.clone();
310        self.spawn(async move {
311            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
312                self_.finish_certificate_request(peer_ip, certificate_response);
313            }
314        });
315
316        Ok(())
317    }
318
319    /// BFT-specific version of `Client::try_issuing_block_requests()`.
320    ///
321    /// This method handles timeout removal, checks if block sync is possible,
322    /// and issues block requests to peers.
323    async fn try_issuing_block_requests(&self) {
324        self.block_sync.try_issuing_block_requests(&self.gateway).await;
325    }
326
327    /// Test-only method that allows setting the sync height to the given nubmer    
328    #[cfg(test)]
329    pub(crate) fn testing_only_set_sync_height_testing_only(&self, height: u32) {
330        self.block_sync.set_sync_height(height);
331    }
332}
333
334// Callbacks used when receiving messages from the Gateway
335impl<N: Network> Sync<N> {
336    /// We received a block response and can (possibly) advance synchronization.
337    async fn insert_block_response(
338        &self,
339        peer_ip: SocketAddr,
340        blocks: Vec<Block<N>>,
341        latest_consensus_version: Option<ConsensusVersion>,
342    ) -> Result<(), InsertBlockResponseError<N>> {
343        self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)
344
345        // No need to advance block sync here, as the new response will
346        // notify the incoming task.
347    }
348
349    /// We received new peer locators during a Ping.
350    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
351        self.block_sync.update_peer_locators(peer_ip, &locators)
352    }
353
354    /// A peer disconnected.
355    fn remove_peer(&self, peer_ip: SocketAddr) {
356        self.block_sync.remove_peer(&peer_ip)
357    }
358
359    #[cfg(test)]
360    pub fn testing_only_update_peer_locators_testing_only(
361        &self,
362        peer_ip: SocketAddr,
363        locators: BlockLocators<N>,
364    ) -> Result<()> {
365        self.update_peer_locators(peer_ip, locators)
366    }
367}
368
369// Methods to manage storage.
370impl<N: Network> Sync<N> {
371    /// Syncs the storage with the ledger at bootup.
372    ///
373    /// This is called when starting the validator and after finishing a sync without BFT.
374    fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
375        let mut pending_blocks = self.pending_blocks.lock();
376        let latest_ledger_block = self.ledger.latest_block();
377
378        // Remove any obsolete pending blocks.
379        while let Some(block) = pending_blocks.front()
380            && block.height() <= latest_ledger_block.height()
381        {
382            pending_blocks.pop_front();
383        }
384
385        let latest_block: &Block<N> = pending_blocks.back().map(|block| block.deref()).unwrap_or(&latest_ledger_block);
386        let max_height = latest_block.height();
387
388        // Determine the maximum number of blocks corresponding to rounds
389        // that would not have been garbage collected, i.e. that would be kept in storage.
390        // Since at most one block is created every two rounds,
391        // this is half of the maximum number of rounds kept in storage.
392        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
393
394        // Determine the earliest height of blocks corresponding to rounds kept in storage,
395        // conservatively set to the block height minus the maximum number of blocks calculated above.
396        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
397        let gc_height = max_height.saturating_sub(max_gc_blocks);
398
399        // Retrieve the DAGs of all recent blocks..
400        let ledger_blocks = self.ledger.get_blocks(gc_height..(latest_ledger_block.height() + 1))?;
401
402        let blocks = ledger_blocks.iter().chain(pending_blocks.iter().map(|block| block.deref()));
403        debug!("Syncing storage with ledger and pending blocks from height {gc_height} to {max_height}...");
404
405        /* Sync storage */
406
407        // Sync the height with the block.
408        self.storage.sync_height_with_block(latest_block.height());
409        // Sync the round with the block.
410        self.storage.sync_round_with_block(latest_block.round());
411        // Perform GC on the latest block round.
412        self.storage
413            .garbage_collect_certificates(latest_block.round())
414            .with_context(|| "Failed to garbage collect certificates")?;
415
416        // Add the blocks to the BFT storage.
417        for block in blocks {
418            if let Authority::Quorum(subdag) = block.authority() {
419                // If the block authority is a sub-DAG, then sync the batch certificates with the block.
420                // Note that the block authority is always a sub-DAG in production;
421                // beacon signatures are only used for testing,
422                // and as placeholder (irrelevant) block authority in the genesis block.
423                // Reconstruct the unconfirmed transactions.
424                let unconfirmed_transactions = cfg_iter!(block.transactions())
425                    .filter_map(|tx| {
426                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
427                    })
428                    .collect::<HashMap<_, _>>();
429
430                // Iterate over the certificates.
431                for certificates in subdag.values().cloned() {
432                    cfg_into_iter!(certificates).try_for_each(|certificate| {
433                        // The block was already verified when it was added to the
434                        // ledger, so we do not have to re-check its certificates here.
435                        let trusted_ledger_certificate = true;
436                        self.storage
437                            .sync_certificate_with_block(
438                                block,
439                                certificate,
440                                &unconfirmed_transactions,
441                                trusted_ledger_certificate,
442                            )
443                            .with_context(|| format!("Failed to sync certificate with block {}", block.height()))
444                    })?;
445                }
446
447                // Update the validator telemetry.
448                #[cfg(feature = "telemetry")]
449                self.gateway.validator_telemetry().insert_subdag(subdag);
450            }
451        }
452
453        // Add all certificates to the BFT DAG, and update the committed round.
454        if let Some(cb) = self.sync_callback.get() {
455            for block in ledger_blocks.into_iter() {
456                if let Authority::Quorum(subdag) = block.authority() {
457                    for round in subdag.values() {
458                        for cert in round {
459                            cb.add_certificate_from_sync(cert.clone());
460                            cb.commit_certificate_from_sync(cert);
461                        }
462                    }
463                }
464            }
465
466            // Pending blocks have not been committed yet.
467            for block in pending_blocks.iter() {
468                if let Authority::Quorum(subdag) = block.authority() {
469                    for round in subdag.values() {
470                        for cert in round {
471                            cb.add_certificate_from_sync(cert.clone());
472                        }
473                    }
474                }
475            }
476        }
477
478        self.block_sync.set_sync_height(max_height);
479
480        Ok(())
481    }
482
483    /// Returns which height we are synchronized to.
484    /// If there are queued block responses, this might be higher than the latest block in the ledger.
485    fn compute_sync_height(&self) -> u32 {
486        let ledger_height = self.ledger.latest_block_height();
487        let mut pending_blocks = self.pending_blocks.lock();
488
489        // Remove any old responses.
490        while let Some(b) = pending_blocks.front()
491            && b.height() <= ledger_height
492        {
493            pending_blocks.pop_front();
494        }
495
496        // Ensure the returned value is always greater or equal than ledger height.
497        pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
498    }
499
500    /// BFT-version of [`snarkos_node_client::Client::try_advancing_block_synchronization`].
501    async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
502        // Process block responses and advance the ledger.
503        let new_blocks = match self
504            .try_advancing_block_synchronization_inner()
505            .await
506            .with_context(|| "Block synchronization failed")
507        {
508            Ok(new_blocks) => new_blocks,
509            Err(err) => {
510                error!("{}", &flatten_error(err));
511                false
512            }
513        };
514
515        if let Some(ping) = &ping
516            && new_blocks
517        {
518            match self.get_block_locators() {
519                Ok(locators) => ping.update_block_locators(locators),
520                Err(err) => error!("Failed to update block locators: {err}"),
521            }
522        }
523    }
524
525    /// Aims to advance synchronization using any recent block responses received from peers.
526    ///
527    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
528    /// and is called periodically at runtime.
529    ///
530    /// This returns Ok(true) if we successfully advanced the ledger by at least one new block.
531    ///
532    /// A key difference to `BlockSync`'s versions is that it will only add blocks to the ledger once they have been confirmed by the network.
533    /// If blocks are not confirmed yet, they will be kept in [`Self::pending_blocks`].
534    /// It will also pass certificates from synced blocks to the BFT module so that consensus can progress as expected
535    /// (see [`Self::sync_storage_with_block`] for more details).
536    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
537        // Acquire the response lock.
538        let _lock = self.response_lock.lock().await;
539
540        // For sanity, set the sync height again.
541        // (if the sync height is already larger or equal, this is a noop)
542        let ledger_height = self.ledger.latest_block_height();
543        self.block_sync.set_sync_height(ledger_height);
544
545        // Retrieve the maximum block height of the peers.
546        let tip = self
547            .block_sync
548            .find_sync_peers()
549            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
550            .unwrap_or(0);
551
552        // Determine the maximum number of blocks corresponding to rounds
553        // that would not have been garbage collected, i.e. that would be kept in storage.
554        // Since at most one block is created every two rounds,
555        // this is half of the maximum number of rounds kept in storage.
556        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
557
558        // Updates sync state and returns the error (if any).
559        let cleanup = |start_height, current_height, error| {
560            let new_blocks = current_height > start_height;
561
562            // Make the underlying `BlockSync` instance aware of the new sync height.
563            if new_blocks {
564                self.block_sync.set_sync_height(current_height);
565            }
566
567            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
568        };
569
570        // Determine the earliest height of blocks corresponding to rounds kept in storage,
571        // conservatively set to the block height minus the maximum number of blocks calculated above.
572        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
573        let max_gc_height = tip.saturating_sub(max_gc_blocks);
574
575        // Retrieve the current height, based on the ledger height and the
576        // (unconfirmed) blocks that are already queued up.
577        let start_height = self.compute_sync_height();
578
579        // A node that has entered fast-sync must complete the transition via
580        // `sync_storage_with_ledger_at_bootup` before it is allowed to use the BFT/DAG path.
581        // Without this guard a drop in the reported peer tip could shrink `max_gc_height` and
582        // make the outer `within_gc` check flip to `true` prematurely, bypassing the bootup routine.
583        let within_gc = start_height >= max_gc_height;
584
585        if within_gc {
586            // For the (unlikely) case that network tip decreased, check here as well if sync mode has switched.
587            let previous = self.block_sync.set_bft_sync_mode(BftSyncMode::Dag);
588            let was_in_fast_sync = previous == Some(BftSyncMode::Fast);
589
590            if was_in_fast_sync {
591                debug!("Finished catching up with the network. Switching to DAG sync.");
592                self.sync_storage_with_ledger_at_bootup()?;
593            }
594
595            // The height is incremented as blocks are added.
596            let mut current_height = start_height;
597            trace!(
598                "Try advancing blocks responses with DAG updates (starting at block {next_height}, current sync speed is {speed})",
599                next_height = current_height + 1,
600                speed = self.block_sync.get_sync_speed(),
601            );
602
603            // If we already were within GC or successfully caught up with GC, try to advance BFT normally again.
604            loop {
605                let next_height = current_height + 1;
606                let Some(block) = self.block_sync.peek_next_block(next_height) else {
607                    break;
608                };
609                info!("Trying to sync next block at height {} with the BFT...", block.height());
610                // Sync the storage with the block.
611                match self.sync_storage_with_block(block, true).await {
612                    Ok(_) => {
613                        // Update the current height if sync succeeds.
614                        current_height = next_height;
615                    }
616                    Err(err) => {
617                        // Mark the current height as processed in block_sync.
618                        self.block_sync.remove_block_response(next_height);
619                        return cleanup(start_height, current_height, Some(err));
620                    }
621                }
622            }
623
624            cleanup(start_height, current_height, None)
625        } else {
626            let previous = self.block_sync.set_bft_sync_mode(BftSyncMode::Fast);
627            let was_in_dag_sync = previous == Some(BftSyncMode::Dag);
628            if was_in_dag_sync {
629                // Peers may have advanced faster than this node is syncing, so it is reverting back to fast sync.
630                warn!(
631                    "Node is switching from DAG sync back to fast sync. The network tip may have advanced faster than this node is syncing."
632                );
633            }
634
635            // For fast sync, blocks still go through `pending_blocks` and the availability threshold check,
636            // but certificates are *not* inserted into the BFT DAG (see `sync_storage_with_block` with `within_gc_range = false`).
637            let mut current_height = start_height;
638
639            trace!(
640                "Try advancing block responses without updating the DAG (starting at block {next_height})",
641                next_height = current_height + 1
642            );
643
644            // Try to advance the ledger *to tip* without updating the BFT,
645            // The BFT will only be updated if we reached the GC range after adding the new blocks.
646            loop {
647                let next_height = current_height + 1;
648
649                let Some(block) = self.block_sync.peek_next_block(next_height) else {
650                    break;
651                };
652                info!("Syncing the ledger to block {}...", block.height());
653
654                // Sync the ledger with the block without BFT.
655                match self.sync_storage_with_block(block, false).await {
656                    Ok(_) => {
657                        // Update the current height if sync succeeds.
658                        current_height = next_height;
659                        self.block_sync.count_request_completed();
660                    }
661                    Err(err) => {
662                        // Mark the current height as processed in block_sync.
663                        self.block_sync.remove_block_response(next_height);
664                        return cleanup(start_height, current_height, Some(err));
665                    }
666                }
667            }
668
669            // Sync the storage with the ledger if we should transition to the BFT sync.
670            let within_gc = current_height >= max_gc_height;
671            if within_gc {
672                info!("Finished catching up with the network. Switching back to DAG sync.");
673                self.block_sync.set_bft_sync_mode(BftSyncMode::Dag);
674                self.sync_storage_with_ledger_at_bootup().with_context(|| "BFT sync (with bootup routine) failed")?;
675            }
676
677            cleanup(start_height, current_height, None)
678        }
679    }
680
681    /// Helper function for [`Self::sync_storage_with_block`].
682    /// It syncs the batch certificates with the BFT, if the block's authority is a sub-DAG.
683    ///
684    /// Note that the block authority is always a sub-DAG in production; beacon signatures are only used for testing,
685    /// and as placeholder (irrelevant) block authority in the genesis block.
686    fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
687        // Nothing to do if this is a beacon block
688        let Authority::Quorum(subdag) = block.authority() else {
689            return Ok(());
690        };
691
692        // Reconstruct the unconfirmed transactions.
693        let unconfirmed_transactions = cfg_iter!(block.transactions())
694            .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
695            .collect::<HashMap<_, _>>();
696
697        // Iterate over the certificates.
698        for certificates in subdag.values() {
699            cfg_into_iter!(certificates.clone()).try_for_each(|certificate| -> Result<()> {
700                // Sync the batch certificate with the block.
701                // Make sure to perform full verification of the certificate here.
702                let trusted_ledger_certificate = false;
703                self.storage
704                    .sync_certificate_with_block(
705                        block,
706                        certificate.clone(),
707                        &unconfirmed_transactions,
708                        trusted_ledger_certificate,
709                    )
710                    .with_context(|| format!("Failed to sync certificate with block {}", block.height()))
711            })?;
712        }
713
714        // Sync the BFT DAG with the block's certificates.
715        if let Some(cb) = self.sync_callback.get() {
716            for round in subdag.values() {
717                for certificate in round {
718                    cb.add_certificate_from_sync(certificate.clone());
719                }
720            }
721        }
722
723        Ok(())
724    }
725
726    /// Helper function for [`Self::sync_storage_with_block`].
727    ///
728    /// It checks that successor of a given block contains enough votes to commit it.
729    /// This can only return `Ok(true)` if the certificates of the block's successor were added to the storage.
730    fn is_block_availability_threshold_reached(
731        &self,
732        block: &PendingBlock<N>,
733        successors: &[PendingBlock<N>],
734    ) -> Result<bool> {
735        // Fetch the leader certificate and the relevant rounds.
736        let leader_certificate = match block.authority() {
737            Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
738            _ => bail!("Received a block with an unexpected authority type."),
739        };
740        let commit_round = leader_certificate.round();
741        let certificate_round =
742            commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;
743
744        // Get the committee lookback for the round just after the leader.
745        let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
746
747        // Construct a set over the authors, at the round just after the leader,
748        // who included the leader's certificate in their previous certificate IDs.
749        let authors = successors
750            .iter()
751            .filter_map(|successor| {
752                let Authority::Quorum(subdag) = successor.authority() else {
753                    return None;
754                };
755
756                subdag.get(&certificate_round)
757            })
758            .flatten()
759            .filter_map(|certificate| {
760                if certificate.previous_certificate_ids().contains(&leader_certificate.id()) {
761                    Some(certificate.author())
762                } else {
763                    None
764                }
765            })
766            .collect::<HashSet<_>>();
767
768        // Check if the leader is ready to be committed.
769        if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
770            trace!(
771                "Block {hash} at height {height} has reached availability threshold",
772                hash = block.hash(),
773                height = block.height()
774            );
775            Ok(true)
776        } else {
777            Ok(false)
778        }
779    }
780
781    /// Advances the ledger by the given block and updates the storage accordingly.
782    ///
783    /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
784    /// meets the voter availability threshold (i.e. > f voting stake)
785    /// or is reachable via a DAG path from a later leader certificate that does.
786    /// Since performing this check requires DAG certificates from later blocks,
787    /// the block is stored in `Sync::pending_blocks`,
788    /// and its addition to the ledger is deferred until the check passes.
789    /// Several blocks may be stored in `Sync::pending_blocks`
790    /// before they can be all checked and added to the ledger.
791    ///
792    /// # Usage
793    /// This function assumes that blocks are passed in order, i.e.,
794    /// that the given block is a direct successor of the block that was last passed to this function.
795    async fn sync_storage_with_block(&self, new_block: Block<N>, within_gc_range: bool) -> Result<()> {
796        let new_block_height = new_block.height();
797
798        // If this block has already been processed, return early.
799        // TODO(kaimast): Should we remove the response here?
800        if self.ledger.contains_block_height(new_block.height()) {
801            debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
802            return Ok(());
803        }
804
805        // Append the certificates to the storage, if the block is within the GC range.
806        if within_gc_range {
807            self.add_block_subdag_to_bft(&new_block)?;
808        }
809
810        // This optimistically performs updates to the pending block set.
811        let _self = self.clone();
812
813        spawn_blocking!({
814            while !_self.try_sync_storage_with_block(&new_block, within_gc_range)? {
815                trace!("Retrying to sync storage with block at height {new_block_height}");
816            }
817
818            Ok(())
819        })
820    }
821
822    /// Tries to sync the storage with the given block.
823    ///
824    /// # Arguments
825    /// - `new_block`: The new block to sync the storage with.
826    /// - `within_gc_range`: Whether the block is within the GC range.
827    ///
828    ///  # Returns
829    /// - Ok(true) if the storage was synced with the block, or a pending block already exists for the given height.
830    /// - Ok(false) if the block, or one of the pending blocks, is out of order.
831    /// - Err(anyhow::Error) if any other error occured.
832    fn try_sync_storage_with_block(&self, new_block: &Block<N>, within_gc_range: bool) -> Result<bool> {
833        // Acquire the pending blocks lock.
834        let mut pending_blocks = self.pending_blocks.lock();
835
836        if let Some(tail) = pending_blocks.back() {
837            if tail.height() >= new_block.height() {
838                debug!(
839                    "A unconfirmed block is queued already for height {height}. \
840                    Will not sync.",
841                    height = new_block.height()
842                );
843                return Ok(true);
844            }
845
846            ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
847        }
848
849        // Fetch the latest block height.
850        let ledger_block_height = self.ledger.latest_block_height();
851        let new_block_height = new_block.height();
852
853        // Clear any older pending blocks.
854        // TODO(kaimast): ensure there are no dangling block requests
855        while let Some(pending_block) = pending_blocks.front() {
856            if pending_block.height() > ledger_block_height {
857                break;
858            }
859
860            trace!(
861                "Pending block {hash} at height {height} became obsolete",
862                hash = pending_block.hash(),
863                height = pending_block.height()
864            );
865            pending_blocks.pop_front();
866        }
867
868        // Check the block against the chain of pending blocks and append it on success.
869        let new_block = match self.ledger.check_block_subdag(new_block.clone(), pending_blocks.make_contiguous()) {
870            Ok(new_block) => new_block,
871            // Retry if one of the pending blocks became obsolete.
872            Err(CheckBlockError::InvalidPrefix { index, .. }) => {
873                let height = pending_blocks.get(index).with_context(|| "Invalid prefix index")?.height();
874                debug!("Pending block at height {height} became obsolete. Will retry with updated prefix.",);
875
876                while let Some(pending_block) = pending_blocks.front()
877                    && pending_block.height() <= height
878                {
879                    trace!("Removing obsolete pending block at height {}.", pending_block.height());
880                    pending_blocks.pop_front();
881                }
882
883                return Ok(false);
884            }
885            // If the ledger already advanced, consider it a success.
886            Err(CheckBlockError::BlockAlreadyExists { .. })
887            | Err(CheckBlockError::InvalidHeight { .. })
888            | Err(CheckBlockError::InvalidRound { .. }) => {
889                debug!(
890                    "Tried to sync storage with block at height {new_block_height}, but it was already in the ledger."
891                );
892                return Ok(true);
893            }
894            // Any other error should be returned to the caller.
895            Err(err) => return Err(err.into_anyhow()),
896        };
897
898        trace!(
899            "Adding new pending block {hash} at height {height}",
900            hash = new_block.hash(),
901            height = new_block.height()
902        );
903        pending_blocks.push_back(new_block);
904
905        // Fetch the latest block height.
906        let ledger_block_height = self.ledger.latest_block_height();
907
908        // We can only commit a pending block when there are at least two, as a successor with sufficient votes is required.
909        let Some(penultimate_index) = pending_blocks.len().checked_sub(1) else {
910            return Ok(true);
911        };
912
913        // Now, figure out if and which pending block we can commit.
914        // To do this effectively and because commits are transitive,
915        // we iterate in reverse so that we can stop at the first successful check.
916        //
917        // Note, that if the storage already contains certificates for the round after new block,
918        // the availability threshold for the new block could also be reached.
919        let commit_height = 'outer: {
920            let pending_blocks = pending_blocks.make_contiguous();
921            for index in (0..penultimate_index).rev() {
922                let block = &pending_blocks[index];
923                let successors = &pending_blocks[index + 1..];
924
925                // This check assumes that the pending blocks are properly linked together, based on the fact that,
926                // to generate the sequence of `PendingBlocks`, each block needs to successfully be processed by `Ledger::check_block_subdag`.
927                // As a result, the safety of this piece of code relies on the correctness `Ledger::check_block_subdag`,
928                // which is tested in `snarkvm/ledger/tests/pending_block.rs`.
929                if self
930                    .is_block_availability_threshold_reached(block, successors)
931                    .with_context(|| "Availability threshold check failed")?
932                {
933                    break 'outer block.height();
934                }
935            }
936
937            trace!("No pending block are ready to be committed ({} block(s) are pending)", pending_blocks.len());
938            return Ok(true);
939        };
940
941        let ledger_update = match self.ledger.begin_ledger_update() {
942            Ok(update) => update,
943            Err(BeginLedgerUpdateError::ShuttingDown) => {
944                info!("BlockSync cannot advance the ledger any more. The node is shutting down.");
945                return Ok(true);
946            }
947            Err(err) => {
948                return Err(anyhow!("Unexpected error when beginning ledger update: {err}"));
949            }
950        };
951
952        let start_height = ledger_block_height + 1;
953        ensure!(commit_height >= start_height, "Invalid commit height");
954        let num_blocks = (commit_height - start_height + 1) as usize;
955
956        // Create a more detailed log message if we are committing more than one block at a time.
957        if num_blocks > 1 {
958            trace!(
959                "Attempting to commit {chain_length} pending block(s) starting at height {start_height}.",
960                chain_length = pending_blocks.len(),
961            );
962        }
963
964        for pending_block in pending_blocks.drain(0..num_blocks) {
965            let hash = pending_block.hash();
966            let height = pending_block.height();
967            let storage = self.storage.clone();
968
969            let block = match ledger_update.check_block_content(pending_block) {
970                Ok(block) => block,
971                Err(CheckBlockError::InvalidHeight { .. })
972                | Err(CheckBlockError::BlockAlreadyExists { .. })
973                | Err(CheckBlockError::InvalidRound { .. }) => {
974                    // If the block was outdated, stop here and request a retry.
975                    // The outdated pending block has already been removed (due to the `drain` call above)
976                    debug!("Pending block at height {height} became obsolete. Will retry with updated prefix.");
977                    return Ok(false);
978                }
979                Err(err) => {
980                    return Err(err
981                        .into_anyhow()
982                        .context(format!("Failed to check contents of pending block {hash} at height {height}")));
983                }
984            };
985
986            trace!("Adding pending block {hash} at height {height} to the ledger");
987            ledger_update.advance_to_next_block(&block)?;
988            // Sync the height with the block.
989            storage.sync_height_with_block(block.height());
990            // Sync the round with the block.
991            storage.sync_round_with_block(block.round());
992
993            if within_gc_range
994                && let Some(cb) = self.sync_callback.get()
995                && let Authority::Quorum(subdag) = block.authority()
996            {
997                for round in subdag.values() {
998                    for certificate in round {
999                        cb.commit_certificate_from_sync(certificate);
1000                    }
1001                }
1002            }
1003        }
1004
1005        Ok(true)
1006    }
1007}
1008
1009// Methods to assist with the block sync module.
1010impl<N: Network> Sync<N> {
1011    /// Returns `true` if the node is synced and has connected peers.
1012    pub fn is_synced(&self) -> bool {
1013        self.block_sync.is_block_synced()
1014    }
1015
1016    /// Returns the number of blocks the node is behind the greatest peer height.
1017    pub fn num_blocks_behind(&self) -> Option<u32> {
1018        self.block_sync.num_blocks_behind()
1019    }
1020
1021    /// Returns the current block locators of the node.
1022    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
1023        self.block_sync.get_block_locators()
1024    }
1025}
1026
1027// Methods to assist with fetching batch certificates from peers.
1028impl<N: Network> Sync<N> {
1029    /// Sends a certificate request to the specified peer.
1030    pub async fn send_certificate_request(
1031        &self,
1032        peer_ip: SocketAddr,
1033        certificate_id: Field<N>,
1034    ) -> Result<BatchCertificate<N>> {
1035        // Initialize a oneshot channel.
1036        let (callback_sender, callback_receiver) = oneshot::channel();
1037        // Determine how many sent requests are pending.
1038        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
1039        // Determine if we've already sent a request to the peer.
1040        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
1041        // Determine the maximum number of redundant requests.
1042        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
1043        // Establish whether the peers who already got the request collectively hold sufficient stake.
1044        let stake_redundancy_reached = || self.pending.request_stake_redundancy_reached(&self.gateway, certificate_id);
1045        // Determine if we should send a certificate request to the peer.
1046        // Each peer can only receive one request at a time.
1047        // We send at most `num_redundant_requests` requests, unless the stake redundancy factor hasn't been reached.
1048        let should_send_request = !contains_peer_with_sent_request
1049            && (num_sent_requests < num_redundant_requests || !stake_redundancy_reached()?);
1050
1051        // Insert the certificate ID into the pending queue.
1052        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
1053
1054        // If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
1055        if should_send_request {
1056            // Send the certificate request to the peer.
1057            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
1058                bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
1059            }
1060        } else {
1061            debug!(
1062                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
1063                fmt_id(certificate_id)
1064            );
1065        }
1066        // Wait for the certificate to be fetched.
1067        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
1068        tokio::time::timeout(MAX_FETCH_TIMEOUT, callback_receiver)
1069            .await
1070            .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
1071            .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
1072    }
1073
1074    /// Handles the incoming certificate request.
1075    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
1076        // Attempt to retrieve the certificate.
1077        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
1078            // Send the certificate response to the peer.
1079            let self_ = self.clone();
1080            tokio::spawn(async move {
1081                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
1082            });
1083        }
1084    }
1085
1086    /// Handles the incoming certificate response.
1087    /// This method ensures the certificate response is well-formed and matches the certificate ID.
1088    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
1089        let certificate = response.certificate;
1090        // Check if the peer IP exists in the pending queue for the given certificate ID.
1091        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
1092        // If the peer IP exists, finish the pending request.
1093        if exists {
1094            // TODO: Validate the certificate.
1095            // Remove the certificate ID from the pending queue.
1096            self.pending.remove(certificate.id(), Some(certificate));
1097        }
1098    }
1099}
1100
1101impl<N: Network> Sync<N> {
1102    /// Spawns a task with the given future; it should only be used for long-running tasks.
1103    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1104        self.handles.lock().push(tokio::spawn(future));
1105    }
1106
1107    /// Shuts down the primary.
1108    pub async fn shut_down(&self) {
1109        info!("Shutting down the sync module...");
1110        // Remove the callback.
1111        self.sync_callback.clear();
1112        // Acquire the response lock.
1113        let _lock = self.response_lock.lock().await;
1114        // Abort the tasks.
1115        self.handles.lock().iter().for_each(|handle| handle.abort());
1116    }
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121    use super::*;
1122
1123    use crate::{BFT, helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
1124
1125    use snarkos_account::Account;
1126    use snarkos_node_network::ConnectionMode;
1127    use snarkos_node_sync::BlockSync;
1128    use snarkos_utilities::{NodeDataDir, SimpleStoppable};
1129
1130    use snarkvm::{
1131        console::{
1132            account::{Address, PrivateKey},
1133            network::MainnetV0,
1134        },
1135        ledger::{
1136            narwhal::{BatchCertificate, BatchHeader, Subdag},
1137            store::{ConsensusStore, helpers::memory::ConsensusMemory},
1138        },
1139        prelude::{Ledger, VM},
1140        utilities::TestRng,
1141    };
1142
1143    use aleo_std::StorageMode;
1144    use indexmap::IndexSet;
1145    use rand::RngExt;
1146    use std::{collections::BTreeMap, sync::OnceLock};
1147
1148    type CurrentNetwork = MainnetV0;
1149    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1150    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1151
1152    /// Create four blocks, where only the last one contains enough certificates to advance the ledger.
1153    async fn setup_commit_chain(rng: &mut TestRng) -> (Block<CurrentNetwork>, Vec<Block<CurrentNetwork>>) {
1154        static CHAIN_CACHE: OnceLock<(Block<CurrentNetwork>, Vec<Block<CurrentNetwork>>)> = OnceLock::new();
1155
1156        // Use cached version if it exists.
1157        if let Some((genesis, blocks)) = CHAIN_CACHE.get() {
1158            return (genesis.clone(), blocks.clone());
1159        }
1160
1161        // Initialize the round parameters.
1162        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1163
1164        // The first round of the first block.
1165        let first_round: u64 = 1;
1166        // The total number of blocks we test
1167        let num_blocks = 3;
1168        // The last round of the last block.
1169        let last_round = first_round + num_blocks * 2;
1170        // The first round that has at least N-f certificates referencing the anchor from the previous round.
1171        // This is also the last round we use in the test.
1172        let first_threshold_round = 5;
1173
1174        // Initialize the store.
1175        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1176        let account: Account<CurrentNetwork> = Account::new(rng).unwrap();
1177
1178        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
1179        let seed: u64 = rng.random();
1180        let vm = VM::from(store).unwrap();
1181        let genesis_pk = *account.private_key();
1182        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1183
1184        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
1185        let genesis_rng = &mut TestRng::from_seed(seed);
1186        let private_keys = [
1187            *account.private_key(),
1188            PrivateKey::new(genesis_rng).unwrap(),
1189            PrivateKey::new(genesis_rng).unwrap(),
1190            PrivateKey::new(genesis_rng).unwrap(),
1191        ];
1192
1193        // Initialize the ledger with the genesis block.
1194        let genesis_clone = genesis.clone();
1195        let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
1196        // Initialize the ledger.
1197        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
1198
1199        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1200        let (round_to_certificates_map, committee) = {
1201            let addresses = vec![
1202                Address::try_from(private_keys[0]).unwrap(),
1203                Address::try_from(private_keys[1]).unwrap(),
1204                Address::try_from(private_keys[2]).unwrap(),
1205                Address::try_from(private_keys[3]).unwrap(),
1206            ];
1207
1208            let committee = ledger.latest_committee().unwrap();
1209
1210            // Initialize a mapping from the round number to the set of batch certificates in the round.
1211            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1212                HashMap::new();
1213            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1214
1215            for round in first_round..=last_round {
1216                let mut current_certificates = IndexSet::new();
1217                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1218                    IndexSet::new()
1219                } else {
1220                    previous_certificates.iter().map(|c| c.id()).collect()
1221                };
1222
1223                let committee_id = committee.id();
1224
1225                // Determine if there was a leader in the previous round.
1226                let is_certificate_round = !round.is_multiple_of(2);
1227                let prev_leader = if is_certificate_round && let Some(prev_round) = round.checked_sub(1) {
1228                    Some(committee.get_leader(prev_round).unwrap())
1229                } else {
1230                    None
1231                };
1232
1233                // Generate all certificates for the round.
1234                for (i, private_key) in private_keys.iter().enumerate() {
1235                    let previous_leader_index =
1236                        addresses.iter().position(|&addr| prev_leader.is_some_and(|prev_leader| addr == prev_leader));
1237
1238                    // For the first two blocks non-leaders will not reference the leader certificate.
1239                    // This means, while there was an anchor in the previous round, it is not committed until later.
1240                    let previous_certs = if let Some(previous_leader_index) = previous_leader_index
1241                        && round < first_threshold_round
1242                        && i != previous_leader_index
1243                    {
1244                        // Remove the reference to the previous leader certificate.
1245                        previous_certificate_ids
1246                            .iter()
1247                            .cloned()
1248                            .enumerate()
1249                            .filter(|(idx, _)| *idx != previous_leader_index)
1250                            .map(|(_, id)| id)
1251                            .collect()
1252                    } else {
1253                        previous_certificate_ids.clone()
1254                    };
1255
1256                    let batch_header = BatchHeader::new(
1257                        private_key,
1258                        round,
1259                        now(),
1260                        committee_id,
1261                        Default::default(),
1262                        previous_certs,
1263                        rng,
1264                    )
1265                    .unwrap();
1266
1267                    // Sign the batch header.
1268                    let mut signatures = IndexSet::with_capacity(4);
1269                    for (j, private_key_2) in private_keys.iter().enumerate() {
1270                        if i != j {
1271                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1272                        }
1273                    }
1274                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1275                }
1276
1277                // Update the map of certificates.
1278                round_to_certificates_map.insert(round, current_certificates.clone());
1279                previous_certificates = current_certificates;
1280            }
1281            (round_to_certificates_map, committee)
1282        };
1283
1284        // Initialize the storage.
1285        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1286
1287        // Create a list of all certificates.
1288        let certificates: Vec<_> =
1289            round_to_certificates_map.into_iter().flat_map(|(_, certificates)| certificates.into_iter()).collect();
1290
1291        // insert all certificates into storage.
1292        for certificate in certificates.iter() {
1293            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1294        }
1295
1296        // Create the blocks
1297        let mut previous_leader_cert = None;
1298        let mut blocks = vec![];
1299
1300        for block_height in 1..=num_blocks {
1301            let leader_round = block_height * 2;
1302
1303            let leader = committee.get_leader(leader_round).unwrap();
1304            let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();
1305
1306            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1307            let mut leader_cert_map = IndexSet::new();
1308            leader_cert_map.insert(leader_certificate.clone());
1309
1310            let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);
1311
1312            subdag_map.insert(leader_round, leader_cert_map.clone());
1313            subdag_map.insert(leader_round - 1, previous_cert_map.clone());
1314
1315            if leader_round > 2 {
1316                let previous_commit_cert_map: IndexSet<_> = storage
1317                    .get_certificates_for_round(leader_round - 2)
1318                    .into_iter()
1319                    .filter(|cert| {
1320                        if let Some(previous_leader_cert) = &previous_leader_cert {
1321                            cert != previous_leader_cert
1322                        } else {
1323                            true
1324                        }
1325                    })
1326                    .collect();
1327                subdag_map.insert(leader_round - 2, previous_commit_cert_map);
1328            }
1329
1330            let subdag = Subdag::from(subdag_map.clone()).unwrap();
1331            previous_leader_cert = Some(leader_certificate);
1332
1333            let core_ledger = core_ledger.clone();
1334            let block = spawn_blocking!({
1335                let ledger_update = core_ledger.begin_ledger_update()?;
1336                let block = ledger_update.prepare_advance_to_next_quorum_block(subdag, Default::default())?;
1337                ledger_update.advance_to_next_block(&block)?;
1338                Ok(block)
1339            })
1340            .unwrap();
1341
1342            blocks.push(block);
1343        }
1344
1345        CHAIN_CACHE.get_or_init(|| (genesis, blocks)).clone()
1346    }
1347
1348    #[tokio::test]
1349    #[tracing_test::traced_test]
1350    async fn test_commit_chain_with_bft() {
1351        let rng = &mut TestRng::default();
1352
1353        let (genesis, mut blocks) = setup_commit_chain(rng).await;
1354        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1355
1356        // ### Test that sync works as expected ###
1357        let storage_mode = StorageMode::new_test(None);
1358
1359        // Create a new ledger to test with, but use the existing storage
1360        // so that the certificates exist.
1361        let syncing_ledger = {
1362            let storage_mode = storage_mode.clone();
1363            Arc::new(CoreLedgerService::new(
1364                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1365                SimpleStoppable::new(),
1366            ))
1367        };
1368
1369        let account = Account::new(rng).unwrap();
1370        let syncing_storage =
1371            Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1372        let gateway = Gateway::new(
1373            account.clone(),
1374            syncing_storage.clone(),
1375            syncing_ledger.clone(),
1376            None,
1377            &[],
1378            false,
1379            NodeDataDir::new_test(None),
1380            None,
1381        )
1382        .unwrap();
1383
1384        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1385        let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1386
1387        let syncing_bft = BFT::new(
1388            account.clone(),
1389            syncing_storage.clone(),
1390            syncing_ledger.clone(),
1391            block_sync,
1392            None,
1393            &[],
1394            false,
1395            NodeDataDir::new_test(None),
1396            None,
1397        )
1398        .unwrap();
1399
1400        sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1401
1402        // -- Run test -- //
1403
1404        let last_block = blocks.pop().unwrap();
1405
1406        // Insert the blocks into the new sync module
1407        for block in blocks {
1408            sync.sync_storage_with_block(block, true).await.unwrap();
1409            // Availability threshold is not met, so we should not advance yet.
1410            assert_eq!(syncing_bft.testing_only_latest_committed_round(), 0);
1411        }
1412
1413        // Only for the final block, the availability threshold is met,
1414        // because certificates for the subsequent round are already in storage.
1415        sync.sync_storage_with_block(last_block, true).await.unwrap();
1416
1417        // Ensure the leaders are committed.
1418        // (blocks are not created as there is no active consensus instance)
1419        assert_eq!(syncing_bft.testing_only_latest_committed_round(), 4);
1420    }
1421
1422    /// Verifies that after syncing blocks, the Sync module updates storage (BFT ledger) accordingly:
1423    /// every certificate from each block's subDAG is present in storage, and height/round are updated.
1424    #[tokio::test]
1425    #[tracing_test::traced_test]
1426    async fn test_sync_updates_storage_with_block_certificates() {
1427        let rng = &mut TestRng::default();
1428
1429        let (genesis, blocks) = setup_commit_chain(rng).await;
1430        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1431        let storage_mode = StorageMode::new_test(None);
1432
1433        let syncing_ledger = Arc::new(CoreLedgerService::new(
1434            spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1435            SimpleStoppable::new(),
1436        ));
1437
1438        let account = Account::new(rng).unwrap();
1439        let syncing_storage =
1440            Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1441        let gateway = Gateway::new(
1442            account.clone(),
1443            syncing_storage.clone(),
1444            syncing_ledger.clone(),
1445            None,
1446            &[],
1447            false,
1448            NodeDataDir::new_test(None),
1449            None,
1450        )
1451        .unwrap();
1452
1453        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1454        let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1455
1456        let syncing_bft = BFT::new(
1457            account.clone(),
1458            syncing_storage.clone(),
1459            syncing_ledger.clone(),
1460            block_sync,
1461            None,
1462            &[],
1463            false,
1464            NodeDataDir::new_test(None),
1465            None,
1466        )
1467        .unwrap();
1468
1469        sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1470
1471        // Sync all blocks in order.
1472        for block in &blocks {
1473            sync.sync_storage_with_block(block.clone(), true).await.unwrap();
1474        }
1475
1476        // The last block stays pending (no successor to satisfy availability threshold).
1477        // Only committed blocks have their certificates in the ledger.
1478        let committed_blocks = &blocks[..blocks.len().saturating_sub(1)];
1479
1480        // Assert Sync updated the underlying ledger accordingly: every certificate from each
1481        // committed block's subDAG is present in the ledger, and ledger height/round are updated.
1482        for block in committed_blocks {
1483            let Authority::Quorum(subdag) = block.authority() else {
1484                continue;
1485            };
1486            for certificates in subdag.values() {
1487                for cert in certificates {
1488                    assert!(
1489                        syncing_ledger.contains_certificate(&cert.id()).unwrap_or(false),
1490                        "Sync should have committed block {} so certificate is in the ledger",
1491                        block.height()
1492                    );
1493                }
1494            }
1495        }
1496
1497        // Ledger height and round should match the latest committed block (not the tip).
1498        let last_committed_block = committed_blocks.last().unwrap();
1499        assert_eq!(
1500            syncing_ledger.latest_block_height(),
1501            last_committed_block.height(),
1502            "Ledger height should match last committed block"
1503        );
1504        assert_eq!(
1505            syncing_ledger.latest_block().round(),
1506            last_committed_block.round(),
1507            "Ledger round should match last committed block"
1508        );
1509    }
1510
1511    #[tokio::test]
1512    #[tracing_test::traced_test]
1513    async fn test_commit_chain_with_swich_to_bft() {
1514        let rng = &mut TestRng::default();
1515        let (genesis, mut blocks) = setup_commit_chain(rng).await;
1516        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1517        let storage_mode = StorageMode::new_test(None);
1518
1519        // Create a new ledger to test with, but use the existing storage
1520        // so that the certificates exist.
1521        let syncing_ledger = {
1522            let storage_mode = storage_mode.clone();
1523            Arc::new(CoreLedgerService::new(
1524                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1525                SimpleStoppable::new(),
1526            ))
1527        };
1528
1529        let account = Account::new(rng).unwrap();
1530        let syncing_storage =
1531            Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1532        let gateway = Gateway::new(
1533            account.clone(),
1534            syncing_storage.clone(),
1535            syncing_ledger.clone(),
1536            None,
1537            &[],
1538            false,
1539            NodeDataDir::new_test(None),
1540            None,
1541        )
1542        .unwrap();
1543
1544        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1545        let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1546
1547        let syncing_bft = BFT::new(
1548            account.clone(),
1549            syncing_storage.clone(),
1550            syncing_ledger.clone(),
1551            block_sync,
1552            None,
1553            &[],
1554            false,
1555            NodeDataDir::new_test(None),
1556            None,
1557        )
1558        .unwrap();
1559
1560        sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1561
1562        // -- Run test -- //
1563        let last_block = blocks.pop().unwrap();
1564
1565        // Insert all but the last block into the sync module
1566        // These are added without BFT.
1567        for block in blocks {
1568            sync.sync_storage_with_block(block, false).await.unwrap();
1569
1570            // Availability threshold is not met, so we should not advance yet.
1571            assert_eq!(syncing_ledger.latest_block_height(), 0);
1572        }
1573
1574        // -- Switch to BFT --
1575        sync.sync_storage_with_ledger_at_bootup().unwrap();
1576
1577        // Ensure blocks did not commit yet.
1578        assert_eq!(syncing_ledger.latest_block_height(), 0);
1579        assert_eq!(syncing_bft.testing_only_latest_committed_round(), 0);
1580
1581        // Only for the final block, the availability threshold is met,
1582        // because certificates for the subsequent round are already in storage.
1583        sync.sync_storage_with_block(last_block, true).await.unwrap();
1584
1585        // Ensure blocks 1 and 2 were added to the ledger.
1586        // Unlike with normal sync, the ledger is advanced by Sync when pending blocks are committed.
1587        assert_eq!(syncing_bft.testing_only_latest_committed_round(), 4);
1588    }
1589
1590    /// Tests that a node can correctly revert from DAG sync back to fast sync.
1591    ///
1592    /// This mirrors `test_commit_chain_with_swich_to_bft` in the opposite direction:
1593    /// the first blocks are processed in DAG-sync mode (within GC range), then the
1594    /// final block is processed in fast-sync mode (outside GC range, no DAG updates).
1595    ///
1596    /// The ledger should still advance correctly because `pending_blocks` and the
1597    /// availability-threshold check are shared between both modes.
1598    #[tokio::test]
1599    #[tracing_test::traced_test]
1600    async fn test_commit_chain_with_switch_to_fast_sync() {
1601        let rng = &mut TestRng::default();
1602        let (genesis, mut blocks) = setup_commit_chain(rng).await;
1603        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1604        let storage_mode = StorageMode::new_test(None);
1605
1606        let syncing_ledger = {
1607            let storage_mode = storage_mode.clone();
1608            Arc::new(CoreLedgerService::new(
1609                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1610                SimpleStoppable::new(),
1611            ))
1612        };
1613
1614        let account = Account::new(rng).unwrap();
1615        let syncing_storage =
1616            Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1617        let gateway = Gateway::new(
1618            account.clone(),
1619            syncing_storage.clone(),
1620            syncing_ledger.clone(),
1621            None,
1622            &[],
1623            false,
1624            NodeDataDir::new_test(None),
1625            None,
1626        )
1627        .unwrap();
1628
1629        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1630        let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1631
1632        let syncing_bft = BFT::new(
1633            account.clone(),
1634            syncing_storage.clone(),
1635            syncing_ledger.clone(),
1636            block_sync,
1637            None,
1638            &[],
1639            false,
1640            NodeDataDir::new_test(None),
1641            None,
1642        )
1643        .unwrap();
1644
1645        sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1646
1647        // -- Run test -- //
1648        let last_block = blocks.pop().unwrap();
1649
1650        // Insert all but the last block in DAG-sync mode (within GC range).
1651        // Certificates are inserted into the BFT DAG but the availability threshold
1652        // is not yet met, so the ledger should not advance.
1653        for block in blocks {
1654            sync.sync_storage_with_block(block, true).await.unwrap();
1655            assert_eq!(syncing_ledger.latest_block_height(), 0);
1656        }
1657
1658        // -- Switch back to fast sync (simulate the network tip dropping below the GC boundary) --
1659
1660        // The final block is processed in fast-sync mode: no DAG updates.
1661        // The pending_blocks chain now has enough successors to confirm the availability
1662        // threshold for block 2, so the ledger advances to height 2.
1663        // Block 3 (the fast-sync one) remains pending — it needs a further successor
1664        // with enough votes to be confirmed.
1665        sync.sync_storage_with_block(last_block, false).await.unwrap();
1666
1667        // Blocks 1 and 2 should have been committed to the ledger.
1668        assert_eq!(syncing_ledger.latest_block_height(), 2);
1669        assert!(syncing_ledger.contains_block_height(1));
1670        assert!(syncing_ledger.contains_block_height(2));
1671
1672        // The BFT committed round is 0: the last block was processed in fast-sync mode so
1673        // its certificates were never passed to the BFT DAG, meaning the BFT itself did not
1674        // advance its committed round beyond the initial state.
1675        assert_eq!(syncing_bft.testing_only_latest_committed_round(), 0);
1676    }
1677
1678    #[tokio::test]
1679    #[tracing_test::traced_test]
1680    async fn test_commit_chain_without_bft() {
1681        let rng = &mut TestRng::default();
1682        let (genesis, mut blocks) = setup_commit_chain(rng).await;
1683        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1684        let storage_mode = StorageMode::new_test(None);
1685
1686        // Create a new ledger to test with, but use the existing storage
1687        // so that the certificates exist.
1688        let syncing_ledger = {
1689            let storage_mode = storage_mode.clone();
1690            Arc::new(CoreLedgerService::new(
1691                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
1692                SimpleStoppable::new(),
1693            ))
1694        };
1695
1696        let account = Account::new(rng).unwrap();
1697        let syncing_storage =
1698            Storage::new(syncing_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1699        let gateway = Gateway::new(
1700            account.clone(),
1701            syncing_storage.clone(),
1702            syncing_ledger.clone(),
1703            None,
1704            &[],
1705            false,
1706            NodeDataDir::new_test(None),
1707            None,
1708        )
1709        .unwrap();
1710
1711        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone(), ConnectionMode::Gateway));
1712        let sync = Sync::new(gateway.clone(), syncing_storage.clone(), syncing_ledger.clone(), block_sync.clone());
1713
1714        let syncing_bft = BFT::new(
1715            account.clone(),
1716            syncing_storage.clone(),
1717            syncing_ledger.clone(),
1718            block_sync,
1719            None,
1720            &[],
1721            false,
1722            NodeDataDir::new_test(None),
1723            None,
1724        )
1725        .unwrap();
1726
1727        sync.initialize(Some(Arc::new(syncing_bft.clone()))).unwrap();
1728
1729        // -- Run test -- //
1730        let last_block = blocks.pop().unwrap();
1731
1732        // Insert all but the last block into the sync module
1733        for block in blocks {
1734            sync.sync_storage_with_block(block, false).await.unwrap();
1735
1736            // Availability threshold is not met, so we should not advance yet.
1737            assert_eq!(syncing_ledger.latest_block_height(), 0);
1738        }
1739
1740        // Only for the final block, the availability threshold is met,
1741        // because certificates for the subsequent round are already in storage.
1742        sync.sync_storage_with_block(last_block, false).await.unwrap();
1743        assert_eq!(syncing_ledger.latest_block_height(), 2);
1744
1745        // Ensure blocks 1 and 2 were added to the ledger.
1746        assert!(syncing_ledger.contains_block_height(1));
1747        assert!(syncing_ledger.contains_block_height(2));
1748    }
1749
1750    #[tokio::test]
1751    #[tracing_test::traced_test]
1752    async fn test_pending_certificates() -> anyhow::Result<()> {
1753        let rng = &mut TestRng::default();
1754        // Initialize the round parameters.
1755        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1756        let commit_round = 2;
1757
1758        // Initialize the store.
1759        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1760        let account: Account<CurrentNetwork> = Account::new(rng)?;
1761
1762        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
1763        let seed: u64 = rng.random();
1764        let vm = VM::from(store).unwrap();
1765        let genesis_pk = *account.private_key();
1766        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1767
1768        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
1769        let genesis_rng = &mut TestRng::from_seed(seed);
1770        let private_keys = [
1771            *account.private_key(),
1772            PrivateKey::new(genesis_rng)?,
1773            PrivateKey::new(genesis_rng)?,
1774            PrivateKey::new(genesis_rng)?,
1775        ];
1776
1777        // Initialize the ledger with the genesis block.
1778        let core_ledger = {
1779            let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
1780            Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()))
1781        };
1782
1783        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1784        let (round_to_certificates_map, committee) = {
1785            // Initialize the committee.
1786            let committee = core_ledger.current_committee().unwrap();
1787            // Initialize a mapping from the round number to the set of batch certificates in the round.
1788            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1789                HashMap::new();
1790            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1791
1792            for round in 0..=commit_round + 8 {
1793                let mut current_certificates = IndexSet::new();
1794                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1795                    IndexSet::new()
1796                } else {
1797                    previous_certificates.iter().map(|c| c.id()).collect()
1798                };
1799                let committee_id = committee.id();
1800                // Create a certificate for each validator.
1801                for (i, private_key_1) in private_keys.iter().enumerate() {
1802                    let batch_header = BatchHeader::new(
1803                        private_key_1,
1804                        round,
1805                        now(),
1806                        committee_id,
1807                        Default::default(),
1808                        previous_certificate_ids.clone(),
1809                        rng,
1810                    )
1811                    .unwrap();
1812                    // Sign the batch header.
1813                    let mut signatures = IndexSet::with_capacity(4);
1814                    for (j, private_key_2) in private_keys.iter().enumerate() {
1815                        if i != j {
1816                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1817                        }
1818                    }
1819                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1820                }
1821
1822                // Update the map of certificates.
1823                round_to_certificates_map.insert(round, current_certificates.clone());
1824                previous_certificates = current_certificates.clone();
1825            }
1826            (round_to_certificates_map, committee)
1827        };
1828
1829        // Initialize the storage.
1830        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1831        // Insert certificates into storage.
1832        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1833        for i in 1..=commit_round + 8 {
1834            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1835            certificates.extend(c);
1836        }
1837        for certificate in certificates.clone().iter() {
1838            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1839        }
1840
1841        let leader_round_1 = commit_round;
1842        let leader_1 = committee.get_leader(leader_round_1).unwrap();
1843        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1844        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1845
1846        // Create subdag for block 1.
1847        let subdag_1 = {
1848            let mut leader_cert_map = IndexSet::new();
1849            leader_cert_map.insert(leader_certificate.clone());
1850            let mut previous_cert_map = IndexSet::new();
1851            for cert in storage.get_certificates_for_round(commit_round - 1) {
1852                previous_cert_map.insert(cert);
1853            }
1854            subdag_map.insert(commit_round, leader_cert_map.clone());
1855            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1856            Subdag::from(subdag_map.clone())?
1857        };
1858
1859        let core_ledger_cpy = core_ledger.clone();
1860        spawn_blocking!({
1861            // Create block 1.
1862            let update1 = core_ledger_cpy.begin_ledger_update()?;
1863            let block_1 = update1.prepare_advance_to_next_quorum_block(subdag_1, Default::default())?;
1864
1865            // Insert block 1.
1866            update1.advance_to_next_block(&block_1)?;
1867
1868            Ok(())
1869        })?;
1870
1871        // Prepare DAG for block 2.
1872        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1873        let subdag_2 = {
1874            let leader_round_2 = commit_round + 2;
1875            let leader_2 = committee.get_leader(leader_round_2).unwrap();
1876            let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1877            let mut leader_cert_map_2 = IndexSet::new();
1878            leader_cert_map_2.insert(leader_certificate_2.clone());
1879            let mut previous_cert_map_2 = IndexSet::new();
1880            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1881                previous_cert_map_2.insert(cert);
1882            }
1883            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1884            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1885            Subdag::from(subdag_map_2.clone())?
1886        };
1887
1888        let core_ledger_cpy = core_ledger.clone();
1889        spawn_blocking!({
1890            let update2 = core_ledger_cpy.begin_ledger_update()?;
1891
1892            // Create block 2.
1893            let block_2 = update2.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?;
1894
1895            // Insert block 2.
1896            update2.advance_to_next_block(&block_2)?;
1897
1898            Ok(())
1899        })?;
1900
1901        // Prepare DAG for block 3.
1902        let leader_round_3 = commit_round + 4;
1903        let leader_3 = committee.get_leader(leader_round_3).unwrap();
1904        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1905
1906        // Prepare DAG for block 3.
1907        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1908        let subdag_3 = {
1909            let mut leader_cert_map_3 = IndexSet::new();
1910            leader_cert_map_3.insert(leader_certificate_3.clone());
1911            let mut previous_cert_map_3 = IndexSet::new();
1912            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1913                previous_cert_map_3.insert(cert);
1914            }
1915            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1916            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1917            Subdag::from(subdag_map_3.clone())?
1918        };
1919
1920        let core_ledger_cpy = core_ledger.clone();
1921        spawn_blocking!({
1922            let update3 = core_ledger_cpy.begin_ledger_update()?;
1923
1924            // Create block 3
1925            let block_3 = update3.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?;
1926
1927            // Insert block 3.
1928            update3.advance_to_next_block(&block_3)?;
1929
1930            Ok(())
1931        })?;
1932
1933        /*
1934            Check that the pending certificates are computed correctly.
1935        */
1936
1937        // Retrieve the pending certificates.
1938        let pending_certificates = storage.get_pending_certificates();
1939        // Check that all of the pending certificates are not contained in the ledger.
1940        for certificate in pending_certificates.clone() {
1941            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1942        }
1943        // Initialize an empty set to be populated with the committed certificates in the block subdags.
1944        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1945        {
1946            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1947            for subdag in subdag_maps.iter() {
1948                for subdag_certificates in subdag.values() {
1949                    committed_certificates.extend(subdag_certificates.iter().cloned());
1950                }
1951            }
1952        };
1953        // Create the set of candidate pending certificates as the set of all certificates minus the set of the committed certificates.
1954        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1955        for certificate in certificates.clone() {
1956            if !committed_certificates.contains(&certificate) {
1957                candidate_pending_certificates.insert(certificate);
1958            }
1959        }
1960        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
1961        assert_eq!(pending_certificates, candidate_pending_certificates);
1962
1963        Ok(())
1964    }
1965}