1#![allow(clippy::await_holding_lock)]
19
20use snarkvm::prelude::{
21 Deserialize,
22 DeserializeOwned,
23 Ledger,
24 Network,
25 Serialize,
26 block::Block,
27 store::ConsensusStorage,
28};
29
30use anyhow::{Result, anyhow, bail};
31use colored::Colorize;
32#[cfg(feature = "locktick")]
33use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
34#[cfg(not(feature = "locktick"))]
35use parking_lot::Mutex;
36use reqwest::Client;
37use std::{
38 cmp,
39 sync::{
40 Arc,
41 atomic::{AtomicBool, AtomicU32, Ordering},
42 },
43 time::{Duration, Instant},
44};
45#[cfg(not(feature = "locktick"))]
46use tokio::sync::Mutex as TMutex;
47use tokio::task::JoinHandle;
48
/// The height span covered by each block bundle file requested from the CDN
/// (bundles are addressed as `{start}.{start + BLOCKS_PER_FILE}.blocks`).
49const BLOCKS_PER_FILE: u32 = 50;
/// The target number of bundle downloads kept in flight at the same time.
51const CONCURRENT_REQUESTS: u32 = 16;
/// The number of downloaded-but-unprocessed blocks at which new downloads are paused.
53const MAXIMUM_PENDING_BLOCKS: u32 = BLOCKS_PER_FILE * CONCURRENT_REQUESTS * 2;
/// The number of failed attempts tolerated for a single bundle request before giving up.
55const MAXIMUM_REQUEST_ATTEMPTS: u8 = 10;
57
/// The base URL of the block CDN; the tests in this file append a network name
/// (e.g. `/mainnet`) to it before use.
58pub const CDN_BASE_URL: &str = "https://cdn.provable.com/v0/blocks";
60
/// Publishes the given block height to the BFT height gauge.
///
/// Only compiled when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
fn update_block_metrics(height: u32) {
    // Every `u32` is exactly representable as an `f64`, so this conversion is lossless.
    let gauge_value = f64::from(height);
    crate::metrics::gauge(crate::metrics::bft::HEIGHT, gauge_value);
}
67
/// The outcome of a CDN sync: `Ok` holds the height the sync completed at, while `Err` pairs
/// the height reported at the point of failure with the error that interrupted the sync.
68pub type SyncResult = Result<u32, (u32, anyhow::Error)>;
69
70pub struct CdnBlockSync {
75 base_url: http::Uri,
76 task: Mutex<Option<JoinHandle<SyncResult>>>,
78 done: AtomicBool,
80}
81
82impl CdnBlockSync {
83 pub fn new<N: Network, C: ConsensusStorage<N>>(
88 base_url: http::Uri,
89 ledger: Ledger<N, C>,
90 shutdown: Arc<AtomicBool>,
91 ) -> Self {
92 let task = {
93 let base_url = base_url.clone();
94 tokio::spawn(async move { Self::worker(base_url, ledger, shutdown).await })
95 };
96
97 debug!("Started sync from CDN at {base_url}");
98 Self { done: AtomicBool::new(false), base_url, task: Mutex::new(Some(task)) }
99 }
100
101 pub fn is_done(&self) -> bool {
105 self.done.load(Ordering::SeqCst)
106 }
107
108 pub async fn wait(&self) -> Result<SyncResult> {
110 let Some(hdl) = self.task.lock().take() else {
111 bail!("CDN task was already awaited");
112 };
113
114 let result = hdl.await.map_err(|err| anyhow!("Failed to wait for CDN task: {err}"));
115 self.done.store(true, Ordering::SeqCst);
116 result
117 }
118
119 async fn worker<N: Network, C: ConsensusStorage<N>>(
120 base_url: http::Uri,
121 ledger: Ledger<N, C>,
122 shutdown: Arc<AtomicBool>,
123 ) -> SyncResult {
124 let start_height = ledger.latest_height() + 1;
126 let ledger_clone = ledger.clone();
128 let result = load_blocks(&base_url, start_height, None, shutdown, move |block: Block<N>| {
129 ledger_clone.advance_to_next_block(&block)
130 })
131 .await;
132
133 if let Err((completed_height, error)) = &result {
136 warn!("{error}");
137
138 if *completed_height != start_height {
140 debug!("Synced the ledger up to block {completed_height}");
141
142 let node_height = *ledger.vm().block_store().heights().max().unwrap_or_default();
144 if &node_height != completed_height {
146 return Err((*completed_height, anyhow!("The ledger height does not match the last sync height")));
147 }
148
149 if let Err(err) = ledger.get_block(node_height) {
151 return Err((*completed_height, err));
152 }
153 }
154
155 Ok(*completed_height)
156 } else {
157 result
158 }
159 }
160
161 pub async fn get_cdn_height(&self) -> anyhow::Result<u32> {
162 let client = Client::builder().use_rustls_tls().build()?;
163 cdn_height::<BLOCKS_PER_FILE>(&client, &self.base_url).await
164 }
165}
166
167pub async fn load_blocks<N: Network>(
172 base_url: &http::Uri,
173 start_height: u32,
174 end_height: Option<u32>,
175 shutdown: Arc<AtomicBool>,
176 process: impl FnMut(Block<N>) -> Result<()> + Clone + Send + Sync + 'static,
177) -> Result<u32, (u32, anyhow::Error)> {
178 let client = match Client::builder().use_rustls_tls().build() {
180 Ok(client) => client,
181 Err(error) => {
182 return Err((start_height.saturating_sub(1), anyhow!("Failed to create a CDN request client - {error}")));
183 }
184 };
185
186 let cdn_height = match cdn_height::<BLOCKS_PER_FILE>(&client, base_url).await {
188 Ok(cdn_height) => cdn_height,
189 Err(error) => return Err((start_height, error)),
190 };
191 if cdn_height < start_height {
193 return Err((
194 start_height,
195 anyhow!("The given start height ({start_height}) must be less than the CDN height ({cdn_height})"),
196 ));
197 }
198
199 let end_height = cmp::min(end_height.unwrap_or(cdn_height), cdn_height);
202 if end_height < start_height {
204 return Err((
205 start_height,
206 anyhow!("The given end height ({end_height}) must not be less than the start height ({start_height})"),
207 ));
208 }
209
210 let cdn_start = start_height - (start_height % BLOCKS_PER_FILE);
212 let cdn_end = end_height;
214 if cdn_start >= cdn_end {
216 return Ok(cdn_end);
217 }
218
219 let pending_blocks: Arc<TMutex<Vec<Block<N>>>> = Default::default();
221
222 let timer = Instant::now();
224
225 let pending_blocks_clone = pending_blocks.clone();
227 let base_url = base_url.to_owned();
228 let shutdown_clone = shutdown.clone();
229 tokio::spawn(async move {
230 download_block_bundles(client, &base_url, cdn_start, cdn_end, pending_blocks_clone, shutdown_clone).await;
231 });
232
233 let mut current_height = start_height.saturating_sub(1);
235 while current_height < end_height - 1 {
236 if shutdown.load(Ordering::Acquire) {
238 info!("Stopping block sync at {} - shutting down", current_height);
239 std::process::exit(0);
241 }
242
243 let mut candidate_blocks = pending_blocks.lock().await;
244
245 let Some(next_height) = candidate_blocks.first().map(|b| b.height()) else {
247 debug!("No pending blocks yet");
248 drop(candidate_blocks);
249 tokio::time::sleep(Duration::from_secs(3)).await;
250 continue;
251 };
252
253 if next_height > current_height + 1 {
255 debug!("Waiting for the first relevant blocks ({} pending)", candidate_blocks.len());
257 drop(candidate_blocks);
258 tokio::time::sleep(Duration::from_secs(1)).await;
259 continue;
260 }
261
262 let retained_blocks = candidate_blocks.split_off(BLOCKS_PER_FILE as usize);
264 let next_blocks = std::mem::replace(&mut *candidate_blocks, retained_blocks);
265 drop(candidate_blocks);
266
267 let threadpool = rayon::ThreadPoolBuilder::new().build().unwrap();
269
270 let mut process_clone = process.clone();
272 let shutdown_clone = shutdown.clone();
273 current_height = tokio::task::spawn_blocking(move || {
274 threadpool.install(|| {
275 for block in next_blocks.into_iter().filter(|b| (start_height..end_height).contains(&b.height())) {
276 if shutdown_clone.load(Ordering::Relaxed) {
278 info!("Stopping block sync at {} - the node is shutting down", current_height);
279 std::process::exit(0);
281 }
282
283 let block_height = block.height();
285
286 process_clone(block)?;
288
289 current_height = block_height;
291
292 #[cfg(feature = "metrics")]
294 update_block_metrics(current_height);
295
296 log_progress::<BLOCKS_PER_FILE>(timer, current_height, cdn_start, cdn_end, "block");
298 }
299
300 Ok(current_height)
301 })
302 })
303 .await
304 .map_err(|e| (current_height, e.into()))?
305 .map_err(|e| (current_height, e))?;
306 }
307
308 Ok(current_height)
309}
310
311async fn download_block_bundles<N: Network>(
312 client: Client,
313 base_url: &http::Uri,
314 cdn_start: u32,
315 cdn_end: u32,
316 pending_blocks: Arc<TMutex<Vec<Block<N>>>>,
317 shutdown: Arc<AtomicBool>,
318) {
319 let active_requests: Arc<AtomicU32> = Default::default();
321
322 let mut start = cdn_start;
323 while start < cdn_end - 1 {
324 if shutdown.load(Ordering::Acquire) {
326 break;
327 }
328
329 let num_pending_blocks = pending_blocks.lock().await.len();
331 if num_pending_blocks >= MAXIMUM_PENDING_BLOCKS as usize {
332 debug!("Maximum number of pending blocks reached ({num_pending_blocks}), waiting...");
333 tokio::time::sleep(Duration::from_secs(5)).await;
334 continue;
335 }
336
337 let active_request_count = active_requests.load(Ordering::Relaxed);
340 let num_requests =
341 cmp::min(CONCURRENT_REQUESTS, (MAXIMUM_PENDING_BLOCKS - num_pending_blocks as u32) / BLOCKS_PER_FILE)
342 .saturating_sub(active_request_count);
343
344 for i in 0..num_requests {
346 let start = start + i * BLOCKS_PER_FILE;
347 let end = start + BLOCKS_PER_FILE;
348
349 if end > cdn_end + BLOCKS_PER_FILE {
351 debug!("Finishing network requests to the CDN...");
352 break;
353 }
354
355 let client_clone = client.clone();
356 let base_url_clone = base_url.clone();
357 let pending_blocks_clone = pending_blocks.clone();
358 let active_requests_clone = active_requests.clone();
359 let shutdown_clone = shutdown.clone();
360 tokio::spawn(async move {
361 active_requests_clone.fetch_add(1, Ordering::Relaxed);
363
364 let ctx = format!("blocks {start} to {end}");
365 debug!("Requesting {ctx} (of {cdn_end})");
366
367 let blocks_url = format!("{base_url_clone}/{start}.{end}.blocks");
369 let ctx = format!("blocks {start} to {end}");
370 let mut attempts = 0;
372 let request_time = Instant::now();
373
374 loop {
375 match cdn_get(client_clone.clone(), &blocks_url, &ctx).await {
377 Ok::<Vec<Block<N>>, _>(blocks) => {
378 let mut pending_blocks = pending_blocks_clone.lock().await;
380 for block in blocks {
381 match pending_blocks.binary_search_by_key(&block.height(), |b| b.height()) {
382 Ok(_idx) => warn!("Found a duplicate pending block at height {}", block.height()),
383 Err(idx) => pending_blocks.insert(idx, block),
384 }
385 }
386 debug!("Received {ctx} {}", format!("(in {:.2?})", request_time.elapsed()).dimmed());
387 break;
388 }
389 Err(error) => {
390 attempts += 1;
393 if attempts > MAXIMUM_REQUEST_ATTEMPTS {
394 warn!("Maximum number of requests to {blocks_url} reached - shutting down...");
395 shutdown_clone.store(true, Ordering::Relaxed);
396 break;
397 }
398 tokio::time::sleep(Duration::from_secs(attempts as u64 * 10)).await;
399 warn!("{error} - retrying ({attempts} attempt(s) so far)");
400 }
401 }
402 }
403
404 active_requests_clone.fetch_sub(1, Ordering::Relaxed);
406 });
407 }
408
409 start += BLOCKS_PER_FILE * num_requests;
411
412 tokio::time::sleep(Duration::from_secs(1)).await;
414 }
415
416 debug!("Finished network requests to the CDN");
417}
418
419async fn cdn_height<const BLOCKS_PER_FILE: u32>(client: &Client, base_url: &http::Uri) -> Result<u32> {
424 #[derive(Deserialize, Serialize, Debug)]
426 struct LatestState {
427 exclusive_height: u32,
428 inclusive_height: u32,
429 hash: String,
430 }
431 let latest_json_url = format!("{base_url}/latest.json");
433 let response = match client.get(latest_json_url).send().await {
435 Ok(response) => response,
436 Err(error) => bail!("Failed to fetch the CDN height - {error}"),
437 };
438 let bytes = match response.bytes().await {
440 Ok(bytes) => bytes,
441 Err(error) => bail!("Failed to parse the CDN height response - {error}"),
442 };
443 let latest_state_string = match bincode::deserialize::<String>(&bytes) {
445 Ok(string) => string,
446 Err(error) => bail!("Failed to deserialize the CDN height response - {error}"),
447 };
448 let tip = match serde_json::from_str::<LatestState>(&latest_state_string) {
450 Ok(latest) => latest.exclusive_height,
451 Err(error) => bail!("Failed to extract the CDN height response - {error}"),
452 };
453 let tip = tip.saturating_sub(10);
455 Ok(tip - (tip % BLOCKS_PER_FILE) + BLOCKS_PER_FILE)
457}
458
459async fn cdn_get<T: 'static + DeserializeOwned + Send>(client: Client, url: &str, ctx: &str) -> Result<T> {
461 let response = match client.get(url).send().await {
463 Ok(response) => response,
464 Err(error) => bail!("Failed to fetch {ctx} - {error}"),
465 };
466 let bytes = match response.bytes().await {
468 Ok(bytes) => bytes,
469 Err(error) => bail!("Failed to parse {ctx} - {error}"),
470 };
471
472 match tokio::task::spawn_blocking(move || bincode::deserialize::<T>(&bytes)).await {
474 Ok(Ok(objects)) => Ok(objects),
475 Ok(Err(error)) => bail!("Failed to deserialize {ctx} - {error}"),
476 Err(error) => bail!("Failed to join task for {ctx} - {error}"),
477 }
478}
479
/// Converts a duration into a coarse, human-readable description.
///
/// Sub-second precision is discarded. Depending on the magnitude, the result is expressed in
/// seconds, minutes, hours and minutes, or days and hours, with smaller units truncated.
/// Units are singular when their count is exactly one (e.g. "1 minute", "2 minutes").
fn to_human_readable_duration(duration: Duration) -> String {
    const SECS_PER_MIN: u64 = 60;
    const MINS_PER_HOUR: u64 = 60;
    const SECS_PER_HOUR: u64 = SECS_PER_MIN * MINS_PER_HOUR;
    const HOURS_PER_DAY: u64 = 24;
    const SECS_PER_DAY: u64 = SECS_PER_HOUR * HOURS_PER_DAY;

    /// Formats `count` followed by `unit`, appending an "s" unless `count` is exactly one.
    fn with_unit(count: u64, unit: &str) -> String {
        if count == 1 { format!("{count} {unit}") } else { format!("{count} {unit}s") }
    }

    let secs = duration.as_secs();

    if secs < 1 {
        "less than one second".to_string()
    } else if secs < SECS_PER_MIN {
        with_unit(secs, "second")
    } else if secs < SECS_PER_HOUR {
        with_unit(secs / SECS_PER_MIN, "minute")
    } else if secs < SECS_PER_DAY {
        let mins = secs / SECS_PER_MIN;
        format!("{} and {}", with_unit(mins / MINS_PER_HOUR, "hour"), with_unit(mins % MINS_PER_HOUR, "minute"))
    } else {
        let days = secs / SECS_PER_DAY;
        let hours = (secs % SECS_PER_DAY) / SECS_PER_HOUR;
        format!("{} and {}", with_unit(days, "day"), with_unit(hours, "hour"))
    }
}
512
513fn log_progress<const OBJECTS_PER_FILE: u32>(
515 timer: Instant,
516 current_index: u32,
517 cdn_start: u32,
518 mut cdn_end: u32,
519 object_name: &str,
520) {
521 debug_assert!(cdn_start <= cdn_end);
522 debug_assert!(current_index <= cdn_end);
523 debug_assert!(cdn_end >= 1);
524
525 cdn_end -= 1;
527
528 let sync_percentage =
530 (current_index.saturating_sub(cdn_start) * 100).checked_div(cdn_end.saturating_sub(cdn_start)).unwrap_or(100);
531
532 let num_files_done = 1 + (current_index - cdn_start) / OBJECTS_PER_FILE;
534 let num_files_remaining = 1 + (cdn_end.saturating_sub(current_index)) / OBJECTS_PER_FILE;
536 let millis_per_file = timer.elapsed().as_millis() / num_files_done as u128;
538 let slowdown = 100 * num_files_remaining as u128;
540 let time_remaining = {
542 let remaining = num_files_remaining as u128 * millis_per_file + slowdown;
543 to_human_readable_duration(Duration::from_secs((remaining / 1000) as u64))
544 };
545 let estimate = format!("(started at height {cdn_start}, est. {time_remaining} remaining)");
547 info!(
549 "Reached {object_name} {current_index} of {cdn_end} - Sync is {sync_percentage}% complete {}",
550 estimate.dimmed()
551 );
552}
553
#[cfg(test)]
mod tests {
    use super::{BLOCKS_PER_FILE, CDN_BASE_URL, cdn_height, log_progress};
    use crate::load_blocks;
    use snarkvm::prelude::{MainnetV0, block::Block};

    use http::Uri;
    use parking_lot::RwLock;
    use std::{sync::Arc, time::Instant};

    type CurrentNetwork = MainnetV0;

    /// Loads the blocks `start..end` from the mainnet CDN and checks that exactly `expected`
    /// blocks arrive, in consecutive height order starting at `start`.
    ///
    /// Note: this performs real network requests.
    fn check_load_blocks(start: u32, end: Option<u32>, expected: usize) {
        // Collect every block handed to the processing callback.
        let collected = Arc::new(RwLock::new(Vec::new()));
        let sink = Arc::clone(&collected);
        let process = move |block: Block<CurrentNetwork>| {
            sink.write().push(block);
            Ok(())
        };

        let cdn_url = Uri::try_from(format!("{CDN_BASE_URL}/mainnet")).unwrap();

        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let completed_height = load_blocks(&cdn_url, start, end, Default::default(), process).await.unwrap();

            let collected = collected.read();
            assert_eq!(collected.len(), expected);
            // The last collected block, if any, must be the one the sync completed at.
            if let Some(last_block) = collected.last() {
                assert_eq!(last_block.height(), completed_height);
            }
            // The heights must be consecutive, starting at the requested start height.
            for (expected_height, block) in (start..).zip(collected.iter()) {
                assert_eq!(block.height(), expected_height);
            }
        });
    }

    #[test]
    fn test_load_blocks_0_to_50() {
        check_load_blocks(0, Some(50), 50);
    }

    #[test]
    fn test_load_blocks_50_to_100() {
        check_load_blocks(50, Some(100), 50);
    }

    #[test]
    fn test_load_blocks_0_to_123() {
        check_load_blocks(0, Some(123), 123);
    }

    #[test]
    fn test_load_blocks_46_to_234() {
        check_load_blocks(46, Some(234), 188);
    }

    #[test]
    fn test_cdn_height() {
        let cdn_url = Uri::try_from(format!("{CDN_BASE_URL}/mainnet")).unwrap();
        let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let height = cdn_height::<BLOCKS_PER_FILE>(&client, &cdn_url).await.unwrap();
            assert!(height > 0);
        });
    }

    #[test]
    fn test_log_progress() {
        let timer = Instant::now();
        let cdn_start = 0;
        let cdn_end = 100;
        let object_name = "blocks";
        // Log the progress at every multiple of 10 from 0 to 100 (inclusive).
        for current_index in (0..=100).step_by(10) {
            log_progress::<10>(timer, current_index, cdn_start, cdn_end, object_name);
        }
    }
}