amareleo_node_bft/sync/mod.rs
1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::helpers::{BFTSender, Storage};
17use amareleo_chain_tracing::TracingHandlerGuard;
18use amareleo_node_bft_ledger_service::LedgerService;
19use amareleo_node_sync::{BlockLocators, CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
20
21use snarkvm::{
22 console::network::Network,
23 ledger::authority::Authority,
24 prelude::{cfg_into_iter, cfg_iter},
25};
26
27use anyhow::{Result, bail};
28use indexmap::IndexMap;
29use rayon::prelude::*;
30use std::{collections::HashMap, sync::Arc};
31use tokio::sync::OnceCell;
32
33use std::sync::atomic::{AtomicBool, Ordering};
34
35#[derive(Clone)]
36pub struct Sync<N: Network> {
37 /// The storage.
38 storage: Storage<N>,
39 /// The ledger service.
40 ledger: Arc<dyn LedgerService<N>>,
41 /// The BFT sender.
42 bft_sender: Arc<OnceCell<BFTSender<N>>>,
43 /// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance).
44 is_block_synced: Arc<AtomicBool>,
45}
46
47impl<N: Network> Sync<N> {
48 /// Initializes a new sync instance.
49 pub fn new(storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
50 // Return the sync instance.
51 Self { storage, ledger, bft_sender: Default::default(), is_block_synced: Default::default() }
52 }
53
54 /// Initializes the sync module and sync the storage with the ledger at bootup.
55 pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
56 // If a BFT sender was provided, set it.
57 if let Some(bft_sender) = bft_sender {
58 self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
59 }
60
61 guard_info!(&self.storage, "Syncing storage with the ledger...");
62
63 // Sync the storage with the ledger.
64 self.sync_storage_with_ledger_at_bootup().await
65 }
66
67 /// Starts the sync module.
68 pub async fn run(&self) -> Result<()> {
69 guard_info!(&self.storage, "Starting the sync module...");
70
71 // Update the sync status.
72 self.is_block_synced.store(true, Ordering::SeqCst);
73
74 // Update the `IS_SYNCED` metric.
75 #[cfg(feature = "metrics")]
76 metrics::gauge(metrics::bft::IS_SYNCED, true);
77
78 Ok(())
79 }
80}
81
82// Methods to manage storage.
83impl<N: Network> Sync<N> {
84 /// Syncs the storage with the ledger at bootup.
85 async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
86 // Retrieve the latest block in the ledger.
87 let latest_block = self.ledger.latest_block();
88
89 // Retrieve the block height.
90 let block_height = latest_block.height();
91 // Determine the maximum number of blocks corresponding to rounds
92 // that would not have been garbage collected, i.e. that would be kept in storage.
93 // Since at most one block is created every two rounds,
94 // this is half of the maximum number of rounds kept in storage.
95 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
96 // Determine the earliest height of blocks corresponding to rounds kept in storage,
97 // conservatively set to the block height minus the maximum number of blocks calculated above.
98 // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
99 let gc_height = block_height.saturating_sub(max_gc_blocks);
100 // Retrieve the blocks.
101 let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
102
103 guard_debug!(
104 &self.storage,
105 "Syncing storage with the ledger from block {} to {}...",
106 gc_height,
107 block_height.saturating_add(1)
108 );
109
110 /* Sync storage */
111
112 // Sync the height with the block.
113 self.storage.sync_height_with_block(latest_block.height());
114 // Sync the round with the block.
115 self.storage.sync_round_with_block(latest_block.round());
116 // Perform GC on the latest block round.
117 self.storage.garbage_collect_certificates(latest_block.round());
118 // Iterate over the blocks.
119 for block in &blocks {
120 // If the block authority is a sub-DAG, then sync the batch certificates with the block.
121 // Note that the block authority is always a sub-DAG in production;
122 // beacon signatures are only used for testing,
123 // and as placeholder (irrelevant) block authority in the genesis block.
124 if let Authority::Quorum(subdag) = block.authority() {
125 // Reconstruct the unconfirmed transactions.
126 let unconfirmed_transactions = cfg_iter!(block.transactions())
127 .filter_map(|tx| {
128 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
129 })
130 .collect::<HashMap<_, _>>();
131
132 // Iterate over the certificates.
133 for certificates in subdag.values().cloned() {
134 cfg_into_iter!(certificates).for_each(|certificate| {
135 self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
136 });
137 }
138 }
139 }
140
141 /* Sync the BFT DAG */
142
143 // Construct a list of the certificates.
144 let certificates = blocks
145 .iter()
146 .flat_map(|block| {
147 match block.authority() {
148 // If the block authority is a beacon, then skip the block.
149 Authority::Beacon(_) => None,
150 // If the block authority is a subdag, then retrieve the certificates.
151 Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
152 }
153 })
154 .flatten()
155 .collect::<Vec<_>>();
156
157 // If a BFT sender was provided, send the certificates to the BFT.
158 if let Some(bft_sender) = self.bft_sender.get() {
159 // Await the callback to continue.
160 if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
161 bail!("Failed to update the BFT DAG from sync: {e}");
162 }
163 }
164
165 Ok(())
166 }
167}
168
169// Methods to assist with the block sync module.
170impl<N: Network> Sync<N> {
171 /// Returns `true` if the node is synced and has connected peers.
172 pub fn is_synced(&self) -> bool {
173 self.is_block_synced.load(Ordering::SeqCst)
174 }
175
176 /// Returns the number of blocks the node is behind the greatest peer height.
177 pub fn num_blocks_behind(&self) -> u32 {
178 0u32
179 }
180
181 /// Returns `true` if the node is in gateway mode.
182 pub const fn is_gateway_mode(&self) -> bool {
183 true
184 }
185
186 /// Returns the current block locators of the node.
187 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
188 // Retrieve the latest block height.
189 let latest_height = self.ledger.latest_block_height();
190
191 // Initialize the recents map.
192 let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
193 // Retrieve the recent block hashes.
194 for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
195 recents.insert(height, self.ledger.get_block_hash(height)?);
196 }
197
198 // Initialize the checkpoints map.
199 let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
200 // Retrieve the checkpoint block hashes.
201 for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
202 checkpoints.insert(height, self.ledger.get_block_hash(height)?);
203 }
204
205 // Construct the block locators.
206 BlockLocators::new(recents, checkpoints)
207 }
208}