snarkos_node_cdn/
blocks.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16// Avoid a false positive from clippy:
17// https://github.com/rust-lang/rust-clippy/issues/6446
18#![allow(clippy::await_holding_lock)]
19
20use snarkvm::prelude::{
21    Deserialize,
22    DeserializeOwned,
23    Ledger,
24    Network,
25    Serialize,
26    block::Block,
27    store::{ConsensusStorage, cow_to_copied},
28};
29
30use anyhow::{Result, anyhow, bail};
31use colored::Colorize;
32use parking_lot::Mutex;
33use reqwest::Client;
34use std::{
35    cmp,
36    sync::{
37        Arc,
38        atomic::{AtomicBool, AtomicU32, Ordering},
39    },
40    time::{Duration, Instant},
41};
42
/// The number of blocks bundled into a single file on the CDN.
const BLOCKS_PER_FILE: u32 = 50;
/// The desired number of concurrent requests to the CDN.
const CONCURRENT_REQUESTS: u32 = 16;
/// Maximum number of downloaded blocks that may await insertion; once this many are pending,
/// no further bundles are requested until some of them have been processed.
const MAXIMUM_PENDING_BLOCKS: u32 = BLOCKS_PER_FILE * CONCURRENT_REQUESTS * 2;
/// Maximum number of failed attempts tolerated for a single request to the CDN;
/// once this count is exceeded, the request is abandoned and the shutdown flag is set.
const MAXIMUM_REQUEST_ATTEMPTS: u8 = 10;
51
52/// Loads blocks from a CDN into the ledger.
53///
54/// On success, this function returns the completed block height.
55/// On failure, this function returns the last successful block height (if any), along with the error.
56pub async fn sync_ledger_with_cdn<N: Network, C: ConsensusStorage<N>>(
57    base_url: &str,
58    ledger: Ledger<N, C>,
59    shutdown: Arc<AtomicBool>,
60) -> Result<u32, (u32, anyhow::Error)> {
61    // Fetch the node height.
62    let start_height = ledger.latest_height() + 1;
63    // Load the blocks from the CDN into the ledger.
64    let ledger_clone = ledger.clone();
65    let result = load_blocks(base_url, start_height, None, shutdown, move |block: Block<N>| {
66        ledger_clone.advance_to_next_block(&block)
67    })
68    .await;
69
70    // TODO (howardwu): Find a way to resolve integrity failures.
71    // If the sync failed, check the integrity of the ledger.
72    if let Err((completed_height, error)) = &result {
73        warn!("{error}");
74
75        // If the sync made any progress, then check the integrity of the ledger.
76        if *completed_height != start_height {
77            debug!("Synced the ledger up to block {completed_height}");
78
79            // Retrieve the latest height, according to the ledger.
80            let node_height = cow_to_copied!(ledger.vm().block_store().heights().max().unwrap_or_default());
81            // Check the integrity of the latest height.
82            if &node_height != completed_height {
83                return Err((*completed_height, anyhow!("The ledger height does not match the last sync height")));
84            }
85
86            // Fetch the latest block from the ledger.
87            if let Err(err) = ledger.get_block(node_height) {
88                return Err((*completed_height, err));
89            }
90        }
91
92        Ok(*completed_height)
93    } else {
94        result
95    }
96}
97
98/// Loads blocks from a CDN and process them with the given function.
99///
100/// On success, this function returns the completed block height.
101/// On failure, this function returns the last successful block height (if any), along with the error.
102pub async fn load_blocks<N: Network>(
103    base_url: &str,
104    start_height: u32,
105    end_height: Option<u32>,
106    shutdown: Arc<AtomicBool>,
107    process: impl FnMut(Block<N>) -> Result<()> + Clone + Send + Sync + 'static,
108) -> Result<u32, (u32, anyhow::Error)> {
109    // Create a Client to maintain a connection pool throughout the sync.
110    let client = match Client::builder().use_rustls_tls().build() {
111        Ok(client) => client,
112        Err(error) => {
113            return Err((start_height.saturating_sub(1), anyhow!("Failed to create a CDN request client - {error}")));
114        }
115    };
116
117    // Fetch the CDN height.
118    let cdn_height = match cdn_height::<BLOCKS_PER_FILE>(&client, base_url).await {
119        Ok(cdn_height) => cdn_height,
120        Err(error) => return Err((start_height, error)),
121    };
122    // If the CDN height is less than the start height, return.
123    if cdn_height < start_height {
124        return Err((
125            start_height,
126            anyhow!("The given start height ({start_height}) must be less than the CDN height ({cdn_height})"),
127        ));
128    }
129
130    // If the end height is not specified, set it to the CDN height.
131    // If the end height is greater than the CDN height, set the end height to the CDN height.
132    let end_height = cmp::min(end_height.unwrap_or(cdn_height), cdn_height);
133    // If the end height is less than the start height, return.
134    if end_height < start_height {
135        return Err((
136            start_height,
137            anyhow!("The given end height ({end_height}) must not be less than the start height ({start_height})"),
138        ));
139    }
140
141    // Compute the CDN start height rounded down to the nearest multiple.
142    let cdn_start = start_height - (start_height % BLOCKS_PER_FILE);
143    // Set the CDN end height to the given end height.
144    let cdn_end = end_height;
145    // If the CDN range is empty, return.
146    if cdn_start >= cdn_end {
147        return Ok(cdn_end);
148    }
149
150    // A collection of downloaded blocks pending insertion into the ledger.
151    let pending_blocks: Arc<Mutex<Vec<Block<N>>>> = Default::default();
152
153    // Start a timer.
154    let timer = Instant::now();
155
156    // Spawn a background task responsible for concurrent downloads.
157    let pending_blocks_clone = pending_blocks.clone();
158    let base_url = base_url.to_owned();
159    let shutdown_clone = shutdown.clone();
160    tokio::spawn(async move {
161        download_block_bundles(client, base_url, cdn_start, cdn_end, pending_blocks_clone, shutdown_clone).await;
162    });
163
164    // A loop for inserting the pending blocks into the ledger.
165    let mut current_height = start_height.saturating_sub(1);
166    while current_height < end_height - 1 {
167        // If we are instructed to shut down, abort.
168        if shutdown.load(Ordering::Acquire) {
169            info!("Stopping block sync at {} - shutting down", current_height);
170            // We can shut down cleanly from here, as the node hasn't been started yet.
171            std::process::exit(0);
172        }
173
174        let mut candidate_blocks = pending_blocks.lock();
175
176        // Obtain the height of the nearest pending block.
177        let Some(next_height) = candidate_blocks.first().map(|b| b.height()) else {
178            debug!("No pending blocks yet");
179            drop(candidate_blocks);
180            tokio::time::sleep(Duration::from_secs(3)).await;
181            continue;
182        };
183
184        // Wait if the nearest pending block is not the next one that can be inserted.
185        if next_height > current_height + 1 {
186            // There is a gap in pending blocks, we need to wait.
187            debug!("Waiting for the first relevant blocks ({} pending)", candidate_blocks.len());
188            drop(candidate_blocks);
189            tokio::time::sleep(Duration::from_secs(1)).await;
190            continue;
191        }
192
193        // Obtain the first BLOCKS_PER_FILE applicable blocks.
194        let retained_blocks = candidate_blocks.split_off(BLOCKS_PER_FILE as usize);
195        let next_blocks = std::mem::replace(&mut *candidate_blocks, retained_blocks);
196        drop(candidate_blocks);
197
198        // Attempt to advance the ledger using the CDN block bundle.
199        let mut process_clone = process.clone();
200        let shutdown_clone = shutdown.clone();
201        current_height = tokio::task::spawn_blocking(move || {
202            for block in next_blocks.into_iter().filter(|b| (start_height..end_height).contains(&b.height())) {
203                // If we are instructed to shut down, abort.
204                if shutdown_clone.load(Ordering::Relaxed) {
205                    info!("Stopping block sync at {} - the node is shutting down", current_height);
206                    // We can shut down cleanly from here, as the node hasn't been started yet.
207                    std::process::exit(0);
208                }
209
210                // Register the next block's height, as the block gets consumed next.
211                let block_height = block.height();
212
213                // Insert the block into the ledger.
214                process_clone(block)?;
215
216                // Update the current height.
217                current_height = block_height;
218
219                // Log the progress.
220                log_progress::<BLOCKS_PER_FILE>(timer, current_height, cdn_start, cdn_end, "block");
221            }
222
223            Ok(current_height)
224        })
225        .await
226        .map_err(|e| (current_height, e.into()))?
227        .map_err(|e| (current_height, e))?;
228    }
229
230    Ok(current_height)
231}
232
233async fn download_block_bundles<N: Network>(
234    client: Client,
235    base_url: String,
236    cdn_start: u32,
237    cdn_end: u32,
238    pending_blocks: Arc<Mutex<Vec<Block<N>>>>,
239    shutdown: Arc<AtomicBool>,
240) {
241    // Keep track of the number of concurrent requests.
242    let active_requests: Arc<AtomicU32> = Default::default();
243
244    let mut start = cdn_start;
245    while start < cdn_end - 1 {
246        // If we are instructed to shut down, stop downloading.
247        if shutdown.load(Ordering::Acquire) {
248            break;
249        }
250
251        // Avoid collecting too many blocks in order to restrict memory use.
252        let num_pending_blocks = pending_blocks.lock().len();
253        if num_pending_blocks >= MAXIMUM_PENDING_BLOCKS as usize {
254            debug!("Maximum number of pending blocks reached ({num_pending_blocks}), waiting...");
255            tokio::time::sleep(Duration::from_secs(5)).await;
256            continue;
257        }
258
259        // The number of concurrent requests is maintained at CONCURRENT_REQUESTS, unless the maximum
260        // number of pending blocks may be breached.
261        let active_request_count = active_requests.load(Ordering::Relaxed);
262        let num_requests =
263            cmp::min(CONCURRENT_REQUESTS, (MAXIMUM_PENDING_BLOCKS - num_pending_blocks as u32) / BLOCKS_PER_FILE)
264                .saturating_sub(active_request_count);
265
266        // Spawn concurrent requests for bundles of blocks.
267        for i in 0..num_requests {
268            let start = start + i * BLOCKS_PER_FILE;
269            let end = start + BLOCKS_PER_FILE;
270
271            // If this request would breach the upper limit, stop downloading.
272            if end > cdn_end + BLOCKS_PER_FILE {
273                debug!("Finishing network requests to the CDN...");
274                break;
275            }
276
277            let client_clone = client.clone();
278            let base_url_clone = base_url.clone();
279            let pending_blocks_clone = pending_blocks.clone();
280            let active_requests_clone = active_requests.clone();
281            let shutdown_clone = shutdown.clone();
282            tokio::spawn(async move {
283                // Increment the number of active requests.
284                active_requests_clone.fetch_add(1, Ordering::Relaxed);
285
286                let ctx = format!("blocks {start} to {end}");
287                debug!("Requesting {ctx} (of {cdn_end})");
288
289                // Prepare the URL.
290                let blocks_url = format!("{base_url_clone}/{start}.{end}.blocks");
291                let ctx = format!("blocks {start} to {end}");
292                // Download blocks, retrying on failure.
293                let mut attempts = 0;
294                let request_time = Instant::now();
295
296                loop {
297                    // Fetch the blocks.
298                    match cdn_get(client_clone.clone(), &blocks_url, &ctx).await {
299                        Ok::<Vec<Block<N>>, _>(blocks) => {
300                            // Keep the collection of pending blocks sorted by the height.
301                            let mut pending_blocks = pending_blocks_clone.lock();
302                            for block in blocks {
303                                match pending_blocks.binary_search_by_key(&block.height(), |b| b.height()) {
304                                    Ok(_idx) => warn!("Found a duplicate pending block at height {}", block.height()),
305                                    Err(idx) => pending_blocks.insert(idx, block),
306                                }
307                            }
308                            debug!("Received {ctx} {}", format!("(in {:.2?})", request_time.elapsed()).dimmed());
309                            break;
310                        }
311                        Err(error) => {
312                            // Increment the attempt counter, and wait with a linear backoff, or abort in
313                            // case the maximum number of attempts has been breached.
314                            attempts += 1;
315                            if attempts > MAXIMUM_REQUEST_ATTEMPTS {
316                                warn!("Maximum number of requests to {blocks_url} reached - shutting down...");
317                                shutdown_clone.store(true, Ordering::Relaxed);
318                                break;
319                            }
320                            tokio::time::sleep(Duration::from_secs(attempts as u64 * 10)).await;
321                            warn!("{error} - retrying ({attempts} attempt(s) so far)");
322                        }
323                    }
324                }
325
326                // Decrement the number of active requests.
327                active_requests_clone.fetch_sub(1, Ordering::Relaxed);
328            });
329        }
330
331        // Increase the starting block height for the subsequent requests.
332        start += BLOCKS_PER_FILE * num_requests;
333
334        // A short sleep in order to allow some block processing to happen in the meantime.
335        tokio::time::sleep(Duration::from_secs(1)).await;
336    }
337
338    debug!("Finished network requests to the CDN");
339}
340
341/// Retrieves the CDN height with the given base URL.
342///
343/// Note: This function decrements the tip by a few blocks, to ensure the
344/// tip is not on a block that is not yet available on the CDN.
345async fn cdn_height<const BLOCKS_PER_FILE: u32>(client: &Client, base_url: &str) -> Result<u32> {
346    // A representation of the 'latest.json' file object.
347    #[derive(Deserialize, Serialize, Debug)]
348    struct LatestState {
349        exclusive_height: u32,
350        inclusive_height: u32,
351        hash: String,
352    }
353    // Prepare the URL.
354    let latest_json_url = format!("{base_url}/latest.json");
355    // Send the request.
356    let response = match client.get(latest_json_url).send().await {
357        Ok(response) => response,
358        Err(error) => bail!("Failed to fetch the CDN height - {error}"),
359    };
360    // Parse the response.
361    let bytes = match response.bytes().await {
362        Ok(bytes) => bytes,
363        Err(error) => bail!("Failed to parse the CDN height response - {error}"),
364    };
365    // Parse the bytes for the string.
366    let latest_state_string = match bincode::deserialize::<String>(&bytes) {
367        Ok(string) => string,
368        Err(error) => bail!("Failed to deserialize the CDN height response - {error}"),
369    };
370    // Parse the string for the tip.
371    let tip = match serde_json::from_str::<LatestState>(&latest_state_string) {
372        Ok(latest) => latest.exclusive_height,
373        Err(error) => bail!("Failed to extract the CDN height response - {error}"),
374    };
375    // Decrement the tip by a few blocks to ensure the CDN is caught up.
376    let tip = tip.saturating_sub(10);
377    // Adjust the tip to the closest subsequent multiple of BLOCKS_PER_FILE.
378    Ok(tip - (tip % BLOCKS_PER_FILE) + BLOCKS_PER_FILE)
379}
380
381/// Retrieves the objects from the CDN with the given URL.
382async fn cdn_get<T: 'static + DeserializeOwned + Send>(client: Client, url: &str, ctx: &str) -> Result<T> {
383    // Fetch the bytes from the given URL.
384    let response = match client.get(url).send().await {
385        Ok(response) => response,
386        Err(error) => bail!("Failed to fetch {ctx} - {error}"),
387    };
388    // Parse the response.
389    let bytes = match response.bytes().await {
390        Ok(bytes) => bytes,
391        Err(error) => bail!("Failed to parse {ctx} - {error}"),
392    };
393    // Parse the objects.
394    match tokio::task::spawn_blocking(move || bincode::deserialize::<T>(&bytes)).await {
395        Ok(Ok(objects)) => Ok(objects),
396        Ok(Err(error)) => bail!("Failed to deserialize {ctx} - {error}"),
397        Err(error) => bail!("Failed to join task for {ctx} - {error}"),
398    }
399}
400
401/// Logs the progress of the sync.
402fn log_progress<const OBJECTS_PER_FILE: u32>(
403    timer: Instant,
404    current_index: u32,
405    cdn_start: u32,
406    mut cdn_end: u32,
407    object_name: &str,
408) {
409    // Subtract 1, as the end of the range is exclusive.
410    cdn_end -= 1;
411    // Compute the percentage completed.
412    let percentage = current_index * 100 / cdn_end;
413    // Compute the number of files processed so far.
414    let num_files_done = 1 + (current_index - cdn_start) / OBJECTS_PER_FILE;
415    // Compute the number of files remaining.
416    let num_files_remaining = 1 + (cdn_end.saturating_sub(current_index)) / OBJECTS_PER_FILE;
417    // Compute the milliseconds per file.
418    let millis_per_file = timer.elapsed().as_millis() / num_files_done as u128;
419    // Compute the heuristic slowdown factor (in millis).
420    let slowdown = 100 * num_files_remaining as u128;
421    // Compute the time remaining (in millis).
422    let time_remaining = num_files_remaining as u128 * millis_per_file + slowdown;
423    // Prepare the estimate message (in secs).
424    let estimate = format!("(est. {} minutes remaining)", time_remaining / (60 * 1000));
425    // Log the progress.
426    info!("Synced up to {object_name} {current_index} of {cdn_end} - {percentage}% complete {}", estimate.dimmed());
427}
428
#[cfg(test)]
mod tests {
    use crate::{
        blocks::{BLOCKS_PER_FILE, cdn_get, cdn_height, log_progress},
        load_blocks,
    };
    use snarkvm::prelude::{MainnetV0, block::Block};

    use parking_lot::RwLock;
    use std::{sync::Arc, time::Instant};

    type CurrentNetwork = MainnetV0;

    const TEST_BASE_URL: &str = "https://blocks.aleo.org/mainnet/v0";

    /// Loads the blocks from `start` up to `end` from the test CDN, and checks that exactly
    /// `expected` sequential blocks were processed, ending at the reported height.
    fn check_load_blocks(start: u32, end: Option<u32>, expected: usize) {
        // Collect every processed block for inspection.
        let collected = Arc::new(RwLock::new(Vec::new()));
        let sink = Arc::clone(&collected);
        let process = move |block: Block<CurrentNetwork>| {
            sink.write().push(block);
            Ok(())
        };

        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let completed_height = load_blocks(TEST_BASE_URL, start, end, Default::default(), process).await.unwrap();

            let collected = collected.read();
            assert_eq!(collected.len(), expected);
            if expected > 0 {
                assert_eq!(collected.last().unwrap().height(), completed_height);
            }
            // Check they are sequential.
            for (offset, block) in collected.iter().enumerate() {
                assert_eq!(block.height(), start + offset as u32);
            }
        });
    }

    #[test]
    fn test_load_blocks_0_to_50() {
        check_load_blocks(0, Some(50), 50);
    }

    #[test]
    fn test_load_blocks_50_to_100() {
        check_load_blocks(50, Some(100), 50);
    }

    #[test]
    fn test_load_blocks_0_to_123() {
        check_load_blocks(0, Some(123), 123);
    }

    #[test]
    fn test_load_blocks_46_to_234() {
        check_load_blocks(46, Some(234), 188);
    }

    #[test]
    fn test_cdn_height() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
        runtime.block_on(async {
            let height = cdn_height::<BLOCKS_PER_FILE>(&client, TEST_BASE_URL).await.unwrap();
            assert!(height > 0);
        });
    }

    #[test]
    fn test_cdn_get() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
            let url = format!("{TEST_BASE_URL}/mainnet/latest/height");
            let height = cdn_get::<u32>(client, &url, "height").await.unwrap();
            assert!(height > 0);
        });
    }

    #[test]
    fn test_log_progress() {
        // This test sanity checks that basic arithmetic is correct (i.e. no divide by zero, etc.).
        let timer = Instant::now();
        let (cdn_start, cdn_end) = (0, 100);
        let object_name = "blocks";
        // Walk through the whole range in steps of one file, including both endpoints.
        for current_index in (0..=100u32).step_by(10) {
            log_progress::<10>(timer, current_index, cdn_start, cdn_end, object_name);
        }
    }
}