snarkos_node_bft/sync/mod.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    Gateway,
    MAX_FETCH_TIMEOUT_IN_MS,
    Transport,
    events::DataBlocks,
    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
    spawn_blocking,
};
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_router::PeerPoolHandling;
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkvm::{
    console::{network::Network, types::Field},
    ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
    prelude::{cfg_into_iter, cfg_iter},
};

use anyhow::{Result, anyhow, bail};
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{BTreeMap, HashMap},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
    sync::{OnceCell, oneshot},
    task::JoinHandle,
};

/// Block synchronization logic for validators.
///
/// Synchronization works differently for nodes that act as validators in AleoBFT:
/// in the common case, validators generate blocks after receiving an anchor block that has been accepted
/// by a supermajority of the committee, instead of fetching entire blocks from other nodes.
/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
///
/// This struct also manages fetching certificates from other validators during normal operation,
/// and blocks when falling behind.
///
/// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
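///
/// # Example
///
/// A minimal lifecycle sketch (hypothetical setup: it assumes `gateway`, `storage`, `ledger`,
/// `block_sync`, `bft_sender`, `ping`, and `sync_receiver` were constructed elsewhere):
/// ```ignore
/// let sync = Sync::new(gateway, storage, ledger, block_sync);
/// // Load blocks from the ledger into storage (and into the BFT DAG, if a sender is set).
/// sync.initialize(Some(bft_sender)).await?;
/// // Spawn the background tasks that issue block requests and process responses.
/// sync.run(Some(ping), sync_receiver).await?;
/// ```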
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway enables communication with other validators.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization logic.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificates queue.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// Handles to the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock. Ensures that only one task syncs the ledger at a time.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses.
    ///
    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks whose addition to the ledger is
    /// deferred until certain checks pass.
    /// Blocks need to be processed in order, hence a `BTreeMap`.
    ///
    /// Whenever a new block is added to this map, [`BlockSync::set_sync_height`] needs to be called.
    latest_block_responses: Arc<TMutex<BTreeMap<u32, Block<N>>>>,
}

impl<N: Network> Sync<N> {
    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Initializes a new sync instance.
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        // Return the sync instance.
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            bft_sender: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            latest_block_responses: Default::default(),
        }
    }

    /// Initializes the sync module and syncs the storage with the ledger at bootup.
    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
        // If a BFT sender was provided, set it.
        if let Some(bft_sender) = bft_sender {
            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
        }

        info!("Syncing storage with the ledger...");

        // Sync the storage with the ledger.
        self.sync_storage_with_ledger_at_bootup().await?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

    /// Sends the given batch of block requests to peers.
    ///
    /// Responses to block requests will eventually be processed by [`Self::try_advancing_block_synchronization`].
    #[inline]
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        // Send the block requests to the sync peers.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Starts the sync module.
    ///
    /// When this function returns successfully, the sync module will have spawned background tasks
    /// that fetch blocks from other validators.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Start the block request generation loop (outgoing).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Wait for peer updates or timeout.
                let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;

                // Issue block requests to peers.
                self_.try_issuing_block_requests().await;

                // Rate limiting happens in [`Self::send_block_requests`] and no additional sleeps are needed here.
            }
        });

        // Start the block response processing loop (incoming).
        let self_ = self.clone();
        let ping = ping.clone();
        self.spawn(async move {
            loop {
                // Wait until there is something to do or until the timeout.
                let _ =
                    tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;

                self_.try_advancing_block_synchronization(&ping).await;

                // We perform no additional rate limiting here as
                // requests are already rate-limited.
            }
        });

        // Start the pending queue expiration loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                // Remove the expired pending transmission requests.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        /* Set up callbacks for events from the Gateway */

        // Retrieve the sync receiver.
        let SyncReceiver {
            mut rx_block_sync_insert_block_response,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block sync request to insert a block response.
        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_block_sync_insert_block_response.send()`],
        // which causes the `rx_block_sync_insert_block_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, callback)) = rx_block_sync_insert_block_response.recv().await {
                callback.send(self_.insert_block_response(peer_ip, blocks).await).ok();
            }
        });

        // Process the block sync request to remove the peer.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Process each block sync request to update peer locators.
        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::update_peer_locators()`],
        // which calls [`tx_block_sync_update_peer_locators.send()`],
        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process each certificate request.
        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_request.send()`],
        // which causes the `rx_certificate_request.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process each certificate response.
        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_response.send()`],
        // which causes the `rx_certificate_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

    /// BFT-specific version of `Client::try_issuing_block_requests()`.
    ///
    /// This method handles timeout removal, checks if block sync is possible,
    /// and issues block requests to peers.
    async fn try_issuing_block_requests(&self) {
        // Check if any existing requests can be removed.
        // We should do this even if we cannot block sync, to ensure
        // there are no dangling block requests.
        let timeout_requests = self.block_sync.handle_block_request_timeouts(&self.gateway);
        if let Some((requests, sync_peers)) = timeout_requests {
            self.send_block_requests(requests, sync_peers).await;
            return;
        }

        // Update the sync height to the latest ledger height.
        // (If the ledger height is lower than or equal to the current sync height, this is a no-op.)
        self.block_sync.set_sync_height(self.ledger.latest_block_height());

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and unnecessary computation.
        if !self.block_sync.can_block_sync() {
            return;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (requests, sync_peers) = self.block_sync.prepare_block_requests();

        // If there are no block requests, return early.
        if requests.is_empty() {
            return;
        }

        // Send the block requests to peers.
        self.send_block_requests(requests, sync_peers).await;
    }

    /// Test-only method to manually trigger block synchronization.
    /// This combines both request generation and response processing for testing purposes.
    #[cfg(test)]
    pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
        // First, try issuing block requests.
        self.try_issuing_block_requests().await;

        // Then, try advancing with any available responses.
        self.try_advancing_block_synchronization(&None).await;
    }
}

// Callbacks used when receiving messages from the Gateway
impl<N: Network> Sync<N> {
    /// We received a block response and can (possibly) advance synchronization.
    async fn insert_block_response(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
        // Verify that the response is valid and add it to block sync.
        self.block_sync.insert_block_responses(peer_ip, blocks)

        // No need to advance block sync here, as the new response will
        // notify the incoming task.
    }

    /// We received new peer locators during a Ping.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.block_sync.update_peer_locators(peer_ip, &locators)
    }

    /// A peer disconnected.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    #[cfg(test)]
    pub fn testing_only_update_peer_locators_testing_only(
        &self,
        peer_ip: SocketAddr,
        locators: BlockLocators<N>,
    ) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}

// Methods to manage storage.
impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// This is called when starting the validator and after finishing a sync without BFT.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
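        // For example, with a `max_gc_rounds` of 100, `max_gc_blocks` is 50,
        // so at most the 50 most recent blocks correspond to uncollected rounds.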
        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
            // Note that the block authority is always a sub-DAG in production;
            // beacon signatures are only used for testing,
            // and as a placeholder (irrelevant) block authority in the genesis block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates.
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                // Update the validator telemetry.
                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        /* Sync the BFT DAG */

        // Construct a list of the certificates.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
                bail!("Failed to update the BFT DAG from sync: {e}");
            }
        }

        self.block_sync.set_sync_height(block_height);

        Ok(())
    }

    /// Returns which height we are synchronized to.
    /// If there are queued block responses, this might be higher than the latest block in the ledger.
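    ///
    /// For example, if the ledger is at height 10 and responses for heights 11 and 12
    /// are queued in `latest_block_responses`, this returns 12.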
    async fn compute_sync_height(&self) -> u32 {
        let ledger_height = self.ledger.latest_block_height();
        let mut responses = self.latest_block_responses.lock().await;

        // Remove any old responses.
        responses.retain(|height, _| *height > ledger_height);

        // Ensure the returned value is always greater than or equal to the ledger height.
        responses.last_key_value().map(|(height, _)| *height).unwrap_or(0).max(ledger_height)
    }

    /// BFT version of [`snarkos_node_client::Client::try_advancing_block_synchronization`].
    async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
        // Process block responses and advance the ledger.
        let new_blocks = match self.try_advancing_block_synchronization_inner().await {
            Ok(new_blocks) => new_blocks,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                false
            }
        };

        if let Some(ping) = &ping
            && new_blocks
        {
            match self.get_block_locators() {
                Ok(locators) => ping.update_block_locators(locators),
                Err(err) => error!("Failed to update block locators: {err}"),
            }
        }
    }

    /// Aims to advance synchronization using any recent block responses received from peers.
    ///
    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
    /// and is called periodically at runtime.
    ///
    /// This returns `Ok(true)` if we successfully advanced the ledger by at least one new block.
    ///
    /// A key difference from `BlockSync`'s version is that it only adds blocks to the ledger once they have been confirmed by the network.
    /// If blocks are not confirmed yet, they are kept in [`Self::latest_block_responses`].
    /// It also passes certificates from synced blocks to the BFT module so that consensus can progress as expected
    /// (see [`Self::sync_storage_with_block`] for more details).
    ///
    /// If the node falls behind more than GC rounds, this function calls [`Self::sync_ledger_with_block_without_bft`] instead,
    /// which syncs without updating the BFT state.
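    ///
    /// Schematically, the decision between the two paths mirrors the body below
    /// (a sketch, not the exact code):
    /// ```ignore
    /// let max_gc_blocks = max_gc_rounds / 2; // at most one block every two rounds
    /// if ledger_height + 1 > tip - max_gc_blocks {
    ///     // Within GC range: BFT sync via `sync_storage_with_block`.
    /// } else {
    ///     // Too far behind: non-BFT sync via `sync_ledger_with_block_without_bft`.
    /// }
    /// ```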
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;

        // For sanity, set the sync height again.
        // (If the sync height is already greater than or equal, this is a no-op.)
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Retrieve the maximum block height of the peers.
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Updates the sync state and returns the error (if any).
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            // Make the underlying `BlockSync` instance aware of the new sync height.
            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the peers' tip minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Retrieve the current height, based on the ledger height and the
            // (unconfirmed) blocks that are already queued up.
            let start_height = self.compute_sync_height().await;

            // For sanity, update the sync height before starting.
            // (If this is lower than or equal to the current sync height, this is a no-op.)
            self.block_sync.set_sync_height(start_height);

            // The height is incremented as blocks are added.
            let mut current_height = start_height;
            trace!("Try advancing with block responses (at block {current_height})");

            // If we already were within GC or successfully caught up with GC, try to advance BFT normally again.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                // Sync the storage with the block.
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            info!("Block sync is too far behind other validators. Syncing without BFT.");

            // For non-BFT sync, we need to start at the current height of the ledger, as blocks are immediately
            // added to it and do not queue up in `latest_block_responses`.
            let start_height = ledger_height;
            let mut current_height = start_height;

            // For sanity, update the sync height before starting.
            // (If this is lower than or equal to the current sync height, this is a no-op.)
            self.block_sync.set_sync_height(start_height);

            // Try to advance the ledger *to tip* without updating the BFT.
            // TODO(kaimast): why to tip and not to tip-GC?
            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // Sync the storage with the ledger if we should transition to the BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                if let Err(err) = self.sync_storage_with_ledger_at_bootup().await {
                    error!("BFT sync (with bootup routine) failed - {err}");
                }
            }

            cleanup(start_height, current_height, None)
        }
    }

    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// This is only used by [`Self::try_advancing_block_synchronization`].
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        let self_ = self.clone();
        tokio::task::spawn_blocking(move || {
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
        .await?
    }

    /// Advances the ledger by the given block and updates the storage accordingly.
    ///
    /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
    /// meets the voter availability threshold (i.e. > f voting stake)
    /// or is reachable via a DAG path from a later leader certificate that does.
    /// Since performing this check requires DAG certificates from later blocks,
    /// the block is stored in `Sync::latest_block_responses`,
    /// and its addition to the ledger is deferred until the check passes.
    /// Several blocks may be stored in `Sync::latest_block_responses`
    /// before they can all be checked and added to the ledger.
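    ///
    /// A sketch of the availability check performed below, where `certificates` comes from
    /// [`Storage::get_certificates_for_round`] at `commit_round + 1`:
    /// ```ignore
    /// let authors = certificates
    ///     .iter()
    ///     .filter(|c| c.previous_certificate_ids().contains(&leader_certificate.id()))
    ///     .map(|c| c.author())
    ///     .collect();
    /// let ready = committee_lookback.is_availability_threshold_reached(&authors);
    /// ```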
    async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        // If this block has already been processed, return early.
        // TODO(kaimast): Should we remove the response here?
        if self.ledger.contains_block_height(block.height()) {
            debug!("Ledger is already synced with block at height {}. Will not sync.", block.height());
            return Ok(());
        }

        // Acquire the latest block responses lock.
        let mut latest_block_responses = self.latest_block_responses.lock().await;

        if latest_block_responses.contains_key(&block.height()) {
            debug!("An unconfirmed block is already queued for height {}. Will not sync.", block.height());
            return Ok(());
        }

        // If the block authority is a sub-DAG, then sync the batch certificates with the block.
        // Note that the block authority is always a sub-DAG in production;
        // beacon signatures are only used for testing,
        // and as a placeholder (irrelevant) block authority in the genesis block.
        if let Authority::Quorum(subdag) = block.authority() {
            // Reconstruct the unconfirmed transactions.
            let unconfirmed_transactions = cfg_iter!(block.transactions())
                .filter_map(|tx| {
                    tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                })
                .collect::<HashMap<_, _>>();

            // Iterate over the certificates.
            for certificates in subdag.values().cloned() {
                cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                    // Sync the batch certificate with the block.
                    self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
                });

                // Sync the BFT DAG with the certificates.
                for certificate in certificates {
                    // If a BFT sender was provided, send the certificate to the BFT.
                    // For validators, BFT spawns a receiver task in `BFT::start_handlers`.
                    if let Some(bft_sender) = self.bft_sender.get() {
                        // Await the callback to continue.
                        if let Err(err) = bft_sender.send_sync_bft(certificate).await {
                            bail!("Failed to sync certificate - {err}");
                        };
                    }
                }
            }
        }

        // Fetch the latest block height.
        let ledger_block_height = self.ledger.latest_block_height();

        // Insert the latest block response.
        latest_block_responses.insert(block.height(), block);
        // Clear the latest block responses of older blocks.
        latest_block_responses.retain(|height, _| *height > ledger_block_height);

        // Get a list of contiguous blocks from the latest block responses.
        let contiguous_blocks: Vec<Block<N>> = (ledger_block_height.saturating_add(1)..)
            .take_while(|&k| latest_block_responses.contains_key(&k))
            .filter_map(|k| latest_block_responses.get(&k).cloned())
            .collect();

        // Check if each block response, from the contiguous sequence just constructed,
        // is ready to be added to the ledger.
        // Ensure that the block's leader certificate meets the availability threshold
        // based on the certificates in the DAG just after the block's round.
        // If the availability threshold is not met,
        // process the next block and check if it is linked to the current block,
        // in the sense that there is a path in the DAG
        // from the next block's leader certificate
        // to the current block's leader certificate.
        // Note: We do not advance to the most recent block response because we would be unable to
        // validate if the leader certificate in the block has been certified properly.
        for next_block in contiguous_blocks.into_iter() {
            // Retrieve the height of the next block.
            let next_block_height = next_block.height();

            // Fetch the leader certificate and the relevant rounds.
            let leader_certificate = match next_block.authority() {
                Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                _ => bail!("Received a block with an unexpected authority type."),
            };
            let commit_round = leader_certificate.round();
            let certificate_round =
                commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;

            // Get the committee lookback for the round just after the leader.
            let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
            // Retrieve all of the certificates for the round just after the leader.
            let certificates = self.storage.get_certificates_for_round(certificate_round);
            // Construct a set over the authors, at the round just after the leader,
            // who included the leader's certificate in their previous certificate IDs.
            let authors = certificates
                .iter()
                .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                    true => Some(c.author()),
                    false => None,
                })
                .collect();

            debug!("Validating sync block {next_block_height} at round {commit_round}...");
            // Check if the leader is ready to be committed.
            if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
                // Initialize the current certificate.
                let mut current_certificate = leader_certificate;
                // Check if there are any linked blocks that need to be added.
                let mut blocks_to_add = vec![next_block];

                // Check if there are other blocks to process based on `is_linked`.
                for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
                    // Retrieve the previous block.
                    let Some(previous_block) = latest_block_responses.get(&height) else {
                        bail!("Block {height} is missing from the latest block responses.");
                    };
                    // Retrieve the previous block's leader certificate.
                    let previous_certificate = match previous_block.authority() {
                        Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                        _ => bail!("Received a block with an unexpected authority type."),
                    };
                    // Determine if there is a path between the previous certificate and the current certificate.
                    if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                        debug!("Previous sync block {height} is linked to the current block {next_block_height}");
                        // Add the previous block to the list of blocks to add.
                        blocks_to_add.insert(0, previous_block.clone());
                        // Update the current certificate to the previous leader certificate.
                        current_certificate = previous_certificate;
                    }
                }

                // Add the blocks to the ledger.
                for block in blocks_to_add {
                    // Check that the blocks are sequential and can be added to the ledger.
                    let block_height = block.height();
                    if block_height != self.ledger.latest_block_height().saturating_add(1) {
                        warn!("Skipping block {block_height} from the latest block responses - not sequential.");
                        continue;
                    }
                    #[cfg(feature = "telemetry")]
                    let block_authority = block.authority().clone();

                    let self_ = self.clone();
                    tokio::task::spawn_blocking(move || {
                        // Check the next block.
                        self_.ledger.check_next_block(&block)?;
                        // Attempt to advance to the next block.
                        self_.ledger.advance_to_next_block(&block)?;

                        // Sync the height with the block.
                        self_.storage.sync_height_with_block(block.height());
                        // Sync the round with the block.
                        self_.storage.sync_round_with_block(block.round());

                        Ok::<(), anyhow::Error>(())
                    })
                    .await??;
                    // Remove the block height from the latest block responses.
                    latest_block_responses.remove(&block_height);

                    // Update the validator telemetry.
                    #[cfg(feature = "telemetry")]
                    if let Authority::Quorum(subdag) = block_authority {
                        self.gateway.validator_telemetry().insert_subdag(&subdag);
                    }
                }
            } else {
                debug!(
                    "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
                );
            }

            // Don't remove the response from BlockSync yet, as we might fall back to non-BFT sync.
        }

        Ok(())
    }

    /// Returns `true` if there is a path from the previous certificate to the current certificate.
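    ///
    /// The walk proceeds backwards, one round at a time: starting from `current_certificate`,
    /// it keeps only the certificates referenced by the frontier's `previous_certificate_ids`,
    /// and finally checks whether `previous_certificate` was reached.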
    fn is_linked(
        &self,
        previous_certificate: BatchCertificate<N>,
        current_certificate: BatchCertificate<N>,
    ) -> Result<bool> {
        // Initialize the list containing the traversal.
        let mut traversal = vec![current_certificate.clone()];
        // Iterate over the rounds from the current certificate to the previous certificate.
        for round in (previous_certificate.round()..current_certificate.round()).rev() {
            // Retrieve all of the certificates for this past round.
            let certificates = self.storage.get_certificates_for_round(round);
            // Keep only the certificates that are referenced by the current traversal frontier.
            traversal = certificates
                .into_iter()
                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
                .collect();
        }
        Ok(traversal.contains(&previous_certificate))
    }
}

// Methods to assist with the block sync module.
impl<N: Network> Sync<N> {
    /// Returns `true` if the node is synced and has connected peers.
    pub fn is_synced(&self) -> bool {
        // Ensure the validator is connected to other validators,
        // not just clients.
        if self.gateway.number_of_connected_peers() == 0 {
            return false;
        }

        self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks the node is behind the greatest peer height.
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.block_sync.num_blocks_behind()
    }

    /// Returns the current block locators of the node.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }
}

// Methods to assist with fetching batch certificates from peers.
impl<N: Network> Sync<N> {
    /// Sends a certificate request to the specified peer.
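    ///
    /// A minimal usage sketch (assuming `peer_ip` and `certificate_id` are in scope):
    /// ```ignore
    /// // Resolves once the peer responds, or fails after `MAX_FETCH_TIMEOUT_IN_MS`.
    /// let certificate = sync.send_certificate_request(peer_ip, certificate_id).await?;
    /// ```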
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Initialize a oneshot channel.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine how many sent requests are pending.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        // Determine if we've already sent a request to the peer.
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        // Determine the maximum number of redundant requests.
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        // Determine if we should send a certificate request to the peer.
        // We send at most `num_redundant_requests` requests, and each peer can only receive one request at a time.
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Insert the certificate ID into the pending queue.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        // If the number of sent requests is below the redundancy limit and this peer has no
        // outstanding request, send the certificate request to the peer.
        if should_send_request {
            // Send the certificate request to the peer.
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the certificate to be fetched.
        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
        match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
            // If the certificate was fetched, return it.
            Ok(result) => Ok(result?),
            // If the certificate was not fetched, return an error.
            Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
        }
    }

    /// Handles the incoming certificate request.
    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
        // Attempt to retrieve the certificate.
        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
            // Send the certificate response to the peer.
            let self_ = self.clone();
            tokio::spawn(async move {
                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
            });
        }
    }

    /// Handles the incoming certificate response.
    /// This method ensures the certificate response is well-formed and matches the certificate ID.
    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
        let certificate = response.certificate;
        // Check if the peer IP exists in the pending queue for the given certificate ID.
        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
        // If the peer IP exists, finish the pending request.
        if exists {
            // TODO: Validate the certificate.
            // Remove the certificate ID from the pending queue.
            self.pending.remove(certificate.id(), Some(certificate));
        }
    }
}

impl<N: Network> Sync<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the sync module.
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};

    use snarkos_account::Account;
    use snarkos_node_sync::BlockSync;
    use snarkvm::{
        console::{
            account::{Address, PrivateKey},
            network::MainnetV0,
        },
        ledger::{
            narwhal::{BatchCertificate, BatchHeader, Subdag},
            store::{ConsensusStore, helpers::memory::ConsensusMemory},
        },
        prelude::{Ledger, VM},
        utilities::TestRng,
    };

    use aleo_std::StorageMode;
    use indexmap::IndexSet;
    use rand::Rng;
    use std::collections::BTreeMap;

    type CurrentNetwork = MainnetV0;
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_via_is_linked() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        // Initialize the ledger service.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
1050
1051        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1052        let (round_to_certificates_map, committee) = {
1053            let addresses = vec![
1054                Address::try_from(private_keys[0])?,
1055                Address::try_from(private_keys[1])?,
1056                Address::try_from(private_keys[2])?,
1057                Address::try_from(private_keys[3])?,
1058            ];
1059
1060            let committee = ledger.latest_committee().unwrap();
1061
1062            // Initialize a mapping from the round number to the set of batch certificates in the round.
1063            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1064                HashMap::new();
1065            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1066
1067            for round in 0..=commit_round + 8 {
1068                let mut current_certificates = IndexSet::new();
1069                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1070                    IndexSet::new()
1071                } else {
1072                    previous_certificates.iter().map(|c| c.id()).collect()
1073                };
1074                let committee_id = committee.id();
1075
1076                // Create a certificate for the leader.
1077                if round <= 5 {
1078                    let leader = committee.get_leader(round).unwrap();
1079                    let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
1080                    let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
1081                    for i in [leader_index, non_leader_index].into_iter() {
1082                        let batch_header = BatchHeader::new(
1083                            &private_keys[i],
1084                            round,
1085                            now(),
1086                            committee_id,
1087                            Default::default(),
1088                            previous_certificate_ids.clone(),
1089                            rng,
1090                        )
1091                        .unwrap();
1092                        // Sign the batch header.
1093                        let mut signatures = IndexSet::with_capacity(4);
1094                        for (j, private_key_2) in private_keys.iter().enumerate() {
1095                            if i != j {
1096                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1097                            }
1098                        }
1099                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1100                    }
1101                }
1102
1103                // Create a certificate for each validator.
1104                if round > 5 {
1105                    for (i, private_key_1) in private_keys.iter().enumerate() {
1106                        let batch_header = BatchHeader::new(
1107                            private_key_1,
1108                            round,
1109                            now(),
1110                            committee_id,
1111                            Default::default(),
1112                            previous_certificate_ids.clone(),
1113                            rng,
1114                        )
1115                        .unwrap();
1116                        // Sign the batch header.
1117                        let mut signatures = IndexSet::with_capacity(4);
1118                        for (j, private_key_2) in private_keys.iter().enumerate() {
1119                            if i != j {
1120                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1121                            }
1122                        }
1123                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1124                    }
1125                }
1126                // Update the map of certificates.
1127                round_to_certificates_map.insert(round, current_certificates.clone());
1128                previous_certificates = current_certificates.clone();
1129            }
1130            (round_to_certificates_map, committee)
1131        };
1132
1133        // Initialize the storage.
1134        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1135        // Insert certificates into storage.
1136        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1137        for i in 1..=commit_round + 8 {
1138            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1139            certificates.extend(c);
1140        }
1141        for certificate in certificates.clone().iter() {
1142            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1143        }
1144
1145        // Create block 1.
1146        let leader_round_1 = commit_round;
1147        let leader_1 = committee.get_leader(leader_round_1).unwrap();
1148        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1149        let block_1 = {
1150            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1151            let mut leader_cert_map = IndexSet::new();
1152            leader_cert_map.insert(leader_certificate.clone());
1153            let mut previous_cert_map = IndexSet::new();
1154            for cert in storage.get_certificates_for_round(commit_round - 1) {
1155                previous_cert_map.insert(cert);
1156            }
1157            subdag_map.insert(commit_round, leader_cert_map.clone());
1158            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1159            let subdag = Subdag::from(subdag_map.clone())?;
1160            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1161        };
1162        // Insert block 1.
1163        core_ledger.advance_to_next_block(&block_1)?;
1164
1165        // Create block 2.
1166        let leader_round_2 = commit_round + 2;
1167        let leader_2 = committee.get_leader(leader_round_2).unwrap();
1168        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1169        let block_2 = {
1170            let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1171            let mut leader_cert_map_2 = IndexSet::new();
1172            leader_cert_map_2.insert(leader_certificate_2.clone());
1173            let mut previous_cert_map_2 = IndexSet::new();
1174            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1175                previous_cert_map_2.insert(cert);
1176            }
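                // Collect the certificates from two rounds below the leader, skipping the
                // leader certificate already committed in block 1.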
1177            let mut prev_commit_cert_map_2 = IndexSet::new();
1178            for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
1179                if cert != leader_certificate {
1180                    prev_commit_cert_map_2.insert(cert);
1181                }
1182            }
1183            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1184            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1185            subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
1186            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1187            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
1188        };
1189        // Insert block 2.
1190        core_ledger.advance_to_next_block(&block_2)?;
1191
1192        // Create block 3.
1193        let leader_round_3 = commit_round + 4;
1194        let leader_3 = committee.get_leader(leader_round_3).unwrap();
1195        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1196        let block_3 = {
1197            let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1198            let mut leader_cert_map_3 = IndexSet::new();
1199            leader_cert_map_3.insert(leader_certificate_3.clone());
1200            let mut previous_cert_map_3 = IndexSet::new();
1201            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1202                previous_cert_map_3.insert(cert);
1203            }
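                // Collect the certificates from two rounds below the leader, skipping the
                // leader certificate already committed in block 2.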
1204            let mut prev_commit_cert_map_3 = IndexSet::new();
1205            for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
1206                if cert != leader_certificate_2 {
1207                    prev_commit_cert_map_3.insert(cert);
1208                }
1209            }
1210            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1211            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1212            subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
1213            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1214            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
1215        };
1216        // Insert block 3.
1217        core_ledger.advance_to_next_block(&block_3)?;
1218
1219        // Initialize the syncing ledger at genesis; the sync module will advance it below.
1220        let storage_mode = StorageMode::new_test(None);
1221        let syncing_ledger = Arc::new(CoreLedgerService::new(
1222            CurrentLedger::load(genesis, storage_mode.clone()).unwrap(),
1223            Default::default(),
1224        ));
1225        // Initialize the gateway.
1226        let gateway =
1227            Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], storage_mode, None)?;
1228        // Initialize the block synchronization logic.
1229        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
1230        // Initialize the sync module.
1231        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);
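            // Sync the three blocks in order; each call should advance the syncing ledger by one height.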
1232        // Try to sync block 1.
1233        sync.sync_storage_with_block(block_1).await?;
1234        assert_eq!(syncing_ledger.latest_block_height(), 1);
1235        // Try to sync block 2.
1236        sync.sync_storage_with_block(block_2).await?;
1237        assert_eq!(syncing_ledger.latest_block_height(), 2);
1238        // Try to sync block 3.
1239        sync.sync_storage_with_block(block_3).await?;
1240        assert_eq!(syncing_ledger.latest_block_height(), 3);
1241        // Ensure blocks 1 and 2 were added to the ledger.
1242        assert!(syncing_ledger.contains_block_height(1));
1243        assert!(syncing_ledger.contains_block_height(2));
1244
1245        Ok(())
1246    }
1247
1248    #[tokio::test]
1249    #[tracing_test::traced_test]
1250    async fn test_pending_certificates() -> anyhow::Result<()> {
1251        let rng = &mut TestRng::default();
1252        // Initialize the round parameters.
1253        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1254        let commit_round = 2;
1255
1256        // Initialize the store.
1257        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1258        let account: Account<CurrentNetwork> = Account::new(rng)?;
1259
1260        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
1261        let seed: u64 = rng.r#gen();
1262        let genesis_rng = &mut TestRng::from_seed(seed);
1263        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
1264
1265        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
1266        let genesis_rng = &mut TestRng::from_seed(seed);
1267        let private_keys = [
1268            *account.private_key(),
1269            PrivateKey::new(genesis_rng)?,
1270            PrivateKey::new(genesis_rng)?,
1271            PrivateKey::new(genesis_rng)?,
1272        ];
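        // The first key belongs to the node's own account; the remaining keys are the
        // other genesis committee members, recovered by replaying the seeded RNG.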
1273        // Initialize the ledger with the genesis block.
1274        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
1275        // Wrap the ledger in the core ledger service.
1276        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
1277        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1278        let (round_to_certificates_map, committee) = {
1279            // Initialize the committee.
1280            let committee = ledger.latest_committee().unwrap();
1281            // Initialize a mapping from the round number to the set of batch certificates in the round.
1282            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1283                HashMap::new();
1284            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1285
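                // Generate a full set of certificates for every round from 0 through commit_round + 8.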
1286            for round in 0..=commit_round + 8 {
1287                let mut current_certificates = IndexSet::new();
1288                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1289                    IndexSet::new()
1290                } else {
1291                    previous_certificates.iter().map(|c| c.id()).collect()
1292                };
1293                let committee_id = committee.id();
1294                // Create a certificate for each validator.
1295                for (i, private_key_1) in private_keys.iter().enumerate() {
1296                    let batch_header = BatchHeader::new(
1297                        private_key_1,
1298                        round,
1299                        now(),
1300                        committee_id,
1301                        Default::default(),
1302                        previous_certificate_ids.clone(),
1303                        rng,
1304                    )
1305                    .unwrap();
1306                    // Sign the batch header.
1307                    let mut signatures = IndexSet::with_capacity(4);
1308                    for (j, private_key_2) in private_keys.iter().enumerate() {
1309                        if i != j {
1310                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1311                        }
1312                    }
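                        // Assemble the certificate from the batch header and the collected signatures.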
1313                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1314                }
1315
1316                // Update the map of certificates.
1317                round_to_certificates_map.insert(round, current_certificates.clone());
1318                previous_certificates = current_certificates.clone();
1319            }
1320            (round_to_certificates_map, committee)
1321        };
1322
1323        // Initialize the storage.
1324        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1325        // Insert certificates into storage.
1326        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1327        for i in 1..=commit_round + 8 {
1328            let c = round_to_certificates_map.get(&i).unwrap().clone();
1329            certificates.extend(c);
1330        }
1331        for certificate in certificates.iter() {
1332            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1333        }

1334        // Create block 1.
1335        let leader_round_1 = commit_round;
1336        let leader_1 = committee.get_leader(leader_round_1).unwrap();
1337        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
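            // Note: unlike the previous test, the subdag maps are kept in the enclosing
            // scope so they can be reused below when collecting the committed certificates.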
1338        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1339        let block_1 = {
1340            let mut leader_cert_map = IndexSet::new();
1341            leader_cert_map.insert(leader_certificate.clone());
1342            let mut previous_cert_map = IndexSet::new();
1343            for cert in storage.get_certificates_for_round(commit_round - 1) {
1344                previous_cert_map.insert(cert);
1345            }
1346            subdag_map.insert(commit_round, leader_cert_map.clone());
1347            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1348            let subdag = Subdag::from(subdag_map.clone())?;
1349            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1350        };
1351        // Insert block 1.
1352        core_ledger.advance_to_next_block(&block_1)?;
1353
1354        // Create block 2.
1355        let leader_round_2 = commit_round + 2;
1356        let leader_2 = committee.get_leader(leader_round_2).unwrap();
1357        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1358        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1359        let block_2 = {
1360            let mut leader_cert_map_2 = IndexSet::new();
1361            leader_cert_map_2.insert(leader_certificate_2.clone());
1362            let mut previous_cert_map_2 = IndexSet::new();
1363            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1364                previous_cert_map_2.insert(cert);
1365            }
1366            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1367            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1368            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1369            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
1370        };
1371        // Insert block 2.
1372        core_ledger.advance_to_next_block(&block_2)?;
1373
1374        // Create block 3.
1375        let leader_round_3 = commit_round + 4;
1376        let leader_3 = committee.get_leader(leader_round_3).unwrap();
1377        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1378        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1379        let block_3 = {
1380            let mut leader_cert_map_3 = IndexSet::new();
1381            leader_cert_map_3.insert(leader_certificate_3.clone());
1382            let mut previous_cert_map_3 = IndexSet::new();
1383            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1384                previous_cert_map_3.insert(cert);
1385            }
1386            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1387            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1388            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1389            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
1390        };
1391        // Insert block 3.
1392        core_ledger.advance_to_next_block(&block_3)?;
1393
1394        /*
1395            Check that the pending certificates are computed correctly.
1396        */
1397
1398        // Retrieve the pending certificates.
1399        let pending_certificates = storage.get_pending_certificates();
1400        // Check that none of the pending certificates are contained in the ledger.
1401        for certificate in &pending_certificates {
1402            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1403        }
1404        // Initialize an empty set to be populated with the committed certificates in the block subdags.
1405        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1406        {
1407            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1408            for subdag in subdag_maps.iter() {
1409                for subdag_certificates in subdag.values() {
1410                    committed_certificates.extend(subdag_certificates.iter().cloned());
1411                }
1412            }
1413        };
1414        // Build the candidate pending certificates: all inserted certificates minus those committed in the block subdags.
1415        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1416        for certificate in certificates.clone() {
1417            if !committed_certificates.contains(&certificate) {
1418                candidate_pending_certificates.insert(certificate);
1419            }
1420        }
1421        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
1422        assert_eq!(pending_certificates, candidate_pending_certificates);
1423        Ok(())
1424    }
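
        // A minimal sketch of how the repeated "sign the batch header with every other
        // validator's key" loops above could be factored out. `sign_batch_header` is a
        // hypothetical helper written for illustration, not an existing API in this crate:
        //
        // fn sign_batch_header(
        //     batch_header: &BatchHeader<CurrentNetwork>,
        //     author_index: usize,
        //     private_keys: &[PrivateKey<CurrentNetwork>],
        //     rng: &mut TestRng,
        // ) -> IndexSet<Signature<CurrentNetwork>> {
        //     private_keys
        //         .iter()
        //         .enumerate()
        //         .filter(|(j, _)| *j != author_index)
        //         .map(|(_, pk)| pk.sign(&[batch_header.batch_id()], rng).unwrap())
        //         .collect()
        // }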
1425}