// snarkos_node_bft/sync/mod.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_FETCH_TIMEOUT_IN_MS,
19    PRIMARY_PING_IN_MS,
20    Transport,
21    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
22    spawn_blocking,
23};
24use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
27use snarkos_node_tcp::P2P;
28use snarkvm::{
29    console::{network::Network, types::Field},
30    ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
31    prelude::{cfg_into_iter, cfg_iter},
32};
33
34use anyhow::{Result, bail};
35use parking_lot::Mutex;
36use rayon::prelude::*;
37use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration};
38use tokio::{
39    sync::{Mutex as TMutex, OnceCell, oneshot},
40    task::JoinHandle,
41};
42
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway, used to communicate with peers.
    gateway: Gateway<N>,
    /// The storage for rounds, heights, and batch certificates.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block sync module.
    block_sync: BlockSync<N>,
    /// The pending certificates queue, tracking in-flight certificate requests by certificate ID.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender, set at most once during `initialize`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The handles of the spawned background tasks, aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock, serializing `sync_storage_with_blocks` invocations.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock, serializing ledger/storage advancement.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses, keyed by block height, buffered until they can be
    /// validated and added to the ledger.
    latest_block_responses: Arc<TMutex<HashMap<u32, Block<N>>>>,
}
66
67impl<N: Network> Sync<N> {
68    /// Initializes a new sync instance.
69    pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
70        // Initialize the block sync module.
71        let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone());
72        // Return the sync instance.
73        Self {
74            gateway,
75            storage,
76            ledger,
77            block_sync,
78            pending: Default::default(),
79            bft_sender: Default::default(),
80            handles: Default::default(),
81            response_lock: Default::default(),
82            sync_lock: Default::default(),
83            latest_block_responses: Default::default(),
84        }
85    }
86
87    /// Initializes the sync module and sync the storage with the ledger at bootup.
88    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
89        // If a BFT sender was provided, set it.
90        if let Some(bft_sender) = bft_sender {
91            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
92        }
93
94        info!("Syncing storage with the ledger...");
95
96        // Sync the storage with the ledger.
97        self.sync_storage_with_ledger_at_bootup().await
98    }
99
    /// Starts the sync module.
    ///
    /// Spawns the long-running background tasks:
    /// 1. the block sync loop, which periodically requests blocks from peers and syncs storage,
    /// 2. the pending-queue expiration loop, which clears timed-out certificate callbacks,
    /// 3. one handler task per `SyncReceiver` channel (block responses, peer removal,
    ///    peer locator updates, certificate requests, and certificate responses).
    ///
    /// All spawned handles are recorded in `self.handles` and aborted on `shut_down`.
    pub async fn run(&self, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Start the block sync loop.
        let self_ = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            // Sleep briefly to allow an initial primary ping to come in prior to entering the loop.
            // Ideally, a node does not consider itself synced when it has not received
            // any block locators from peer. However, in the initial bootup of validators,
            // this needs to happen, so we use this additional sleep as a grace period.
            tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
            loop {
                // Sleep briefly to avoid triggering spam detection.
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
                // Perform the sync routine, using the gateway as the communication layer.
                let communication = &self_.gateway;
                self_.block_sync.try_block_sync(communication).await;

                // Sync the storage with the blocks.
                if let Err(e) = self_.sync_storage_with_blocks().await {
                    error!("Unable to sync storage with blocks - {e}");
                }

                // If the node is synced, clear the `latest_block_responses`.
                if self_.is_synced() {
                    self_.latest_block_responses.lock().await.clear();
                }
            }
        }));

        // Start the pending queue expiration loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                // Remove the expired pending transmission requests.
                // Runs on a blocking thread since this may contend on internal locks.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        // Retrieve the sync receiver's channels.
        let SyncReceiver {
            mut rx_block_sync_advance_with_sync_blocks,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block sync request to advance with sync blocks.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
                // Process the block response.
                if let Err(e) = self_.block_sync.process_block_response(peer_ip, blocks) {
                    // Send the error to the callback.
                    callback.send(Err(e)).ok();
                    continue;
                }

                // Sync the storage with the blocks.
                if let Err(e) = self_.sync_storage_with_blocks().await {
                    // Send the error to the callback.
                    callback.send(Err(e)).ok();
                    continue;
                }

                // Send the result to the callback.
                callback.send(Ok(())).ok();
            }
        });

        // Process the block sync request to remove the peer.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.block_sync.remove_peer(&peer_ip);
            }
        });

        // Process the block sync request to update peer locators.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                // Spawned per message so a slow update does not stall the channel.
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    // Update the peer locators.
                    let result = self_clone.block_sync.update_peer_locators(peer_ip, locators);
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Process the certificate request.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process the certificate response.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response)
            }
        });

        Ok(())
    }
220}
221
222// Methods to manage storage.
223impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// Reloads the last GC-range of blocks from the ledger into storage (height, round,
    /// and batch certificates), then replays the certificates into the BFT DAG if a BFT
    /// sender has been registered.
    pub async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks that would have been garbage collected.
        // NOTE(review): GC rounds are halved to convert rounds to blocks — presumably ~2 rounds per block; confirm.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the earliest height, conservatively set to the block height minus the max GC rounds.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks (inclusive of both `gc_height` and `block_height`).
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a subdag, then sync the batch certificates with the block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions, keyed by unconfirmed transaction ID.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates, syncing each with storage (in parallel when rayon is enabled).
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }
            }
        }

        /* Sync the BFT DAG */

        // Construct a list of the certificates.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
                bail!("Failed to update the BFT DAG from sync: {e}");
            }
        }

        Ok(())
    }
298
    /// Syncs the storage with any buffered sync blocks.
    ///
    /// While the node is far enough behind that peers would have garbage-collected the
    /// corresponding certificates, it advances the ledger directly without the BFT;
    /// once caught up past that point, it switches to the BFT-aware path via
    /// `sync_storage_with_block`.
    pub async fn sync_storage_with_blocks(&self) -> Result<()> {
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;

        // Retrieve the next block height to process (one above the latest ledger height).
        let mut current_height = self.ledger.latest_block_height() + 1;

        // Retrieve the maximum block height of the peers.
        let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
        // Determine the maximum number of blocks that would have been garbage collected.
        // NOTE(review): GC rounds are halved to convert rounds to blocks — presumably ~2 rounds per block; confirm.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the maximum height that the peer would have garbage collected.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);

        // Determine if we can sync the ledger without updating the BFT first.
        if current_height <= max_gc_height {
            // Try to advance the ledger *to tip* without updating the BFT.
            while let Some(block) = self.block_sync.peek_next_block(current_height) {
                info!("Syncing the ledger to block {}...", block.height());
                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height += 1;
                    }
                    Err(e) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(current_height);
                        return Err(e);
                    }
                }
            }
            // Sync the storage with the ledger if we should transition to the BFT sync.
            if current_height > max_gc_height {
                if let Err(e) = self.sync_storage_with_ledger_at_bootup().await {
                    error!("BFT sync (with bootup routine) failed - {e}");
                }
            }
        }

        // Try to advance the ledger with sync blocks (the BFT-aware path).
        while let Some(block) = self.block_sync.peek_next_block(current_height) {
            info!("Syncing the BFT to block {}...", block.height());
            // Sync the storage with the block.
            match self.sync_storage_with_block(block).await {
                Ok(_) => {
                    // Update the current height if sync succeeds.
                    current_height += 1;
                }
                Err(e) => {
                    // Mark the current height as processed in block_sync.
                    self.block_sync.remove_block_response(current_height);
                    return Err(e);
                }
            }
        }
        Ok(())
    }
358
    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// Used on the fast path while the node is behind the peers' GC horizon;
    /// full ledger validation (`check_next_block`) still applies.
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        // Run the ledger advance on a blocking thread, so it does not stall the async executor.
        let self_ = self.clone();
        tokio::task::spawn_blocking(move || {
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
        .await?
    }
382
383    /// Syncs the storage with the given blocks.
384    pub async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
385        // Acquire the sync lock.
386        let _lock = self.sync_lock.lock().await;
387        // Acquire the latest block responses lock.
388        let mut latest_block_responses = self.latest_block_responses.lock().await;
389
390        // If this block has already been processed, return early.
391        if self.ledger.contains_block_height(block.height()) || latest_block_responses.contains_key(&block.height()) {
392            return Ok(());
393        }
394
395        // If the block authority is a subdag, then sync the batch certificates with the block.
396        if let Authority::Quorum(subdag) = block.authority() {
397            // Reconstruct the unconfirmed transactions.
398            let unconfirmed_transactions = cfg_iter!(block.transactions())
399                .filter_map(|tx| {
400                    tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
401                })
402                .collect::<HashMap<_, _>>();
403
404            // Iterate over the certificates.
405            for certificates in subdag.values().cloned() {
406                cfg_into_iter!(certificates.clone()).for_each(|certificate| {
407                    // Sync the batch certificate with the block.
408                    self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
409                });
410
411                // Sync the BFT DAG with the certificates.
412                for certificate in certificates {
413                    // If a BFT sender was provided, send the certificate to the BFT.
414                    if let Some(bft_sender) = self.bft_sender.get() {
415                        // Await the callback to continue.
416                        if let Err(e) = bft_sender.send_sync_bft(certificate).await {
417                            bail!("Sync - {e}");
418                        };
419                    }
420                }
421            }
422        }
423
424        // Fetch the latest block height.
425        let latest_block_height = self.ledger.latest_block_height();
426
427        // Insert the latest block response.
428        latest_block_responses.insert(block.height(), block);
429        // Clear the latest block responses of older blocks.
430        latest_block_responses.retain(|height, _| *height > latest_block_height);
431
432        // Get a list of contiguous blocks from the latest block responses.
433        let contiguous_blocks: Vec<Block<N>> = (latest_block_height.saturating_add(1)..)
434            .take_while(|&k| latest_block_responses.contains_key(&k))
435            .filter_map(|k| latest_block_responses.get(&k).cloned())
436            .collect();
437
438        // Check if the block response is ready to be added to the ledger.
439        // Ensure that the previous block's leader certificate meets the availability threshold
440        // based on the certificates in the current block.
441        // If the availability threshold is not met, process the next block and check if it is linked to the current block.
442        // Note: We do not advance to the most recent block response because we would be unable to
443        // validate if the leader certificate in the block has been certified properly.
444        for next_block in contiguous_blocks.into_iter() {
445            // Retrieve the height of the next block.
446            let next_block_height = next_block.height();
447
448            // Fetch the leader certificate and the relevant rounds.
449            let leader_certificate = match next_block.authority() {
450                Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
451                _ => bail!("Received a block with an unexpected authority type."),
452            };
453            let commit_round = leader_certificate.round();
454            let certificate_round = commit_round.saturating_add(1);
455
456            // Get the committee lookback for the commit round.
457            let committee_lookback = self.ledger.get_committee_lookback_for_round(commit_round)?;
458            // Retrieve all of the certificates for the **certificate** round.
459            let certificates = self.storage.get_certificates_for_round(certificate_round);
460            // Construct a set over the authors who included the leader's certificate in the certificate round.
461            let authors = certificates
462                .iter()
463                .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
464                    true => Some(c.author()),
465                    false => None,
466                })
467                .collect();
468
469            debug!("Validating sync block {next_block_height} at round {commit_round}...");
470            // Check if the leader is ready to be committed.
471            if committee_lookback.is_availability_threshold_reached(&authors) {
472                // Initialize the current certificate.
473                let mut current_certificate = leader_certificate;
474                // Check if there are any linked blocks that need to be added.
475                let mut blocks_to_add = vec![next_block];
476
477                // Check if there are other blocks to process based on `is_linked`.
478                for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
479                    // Retrieve the previous block.
480                    let Some(previous_block) = latest_block_responses.get(&height) else {
481                        bail!("Block {height} is missing from the latest block responses.");
482                    };
483                    // Retrieve the previous certificate.
484                    let previous_certificate = match previous_block.authority() {
485                        Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
486                        _ => bail!("Received a block with an unexpected authority type."),
487                    };
488                    // Determine if there is a path between the previous certificate and the current certificate.
489                    if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
490                        debug!("Previous sync block {height} is linked to the current block {next_block_height}");
491                        // Add the previous leader certificate to the list of certificates to commit.
492                        blocks_to_add.insert(0, previous_block.clone());
493                        // Update the current certificate to the previous leader certificate.
494                        current_certificate = previous_certificate;
495                    }
496                }
497
498                // Add the blocks to the ledger.
499                for block in blocks_to_add {
500                    // Check that the blocks are sequential and can be added to the ledger.
501                    let block_height = block.height();
502                    if block_height != self.ledger.latest_block_height().saturating_add(1) {
503                        warn!("Skipping block {block_height} from the latest block responses - not sequential.");
504                        continue;
505                    }
506
507                    let self_ = self.clone();
508                    tokio::task::spawn_blocking(move || {
509                        // Check the next block.
510                        self_.ledger.check_next_block(&block)?;
511                        // Attempt to advance to the next block.
512                        self_.ledger.advance_to_next_block(&block)?;
513
514                        // Sync the height with the block.
515                        self_.storage.sync_height_with_block(block.height());
516                        // Sync the round with the block.
517                        self_.storage.sync_round_with_block(block.round());
518
519                        Ok::<(), anyhow::Error>(())
520                    })
521                    .await??;
522                    // Remove the block height from the latest block responses.
523                    latest_block_responses.remove(&block_height);
524                    // Mark the block height as processed in block_sync.
525                    self.block_sync.remove_block_response(block_height);
526                }
527            } else {
528                debug!(
529                    "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
530                );
531            }
532        }
533
534        Ok(())
535    }
536
537    /// Returns `true` if there is a path from the previous certificate to the current certificate.
538    fn is_linked(
539        &self,
540        previous_certificate: BatchCertificate<N>,
541        current_certificate: BatchCertificate<N>,
542    ) -> Result<bool> {
543        // Initialize the list containing the traversal.
544        let mut traversal = vec![current_certificate.clone()];
545        // Iterate over the rounds from the current certificate to the previous certificate.
546        for round in (previous_certificate.round()..current_certificate.round()).rev() {
547            // Retrieve all of the certificates for this past round.
548            let certificates = self.storage.get_certificates_for_round(round);
549            // Filter the certificates to only include those that are in the traversal.
550            traversal = certificates
551                .into_iter()
552                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
553                .collect();
554        }
555        Ok(traversal.contains(&previous_certificate))
556    }
557}
558
559// Methods to assist with the block sync module.
560impl<N: Network> Sync<N> {
561    /// Returns `true` if the node is synced and has connected peers.
562    pub fn is_synced(&self) -> bool {
563        if self.gateway.number_of_connected_peers() == 0 {
564            return false;
565        }
566        self.block_sync.is_block_synced()
567    }
568
569    /// Returns the number of blocks the node is behind the greatest peer height.
570    pub fn num_blocks_behind(&self) -> u32 {
571        self.block_sync.num_blocks_behind()
572    }
573
574    /// Returns `true` if the node is in gateway mode.
575    pub const fn is_gateway_mode(&self) -> bool {
576        self.block_sync.mode().is_gateway()
577    }
578
579    /// Returns the current block locators of the node.
580    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
581        self.block_sync.get_block_locators()
582    }
583
584    /// Returns the block sync module.
585    #[cfg(test)]
586    #[doc(hidden)]
587    pub(super) fn block_sync(&self) -> &BlockSync<N> {
588        &self.block_sync
589    }
590}
591
592// Methods to assist with fetching batch certificates from peers.
593impl<N: Network> Sync<N> {
    /// Sends a certificate request to the specified peer.
    ///
    /// Registers a callback in the pending queue, sends the request event to the peer
    /// (unless the redundancy limit is reached or this peer already has an outstanding
    /// request), then waits up to `MAX_FETCH_TIMEOUT_IN_MS` for the certificate.
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Initialize a oneshot channel.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine how many sent requests are pending.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        // Determine if we've already sent a request to the peer.
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        // Determine the maximum number of redundant requests.
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round());
        // Determine if we should send a certificate request to the peer.
        // We send strictly fewer than `num_redundant_requests` requests and each peer can
        // only receive one request at a time.
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Insert the certificate ID into the pending queue.
        // The callback is registered even when no request is sent, so any in-flight response resolves this wait.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        // If the redundancy limit has not been reached, send the certificate request to the peer.
        if should_send_request {
            // Send the certificate request to the peer.
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the certificate to be fetched.
        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
        match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
            // If the certificate was fetched, return it.
            Ok(result) => Ok(result?),
            // If the certificate was not fetched, return an error.
            Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
        }
    }
636
637    /// Handles the incoming certificate request.
638    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
639        // Attempt to retrieve the certificate.
640        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
641            // Send the certificate response to the peer.
642            let self_ = self.clone();
643            tokio::spawn(async move {
644                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
645            });
646        }
647    }
648
649    /// Handles the incoming certificate response.
650    /// This method ensures the certificate response is well-formed and matches the certificate ID.
651    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
652        let certificate = response.certificate;
653        // Check if the peer IP exists in the pending queue for the given certificate ID.
654        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
655        // If the peer IP exists, finish the pending request.
656        if exists {
657            // TODO: Validate the certificate.
658            // Remove the certificate ID from the pending queue.
659            self.pending.remove(certificate.id(), Some(certificate));
660        }
661    }
662}
663
664impl<N: Network> Sync<N> {
665    /// Spawns a task with the given future; it should only be used for long-running tasks.
666    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
667        self.handles.lock().push(tokio::spawn(future));
668    }
669
670    /// Shuts down the primary.
671    pub async fn shut_down(&self) {
672        info!("Shutting down the sync module...");
673        // Acquire the response lock.
674        let _lock = self.response_lock.lock().await;
675        // Acquire the sync lock.
676        let _lock = self.sync_lock.lock().await;
677        // Abort the tasks.
678        self.handles.lock().iter().for_each(|handle| handle.abort());
679    }
680}
681#[cfg(test)]
682mod tests {
683    use super::*;
684
685    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
686    use snarkos_account::Account;
687    use snarkvm::{
688        console::{
689            account::{Address, PrivateKey},
690            network::MainnetV0,
691        },
692        ledger::{
693            narwhal::{BatchCertificate, BatchHeader, Subdag},
694            store::{ConsensusStore, helpers::memory::ConsensusMemory},
695        },
696        prelude::{Ledger, VM},
697        utilities::TestRng,
698    };
699
700    use aleo_std::StorageMode;
701    use indexmap::IndexSet;
702    use rand::Rng;
703    use std::collections::BTreeMap;
704
    // Test-only instantiations: the mainnet network with a ledger and
    // consensus store backed by in-memory storage.
    type CurrentNetwork = MainnetV0;
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
708
    /// Builds a 4-validator committee from a seeded genesis block, generates
    /// certificates for rounds 0..=commit_round + 8 (rounds 0..=5 receive
    /// certificates from only the leader and one non-leader; later rounds from
    /// all four validators), commits three blocks on a source ledger, and then
    /// replays the blocks through `sync_storage_with_block` on a fresh syncing
    /// ledger, asserting the synced height after each block.
    ///
    /// NOTE(review): the sparse early rounds presumably force the `is_linked`
    /// commit path (per the test name) — confirm against the storage/BFT
    /// commit logic, which is not visible in this file.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_via_is_linked() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(None).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        // Re-seeding with the same seed replays the exact key-sampling sequence used by `genesis_beacon`.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
        // Initialize the ledger.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));

        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                // Rounds 0 and 1 have no previous round to reference.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();

                // Create a certificate for the leader.
                // In rounds 0..=5, only the leader and one non-leader author a certificate,
                // so these rounds intentionally have an incomplete certificate set.
                if round <= 5 {
                    let leader = committee.get_leader(round).unwrap();
                    let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
                    let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
                    for i in [leader_index, non_leader_index].into_iter() {
                        let batch_header = BatchHeader::new(
                            &private_keys[i],
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        // Sign the batch header.
                        // Every validator other than the author signs the batch ID.
                        let mut signatures = IndexSet::with_capacity(4);
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }

                // Create a certificate for each validator.
                // From round 6 onward, all four validators author a certificate.
                if round > 5 {
                    for (i, private_key_1) in private_keys.iter().enumerate() {
                        let batch_header = BatchHeader::new(
                            private_key_1,
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        // Sign the batch header.
                        let mut signatures = IndexSet::with_capacity(4);
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }
                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert certificates into storage.
        // Note: round 0 is deliberately skipped here — only rounds 1..=commit_round + 8 are inserted.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create block 1.
        // Its subdag contains the leader certificate at the commit round plus all certificates
        // from the round immediately before it.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let block_1 = {
            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
        };
        // Insert block 1.
        core_ledger.advance_to_next_block(&block_1)?;

        // Create block 2.
        // Its subdag spans three rounds and excludes the previously committed leader certificate.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let block_2 = {
            let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            let mut prev_commit_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
                if cert != leader_certificate {
                    prev_commit_cert_map_2.insert(cert);
                }
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        // Insert block 2.
        core_ledger.advance_to_next_block(&block_2)?;

        // Create block 3
        // Same shape as block 2, two rounds further along.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let block_3 = {
            let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            let mut prev_commit_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
                if cert != leader_certificate_2 {
                    prev_commit_cert_map_3.insert(cert);
                }
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        // Insert block 3.
        core_ledger.advance_to_next_block(&block_3)?;

        // Initialize the syncing ledger.
        // This is a fresh ledger at genesis, which will be advanced purely via the sync module.
        let syncing_ledger = Arc::new(CoreLedgerService::new(
            CurrentLedger::load(genesis, StorageMode::Production).unwrap(),
            Default::default(),
        ));
        // Initialize the gateway.
        let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone());
        // Try to sync block 1.
        sync.sync_storage_with_block(block_1).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 1);
        // Try to sync block 2.
        sync.sync_storage_with_block(block_2).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 2);
        // Try to sync block 3.
        sync.sync_storage_with_block(block_3).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);
        // Ensure blocks 1 and 2 were added to the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }
932
    /// Builds a 4-validator committee, generates a full set of certificates for
    /// rounds 0..=commit_round + 8 (all four validators author every round),
    /// commits three blocks whose subdags cover only part of the certificates,
    /// and then asserts that `Storage::get_pending_certificates` returns exactly
    /// the inserted certificates that were NOT committed in any block subdag.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_pending_certificates() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(None).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        // Re-seeding with the same seed replays the exact key-sampling sequence used by `genesis_beacon`.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];
        // Initialize the ledger with the genesis block.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
        // Initialize the ledger.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            // Initialize the committee.
            let committee = ledger.latest_committee().unwrap();
            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                // Rounds 0 and 1 have no previous round to reference.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();
                // Create a certificate for each validator.
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = BatchHeader::new(
                        private_key_1,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Sign the batch header.
                    // Every validator other than the author signs the batch ID.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert certificates into storage.
        // Note: round 0 is deliberately skipped here — only rounds 1..=commit_round + 8 are inserted.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Create block 1.
        // The subdag maps are declared OUTSIDE the block expressions so the committed
        // certificates can be re-derived from them at the end of the test.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_1 = {
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
        };
        // Insert block 1.
        core_ledger.advance_to_next_block(&block_1)?;

        // Create block 2.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_2 = {
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        // Insert block 2.
        core_ledger.advance_to_next_block(&block_2)?;

        // Create block 3
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_3 = {
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        // Insert block 3.
        core_ledger.advance_to_next_block(&block_3)?;

        /*
            Check that the pending certificates are computed correctly.
        */

        // Retrieve the pending certificates.
        let pending_certificates = storage.get_pending_certificates();
        // Check that all of the pending certificates are not contained in the ledger.
        for certificate in pending_certificates.clone() {
            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
        }
        // Initialize an empty set to be populated with the committed certificates in the block subdags.
        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        {
            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
            for subdag in subdag_maps.iter() {
                for subdag_certificates in subdag.values() {
                    committed_certificates.extend(subdag_certificates.iter().cloned());
                }
            }
        };
        // Create the set of candidate pending certificates as the set of all certificates minus the set of the committed certificates.
        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        for certificate in certificates.clone() {
            if !committed_certificates.contains(&certificate) {
                candidate_pending_certificates.insert(certificate);
            }
        }
        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
        assert_eq!(pending_certificates, candidate_pending_certificates);
        Ok(())
    }
1110}