amareleo_node_bft/sync/
mod.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::helpers::{BFTSender, Storage};
17use amareleo_node_bft_ledger_service::LedgerService;
18use amareleo_node_sync::{BlockLocators, CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
19use snarkvm::{
20    console::network::Network,
21    ledger::authority::Authority,
22    prelude::{cfg_into_iter, cfg_iter},
23};
24
25use anyhow::{Result, bail};
26use indexmap::IndexMap;
27use rayon::prelude::*;
28use std::{collections::HashMap, sync::Arc};
29use tokio::sync::OnceCell;
30
31use std::sync::atomic::{AtomicBool, Ordering};
32
/// Bootup synchronizer between the ledger, the BFT storage, and (optionally) the BFT DAG.
///
/// The ledger, the BFT sender slot, and the synced flag are held behind `Arc`s,
/// so clones of this struct share them.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The storage that is brought in line with the ledger at bootup.
    storage: Storage<N>,
    /// The ledger service, used to read blocks, heights, and block hashes.
    ledger: Arc<dyn LedgerService<N>>,
    /// The BFT sender; empty until `initialize` is given one, and settable only once.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The block-synced flag; starts as `false`, is set to `true` by `run`, and is read by `is_synced`.
    is_block_synced: Arc<AtomicBool>,
}
44
45impl<N: Network> Sync<N> {
46    /// Initializes a new sync instance.
47    pub fn new(storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
48        // Return the sync instance.
49        Self { storage, ledger, bft_sender: Default::default(), is_block_synced: Default::default() }
50    }
51
52    /// Initializes the sync module and sync the storage with the ledger at bootup.
53    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
54        let _guard = self.storage.get_tracing_guard();
55
56        // If a BFT sender was provided, set it.
57        if let Some(bft_sender) = bft_sender {
58            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
59        }
60
61        info!("Syncing storage with the ledger...");
62
63        // Sync the storage with the ledger.
64        self.sync_storage_with_ledger_at_bootup().await
65    }
66
67    /// Starts the sync module.
68    pub async fn run(&self) -> Result<()> {
69        let _guard = self.storage.get_tracing_guard();
70        info!("Starting the sync module...");
71
72        // Update the sync status.
73        self.is_block_synced.store(true, Ordering::SeqCst);
74
75        // Update the `IS_SYNCED` metric.
76        #[cfg(feature = "metrics")]
77        metrics::gauge(metrics::bft::IS_SYNCED, true);
78
79        Ok(())
80    }
81}
82
83// Methods to manage storage.
84impl<N: Network> Sync<N> {
85    /// Syncs the storage with the ledger at bootup.
86    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
87        let _guard = self.storage.get_tracing_guard();
88        // Retrieve the latest block in the ledger.
89        let latest_block = self.ledger.latest_block();
90
91        // Retrieve the block height.
92        let block_height = latest_block.height();
93        // Determine the maximum number of blocks corresponding to rounds
94        // that would not have been garbage collected, i.e. that would be kept in storage.
95        // Since at most one block is created every two rounds,
96        // this is half of the maximum number of rounds kept in storage.
97        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
98        // Determine the earliest height of blocks corresponding to rounds kept in storage,
99        // conservatively set to the block height minus the maximum number of blocks calculated above.
100        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
101        let gc_height = block_height.saturating_sub(max_gc_blocks);
102        // Retrieve the blocks.
103        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
104
105        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));
106
107        /* Sync storage */
108
109        // Sync the height with the block.
110        self.storage.sync_height_with_block(latest_block.height());
111        // Sync the round with the block.
112        self.storage.sync_round_with_block(latest_block.round());
113        // Perform GC on the latest block round.
114        self.storage.garbage_collect_certificates(latest_block.round());
115        // Iterate over the blocks.
116        for block in &blocks {
117            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
118            // Note that the block authority is always a sub-DAG in production;
119            // beacon signatures are only used for testing,
120            // and as placeholder (irrelevant) block authority in the genesis block.
121            if let Authority::Quorum(subdag) = block.authority() {
122                // Reconstruct the unconfirmed transactions.
123                let unconfirmed_transactions = cfg_iter!(block.transactions())
124                    .filter_map(|tx| {
125                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
126                    })
127                    .collect::<HashMap<_, _>>();
128
129                // Iterate over the certificates.
130                for certificates in subdag.values().cloned() {
131                    cfg_into_iter!(certificates).for_each(|certificate| {
132                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
133                    });
134                }
135            }
136        }
137
138        /* Sync the BFT DAG */
139
140        // Construct a list of the certificates.
141        let certificates = blocks
142            .iter()
143            .flat_map(|block| {
144                match block.authority() {
145                    // If the block authority is a beacon, then skip the block.
146                    Authority::Beacon(_) => None,
147                    // If the block authority is a subdag, then retrieve the certificates.
148                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
149                }
150            })
151            .flatten()
152            .collect::<Vec<_>>();
153
154        // If a BFT sender was provided, send the certificates to the BFT.
155        if let Some(bft_sender) = self.bft_sender.get() {
156            // Await the callback to continue.
157            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
158                bail!("Failed to update the BFT DAG from sync: {e}");
159            }
160        }
161
162        Ok(())
163    }
164}
165
166// Methods to assist with the block sync module.
167impl<N: Network> Sync<N> {
168    /// Returns `true` if the node is synced and has connected peers.
169    pub fn is_synced(&self) -> bool {
170        self.is_block_synced.load(Ordering::SeqCst)
171    }
172
173    /// Returns the number of blocks the node is behind the greatest peer height.
174    pub fn num_blocks_behind(&self) -> u32 {
175        0u32
176    }
177
178    /// Returns `true` if the node is in gateway mode.
179    pub const fn is_gateway_mode(&self) -> bool {
180        true
181    }
182
183    /// Returns the current block locators of the node.
184    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
185        // Retrieve the latest block height.
186        let latest_height = self.ledger.latest_block_height();
187
188        // Initialize the recents map.
189        let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
190        // Retrieve the recent block hashes.
191        for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
192            recents.insert(height, self.ledger.get_block_hash(height)?);
193        }
194
195        // Initialize the checkpoints map.
196        let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
197        // Retrieve the checkpoint block hashes.
198        for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
199            checkpoints.insert(height, self.ledger.get_block_hash(height)?);
200        }
201
202        // Construct the block locators.
203        BlockLocators::new(recents, checkpoints)
204    }
205}