snarkos_node_bft/sync/
mod.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_FETCH_TIMEOUT_IN_MS,
19    PRIMARY_PING_IN_MS,
20    Transport,
21    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22    spawn_blocking,
23};
24use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
27use snarkos_node_tcp::P2P;
28use snarkvm::{
29    console::{network::Network, types::Field},
30    ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
31    prelude::{cfg_into_iter, cfg_iter},
32};
33
34use anyhow::{Result, bail};
35use parking_lot::Mutex;
36use rayon::prelude::*;
37use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration};
38use tokio::{
39    sync::{Mutex as TMutex, OnceCell, oneshot},
40    task::JoinHandle,
41};
42
43#[derive(Clone)]
44pub struct Sync<N: Network> {
45    /// The gateway.
46    gateway: Gateway<N>,
47    /// The storage.
48    storage: Storage<N>,
49    /// The ledger service.
50    ledger: Arc<dyn LedgerService<N>>,
51    /// The block sync module.
52    block_sync: BlockSync<N>,
53    /// The pending certificates queue.
54    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
55    /// The BFT sender.
56    bft_sender: Arc<OnceCell<BFTSender<N>>>,
57    /// The spawned handles.
58    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
59    /// The response lock.
60    response_lock: Arc<TMutex<()>>,
61    /// The sync lock.
62    sync_lock: Arc<TMutex<()>>,
63    /// The latest block responses.
64    latest_block_responses: Arc<TMutex<HashMap<u32, Block<N>>>>,
65}
66
67impl<N: Network> Sync<N> {
68    /// Initializes a new sync instance.
69    pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
70        // Initialize the block sync module.
71        let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone());
72        // Return the sync instance.
73        Self {
74            gateway,
75            storage,
76            ledger,
77            block_sync,
78            pending: Default::default(),
79            bft_sender: Default::default(),
80            handles: Default::default(),
81            response_lock: Default::default(),
82            sync_lock: Default::default(),
83            latest_block_responses: Default::default(),
84        }
85    }
86
87    /// Initializes the sync module and sync the storage with the ledger at bootup.
88    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
89        // If a BFT sender was provided, set it.
90        if let Some(bft_sender) = bft_sender {
91            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
92        }
93
94        info!("Syncing storage with the ledger...");
95
96        // Sync the storage with the ledger.
97        self.sync_storage_with_ledger_at_bootup().await
98    }
99
100    /// Starts the sync module.
101    pub async fn run(&self, sync_receiver: SyncReceiver<N>) -> Result<()> {
102        info!("Starting the sync module...");
103
104        // Start the block sync loop.
105        let self_ = self.clone();
106        self.handles.lock().push(tokio::spawn(async move {
107            // Sleep briefly to allow an initial primary ping to come in prior to entering the loop.
108            // Ideally, a node does not consider itself synced when it has not received
109            // any block locators from peers. However, in the initial bootup of validators,
110            // this needs to happen, so we use this additional sleep as a grace period.
111            tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
112            loop {
113                // Sleep briefly to avoid triggering spam detection.
114                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
115                // Perform the sync routine.
116                let communication = &self_.gateway;
117                // let communication = &node.router;
118                self_.block_sync.try_block_sync(communication).await;
119
120                // Sync the storage with the blocks.
121                if let Err(e) = self_.sync_storage_with_blocks().await {
122                    error!("Unable to sync storage with blocks - {e}");
123                }
124
125                // If the node is synced, clear the `latest_block_responses`.
126                if self_.is_synced() {
127                    self_.latest_block_responses.lock().await.clear();
128                }
129            }
130        }));
131
132        // Start the pending queue expiration loop.
133        let self_ = self.clone();
134        self.spawn(async move {
135            loop {
136                // Sleep briefly.
137                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
138
139                // Remove the expired pending transmission requests.
140                let self__ = self_.clone();
141                let _ = spawn_blocking!({
142                    self__.pending.clear_expired_callbacks();
143                    Ok(())
144                });
145            }
146        });
147
148        // Retrieve the sync receiver.
149        let SyncReceiver {
150            mut rx_block_sync_advance_with_sync_blocks,
151            mut rx_block_sync_remove_peer,
152            mut rx_block_sync_update_peer_locators,
153            mut rx_certificate_request,
154            mut rx_certificate_response,
155        } = sync_receiver;
156
157        // Process the block sync request to advance with sync blocks.
158        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
159        // which is initially handled by [`Gateway::inbound()`],
160        // which calls [`SyncSender::advance_with_sync_blocks()`],
161        // which calls [`tx_block_sync_advance_with_sync_blocks.send()`],
162        // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return.
163        let self_ = self.clone();
164        self.spawn(async move {
165            while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
166                // Process the block response.
167                if let Err(e) = self_.block_sync.process_block_response(peer_ip, blocks) {
168                    // Send the error to the callback.
169                    callback.send(Err(e)).ok();
170                    continue;
171                }
172
173                // Sync the storage with the blocks.
174                if let Err(e) = self_.sync_storage_with_blocks().await {
175                    // Send the error to the callback.
176                    callback.send(Err(e)).ok();
177                    continue;
178                }
179
180                // Send the result to the callback.
181                callback.send(Ok(())).ok();
182            }
183        });
184
185        // Process the block sync request to remove the peer.
186        let self_ = self.clone();
187        self.spawn(async move {
188            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
189                self_.block_sync.remove_peer(&peer_ip);
190            }
191        });
192
193        // Process each block sync request to update peer locators.
194        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
195        // which is initially handled by [`Gateway::inbound()`],
196        // which calls [`SyncSender::update_peer_locators()`],
197        // which calls [`tx_block_sync_update_peer_locators.send()`],
198        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
199        let self_ = self.clone();
200        self.spawn(async move {
201            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
202                let self_clone = self_.clone();
203                tokio::spawn(async move {
204                    // Update the peer locators.
205                    let result = self_clone.block_sync.update_peer_locators(peer_ip, locators);
206                    // Send the result to the callback.
207                    callback.send(result).ok();
208                });
209            }
210        });
211
212        // Process each certificate request.
213        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
214        // which is initially handled by [`Gateway::inbound()`],
215        // which calls [`tx_certificate_request.send()`],
216        // which causes the `rx_certificate_request.recv()` call below to return.
217        let self_ = self.clone();
218        self.spawn(async move {
219            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
220                self_.send_certificate_response(peer_ip, certificate_request);
221            }
222        });
223
224        // Process each certificate response.
225        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
226        // which is initially handled by [`Gateway::inbound()`],
227        // which calls [`tx_certificate_response.send()`],
228        // which causes the `rx_certificate_response.recv()` call below to return.
229        let self_ = self.clone();
230        self.spawn(async move {
231            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
232                self_.finish_certificate_request(peer_ip, certificate_response)
233            }
234        });
235
236        Ok(())
237    }
238}
239
240// Methods to manage storage.
241impl<N: Network> Sync<N> {
242    /// Syncs the storage with the ledger at bootup.
243    pub async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
244        // Retrieve the latest block in the ledger.
245        let latest_block = self.ledger.latest_block();
246
247        // Retrieve the block height.
248        let block_height = latest_block.height();
249        // Determine the maximum number of blocks corresponding to rounds
250        // that would not have been garbage collected, i.e. that would be kept in storage.
251        // Since at most one block is created every two rounds,
252        // this is half of the maximum number of rounds kept in storage.
253        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
254        // Determine the earliest height of blocks corresponding to rounds kept in storage,
255        // conservatively set to the block height minus the maximum number of blocks calculated above.
256        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
257        let gc_height = block_height.saturating_sub(max_gc_blocks);
258        // Retrieve the blocks.
259        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
260
261        // Acquire the sync lock.
262        let _lock = self.sync_lock.lock().await;
263
264        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));
265
266        /* Sync storage */
267
268        // Sync the height with the block.
269        self.storage.sync_height_with_block(latest_block.height());
270        // Sync the round with the block.
271        self.storage.sync_round_with_block(latest_block.round());
272        // Perform GC on the latest block round.
273        self.storage.garbage_collect_certificates(latest_block.round());
274        // Iterate over the blocks.
275        for block in &blocks {
276            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
277            // Note that the block authority is always a sub-DAG in production;
278            // beacon signatures are only used for testing,
279            // and as placeholder (irrelevant) block authority in the genesis block.
280            if let Authority::Quorum(subdag) = block.authority() {
281                // Reconstruct the unconfirmed transactions.
282                let unconfirmed_transactions = cfg_iter!(block.transactions())
283                    .filter_map(|tx| {
284                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
285                    })
286                    .collect::<HashMap<_, _>>();
287
288                // Iterate over the certificates.
289                for certificates in subdag.values().cloned() {
290                    cfg_into_iter!(certificates).for_each(|certificate| {
291                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
292                    });
293                }
294            }
295        }
296
297        /* Sync the BFT DAG */
298
299        // Construct a list of the certificates.
300        let certificates = blocks
301            .iter()
302            .flat_map(|block| {
303                match block.authority() {
304                    // If the block authority is a beacon, then skip the block.
305                    Authority::Beacon(_) => None,
306                    // If the block authority is a subdag, then retrieve the certificates.
307                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
308                }
309            })
310            .flatten()
311            .collect::<Vec<_>>();
312
313        // If a BFT sender was provided, send the certificates to the BFT.
314        if let Some(bft_sender) = self.bft_sender.get() {
315            // Await the callback to continue.
316            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
317                bail!("Failed to update the BFT DAG from sync: {e}");
318            }
319        }
320
321        Ok(())
322    }
323
324    /// Syncs the storage with blocks already received from peers.
325    pub async fn sync_storage_with_blocks(&self) -> Result<()> {
326        // Acquire the response lock.
327        let _lock = self.response_lock.lock().await;
328
329        // Retrieve the next block height.
330        // This variable is used to index blocks that are added to the ledger;
331        // it is incremented as blocks as added.
332        // So 'current' means 'currently being added'.
333        let mut current_height = self.ledger.latest_block_height() + 1;
334
335        // Retrieve the maximum block height of the peers.
336        let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
337        // Determine the maximum number of blocks corresponding to rounds
338        // that would not have been garbage collected, i.e. that would be kept in storage.
339        // Since at most one block is created every two rounds,
340        // this is half of the maximum number of rounds kept in storage.
341        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
342        // Determine the earliest height of blocks corresponding to rounds kept in storage,
343        // conservatively set to the block height minus the maximum number of blocks calculated above.
344        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
345        let max_gc_height = tip.saturating_sub(max_gc_blocks);
346
347        // Determine if we can sync the ledger without updating the BFT first.
348        if current_height <= max_gc_height {
349            // Try to advance the ledger *to tip* without updating the BFT.
350            while let Some(block) = self.block_sync.peek_next_block(current_height) {
351                info!("Syncing the ledger to block {}...", block.height());
352                // Sync the ledger with the block without BFT.
353                match self.sync_ledger_with_block_without_bft(block).await {
354                    Ok(_) => {
355                        // Update the current height if sync succeeds.
356                        current_height += 1;
357                    }
358                    Err(e) => {
359                        // Mark the current height as processed in block_sync.
360                        self.block_sync.remove_block_response(current_height);
361                        return Err(e);
362                    }
363                }
364            }
365            // Sync the storage with the ledger if we should transition to the BFT sync.
366            if current_height > max_gc_height {
367                if let Err(e) = self.sync_storage_with_ledger_at_bootup().await {
368                    error!("BFT sync (with bootup routine) failed - {e}");
369                }
370            }
371        }
372
373        // Try to advance the ledger with sync blocks.
374        while let Some(block) = self.block_sync.peek_next_block(current_height) {
375            info!("Syncing the BFT to block {}...", block.height());
376            // Sync the storage with the block.
377            match self.sync_storage_with_block(block).await {
378                Ok(_) => {
379                    // Update the current height if sync succeeds.
380                    current_height += 1;
381                }
382                Err(e) => {
383                    // Mark the current height as processed in block_sync.
384                    self.block_sync.remove_block_response(current_height);
385                    return Err(e);
386                }
387            }
388        }
389        Ok(())
390    }
391
392    /// Syncs the ledger with the given block without updating the BFT.
393    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
394        // Acquire the sync lock.
395        let _lock = self.sync_lock.lock().await;
396
397        let self_ = self.clone();
398        tokio::task::spawn_blocking(move || {
399            // Check the next block.
400            self_.ledger.check_next_block(&block)?;
401            // Attempt to advance to the next block.
402            self_.ledger.advance_to_next_block(&block)?;
403
404            // Sync the height with the block.
405            self_.storage.sync_height_with_block(block.height());
406            // Sync the round with the block.
407            self_.storage.sync_round_with_block(block.round());
408            // Mark the block height as processed in block_sync.
409            self_.block_sync.remove_block_response(block.height());
410
411            Ok(())
412        })
413        .await?
414    }
415
416    /// Syncs the storage with the given block.
417    pub async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
418        // Acquire the sync lock.
419        let _lock = self.sync_lock.lock().await;
420        // Acquire the latest block responses lock.
421        let mut latest_block_responses = self.latest_block_responses.lock().await;
422
423        // If this block has already been processed, return early.
424        if self.ledger.contains_block_height(block.height()) || latest_block_responses.contains_key(&block.height()) {
425            return Ok(());
426        }
427
428        // If the block authority is a sub-DAG, then sync the batch certificates with the block.
429        // Note that the block authority is always a sub-DAG in production;
430        // beacon signatures are only used for testing,
431        // and as placeholder (irrelevant) block authority in the genesis block.
432        if let Authority::Quorum(subdag) = block.authority() {
433            // Reconstruct the unconfirmed transactions.
434            let unconfirmed_transactions = cfg_iter!(block.transactions())
435                .filter_map(|tx| {
436                    tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
437                })
438                .collect::<HashMap<_, _>>();
439
440            // Iterate over the certificates.
441            for certificates in subdag.values().cloned() {
442                cfg_into_iter!(certificates.clone()).for_each(|certificate| {
443                    // Sync the batch certificate with the block.
444                    self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
445                });
446
447                // Sync the BFT DAG with the certificates.
448                for certificate in certificates {
449                    // If a BFT sender was provided, send the certificate to the BFT.
450                    if let Some(bft_sender) = self.bft_sender.get() {
451                        // Await the callback to continue.
452                        if let Err(e) = bft_sender.send_sync_bft(certificate).await {
453                            bail!("Sync - {e}");
454                        };
455                    }
456                }
457            }
458        }
459
460        // Fetch the latest block height.
461        let latest_block_height = self.ledger.latest_block_height();
462
463        // Insert the latest block response.
464        latest_block_responses.insert(block.height(), block);
465        // Clear the latest block responses of older blocks.
466        latest_block_responses.retain(|height, _| *height > latest_block_height);
467
468        // Get a list of contiguous blocks from the latest block responses.
469        let contiguous_blocks: Vec<Block<N>> = (latest_block_height.saturating_add(1)..)
470            .take_while(|&k| latest_block_responses.contains_key(&k))
471            .filter_map(|k| latest_block_responses.get(&k).cloned())
472            .collect();
473
474        // Check if the block response is ready to be added to the ledger.
475        // Ensure that the previous block's leader certificate meets the availability threshold
476        // based on the certificates in the current block.
477        // If the availability threshold is not met, process the next block and check if it is linked to the current block.
478        // Note: We do not advance to the most recent block response because we would be unable to
479        // validate if the leader certificate in the block has been certified properly.
480        for next_block in contiguous_blocks.into_iter() {
481            // Retrieve the height of the next block.
482            let next_block_height = next_block.height();
483
484            // Fetch the leader certificate and the relevant rounds.
485            let leader_certificate = match next_block.authority() {
486                Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
487                _ => bail!("Received a block with an unexpected authority type."),
488            };
489            let commit_round = leader_certificate.round();
490            let certificate_round = commit_round.saturating_add(1);
491
492            // Get the committee lookback for the commit round.
493            let committee_lookback = self.ledger.get_committee_lookback_for_round(commit_round)?;
494            // Retrieve all of the certificates for the **certificate** round.
495            let certificates = self.storage.get_certificates_for_round(certificate_round);
496            // Construct a set over the authors who included the leader's certificate in the certificate round.
497            let authors = certificates
498                .iter()
499                .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
500                    true => Some(c.author()),
501                    false => None,
502                })
503                .collect();
504
505            debug!("Validating sync block {next_block_height} at round {commit_round}...");
506            // Check if the leader is ready to be committed.
507            if committee_lookback.is_availability_threshold_reached(&authors) {
508                // Initialize the current certificate.
509                let mut current_certificate = leader_certificate;
510                // Check if there are any linked blocks that need to be added.
511                let mut blocks_to_add = vec![next_block];
512
513                // Check if there are other blocks to process based on `is_linked`.
514                for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
515                    // Retrieve the previous block.
516                    let Some(previous_block) = latest_block_responses.get(&height) else {
517                        bail!("Block {height} is missing from the latest block responses.");
518                    };
519                    // Retrieve the previous certificate.
520                    let previous_certificate = match previous_block.authority() {
521                        Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
522                        _ => bail!("Received a block with an unexpected authority type."),
523                    };
524                    // Determine if there is a path between the previous certificate and the current certificate.
525                    if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
526                        debug!("Previous sync block {height} is linked to the current block {next_block_height}");
527                        // Add the previous leader certificate to the list of certificates to commit.
528                        blocks_to_add.insert(0, previous_block.clone());
529                        // Update the current certificate to the previous leader certificate.
530                        current_certificate = previous_certificate;
531                    }
532                }
533
534                // Add the blocks to the ledger.
535                for block in blocks_to_add {
536                    // Check that the blocks are sequential and can be added to the ledger.
537                    let block_height = block.height();
538                    if block_height != self.ledger.latest_block_height().saturating_add(1) {
539                        warn!("Skipping block {block_height} from the latest block responses - not sequential.");
540                        continue;
541                    }
542
543                    let self_ = self.clone();
544                    tokio::task::spawn_blocking(move || {
545                        // Check the next block.
546                        self_.ledger.check_next_block(&block)?;
547                        // Attempt to advance to the next block.
548                        self_.ledger.advance_to_next_block(&block)?;
549
550                        // Sync the height with the block.
551                        self_.storage.sync_height_with_block(block.height());
552                        // Sync the round with the block.
553                        self_.storage.sync_round_with_block(block.round());
554
555                        Ok::<(), anyhow::Error>(())
556                    })
557                    .await??;
558                    // Remove the block height from the latest block responses.
559                    latest_block_responses.remove(&block_height);
560                    // Mark the block height as processed in block_sync.
561                    self.block_sync.remove_block_response(block_height);
562                }
563            } else {
564                debug!(
565                    "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
566                );
567            }
568        }
569
570        Ok(())
571    }
572
573    /// Returns `true` if there is a path from the previous certificate to the current certificate.
574    fn is_linked(
575        &self,
576        previous_certificate: BatchCertificate<N>,
577        current_certificate: BatchCertificate<N>,
578    ) -> Result<bool> {
579        // Initialize the list containing the traversal.
580        let mut traversal = vec![current_certificate.clone()];
581        // Iterate over the rounds from the current certificate to the previous certificate.
582        for round in (previous_certificate.round()..current_certificate.round()).rev() {
583            // Retrieve all of the certificates for this past round.
584            let certificates = self.storage.get_certificates_for_round(round);
585            // Filter the certificates to only include those that are in the traversal.
586            traversal = certificates
587                .into_iter()
588                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
589                .collect();
590        }
591        Ok(traversal.contains(&previous_certificate))
592    }
593}
594
595// Methods to assist with the block sync module.
596impl<N: Network> Sync<N> {
597    /// Returns `true` if the node is synced and has connected peers.
598    pub fn is_synced(&self) -> bool {
599        if self.gateway.number_of_connected_peers() == 0 {
600            return false;
601        }
602        self.block_sync.is_block_synced()
603    }
604
605    /// Returns the number of blocks the node is behind the greatest peer height.
606    pub fn num_blocks_behind(&self) -> u32 {
607        self.block_sync.num_blocks_behind()
608    }
609
610    /// Returns `true` if the node is in gateway mode.
611    pub const fn is_gateway_mode(&self) -> bool {
612        self.block_sync.mode().is_gateway()
613    }
614
615    /// Returns the current block locators of the node.
616    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
617        self.block_sync.get_block_locators()
618    }
619
620    /// Returns the block sync module.
621    #[cfg(test)]
622    #[doc(hidden)]
623    pub(super) fn block_sync(&self) -> &BlockSync<N> {
624        &self.block_sync
625    }
626}
627
628// Methods to assist with fetching batch certificates from peers.
629impl<N: Network> Sync<N> {
630    /// Sends a certificate request to the specified peer.
631    pub async fn send_certificate_request(
632        &self,
633        peer_ip: SocketAddr,
634        certificate_id: Field<N>,
635    ) -> Result<BatchCertificate<N>> {
636        // Initialize a oneshot channel.
637        let (callback_sender, callback_receiver) = oneshot::channel();
638        // Determine how many sent requests are pending.
639        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
640        // Determine if we've already sent a request to the peer.
641        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
642        // Determine the maximum number of redundant requests.
643        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
644        // Determine if we should send a certificate request to the peer.
645        // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
646        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
647
648        // Insert the certificate ID into the pending queue.
649        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
650
651        // If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
652        if should_send_request {
653            // Send the certificate request to the peer.
654            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
655                bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
656            }
657        } else {
658            debug!(
659                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
660                fmt_id(certificate_id)
661            );
662        }
663        // Wait for the certificate to be fetched.
664        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
665        match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
666            // If the certificate was fetched, return it.
667            Ok(result) => Ok(result?),
668            // If the certificate was not fetched, return an error.
669            Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
670        }
671    }
672
673    /// Handles the incoming certificate request.
674    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
675        // Attempt to retrieve the certificate.
676        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
677            // Send the certificate response to the peer.
678            let self_ = self.clone();
679            tokio::spawn(async move {
680                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
681            });
682        }
683    }
684
685    /// Handles the incoming certificate response.
686    /// This method ensures the certificate response is well-formed and matches the certificate ID.
687    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
688        let certificate = response.certificate;
689        // Check if the peer IP exists in the pending queue for the given certificate ID.
690        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
691        // If the peer IP exists, finish the pending request.
692        if exists {
693            // TODO: Validate the certificate.
694            // Remove the certificate ID from the pending queue.
695            self.pending.remove(certificate.id(), Some(certificate));
696        }
697    }
698}
699
700impl<N: Network> Sync<N> {
701    /// Spawns a task with the given future; it should only be used for long-running tasks.
702    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
703        self.handles.lock().push(tokio::spawn(future));
704    }
705
706    /// Shuts down the primary.
707    pub async fn shut_down(&self) {
708        info!("Shutting down the sync module...");
709        // Acquire the response lock.
710        let _lock = self.response_lock.lock().await;
711        // Acquire the sync lock.
712        let _lock = self.sync_lock.lock().await;
713        // Abort the tasks.
714        self.handles.lock().iter().for_each(|handle| handle.abort());
715    }
716}
717#[cfg(test)]
718mod tests {
719    use super::*;
720
721    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
722    use snarkos_account::Account;
723    use snarkvm::{
724        console::{
725            account::{Address, PrivateKey},
726            network::MainnetV0,
727        },
728        ledger::{
729            narwhal::{BatchCertificate, BatchHeader, Subdag},
730            store::{ConsensusStore, helpers::memory::ConsensusMemory},
731        },
732        prelude::{Ledger, VM},
733        utilities::TestRng,
734    };
735
736    use aleo_std::StorageMode;
737    use indexmap::IndexSet;
738    use rand::Rng;
739    use std::collections::BTreeMap;
740
741    type CurrentNetwork = MainnetV0;
742    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
743    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
744
745    #[tokio::test]
746    #[tracing_test::traced_test]
747    async fn test_commit_via_is_linked() -> anyhow::Result<()> {
748        let rng = &mut TestRng::default();
749        // Initialize the round parameters.
750        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
751        let commit_round = 2;
752
753        // Initialize the store.
754        let store = CurrentConsensusStore::open(None).unwrap();
755        let account: Account<CurrentNetwork> = Account::new(rng)?;
756
757        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
758        let seed: u64 = rng.gen();
759        let genesis_rng = &mut TestRng::from_seed(seed);
760        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
761
762        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
763        let genesis_rng = &mut TestRng::from_seed(seed);
764        let private_keys = [
765            *account.private_key(),
766            PrivateKey::new(genesis_rng)?,
767            PrivateKey::new(genesis_rng)?,
768            PrivateKey::new(genesis_rng)?,
769        ];
770
771        // Initialize the ledger with the genesis block.
772        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
773        // Initialize the ledger.
774        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
775
776        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
777        let (round_to_certificates_map, committee) = {
778            let addresses = vec![
779                Address::try_from(private_keys[0])?,
780                Address::try_from(private_keys[1])?,
781                Address::try_from(private_keys[2])?,
782                Address::try_from(private_keys[3])?,
783            ];
784
785            let committee = ledger.latest_committee().unwrap();
786
787            // Initialize a mapping from the round number to the set of batch certificates in the round.
788            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
789                HashMap::new();
790            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
791
792            for round in 0..=commit_round + 8 {
793                let mut current_certificates = IndexSet::new();
794                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
795                    IndexSet::new()
796                } else {
797                    previous_certificates.iter().map(|c| c.id()).collect()
798                };
799                let committee_id = committee.id();
800
801                // Create a certificate for the leader.
802                if round <= 5 {
803                    let leader = committee.get_leader(round).unwrap();
804                    let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
805                    let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
806                    for i in [leader_index, non_leader_index].into_iter() {
807                        let batch_header = BatchHeader::new(
808                            &private_keys[i],
809                            round,
810                            now(),
811                            committee_id,
812                            Default::default(),
813                            previous_certificate_ids.clone(),
814                            rng,
815                        )
816                        .unwrap();
817                        // Sign the batch header.
818                        let mut signatures = IndexSet::with_capacity(4);
819                        for (j, private_key_2) in private_keys.iter().enumerate() {
820                            if i != j {
821                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
822                            }
823                        }
824                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
825                    }
826                }
827
828                // Create a certificate for each validator.
829                if round > 5 {
830                    for (i, private_key_1) in private_keys.iter().enumerate() {
831                        let batch_header = BatchHeader::new(
832                            private_key_1,
833                            round,
834                            now(),
835                            committee_id,
836                            Default::default(),
837                            previous_certificate_ids.clone(),
838                            rng,
839                        )
840                        .unwrap();
841                        // Sign the batch header.
842                        let mut signatures = IndexSet::with_capacity(4);
843                        for (j, private_key_2) in private_keys.iter().enumerate() {
844                            if i != j {
845                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
846                            }
847                        }
848                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
849                    }
850                }
851                // Update the map of certificates.
852                round_to_certificates_map.insert(round, current_certificates.clone());
853                previous_certificates = current_certificates.clone();
854            }
855            (round_to_certificates_map, committee)
856        };
857
858        // Initialize the storage.
859        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
860        // Insert certificates into storage.
861        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
862        for i in 1..=commit_round + 8 {
863            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
864            certificates.extend(c);
865        }
866        for certificate in certificates.clone().iter() {
867            storage.testing_only_insert_certificate_testing_only(certificate.clone());
868        }
869
870        // Create block 1.
871        let leader_round_1 = commit_round;
872        let leader_1 = committee.get_leader(leader_round_1).unwrap();
873        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
874        let block_1 = {
875            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
876            let mut leader_cert_map = IndexSet::new();
877            leader_cert_map.insert(leader_certificate.clone());
878            let mut previous_cert_map = IndexSet::new();
879            for cert in storage.get_certificates_for_round(commit_round - 1) {
880                previous_cert_map.insert(cert);
881            }
882            subdag_map.insert(commit_round, leader_cert_map.clone());
883            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
884            let subdag = Subdag::from(subdag_map.clone())?;
885            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
886        };
887        // Insert block 1.
888        core_ledger.advance_to_next_block(&block_1)?;
889
890        // Create block 2.
891        let leader_round_2 = commit_round + 2;
892        let leader_2 = committee.get_leader(leader_round_2).unwrap();
893        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
894        let block_2 = {
895            let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
896            let mut leader_cert_map_2 = IndexSet::new();
897            leader_cert_map_2.insert(leader_certificate_2.clone());
898            let mut previous_cert_map_2 = IndexSet::new();
899            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
900                previous_cert_map_2.insert(cert);
901            }
902            let mut prev_commit_cert_map_2 = IndexSet::new();
903            for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
904                if cert != leader_certificate {
905                    prev_commit_cert_map_2.insert(cert);
906                }
907            }
908            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
909            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
910            subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
911            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
912            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
913        };
914        // Insert block 2.
915        core_ledger.advance_to_next_block(&block_2)?;
916
917        // Create block 3
918        let leader_round_3 = commit_round + 4;
919        let leader_3 = committee.get_leader(leader_round_3).unwrap();
920        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
921        let block_3 = {
922            let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
923            let mut leader_cert_map_3 = IndexSet::new();
924            leader_cert_map_3.insert(leader_certificate_3.clone());
925            let mut previous_cert_map_3 = IndexSet::new();
926            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
927                previous_cert_map_3.insert(cert);
928            }
929            let mut prev_commit_cert_map_3 = IndexSet::new();
930            for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
931                if cert != leader_certificate_2 {
932                    prev_commit_cert_map_3.insert(cert);
933                }
934            }
935            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
936            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
937            subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
938            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
939            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
940        };
941        // Insert block 3.
942        core_ledger.advance_to_next_block(&block_3)?;
943
944        // Initialize the syncing ledger.
945        let syncing_ledger = Arc::new(CoreLedgerService::new(
946            CurrentLedger::load(genesis, StorageMode::Production).unwrap(),
947            Default::default(),
948        ));
949        // Initialize the gateway.
950        let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
951        // Initialize the sync module.
952        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone());
953        // Try to sync block 1.
954        sync.sync_storage_with_block(block_1).await?;
955        assert_eq!(syncing_ledger.latest_block_height(), 1);
956        // Try to sync block 2.
957        sync.sync_storage_with_block(block_2).await?;
958        assert_eq!(syncing_ledger.latest_block_height(), 2);
959        // Try to sync block 3.
960        sync.sync_storage_with_block(block_3).await?;
961        assert_eq!(syncing_ledger.latest_block_height(), 3);
962        // Ensure blocks 1 and 2 were added to the ledger.
963        assert!(syncing_ledger.contains_block_height(1));
964        assert!(syncing_ledger.contains_block_height(2));
965
966        Ok(())
967    }
968
969    #[tokio::test]
970    #[tracing_test::traced_test]
971    async fn test_pending_certificates() -> anyhow::Result<()> {
972        let rng = &mut TestRng::default();
973        // Initialize the round parameters.
974        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
975        let commit_round = 2;
976
977        // Initialize the store.
978        let store = CurrentConsensusStore::open(None).unwrap();
979        let account: Account<CurrentNetwork> = Account::new(rng)?;
980
981        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
982        let seed: u64 = rng.gen();
983        let genesis_rng = &mut TestRng::from_seed(seed);
984        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
985
986        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
987        let genesis_rng = &mut TestRng::from_seed(seed);
988        let private_keys = [
989            *account.private_key(),
990            PrivateKey::new(genesis_rng)?,
991            PrivateKey::new(genesis_rng)?,
992            PrivateKey::new(genesis_rng)?,
993        ];
994        // Initialize the ledger with the genesis block.
995        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
996        // Initialize the ledger.
997        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
998        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
999        let (round_to_certificates_map, committee) = {
1000            // Initialize the committee.
1001            let committee = ledger.latest_committee().unwrap();
1002            // Initialize a mapping from the round number to the set of batch certificates in the round.
1003            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1004                HashMap::new();
1005            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1006
1007            for round in 0..=commit_round + 8 {
1008                let mut current_certificates = IndexSet::new();
1009                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1010                    IndexSet::new()
1011                } else {
1012                    previous_certificates.iter().map(|c| c.id()).collect()
1013                };
1014                let committee_id = committee.id();
1015                // Create a certificate for each validator.
1016                for (i, private_key_1) in private_keys.iter().enumerate() {
1017                    let batch_header = BatchHeader::new(
1018                        private_key_1,
1019                        round,
1020                        now(),
1021                        committee_id,
1022                        Default::default(),
1023                        previous_certificate_ids.clone(),
1024                        rng,
1025                    )
1026                    .unwrap();
1027                    // Sign the batch header.
1028                    let mut signatures = IndexSet::with_capacity(4);
1029                    for (j, private_key_2) in private_keys.iter().enumerate() {
1030                        if i != j {
1031                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1032                        }
1033                    }
1034                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1035                }
1036
1037                // Update the map of certificates.
1038                round_to_certificates_map.insert(round, current_certificates.clone());
1039                previous_certificates = current_certificates.clone();
1040            }
1041            (round_to_certificates_map, committee)
1042        };
1043
1044        // Initialize the storage.
1045        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1046        // Insert certificates into storage.
1047        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1048        for i in 1..=commit_round + 8 {
1049            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1050            certificates.extend(c);
1051        }
1052        for certificate in certificates.clone().iter() {
1053            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1054        }
1055        // Create block 1.
1056        let leader_round_1 = commit_round;
1057        let leader_1 = committee.get_leader(leader_round_1).unwrap();
1058        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1059        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1060        let block_1 = {
1061            let mut leader_cert_map = IndexSet::new();
1062            leader_cert_map.insert(leader_certificate.clone());
1063            let mut previous_cert_map = IndexSet::new();
1064            for cert in storage.get_certificates_for_round(commit_round - 1) {
1065                previous_cert_map.insert(cert);
1066            }
1067            subdag_map.insert(commit_round, leader_cert_map.clone());
1068            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1069            let subdag = Subdag::from(subdag_map.clone())?;
1070            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1071        };
1072        // Insert block 1.
1073        core_ledger.advance_to_next_block(&block_1)?;
1074
1075        // Create block 2.
1076        let leader_round_2 = commit_round + 2;
1077        let leader_2 = committee.get_leader(leader_round_2).unwrap();
1078        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1079        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1080        let block_2 = {
1081            let mut leader_cert_map_2 = IndexSet::new();
1082            leader_cert_map_2.insert(leader_certificate_2.clone());
1083            let mut previous_cert_map_2 = IndexSet::new();
1084            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1085                previous_cert_map_2.insert(cert);
1086            }
1087            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1088            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1089            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1090            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
1091        };
1092        // Insert block 2.
1093        core_ledger.advance_to_next_block(&block_2)?;
1094
1095        // Create block 3
1096        let leader_round_3 = commit_round + 4;
1097        let leader_3 = committee.get_leader(leader_round_3).unwrap();
1098        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1099        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1100        let block_3 = {
1101            let mut leader_cert_map_3 = IndexSet::new();
1102            leader_cert_map_3.insert(leader_certificate_3.clone());
1103            let mut previous_cert_map_3 = IndexSet::new();
1104            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1105                previous_cert_map_3.insert(cert);
1106            }
1107            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1108            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1109            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1110            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
1111        };
1112        // Insert block 3.
1113        core_ledger.advance_to_next_block(&block_3)?;
1114
1115        /*
1116            Check that the pending certificates are computed correctly.
1117        */
1118
1119        // Retrieve the pending certificates.
1120        let pending_certificates = storage.get_pending_certificates();
1121        // Check that all of the pending certificates are not contained in the ledger.
1122        for certificate in pending_certificates.clone() {
1123            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1124        }
1125        // Initialize an empty set to be populated with the committed certificates in the block subdags.
1126        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1127        {
1128            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1129            for subdag in subdag_maps.iter() {
1130                for subdag_certificates in subdag.values() {
1131                    committed_certificates.extend(subdag_certificates.iter().cloned());
1132                }
1133            }
1134        };
1135        // Create the set of candidate pending certificates as the set of all certificates minus the set of the committed certificates.
1136        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1137        for certificate in certificates.clone() {
1138            if !committed_certificates.contains(&certificate) {
1139                candidate_pending_certificates.insert(certificate);
1140            }
1141        }
1142        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
1143        assert_eq!(pending_certificates, candidate_pending_certificates);
1144        Ok(())
1145    }
1146}