miraland_download_utils/
lib.rs

1#![allow(clippy::arithmetic_side_effects)]
2use {
3    console::Emoji,
4    indicatif::{ProgressBar, ProgressStyle},
5    log::*,
6    miraland_runtime::{
7        snapshot_hash::SnapshotHash,
8        snapshot_package::SnapshotKind,
9        snapshot_utils::{self, ArchiveFormat},
10    },
11    miraland_sdk::{clock::Slot, genesis_config::DEFAULT_GENESIS_ARCHIVE},
12    std::{
13        fs::{self, File},
14        io::{self, Read},
15        net::SocketAddr,
16        num::NonZeroUsize,
17        path::{Path, PathBuf},
18        time::{Duration, Instant},
19    },
20};
21
22static TRUCK: Emoji = Emoji("🚚 ", "");
23static SPARKLE: Emoji = Emoji("✨ ", "");
24
25/// Creates a new process bar for processing that will take an unknown amount of time
26fn new_spinner_progress_bar() -> ProgressBar {
27    let progress_bar = ProgressBar::new(42);
28    progress_bar.set_style(
29        ProgressStyle::default_spinner()
30            .template("{spinner:.green} {wide_msg}")
31            .expect("ProgresStyle::template direct input to be correct"),
32    );
33    progress_bar.enable_steady_tick(Duration::from_millis(100));
34    progress_bar
35}
36
/// Snapshot of a download's progress, handed to the progress callback.
///
/// The record is plain data, so it can be cloned (e.g. to keep the latest notification around
/// after the callback returns) and compared.
#[derive(Debug, Clone, PartialEq)]
pub struct DownloadProgressRecord {
    /// Duration since the beginning of the download
    pub elapsed_time: Duration,
    /// Duration since the last notification
    pub last_elapsed_time: Duration,
    /// The bytes/sec speed measured for the last notification period
    pub last_throughput: f32,
    /// The bytes/sec speed measured from the beginning
    pub total_throughput: f32,
    /// Total bytes of the download
    pub total_bytes: usize,
    /// Bytes downloaded so far
    pub current_bytes: usize,
    /// Percentage downloaded
    pub percentage_done: f32,
    /// Estimated remaining time (in seconds) to finish the download if it keeps at the last
    /// download speed
    pub estimated_remaining_time: f32,
    /// The number of times the progress has been notified; it starts from 1 and increments by 1
    /// each time
    pub notification_count: u64,
}
59
60type DownloadProgressCallback<'a> = Box<dyn FnMut(&DownloadProgressRecord) -> bool + 'a>;
61type DownloadProgressCallbackOption<'a> = Option<DownloadProgressCallback<'a>>;
62
63/// This callback allows the caller to get notified of the download progress modelled by DownloadProgressRecord
64/// Return "true" to continue the download
65/// Return "false" to abort the download
66pub fn download_file<'a, 'b>(
67    url: &str,
68    destination_file: &Path,
69    use_progress_bar: bool,
70    progress_notify_callback: &'a mut DownloadProgressCallbackOption<'b>,
71) -> Result<(), String> {
72    if destination_file.is_file() {
73        return Err(format!("{destination_file:?} already exists"));
74    }
75    let download_start = Instant::now();
76
77    fs::create_dir_all(destination_file.parent().expect("parent"))
78        .map_err(|err| err.to_string())?;
79
80    let mut temp_destination_file = destination_file.to_path_buf();
81    temp_destination_file.set_file_name(format!(
82        "tmp-{}",
83        destination_file
84            .file_name()
85            .expect("file_name")
86            .to_str()
87            .expect("to_str")
88    ));
89
90    let progress_bar = new_spinner_progress_bar();
91    if use_progress_bar {
92        progress_bar.set_message(format!("{TRUCK}Downloading {url}..."));
93    }
94
95    let response = reqwest::blocking::Client::new()
96        .get(url)
97        .send()
98        .and_then(|response| response.error_for_status())
99        .map_err(|err| {
100            progress_bar.finish_and_clear();
101            err.to_string()
102        })?;
103
104    let download_size = {
105        response
106            .headers()
107            .get(reqwest::header::CONTENT_LENGTH)
108            .and_then(|content_length| content_length.to_str().ok())
109            .and_then(|content_length| content_length.parse().ok())
110            .unwrap_or(0)
111    };
112
113    if use_progress_bar {
114        progress_bar.set_length(download_size);
115        progress_bar.set_style(
116            ProgressStyle::default_bar()
117                .template(
118                    "{spinner:.green}{msg_wide}[{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})",
119                )
120                .expect("ProgresStyle::template direct input to be correct")
121                .progress_chars("=> "),
122        );
123        progress_bar.set_message(format!("{TRUCK}Downloading~ {url}"));
124    } else {
125        info!("Downloading {} bytes from {}", download_size, url);
126    }
127
128    struct DownloadProgress<'e, 'f, R> {
129        progress_bar: ProgressBar,
130        response: R,
131        last_print: Instant,
132        current_bytes: usize,
133        last_print_bytes: usize,
134        download_size: f32,
135        use_progress_bar: bool,
136        start_time: Instant,
137        callback: &'f mut DownloadProgressCallbackOption<'e>,
138        notification_count: u64,
139    }
140
141    impl<'e, 'f, R: Read> Read for DownloadProgress<'e, 'f, R> {
142        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
143            let n = self.response.read(buf)?;
144
145            self.current_bytes += n;
146            let total_bytes_f32 = self.current_bytes as f32;
147            let diff_bytes_f32 = (self.current_bytes - self.last_print_bytes) as f32;
148            let last_throughput = diff_bytes_f32 / self.last_print.elapsed().as_secs_f32();
149            let estimated_remaining_time = if last_throughput > 0_f32 {
150                (self.download_size - self.current_bytes as f32) / last_throughput
151            } else {
152                f32::MAX
153            };
154
155            let mut progress_record = DownloadProgressRecord {
156                elapsed_time: self.start_time.elapsed(),
157                last_elapsed_time: self.last_print.elapsed(),
158                last_throughput,
159                total_throughput: self.current_bytes as f32
160                    / self.start_time.elapsed().as_secs_f32(),
161                total_bytes: self.download_size as usize,
162                current_bytes: self.current_bytes,
163                percentage_done: 100f32 * (total_bytes_f32 / self.download_size),
164                estimated_remaining_time,
165                notification_count: self.notification_count,
166            };
167            let mut to_update_progress = false;
168            if progress_record.last_elapsed_time.as_secs() > 5 {
169                self.last_print = Instant::now();
170                self.last_print_bytes = self.current_bytes;
171                to_update_progress = true;
172                self.notification_count += 1;
173                progress_record.notification_count = self.notification_count
174            }
175
176            if self.use_progress_bar {
177                self.progress_bar.inc(n as u64);
178            } else if to_update_progress {
179                info!(
180                    "downloaded {} bytes {:.1}% {:.1} bytes/s",
181                    self.current_bytes,
182                    progress_record.percentage_done,
183                    progress_record.last_throughput,
184                );
185            }
186
187            if let Some(callback) = self.callback {
188                if to_update_progress && !callback(&progress_record) {
189                    info!("Download is aborted by the caller");
190                    return Err(io::Error::new(
191                        io::ErrorKind::Other,
192                        "Download is aborted by the caller",
193                    ));
194                }
195            }
196
197            Ok(n)
198        }
199    }
200
201    let mut source = DownloadProgress::<'b, 'a> {
202        progress_bar,
203        response,
204        last_print: Instant::now(),
205        current_bytes: 0,
206        last_print_bytes: 0,
207        download_size: (download_size as f32).max(1f32),
208        use_progress_bar,
209        start_time: Instant::now(),
210        callback: progress_notify_callback,
211        notification_count: 0,
212    };
213
214    File::create(&temp_destination_file)
215        .and_then(|mut file| std::io::copy(&mut source, &mut file))
216        .map_err(|err| format!("Unable to write {temp_destination_file:?}: {err:?}"))?;
217
218    source.progress_bar.finish_and_clear();
219    info!(
220        "  {}{}",
221        SPARKLE,
222        format!(
223            "Downloaded {} ({} bytes) in {:?}",
224            url,
225            download_size,
226            Instant::now().duration_since(download_start),
227        )
228    );
229
230    std::fs::rename(temp_destination_file, destination_file)
231        .map_err(|err| format!("Unable to rename: {err:?}"))?;
232
233    Ok(())
234}
235
236pub fn download_genesis_if_missing(
237    rpc_addr: &SocketAddr,
238    genesis_package: &Path,
239    use_progress_bar: bool,
240) -> Result<PathBuf, String> {
241    if !genesis_package.exists() {
242        let tmp_genesis_path = genesis_package.parent().unwrap().join("tmp-genesis");
243        let tmp_genesis_package = tmp_genesis_path.join(DEFAULT_GENESIS_ARCHIVE);
244
245        let _ignored = fs::remove_dir_all(&tmp_genesis_path);
246        download_file(
247            &format!("http://{rpc_addr}/{DEFAULT_GENESIS_ARCHIVE}"),
248            &tmp_genesis_package,
249            use_progress_bar,
250            &mut None,
251        )?;
252
253        Ok(tmp_genesis_package)
254    } else {
255        Err("genesis already exists".to_string())
256    }
257}
258
259/// Download a snapshot archive from `rpc_addr`.  Use `snapshot_kind` to specify downloading either
260/// a full snapshot or an incremental snapshot.
261pub fn download_snapshot_archive(
262    rpc_addr: &SocketAddr,
263    full_snapshot_archives_dir: &Path,
264    incremental_snapshot_archives_dir: &Path,
265    desired_snapshot_hash: (Slot, SnapshotHash),
266    snapshot_kind: SnapshotKind,
267    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
268    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
269    use_progress_bar: bool,
270    progress_notify_callback: &mut DownloadProgressCallbackOption<'_>,
271) -> Result<(), String> {
272    snapshot_utils::purge_old_snapshot_archives(
273        full_snapshot_archives_dir,
274        incremental_snapshot_archives_dir,
275        maximum_full_snapshot_archives_to_retain,
276        maximum_incremental_snapshot_archives_to_retain,
277    );
278
279    let snapshot_archives_remote_dir =
280        snapshot_utils::build_snapshot_archives_remote_dir(match snapshot_kind {
281            SnapshotKind::FullSnapshot => full_snapshot_archives_dir,
282            SnapshotKind::IncrementalSnapshot(_) => incremental_snapshot_archives_dir,
283        });
284    fs::create_dir_all(&snapshot_archives_remote_dir).unwrap();
285
286    for archive_format in [
287        ArchiveFormat::TarZstd,
288        ArchiveFormat::TarGzip,
289        ArchiveFormat::TarBzip2,
290        ArchiveFormat::TarLz4,
291        ArchiveFormat::Tar,
292    ] {
293        let destination_path = match snapshot_kind {
294            SnapshotKind::FullSnapshot => snapshot_utils::build_full_snapshot_archive_path(
295                &snapshot_archives_remote_dir,
296                desired_snapshot_hash.0,
297                &desired_snapshot_hash.1,
298                archive_format,
299            ),
300            SnapshotKind::IncrementalSnapshot(base_slot) => {
301                snapshot_utils::build_incremental_snapshot_archive_path(
302                    &snapshot_archives_remote_dir,
303                    base_slot,
304                    desired_snapshot_hash.0,
305                    &desired_snapshot_hash.1,
306                    archive_format,
307                )
308            }
309        };
310
311        if destination_path.is_file() {
312            return Ok(());
313        }
314
315        match download_file(
316            &format!(
317                "http://{}/{}",
318                rpc_addr,
319                destination_path.file_name().unwrap().to_str().unwrap()
320            ),
321            &destination_path,
322            use_progress_bar,
323            progress_notify_callback,
324        ) {
325            Ok(()) => return Ok(()),
326            Err(err) => info!("{}", err),
327        }
328    }
329    Err(format!(
330        "Failed to download a snapshot archive for slot {} from {}",
331        desired_snapshot_hash.0, rpc_addr
332    ))
333}