1#![allow(clippy::await_holding_lock)]
19
20use snarkvm::prelude::{
21 Deserialize,
22 DeserializeOwned,
23 Ledger,
24 Network,
25 Serialize,
26 block::Block,
27 store::{ConsensusStorage, cow_to_copied},
28};
29
30use anyhow::{Result, anyhow, bail};
31use colored::Colorize;
32use parking_lot::Mutex;
33use reqwest::Client;
34use std::{
35 cmp,
36 sync::{
37 Arc,
38 atomic::{AtomicBool, AtomicU32, Ordering},
39 },
40 time::{Duration, Instant},
41};
42
43const BLOCKS_PER_FILE: u32 = 50;
45const CONCURRENT_REQUESTS: u32 = 16;
47const MAXIMUM_PENDING_BLOCKS: u32 = BLOCKS_PER_FILE * CONCURRENT_REQUESTS * 2;
49const MAXIMUM_REQUEST_ATTEMPTS: u8 = 10;
51
52pub async fn sync_ledger_with_cdn<N: Network, C: ConsensusStorage<N>>(
57 base_url: &str,
58 ledger: Ledger<N, C>,
59 shutdown: Arc<AtomicBool>,
60) -> Result<u32, (u32, anyhow::Error)> {
61 let start_height = ledger.latest_height() + 1;
63 let ledger_clone = ledger.clone();
65 let result = load_blocks(base_url, start_height, None, shutdown, move |block: Block<N>| {
66 ledger_clone.advance_to_next_block(&block)
67 })
68 .await;
69
70 if let Err((completed_height, error)) = &result {
73 warn!("{error}");
74
75 if *completed_height != start_height {
77 debug!("Synced the ledger up to block {completed_height}");
78
79 let node_height = cow_to_copied!(ledger.vm().block_store().heights().max().unwrap_or_default());
81 if &node_height != completed_height {
83 return Err((*completed_height, anyhow!("The ledger height does not match the last sync height")));
84 }
85
86 if let Err(err) = ledger.get_block(node_height) {
88 return Err((*completed_height, err));
89 }
90 }
91
92 Ok(*completed_height)
93 } else {
94 result
95 }
96}
97
98pub async fn load_blocks<N: Network>(
103 base_url: &str,
104 start_height: u32,
105 end_height: Option<u32>,
106 shutdown: Arc<AtomicBool>,
107 process: impl FnMut(Block<N>) -> Result<()> + Clone + Send + Sync + 'static,
108) -> Result<u32, (u32, anyhow::Error)> {
109 let client = match Client::builder().use_rustls_tls().build() {
111 Ok(client) => client,
112 Err(error) => {
113 return Err((start_height.saturating_sub(1), anyhow!("Failed to create a CDN request client - {error}")));
114 }
115 };
116
117 let cdn_height = match cdn_height::<BLOCKS_PER_FILE>(&client, base_url).await {
119 Ok(cdn_height) => cdn_height,
120 Err(error) => return Err((start_height, error)),
121 };
122 if cdn_height < start_height {
124 return Err((
125 start_height,
126 anyhow!("The given start height ({start_height}) must be less than the CDN height ({cdn_height})"),
127 ));
128 }
129
130 let end_height = cmp::min(end_height.unwrap_or(cdn_height), cdn_height);
133 if end_height < start_height {
135 return Err((
136 start_height,
137 anyhow!("The given end height ({end_height}) must not be less than the start height ({start_height})"),
138 ));
139 }
140
141 let cdn_start = start_height - (start_height % BLOCKS_PER_FILE);
143 let cdn_end = end_height;
145 if cdn_start >= cdn_end {
147 return Ok(cdn_end);
148 }
149
150 let pending_blocks: Arc<Mutex<Vec<Block<N>>>> = Default::default();
152
153 let timer = Instant::now();
155
156 let pending_blocks_clone = pending_blocks.clone();
158 let base_url = base_url.to_owned();
159 let shutdown_clone = shutdown.clone();
160 tokio::spawn(async move {
161 download_block_bundles(client, base_url, cdn_start, cdn_end, pending_blocks_clone, shutdown_clone).await;
162 });
163
164 let mut current_height = start_height.saturating_sub(1);
166 while current_height < end_height - 1 {
167 if shutdown.load(Ordering::Acquire) {
169 info!("Stopping block sync at {} - shutting down", current_height);
170 std::process::exit(0);
172 }
173
174 let mut candidate_blocks = pending_blocks.lock();
175
176 let Some(next_height) = candidate_blocks.first().map(|b| b.height()) else {
178 debug!("No pending blocks yet");
179 drop(candidate_blocks);
180 tokio::time::sleep(Duration::from_secs(3)).await;
181 continue;
182 };
183
184 if next_height > current_height + 1 {
186 debug!("Waiting for the first relevant blocks ({} pending)", candidate_blocks.len());
188 drop(candidate_blocks);
189 tokio::time::sleep(Duration::from_secs(1)).await;
190 continue;
191 }
192
193 let retained_blocks = candidate_blocks.split_off(BLOCKS_PER_FILE as usize);
195 let next_blocks = std::mem::replace(&mut *candidate_blocks, retained_blocks);
196 drop(candidate_blocks);
197
198 let mut process_clone = process.clone();
200 let shutdown_clone = shutdown.clone();
201 current_height = tokio::task::spawn_blocking(move || {
202 for block in next_blocks.into_iter().filter(|b| (start_height..end_height).contains(&b.height())) {
203 if shutdown_clone.load(Ordering::Relaxed) {
205 info!("Stopping block sync at {} - the node is shutting down", current_height);
206 std::process::exit(0);
208 }
209
210 let block_height = block.height();
212
213 process_clone(block)?;
215
216 current_height = block_height;
218
219 log_progress::<BLOCKS_PER_FILE>(timer, current_height, cdn_start, cdn_end, "block");
221 }
222
223 Ok(current_height)
224 })
225 .await
226 .map_err(|e| (current_height, e.into()))?
227 .map_err(|e| (current_height, e))?;
228 }
229
230 Ok(current_height)
231}
232
233async fn download_block_bundles<N: Network>(
234 client: Client,
235 base_url: String,
236 cdn_start: u32,
237 cdn_end: u32,
238 pending_blocks: Arc<Mutex<Vec<Block<N>>>>,
239 shutdown: Arc<AtomicBool>,
240) {
241 let active_requests: Arc<AtomicU32> = Default::default();
243
244 let mut start = cdn_start;
245 while start < cdn_end - 1 {
246 if shutdown.load(Ordering::Acquire) {
248 break;
249 }
250
251 let num_pending_blocks = pending_blocks.lock().len();
253 if num_pending_blocks >= MAXIMUM_PENDING_BLOCKS as usize {
254 debug!("Maximum number of pending blocks reached ({num_pending_blocks}), waiting...");
255 tokio::time::sleep(Duration::from_secs(5)).await;
256 continue;
257 }
258
259 let active_request_count = active_requests.load(Ordering::Relaxed);
262 let num_requests =
263 cmp::min(CONCURRENT_REQUESTS, (MAXIMUM_PENDING_BLOCKS - num_pending_blocks as u32) / BLOCKS_PER_FILE)
264 .saturating_sub(active_request_count);
265
266 for i in 0..num_requests {
268 let start = start + i * BLOCKS_PER_FILE;
269 let end = start + BLOCKS_PER_FILE;
270
271 if end > cdn_end + BLOCKS_PER_FILE {
273 debug!("Finishing network requests to the CDN...");
274 break;
275 }
276
277 let client_clone = client.clone();
278 let base_url_clone = base_url.clone();
279 let pending_blocks_clone = pending_blocks.clone();
280 let active_requests_clone = active_requests.clone();
281 let shutdown_clone = shutdown.clone();
282 tokio::spawn(async move {
283 active_requests_clone.fetch_add(1, Ordering::Relaxed);
285
286 let ctx = format!("blocks {start} to {end}");
287 debug!("Requesting {ctx} (of {cdn_end})");
288
289 let blocks_url = format!("{base_url_clone}/{start}.{end}.blocks");
291 let ctx = format!("blocks {start} to {end}");
292 let mut attempts = 0;
294 let request_time = Instant::now();
295
296 loop {
297 match cdn_get(client_clone.clone(), &blocks_url, &ctx).await {
299 Ok::<Vec<Block<N>>, _>(blocks) => {
300 let mut pending_blocks = pending_blocks_clone.lock();
302 for block in blocks {
303 match pending_blocks.binary_search_by_key(&block.height(), |b| b.height()) {
304 Ok(_idx) => warn!("Found a duplicate pending block at height {}", block.height()),
305 Err(idx) => pending_blocks.insert(idx, block),
306 }
307 }
308 debug!("Received {ctx} {}", format!("(in {:.2?})", request_time.elapsed()).dimmed());
309 break;
310 }
311 Err(error) => {
312 attempts += 1;
315 if attempts > MAXIMUM_REQUEST_ATTEMPTS {
316 warn!("Maximum number of requests to {blocks_url} reached - shutting down...");
317 shutdown_clone.store(true, Ordering::Relaxed);
318 break;
319 }
320 tokio::time::sleep(Duration::from_secs(attempts as u64 * 10)).await;
321 warn!("{error} - retrying ({attempts} attempt(s) so far)");
322 }
323 }
324 }
325
326 active_requests_clone.fetch_sub(1, Ordering::Relaxed);
328 });
329 }
330
331 start += BLOCKS_PER_FILE * num_requests;
333
334 tokio::time::sleep(Duration::from_secs(1)).await;
336 }
337
338 debug!("Finished network requests to the CDN");
339}
340
341async fn cdn_height<const BLOCKS_PER_FILE: u32>(client: &Client, base_url: &str) -> Result<u32> {
346 #[derive(Deserialize, Serialize, Debug)]
348 struct LatestState {
349 exclusive_height: u32,
350 inclusive_height: u32,
351 hash: String,
352 }
353 let latest_json_url = format!("{base_url}/latest.json");
355 let response = match client.get(latest_json_url).send().await {
357 Ok(response) => response,
358 Err(error) => bail!("Failed to fetch the CDN height - {error}"),
359 };
360 let bytes = match response.bytes().await {
362 Ok(bytes) => bytes,
363 Err(error) => bail!("Failed to parse the CDN height response - {error}"),
364 };
365 let latest_state_string = match bincode::deserialize::<String>(&bytes) {
367 Ok(string) => string,
368 Err(error) => bail!("Failed to deserialize the CDN height response - {error}"),
369 };
370 let tip = match serde_json::from_str::<LatestState>(&latest_state_string) {
372 Ok(latest) => latest.exclusive_height,
373 Err(error) => bail!("Failed to extract the CDN height response - {error}"),
374 };
375 let tip = tip.saturating_sub(10);
377 Ok(tip - (tip % BLOCKS_PER_FILE) + BLOCKS_PER_FILE)
379}
380
381async fn cdn_get<T: 'static + DeserializeOwned + Send>(client: Client, url: &str, ctx: &str) -> Result<T> {
383 let response = match client.get(url).send().await {
385 Ok(response) => response,
386 Err(error) => bail!("Failed to fetch {ctx} - {error}"),
387 };
388 let bytes = match response.bytes().await {
390 Ok(bytes) => bytes,
391 Err(error) => bail!("Failed to parse {ctx} - {error}"),
392 };
393 match tokio::task::spawn_blocking(move || bincode::deserialize::<T>(&bytes)).await {
395 Ok(Ok(objects)) => Ok(objects),
396 Ok(Err(error)) => bail!("Failed to deserialize {ctx} - {error}"),
397 Err(error) => bail!("Failed to join task for {ctx} - {error}"),
398 }
399}
400
401fn log_progress<const OBJECTS_PER_FILE: u32>(
403 timer: Instant,
404 current_index: u32,
405 cdn_start: u32,
406 mut cdn_end: u32,
407 object_name: &str,
408) {
409 cdn_end -= 1;
411 let percentage = current_index * 100 / cdn_end;
413 let num_files_done = 1 + (current_index - cdn_start) / OBJECTS_PER_FILE;
415 let num_files_remaining = 1 + (cdn_end.saturating_sub(current_index)) / OBJECTS_PER_FILE;
417 let millis_per_file = timer.elapsed().as_millis() / num_files_done as u128;
419 let slowdown = 100 * num_files_remaining as u128;
421 let time_remaining = num_files_remaining as u128 * millis_per_file + slowdown;
423 let estimate = format!("(est. {} minutes remaining)", time_remaining / (60 * 1000));
425 info!("Synced up to {object_name} {current_index} of {cdn_end} - {percentage}% complete {}", estimate.dimmed());
427}
428
429#[cfg(test)]
430mod tests {
431 use crate::{
432 blocks::{BLOCKS_PER_FILE, cdn_get, cdn_height, log_progress},
433 load_blocks,
434 };
435 use snarkvm::prelude::{MainnetV0, block::Block};
436
437 use parking_lot::RwLock;
438 use std::{sync::Arc, time::Instant};
439
440 type CurrentNetwork = MainnetV0;
441
442 const TEST_BASE_URL: &str = "https://blocks.aleo.org/mainnet/v0";
443
444 fn check_load_blocks(start: u32, end: Option<u32>, expected: usize) {
445 let blocks = Arc::new(RwLock::new(Vec::new()));
446 let blocks_clone = blocks.clone();
447 let process = move |block: Block<CurrentNetwork>| {
448 blocks_clone.write().push(block);
449 Ok(())
450 };
451
452 let rt = tokio::runtime::Runtime::new().unwrap();
453 rt.block_on(async {
454 let completed_height = load_blocks(TEST_BASE_URL, start, end, Default::default(), process).await.unwrap();
455 assert_eq!(blocks.read().len(), expected);
456 if expected > 0 {
457 assert_eq!(blocks.read().last().unwrap().height(), completed_height);
458 }
459 for (i, block) in blocks.read().iter().enumerate() {
461 assert_eq!(block.height(), start + i as u32);
462 }
463 });
464 }
465
466 #[test]
467 fn test_load_blocks_0_to_50() {
468 let start_height = 0;
469 let end_height = Some(50);
470 check_load_blocks(start_height, end_height, 50);
471 }
472
473 #[test]
474 fn test_load_blocks_50_to_100() {
475 let start_height = 50;
476 let end_height = Some(100);
477 check_load_blocks(start_height, end_height, 50);
478 }
479
480 #[test]
481 fn test_load_blocks_0_to_123() {
482 let start_height = 0;
483 let end_height = Some(123);
484 check_load_blocks(start_height, end_height, 123);
485 }
486
487 #[test]
488 fn test_load_blocks_46_to_234() {
489 let start_height = 46;
490 let end_height = Some(234);
491 check_load_blocks(start_height, end_height, 188);
492 }
493
494 #[test]
495 fn test_cdn_height() {
496 let rt = tokio::runtime::Runtime::new().unwrap();
497 let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
498 rt.block_on(async {
499 let height = cdn_height::<BLOCKS_PER_FILE>(&client, TEST_BASE_URL).await.unwrap();
500 assert!(height > 0);
501 });
502 }
503
504 #[test]
505 fn test_cdn_get() {
506 let rt = tokio::runtime::Runtime::new().unwrap();
507 rt.block_on(async {
508 let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
509 let height =
510 cdn_get::<u32>(client, &format!("{TEST_BASE_URL}/mainnet/latest/height"), "height").await.unwrap();
511 assert!(height > 0);
512 });
513 }
514
515 #[test]
516 fn test_log_progress() {
517 let timer = Instant::now();
519 let cdn_start = 0;
520 let cdn_end = 100;
521 let object_name = "blocks";
522 log_progress::<10>(timer, 0, cdn_start, cdn_end, object_name);
523 log_progress::<10>(timer, 10, cdn_start, cdn_end, object_name);
524 log_progress::<10>(timer, 20, cdn_start, cdn_end, object_name);
525 log_progress::<10>(timer, 30, cdn_start, cdn_end, object_name);
526 log_progress::<10>(timer, 40, cdn_start, cdn_end, object_name);
527 log_progress::<10>(timer, 50, cdn_start, cdn_end, object_name);
528 log_progress::<10>(timer, 60, cdn_start, cdn_end, object_name);
529 log_progress::<10>(timer, 70, cdn_start, cdn_end, object_name);
530 log_progress::<10>(timer, 80, cdn_start, cdn_end, object_name);
531 log_progress::<10>(timer, 90, cdn_start, cdn_end, object_name);
532 log_progress::<10>(timer, 100, cdn_start, cdn_end, object_name);
533 }
534}