1#![allow(clippy::await_holding_lock)]
19
20use snarkos_utilities::{SignalHandler, Stoppable};
21
22use snarkvm::{
23 prelude::{Deserialize, DeserializeOwned, Ledger, Network, Serialize, block::Block, store::ConsensusStorage},
24 utilities::{flatten_error, unchecked_deserialize},
25};
26
27use anyhow::{Context, Result, anyhow, bail};
28use colored::Colorize;
29#[cfg(feature = "locktick")]
30use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
31#[cfg(not(feature = "locktick"))]
32use parking_lot::Mutex;
33use reqwest::Client;
34use std::{
35 cmp,
36 sync::{
37 Arc,
38 atomic::{AtomicBool, AtomicU32, Ordering},
39 },
40 time::{Duration, Instant},
41};
42#[cfg(not(feature = "locktick"))]
43use tokio::sync::Mutex as TMutex;
44use tokio::task::JoinHandle;
45
/// The number of blocks covered by a single CDN file; bundle URLs are built as
/// `{start}.{start + BLOCKS_PER_FILE}.blocks`.
const BLOCKS_PER_FILE: u32 = 50;
/// The cap on the number of bundle requests scheduled in one round of the downloader.
const CONCURRENT_REQUESTS: u32 = 16;
/// The number of downloaded-but-unprocessed blocks at which the downloader pauses.
const MAXIMUM_PENDING_BLOCKS: u32 = BLOCKS_PER_FILE * CONCURRENT_REQUESTS * 2;
/// The number of failed attempts tolerated for one bundle request; exceeding it
/// triggers a stop via the `Stoppable` handle.
const MAXIMUM_REQUEST_ATTEMPTS: u8 = 10;

/// The base URL of the block CDN; the tests in this file append a network name
/// (e.g. `/mainnet`) to it.
pub const CDN_BASE_URL: &str = "https://cdn.provable.com/v0/blocks";
57
58const fn cdn_user_agent() -> &'static str {
60 concat!("snarkos/", env!("CARGO_PKG_VERSION"))
61}
62
/// Publishes the given block height to the `bft::HEIGHT` gauge.
///
/// Only compiled when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
fn update_block_metrics(height: u32) {
    // Every `u32` is exactly representable as an `f64`, so this conversion is lossless.
    let height = f64::from(height);
    crate::metrics::gauge(crate::metrics::bft::HEIGHT, height);
}
69
70pub type SyncResult = Result<u32, (u32, anyhow::Error)>;
71
72pub struct CdnBlockSync {
77 base_url: http::Uri,
78 task: Mutex<Option<JoinHandle<SyncResult>>>,
80 done: AtomicBool,
82}
83
84impl CdnBlockSync {
85 pub fn new<N: Network, C: ConsensusStorage<N>>(
87 base_url: http::Uri,
88 ledger: Ledger<N, C>,
89 stoppable: Arc<SignalHandler>,
90 ) -> Self {
91 let task = {
92 let base_url = base_url.clone();
93 tokio::spawn(async move { Self::worker(base_url, ledger, stoppable).await })
94 };
95
96 debug!("Started sync from CDN at {base_url}");
97 Self { done: AtomicBool::new(false), base_url, task: Mutex::new(Some(task)) }
98 }
99
100 pub fn is_done(&self) -> bool {
104 self.done.load(Ordering::SeqCst)
105 }
106
107 pub async fn wait(&self) -> Result<SyncResult> {
112 let Some(hdl) = self.task.lock().take() else {
113 bail!("CDN task was already awaited");
114 };
115
116 let result = hdl.await.map_err(|err| anyhow!("Failed to wait for CDN task: {err}"));
117 self.done.store(true, Ordering::SeqCst);
118 result
119 }
120
121 async fn worker<N: Network, C: ConsensusStorage<N>>(
122 base_url: http::Uri,
123 ledger: Ledger<N, C>,
124 stoppable: Arc<dyn Stoppable>,
125 ) -> SyncResult {
126 let start_height = ledger.latest_height() + 1;
128 let ledger_clone = ledger.clone();
130 let result = load_blocks(&base_url, start_height, None, stoppable, move |block: Block<N>| {
131 ledger_clone
132 .advance_to_next_block(&block)
133 .with_context(|| format!("Failed to advance to block {} at height {}", block.hash(), block.height()))
134 })
135 .await;
136
137 match result {
140 Ok(completed_height) => Ok(completed_height),
141 Err((completed_height, error)) => {
142 warn!("{}", flatten_error(error.context("Failed to sync block(s) from the CDN")));
143
144 if completed_height != start_height {
146 debug!("Synced the ledger up to block {completed_height}");
147
148 let node_height = *ledger.vm().block_store().heights().max().unwrap_or_default();
150 if node_height != completed_height {
152 return Err((
153 completed_height,
154 anyhow!("The ledger height does not match the last sync height"),
155 ));
156 }
157
158 if let Err(err) = ledger.get_block(node_height) {
160 return Err((completed_height, err));
161 }
162 }
163
164 Ok(completed_height)
165 }
166 }
167 }
168
169 pub async fn get_cdn_height(&self) -> anyhow::Result<u32> {
170 let client = Client::builder().use_rustls_tls().user_agent(cdn_user_agent()).build()?;
171 cdn_height::<BLOCKS_PER_FILE>(&client, &self.base_url).await
172 }
173}
174
175pub async fn load_blocks<N: Network>(
180 base_url: &http::Uri,
181 start_height: u32,
182 end_height: Option<u32>,
183 stoppable: Arc<dyn Stoppable>,
184 process: impl FnMut(Block<N>) -> Result<()> + Clone + Send + Sync + 'static,
185) -> Result<u32, (u32, anyhow::Error)> {
186 let client = match Client::builder().use_rustls_tls().user_agent(cdn_user_agent()).build() {
188 Ok(client) => client,
189 Err(error) => {
190 return Err((start_height.saturating_sub(1), anyhow!("Failed to create a CDN request client - {error}")));
191 }
192 };
193
194 let cdn_height = match cdn_height::<BLOCKS_PER_FILE>(&client, base_url).await {
196 Ok(cdn_height) => cdn_height,
197 Err(error) => return Err((start_height, error)),
198 };
199 if cdn_height < start_height {
201 return Err((
202 start_height,
203 anyhow!("The given start height ({start_height}) must be less than the CDN height ({cdn_height})"),
204 ));
205 }
206
207 let end_height = cmp::min(end_height.unwrap_or(cdn_height), cdn_height);
210 if end_height < start_height {
212 return Err((
213 start_height,
214 anyhow!("The given end height ({end_height}) must not be less than the start height ({start_height})"),
215 ));
216 }
217
218 let cdn_start = start_height - (start_height % BLOCKS_PER_FILE);
220 let cdn_end = end_height;
222 if cdn_start >= cdn_end {
224 return Ok(cdn_end);
225 }
226
227 let pending_blocks: Arc<TMutex<Vec<Block<N>>>> = Default::default();
229
230 let timer = Instant::now();
232
233 let pending_blocks_clone = pending_blocks.clone();
235 let base_url = base_url.to_owned();
236
237 {
238 let stoppable = stoppable.clone();
239 tokio::spawn(async move {
240 download_block_bundles(client, &base_url, cdn_start, cdn_end, pending_blocks_clone, stoppable).await;
241 });
242 }
243
244 let threadpool = Arc::new(rayon::ThreadPoolBuilder::new().build().unwrap());
246
247 let mut current_height = start_height.saturating_sub(1);
249 while current_height < end_height - 1 {
250 if stoppable.is_stopped() {
252 info!("Stopping block sync at {} - shutting down", current_height);
253 return Ok(current_height);
255 }
256
257 let mut candidate_blocks = pending_blocks.lock().await;
258
259 let Some(next_height) = candidate_blocks.first().map(|b| b.height()) else {
261 debug!("No pending blocks yet");
262 drop(candidate_blocks);
263 tokio::time::sleep(Duration::from_secs(3)).await;
264 continue;
265 };
266
267 if next_height > current_height + 1 {
269 debug!("Waiting for the first relevant blocks ({} pending)", candidate_blocks.len());
271 drop(candidate_blocks);
272 tokio::time::sleep(Duration::from_secs(1)).await;
273 continue;
274 }
275
276 let retained_blocks = candidate_blocks.split_off(BLOCKS_PER_FILE as usize);
278 let next_blocks = std::mem::replace(&mut *candidate_blocks, retained_blocks);
279 drop(candidate_blocks);
280
281 let mut process_clone = process.clone();
283 let stoppable_clone = stoppable.clone();
284 let threadpool_clone = threadpool.clone();
285 current_height = tokio::task::spawn_blocking(move || {
286 threadpool_clone.install(|| {
287 for block in next_blocks.into_iter().filter(|b| (start_height..end_height).contains(&b.height())) {
288 if stoppable_clone.is_stopped() {
290 info!("Stopping block sync at {} - the node is shutting down", current_height);
291 break;
293 }
294
295 let block_height = block.height();
297
298 process_clone(block)?;
300
301 current_height = block_height;
303
304 #[cfg(feature = "metrics")]
306 update_block_metrics(current_height);
307
308 log_progress::<BLOCKS_PER_FILE>(timer, current_height, cdn_start, cdn_end, "block");
310 }
311
312 Ok(current_height)
313 })
314 })
315 .await
316 .map_err(|e| (current_height, e.into()))?
317 .map_err(|e| (current_height, e))?;
318 }
319
320 Ok(current_height)
321}
322
323async fn download_block_bundles<N: Network>(
324 client: Client,
325 base_url: &http::Uri,
326 cdn_start: u32,
327 cdn_end: u32,
328 pending_blocks: Arc<TMutex<Vec<Block<N>>>>,
329 stoppable: Arc<dyn Stoppable>,
330) {
331 let active_requests: Arc<AtomicU32> = Default::default();
333
334 let mut start = cdn_start;
335 while start < cdn_end - 1 {
336 if stoppable.is_stopped() {
338 break;
339 }
340
341 let num_pending_blocks = pending_blocks.lock().await.len();
343 if num_pending_blocks >= MAXIMUM_PENDING_BLOCKS as usize {
344 debug!("Maximum number of pending blocks reached ({num_pending_blocks}), waiting...");
345 tokio::time::sleep(Duration::from_secs(5)).await;
346 continue;
347 }
348
349 let active_request_count = active_requests.load(Ordering::Relaxed);
352 let num_requests =
353 cmp::min(CONCURRENT_REQUESTS, (MAXIMUM_PENDING_BLOCKS - num_pending_blocks as u32) / BLOCKS_PER_FILE)
354 .saturating_sub(active_request_count);
355
356 for i in 0..num_requests {
358 let start = start + i * BLOCKS_PER_FILE;
359 let end = start + BLOCKS_PER_FILE;
360
361 if end > cdn_end + BLOCKS_PER_FILE {
363 debug!("Finishing network requests to the CDN...");
364 break;
365 }
366
367 let client_clone = client.clone();
368 let base_url_clone = base_url.clone();
369 let pending_blocks_clone = pending_blocks.clone();
370 let active_requests_clone = active_requests.clone();
371 let stoppable_clone = stoppable.clone();
372 tokio::spawn(async move {
373 active_requests_clone.fetch_add(1, Ordering::Relaxed);
375
376 let ctx = format!("blocks {start} to {end}");
377 debug!("Requesting {ctx} (of {cdn_end})");
378
379 let blocks_url = format!("{base_url_clone}/{start}.{end}.blocks");
381 let ctx = format!("blocks {start} to {end}");
382 let mut attempts = 0;
384 let request_time = Instant::now();
385
386 loop {
387 match cdn_get(client_clone.clone(), &blocks_url, &ctx).await {
389 Ok::<Vec<Block<N>>, _>(blocks) => {
390 let mut pending_blocks = pending_blocks_clone.lock().await;
392 for block in blocks {
393 match pending_blocks.binary_search_by_key(&block.height(), |b| b.height()) {
394 Ok(_idx) => warn!("Found a duplicate pending block at height {}", block.height()),
395 Err(idx) => pending_blocks.insert(idx, block),
396 }
397 }
398 debug!("Received {ctx} {}", format!("(in {:.2?})", request_time.elapsed()).dimmed());
399 break;
400 }
401 Err(error) => {
402 attempts += 1;
405 if attempts > MAXIMUM_REQUEST_ATTEMPTS {
406 warn!("Maximum number of requests to {blocks_url} reached - shutting down...");
407 stoppable_clone.stop();
408 break;
409 }
410 tokio::time::sleep(Duration::from_secs(attempts as u64 * 10)).await;
411 warn!("{error} - retrying ({attempts} attempt(s) so far)");
412 }
413 }
414 }
415
416 active_requests_clone.fetch_sub(1, Ordering::Relaxed);
418 });
419 }
420
421 start += BLOCKS_PER_FILE * num_requests;
423
424 tokio::time::sleep(Duration::from_secs(1)).await;
426 }
427
428 debug!("Finished network requests to the CDN");
429}
430
431async fn cdn_height<const BLOCKS_PER_FILE: u32>(client: &Client, base_url: &http::Uri) -> Result<u32> {
436 #[derive(Deserialize, Serialize, Debug)]
438 struct LatestState {
439 exclusive_height: u32,
440 inclusive_height: u32,
441 hash: String,
442 }
443 let latest_json_url = format!("{base_url}/latest.json");
445 let response = match client.get(latest_json_url).send().await {
447 Ok(response) => response,
448 Err(error) => bail!("Failed to fetch the CDN height - {error}"),
449 };
450 let bytes = match response.bytes().await {
452 Ok(bytes) => bytes,
453 Err(error) => bail!("Failed to parse the CDN height response - {error}"),
454 };
455 let latest_state_string = match bincode::deserialize::<String>(&bytes) {
457 Ok(string) => string,
458 Err(error) => {
459 let bytes_as_string = String::from_utf8_lossy(&bytes);
460 bail!("Failed to deserialize the CDN height response - {error} - {bytes_as_string}")
461 }
462 };
463 let tip = match serde_json::from_str::<LatestState>(&latest_state_string) {
465 Ok(latest) => latest.exclusive_height,
466 Err(error) => bail!("Failed to extract the CDN height response - {error}"),
467 };
468 let tip = tip.saturating_sub(10);
470 Ok(tip - (tip % BLOCKS_PER_FILE) + BLOCKS_PER_FILE)
472}
473
474async fn cdn_get<T: 'static + DeserializeOwned + Send>(client: Client, url: &str, ctx: &str) -> Result<T> {
476 let response = match client.get(url).send().await {
478 Ok(response) => response,
479 Err(error) => bail!("Failed to fetch {ctx} - {error}"),
480 };
481 let bytes = match response.bytes().await {
483 Ok(bytes) => bytes,
484 Err(error) => bail!("Failed to parse {ctx} - {error}"),
485 };
486
487 match tokio::task::spawn_blocking(move || (unchecked_deserialize::<T>(&bytes), bytes)).await {
489 Ok((Ok(objects), _)) => Ok(objects),
490 Ok((Err(error), response_bytes)) => {
491 let bytes_as_string = String::from_utf8_lossy(&response_bytes);
492 bail!("Failed to deserialize {ctx} - {error} - {bytes_as_string}")
493 }
494 Err(error) => {
495 bail!("Failed to join task for {ctx} - {error}")
496 }
497 }
498}
499
/// Converts a duration into a coarse, human-readable description.
///
/// Sub-second durations yield `"less than one second"`. Otherwise the result uses the
/// largest fitting unit (seconds or minutes), or the two largest ones (hours and minutes,
/// or days and hours). Smaller remainders are truncated, and units are singular when
/// their count is exactly one.
fn to_human_readable_duration(duration: Duration) -> String {
    const SECS_PER_MIN: u64 = 60;
    const MINS_PER_HOUR: u64 = 60;
    const SECS_PER_HOUR: u64 = SECS_PER_MIN * MINS_PER_HOUR;
    const HOURS_PER_DAY: u64 = 24;
    const SECS_PER_DAY: u64 = SECS_PER_HOUR * HOURS_PER_DAY;

    /// Formats `count` followed by `unit`, pluralized unless the count is exactly one.
    fn quantity(count: u64, unit: &str) -> String {
        if count == 1 { format!("{count} {unit}") } else { format!("{count} {unit}s") }
    }

    let secs = duration.as_secs();

    if secs < 1 {
        "less than one second".to_string()
    } else if secs < SECS_PER_MIN {
        quantity(secs, "second")
    } else if secs < SECS_PER_HOUR {
        quantity(secs / SECS_PER_MIN, "minute")
    } else if secs < SECS_PER_DAY {
        let mins = secs / SECS_PER_MIN;
        format!("{} and {}", quantity(mins / MINS_PER_HOUR, "hour"), quantity(mins % MINS_PER_HOUR, "minute"))
    } else {
        let days = secs / SECS_PER_DAY;
        let hours = (secs % SECS_PER_DAY) / SECS_PER_HOUR;
        format!("{} and {}", quantity(days, "day"), quantity(hours, "hour"))
    }
}
532
533fn log_progress<const OBJECTS_PER_FILE: u32>(
535 timer: Instant,
536 current_index: u32,
537 cdn_start: u32,
538 mut cdn_end: u32,
539 object_name: &str,
540) {
541 debug_assert!(cdn_start <= cdn_end);
542 debug_assert!(current_index <= cdn_end);
543 debug_assert!(cdn_end >= 1);
544
545 cdn_end -= 1;
547
548 let sync_percentage =
550 (current_index.saturating_sub(cdn_start) * 100).checked_div(cdn_end.saturating_sub(cdn_start)).unwrap_or(100);
551
552 let num_files_done = 1 + (current_index - cdn_start) / OBJECTS_PER_FILE;
554 let num_files_remaining = 1 + (cdn_end.saturating_sub(current_index)) / OBJECTS_PER_FILE;
556 let millis_per_file = timer.elapsed().as_millis() / num_files_done as u128;
558 let slowdown = 100 * num_files_remaining as u128;
560 let time_remaining = {
562 let remaining = num_files_remaining as u128 * millis_per_file + slowdown;
563 to_human_readable_duration(Duration::from_secs((remaining / 1000) as u64))
564 };
565 let estimate = format!("(started at height {cdn_start}, est. {time_remaining} remaining)");
567 info!(
569 "Reached {object_name} {current_index} of {cdn_end} - Sync is {sync_percentage}% complete {}",
570 estimate.dimmed()
571 );
572}
573
#[cfg(test)]
mod tests {
    use super::{BLOCKS_PER_FILE, CDN_BASE_URL, cdn_height, load_blocks, log_progress};

    use snarkos_utilities::SimpleStoppable;

    use snarkvm::prelude::{MainnetV0, block::Block};

    use http::Uri;
    use parking_lot::RwLock;
    use std::{sync::Arc, time::Instant};

    type CurrentNetwork = MainnetV0;

    /// Returns the URL of the mainnet block CDN.
    fn mainnet_cdn_url() -> Uri {
        Uri::try_from(format!("{CDN_BASE_URL}/mainnet")).unwrap()
    }

    /// Loads the blocks in `start..end` from the CDN and checks that exactly `expected`
    /// blocks were processed, in consecutive height order beginning at `start`.
    fn check_load_blocks(start: u32, end: Option<u32>, expected: usize) {
        // Collect every processed block, so the result can be inspected afterwards.
        let collected = Arc::new(RwLock::new(Vec::new()));
        let sink = collected.clone();
        let process = move |block: Block<CurrentNetwork>| {
            sink.write().push(block);
            Ok(())
        };

        let cdn_url = mainnet_cdn_url();

        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let completed_height = load_blocks(&cdn_url, start, end, SimpleStoppable::new(), process).await.unwrap();

            let collected = collected.read();
            assert_eq!(collected.len(), expected);
            if expected > 0 {
                assert_eq!(collected.last().unwrap().height(), completed_height);
            }
            // The heights must be consecutive, starting at the requested start height.
            for (offset, block) in (0u32..).zip(collected.iter()) {
                assert_eq!(block.height(), start + offset);
            }
        });
    }

    #[test]
    fn test_load_blocks_0_to_50() {
        check_load_blocks(0, Some(50), 50);
    }

    #[test]
    fn test_load_blocks_50_to_100() {
        check_load_blocks(50, Some(100), 50);
    }

    #[test]
    fn test_load_blocks_0_to_123() {
        check_load_blocks(0, Some(123), 123);
    }

    #[test]
    fn test_load_blocks_46_to_234() {
        check_load_blocks(46, Some(234), 188);
    }

    #[test]
    fn test_cdn_height() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
        let cdn_url = mainnet_cdn_url();
        rt.block_on(async {
            let height = cdn_height::<BLOCKS_PER_FILE>(&client, &cdn_url).await.unwrap();
            assert!(height > 0);
        });
    }

    #[test]
    fn test_log_progress() {
        let timer = Instant::now();
        let cdn_start = 0;
        let cdn_end = 100;
        let object_name = "blocks";
        // Log the progress at every multiple of 10, from 0 up to and including 100.
        for current_index in (0..=100).step_by(10) {
            log_progress::<10>(timer, current_index, cdn_start, cdn_end, object_name);
        }
    }

    #[test]
    fn test_cdn_user_agent() {
        let ua = super::cdn_user_agent();
        let Some(version) = ua.strip_prefix("snarkos/") else {
            panic!("user-agent must start with 'snarkos/': got {ua}");
        };
        assert!(!version.is_empty(), "user-agent version must not be empty");
    }
}