1#![allow(clippy::await_holding_lock)]
19
20use snarkos_utilities::{SignalHandler, Stoppable};
21
22use snarkvm::{
23 prelude::{Deserialize, DeserializeOwned, Ledger, Network, Serialize, block::Block, store::ConsensusStorage},
24 utilities::flatten_error,
25};
26
27use anyhow::{Context, Result, anyhow, bail};
28use colored::Colorize;
29#[cfg(feature = "locktick")]
30use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
31#[cfg(not(feature = "locktick"))]
32use parking_lot::Mutex;
33use reqwest::Client;
34use std::{
35 cmp,
36 sync::{
37 Arc,
38 atomic::{AtomicBool, AtomicU32, Ordering},
39 },
40 time::{Duration, Instant},
41};
42#[cfg(not(feature = "locktick"))]
43use tokio::sync::Mutex as TMutex;
44use tokio::task::JoinHandle;
45
46const BLOCKS_PER_FILE: u32 = 50;
48const CONCURRENT_REQUESTS: u32 = 16;
50const MAXIMUM_PENDING_BLOCKS: u32 = BLOCKS_PER_FILE * CONCURRENT_REQUESTS * 2;
52const MAXIMUM_REQUEST_ATTEMPTS: u8 = 10;
54
55pub const CDN_BASE_URL: &str = "https://cdn.provable.com/v0/blocks";
57
58#[cfg(feature = "metrics")]
60fn update_block_metrics(height: u32) {
61 crate::metrics::gauge(crate::metrics::bft::HEIGHT, height as f64);
63}
64
65pub type SyncResult = Result<u32, (u32, anyhow::Error)>;
66
67pub struct CdnBlockSync {
72 base_url: http::Uri,
73 task: Mutex<Option<JoinHandle<SyncResult>>>,
75 done: AtomicBool,
77}
78
79impl CdnBlockSync {
80 pub fn new<N: Network, C: ConsensusStorage<N>>(
82 base_url: http::Uri,
83 ledger: Ledger<N, C>,
84 stoppable: Arc<SignalHandler>,
85 ) -> Self {
86 let task = {
87 let base_url = base_url.clone();
88 tokio::spawn(async move { Self::worker(base_url, ledger, stoppable).await })
89 };
90
91 debug!("Started sync from CDN at {base_url}");
92 Self { done: AtomicBool::new(false), base_url, task: Mutex::new(Some(task)) }
93 }
94
95 pub fn is_done(&self) -> bool {
99 self.done.load(Ordering::SeqCst)
100 }
101
102 pub async fn wait(&self) -> Result<SyncResult> {
107 let Some(hdl) = self.task.lock().take() else {
108 bail!("CDN task was already awaited");
109 };
110
111 let result = hdl.await.map_err(|err| anyhow!("Failed to wait for CDN task: {err}"));
112 self.done.store(true, Ordering::SeqCst);
113 result
114 }
115
116 async fn worker<N: Network, C: ConsensusStorage<N>>(
117 base_url: http::Uri,
118 ledger: Ledger<N, C>,
119 stoppable: Arc<dyn Stoppable>,
120 ) -> SyncResult {
121 let start_height = ledger.latest_height() + 1;
123 let ledger_clone = ledger.clone();
125 let result = load_blocks(&base_url, start_height, None, stoppable, move |block: Block<N>| {
126 ledger_clone
127 .advance_to_next_block(&block)
128 .with_context(|| format!("Failed to advance to block {} at height {}", block.hash(), block.height()))
129 })
130 .await;
131
132 match result {
135 Ok(completed_height) => Ok(completed_height),
136 Err((completed_height, error)) => {
137 warn!("{}", flatten_error(error.context("Failed to sync block(s) from the CDN")));
138
139 if completed_height != start_height {
141 debug!("Synced the ledger up to block {completed_height}");
142
143 let node_height = *ledger.vm().block_store().heights().max().unwrap_or_default();
145 if node_height != completed_height {
147 return Err((
148 completed_height,
149 anyhow!("The ledger height does not match the last sync height"),
150 ));
151 }
152
153 if let Err(err) = ledger.get_block(node_height) {
155 return Err((completed_height, err));
156 }
157 }
158
159 Ok(completed_height)
160 }
161 }
162 }
163
164 pub async fn get_cdn_height(&self) -> anyhow::Result<u32> {
165 let client = Client::builder().use_rustls_tls().build()?;
166 cdn_height::<BLOCKS_PER_FILE>(&client, &self.base_url).await
167 }
168}
169
170pub async fn load_blocks<N: Network>(
175 base_url: &http::Uri,
176 start_height: u32,
177 end_height: Option<u32>,
178 stoppable: Arc<dyn Stoppable>,
179 process: impl FnMut(Block<N>) -> Result<()> + Clone + Send + Sync + 'static,
180) -> Result<u32, (u32, anyhow::Error)> {
181 let client = match Client::builder().use_rustls_tls().build() {
183 Ok(client) => client,
184 Err(error) => {
185 return Err((start_height.saturating_sub(1), anyhow!("Failed to create a CDN request client - {error}")));
186 }
187 };
188
189 let cdn_height = match cdn_height::<BLOCKS_PER_FILE>(&client, base_url).await {
191 Ok(cdn_height) => cdn_height,
192 Err(error) => return Err((start_height, error)),
193 };
194 if cdn_height < start_height {
196 return Err((
197 start_height,
198 anyhow!("The given start height ({start_height}) must be less than the CDN height ({cdn_height})"),
199 ));
200 }
201
202 let end_height = cmp::min(end_height.unwrap_or(cdn_height), cdn_height);
205 if end_height < start_height {
207 return Err((
208 start_height,
209 anyhow!("The given end height ({end_height}) must not be less than the start height ({start_height})"),
210 ));
211 }
212
213 let cdn_start = start_height - (start_height % BLOCKS_PER_FILE);
215 let cdn_end = end_height;
217 if cdn_start >= cdn_end {
219 return Ok(cdn_end);
220 }
221
222 let pending_blocks: Arc<TMutex<Vec<Block<N>>>> = Default::default();
224
225 let timer = Instant::now();
227
228 let pending_blocks_clone = pending_blocks.clone();
230 let base_url = base_url.to_owned();
231
232 {
233 let stoppable = stoppable.clone();
234 tokio::spawn(async move {
235 download_block_bundles(client, &base_url, cdn_start, cdn_end, pending_blocks_clone, stoppable).await;
236 });
237 }
238
239 let mut current_height = start_height.saturating_sub(1);
241 while current_height < end_height - 1 {
242 if stoppable.is_stopped() {
244 info!("Stopping block sync at {} - shutting down", current_height);
245 std::process::exit(0);
247 }
248
249 let mut candidate_blocks = pending_blocks.lock().await;
250
251 let Some(next_height) = candidate_blocks.first().map(|b| b.height()) else {
253 debug!("No pending blocks yet");
254 drop(candidate_blocks);
255 tokio::time::sleep(Duration::from_secs(3)).await;
256 continue;
257 };
258
259 if next_height > current_height + 1 {
261 debug!("Waiting for the first relevant blocks ({} pending)", candidate_blocks.len());
263 drop(candidate_blocks);
264 tokio::time::sleep(Duration::from_secs(1)).await;
265 continue;
266 }
267
268 let retained_blocks = candidate_blocks.split_off(BLOCKS_PER_FILE as usize);
270 let next_blocks = std::mem::replace(&mut *candidate_blocks, retained_blocks);
271 drop(candidate_blocks);
272
273 let threadpool = rayon::ThreadPoolBuilder::new().build().unwrap();
275
276 let mut process_clone = process.clone();
278 let stoppable_clone = stoppable.clone();
279 current_height = tokio::task::spawn_blocking(move || {
280 threadpool.install(|| {
281 for block in next_blocks.into_iter().filter(|b| (start_height..end_height).contains(&b.height())) {
282 if stoppable_clone.is_stopped() {
284 info!("Stopping block sync at {} - the node is shutting down", current_height);
285 std::process::exit(0);
287 }
288
289 let block_height = block.height();
291
292 process_clone(block)?;
294
295 current_height = block_height;
297
298 #[cfg(feature = "metrics")]
300 update_block_metrics(current_height);
301
302 log_progress::<BLOCKS_PER_FILE>(timer, current_height, cdn_start, cdn_end, "block");
304 }
305
306 Ok(current_height)
307 })
308 })
309 .await
310 .map_err(|e| (current_height, e.into()))?
311 .map_err(|e| (current_height, e))?;
312 }
313
314 Ok(current_height)
315}
316
317async fn download_block_bundles<N: Network>(
318 client: Client,
319 base_url: &http::Uri,
320 cdn_start: u32,
321 cdn_end: u32,
322 pending_blocks: Arc<TMutex<Vec<Block<N>>>>,
323 stoppable: Arc<dyn Stoppable>,
324) {
325 let active_requests: Arc<AtomicU32> = Default::default();
327
328 let mut start = cdn_start;
329 while start < cdn_end - 1 {
330 if stoppable.is_stopped() {
332 break;
333 }
334
335 let num_pending_blocks = pending_blocks.lock().await.len();
337 if num_pending_blocks >= MAXIMUM_PENDING_BLOCKS as usize {
338 debug!("Maximum number of pending blocks reached ({num_pending_blocks}), waiting...");
339 tokio::time::sleep(Duration::from_secs(5)).await;
340 continue;
341 }
342
343 let active_request_count = active_requests.load(Ordering::Relaxed);
346 let num_requests =
347 cmp::min(CONCURRENT_REQUESTS, (MAXIMUM_PENDING_BLOCKS - num_pending_blocks as u32) / BLOCKS_PER_FILE)
348 .saturating_sub(active_request_count);
349
350 for i in 0..num_requests {
352 let start = start + i * BLOCKS_PER_FILE;
353 let end = start + BLOCKS_PER_FILE;
354
355 if end > cdn_end + BLOCKS_PER_FILE {
357 debug!("Finishing network requests to the CDN...");
358 break;
359 }
360
361 let client_clone = client.clone();
362 let base_url_clone = base_url.clone();
363 let pending_blocks_clone = pending_blocks.clone();
364 let active_requests_clone = active_requests.clone();
365 let stoppable_clone = stoppable.clone();
366 tokio::spawn(async move {
367 active_requests_clone.fetch_add(1, Ordering::Relaxed);
369
370 let ctx = format!("blocks {start} to {end}");
371 debug!("Requesting {ctx} (of {cdn_end})");
372
373 let blocks_url = format!("{base_url_clone}/{start}.{end}.blocks");
375 let ctx = format!("blocks {start} to {end}");
376 let mut attempts = 0;
378 let request_time = Instant::now();
379
380 loop {
381 match cdn_get(client_clone.clone(), &blocks_url, &ctx).await {
383 Ok::<Vec<Block<N>>, _>(blocks) => {
384 let mut pending_blocks = pending_blocks_clone.lock().await;
386 for block in blocks {
387 match pending_blocks.binary_search_by_key(&block.height(), |b| b.height()) {
388 Ok(_idx) => warn!("Found a duplicate pending block at height {}", block.height()),
389 Err(idx) => pending_blocks.insert(idx, block),
390 }
391 }
392 debug!("Received {ctx} {}", format!("(in {:.2?})", request_time.elapsed()).dimmed());
393 break;
394 }
395 Err(error) => {
396 attempts += 1;
399 if attempts > MAXIMUM_REQUEST_ATTEMPTS {
400 warn!("Maximum number of requests to {blocks_url} reached - shutting down...");
401 stoppable_clone.stop();
402 break;
403 }
404 tokio::time::sleep(Duration::from_secs(attempts as u64 * 10)).await;
405 warn!("{error} - retrying ({attempts} attempt(s) so far)");
406 }
407 }
408 }
409
410 active_requests_clone.fetch_sub(1, Ordering::Relaxed);
412 });
413 }
414
415 start += BLOCKS_PER_FILE * num_requests;
417
418 tokio::time::sleep(Duration::from_secs(1)).await;
420 }
421
422 debug!("Finished network requests to the CDN");
423}
424
425async fn cdn_height<const BLOCKS_PER_FILE: u32>(client: &Client, base_url: &http::Uri) -> Result<u32> {
430 #[derive(Deserialize, Serialize, Debug)]
432 struct LatestState {
433 exclusive_height: u32,
434 inclusive_height: u32,
435 hash: String,
436 }
437 let latest_json_url = format!("{base_url}/latest.json");
439 let response = match client.get(latest_json_url).send().await {
441 Ok(response) => response,
442 Err(error) => bail!("Failed to fetch the CDN height - {error}"),
443 };
444 let bytes = match response.bytes().await {
446 Ok(bytes) => bytes,
447 Err(error) => bail!("Failed to parse the CDN height response - {error}"),
448 };
449 let latest_state_string = match bincode::deserialize::<String>(&bytes) {
451 Ok(string) => string,
452 Err(error) => {
453 let bytes_as_string = String::from_utf8_lossy(&bytes);
454 bail!("Failed to deserialize the CDN height response - {error} - {bytes_as_string}")
455 }
456 };
457 let tip = match serde_json::from_str::<LatestState>(&latest_state_string) {
459 Ok(latest) => latest.exclusive_height,
460 Err(error) => bail!("Failed to extract the CDN height response - {error}"),
461 };
462 let tip = tip.saturating_sub(10);
464 Ok(tip - (tip % BLOCKS_PER_FILE) + BLOCKS_PER_FILE)
466}
467
468async fn cdn_get<T: 'static + DeserializeOwned + Send>(client: Client, url: &str, ctx: &str) -> Result<T> {
470 let response = match client.get(url).send().await {
472 Ok(response) => response,
473 Err(error) => bail!("Failed to fetch {ctx} - {error}"),
474 };
475 let bytes = match response.bytes().await {
477 Ok(bytes) => bytes,
478 Err(error) => bail!("Failed to parse {ctx} - {error}"),
479 };
480
481 match tokio::task::spawn_blocking(move || (bincode::deserialize::<T>(&bytes), bytes)).await {
483 Ok((Ok(objects), _)) => Ok(objects),
484 Ok((Err(error), response_bytes)) => {
485 let bytes_as_string = String::from_utf8_lossy(&response_bytes);
486 bail!("Failed to deserialize {ctx} - {error} - {bytes_as_string}")
487 }
488 Err(error) => {
489 bail!("Failed to join task for {ctx} - {error}")
490 }
491 }
492}
493
494fn to_human_readable_duration(duration: Duration) -> String {
501 const SECS_PER_MIN: u64 = 60;
504 const MINS_PER_HOUR: u64 = 60;
505 const SECS_PER_HOUR: u64 = SECS_PER_MIN * MINS_PER_HOUR;
506 const HOURS_PER_DAY: u64 = 24;
507 const SECS_PER_DAY: u64 = SECS_PER_HOUR * HOURS_PER_DAY;
508
509 let duration = duration.as_secs();
510
511 if duration < 1 {
512 "less than one second".to_string()
513 } else if duration < SECS_PER_MIN {
514 format!("{duration} seconds")
515 } else if duration < SECS_PER_HOUR {
516 format!("{} minutes", duration / SECS_PER_MIN)
517 } else if duration < SECS_PER_DAY {
518 let mins = duration / SECS_PER_MIN;
519 format!("{hours} hours and {remainder} minutes", hours = mins / 60, remainder = mins % 60)
520 } else {
521 let days = duration / SECS_PER_DAY;
522 let hours = (duration % SECS_PER_DAY) / SECS_PER_HOUR;
523 format!("{days} days and {hours} hours")
524 }
525}
526
527fn log_progress<const OBJECTS_PER_FILE: u32>(
529 timer: Instant,
530 current_index: u32,
531 cdn_start: u32,
532 mut cdn_end: u32,
533 object_name: &str,
534) {
535 debug_assert!(cdn_start <= cdn_end);
536 debug_assert!(current_index <= cdn_end);
537 debug_assert!(cdn_end >= 1);
538
539 cdn_end -= 1;
541
542 let sync_percentage =
544 (current_index.saturating_sub(cdn_start) * 100).checked_div(cdn_end.saturating_sub(cdn_start)).unwrap_or(100);
545
546 let num_files_done = 1 + (current_index - cdn_start) / OBJECTS_PER_FILE;
548 let num_files_remaining = 1 + (cdn_end.saturating_sub(current_index)) / OBJECTS_PER_FILE;
550 let millis_per_file = timer.elapsed().as_millis() / num_files_done as u128;
552 let slowdown = 100 * num_files_remaining as u128;
554 let time_remaining = {
556 let remaining = num_files_remaining as u128 * millis_per_file + slowdown;
557 to_human_readable_duration(Duration::from_secs((remaining / 1000) as u64))
558 };
559 let estimate = format!("(started at height {cdn_start}, est. {time_remaining} remaining)");
561 info!(
563 "Reached {object_name} {current_index} of {cdn_end} - Sync is {sync_percentage}% complete {}",
564 estimate.dimmed()
565 );
566}
567
568#[cfg(test)]
569mod tests {
570 use super::{BLOCKS_PER_FILE, CDN_BASE_URL, cdn_height, load_blocks, log_progress};
571
572 use snarkos_utilities::SimpleStoppable;
573
574 use snarkvm::prelude::{MainnetV0, block::Block};
575
576 use http::Uri;
577 use parking_lot::RwLock;
578 use std::{sync::Arc, time::Instant};
579
580 type CurrentNetwork = MainnetV0;
581
582 fn check_load_blocks(start: u32, end: Option<u32>, expected: usize) {
583 let blocks = Arc::new(RwLock::new(Vec::new()));
584 let blocks_clone = blocks.clone();
585 let process = move |block: Block<CurrentNetwork>| {
586 blocks_clone.write().push(block);
587 Ok(())
588 };
589
590 let testnet_cdn_url = Uri::try_from(format!("{CDN_BASE_URL}/mainnet")).unwrap();
591
592 let rt = tokio::runtime::Runtime::new().unwrap();
593 rt.block_on(async {
594 let completed_height =
595 load_blocks(&testnet_cdn_url, start, end, SimpleStoppable::new(), process).await.unwrap();
596 assert_eq!(blocks.read().len(), expected);
597 if expected > 0 {
598 assert_eq!(blocks.read().last().unwrap().height(), completed_height);
599 }
600 for (i, block) in blocks.read().iter().enumerate() {
602 assert_eq!(block.height(), start + i as u32);
603 }
604 });
605 }
606
607 #[test]
608 fn test_load_blocks_0_to_50() {
609 let start_height = 0;
610 let end_height = Some(50);
611 check_load_blocks(start_height, end_height, 50);
612 }
613
614 #[test]
615 fn test_load_blocks_50_to_100() {
616 let start_height = 50;
617 let end_height = Some(100);
618 check_load_blocks(start_height, end_height, 50);
619 }
620
621 #[test]
622 fn test_load_blocks_0_to_123() {
623 let start_height = 0;
624 let end_height = Some(123);
625 check_load_blocks(start_height, end_height, 123);
626 }
627
628 #[test]
629 fn test_load_blocks_46_to_234() {
630 let start_height = 46;
631 let end_height = Some(234);
632 check_load_blocks(start_height, end_height, 188);
633 }
634
635 #[test]
636 fn test_cdn_height() {
637 let rt = tokio::runtime::Runtime::new().unwrap();
638 let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
639 let testnet_cdn_url = Uri::try_from(format!("{CDN_BASE_URL}/mainnet")).unwrap();
640 rt.block_on(async {
641 let height = cdn_height::<BLOCKS_PER_FILE>(&client, &testnet_cdn_url).await.unwrap();
642 assert!(height > 0);
643 });
644 }
645
646 #[test]
647 fn test_log_progress() {
648 let timer = Instant::now();
650 let cdn_start = 0;
651 let cdn_end = 100;
652 let object_name = "blocks";
653 log_progress::<10>(timer, 0, cdn_start, cdn_end, object_name);
654 log_progress::<10>(timer, 10, cdn_start, cdn_end, object_name);
655 log_progress::<10>(timer, 20, cdn_start, cdn_end, object_name);
656 log_progress::<10>(timer, 30, cdn_start, cdn_end, object_name);
657 log_progress::<10>(timer, 40, cdn_start, cdn_end, object_name);
658 log_progress::<10>(timer, 50, cdn_start, cdn_end, object_name);
659 log_progress::<10>(timer, 60, cdn_start, cdn_end, object_name);
660 log_progress::<10>(timer, 70, cdn_start, cdn_end, object_name);
661 log_progress::<10>(timer, 80, cdn_start, cdn_end, object_name);
662 log_progress::<10>(timer, 90, cdn_start, cdn_end, object_name);
663 log_progress::<10>(timer, 100, cdn_start, cdn_end, object_name);
664 }
665}