Skip to main content

snarkos_node_cdn/
blocks.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16// Avoid a false positive from clippy:
17// https://github.com/rust-lang/rust-clippy/issues/6446
18#![allow(clippy::await_holding_lock)]
19
20use snarkos_utilities::{SignalHandler, Stoppable};
21
22use snarkvm::{
23    prelude::{Deserialize, DeserializeOwned, Ledger, Network, Serialize, block::Block, store::ConsensusStorage},
24    utilities::{flatten_error, unchecked_deserialize},
25};
26
27use anyhow::{Context, Result, anyhow, bail};
28use colored::Colorize;
29#[cfg(feature = "locktick")]
30use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
31#[cfg(not(feature = "locktick"))]
32use parking_lot::Mutex;
33use reqwest::Client;
34use std::{
35    cmp,
36    sync::{
37        Arc,
38        atomic::{AtomicBool, AtomicU32, Ordering},
39    },
40    time::{Duration, Instant},
41};
42#[cfg(not(feature = "locktick"))]
43use tokio::sync::Mutex as TMutex;
44use tokio::task::JoinHandle;
45
46/// The number of blocks per file.
47const BLOCKS_PER_FILE: u32 = 50;
48/// The desired number of concurrent requests to the CDN.
49const CONCURRENT_REQUESTS: u32 = 16;
50/// Maximum number of pending sync blocks.
51const MAXIMUM_PENDING_BLOCKS: u32 = BLOCKS_PER_FILE * CONCURRENT_REQUESTS * 2;
52/// Maximum number of attempts for a request to the CDN.
53const MAXIMUM_REQUEST_ATTEMPTS: u8 = 10;
54
55/// The CDN base url.
56pub const CDN_BASE_URL: &str = "https://cdn.provable.com/v0/blocks";
57
58/// Returns the user-agent string for CDN requests.
59const fn cdn_user_agent() -> &'static str {
60    concat!("snarkos/", env!("CARGO_PKG_VERSION"))
61}
62
63/// Updates the metrics during CDN sync.
64#[cfg(feature = "metrics")]
65fn update_block_metrics(height: u32) {
66    // Update the BFT height metric.
67    crate::metrics::gauge(crate::metrics::bft::HEIGHT, height as f64);
68}
69
70pub type SyncResult = Result<u32, (u32, anyhow::Error)>;
71
72/// Manages the CDN sync task.
73///
74/// This is used, for example, in snarkos_node_rest to query how
75/// far along the CDN sync is.
76pub struct CdnBlockSync {
77    base_url: http::Uri,
78    /// The background tasks that performs the sync operation.
79    task: Mutex<Option<JoinHandle<SyncResult>>>,
80    /// This flag will be set to true once the sync task has been successfully awaited.
81    done: AtomicBool,
82}
83
84impl CdnBlockSync {
85    /// Spawn a background task that loads blocks from a CDN into the ledger.
86    pub fn new<N: Network, C: ConsensusStorage<N>>(
87        base_url: http::Uri,
88        ledger: Ledger<N, C>,
89        stoppable: Arc<SignalHandler>,
90    ) -> Self {
91        let task = {
92            let base_url = base_url.clone();
93            tokio::spawn(async move { Self::worker(base_url, ledger, stoppable).await })
94        };
95
96        debug!("Started sync from CDN at {base_url}");
97        Self { done: AtomicBool::new(false), base_url, task: Mutex::new(Some(task)) }
98    }
99
100    /// Did the CDN sync finish?
101    ///
102    /// Note: This can only return true if you call wait()
103    pub fn is_done(&self) -> bool {
104        self.done.load(Ordering::SeqCst)
105    }
106
107    /// Wait for CDN sync to finish. Can only be called once.
108    ///
109    /// On success, this function returns the completed block height.
110    /// On failure, this function returns the last successful block height (if any), along with the error.
111    pub async fn wait(&self) -> Result<SyncResult> {
112        let Some(hdl) = self.task.lock().take() else {
113            bail!("CDN task was already awaited");
114        };
115
116        let result = hdl.await.map_err(|err| anyhow!("Failed to wait for CDN task: {err}"));
117        self.done.store(true, Ordering::SeqCst);
118        result
119    }
120
121    async fn worker<N: Network, C: ConsensusStorage<N>>(
122        base_url: http::Uri,
123        ledger: Ledger<N, C>,
124        stoppable: Arc<dyn Stoppable>,
125    ) -> SyncResult {
126        // Fetch the node height.
127        let start_height = ledger.latest_height() + 1;
128        // Load the blocks from the CDN into the ledger.
129        let ledger_clone = ledger.clone();
130        let result = load_blocks(&base_url, start_height, None, stoppable, move |block: Block<N>| {
131            ledger_clone
132                .advance_to_next_block(&block)
133                .with_context(|| format!("Failed to advance to block {} at height {}", block.hash(), block.height()))
134        })
135        .await;
136
137        // TODO (howardwu): Find a way to resolve integrity failures.
138        // If the sync failed, check the integrity of the ledger.
139        match result {
140            Ok(completed_height) => Ok(completed_height),
141            Err((completed_height, error)) => {
142                warn!("{}", flatten_error(error.context("Failed to sync block(s) from the CDN")));
143
144                // If the sync made any progress, then check the integrity of the ledger.
145                if completed_height != start_height {
146                    debug!("Synced the ledger up to block {completed_height}");
147
148                    // Retrieve the latest height, according to the ledger.
149                    let node_height = *ledger.vm().block_store().heights().max().unwrap_or_default();
150                    // Check the integrity of the latest height.
151                    if node_height != completed_height {
152                        return Err((
153                            completed_height,
154                            anyhow!("The ledger height does not match the last sync height"),
155                        ));
156                    }
157
158                    // Fetch the latest block from the ledger.
159                    if let Err(err) = ledger.get_block(node_height) {
160                        return Err((completed_height, err));
161                    }
162                }
163
164                Ok(completed_height)
165            }
166        }
167    }
168
169    pub async fn get_cdn_height(&self) -> anyhow::Result<u32> {
170        let client = Client::builder().use_rustls_tls().user_agent(cdn_user_agent()).build()?;
171        cdn_height::<BLOCKS_PER_FILE>(&client, &self.base_url).await
172    }
173}
174
175/// Loads blocks from a CDN and process them with the given function.
176///
177/// On success, this function returns the completed block height.
178/// On failure, this function returns the last successful block height (if any), along with the error.
179pub async fn load_blocks<N: Network>(
180    base_url: &http::Uri,
181    start_height: u32,
182    end_height: Option<u32>,
183    stoppable: Arc<dyn Stoppable>,
184    process: impl FnMut(Block<N>) -> Result<()> + Clone + Send + Sync + 'static,
185) -> Result<u32, (u32, anyhow::Error)> {
186    // Create a Client to maintain a connection pool throughout the sync.
187    let client = match Client::builder().use_rustls_tls().user_agent(cdn_user_agent()).build() {
188        Ok(client) => client,
189        Err(error) => {
190            return Err((start_height.saturating_sub(1), anyhow!("Failed to create a CDN request client - {error}")));
191        }
192    };
193
194    // Fetch the CDN height.
195    let cdn_height = match cdn_height::<BLOCKS_PER_FILE>(&client, base_url).await {
196        Ok(cdn_height) => cdn_height,
197        Err(error) => return Err((start_height, error)),
198    };
199    // If the CDN height is less than the start height, return.
200    if cdn_height < start_height {
201        return Err((
202            start_height,
203            anyhow!("The given start height ({start_height}) must be less than the CDN height ({cdn_height})"),
204        ));
205    }
206
207    // If the end height is not specified, set it to the CDN height.
208    // If the end height is greater than the CDN height, set the end height to the CDN height.
209    let end_height = cmp::min(end_height.unwrap_or(cdn_height), cdn_height);
210    // If the end height is less than the start height, return.
211    if end_height < start_height {
212        return Err((
213            start_height,
214            anyhow!("The given end height ({end_height}) must not be less than the start height ({start_height})"),
215        ));
216    }
217
218    // Compute the CDN start height rounded down to the nearest multiple.
219    let cdn_start = start_height - (start_height % BLOCKS_PER_FILE);
220    // Set the CDN end height to the given end height.
221    let cdn_end = end_height;
222    // If the CDN range is empty, return.
223    if cdn_start >= cdn_end {
224        return Ok(cdn_end);
225    }
226
227    // A collection of downloaded blocks pending insertion into the ledger.
228    let pending_blocks: Arc<TMutex<Vec<Block<N>>>> = Default::default();
229
230    // Start a timer.
231    let timer = Instant::now();
232
233    // Spawn a background task responsible for concurrent downloads.
234    let pending_blocks_clone = pending_blocks.clone();
235    let base_url = base_url.to_owned();
236
237    {
238        let stoppable = stoppable.clone();
239        tokio::spawn(async move {
240            download_block_bundles(client, &base_url, cdn_start, cdn_end, pending_blocks_clone, stoppable).await;
241        });
242    }
243
244    // Initialize a temporary threadpool that can use the full CPU.
245    let threadpool = Arc::new(rayon::ThreadPoolBuilder::new().build().unwrap());
246
247    // A loop for inserting the pending blocks into the ledger.
248    let mut current_height = start_height.saturating_sub(1);
249    while current_height < end_height - 1 {
250        // If we are instructed to shut down, abort.
251        if stoppable.is_stopped() {
252            info!("Stopping block sync at {} - shutting down", current_height);
253            // We can shut down cleanly from here, as the node hasn't been started yet.
254            return Ok(current_height);
255        }
256
257        let mut candidate_blocks = pending_blocks.lock().await;
258
259        // Obtain the height of the nearest pending block.
260        let Some(next_height) = candidate_blocks.first().map(|b| b.height()) else {
261            debug!("No pending blocks yet");
262            drop(candidate_blocks);
263            tokio::time::sleep(Duration::from_secs(3)).await;
264            continue;
265        };
266
267        // Wait if the nearest pending block is not the next one that can be inserted.
268        if next_height > current_height + 1 {
269            // There is a gap in pending blocks, we need to wait.
270            debug!("Waiting for the first relevant blocks ({} pending)", candidate_blocks.len());
271            drop(candidate_blocks);
272            tokio::time::sleep(Duration::from_secs(1)).await;
273            continue;
274        }
275
276        // Obtain the first BLOCKS_PER_FILE applicable blocks.
277        let retained_blocks = candidate_blocks.split_off(BLOCKS_PER_FILE as usize);
278        let next_blocks = std::mem::replace(&mut *candidate_blocks, retained_blocks);
279        drop(candidate_blocks);
280
281        // Attempt to advance the ledger using the CDN block bundle.
282        let mut process_clone = process.clone();
283        let stoppable_clone = stoppable.clone();
284        let threadpool_clone = threadpool.clone();
285        current_height = tokio::task::spawn_blocking(move || {
286            threadpool_clone.install(|| {
287                for block in next_blocks.into_iter().filter(|b| (start_height..end_height).contains(&b.height())) {
288                    // If we are instructed to shut down, abort.
289                    if stoppable_clone.is_stopped() {
290                        info!("Stopping block sync at {} - the node is shutting down", current_height);
291                        // We can shut down cleanly from here, as the node hasn't been started yet.
292                        break;
293                    }
294
295                    // Register the next block's height, as the block gets consumed next.
296                    let block_height = block.height();
297
298                    // Insert the block into the ledger.
299                    process_clone(block)?;
300
301                    // Update the current height.
302                    current_height = block_height;
303
304                    // Update metrics.
305                    #[cfg(feature = "metrics")]
306                    update_block_metrics(current_height);
307
308                    // Log the progress.
309                    log_progress::<BLOCKS_PER_FILE>(timer, current_height, cdn_start, cdn_end, "block");
310                }
311
312                Ok(current_height)
313            })
314        })
315        .await
316        .map_err(|e| (current_height, e.into()))?
317        .map_err(|e| (current_height, e))?;
318    }
319
320    Ok(current_height)
321}
322
323async fn download_block_bundles<N: Network>(
324    client: Client,
325    base_url: &http::Uri,
326    cdn_start: u32,
327    cdn_end: u32,
328    pending_blocks: Arc<TMutex<Vec<Block<N>>>>,
329    stoppable: Arc<dyn Stoppable>,
330) {
331    // Keep track of the number of concurrent requests.
332    let active_requests: Arc<AtomicU32> = Default::default();
333
334    let mut start = cdn_start;
335    while start < cdn_end - 1 {
336        // If we are instructed to shut down, stop downloading.
337        if stoppable.is_stopped() {
338            break;
339        }
340
341        // Avoid collecting too many blocks in order to restrict memory use.
342        let num_pending_blocks = pending_blocks.lock().await.len();
343        if num_pending_blocks >= MAXIMUM_PENDING_BLOCKS as usize {
344            debug!("Maximum number of pending blocks reached ({num_pending_blocks}), waiting...");
345            tokio::time::sleep(Duration::from_secs(5)).await;
346            continue;
347        }
348
349        // The number of concurrent requests is maintained at CONCURRENT_REQUESTS, unless the maximum
350        // number of pending blocks may be breached.
351        let active_request_count = active_requests.load(Ordering::Relaxed);
352        let num_requests =
353            cmp::min(CONCURRENT_REQUESTS, (MAXIMUM_PENDING_BLOCKS - num_pending_blocks as u32) / BLOCKS_PER_FILE)
354                .saturating_sub(active_request_count);
355
356        // Spawn concurrent requests for bundles of blocks.
357        for i in 0..num_requests {
358            let start = start + i * BLOCKS_PER_FILE;
359            let end = start + BLOCKS_PER_FILE;
360
361            // If this request would breach the upper limit, stop downloading.
362            if end > cdn_end + BLOCKS_PER_FILE {
363                debug!("Finishing network requests to the CDN...");
364                break;
365            }
366
367            let client_clone = client.clone();
368            let base_url_clone = base_url.clone();
369            let pending_blocks_clone = pending_blocks.clone();
370            let active_requests_clone = active_requests.clone();
371            let stoppable_clone = stoppable.clone();
372            tokio::spawn(async move {
373                // Increment the number of active requests.
374                active_requests_clone.fetch_add(1, Ordering::Relaxed);
375
376                let ctx = format!("blocks {start} to {end}");
377                debug!("Requesting {ctx} (of {cdn_end})");
378
379                // Prepare the URL.
380                let blocks_url = format!("{base_url_clone}/{start}.{end}.blocks");
381                let ctx = format!("blocks {start} to {end}");
382                // Download blocks, retrying on failure.
383                let mut attempts = 0;
384                let request_time = Instant::now();
385
386                loop {
387                    // Fetch the blocks.
388                    match cdn_get(client_clone.clone(), &blocks_url, &ctx).await {
389                        Ok::<Vec<Block<N>>, _>(blocks) => {
390                            // Keep the collection of pending blocks sorted by the height.
391                            let mut pending_blocks = pending_blocks_clone.lock().await;
392                            for block in blocks {
393                                match pending_blocks.binary_search_by_key(&block.height(), |b| b.height()) {
394                                    Ok(_idx) => warn!("Found a duplicate pending block at height {}", block.height()),
395                                    Err(idx) => pending_blocks.insert(idx, block),
396                                }
397                            }
398                            debug!("Received {ctx} {}", format!("(in {:.2?})", request_time.elapsed()).dimmed());
399                            break;
400                        }
401                        Err(error) => {
402                            // Increment the attempt counter, and wait with a linear backoff, or abort in
403                            // case the maximum number of attempts has been breached.
404                            attempts += 1;
405                            if attempts > MAXIMUM_REQUEST_ATTEMPTS {
406                                warn!("Maximum number of requests to {blocks_url} reached - shutting down...");
407                                stoppable_clone.stop();
408                                break;
409                            }
410                            tokio::time::sleep(Duration::from_secs(attempts as u64 * 10)).await;
411                            warn!("{error} - retrying ({attempts} attempt(s) so far)");
412                        }
413                    }
414                }
415
416                // Decrement the number of active requests.
417                active_requests_clone.fetch_sub(1, Ordering::Relaxed);
418            });
419        }
420
421        // Increase the starting block height for the subsequent requests.
422        start += BLOCKS_PER_FILE * num_requests;
423
424        // A short sleep in order to allow some block processing to happen in the meantime.
425        tokio::time::sleep(Duration::from_secs(1)).await;
426    }
427
428    debug!("Finished network requests to the CDN");
429}
430
431/// Retrieves the CDN height with the given base URL.
432///
433/// Note: This function decrements the tip by a few blocks, to ensure the
434/// tip is not on a block that is not yet available on the CDN.
435async fn cdn_height<const BLOCKS_PER_FILE: u32>(client: &Client, base_url: &http::Uri) -> Result<u32> {
436    // A representation of the 'latest.json' file object.
437    #[derive(Deserialize, Serialize, Debug)]
438    struct LatestState {
439        exclusive_height: u32,
440        inclusive_height: u32,
441        hash: String,
442    }
443    // Prepare the URL.
444    let latest_json_url = format!("{base_url}/latest.json");
445    // Send the request.
446    let response = match client.get(latest_json_url).send().await {
447        Ok(response) => response,
448        Err(error) => bail!("Failed to fetch the CDN height - {error}"),
449    };
450    // Parse the response.
451    let bytes = match response.bytes().await {
452        Ok(bytes) => bytes,
453        Err(error) => bail!("Failed to parse the CDN height response - {error}"),
454    };
455    // Parse the bytes for the string.
456    let latest_state_string = match bincode::deserialize::<String>(&bytes) {
457        Ok(string) => string,
458        Err(error) => {
459            let bytes_as_string = String::from_utf8_lossy(&bytes);
460            bail!("Failed to deserialize the CDN height response - {error} - {bytes_as_string}")
461        }
462    };
463    // Parse the string for the tip.
464    let tip = match serde_json::from_str::<LatestState>(&latest_state_string) {
465        Ok(latest) => latest.exclusive_height,
466        Err(error) => bail!("Failed to extract the CDN height response - {error}"),
467    };
468    // Decrement the tip by a few blocks to ensure the CDN is caught up.
469    let tip = tip.saturating_sub(10);
470    // Adjust the tip to the closest subsequent multiple of BLOCKS_PER_FILE.
471    Ok(tip - (tip % BLOCKS_PER_FILE) + BLOCKS_PER_FILE)
472}
473
474/// Retrieves the objects from the CDN with the given URL.
475async fn cdn_get<T: 'static + DeserializeOwned + Send>(client: Client, url: &str, ctx: &str) -> Result<T> {
476    // Fetch the bytes from the given URL.
477    let response = match client.get(url).send().await {
478        Ok(response) => response,
479        Err(error) => bail!("Failed to fetch {ctx} - {error}"),
480    };
481    // Parse the response.
482    let bytes = match response.bytes().await {
483        Ok(bytes) => bytes,
484        Err(error) => bail!("Failed to parse {ctx} - {error}"),
485    };
486
487    // Parse the objects.
488    match tokio::task::spawn_blocking(move || (unchecked_deserialize::<T>(&bytes), bytes)).await {
489        Ok((Ok(objects), _)) => Ok(objects),
490        Ok((Err(error), response_bytes)) => {
491            let bytes_as_string = String::from_utf8_lossy(&response_bytes);
492            bail!("Failed to deserialize {ctx} - {error} - {bytes_as_string}")
493        }
494        Err(error) => {
495            bail!("Failed to join task for {ctx} - {error}")
496        }
497    }
498}
499
500/// Converts a duration into a string that humans can read easily.
501///
502/// # Output
503/// The output remains to be accurate but not too detailed (to reduce noise in the log).
504/// It will give at most two levels of granularity, e.g., days and hours,
505/// and only shows seconds if less than a minute remains.
506fn to_human_readable_duration(duration: Duration) -> String {
507    // TODO: simplify this once the duration_constructors feature is stable
508    // See: https://github.com/rust-lang/rust/issues/140881
509    const SECS_PER_MIN: u64 = 60;
510    const MINS_PER_HOUR: u64 = 60;
511    const SECS_PER_HOUR: u64 = SECS_PER_MIN * MINS_PER_HOUR;
512    const HOURS_PER_DAY: u64 = 24;
513    const SECS_PER_DAY: u64 = SECS_PER_HOUR * HOURS_PER_DAY;
514
515    let duration = duration.as_secs();
516
517    if duration < 1 {
518        "less than one second".to_string()
519    } else if duration < SECS_PER_MIN {
520        format!("{duration} seconds")
521    } else if duration < SECS_PER_HOUR {
522        format!("{} minutes", duration / SECS_PER_MIN)
523    } else if duration < SECS_PER_DAY {
524        let mins = duration / SECS_PER_MIN;
525        format!("{hours} hours and {remainder} minutes", hours = mins / 60, remainder = mins % 60)
526    } else {
527        let days = duration / SECS_PER_DAY;
528        let hours = (duration % SECS_PER_DAY) / SECS_PER_HOUR;
529        format!("{days} days and {hours} hours")
530    }
531}
532
533/// Logs the progress of the sync.
534fn log_progress<const OBJECTS_PER_FILE: u32>(
535    timer: Instant,
536    current_index: u32,
537    cdn_start: u32,
538    mut cdn_end: u32,
539    object_name: &str,
540) {
541    debug_assert!(cdn_start <= cdn_end);
542    debug_assert!(current_index <= cdn_end);
543    debug_assert!(cdn_end >= 1);
544
545    // Subtract 1, as the end of the range is exclusive.
546    cdn_end -= 1;
547
548    // Compute the percentage completed of this particular sync.
549    let sync_percentage =
550        (current_index.saturating_sub(cdn_start) * 100).checked_div(cdn_end.saturating_sub(cdn_start)).unwrap_or(100);
551
552    // Compute the number of files processed so far.
553    let num_files_done = 1 + (current_index - cdn_start) / OBJECTS_PER_FILE;
554    // Compute the number of files remaining.
555    let num_files_remaining = 1 + (cdn_end.saturating_sub(current_index)) / OBJECTS_PER_FILE;
556    // Compute the milliseconds per file.
557    let millis_per_file = timer.elapsed().as_millis() / num_files_done as u128;
558    // Compute the heuristic slowdown factor (in millis).
559    let slowdown = 100 * num_files_remaining as u128;
560    // Compute the time remaining (in millis).
561    let time_remaining = {
562        let remaining = num_files_remaining as u128 * millis_per_file + slowdown;
563        to_human_readable_duration(Duration::from_secs((remaining / 1000) as u64))
564    };
565    // Prepare the estimate message (in secs).
566    let estimate = format!("(started at height {cdn_start}, est. {time_remaining} remaining)");
567    // Log the progress.
568    info!(
569        "Reached {object_name} {current_index} of {cdn_end} - Sync is {sync_percentage}% complete {}",
570        estimate.dimmed()
571    );
572}
573
574#[cfg(test)]
575mod tests {
576    use super::{BLOCKS_PER_FILE, CDN_BASE_URL, cdn_height, load_blocks, log_progress};
577
578    use snarkos_utilities::SimpleStoppable;
579
580    use snarkvm::prelude::{MainnetV0, block::Block};
581
582    use http::Uri;
583    use parking_lot::RwLock;
584    use std::{sync::Arc, time::Instant};
585
586    type CurrentNetwork = MainnetV0;
587
588    fn check_load_blocks(start: u32, end: Option<u32>, expected: usize) {
589        let blocks = Arc::new(RwLock::new(Vec::new()));
590        let blocks_clone = blocks.clone();
591        let process = move |block: Block<CurrentNetwork>| {
592            blocks_clone.write().push(block);
593            Ok(())
594        };
595
596        let testnet_cdn_url = Uri::try_from(format!("{CDN_BASE_URL}/mainnet")).unwrap();
597
598        let rt = tokio::runtime::Runtime::new().unwrap();
599        rt.block_on(async {
600            let completed_height =
601                load_blocks(&testnet_cdn_url, start, end, SimpleStoppable::new(), process).await.unwrap();
602            assert_eq!(blocks.read().len(), expected);
603            if expected > 0 {
604                assert_eq!(blocks.read().last().unwrap().height(), completed_height);
605            }
606            // Check they are sequential.
607            for (i, block) in blocks.read().iter().enumerate() {
608                assert_eq!(block.height(), start + i as u32);
609            }
610        });
611    }
612
613    #[test]
614    fn test_load_blocks_0_to_50() {
615        let start_height = 0;
616        let end_height = Some(50);
617        check_load_blocks(start_height, end_height, 50);
618    }
619
620    #[test]
621    fn test_load_blocks_50_to_100() {
622        let start_height = 50;
623        let end_height = Some(100);
624        check_load_blocks(start_height, end_height, 50);
625    }
626
627    #[test]
628    fn test_load_blocks_0_to_123() {
629        let start_height = 0;
630        let end_height = Some(123);
631        check_load_blocks(start_height, end_height, 123);
632    }
633
634    #[test]
635    fn test_load_blocks_46_to_234() {
636        let start_height = 46;
637        let end_height = Some(234);
638        check_load_blocks(start_height, end_height, 188);
639    }
640
641    #[test]
642    fn test_cdn_height() {
643        let rt = tokio::runtime::Runtime::new().unwrap();
644        let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
645        let testnet_cdn_url = Uri::try_from(format!("{CDN_BASE_URL}/mainnet")).unwrap();
646        rt.block_on(async {
647            let height = cdn_height::<BLOCKS_PER_FILE>(&client, &testnet_cdn_url).await.unwrap();
648            assert!(height > 0);
649        });
650    }
651
652    #[test]
653    fn test_log_progress() {
654        // This test sanity checks that basic arithmetic is correct (i.e. no divide by zero, etc.).
655        let timer = Instant::now();
656        let cdn_start = 0;
657        let cdn_end = 100;
658        let object_name = "blocks";
659        log_progress::<10>(timer, 0, cdn_start, cdn_end, object_name);
660        log_progress::<10>(timer, 10, cdn_start, cdn_end, object_name);
661        log_progress::<10>(timer, 20, cdn_start, cdn_end, object_name);
662        log_progress::<10>(timer, 30, cdn_start, cdn_end, object_name);
663        log_progress::<10>(timer, 40, cdn_start, cdn_end, object_name);
664        log_progress::<10>(timer, 50, cdn_start, cdn_end, object_name);
665        log_progress::<10>(timer, 60, cdn_start, cdn_end, object_name);
666        log_progress::<10>(timer, 70, cdn_start, cdn_end, object_name);
667        log_progress::<10>(timer, 80, cdn_start, cdn_end, object_name);
668        log_progress::<10>(timer, 90, cdn_start, cdn_end, object_name);
669        log_progress::<10>(timer, 100, cdn_start, cdn_end, object_name);
670    }
671
672    #[test]
673    fn test_cdn_user_agent() {
674        use super::cdn_user_agent;
675        let ua = cdn_user_agent();
676        // Must start with the "snarkos/" prefix.
677        assert!(ua.starts_with("snarkos/"), "user-agent must start with 'snarkos/': got {ua}");
678        // The version part must not be empty.
679        let version = ua.strip_prefix("snarkos/").unwrap();
680        assert!(!version.is_empty(), "user-agent version must not be empty");
681    }
682}