solana_download_utils/
lib.rs

1#![allow(clippy::integer_arithmetic)]
2use {
3    console::Emoji,
4    indicatif::{ProgressBar, ProgressStyle},
5    log::*,
6    solana_runtime::{
7        snapshot_hash::SnapshotHash,
8        snapshot_package::SnapshotType,
9        snapshot_utils::{self, ArchiveFormat},
10    },
11    solana_sdk::{clock::Slot, genesis_config::DEFAULT_GENESIS_ARCHIVE},
12    std::{
13        fs::{self, File},
14        io::{self, Read},
15        net::SocketAddr,
16        path::{Path, PathBuf},
17        time::{Duration, Instant},
18    },
19};
20
// Emoji placed in front of the "Downloading ..." progress bar messages.
21static TRUCK: Emoji = Emoji("🚚 ", "");
// Emoji placed in front of the "Downloaded ..." completion log line.
22static SPARKLE: Emoji = Emoji("✨ ", "");
23
24/// Creates a new process bar for processing that will take an unknown amount of time
25fn new_spinner_progress_bar() -> ProgressBar {
26    let progress_bar = ProgressBar::new(42);
27    progress_bar.set_style(
28        ProgressStyle::default_spinner()
29            .template("{spinner:.green} {wide_msg}")
30            .expect("ProgresStyle::template direct input to be correct"),
31    );
32    progress_bar.enable_steady_tick(Duration::from_millis(100));
33    progress_bar
34}
35
/// Snapshot of a download's progress, handed to the progress notification callback.
#[derive(Debug, Clone, PartialEq)]
pub struct DownloadProgressRecord {
    /// Duration since the beginning of the download
    pub elapsed_time: Duration,
    /// Duration since the last notification
    pub last_elapsed_time: Duration,
    /// The bytes/sec speed measured for the last notification period
    pub last_throughput: f32,
    /// The bytes/sec speed measured from the beginning
    pub total_throughput: f32,
    /// Total bytes of the download
    pub total_bytes: usize,
    /// Bytes downloaded so far
    pub current_bytes: usize,
    /// Percentage downloaded
    pub percentage_done: f32,
    /// Estimated remaining time (in seconds) to finish the download if it keeps at the last
    /// download speed
    pub estimated_remaining_time: f32,
    /// Number of times progress has been notified; it starts from 1 and increments by 1 each time
    pub notification_count: u64,
}
58
59type DownloadProgressCallback<'a> = Box<dyn FnMut(&DownloadProgressRecord) -> bool + 'a>;
60type DownloadProgressCallbackOption<'a> = Option<DownloadProgressCallback<'a>>;
61
62/// This callback allows the caller to get notified of the download progress modelled by DownloadProgressRecord
63/// Return "true" to continue the download
64/// Return "false" to abort the download
65pub fn download_file<'a, 'b>(
66    url: &str,
67    destination_file: &Path,
68    use_progress_bar: bool,
69    progress_notify_callback: &'a mut DownloadProgressCallbackOption<'b>,
70) -> Result<(), String> {
71    if destination_file.is_file() {
72        return Err(format!("{destination_file:?} already exists"));
73    }
74    let download_start = Instant::now();
75
76    fs::create_dir_all(destination_file.parent().expect("parent"))
77        .map_err(|err| err.to_string())?;
78
79    let mut temp_destination_file = destination_file.to_path_buf();
80    temp_destination_file.set_file_name(format!(
81        "tmp-{}",
82        destination_file
83            .file_name()
84            .expect("file_name")
85            .to_str()
86            .expect("to_str")
87    ));
88
89    let progress_bar = new_spinner_progress_bar();
90    if use_progress_bar {
91        progress_bar.set_message(format!("{TRUCK}Downloading {url}..."));
92    }
93
94    let response = reqwest::blocking::Client::new()
95        .get(url)
96        .send()
97        .and_then(|response| response.error_for_status())
98        .map_err(|err| {
99            progress_bar.finish_and_clear();
100            err.to_string()
101        })?;
102
103    let download_size = {
104        response
105            .headers()
106            .get(reqwest::header::CONTENT_LENGTH)
107            .and_then(|content_length| content_length.to_str().ok())
108            .and_then(|content_length| content_length.parse().ok())
109            .unwrap_or(0)
110    };
111
112    if use_progress_bar {
113        progress_bar.set_length(download_size);
114        progress_bar.set_style(
115            ProgressStyle::default_bar()
116                .template(
117                    "{spinner:.green}{msg_wide}[{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})",
118                )
119                .expect("ProgresStyle::template direct input to be correct")
120                .progress_chars("=> "),
121        );
122        progress_bar.set_message(format!("{TRUCK}Downloading~ {url}"));
123    } else {
124        info!("Downloading {} bytes from {}", download_size, url);
125    }
126
127    struct DownloadProgress<'e, 'f, R> {
128        progress_bar: ProgressBar,
129        response: R,
130        last_print: Instant,
131        current_bytes: usize,
132        last_print_bytes: usize,
133        download_size: f32,
134        use_progress_bar: bool,
135        start_time: Instant,
136        callback: &'f mut DownloadProgressCallbackOption<'e>,
137        notification_count: u64,
138    }
139
140    impl<'e, 'f, R: Read> Read for DownloadProgress<'e, 'f, R> {
141        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
142            let n = self.response.read(buf)?;
143
144            self.current_bytes += n;
145            let total_bytes_f32 = self.current_bytes as f32;
146            let diff_bytes_f32 = (self.current_bytes - self.last_print_bytes) as f32;
147            let last_throughput = diff_bytes_f32 / self.last_print.elapsed().as_secs_f32();
148            let estimated_remaining_time = if last_throughput > 0_f32 {
149                (self.download_size - self.current_bytes as f32) / last_throughput
150            } else {
151                f32::MAX
152            };
153
154            let mut progress_record = DownloadProgressRecord {
155                elapsed_time: self.start_time.elapsed(),
156                last_elapsed_time: self.last_print.elapsed(),
157                last_throughput,
158                total_throughput: self.current_bytes as f32
159                    / self.start_time.elapsed().as_secs_f32(),
160                total_bytes: self.download_size as usize,
161                current_bytes: self.current_bytes,
162                percentage_done: 100f32 * (total_bytes_f32 / self.download_size),
163                estimated_remaining_time,
164                notification_count: self.notification_count,
165            };
166            let mut to_update_progress = false;
167            if progress_record.last_elapsed_time.as_secs() > 5 {
168                self.last_print = Instant::now();
169                self.last_print_bytes = self.current_bytes;
170                to_update_progress = true;
171                self.notification_count += 1;
172                progress_record.notification_count = self.notification_count
173            }
174
175            if self.use_progress_bar {
176                self.progress_bar.inc(n as u64);
177            } else if to_update_progress {
178                info!(
179                    "downloaded {} bytes {:.1}% {:.1} bytes/s",
180                    self.current_bytes,
181                    progress_record.percentage_done,
182                    progress_record.last_throughput,
183                );
184            }
185
186            if let Some(callback) = self.callback {
187                if to_update_progress && !callback(&progress_record) {
188                    info!("Download is aborted by the caller");
189                    return Err(io::Error::new(
190                        io::ErrorKind::Other,
191                        "Download is aborted by the caller",
192                    ));
193                }
194            }
195
196            Ok(n)
197        }
198    }
199
200    let mut source = DownloadProgress::<'b, 'a> {
201        progress_bar,
202        response,
203        last_print: Instant::now(),
204        current_bytes: 0,
205        last_print_bytes: 0,
206        download_size: (download_size as f32).max(1f32),
207        use_progress_bar,
208        start_time: Instant::now(),
209        callback: progress_notify_callback,
210        notification_count: 0,
211    };
212
213    File::create(&temp_destination_file)
214        .and_then(|mut file| std::io::copy(&mut source, &mut file))
215        .map_err(|err| format!("Unable to write {temp_destination_file:?}: {err:?}"))?;
216
217    source.progress_bar.finish_and_clear();
218    info!(
219        "  {}{}",
220        SPARKLE,
221        format!(
222            "Downloaded {} ({} bytes) in {:?}",
223            url,
224            download_size,
225            Instant::now().duration_since(download_start),
226        )
227    );
228
229    std::fs::rename(temp_destination_file, destination_file)
230        .map_err(|err| format!("Unable to rename: {err:?}"))?;
231
232    Ok(())
233}
234
235pub fn download_genesis_if_missing(
236    rpc_addr: &SocketAddr,
237    genesis_package: &Path,
238    use_progress_bar: bool,
239) -> Result<PathBuf, String> {
240    if !genesis_package.exists() {
241        let tmp_genesis_path = genesis_package.parent().unwrap().join("tmp-genesis");
242        let tmp_genesis_package = tmp_genesis_path.join(DEFAULT_GENESIS_ARCHIVE);
243
244        let _ignored = fs::remove_dir_all(&tmp_genesis_path);
245        download_file(
246            &format!("http://{rpc_addr}/{DEFAULT_GENESIS_ARCHIVE}"),
247            &tmp_genesis_package,
248            use_progress_bar,
249            &mut None,
250        )?;
251
252        Ok(tmp_genesis_package)
253    } else {
254        Err("genesis already exists".to_string())
255    }
256}
257
258/// Download a snapshot archive from `rpc_addr`.  Use `snapshot_type` to specify downloading either
259/// a full snapshot or an incremental snapshot.
260pub fn download_snapshot_archive(
261    rpc_addr: &SocketAddr,
262    full_snapshot_archives_dir: &Path,
263    incremental_snapshot_archives_dir: &Path,
264    desired_snapshot_hash: (Slot, SnapshotHash),
265    snapshot_type: SnapshotType,
266    maximum_full_snapshot_archives_to_retain: usize,
267    maximum_incremental_snapshot_archives_to_retain: usize,
268    use_progress_bar: bool,
269    progress_notify_callback: &mut DownloadProgressCallbackOption<'_>,
270) -> Result<(), String> {
271    snapshot_utils::purge_old_snapshot_archives(
272        full_snapshot_archives_dir,
273        incremental_snapshot_archives_dir,
274        maximum_full_snapshot_archives_to_retain,
275        maximum_incremental_snapshot_archives_to_retain,
276    );
277
278    let snapshot_archives_remote_dir =
279        snapshot_utils::build_snapshot_archives_remote_dir(match snapshot_type {
280            SnapshotType::FullSnapshot => full_snapshot_archives_dir,
281            SnapshotType::IncrementalSnapshot(_) => incremental_snapshot_archives_dir,
282        });
283    fs::create_dir_all(&snapshot_archives_remote_dir).unwrap();
284
285    for archive_format in [
286        ArchiveFormat::TarZstd,
287        ArchiveFormat::TarGzip,
288        ArchiveFormat::TarBzip2,
289        ArchiveFormat::TarLz4,
290        ArchiveFormat::Tar, // `solana-test-validator` creates uncompressed snapshots
291    ] {
292        let destination_path = match snapshot_type {
293            SnapshotType::FullSnapshot => snapshot_utils::build_full_snapshot_archive_path(
294                &snapshot_archives_remote_dir,
295                desired_snapshot_hash.0,
296                &desired_snapshot_hash.1,
297                archive_format,
298            ),
299            SnapshotType::IncrementalSnapshot(base_slot) => {
300                snapshot_utils::build_incremental_snapshot_archive_path(
301                    &snapshot_archives_remote_dir,
302                    base_slot,
303                    desired_snapshot_hash.0,
304                    &desired_snapshot_hash.1,
305                    archive_format,
306                )
307            }
308        };
309
310        if destination_path.is_file() {
311            return Ok(());
312        }
313
314        match download_file(
315            &format!(
316                "http://{}/{}",
317                rpc_addr,
318                destination_path.file_name().unwrap().to_str().unwrap()
319            ),
320            &destination_path,
321            use_progress_bar,
322            progress_notify_callback,
323        ) {
324            Ok(()) => return Ok(()),
325            Err(err) => info!("{}", err),
326        }
327    }
328    Err(format!(
329        "Failed to download a snapshot archive for slot {} from {}",
330        desired_snapshot_hash.0, rpc_addr
331    ))
332}