1#![allow(clippy::arithmetic_side_effects)]
2use {
3 console::Emoji,
4 indicatif::{ProgressBar, ProgressStyle},
5 log::*,
6 miraland_runtime::{
7 snapshot_hash::SnapshotHash,
8 snapshot_package::SnapshotKind,
9 snapshot_utils::{self, ArchiveFormat},
10 },
11 miraland_sdk::{clock::Slot, genesis_config::DEFAULT_GENESIS_ARCHIVE},
12 std::{
13 fs::{self, File},
14 io::{self, Read},
15 net::SocketAddr,
16 num::NonZeroUsize,
17 path::{Path, PathBuf},
18 time::{Duration, Instant},
19 },
20};
21
22static TRUCK: Emoji = Emoji("🚚 ", "");
23static SPARKLE: Emoji = Emoji("✨ ", "");
24
25fn new_spinner_progress_bar() -> ProgressBar {
27 let progress_bar = ProgressBar::new(42);
28 progress_bar.set_style(
29 ProgressStyle::default_spinner()
30 .template("{spinner:.green} {wide_msg}")
31 .expect("ProgresStyle::template direct input to be correct"),
32 );
33 progress_bar.enable_steady_tick(Duration::from_millis(100));
34 progress_bar
35}
36
37#[derive(Debug)]
39pub struct DownloadProgressRecord {
40 pub elapsed_time: Duration,
42 pub last_elapsed_time: Duration,
44 pub last_throughput: f32,
46 pub total_throughput: f32,
48 pub total_bytes: usize,
50 pub current_bytes: usize,
52 pub percentage_done: f32,
54 pub estimated_remaining_time: f32,
56 pub notification_count: u64,
58}
59
60type DownloadProgressCallback<'a> = Box<dyn FnMut(&DownloadProgressRecord) -> bool + 'a>;
61type DownloadProgressCallbackOption<'a> = Option<DownloadProgressCallback<'a>>;
62
63pub fn download_file<'a, 'b>(
67 url: &str,
68 destination_file: &Path,
69 use_progress_bar: bool,
70 progress_notify_callback: &'a mut DownloadProgressCallbackOption<'b>,
71) -> Result<(), String> {
72 if destination_file.is_file() {
73 return Err(format!("{destination_file:?} already exists"));
74 }
75 let download_start = Instant::now();
76
77 fs::create_dir_all(destination_file.parent().expect("parent"))
78 .map_err(|err| err.to_string())?;
79
80 let mut temp_destination_file = destination_file.to_path_buf();
81 temp_destination_file.set_file_name(format!(
82 "tmp-{}",
83 destination_file
84 .file_name()
85 .expect("file_name")
86 .to_str()
87 .expect("to_str")
88 ));
89
90 let progress_bar = new_spinner_progress_bar();
91 if use_progress_bar {
92 progress_bar.set_message(format!("{TRUCK}Downloading {url}..."));
93 }
94
95 let response = reqwest::blocking::Client::new()
96 .get(url)
97 .send()
98 .and_then(|response| response.error_for_status())
99 .map_err(|err| {
100 progress_bar.finish_and_clear();
101 err.to_string()
102 })?;
103
104 let download_size = {
105 response
106 .headers()
107 .get(reqwest::header::CONTENT_LENGTH)
108 .and_then(|content_length| content_length.to_str().ok())
109 .and_then(|content_length| content_length.parse().ok())
110 .unwrap_or(0)
111 };
112
113 if use_progress_bar {
114 progress_bar.set_length(download_size);
115 progress_bar.set_style(
116 ProgressStyle::default_bar()
117 .template(
118 "{spinner:.green}{msg_wide}[{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})",
119 )
120 .expect("ProgresStyle::template direct input to be correct")
121 .progress_chars("=> "),
122 );
123 progress_bar.set_message(format!("{TRUCK}Downloading~ {url}"));
124 } else {
125 info!("Downloading {} bytes from {}", download_size, url);
126 }
127
128 struct DownloadProgress<'e, 'f, R> {
129 progress_bar: ProgressBar,
130 response: R,
131 last_print: Instant,
132 current_bytes: usize,
133 last_print_bytes: usize,
134 download_size: f32,
135 use_progress_bar: bool,
136 start_time: Instant,
137 callback: &'f mut DownloadProgressCallbackOption<'e>,
138 notification_count: u64,
139 }
140
141 impl<'e, 'f, R: Read> Read for DownloadProgress<'e, 'f, R> {
142 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
143 let n = self.response.read(buf)?;
144
145 self.current_bytes += n;
146 let total_bytes_f32 = self.current_bytes as f32;
147 let diff_bytes_f32 = (self.current_bytes - self.last_print_bytes) as f32;
148 let last_throughput = diff_bytes_f32 / self.last_print.elapsed().as_secs_f32();
149 let estimated_remaining_time = if last_throughput > 0_f32 {
150 (self.download_size - self.current_bytes as f32) / last_throughput
151 } else {
152 f32::MAX
153 };
154
155 let mut progress_record = DownloadProgressRecord {
156 elapsed_time: self.start_time.elapsed(),
157 last_elapsed_time: self.last_print.elapsed(),
158 last_throughput,
159 total_throughput: self.current_bytes as f32
160 / self.start_time.elapsed().as_secs_f32(),
161 total_bytes: self.download_size as usize,
162 current_bytes: self.current_bytes,
163 percentage_done: 100f32 * (total_bytes_f32 / self.download_size),
164 estimated_remaining_time,
165 notification_count: self.notification_count,
166 };
167 let mut to_update_progress = false;
168 if progress_record.last_elapsed_time.as_secs() > 5 {
169 self.last_print = Instant::now();
170 self.last_print_bytes = self.current_bytes;
171 to_update_progress = true;
172 self.notification_count += 1;
173 progress_record.notification_count = self.notification_count
174 }
175
176 if self.use_progress_bar {
177 self.progress_bar.inc(n as u64);
178 } else if to_update_progress {
179 info!(
180 "downloaded {} bytes {:.1}% {:.1} bytes/s",
181 self.current_bytes,
182 progress_record.percentage_done,
183 progress_record.last_throughput,
184 );
185 }
186
187 if let Some(callback) = self.callback {
188 if to_update_progress && !callback(&progress_record) {
189 info!("Download is aborted by the caller");
190 return Err(io::Error::new(
191 io::ErrorKind::Other,
192 "Download is aborted by the caller",
193 ));
194 }
195 }
196
197 Ok(n)
198 }
199 }
200
201 let mut source = DownloadProgress::<'b, 'a> {
202 progress_bar,
203 response,
204 last_print: Instant::now(),
205 current_bytes: 0,
206 last_print_bytes: 0,
207 download_size: (download_size as f32).max(1f32),
208 use_progress_bar,
209 start_time: Instant::now(),
210 callback: progress_notify_callback,
211 notification_count: 0,
212 };
213
214 File::create(&temp_destination_file)
215 .and_then(|mut file| std::io::copy(&mut source, &mut file))
216 .map_err(|err| format!("Unable to write {temp_destination_file:?}: {err:?}"))?;
217
218 source.progress_bar.finish_and_clear();
219 info!(
220 " {}{}",
221 SPARKLE,
222 format!(
223 "Downloaded {} ({} bytes) in {:?}",
224 url,
225 download_size,
226 Instant::now().duration_since(download_start),
227 )
228 );
229
230 std::fs::rename(temp_destination_file, destination_file)
231 .map_err(|err| format!("Unable to rename: {err:?}"))?;
232
233 Ok(())
234}
235
236pub fn download_genesis_if_missing(
237 rpc_addr: &SocketAddr,
238 genesis_package: &Path,
239 use_progress_bar: bool,
240) -> Result<PathBuf, String> {
241 if !genesis_package.exists() {
242 let tmp_genesis_path = genesis_package.parent().unwrap().join("tmp-genesis");
243 let tmp_genesis_package = tmp_genesis_path.join(DEFAULT_GENESIS_ARCHIVE);
244
245 let _ignored = fs::remove_dir_all(&tmp_genesis_path);
246 download_file(
247 &format!("http://{rpc_addr}/{DEFAULT_GENESIS_ARCHIVE}"),
248 &tmp_genesis_package,
249 use_progress_bar,
250 &mut None,
251 )?;
252
253 Ok(tmp_genesis_package)
254 } else {
255 Err("genesis already exists".to_string())
256 }
257}
258
259pub fn download_snapshot_archive(
262 rpc_addr: &SocketAddr,
263 full_snapshot_archives_dir: &Path,
264 incremental_snapshot_archives_dir: &Path,
265 desired_snapshot_hash: (Slot, SnapshotHash),
266 snapshot_kind: SnapshotKind,
267 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
268 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
269 use_progress_bar: bool,
270 progress_notify_callback: &mut DownloadProgressCallbackOption<'_>,
271) -> Result<(), String> {
272 snapshot_utils::purge_old_snapshot_archives(
273 full_snapshot_archives_dir,
274 incremental_snapshot_archives_dir,
275 maximum_full_snapshot_archives_to_retain,
276 maximum_incremental_snapshot_archives_to_retain,
277 );
278
279 let snapshot_archives_remote_dir =
280 snapshot_utils::build_snapshot_archives_remote_dir(match snapshot_kind {
281 SnapshotKind::FullSnapshot => full_snapshot_archives_dir,
282 SnapshotKind::IncrementalSnapshot(_) => incremental_snapshot_archives_dir,
283 });
284 fs::create_dir_all(&snapshot_archives_remote_dir).unwrap();
285
286 for archive_format in [
287 ArchiveFormat::TarZstd,
288 ArchiveFormat::TarGzip,
289 ArchiveFormat::TarBzip2,
290 ArchiveFormat::TarLz4,
291 ArchiveFormat::Tar,
292 ] {
293 let destination_path = match snapshot_kind {
294 SnapshotKind::FullSnapshot => snapshot_utils::build_full_snapshot_archive_path(
295 &snapshot_archives_remote_dir,
296 desired_snapshot_hash.0,
297 &desired_snapshot_hash.1,
298 archive_format,
299 ),
300 SnapshotKind::IncrementalSnapshot(base_slot) => {
301 snapshot_utils::build_incremental_snapshot_archive_path(
302 &snapshot_archives_remote_dir,
303 base_slot,
304 desired_snapshot_hash.0,
305 &desired_snapshot_hash.1,
306 archive_format,
307 )
308 }
309 };
310
311 if destination_path.is_file() {
312 return Ok(());
313 }
314
315 match download_file(
316 &format!(
317 "http://{}/{}",
318 rpc_addr,
319 destination_path.file_name().unwrap().to_str().unwrap()
320 ),
321 &destination_path,
322 use_progress_bar,
323 progress_notify_callback,
324 ) {
325 Ok(()) => return Ok(()),
326 Err(err) => info!("{}", err),
327 }
328 }
329 Err(format!(
330 "Failed to download a snapshot archive for slot {} from {}",
331 desired_snapshot_hash.0, rpc_addr
332 ))
333}