snarkos_node_bft/sync/
mod.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    Gateway,
    MAX_FETCH_TIMEOUT_IN_MS,
    PRIMARY_PING_IN_MS,
    Transport,
    events::DataBlocks,
    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
    spawn_blocking,
};
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkvm::{
    console::{network::Network, types::Field},
    ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
    prelude::{cfg_into_iter, cfg_iter},
};

use anyhow::{Result, anyhow, bail};
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use rayon::prelude::*;
use std::{
    collections::{BTreeMap, HashMap},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
    sync::{OnceCell, oneshot},
    task::JoinHandle,
};

/// Block synchronization logic for validators.
///
/// Synchronization works differently for nodes that act as validators in AleoBFT:
/// in the common case, validators generate blocks after receiving an anchor block that has been accepted
/// by a supermajority of the committee, instead of fetching entire blocks from other nodes.
/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
///
/// This struct also manages fetching certificates from other validators during normal operation,
/// and blocks when falling behind.
///
/// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
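///
/// A minimal usage sketch (illustrative only; the running validator wires these
/// up with real channels and handles at startup):
///
/// ```ignore
/// let sync = Sync::new(gateway, storage, ledger, block_sync);
/// // Load blocks from the ledger into BFT storage and set the BFT sender.
/// sync.initialize(Some(bft_sender)).await?;
/// // Spawn the background tasks that drive block and certificate syncing.
/// sync.run(Some(ping), sync_receiver).await?;
/// ```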
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway enables communication with other validators.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization logic.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificates queue.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// Handles to the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock. Ensures that only one task syncs the ledger at a time.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses.
    ///
    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks whose addition to the ledger is
    /// deferred until certain checks pass.
    /// Blocks need to be processed in order, hence a BTree map.
    ///
    /// Whenever a new block is added to this map, [`BlockSync::set_sync_height`] needs to be called.
    latest_block_responses: Arc<TMutex<BTreeMap<u32, Block<N>>>>,
}

impl<N: Network> Sync<N> {
    /// Initializes a new sync instance.
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        // Return the sync instance.
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            bft_sender: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            latest_block_responses: Default::default(),
        }
    }

    /// Initializes the sync module and syncs the storage with the ledger at bootup.
    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
        // If a BFT sender was provided, set it.
        if let Some(bft_sender) = bft_sender {
            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
        }

        info!("Syncing storage with the ledger...");

        // Sync the storage with the ledger.
        self.sync_storage_with_ledger_at_bootup().await?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

    /// Sends the given batch of block requests to peers.
    ///
    /// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`.
    #[inline]
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        // Sends the block requests to the sync peers.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Starts the sync module.
    ///
    /// When this function returns successfully, the sync module will have spawned background tasks
    /// that fetch blocks from other validators.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Start the block sync loop.
        let self_ = self.clone();
        self.spawn(async move {
            // Sleep briefly to allow an initial primary ping to come in prior to entering the loop.
            // Ideally, a node does not consider itself synced when it has not received
            // any block locators from peers. However, in the initial bootup of validators,
            // this needs to happen, so we use this additional sleep as a grace period.
            tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
            loop {
                // Sleep briefly to avoid triggering spam detection.
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                let new_blocks = self_.try_block_sync().await;
                if new_blocks {
                    if let Some(ping) = &ping {
                        match self_.get_block_locators() {
                            Ok(locators) => ping.update_block_locators(locators),
                            Err(err) => error!("Failed to update block locators: {err}"),
                        }
                    }
                }
            }
        });

        // Start the pending queue expiration loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                // Remove the expired pending transmission requests.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        /* Set up callbacks for events from the Gateway */

        // Retrieve the sync receiver.
        let SyncReceiver {
            mut rx_block_sync_advance_with_sync_blocks,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block sync request to advance with sync blocks.
        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::advance_with_sync_blocks()`],
        // which calls [`tx_block_sync_advance_with_sync_blocks.send()`],
        // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
                callback.send(self_.advance_with_sync_blocks(peer_ip, blocks).await).ok();
            }
        });

        // Process the block sync request to remove the peer.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Process each block sync request to update peer locators.
        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::update_peer_locators()`],
        // which calls [`tx_block_sync_update_peer_locators.send()`],
        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process each certificate request.
        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_request.send()`],
        // which causes the `rx_certificate_request.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process each certificate response.
        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_response.send()`],
        // which causes the `rx_certificate_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

    /// Execute one iteration of block synchronization.
    ///
    /// Returns true if we made progress. Returning true does *not* imply being fully synced.
    ///
    /// This is called periodically by a tokio background task spawned in `Self::run`.
    /// Some unit tests also call this function directly to manually trigger block synchronization.
    pub(crate) async fn try_block_sync(&self) -> bool {
        // Check if any existing requests can be removed.
        // We should do this even if we cannot block sync, to ensure
        // there are no dangling block requests.
        let new_requests = self.block_sync.handle_block_request_timeouts(&self.gateway);
        if let Some((sync_peers, requests)) = new_requests {
            self.send_block_requests(requests, sync_peers).await;
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and performing unnecessary computation.
        if !self.block_sync.can_block_sync() {
            trace!("No blocks to sync");
            return false;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (sync_peers, requests) = self.block_sync.prepare_block_requests();
        self.send_block_requests(requests, sync_peers).await;

        // Sync the storage with the blocks.
        match self.try_advancing_block_synchronization().await {
            Ok(new_blocks) => new_blocks,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                false
            }
        }
    }
}

// Callbacks used when receiving messages from the Gateway
impl<N: Network> Sync<N> {
    /// We received a block response and can (possibly) advance synchronization.
    async fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
        // Verify that the response is valid and add it to block sync.
        self.block_sync.insert_block_responses(peer_ip, blocks)?;

        // Try to process responses stored in BlockSync.
        // Note: Do not call `self.block_sync.try_advancing_block_synchronization` here, as it would process
        // and remove any completed requests, which means the call to `sync_storage_with_block` would not process
        // them as expected.
        self.try_advancing_block_synchronization().await?;

        Ok(())
    }

    /// We received new peer locators during a Ping.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.block_sync.update_peer_locators(peer_ip, locators)
    }

    /// A peer disconnected.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    #[cfg(test)]
    pub fn test_update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}

// Methods to manage storage.
impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// This is called when starting the validator and after finishing a sync without BFT.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
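        // For example, with `max_gc_rounds() == 100`, at most 50 of the latest
        // blocks can correspond to rounds that have not been garbage collected.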
        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
            // Note that the block authority is always a sub-DAG in production;
            // beacon signatures are only used for testing,
            // and as placeholder (irrelevant) block authority in the genesis block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates.
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                // Update the validator telemetry.
                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        /* Sync the BFT DAG */

        // Construct a list of the certificates.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
                bail!("Failed to update the BFT DAG from sync: {e}");
            }
        }

        self.block_sync.set_sync_height(block_height);

        Ok(())
    }

    /// Returns which height we are synchronized to.
    /// If there are queued block responses, this might be higher than the latest block in the ledger.
    async fn compute_sync_height(&self) -> u32 {
        let ledger_height = self.ledger.latest_block_height();
        let mut responses = self.latest_block_responses.lock().await;

        // Remove any old responses.
        responses.retain(|height, _| *height > ledger_height);

        // Ensure the returned value is always greater than or equal to the ledger height.
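        // For example, with the ledger at height 10 and queued responses at
        // heights 11 and 12, this returns 12; with no queued responses, 10.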
        responses.last_key_value().map(|(height, _)| *height).unwrap_or(0).max(ledger_height)
    }

    /// Aims to advance synchronization using any recent block responses received from peers.
    ///
    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
    /// and is called periodically at runtime.
    ///
    /// This returns Ok(true) if we successfully advanced the ledger by at least one new block.
    ///
    /// A key difference from `BlockSync`'s version is that this one only adds blocks to the ledger
    /// once they have been confirmed by the network.
    /// If blocks are not confirmed yet, they will be kept in [`Self::latest_block_responses`].
    /// It will also pass certificates from synced blocks to the BFT module so that consensus can progress as expected
    /// (see [`Self::sync_storage_with_block`] for more details).
    ///
    /// If the node falls behind more than GC rounds, this function calls [`Self::sync_ledger_with_block_without_bft`] instead,
    /// which syncs without updating the BFT state.
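    ///
    /// Schematically, the choice between the two paths mirrors the `within_gc`
    /// check in the body below:
    ///
    /// ```ignore
    /// let within_gc = (ledger_height + 1) > tip.saturating_sub(max_gc_blocks);
    /// if within_gc {
    ///     // Queue responses and advance via `sync_storage_with_block`.
    /// } else {
    ///     // Advance the ledger directly via `sync_ledger_with_block_without_bft`.
    /// }
    /// ```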
    async fn try_advancing_block_synchronization(&self) -> Result<bool> {
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;

        // For sanity, set the sync height again.
        // (if the sync height is already larger or equal, this is a noop)
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Retrieve the maximum block height of the peers.
        let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);

        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Updates sync state and returns the error (if any).
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            // Make the underlying `BlockSync` instance aware of the new sync height.
            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Retrieve the current height, based on the ledger height and the
            // (unconfirmed) blocks that are already queued up.
            let start_height = self.compute_sync_height().await;

            // For sanity, update the sync height before starting.
            // (if this is lower or equal to the current sync height, this is a noop)
            self.block_sync.set_sync_height(start_height);

            // The height is incremented as blocks are added.
            let mut current_height = start_height;
            trace!("Try advancing with block responses (at block {current_height})");

            // If we already were within GC or successfully caught up with GC, try to advance BFT normally again.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                // Sync the storage with the block.
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            info!("Block sync is too far behind other validators. Syncing without BFT.");

            // For non-BFT sync, we need to start at the current height of the ledger, as blocks are immediately
            // added to it and do not queue up in `latest_block_responses`.
            let start_height = ledger_height;
            let mut current_height = start_height;

            // For sanity, update the sync height before starting.
            // (if this is lower or equal to the current sync height, this is a noop)
            self.block_sync.set_sync_height(start_height);

            // Try to advance the ledger *to tip* without updating the BFT.
            // TODO(kaimast): why to tip and not to tip-GC?
            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // Sync the storage with the ledger if we should transition to the BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                if let Err(err) = self.sync_storage_with_ledger_at_bootup().await {
                    error!("BFT sync (with bootup routine) failed - {err}");
                }
            }

            cleanup(start_height, current_height, None)
        }
    }

    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// This is only used by [`Self::try_advancing_block_synchronization`].
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        let self_ = self.clone();
        tokio::task::spawn_blocking(move || {
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
        .await?
    }

    /// Advances the ledger by the given block and updates the storage accordingly.
    ///
    /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
    /// meets the voter availability threshold (i.e. > f voting stake)
    /// or is reachable via a DAG path from a later leader certificate that does.
    /// Since performing this check requires DAG certificates from later blocks,
    /// the block is stored in `Sync::latest_block_responses`,
    /// and its addition to the ledger is deferred until the check passes.
    /// Several blocks may be stored in `Sync::latest_block_responses`
    /// before they can be all checked and added to the ledger.
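    ///
    /// As an illustrative example (not tied to any particular committee): with
    /// four equally-staked validators (so f = 1), a block's leader certificate
    /// at round `r` meets the availability threshold once more than f of the
    /// validators list it in the previous certificate IDs of their round
    /// `r + 1` certificates.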
    async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        // If this block has already been processed, return early.
        // TODO(kaimast): Should we remove the response here?
        if self.ledger.contains_block_height(block.height()) {
            debug!("Ledger is already synced with block at height {}. Will not sync.", block.height());
            return Ok(());
        }

        // Acquire the latest block responses lock.
        let mut latest_block_responses = self.latest_block_responses.lock().await;

        if latest_block_responses.contains_key(&block.height()) {
            debug!("An unconfirmed block is queued already for height {}. Will not sync.", block.height());
            return Ok(());
        }

        // If the block authority is a sub-DAG, then sync the batch certificates with the block.
        // Note that the block authority is always a sub-DAG in production;
        // beacon signatures are only used for testing,
        // and as placeholder (irrelevant) block authority in the genesis block.
        if let Authority::Quorum(subdag) = block.authority() {
            // Reconstruct the unconfirmed transactions.
            let unconfirmed_transactions = cfg_iter!(block.transactions())
                .filter_map(|tx| {
                    tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                })
                .collect::<HashMap<_, _>>();

            // Iterate over the certificates.
            for certificates in subdag.values().cloned() {
                cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                    // Sync the batch certificate with the block.
                    self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
                });

                // Sync the BFT DAG with the certificates.
                for certificate in certificates {
                    // If a BFT sender was provided, send the certificate to the BFT.
                    // For validators, BFT spawns a receiver task in `BFT::start_handlers`.
                    if let Some(bft_sender) = self.bft_sender.get() {
                        // Await the callback to continue.
                        if let Err(err) = bft_sender.send_sync_bft(certificate).await {
                            bail!("Failed to sync certificate - {err}");
                        };
                    }
                }
            }
        }

        // Fetch the latest block height.
        let ledger_block_height = self.ledger.latest_block_height();

        // Insert the latest block response.
        latest_block_responses.insert(block.height(), block);
        // Clear the latest block responses of older blocks.
        latest_block_responses.retain(|height, _| *height > ledger_block_height);

        // Get a list of contiguous blocks from the latest block responses.
        let contiguous_blocks: Vec<Block<N>> = (ledger_block_height.saturating_add(1)..)
            .take_while(|&k| latest_block_responses.contains_key(&k))
            .filter_map(|k| latest_block_responses.get(&k).cloned())
            .collect();

        // Check if each block response, from the contiguous sequence just constructed,
        // is ready to be added to the ledger.
        // Ensure that the block's leader certificate meets the availability threshold
        // based on the certificates in the DAG just after the block's round.
        // If the availability threshold is not met,
        // process the next block and check if it is linked to the current block,
        // in the sense that there is a path in the DAG
        // from the next block's leader certificate
        // to the current block's leader certificate.
        // Note: We do not advance to the most recent block response because we would be unable to
        // validate if the leader certificate in the block has been certified properly.
        for next_block in contiguous_blocks.into_iter() {
            // Retrieve the height of the next block.
            let next_block_height = next_block.height();

            // Fetch the leader certificate and the relevant rounds.
            let leader_certificate = match next_block.authority() {
                Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                _ => bail!("Received a block with an unexpected authority type."),
            };
            let commit_round = leader_certificate.round();
            let certificate_round =
                commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;

            // Get the committee lookback for the round just after the leader.
            let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
            // Retrieve all of the certificates for the round just after the leader.
            let certificates = self.storage.get_certificates_for_round(certificate_round);
            // Construct a set over the authors, at the round just after the leader,
            // who included the leader's certificate in their previous certificate IDs.
            let authors = certificates
                .iter()
                .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                    true => Some(c.author()),
                    false => None,
                })
                .collect();

            debug!("Validating sync block {next_block_height} at round {commit_round}...");
            // Check if the leader is ready to be committed.
            if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
                // Initialize the current certificate.
                let mut current_certificate = leader_certificate;
                // Check if there are any linked blocks that need to be added.
                let mut blocks_to_add = vec![next_block];

                // Check if there are other blocks to process based on `is_linked`.
                for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
                    // Retrieve the previous block.
                    let Some(previous_block) = latest_block_responses.get(&height) else {
                        bail!("Block {height} is missing from the latest block responses.");
                    };
                    // Retrieve the previous block's leader certificate.
                    let previous_certificate = match previous_block.authority() {
                        Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                        _ => bail!("Received a block with an unexpected authority type."),
                    };
                    // Determine if there is a path between the previous certificate and the current certificate.
                    if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                        debug!("Previous sync block {height} is linked to the current block {next_block_height}");
                        // Add the previous leader certificate to the list of certificates to commit.
                        blocks_to_add.insert(0, previous_block.clone());
                        // Update the current certificate to the previous leader certificate.
                        current_certificate = previous_certificate;
                    }
                }

                // Add the blocks to the ledger.
                for block in blocks_to_add {
                    // Check that the blocks are sequential and can be added to the ledger.
                    let block_height = block.height();
                    if block_height != self.ledger.latest_block_height().saturating_add(1) {
                        warn!("Skipping block {block_height} from the latest block responses - not sequential.");
                        continue;
                    }
                    #[cfg(feature = "telemetry")]
                    let block_authority = block.authority().clone();

                    let self_ = self.clone();
                    tokio::task::spawn_blocking(move || {
                        // Check the next block.
                        self_.ledger.check_next_block(&block)?;
                        // Attempt to advance to the next block.
                        self_.ledger.advance_to_next_block(&block)?;

                        // Sync the height with the block.
                        self_.storage.sync_height_with_block(block.height());
                        // Sync the round with the block.
                        self_.storage.sync_round_with_block(block.round());

                        Ok::<(), anyhow::Error>(())
                    })
                    .await??;
                    // Remove the block height from the latest block responses.
                    latest_block_responses.remove(&block_height);

                    // Update the validator telemetry.
                    #[cfg(feature = "telemetry")]
                    if let Authority::Quorum(subdag) = block_authority {
                        self.gateway.validator_telemetry().insert_subdag(&subdag);
                    }
                }
            } else {
                debug!(
                    "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
                );
            }

            // Don't remove the response from BlockSync yet as we might fall back to non-BFT sync.
        }

        Ok(())
    }

    /// Returns `true` if there is a path from the previous certificate to the current certificate.
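    ///
    /// For example, if `previous_certificate` is at round `r` and
    /// `current_certificate` is at round `r + 2`, the traversal keeps the
    /// round `r + 1` certificates referenced by the current certificate, then
    /// the round `r` certificates referenced by those, and returns `true` iff
    /// `previous_certificate` is among them.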
    fn is_linked(
        &self,
        previous_certificate: BatchCertificate<N>,
        current_certificate: BatchCertificate<N>,
    ) -> Result<bool> {
        // Initialize the list containing the traversal.
        let mut traversal = vec![current_certificate.clone()];
        // Iterate over the rounds from the current certificate to the previous certificate.
        for round in (previous_certificate.round()..current_certificate.round()).rev() {
            // Retrieve all of the certificates for this past round.
            let certificates = self.storage.get_certificates_for_round(round);
            // Filter the certificates to only include those that are in the traversal.
            traversal = certificates
                .into_iter()
                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
                .collect();
        }
        Ok(traversal.contains(&previous_certificate))
    }
}

// Methods to assist with the block sync module.
impl<N: Network> Sync<N> {
    /// Returns `true` if the node is synced and has connected peers.
    pub fn is_synced(&self) -> bool {
        // Ensure the validator is connected to other validators,
        // not just clients.
        if self.gateway.number_of_connected_peers() == 0 {
            return false;
        }

        self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks the node is behind the greatest peer height.
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.block_sync.num_blocks_behind()
    }

    /// Returns the current block locators of the node.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }
}

// Methods to assist with fetching batch certificates from peers.
impl<N: Network> Sync<N> {
    /// Sends a certificate request to the specified peer.
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Initialize a oneshot channel.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine how many sent requests are pending.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        // Determine if we've already sent a request to the peer.
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        // Determine the maximum number of redundant requests.
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        // Determine if we should send a certificate request to the peer.
        // We send at most `num_redundant_requests` requests, and each peer can only receive one request at a time.
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Insert the certificate ID into the pending queue.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        // If the number of sent requests is below the redundancy limit, send the certificate request to the peer.
        if should_send_request {
            // Send the certificate request to the peer.
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the certificate to be fetched.
        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
        match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
            // If the certificate was fetched, return it.
            Ok(result) => Ok(result?),
            // If the certificate was not fetched, return an error.
            Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
        }
    }

    /// Handles the incoming certificate request.
    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
        // Attempt to retrieve the certificate.
        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
            // Send the certificate response to the peer.
            let self_ = self.clone();
            tokio::spawn(async move {
                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
            });
        }
    }

    /// Handles the incoming certificate response.
    /// This method checks that the response comes from a peer with a pending request for the certificate ID.
    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
        let certificate = response.certificate;
        // Check if the peer IP exists in the pending queue for the given certificate ID.
        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
        // If the peer IP exists, finish the pending request.
        if exists {
            // TODO: Validate the certificate.
            // Remove the certificate ID from the pending queue.
            self.pending.remove(certificate.id(), Some(certificate));
        }
    }
}

impl<N: Network> Sync<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the sync module.
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};

    use snarkos_account::Account;
    use snarkos_node_sync::BlockSync;
    use snarkvm::{
        console::{
            account::{Address, PrivateKey},
            network::MainnetV0,
        },
        ledger::{
            narwhal::{BatchCertificate, BatchHeader, Subdag},
            store::{ConsensusStore, helpers::memory::ConsensusMemory},
        },
        prelude::{Ledger, VM},
        utilities::TestRng,
    };

    use aleo_std::StorageMode;
    use indexmap::IndexSet;
    use rand::Rng;
    use std::collections::BTreeMap;

    type CurrentNetwork = MainnetV0;
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_via_is_linked() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        // Initialize the ledger.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));

        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();

                // Create a certificate for the leader.
                if round <= 5 {
                    let leader = committee.get_leader(round).unwrap();
                    let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
                    let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
                    for i in [leader_index, non_leader_index].into_iter() {
                        let batch_header = BatchHeader::new(
                            &private_keys[i],
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        // Sign the batch header.
                        let mut signatures = IndexSet::with_capacity(4);
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }

                // Create a certificate for each validator.
                if round > 5 {
                    for (i, private_key_1) in private_keys.iter().enumerate() {
                        let batch_header = BatchHeader::new(
                            private_key_1,
                            round,
                            now(),
                            committee_id,
                            Default::default(),
                            previous_certificate_ids.clone(),
                            rng,
                        )
                        .unwrap();
                        // Sign the batch header.
                        let mut signatures = IndexSet::with_capacity(4);
                        for (j, private_key_2) in private_keys.iter().enumerate() {
                            if i != j {
                                signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                            }
                        }
                        current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                    }
                }
                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

1098        // Create block 1.
1099        let leader_round_1 = commit_round;
1100        let leader_1 = committee.get_leader(leader_round_1).unwrap();
1101        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1102        let block_1 = {
1103            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1104            let mut leader_cert_map = IndexSet::new();
1105            leader_cert_map.insert(leader_certificate.clone());
1106            let mut previous_cert_map = IndexSet::new();
1107            for cert in storage.get_certificates_for_round(commit_round - 1) {
1108                previous_cert_map.insert(cert);
1109            }
1110            subdag_map.insert(commit_round, leader_cert_map.clone());
1111            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1112            let subdag = Subdag::from(subdag_map.clone())?;
1113            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
1114        };
1115        // Insert block 1.
1116        core_ledger.advance_to_next_block(&block_1)?;
1117
        // Create block 2.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let block_2 = {
            let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            let mut prev_commit_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
                if cert != leader_certificate {
                    prev_commit_cert_map_2.insert(cert);
                }
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        // Insert block 2.
        core_ledger.advance_to_next_block(&block_2)?;

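        // As with block 2, the subdag also reaches two rounds below the leader, excluding the
        // certificate that the previous block already committed as its leader, so nothing is
        // committed twice.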
        // Create block 3.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let block_3 = {
            let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            let mut prev_commit_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
                if cert != leader_certificate_2 {
                    prev_commit_cert_map_3.insert(cert);
                }
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        // Insert block 3.
        core_ledger.advance_to_next_block(&block_3)?;

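        // Set up a second, fresh ledger at genesis to act as the sync target for the blocks created above.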
        // Initialize the syncing ledger.
        let syncing_ledger = Arc::new(CoreLedgerService::new(
            CurrentLedger::load(genesis, StorageMode::new_test(None)).unwrap(),
            Default::default(),
        ));
        // Initialize the gateway.
        let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
        // Initialize the block synchronization logic.
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);
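        // Each `sync_storage_with_block` call below should advance the syncing ledger by exactly one block.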
        // Try to sync block 1.
        sync.sync_storage_with_block(block_1).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 1);
        // Try to sync block 2.
        sync.sync_storage_with_block(block_2).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 2);
        // Try to sync block 3.
        sync.sync_storage_with_block(block_3).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);
        // Ensure blocks 1 and 2 were added to the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_pending_certificates() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.gen();
        let genesis_rng = &mut TestRng::from_seed(seed);
        let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];
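        // The first key belongs to the local account; the other three are re-sampled from the seeded
        // RNG in the order the genesis committee was generated, so all four correspond to committee
        // members.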
        // Initialize the ledger with the genesis block.
        let ledger = CurrentLedger::load(genesis.clone(), StorageMode::new_test(None)).unwrap();
        // Initialize the core ledger service.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            // Initialize the committee.
            let committee = ledger.latest_committee().unwrap();
            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

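            // Build a DAG of 4 certificates per round: each batch header references every certificate
            // of the previous round (rounds 0 and 1 carry no back references), and the other three
            // validators countersign each header to form its certificate.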
            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();
                // Create a certificate for each validator.
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = BatchHeader::new(
                        private_key_1,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Sign the batch header.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = round_to_certificates_map.get(&i).unwrap().clone();
            certificates.extend(c);
        }
        for certificate in certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create block 1.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
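        // Note: unlike the previous test, the subdag maps are declared outside the block expressions,
        // since they are reused below to compute the set of committed certificates.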
        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_1 = {
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
        };
        // Insert block 1.
        core_ledger.advance_to_next_block(&block_1)?;

        // Create block 2.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_2 = {
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
        };
        // Insert block 2.
        core_ledger.advance_to_next_block(&block_2)?;

        // Create block 3.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_3 = {
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
        };
        // Insert block 3.
        core_ledger.advance_to_next_block(&block_3)?;

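        // A certificate is expected to be "pending" if it was inserted into storage but was not
        // committed by any of the three block subdags above.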
        /*
            Check that the pending certificates are computed correctly.
        */

        // Retrieve the pending certificates.
        let pending_certificates = storage.get_pending_certificates();
        // Check that none of the pending certificates are contained in the ledger.
        for certificate in pending_certificates.clone() {
            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
        }
        // Initialize an empty set to be populated with the committed certificates from the block subdags.
        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        {
            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
            for subdag in subdag_maps.iter() {
                for subdag_certificates in subdag.values() {
                    committed_certificates.extend(subdag_certificates.iter().cloned());
                }
            }
        };
        // Create the set of candidate pending certificates: all inserted certificates minus the committed ones.
        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        for certificate in certificates {
            if !committed_certificates.contains(&certificate) {
                candidate_pending_certificates.insert(certificate);
            }
        }
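        // Note: `IndexSet` equality is order-independent, so this compares the two as sets.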
        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
        assert_eq!(pending_certificates, candidate_pending_certificates);
        Ok(())
    }
}