1#![allow(clippy::integer_arithmetic)]
2use {
3 console::Emoji,
4 indicatif::{ProgressBar, ProgressStyle},
5 log::*,
6 solana_runtime::{
7 snapshot_hash::SnapshotHash,
8 snapshot_package::SnapshotType,
9 snapshot_utils::{self, ArchiveFormat},
10 },
11 solana_sdk::{clock::Slot, genesis_config::DEFAULT_GENESIS_ARCHIVE},
12 std::{
13 fs::{self, File},
14 io::{self, Read},
15 net::SocketAddr,
16 path::{Path, PathBuf},
17 time::{Duration, Instant},
18 },
19};
20
21static TRUCK: Emoji = Emoji("🚚 ", "");
22static SPARKLE: Emoji = Emoji("✨ ", "");
23
24fn new_spinner_progress_bar() -> ProgressBar {
26 let progress_bar = ProgressBar::new(42);
27 progress_bar.set_style(
28 ProgressStyle::default_spinner()
29 .template("{spinner:.green} {wide_msg}")
30 .expect("ProgresStyle::template direct input to be correct"),
31 );
32 progress_bar.enable_steady_tick(Duration::from_millis(100));
33 progress_bar
34}
35
36#[derive(Debug)]
38pub struct DownloadProgressRecord {
39 pub elapsed_time: Duration,
41 pub last_elapsed_time: Duration,
43 pub last_throughput: f32,
45 pub total_throughput: f32,
47 pub total_bytes: usize,
49 pub current_bytes: usize,
51 pub percentage_done: f32,
53 pub estimated_remaining_time: f32,
55 pub notification_count: u64,
57}
58
59type DownloadProgressCallback<'a> = Box<dyn FnMut(&DownloadProgressRecord) -> bool + 'a>;
60type DownloadProgressCallbackOption<'a> = Option<DownloadProgressCallback<'a>>;
61
62pub fn download_file<'a, 'b>(
66 url: &str,
67 destination_file: &Path,
68 use_progress_bar: bool,
69 progress_notify_callback: &'a mut DownloadProgressCallbackOption<'b>,
70) -> Result<(), String> {
71 if destination_file.is_file() {
72 return Err(format!("{destination_file:?} already exists"));
73 }
74 let download_start = Instant::now();
75
76 fs::create_dir_all(destination_file.parent().expect("parent"))
77 .map_err(|err| err.to_string())?;
78
79 let mut temp_destination_file = destination_file.to_path_buf();
80 temp_destination_file.set_file_name(format!(
81 "tmp-{}",
82 destination_file
83 .file_name()
84 .expect("file_name")
85 .to_str()
86 .expect("to_str")
87 ));
88
89 let progress_bar = new_spinner_progress_bar();
90 if use_progress_bar {
91 progress_bar.set_message(format!("{TRUCK}Downloading {url}..."));
92 }
93
94 let response = reqwest::blocking::Client::new()
95 .get(url)
96 .send()
97 .and_then(|response| response.error_for_status())
98 .map_err(|err| {
99 progress_bar.finish_and_clear();
100 err.to_string()
101 })?;
102
103 let download_size = {
104 response
105 .headers()
106 .get(reqwest::header::CONTENT_LENGTH)
107 .and_then(|content_length| content_length.to_str().ok())
108 .and_then(|content_length| content_length.parse().ok())
109 .unwrap_or(0)
110 };
111
112 if use_progress_bar {
113 progress_bar.set_length(download_size);
114 progress_bar.set_style(
115 ProgressStyle::default_bar()
116 .template(
117 "{spinner:.green}{msg_wide}[{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})",
118 )
119 .expect("ProgresStyle::template direct input to be correct")
120 .progress_chars("=> "),
121 );
122 progress_bar.set_message(format!("{TRUCK}Downloading~ {url}"));
123 } else {
124 info!("Downloading {} bytes from {}", download_size, url);
125 }
126
127 struct DownloadProgress<'e, 'f, R> {
128 progress_bar: ProgressBar,
129 response: R,
130 last_print: Instant,
131 current_bytes: usize,
132 last_print_bytes: usize,
133 download_size: f32,
134 use_progress_bar: bool,
135 start_time: Instant,
136 callback: &'f mut DownloadProgressCallbackOption<'e>,
137 notification_count: u64,
138 }
139
140 impl<'e, 'f, R: Read> Read for DownloadProgress<'e, 'f, R> {
141 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
142 let n = self.response.read(buf)?;
143
144 self.current_bytes += n;
145 let total_bytes_f32 = self.current_bytes as f32;
146 let diff_bytes_f32 = (self.current_bytes - self.last_print_bytes) as f32;
147 let last_throughput = diff_bytes_f32 / self.last_print.elapsed().as_secs_f32();
148 let estimated_remaining_time = if last_throughput > 0_f32 {
149 (self.download_size - self.current_bytes as f32) / last_throughput
150 } else {
151 f32::MAX
152 };
153
154 let mut progress_record = DownloadProgressRecord {
155 elapsed_time: self.start_time.elapsed(),
156 last_elapsed_time: self.last_print.elapsed(),
157 last_throughput,
158 total_throughput: self.current_bytes as f32
159 / self.start_time.elapsed().as_secs_f32(),
160 total_bytes: self.download_size as usize,
161 current_bytes: self.current_bytes,
162 percentage_done: 100f32 * (total_bytes_f32 / self.download_size),
163 estimated_remaining_time,
164 notification_count: self.notification_count,
165 };
166 let mut to_update_progress = false;
167 if progress_record.last_elapsed_time.as_secs() > 5 {
168 self.last_print = Instant::now();
169 self.last_print_bytes = self.current_bytes;
170 to_update_progress = true;
171 self.notification_count += 1;
172 progress_record.notification_count = self.notification_count
173 }
174
175 if self.use_progress_bar {
176 self.progress_bar.inc(n as u64);
177 } else if to_update_progress {
178 info!(
179 "downloaded {} bytes {:.1}% {:.1} bytes/s",
180 self.current_bytes,
181 progress_record.percentage_done,
182 progress_record.last_throughput,
183 );
184 }
185
186 if let Some(callback) = self.callback {
187 if to_update_progress && !callback(&progress_record) {
188 info!("Download is aborted by the caller");
189 return Err(io::Error::new(
190 io::ErrorKind::Other,
191 "Download is aborted by the caller",
192 ));
193 }
194 }
195
196 Ok(n)
197 }
198 }
199
200 let mut source = DownloadProgress::<'b, 'a> {
201 progress_bar,
202 response,
203 last_print: Instant::now(),
204 current_bytes: 0,
205 last_print_bytes: 0,
206 download_size: (download_size as f32).max(1f32),
207 use_progress_bar,
208 start_time: Instant::now(),
209 callback: progress_notify_callback,
210 notification_count: 0,
211 };
212
213 File::create(&temp_destination_file)
214 .and_then(|mut file| std::io::copy(&mut source, &mut file))
215 .map_err(|err| format!("Unable to write {temp_destination_file:?}: {err:?}"))?;
216
217 source.progress_bar.finish_and_clear();
218 info!(
219 " {}{}",
220 SPARKLE,
221 format!(
222 "Downloaded {} ({} bytes) in {:?}",
223 url,
224 download_size,
225 Instant::now().duration_since(download_start),
226 )
227 );
228
229 std::fs::rename(temp_destination_file, destination_file)
230 .map_err(|err| format!("Unable to rename: {err:?}"))?;
231
232 Ok(())
233}
234
235pub fn download_genesis_if_missing(
236 rpc_addr: &SocketAddr,
237 genesis_package: &Path,
238 use_progress_bar: bool,
239) -> Result<PathBuf, String> {
240 if !genesis_package.exists() {
241 let tmp_genesis_path = genesis_package.parent().unwrap().join("tmp-genesis");
242 let tmp_genesis_package = tmp_genesis_path.join(DEFAULT_GENESIS_ARCHIVE);
243
244 let _ignored = fs::remove_dir_all(&tmp_genesis_path);
245 download_file(
246 &format!("http://{rpc_addr}/{DEFAULT_GENESIS_ARCHIVE}"),
247 &tmp_genesis_package,
248 use_progress_bar,
249 &mut None,
250 )?;
251
252 Ok(tmp_genesis_package)
253 } else {
254 Err("genesis already exists".to_string())
255 }
256}
257
258pub fn download_snapshot_archive(
261 rpc_addr: &SocketAddr,
262 full_snapshot_archives_dir: &Path,
263 incremental_snapshot_archives_dir: &Path,
264 desired_snapshot_hash: (Slot, SnapshotHash),
265 snapshot_type: SnapshotType,
266 maximum_full_snapshot_archives_to_retain: usize,
267 maximum_incremental_snapshot_archives_to_retain: usize,
268 use_progress_bar: bool,
269 progress_notify_callback: &mut DownloadProgressCallbackOption<'_>,
270) -> Result<(), String> {
271 snapshot_utils::purge_old_snapshot_archives(
272 full_snapshot_archives_dir,
273 incremental_snapshot_archives_dir,
274 maximum_full_snapshot_archives_to_retain,
275 maximum_incremental_snapshot_archives_to_retain,
276 );
277
278 let snapshot_archives_remote_dir =
279 snapshot_utils::build_snapshot_archives_remote_dir(match snapshot_type {
280 SnapshotType::FullSnapshot => full_snapshot_archives_dir,
281 SnapshotType::IncrementalSnapshot(_) => incremental_snapshot_archives_dir,
282 });
283 fs::create_dir_all(&snapshot_archives_remote_dir).unwrap();
284
285 for archive_format in [
286 ArchiveFormat::TarZstd,
287 ArchiveFormat::TarGzip,
288 ArchiveFormat::TarBzip2,
289 ArchiveFormat::TarLz4,
290 ArchiveFormat::Tar, ] {
292 let destination_path = match snapshot_type {
293 SnapshotType::FullSnapshot => snapshot_utils::build_full_snapshot_archive_path(
294 &snapshot_archives_remote_dir,
295 desired_snapshot_hash.0,
296 &desired_snapshot_hash.1,
297 archive_format,
298 ),
299 SnapshotType::IncrementalSnapshot(base_slot) => {
300 snapshot_utils::build_incremental_snapshot_archive_path(
301 &snapshot_archives_remote_dir,
302 base_slot,
303 desired_snapshot_hash.0,
304 &desired_snapshot_hash.1,
305 archive_format,
306 )
307 }
308 };
309
310 if destination_path.is_file() {
311 return Ok(());
312 }
313
314 match download_file(
315 &format!(
316 "http://{}/{}",
317 rpc_addr,
318 destination_path.file_name().unwrap().to_str().unwrap()
319 ),
320 &destination_path,
321 use_progress_bar,
322 progress_notify_callback,
323 ) {
324 Ok(()) => return Ok(()),
325 Err(err) => info!("{}", err),
326 }
327 }
328 Err(format!(
329 "Failed to download a snapshot archive for slot {} from {}",
330 desired_snapshot_hash.0, rpc_addr
331 ))
332}