snarkos_node_bft/sync/
mod.rs

1// Copyright 2024-2025 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_FETCH_TIMEOUT_IN_MS,
19    PRIMARY_PING_IN_MS,
20    Transport,
21    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22    spawn_blocking,
23};
24use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
27use snarkos_node_tcp::P2P;
28use snarkvm::{
29    console::{network::Network, types::Field},
30    ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
31    prelude::{cfg_into_iter, cfg_iter},
32};
33
34use anyhow::{Result, anyhow, bail};
35#[cfg(feature = "locktick")]
36use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
37#[cfg(not(feature = "locktick"))]
38use parking_lot::Mutex;
39use rayon::prelude::*;
40use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration};
41#[cfg(not(feature = "locktick"))]
42use tokio::sync::Mutex as TMutex;
43use tokio::{
44    sync::{OnceCell, oneshot},
45    task::JoinHandle,
46};
47
48#[derive(Clone)]
49pub struct Sync<N: Network> {
50    /// The gateway.
51    gateway: Gateway<N>,
52    /// The storage.
53    storage: Storage<N>,
54    /// The ledger service.
55    ledger: Arc<dyn LedgerService<N>>,
56    /// The block sync module.
57    block_sync: BlockSync<N>,
58    /// The pending certificates queue.
59    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
60    /// The BFT sender.
61    bft_sender: Arc<OnceCell<BFTSender<N>>>,
62    /// The spawned handles.
63    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
64    /// The response lock.
65    response_lock: Arc<TMutex<()>>,
66    /// The sync lock.
67    sync_lock: Arc<TMutex<()>>,
68    /// The latest block responses.
69    ///
70    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks
71    /// whose addition to the ledger is deferred until certain checks pass.
72    latest_block_responses: Arc<TMutex<HashMap<u32, Block<N>>>>,
73}
74
75impl<N: Network> Sync<N> {
76    /// Initializes a new sync instance.
77    pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
78        // Initialize the block sync module.
79        let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone());
80        // Return the sync instance.
81        Self {
82            gateway,
83            storage,
84            ledger,
85            block_sync,
86            pending: Default::default(),
87            bft_sender: Default::default(),
88            handles: Default::default(),
89            response_lock: Default::default(),
90            sync_lock: Default::default(),
91            latest_block_responses: Default::default(),
92        }
93    }
94
95    /// Initializes the sync module and sync the storage with the ledger at bootup.
96    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
97        // If a BFT sender was provided, set it.
98        if let Some(bft_sender) = bft_sender {
99            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
100        }
101
102        info!("Syncing storage with the ledger...");
103
104        // Sync the storage with the ledger.
105        self.sync_storage_with_ledger_at_bootup().await
106    }
107
108    /// Starts the sync module.
109    pub async fn run(&self, sync_receiver: SyncReceiver<N>) -> Result<()> {
110        info!("Starting the sync module...");
111
112        // Start the block sync loop.
113        let self_ = self.clone();
114        self.handles.lock().push(tokio::spawn(async move {
115            // Sleep briefly to allow an initial primary ping to come in prior to entering the loop.
116            // Ideally, a node does not consider itself synced when it has not received
117            // any block locators from peers. However, in the initial bootup of validators,
118            // this needs to happen, so we use this additional sleep as a grace period.
119            tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
120            loop {
121                // Sleep briefly to avoid triggering spam detection.
122                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
123                // Perform the sync routine.
124                let communication = &self_.gateway;
125                // let communication = &node.router;
126                self_.block_sync.try_block_sync(communication).await;
127
128                // Sync the storage with the blocks.
129                if let Err(e) = self_.sync_storage_with_blocks().await {
130                    error!("Unable to sync storage with blocks - {e}");
131                }
132
133                // If the node is synced, clear the `latest_block_responses`.
134                if self_.is_synced() {
135                    self_.latest_block_responses.lock().await.clear();
136                }
137            }
138        }));
139
140        // Start the pending queue expiration loop.
141        let self_ = self.clone();
142        self.spawn(async move {
143            loop {
144                // Sleep briefly.
145                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
146
147                // Remove the expired pending transmission requests.
148                let self__ = self_.clone();
149                let _ = spawn_blocking!({
150                    self__.pending.clear_expired_callbacks();
151                    Ok(())
152                });
153            }
154        });
155
156        // Retrieve the sync receiver.
157        let SyncReceiver {
158            mut rx_block_sync_advance_with_sync_blocks,
159            mut rx_block_sync_remove_peer,
160            mut rx_block_sync_update_peer_locators,
161            mut rx_certificate_request,
162            mut rx_certificate_response,
163        } = sync_receiver;
164
165        // Process the block sync request to advance with sync blocks.
166        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
167        // which is initially handled by [`Gateway::inbound()`],
168        // which calls [`SyncSender::advance_with_sync_blocks()`],
169        // which calls [`tx_block_sync_advance_with_sync_blocks.send()`],
170        // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return.
171        let self_ = self.clone();
172        self.spawn(async move {
173            while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
174                // Process the block response.
175                if let Err(e) = self_.block_sync.process_block_response(peer_ip, blocks) {
176                    // Send the error to the callback.
177                    callback.send(Err(e)).ok();
178                    continue;
179                }
180
181                // Sync the storage with the blocks.
182                if let Err(e) = self_.sync_storage_with_blocks().await {
183                    // Send the error to the callback.
184                    callback.send(Err(e)).ok();
185                    continue;
186                }
187
188                // Send the result to the callback.
189                callback.send(Ok(())).ok();
190            }
191        });
192
193        // Process the block sync request to remove the peer.
194        let self_ = self.clone();
195        self.spawn(async move {
196            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
197                self_.block_sync.remove_peer(&peer_ip);
198            }
199        });
200
201        // Process each block sync request to update peer locators.
202        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
203        // which is initially handled by [`Gateway::inbound()`],
204        // which calls [`SyncSender::update_peer_locators()`],
205        // which calls [`tx_block_sync_update_peer_locators.send()`],
206        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
207        let self_ = self.clone();
208        self.spawn(async move {
209            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
210                let self_clone = self_.clone();
211                tokio::spawn(async move {
212                    // Update the peer locators.
213                    let result = self_clone.block_sync.update_peer_locators(peer_ip, locators);
214                    // Send the result to the callback.
215                    callback.send(result).ok();
216                });
217            }
218        });
219
220        // Process each certificate request.
221        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
222        // which is initially handled by [`Gateway::inbound()`],
223        // which calls [`tx_certificate_request.send()`],
224        // which causes the `rx_certificate_request.recv()` call below to return.
225        let self_ = self.clone();
226        self.spawn(async move {
227            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
228                self_.send_certificate_response(peer_ip, certificate_request);
229            }
230        });
231
232        // Process each certificate response.
233        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
234        // which is initially handled by [`Gateway::inbound()`],
235        // which calls [`tx_certificate_response.send()`],
236        // which causes the `rx_certificate_response.recv()` call below to return.
237        let self_ = self.clone();
238        self.spawn(async move {
239            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
240                self_.finish_certificate_request(peer_ip, certificate_response)
241            }
242        });
243
244        Ok(())
245    }
246}
247
248// Methods to manage storage.
249impl<N: Network> Sync<N> {
250    /// Syncs the storage with the ledger at bootup.
251    pub async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
252        // Retrieve the latest block in the ledger.
253        let latest_block = self.ledger.latest_block();
254
255        // Retrieve the block height.
256        let block_height = latest_block.height();
257        // Determine the maximum number of blocks corresponding to rounds
258        // that would not have been garbage collected, i.e. that would be kept in storage.
259        // Since at most one block is created every two rounds,
260        // this is half of the maximum number of rounds kept in storage.
261        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
262        // Determine the earliest height of blocks corresponding to rounds kept in storage,
263        // conservatively set to the block height minus the maximum number of blocks calculated above.
264        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
265        let gc_height = block_height.saturating_sub(max_gc_blocks);
266        // Retrieve the blocks.
267        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
268
269        // Acquire the sync lock.
270        let _lock = self.sync_lock.lock().await;
271
272        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));
273
274        /* Sync storage */
275
276        // Sync the height with the block.
277        self.storage.sync_height_with_block(latest_block.height());
278        // Sync the round with the block.
279        self.storage.sync_round_with_block(latest_block.round());
280        // Perform GC on the latest block round.
281        self.storage.garbage_collect_certificates(latest_block.round());
282        // Iterate over the blocks.
283        for block in &blocks {
284            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
285            // Note that the block authority is always a sub-DAG in production;
286            // beacon signatures are only used for testing,
287            // and as placeholder (irrelevant) block authority in the genesis block.
288            if let Authority::Quorum(subdag) = block.authority() {
289                // Reconstruct the unconfirmed transactions.
290                let unconfirmed_transactions = cfg_iter!(block.transactions())
291                    .filter_map(|tx| {
292                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
293                    })
294                    .collect::<HashMap<_, _>>();
295
296                // Iterate over the certificates.
297                for certificates in subdag.values().cloned() {
298                    cfg_into_iter!(certificates).for_each(|certificate| {
299                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
300                    });
301                }
302
303                // Update the validator telemetry.
304                #[cfg(feature = "telemetry")]
305                self.gateway.validator_telemetry().insert_subdag(subdag);
306            }
307        }
308
309        /* Sync the BFT DAG */
310
311        // Construct a list of the certificates.
312        let certificates = blocks
313            .iter()
314            .flat_map(|block| {
315                match block.authority() {
316                    // If the block authority is a beacon, then skip the block.
317                    Authority::Beacon(_) => None,
318                    // If the block authority is a subdag, then retrieve the certificates.
319                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
320                }
321            })
322            .flatten()
323            .collect::<Vec<_>>();
324
325        // If a BFT sender was provided, send the certificates to the BFT.
326        if let Some(bft_sender) = self.bft_sender.get() {
327            // Await the callback to continue.
328            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
329                bail!("Failed to update the BFT DAG from sync: {e}");
330            }
331        }
332
333        Ok(())
334    }
335
336    /// Syncs the storage with blocks already received from peers.
337    pub async fn sync_storage_with_blocks(&self) -> Result<()> {
338        // Acquire the response lock.
339        let _lock = self.response_lock.lock().await;
340
341        // Retrieve the next block height.
342        // This variable is used to index blocks that are added to the ledger;
343        // it is incremented as blocks as added.
344        // So 'current' means 'currently being added'.
345        let mut current_height = self.ledger.latest_block_height() + 1;
346
347        // Retrieve the maximum block height of the peers.
348        let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
349        // Determine the maximum number of blocks corresponding to rounds
350        // that would not have been garbage collected, i.e. that would be kept in storage.
351        // Since at most one block is created every two rounds,
352        // this is half of the maximum number of rounds kept in storage.
353        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
354        // Determine the earliest height of blocks corresponding to rounds kept in storage,
355        // conservatively set to the block height minus the maximum number of blocks calculated above.
356        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
357        let max_gc_height = tip.saturating_sub(max_gc_blocks);
358
359        // Determine if we can sync the ledger without updating the BFT first.
360        if current_height <= max_gc_height {
361            // Try to advance the ledger *to tip* without updating the BFT.
362            while let Some(block) = self.block_sync.peek_next_block(current_height) {
363                info!("Syncing the ledger to block {}...", block.height());
364                // Sync the ledger with the block without BFT.
365                match self.sync_ledger_with_block_without_bft(block).await {
366                    Ok(_) => {
367                        // Update the current height if sync succeeds.
368                        current_height += 1;
369                    }
370                    Err(e) => {
371                        // Mark the current height as processed in block_sync.
372                        self.block_sync.remove_block_response(current_height);
373                        return Err(e);
374                    }
375                }
376            }
377            // Sync the storage with the ledger if we should transition to the BFT sync.
378            if current_height > max_gc_height {
379                if let Err(e) = self.sync_storage_with_ledger_at_bootup().await {
380                    error!("BFT sync (with bootup routine) failed - {e}");
381                }
382            }
383        }
384
385        // Try to advance the ledger with sync blocks.
386        while let Some(block) = self.block_sync.peek_next_block(current_height) {
387            info!("Syncing the BFT to block {}...", block.height());
388            // Sync the storage with the block.
389            match self.sync_storage_with_block(block).await {
390                Ok(_) => {
391                    // Update the current height if sync succeeds.
392                    current_height += 1;
393                }
394                Err(e) => {
395                    // Mark the current height as processed in block_sync.
396                    self.block_sync.remove_block_response(current_height);
397                    return Err(e);
398                }
399            }
400        }
401        Ok(())
402    }
403
404    /// Syncs the ledger with the given block without updating the BFT.
405    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
406        // Acquire the sync lock.
407        let _lock = self.sync_lock.lock().await;
408
409        let self_ = self.clone();
410        tokio::task::spawn_blocking(move || {
411            // Check the next block.
412            self_.ledger.check_next_block(&block)?;
413            // Attempt to advance to the next block.
414            self_.ledger.advance_to_next_block(&block)?;
415
416            // Sync the height with the block.
417            self_.storage.sync_height_with_block(block.height());
418            // Sync the round with the block.
419            self_.storage.sync_round_with_block(block.round());
420            // Mark the block height as processed in block_sync.
421            self_.block_sync.remove_block_response(block.height());
422
423            Ok(())
424        })
425        .await?
426    }
427
428    /// Syncs the storage with the given block.
429    ///
430    /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
431    /// meets the voter availability threshold (i.e. > f voting stake)
432    /// or is reachable via a DAG path from a later leader certificate that does.
433    /// Since performing this check requires DAG certificates from later blocks,
434    /// the block is stored in `Sync::latest_block_responses`,
435    /// and its addition to the ledger is deferred until the check passes.
436    /// Several blocks may be stored in `Sync::latest_block_responses`
437    /// before they can be all checked and added to the ledger.
438    pub async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
439        // Acquire the sync lock.
440        let _lock = self.sync_lock.lock().await;
441        // Acquire the latest block responses lock.
442        let mut latest_block_responses = self.latest_block_responses.lock().await;
443
444        // If this block has already been processed, return early.
445        if self.ledger.contains_block_height(block.height()) || latest_block_responses.contains_key(&block.height()) {
446            return Ok(());
447        }
448
449        // If the block authority is a sub-DAG, then sync the batch certificates with the block.
450        // Note that the block authority is always a sub-DAG in production;
451        // beacon signatures are only used for testing,
452        // and as placeholder (irrelevant) block authority in the genesis block.
453        if let Authority::Quorum(subdag) = block.authority() {
454            // Reconstruct the unconfirmed transactions.
455            let unconfirmed_transactions = cfg_iter!(block.transactions())
456                .filter_map(|tx| {
457                    tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
458                })
459                .collect::<HashMap<_, _>>();
460
461            // Iterate over the certificates.
462            for certificates in subdag.values().cloned() {
463                cfg_into_iter!(certificates.clone()).for_each(|certificate| {
464                    // Sync the batch certificate with the block.
465                    self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
466                });
467
468                // Sync the BFT DAG with the certificates.
469                for certificate in certificates {
470                    // If a BFT sender was provided, send the certificate to the BFT.
471                    if let Some(bft_sender) = self.bft_sender.get() {
472                        // Await the callback to continue.
473                        if let Err(e) = bft_sender.send_sync_bft(certificate).await {
474                            bail!("Sync - {e}");
475                        };
476                    }
477                }
478            }
479        }
480
481        // Fetch the latest block height.
482        let latest_block_height = self.ledger.latest_block_height();
483
484        // Insert the latest block response.
485        latest_block_responses.insert(block.height(), block);
486        // Clear the latest block responses of older blocks.
487        latest_block_responses.retain(|height, _| *height > latest_block_height);
488
489        // Get a list of contiguous blocks from the latest block responses.
490        let contiguous_blocks: Vec<Block<N>> = (latest_block_height.saturating_add(1)..)
491            .take_while(|&k| latest_block_responses.contains_key(&k))
492            .filter_map(|k| latest_block_responses.get(&k).cloned())
493            .collect();
494
495        // Check if each block response, from the contiguous sequence just constructed,
496        // is ready to be added to the ledger.
497        // Ensure that the block's leader certificate meets the availability threshold
498        // based on the certificates in the DAG just after the block's round.
499        // If the availability threshold is not met,
500        // process the next block and check if it is linked to the current block,
501        // in the sense that there is a path in the DAG
502        // from the next block's leader certificate
503        // to the current block's leader certificate.
504        // Note: We do not advance to the most recent block response because we would be unable to
505        // validate if the leader certificate in the block has been certified properly.
506        for next_block in contiguous_blocks.into_iter() {
507            // Retrieve the height of the next block.
508            let next_block_height = next_block.height();
509
510            // Fetch the leader certificate and the relevant rounds.
511            let leader_certificate = match next_block.authority() {
512                Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
513                _ => bail!("Received a block with an unexpected authority type."),
514            };
515            let commit_round = leader_certificate.round();
516            let certificate_round =
517                commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;
518
519            // Get the committee lookback for the round just after the leader.
520            let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
521            // Retrieve all of the certificates for the round just after the leader.
522            let certificates = self.storage.get_certificates_for_round(certificate_round);
523            // Construct a set over the authors, at the round just after the leader,
524            // who included the leader's certificate in their previous certificate IDs.
525            let authors = certificates
526                .iter()
527                .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
528                    true => Some(c.author()),
529                    false => None,
530                })
531                .collect();
532
533            debug!("Validating sync block {next_block_height} at round {commit_round}...");
534            // Check if the leader is ready to be committed.
535            if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
536                // Initialize the current certificate.
537                let mut current_certificate = leader_certificate;
538                // Check if there are any linked blocks that need to be added.
539                let mut blocks_to_add = vec![next_block];
540
541                // Check if there are other blocks to process based on `is_linked`.
542                for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
543                    // Retrieve the previous block.
544                    let Some(previous_block) = latest_block_responses.get(&height) else {
545                        bail!("Block {height} is missing from the latest block responses.");
546                    };
547                    // Retrieve the previous block's leader certificate.
548                    let previous_certificate = match previous_block.authority() {
549                        Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
550                        _ => bail!("Received a block with an unexpected authority type."),
551                    };
552                    // Determine if there is a path between the previous certificate and the current certificate.
553                    if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
554                        debug!("Previous sync block {height} is linked to the current block {next_block_height}");
555                        // Add the previous leader certificate to the list of certificates to commit.
556                        blocks_to_add.insert(0, previous_block.clone());
557                        // Update the current certificate to the previous leader certificate.
558                        current_certificate = previous_certificate;
559                    }
560                }
561
562                // Add the blocks to the ledger.
563                for block in blocks_to_add {
564                    // Check that the blocks are sequential and can be added to the ledger.
565                    let block_height = block.height();
566                    if block_height != self.ledger.latest_block_height().saturating_add(1) {
567                        warn!("Skipping block {block_height} from the latest block responses - not sequential.");
568                        continue;
569                    }
570                    #[cfg(feature = "telemetry")]
571                    let block_authority = block.authority().clone();
572
573                    let self_ = self.clone();
574                    tokio::task::spawn_blocking(move || {
575                        // Check the next block.
576                        self_.ledger.check_next_block(&block)?;
577                        // Attempt to advance to the next block.
578                        self_.ledger.advance_to_next_block(&block)?;
579
580                        // Sync the height with the block.
581                        self_.storage.sync_height_with_block(block.height());
582                        // Sync the round with the block.
583                        self_.storage.sync_round_with_block(block.round());
584
585                        Ok::<(), anyhow::Error>(())
586                    })
587                    .await??;
588                    // Remove the block height from the latest block responses.
589                    latest_block_responses.remove(&block_height);
590                    // Mark the block height as processed in block_sync.
591                    self.block_sync.remove_block_response(block_height);
592
593                    // Update the validator telemetry.
594                    #[cfg(feature = "telemetry")]
595                    if let Authority::Quorum(subdag) = block_authority {
596                        self_.gateway.validator_telemetry().insert_subdag(&subdag);
597                    }
598                }
599            } else {
600                debug!(
601                    "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
602                );
603            }
604        }
605
606        Ok(())
607    }
608
609    /// Returns `true` if there is a path from the previous certificate to the current certificate.
610    fn is_linked(
611        &self,
612        previous_certificate: BatchCertificate<N>,
613        current_certificate: BatchCertificate<N>,
614    ) -> Result<bool> {
615        // Initialize the list containing the traversal.
616        let mut traversal = vec![current_certificate.clone()];
617        // Iterate over the rounds from the current certificate to the previous certificate.
618        for round in (previous_certificate.round()..current_certificate.round()).rev() {
619            // Retrieve all of the certificates for this past round.
620            let certificates = self.storage.get_certificates_for_round(round);
621            // Filter the certificates to only include those that are in the traversal.
622            traversal = certificates
623                .into_iter()
624                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
625                .collect();
626        }
627        Ok(traversal.contains(&previous_certificate))
628    }
629}
630
631// Methods to assist with the block sync module.
632impl<N: Network> Sync<N> {
633    /// Returns `true` if the node is synced and has connected peers.
634    pub fn is_synced(&self) -> bool {
635        if self.gateway.number_of_connected_peers() == 0 {
636            return false;
637        }
638        self.block_sync.is_block_synced()
639    }
640
641    /// Returns the number of blocks the node is behind the greatest peer height.
642    pub fn num_blocks_behind(&self) -> u32 {
643        self.block_sync.num_blocks_behind()
644    }
645
646    /// Returns `true` if the node is in gateway mode.
647    pub const fn is_gateway_mode(&self) -> bool {
648        self.block_sync.mode().is_gateway()
649    }
650
651    /// Returns the current block locators of the node.
652    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
653        self.block_sync.get_block_locators()
654    }
655
656    /// Returns the block sync module.
657    #[cfg(test)]
658    #[doc(hidden)]
659    pub(super) fn block_sync(&self) -> &BlockSync<N> {
660        &self.block_sync
661    }
662}
663
664// Methods to assist with fetching batch certificates from peers.
665impl<N: Network> Sync<N> {
666    /// Sends a certificate request to the specified peer.
667    pub async fn send_certificate_request(
668        &self,
669        peer_ip: SocketAddr,
670        certificate_id: Field<N>,
671    ) -> Result<BatchCertificate<N>> {
672        // Initialize a oneshot channel.
673        let (callback_sender, callback_receiver) = oneshot::channel();
674        // Determine how many sent requests are pending.
675        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
676        // Determine if we've already sent a request to the peer.
677        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
678        // Determine the maximum number of redundant requests.
679        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
680        // Determine if we should send a certificate request to the peer.
681        // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
682        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
683
684        // Insert the certificate ID into the pending queue.
685        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
686
687        // If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
688        if should_send_request {
689            // Send the certificate request to the peer.
690            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
691                bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
692            }
693        } else {
694            debug!(
695                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
696                fmt_id(certificate_id)
697            );
698        }
699        // Wait for the certificate to be fetched.
700        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
701        match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
702            // If the certificate was fetched, return it.
703            Ok(result) => Ok(result?),
704            // If the certificate was not fetched, return an error.
705            Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
706        }
707    }
708
709    /// Handles the incoming certificate request.
710    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
711        // Attempt to retrieve the certificate.
712        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
713            // Send the certificate response to the peer.
714            let self_ = self.clone();
715            tokio::spawn(async move {
716                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
717            });
718        }
719    }
720
721    /// Handles the incoming certificate response.
722    /// This method ensures the certificate response is well-formed and matches the certificate ID.
723    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
724        let certificate = response.certificate;
725        // Check if the peer IP exists in the pending queue for the given certificate ID.
726        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
727        // If the peer IP exists, finish the pending request.
728        if exists {
729            // TODO: Validate the certificate.
730            // Remove the certificate ID from the pending queue.
731            self.pending.remove(certificate.id(), Some(certificate));
732        }
733    }
734}
735
736impl<N: Network> Sync<N> {
737    /// Spawns a task with the given future; it should only be used for long-running tasks.
738    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
739        self.handles.lock().push(tokio::spawn(future));
740    }
741
742    /// Shuts down the primary.
743    pub async fn shut_down(&self) {
744        info!("Shutting down the sync module...");
745        // Acquire the response lock.
746        let _lock = self.response_lock.lock().await;
747        // Acquire the sync lock.
748        let _lock = self.sync_lock.lock().await;
749        // Abort the tasks.
750        self.handles.lock().iter().for_each(|handle| handle.abort());
751    }
752}
753#[cfg(test)]
754mod tests {
755    use super::*;
756
757    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
758    use snarkos_account::Account;
759    use snarkvm::{
760        console::{
761            account::{Address, PrivateKey},
762            network::MainnetV0,
763        },
764        ledger::{
765            narwhal::{BatchCertificate, BatchHeader, Subdag},
766            store::{ConsensusStore, helpers::memory::ConsensusMemory},
767        },
768        prelude::{Ledger, VM},
769        utilities::TestRng,
770    };
771
772    use aleo_std::StorageMode;
773    use indexmap::IndexSet;
774    use rand::Rng;
775    use std::collections::BTreeMap;
776
777    type CurrentNetwork = MainnetV0;
778    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
779    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
780
781    #[tokio::test]
782    #[tracing_test::traced_test]
783    async fn test_commit_via_is_linked() -> anyhow::Result<()> {
784        let rng = &mut TestRng::default();
785        // Initialize the round parameters.
786        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
787        let commit_round = 2;
788
789        // Initialize the store.
790        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
791        let account: Account<CurrentNetwork> = Account::new(rng)?;
792
793        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
794        let seed: u64 = rng.gen();
795        let genesis_rng = &mut TestRng::from_seed(seed);
796        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
797
798        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
799        let genesis_rng = &mut TestRng::from_seed(seed);
800        let private_keys = [
801            *account.private_key(),
802            PrivateKey::new(genesis_rng)?,
803            PrivateKey::new(genesis_rng)?,
804            PrivateKey::new(genesis_rng)?,
805        ];
806
807        // Initialize the ledger with the genesis block.
808        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
809        // Initialize the ledger.
810        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
811
812        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
813        let (round_to_certificates_map, committee) = {
814            let addresses = vec![
815                Address::try_from(private_keys[0])?,
816                Address::try_from(private_keys[1])?,
817                Address::try_from(private_keys[2])?,
818                Address::try_from(private_keys[3])?,
819            ];
820
821            let committee = ledger.latest_committee().unwrap();
822
823            // Initialize a mapping from the round number to the set of batch certificates in the round.
824            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
825                HashMap::new();
826            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
827
828            for round in 0..=commit_round + 8 {
829                let mut current_certificates = IndexSet::new();
830                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
831                    IndexSet::new()
832                } else {
833                    previous_certificates.iter().map(|c| c.id()).collect()
834                };
835                let committee_id = committee.id();
836
837                // Create a certificate for the leader.
838                if round <= 5 {
839                    let leader = committee.get_leader(round).unwrap();
840                    let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
841                    let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
842                    for i in [leader_index, non_leader_index].into_iter() {
843                        let batch_header = BatchHeader::new(
844                            &private_keys[i],
845                            round,
846                            now(),
847                            committee_id,
848                            Default::default(),
849                            previous_certificate_ids.clone(),
850                            rng,
851                        )
852                        .unwrap();
853                        // Sign the batch header.
854                        let mut signatures = IndexSet::with_capacity(4);
855                        for (j, private_key_2) in private_keys.iter().enumerate() {
856                            if i != j {
857                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
858                            }
859                        }
860                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
861                    }
862                }
863
864                // Create a certificate for each validator.
865                if round > 5 {
866                    for (i, private_key_1) in private_keys.iter().enumerate() {
867                        let batch_header = BatchHeader::new(
868                            private_key_1,
869                            round,
870                            now(),
871                            committee_id,
872                            Default::default(),
873                            previous_certificate_ids.clone(),
874                            rng,
875                        )
876                        .unwrap();
877                        // Sign the batch header.
878                        let mut signatures = IndexSet::with_capacity(4);
879                        for (j, private_key_2) in private_keys.iter().enumerate() {
880                            if i != j {
881                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
882                            }
883                        }
884                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
885                    }
886                }
887                // Update the map of certificates.
888                round_to_certificates_map.insert(round, current_certificates.clone());
889                previous_certificates = current_certificates.clone();
890            }
891            (round_to_certificates_map, committee)
892        };
893
894        // Initialize the storage.
895        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
896        // Insert certificates into storage.
897        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
898        for i in 1..=commit_round + 8 {
899            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
900            certificates.extend(c);
901        }
902        for certificate in certificates.clone().iter() {
903            storage.testing_only_insert_certificate_testing_only(certificate.clone());
904        }
905
906        // Create block 1.
907        let leader_round_1 = commit_round;
908        let leader_1 = committee.get_leader(leader_round_1).unwrap();
909        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
910        let block_1 = {
911            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
912            let mut leader_cert_map = IndexSet::new();
913            leader_cert_map.insert(leader_certificate.clone());
914            let mut previous_cert_map = IndexSet::new();
915            for cert in storage.get_certificates_for_round(commit_round - 1) {
916                previous_cert_map.insert(cert);
917            }
918            subdag_map.insert(commit_round, leader_cert_map.clone());
919            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
920            let subdag = Subdag::from(subdag_map.clone())?;
921            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
922        };
923        // Insert block 1.
924        core_ledger.advance_to_next_block(&block_1)?;
925
926        // Create block 2.
927        let leader_round_2 = commit_round + 2;
928        let leader_2 = committee.get_leader(leader_round_2).unwrap();
929        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
930        let block_2 = {
931            let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
932            let mut leader_cert_map_2 = IndexSet::new();
933            leader_cert_map_2.insert(leader_certificate_2.clone());
934            let mut previous_cert_map_2 = IndexSet::new();
935            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
936                previous_cert_map_2.insert(cert);
937            }
938            let mut prev_commit_cert_map_2 = IndexSet::new();
939            for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
940                if cert != leader_certificate {
941                    prev_commit_cert_map_2.insert(cert);
942                }
943            }
944            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
945            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
946            subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
947            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
948            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
949        };
950        // Insert block 2.
951        core_ledger.advance_to_next_block(&block_2)?;
952
953        // Create block 3
954        let leader_round_3 = commit_round + 4;
955        let leader_3 = committee.get_leader(leader_round_3).unwrap();
956        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
957        let block_3 = {
958            let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
959            let mut leader_cert_map_3 = IndexSet::new();
960            leader_cert_map_3.insert(leader_certificate_3.clone());
961            let mut previous_cert_map_3 = IndexSet::new();
962            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
963                previous_cert_map_3.insert(cert);
964            }
965            let mut prev_commit_cert_map_3 = IndexSet::new();
966            for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
967                if cert != leader_certificate_2 {
968                    prev_commit_cert_map_3.insert(cert);
969                }
970            }
971            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
972            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
973            subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
974            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
975            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
976        };
977        // Insert block 3.
978        core_ledger.advance_to_next_block(&block_3)?;
979
980        // Initialize the syncing ledger.
981        let syncing_ledger = Arc::new(CoreLedgerService::new(
982            CurrentLedger::load(genesis, StorageMode::new_test(None)).unwrap(),
983            Default::default(),
984        ));
985        // Initialize the gateway.
986        let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
987        // Initialize the sync module.
988        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone());
989        // Try to sync block 1.
990        sync.sync_storage_with_block(block_1).await?;
991        assert_eq!(syncing_ledger.latest_block_height(), 1);
992        // Try to sync block 2.
993        sync.sync_storage_with_block(block_2).await?;
994        assert_eq!(syncing_ledger.latest_block_height(), 2);
995        // Try to sync block 3.
996        sync.sync_storage_with_block(block_3).await?;
997        assert_eq!(syncing_ledger.latest_block_height(), 3);
998        // Ensure blocks 1 and 2 were added to the ledger.
999        assert!(syncing_ledger.contains_block_height(1));
1000        assert!(syncing_ledger.contains_block_height(2));
1001
1002        Ok(())
1003    }
1004
1005    #[tokio::test]
1006    #[tracing_test::traced_test]
1007    async fn test_pending_certificates() -> anyhow::Result<()> {
1008        let rng = &mut TestRng::default();
1009        // Initialize the round parameters.
1010        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1011        let commit_round = 2;
1012
1013        // Initialize the store.
1014        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1015        let account: Account<CurrentNetwork> = Account::new(rng)?;
1016
1017        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
1018        let seed: u64 = rng.gen();
1019        let genesis_rng = &mut TestRng::from_seed(seed);
1020        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
1021
1022        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
1023        let genesis_rng = &mut TestRng::from_seed(seed);
1024        let private_keys = [
1025            *account.private_key(),
1026            PrivateKey::new(genesis_rng)?,
1027            PrivateKey::new(genesis_rng)?,
1028            PrivateKey::new(genesis_rng)?,
1029        ];
1030        // Initialize the ledger with the genesis block.
1031        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
1032        // Initialize the ledger.
1033        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
1034        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1035        let (round_to_certificates_map, committee) = {
1036            // Initialize the committee.
1037            let committee = ledger.latest_committee().unwrap();
1038            // Initialize a mapping from the round number to the set of batch certificates in the round.
1039            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1040                HashMap::new();
1041            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1042
1043            for round in 0..=commit_round + 8 {
1044                let mut current_certificates = IndexSet::new();
1045                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1046                    IndexSet::new()
1047                } else {
1048                    previous_certificates.iter().map(|c| c.id()).collect()
1049                };
1050                let committee_id = committee.id();
1051                // Create a certificate for each validator.
1052                for (i, private_key_1) in private_keys.iter().enumerate() {
1053                    let batch_header = BatchHeader::new(
1054                        private_key_1,
1055                        round,
1056                        now(),
1057                        committee_id,
1058                        Default::default(),
1059                        previous_certificate_ids.clone(),
1060                        rng,
1061                    )
1062                    .unwrap();
1063                    // Sign the batch header.
1064                    let mut signatures = IndexSet::with_capacity(4);
1065                    for (j, private_key_2) in private_keys.iter().enumerate() {
1066                        if i != j {
1067                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1068                        }
1069                    }
1070                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1071                }
1072
1073                // Update the map of certificates.
1074                round_to_certificates_map.insert(round, current_certificates.clone());
1075                previous_certificates = current_certificates.clone();
1076            }
1077            (round_to_certificates_map, committee)
1078        };
1079
1080        // Initialize the storage.
1081        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1082        // Insert certificates into storage.
1083        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1084        for i in 1..=commit_round + 8 {
1085            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1086            certificates.extend(c);
1087        }
1088        for certificate in certificates.clone().iter() {
1089            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1090        }
1091        // Create block 1.
1092        let leader_round_1 = commit_round;
1093        let leader_1 = committee.get_leader(leader_round_1).unwrap();
1094        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1095        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1096        let block_1 = {
1097            let mut leader_cert_map = IndexSet::new();
1098            leader_cert_map.insert(leader_certificate.clone());
1099            let mut previous_cert_map = IndexSet::new();
1100            for cert in storage.get_certificates_for_round(commit_round - 1) {
1101                previous_cert_map.insert(cert);
1102            }
1103            subdag_map.insert(commit_round, leader_cert_map.clone());
1104            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1105            let subdag = Subdag::from(subdag_map.clone())?;
1106            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1107        };
1108        // Insert block 1.
1109        core_ledger.advance_to_next_block(&block_1)?;
1110
1111        // Create block 2.
1112        let leader_round_2 = commit_round + 2;
1113        let leader_2 = committee.get_leader(leader_round_2).unwrap();
1114        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1115        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1116        let block_2 = {
1117            let mut leader_cert_map_2 = IndexSet::new();
1118            leader_cert_map_2.insert(leader_certificate_2.clone());
1119            let mut previous_cert_map_2 = IndexSet::new();
1120            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1121                previous_cert_map_2.insert(cert);
1122            }
1123            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1124            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1125            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1126            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
1127        };
1128        // Insert block 2.
1129        core_ledger.advance_to_next_block(&block_2)?;
1130
1131        // Create block 3
1132        let leader_round_3 = commit_round + 4;
1133        let leader_3 = committee.get_leader(leader_round_3).unwrap();
1134        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1135        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1136        let block_3 = {
1137            let mut leader_cert_map_3 = IndexSet::new();
1138            leader_cert_map_3.insert(leader_certificate_3.clone());
1139            let mut previous_cert_map_3 = IndexSet::new();
1140            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1141                previous_cert_map_3.insert(cert);
1142            }
1143            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1144            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1145            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1146            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
1147        };
1148        // Insert block 3.
1149        core_ledger.advance_to_next_block(&block_3)?;
1150
1151        /*
1152            Check that the pending certificates are computed correctly.
1153        */
1154
1155        // Retrieve the pending certificates.
1156        let pending_certificates = storage.get_pending_certificates();
1157        // Check that all of the pending certificates are not contained in the ledger.
1158        for certificate in pending_certificates.clone() {
1159            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1160        }
1161        // Initialize an empty set to be populated with the committed certificates in the block subdags.
1162        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1163        {
1164            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1165            for subdag in subdag_maps.iter() {
1166                for subdag_certificates in subdag.values() {
1167                    committed_certificates.extend(subdag_certificates.iter().cloned());
1168                }
1169            }
1170        };
1171        // Create the set of candidate pending certificates as the set of all certificates minus the set of the committed certificates.
1172        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1173        for certificate in certificates.clone() {
1174            if !committed_certificates.contains(&certificate) {
1175                candidate_pending_certificates.insert(certificate);
1176            }
1177        }
1178        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
1179        assert_eq!(pending_certificates, candidate_pending_certificates);
1180        Ok(())
1181    }
1182}