#![allow(clippy::await_holding_lock)]
use snarkvm::prelude::{
Deserialize,
DeserializeOwned,
Ledger,
Network,
Serialize,
block::Block,
store::{ConsensusStorage, cow_to_copied},
};
use anyhow::{Result, anyhow, bail};
use colored::Colorize;
use parking_lot::Mutex;
use reqwest::Client;
use tracing::{debug, info, warn};
use std::{
cmp,
sync::{
Arc,
atomic::{AtomicBool, AtomicU32, Ordering},
},
time::{Duration, Instant},
};
/// The number of blocks per file on the CDN.
const BLOCKS_PER_FILE: u32 = 50;
/// The desired number of concurrent requests to the CDN.
const CONCURRENT_REQUESTS: u32 = 16;
/// The maximum number of blocks to hold in the pending queue.
const MAXIMUM_PENDING_BLOCKS: u32 = BLOCKS_PER_FILE * CONCURRENT_REQUESTS * 2;
/// The maximum number of attempts for a single request to the CDN.
const MAXIMUM_REQUEST_ATTEMPTS: u8 = 10;
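/// Syncs the given ledger with blocks fetched from the CDN at `base_url`, starting at the
/// block after the ledger's latest height. Returns the height the ledger was advanced to,
/// or that height together with an error if the sync stopped and the ledger's state does
/// not match it.
///
/// A minimal usage sketch: the base URL, the already-opened `ledger`, and the surrounding
/// async runtime are illustrative assumptions, not provided by this function.
///
/// ```ignore
/// use std::sync::{Arc, atomic::AtomicBool};
///
/// // Assumes `ledger: Ledger<N, C>` has been opened elsewhere.
/// let shutdown = Arc::new(AtomicBool::new(false));
/// match sync_ledger_with_cdn("https://blocks.example.org/mainnet/v0", ledger, shutdown).await {
///     Ok(height) => println!("Synced the ledger up to block {height}"),
///     Err((height, error)) => eprintln!("Sync stopped at block {height}: {error}"),
/// }
/// ```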
pub async fn sync_ledger_with_cdn<N: Network, C: ConsensusStorage<N>>(
base_url: &str,
ledger: Ledger<N, C>,
shutdown: Arc<AtomicBool>,
) -> Result<u32, (u32, anyhow::Error)> {
    // The sync starts at the block after the ledger's latest height.
    let start_height = ledger.latest_height() + 1;
    // Load the blocks from the CDN into the ledger, advancing it block by block.
    let ledger_clone = ledger.clone();
    let result = load_blocks(base_url, start_height, None, shutdown, move |block: Block<N>| {
        ledger_clone.advance_to_next_block(&block)
    })
    .await;
    // If the sync stopped early, check the integrity of the ledger before returning.
    if let Err((completed_height, error)) = &result {
        warn!("{error}");
        // If the sync made any progress, ensure the ledger agrees with the last sync height.
        if *completed_height != start_height {
            debug!("Synced the ledger up to block {completed_height}");
            // Retrieve the latest height according to the ledger.
            let node_height = cow_to_copied!(ledger.vm().block_store().heights().max().unwrap_or_default());
if &node_height != completed_height {
return Err((*completed_height, anyhow!("The ledger height does not match the last sync height")));
}
if let Err(err) = ledger.get_block(node_height) {
return Err((*completed_height, err));
}
}
Ok(*completed_height)
} else {
result
}
}
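/// Downloads blocks from the CDN at `base_url` and invokes `process` on each block in height
/// order, from `start_height` (inclusive) up to `end_height` (exclusive, capped at the CDN
/// height). Returns the height of the last block handed to `process`, or that height together
/// with the error if downloading or processing fails.
///
/// A minimal usage sketch that collects the downloaded blocks into a vector; the base URL and
/// the collecting closure are illustrative assumptions.
///
/// ```ignore
/// use std::sync::{Arc, atomic::AtomicBool};
/// use parking_lot::RwLock;
/// use snarkvm::prelude::{MainnetV0, block::Block};
///
/// let blocks = Arc::new(RwLock::new(Vec::new()));
/// let blocks_clone = blocks.clone();
/// let shutdown = Arc::new(AtomicBool::new(false));
/// let completed_height = load_blocks(
///     "https://blocks.example.org/mainnet/v0",
///     0,
///     Some(100),
///     shutdown,
///     move |block: Block<MainnetV0>| {
///         blocks_clone.write().push(block);
///         Ok(())
///     },
/// )
/// .await
/// .unwrap();
/// ```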
pub async fn load_blocks<N: Network>(
base_url: &str,
start_height: u32,
end_height: Option<u32>,
shutdown: Arc<AtomicBool>,
process: impl FnMut(Block<N>) -> Result<()> + Clone + Send + Sync + 'static,
) -> Result<u32, (u32, anyhow::Error)> {
    // Construct the CDN request client.
    let client = match Client::builder().use_rustls_tls().build() {
Ok(client) => client,
Err(error) => {
return Err((start_height.saturating_sub(1), anyhow!("Failed to create a CDN request client - {error}")));
}
};
let cdn_height = match cdn_height::<BLOCKS_PER_FILE>(&client, base_url).await {
Ok(cdn_height) => cdn_height,
Err(error) => return Err((start_height, error)),
};
    // Ensure the CDN has reached the requested start height.
    if cdn_height < start_height {
        return Err((
            start_height,
            anyhow!("The given start height ({start_height}) must not be greater than the CDN height ({cdn_height})"),
        ));
    }
let end_height = cmp::min(end_height.unwrap_or(cdn_height), cdn_height);
if end_height < start_height {
return Err((
start_height,
anyhow!("The given end height ({end_height}) must not be less than the start height ({start_height})"),
));
}
    // Round the start height down to the nearest multiple of BLOCKS_PER_FILE, since the CDN serves whole files.
    let cdn_start = start_height - (start_height % BLOCKS_PER_FILE);
    let cdn_end = end_height;
    // If there is nothing to request from the CDN, return early.
    if cdn_start >= cdn_end {
        return Ok(cdn_end);
    }
    // The queue of downloaded blocks awaiting processing.
    let pending_blocks: Arc<Mutex<Vec<Block<N>>>> = Default::default();
    // Start a timer for progress logging.
    let timer = Instant::now();
    // Spawn a background task that downloads block bundles from the CDN into the queue.
    let pending_blocks_clone = pending_blocks.clone();
    let base_url = base_url.to_owned();
    let shutdown_clone = shutdown.clone();
    tokio::spawn(async move {
        download_block_bundles(client, base_url, cdn_start, cdn_end, pending_blocks_clone, shutdown_clone).await;
    });
    // The height of the most recently processed block.
    let mut current_height = start_height.saturating_sub(1);
while current_height < end_height - 1 {
if shutdown.load(Ordering::Acquire) {
info!("Stopping block sync at {} - shutting down", current_height);
std::process::exit(0);
}
        let mut candidate_blocks = pending_blocks.lock();
        // Obtain the height of the earliest pending block.
        let Some(next_height) = candidate_blocks.first().map(|b| b.height()) else {
            debug!("No pending blocks yet");
            drop(candidate_blocks);
            tokio::time::sleep(Duration::from_secs(3)).await;
            continue;
        };
        // If the earliest pending block is not yet the next block to insert, wait for it to arrive.
        if next_height > current_height + 1 {
            debug!("Waiting for the first relevant blocks ({} pending)", candidate_blocks.len());
            drop(candidate_blocks);
            tokio::time::sleep(Duration::from_secs(1)).await;
            continue;
        }
        // Take up to BLOCKS_PER_FILE blocks from the front of the pending queue,
        // leaving the remainder for the next iteration.
        let split_index = cmp::min(candidate_blocks.len(), BLOCKS_PER_FILE as usize);
        let retained_blocks = candidate_blocks.split_off(split_index);
        let next_blocks = std::mem::replace(&mut *candidate_blocks, retained_blocks);
        drop(candidate_blocks);
        // Process the batch on a blocking thread, since `process` may be CPU-intensive.
        let mut process_clone = process.clone();
        let shutdown_clone = shutdown.clone();
        current_height = tokio::task::spawn_blocking(move || {
            // Only process blocks within the requested [start_height, end_height) range.
            for block in next_blocks.into_iter().filter(|b| (start_height..end_height).contains(&b.height())) {
if shutdown_clone.load(Ordering::Relaxed) {
info!("Stopping block sync at {} - the node is shutting down", current_height);
std::process::exit(0);
}
let block_height = block.height();
process_clone(block)?;
current_height = block_height;
log_progress::<BLOCKS_PER_FILE>(timer, current_height, cdn_start, cdn_end, "block");
}
Ok(current_height)
})
.await
.map_err(|e| (current_height, e.into()))?
.map_err(|e| (current_height, e))?;
}
Ok(current_height)
}
/// Continuously downloads block bundles from the CDN into `pending_blocks`, starting at
/// `cdn_start` and stopping at `cdn_end` or when `shutdown` is set.
async fn download_block_bundles<N: Network>(
client: Client,
base_url: String,
cdn_start: u32,
cdn_end: u32,
pending_blocks: Arc<Mutex<Vec<Block<N>>>>,
shutdown: Arc<AtomicBool>,
) {
    // Keep track of the number of in-flight requests to the CDN.
    let active_requests: Arc<AtomicU32> = Default::default();
    let mut start = cdn_start;
    while start < cdn_end - 1 {
        // Stop downloading if a shutdown has been requested.
        if shutdown.load(Ordering::Acquire) {
            break;
        }
        // If the pending queue is full, wait before requesting more block bundles.
        let num_pending_blocks = pending_blocks.lock().len();
        if num_pending_blocks >= MAXIMUM_PENDING_BLOCKS as usize {
            debug!("Maximum number of pending blocks reached ({num_pending_blocks}), waiting...");
            tokio::time::sleep(Duration::from_secs(5)).await;
            continue;
        }
        // Determine how many additional requests can be issued without exceeding the
        // concurrency limit or overfilling the pending queue.
        let active_request_count = active_requests.load(Ordering::Relaxed);
        let num_requests =
            cmp::min(CONCURRENT_REQUESTS, (MAXIMUM_PENDING_BLOCKS - num_pending_blocks as u32) / BLOCKS_PER_FILE)
                .saturating_sub(active_request_count);
        // Spawn the requests for the next `num_requests` block bundles.
        for i in 0..num_requests {
            // Determine the block range covered by this bundle.
            let start = start + i * BLOCKS_PER_FILE;
            let end = start + BLOCKS_PER_FILE;
            // Stop issuing requests once the range passes the CDN end height.
            if end > cdn_end + BLOCKS_PER_FILE {
                debug!("Finishing network requests to the CDN...");
                break;
            }
            let client_clone = client.clone();
            let base_url_clone = base_url.clone();
            let pending_blocks_clone = pending_blocks.clone();
            let active_requests_clone = active_requests.clone();
            let shutdown_clone = shutdown.clone();
            tokio::spawn(async move {
                // Increment the number of active requests.
                active_requests_clone.fetch_add(1, Ordering::Relaxed);
                let ctx = format!("blocks {start} to {end}");
                debug!("Requesting {ctx} (of {cdn_end})");
                // Construct the URL for this block bundle.
                let blocks_url = format!("{base_url_clone}/{start}.{end}.blocks");
let mut attempts = 0;
let request_time = Instant::now();
loop {
match cdn_get(client_clone.clone(), &blocks_url, &ctx).await {
Ok::<Vec<Block<N>>, _>(blocks) => {
                            // Insert the downloaded blocks into the pending queue, keeping it sorted by height.
                            let mut pending_blocks = pending_blocks_clone.lock();
for block in blocks {
match pending_blocks.binary_search_by_key(&block.height(), |b| b.height()) {
Ok(_idx) => warn!("Found a duplicate pending block at height {}", block.height()),
Err(idx) => pending_blocks.insert(idx, block),
}
}
debug!("Received {ctx} {}", format!("(in {:.2?})", request_time.elapsed()).dimmed());
break;
}
                        Err(error) => {
                            // Increment the attempt counter, and give up after too many failures.
                            attempts += 1;
                            if attempts > MAXIMUM_REQUEST_ATTEMPTS {
                                warn!("Maximum number of requests to {blocks_url} reached - shutting down...");
                                shutdown_clone.store(true, Ordering::Relaxed);
                                break;
                            }
                            // Log the failure, then back off before retrying.
                            warn!("{error} - retrying ({attempts} attempt(s) so far)");
                            tokio::time::sleep(Duration::from_secs(attempts as u64 * 10)).await;
                        }
}
}
active_requests_clone.fetch_sub(1, Ordering::Relaxed);
});
}
start += BLOCKS_PER_FILE * num_requests;
tokio::time::sleep(Duration::from_secs(1)).await;
}
debug!("Finished network requests to the CDN");
}
/// Fetches the latest block height advertised by the CDN at `base_url`, rounded down to a
/// multiple of `BLOCKS_PER_FILE` so that only complete block files are requested.
async fn cdn_height<const BLOCKS_PER_FILE: u32>(client: &Client, base_url: &str) -> Result<u32> {
    // A representation of the 'latest.json' object served by the CDN.
    #[derive(Deserialize, Serialize, Debug)]
    struct LatestState {
        exclusive_height: u32,
        inclusive_height: u32,
        hash: String,
    }
    // Request the 'latest.json' object from the CDN.
    let latest_json_url = format!("{base_url}/latest.json");
    let response = match client.get(latest_json_url).send().await {
        Ok(response) => response,
        Err(error) => bail!("Failed to fetch the CDN height - {error}"),
    };
    // Read the response body.
    let bytes = match response.bytes().await {
        Ok(bytes) => bytes,
        Err(error) => bail!("Failed to parse the CDN height response - {error}"),
    };
    // The response body is a bincode-encoded string containing a JSON payload.
    let latest_state_string = match bincode::deserialize::<String>(&bytes) {
        Ok(string) => string,
        Err(error) => bail!("Failed to deserialize the CDN height response - {error}"),
    };
    // Parse the JSON payload and take the exclusive height as the tip.
    let tip = match serde_json::from_str::<LatestState>(&latest_state_string) {
        Ok(latest) => latest.exclusive_height,
        Err(error) => bail!("Failed to extract the CDN height response - {error}"),
    };
    // Decrement the tip by a few blocks to ensure the CDN has caught up.
    let tip = tip.saturating_sub(10);
    // Round the tip down to the nearest multiple of BLOCKS_PER_FILE, so the final (possibly partial) file is excluded.
    Ok(tip - (tip % BLOCKS_PER_FILE))
}
/// Fetches the object at `url` from the CDN and deserializes it with bincode; `ctx` names the request for error messages.
async fn cdn_get<T: 'static + DeserializeOwned + Send>(client: Client, url: &str, ctx: &str) -> Result<T> {
let response = match client.get(url).send().await {
Ok(response) => response,
Err(error) => bail!("Failed to fetch {ctx} - {error}"),
};
let bytes = match response.bytes().await {
Ok(bytes) => bytes,
Err(error) => bail!("Failed to parse {ctx} - {error}"),
};
    // Deserialize on a blocking task, since decoding a large block bundle can be slow.
    match tokio::task::spawn_blocking(move || bincode::deserialize::<T>(&bytes)).await {
Ok(Ok(objects)) => Ok(objects),
Ok(Err(error)) => bail!("Failed to deserialize {ctx} - {error}"),
Err(error) => bail!("Failed to join task for {ctx} - {error}"),
}
}
/// Logs the sync progress and an estimate of the time remaining, based on the elapsed
/// time per file downloaded so far.
fn log_progress<const OBJECTS_PER_FILE: u32>(
    timer: Instant,
    current_index: u32,
    cdn_start: u32,
    mut cdn_end: u32,
    object_name: &str,
) {
    // Subtract 1, as the end of the range is exclusive.
    cdn_end -= 1;
    // Compute the percentage completed.
    let percentage = current_index * 100 / cdn_end;
    // Compute the number of files processed so far.
    let num_files_done = 1 + (current_index - cdn_start) / OBJECTS_PER_FILE;
    // Compute the number of files remaining.
    let num_files_remaining = 1 + (cdn_end.saturating_sub(current_index)) / OBJECTS_PER_FILE;
    // Compute the average milliseconds per file.
    let millis_per_file = timer.elapsed().as_millis() / num_files_done as u128;
    // Add a heuristic slowdown factor (in millis) to the estimate.
    let slowdown = 100 * num_files_remaining as u128;
    // Compute the estimated time remaining (in millis).
    let time_remaining = num_files_remaining as u128 * millis_per_file + slowdown;
    // Prepare the estimate message (in minutes).
    let estimate = format!("(est. {} minutes remaining)", time_remaining / (60 * 1000));
    // Log the progress.
    info!("Synced up to {object_name} {current_index} of {cdn_end} - {percentage}% complete {}", estimate.dimmed());
}
#[cfg(test)]
mod tests {
    use super::{BLOCKS_PER_FILE, cdn_get, cdn_height, load_blocks, log_progress};
use snarkvm::prelude::{MainnetV0, block::Block};
use parking_lot::RwLock;
use std::{sync::Arc, time::Instant};
type CurrentNetwork = MainnetV0;
const TEST_BASE_URL: &str = "https://blocks.aleo.org/mainnet/v0";
    /// Loads blocks `[start, end)` from the CDN and checks the collected blocks against `expected`.
    fn check_load_blocks(start: u32, end: Option<u32>, expected: usize) {
let blocks = Arc::new(RwLock::new(Vec::new()));
let blocks_clone = blocks.clone();
let process = move |block: Block<CurrentNetwork>| {
blocks_clone.write().push(block);
Ok(())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let completed_height = load_blocks(TEST_BASE_URL, start, end, Default::default(), process).await.unwrap();
assert_eq!(blocks.read().len(), expected);
if expected > 0 {
assert_eq!(blocks.read().last().unwrap().height(), completed_height);
}
for (i, block) in blocks.read().iter().enumerate() {
assert_eq!(block.height(), start + i as u32);
}
});
}
#[test]
fn test_load_blocks_0_to_50() {
let start_height = 0;
let end_height = Some(50);
check_load_blocks(start_height, end_height, 50);
}
#[test]
fn test_load_blocks_50_to_100() {
let start_height = 50;
let end_height = Some(100);
check_load_blocks(start_height, end_height, 50);
}
#[test]
fn test_load_blocks_0_to_123() {
let start_height = 0;
let end_height = Some(123);
check_load_blocks(start_height, end_height, 123);
}
#[test]
fn test_load_blocks_46_to_234() {
let start_height = 46;
let end_height = Some(234);
check_load_blocks(start_height, end_height, 188);
}
#[test]
fn test_cdn_height() {
let rt = tokio::runtime::Runtime::new().unwrap();
let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
rt.block_on(async {
let height = cdn_height::<BLOCKS_PER_FILE>(&client, TEST_BASE_URL).await.unwrap();
assert!(height > 0);
});
}
#[test]
fn test_cdn_get() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
let height =
cdn_get::<u32>(client, &format!("{TEST_BASE_URL}/mainnet/latest/height"), "height").await.unwrap();
assert!(height > 0);
});
}
#[test]
fn test_log_progress() {
let timer = Instant::now();
let cdn_start = 0;
let cdn_end = 100;
let object_name = "blocks";
log_progress::<10>(timer, 0, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 10, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 20, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 30, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 40, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 50, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 60, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 70, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 80, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 90, cdn_start, cdn_end, object_name);
log_progress::<10>(timer, 100, cdn_start, cdn_end, object_name);
}
}