amareleo_node_bft/sync/
mod.rs1use crate::helpers::{BFTSender, Storage};
17use amareleo_node_bft_ledger_service::LedgerService;
18use amareleo_node_sync::locators::{BlockLocators, CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
19use snarkvm::{
20 console::network::Network,
21 ledger::authority::Authority,
22 prelude::{cfg_into_iter, cfg_iter},
23};
24
25use anyhow::{Result, bail};
26use indexmap::IndexMap;
27use rayon::prelude::*;
28use std::{collections::HashMap, sync::Arc};
29use tokio::sync::OnceCell;
30
31use std::sync::atomic::{AtomicBool, Ordering};
32
33#[derive(Clone)]
34pub struct Sync<N: Network> {
35 storage: Storage<N>,
37 ledger: Arc<dyn LedgerService<N>>,
39 bft_sender: Arc<OnceCell<BFTSender<N>>>,
41 is_block_synced: Arc<AtomicBool>,
43}
44
45impl<N: Network> Sync<N> {
46 pub fn new(storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
48 Self { storage, ledger, bft_sender: Default::default(), is_block_synced: Default::default() }
50 }
51
52 pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
54 if let Some(bft_sender) = bft_sender {
56 self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
57 }
58
59 info!("Syncing storage with the ledger...");
60
61 self.sync_storage_with_ledger_at_bootup().await
63 }
64
65 pub async fn run(&self) -> Result<()> {
67 info!("Starting the sync module...");
68
69 self.is_block_synced.store(true, Ordering::SeqCst);
71
72 #[cfg(feature = "metrics")]
74 metrics::gauge(metrics::bft::IS_SYNCED, true);
75
76 Ok(())
77 }
78}
79
80impl<N: Network> Sync<N> {
82 async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
84 let latest_block = self.ledger.latest_block();
86
87 let block_height = latest_block.height();
89 let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
91 let gc_height = block_height.saturating_sub(max_gc_blocks);
94 let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
96
97 debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));
98
99 self.storage.sync_height_with_block(latest_block.height());
103 self.storage.sync_round_with_block(latest_block.round());
105 self.storage.garbage_collect_certificates(latest_block.round());
107 for block in &blocks {
109 if let Authority::Quorum(subdag) = block.authority() {
111 let unconfirmed_transactions = cfg_iter!(block.transactions())
113 .filter_map(|tx| {
114 tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
115 })
116 .collect::<HashMap<_, _>>();
117
118 for certificates in subdag.values().cloned() {
120 cfg_into_iter!(certificates).for_each(|certificate| {
121 self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
122 });
123 }
124 }
125 }
126
127 let certificates = blocks
131 .iter()
132 .flat_map(|block| {
133 match block.authority() {
134 Authority::Beacon(_) => None,
136 Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
138 }
139 })
140 .flatten()
141 .collect::<Vec<_>>();
142
143 if let Some(bft_sender) = self.bft_sender.get() {
145 if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
147 bail!("Failed to update the BFT DAG from sync: {e}");
148 }
149 }
150
151 Ok(())
152 }
153}
154
155impl<N: Network> Sync<N> {
157 pub fn is_synced(&self) -> bool {
159 self.is_block_synced.load(Ordering::SeqCst)
160 }
161
162 pub fn num_blocks_behind(&self) -> u32 {
164 0u32
165 }
166
167 pub const fn is_gateway_mode(&self) -> bool {
169 true
170 }
171
172 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
174 let latest_height = self.ledger.latest_block_height();
176
177 let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
179 for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
181 recents.insert(height, self.ledger.get_block_hash(height)?);
182 }
183
184 let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
186 for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
188 checkpoints.insert(height, self.ledger.get_block_hash(height)?);
189 }
190
191 BlockLocators::new(recents, checkpoints)
193 }
194}