1#![doc = include_str!("../README.md")]
2
3use std::fmt;
4use std::fmt::Write as _;
5use std::future::Future;
6use std::path::PathBuf;
7use std::pin::pin;
8use std::time::{Duration, Instant};
9
10use futures_core::Stream;
11use futures_util::TryStreamExt;
12use md5::Md5;
13use sha1::{Digest, Sha1};
14use url::Url;
15
16mod error;
17mod http;
18pub mod local;
19mod range;
20#[cfg(feature = "s3")]
21pub mod s3;
22
23pub use error::Error;
24pub use http::Downloader;
25use http::is_retryable;
26
/// Minimum time that must elapse between two [`Outcome::Progress`] events emitted
/// while body chunks are being received.
const PROGRESS_INTERVAL: Duration = Duration::from_millis(500);
28
/// Expected digest of a file, tagged with the algorithm that produced it.
///
/// The payload is the digest as a hex string; it is stored and returned verbatim.
#[derive(Debug, Clone)]
pub enum Checksum {
    /// SHA-1 digest as a hex string.
    Sha1(String),
    /// MD5 digest as a hex string.
    Md5(String),
}

impl Checksum {
    /// Returns the stored hex digest, exactly as it was supplied.
    pub fn hex(&self) -> &str {
        // Both variants carry the same payload type, so one irrefutable pattern covers them.
        let (Checksum::Sha1(digest) | Checksum::Md5(digest)) = self;
        digest
    }

    /// Returns the lowercase name of the digest algorithm (`"sha1"` or `"md5"`).
    pub fn algorithm(&self) -> &'static str {
        match self {
            Self::Md5(_) => "md5",
            Self::Sha1(_) => "sha1",
        }
    }

    /// Builds a checksum of the same algorithm as `self` that carries `value` instead.
    pub fn with_value(&self, value: String) -> Checksum {
        // Pick the matching variant constructor first, then apply it to the new value.
        let rebuild: fn(String) -> Checksum = match self {
            Self::Md5(_) => Checksum::Md5,
            Self::Sha1(_) => Checksum::Sha1,
        };
        rebuild(value)
    }
}
63
/// Incremental digest state for the algorithm selected by a [`Checksum`].
pub enum Hasher {
    /// No checksum was requested: [`Hasher::update`] does nothing and
    /// [`Hasher::finalize_hex`] returns `None`.
    None,
    /// In-progress SHA-1 digest.
    Sha1(Sha1),
    /// In-progress MD5 digest.
    Md5(Md5),
}
72
73impl Hasher {
74 pub fn for_checksum(checksum: Option<&Checksum>) -> Self {
75 match checksum {
76 Some(Checksum::Sha1(_)) => Hasher::Sha1(Sha1::new()),
77 Some(Checksum::Md5(_)) => Hasher::Md5(Md5::new()),
78 None => Hasher::None,
79 }
80 }
81
82 pub fn update(&mut self, bytes: &[u8]) {
83 match self {
84 Hasher::Sha1(h) => h.update(bytes),
85 Hasher::Md5(h) => h.update(bytes),
86 Hasher::None => {}
87 }
88 }
89
90 pub fn finalize_hex(self) -> Option<String> {
91 match self {
92 Hasher::Sha1(h) => Some(to_hex(&h.finalize())),
93 Hasher::Md5(h) => Some(to_hex(&h.finalize())),
94 Hasher::None => None,
95 }
96 }
97}
98
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
fn to_hex(bytes: &[u8]) -> String {
    // Reserve the exact output length up front: every byte becomes two characters.
    let buffer = String::with_capacity(bytes.len() * 2);
    bytes.iter().fold(buffer, |mut acc, b| {
        write!(acc, "{b:02x}").expect("writing to String cannot fail");
        acc
    })
}
106
/// One file to download, together with caller-defined metadata.
pub struct Transfer<M> {
    /// Expected total size in bytes; a download that ends with a different byte count
    /// fails with a size mismatch.
    pub size: u64,
    /// Expected digest of the complete file, if one is known.
    pub checksum: Option<Checksum>,
    /// Name passed to the sink (via [`Target::name`]) to identify the destination.
    pub name: String,
    /// Caller data that is handed back in every [`Outcome`] reported for this file.
    pub meta: M,
}
116
/// Borrowed description of the file a [`Sink`] or [`SinkFactory`] is asked to handle.
#[derive(Clone, Copy)]
pub struct Target<'a> {
    /// Name of the file, taken from [`Transfer::name`].
    pub name: &'a str,
    /// Expected total size in bytes.
    pub size: u64,
    /// Expected digest of the complete file, if one is known.
    pub checksum: Option<&'a Checksum>,
}
126
/// Destination that receives the bytes of a single download.
///
/// [`run_download`] calls [`Sink::prepare`] once, then [`Sink::write_chunk`] for every
/// received body chunk, [`Sink::restart`] when a resume request is answered with a full
/// `200 OK` body, and [`Sink::finalize`] once the size and checksum checks have passed.
pub trait Sink: Send {
    /// Value returned by [`Sink::finalize`] and reported in [`Outcome`] events.
    type Location: Send + 'static;

    /// Inspects the destination for `target` and reports whether the download can be
    /// skipped ([`Prepared::Skip`]) or from which offset it continues
    /// ([`Prepared::Resume`]).
    fn prepare(
        &mut self,
        target: Target<'_>,
    ) -> impl Future<Output = Result<Prepared<Self::Location>, Error>> + Send;

    /// Receives the next chunk of the response body, in the order it arrived.
    fn write_chunk(&mut self, chunk: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;

    /// Called before the download starts over from byte 0.
    ///
    /// NOTE(review): implementations presumably discard any data written so far —
    /// confirm against the concrete sinks.
    fn restart(&mut self) -> impl Future<Output = Result<(), Error>> + Send;

    /// Consumes the sink and returns the location of the completed download.
    fn finalize(self) -> impl Future<Output = Result<Self::Location, Error>> + Send;
}
141
/// Creates one [`Sink`] per transfer processed by [`drive`].
pub trait SinkFactory: Send {
    /// Sink type produced by [`SinkFactory::make`]; its location type must match
    /// [`SinkFactory::Location`].
    type Sink: Sink<Location = Self::Location> + 'static;
    /// Location type reported for finished or skipped downloads.
    type Location: DownloadLocation;

    /// Builds the sink that will receive the file described by `target`.
    fn make(
        &mut self,
        target: Target<'_>,
    ) -> impl Future<Output = Result<Self::Sink, Error>> + Send;
}
153
/// A place where a finished download ends up, printable in user-facing messages.
pub trait DownloadLocation: Send + 'static {
    /// Writes a human-readable form of the location to `f`.
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result;
}

impl DownloadLocation for PathBuf {
    /// Writes the path using its (lossy) `display()` representation.
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shown = self.display();
        // Going through `format_args!` keeps the caller's width/fill flags out of the output.
        f.write_fmt(format_args!("{}", shown))
    }
}

impl DownloadLocation for String {
    /// Writes the string verbatim.
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
173
/// Short human-readable name for a file, used when an [`Outcome`] is formatted with
/// `Display`.
pub trait Label {
    /// Returns the text that identifies this file in progress and failure messages.
    fn label(&self) -> &str;
}
180
/// Event reported while downloading files.
///
/// `M` is the caller's per-file metadata and `L` the location type produced by the sink
/// (a [`PathBuf`] unless specified otherwise).
#[derive(Debug)]
pub enum Outcome<M, L = PathBuf> {
    /// The file was fully received and the sink was finalized.
    Downloaded {
        /// Metadata of the file.
        file: M,
        /// Where the sink stored the file.
        location: L,
        /// `true` when an expected checksum was supplied and the computed digest matched it.
        verified: bool,
    },
    /// Downloading this file failed; other files are unaffected.
    Failed {
        /// Metadata of the file.
        file: M,
        /// What went wrong.
        error: Error,
    },
    /// Intermediate progress report for a file that is still being downloaded.
    Progress {
        /// Metadata of the file.
        file: M,
        /// Bytes received so far.
        received: u64,
        /// Expected total size in bytes.
        total: u64,
    },
    /// The sink reported that nothing needs to be downloaded for this file.
    Skipped {
        /// Metadata of the file.
        file: M,
        /// Location reported by the sink.
        location: L,
    },
    /// The stream of transfers itself produced an error; no further files are processed.
    StreamFailed {
        /// Error yielded by the input stream.
        error: Error,
    },
}
210
211impl<M: Label, L: DownloadLocation> fmt::Display for Outcome<M, L> {
212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213 match self {
214 Self::Progress {
215 file,
216 received,
217 total,
218 } => {
219 let pct = if *total == 0 {
220 100.0
221 } else {
222 (*received as f64 / *total as f64) * 100.0
223 };
224 write!(
225 f,
226 "{}: {pct:.1}% ({received} / {total} bytes)",
227 file.label()
228 )
229 }
230 Self::Downloaded {
231 location, verified, ..
232 } => {
233 write!(f, "downloaded ")?;
234 location.fmt_location(f)?;
235 write!(
236 f,
237 " ({})",
238 if *verified { "verified" } else { "unverified" }
239 )
240 }
241 Self::Failed { file, error } => write!(f, "failed {}: {error}", file.label()),
242 Self::Skipped { location, .. } => {
243 write!(f, "skipped ")?;
244 location.fmt_location(f)?;
245 write!(f, " (already present)")
246 }
247 Self::StreamFailed { error } => write!(f, "stream failed: {error}"),
248 }
249 }
250}
251
/// Result of [`Sink::prepare`]: what has to happen for the requested file.
pub enum Prepared<L> {
    /// Nothing needs to be downloaded; `location` is reported in [`Outcome::Skipped`].
    Skip { location: L },
    /// Download starting at byte offset `received`; `partial` is the hasher state that the
    /// remaining bytes are fed into.
    // NOTE(review): presumably `received == 0` with a fresh hasher means "start from
    // scratch" — confirm against the sink implementations.
    Resume { received: u64, partial: Hasher },
}
260
261pub fn drive<'a, M, F, R>(
267 http: &'a Downloader,
268 items: impl Stream<Item = Result<Transfer<M>, Error>> + Send + 'a,
269 mut resolve: R,
270 factory: F,
271) -> impl Stream<Item = Outcome<M, F::Location>> + Send + 'a
272where
273 M: Clone + Send + 'static,
274 F: SinkFactory + 'a,
275 R: FnMut(&Transfer<M>) -> Result<Url, Error> + Send + 'a,
276{
277 async_stream::stream! {
278 let mut factory = factory;
279 let mut items = pin!(items);
280 loop {
281 let transfer = match items.try_next().await {
282 Ok(Some(transfer)) => transfer,
283 Ok(None) => break,
284 Err(error) => {
285 yield Outcome::StreamFailed { error };
286 return;
287 }
288 };
289 let url = match resolve(&transfer) {
290 Ok(u) => u,
291 Err(error) => {
292 yield Outcome::Failed { file: transfer.meta, error };
293 continue;
294 }
295 };
296 let target = Target {
297 name: &transfer.name,
298 size: transfer.size,
299 checksum: transfer.checksum.as_ref(),
300 };
301 let sink = match factory.make(target).await {
302 Ok(s) => s,
303 Err(error) => {
304 yield Outcome::Failed { file: transfer.meta, error };
305 continue;
306 }
307 };
308 let meta_for_err = transfer.meta.clone();
309 let mut events = pin!(run_download(http, url, transfer, sink));
310 loop {
311 match events.try_next().await {
312 Ok(Some(outcome)) => yield outcome,
313 Ok(None) => break,
314 Err(error) => {
315 yield Outcome::Failed {
316 file: meta_for_err,
317 error,
318 };
319 break;
320 }
321 }
322 }
323 }
324 }
325}
326
327pub fn run_download<M, S>(
332 http: &Downloader,
333 url: Url,
334 transfer: Transfer<M>,
335 sink: S,
336) -> impl Stream<Item = Result<Outcome<M, S::Location>, Error>> + Send + '_
337where
338 M: Clone + Send + 'static,
339 S: Sink + Send + 'static,
340{
341 async_stream::try_stream! {
342 let mut sink = sink;
343
344 let (mut received, mut hasher) = match sink
345 .prepare(Target {
346 name: &transfer.name,
347 size: transfer.size,
348 checksum: transfer.checksum.as_ref(),
349 })
350 .await?
351 {
352 Prepared::Skip { location } => {
353 yield Outcome::Skipped { file: transfer.meta, location };
354 return;
355 }
356 Prepared::Resume { received, partial } => (received, partial),
357 };
358
359 let mut last_progress: Option<Instant> = None;
360 let mut attempts_left = http.max_attempts();
361 let mut delay = http.backoff();
362
363 if received > 0 && received < transfer.size {
364 yield Outcome::Progress {
365 file: transfer.meta.clone(),
366 received,
367 total: transfer.size,
368 };
369 last_progress = Some(Instant::now());
370 }
371
372 'download: while received < transfer.size {
373 let mut response = http.get_response_range(url.clone(), received).await?;
374
375 if received > 0 {
376 match response.status() {
377 reqwest::StatusCode::OK => {
378 sink.restart().await?;
379 received = 0;
380 hasher = Hasher::for_checksum(transfer.checksum.as_ref());
381 attempts_left = http.max_attempts();
382 delay = http.backoff();
383 }
384 reqwest::StatusCode::PARTIAL_CONTENT => {
385 range::validate_content_range(&response, received, transfer.size, &url)?;
386 }
387 status => {
388 Err(Error::InvalidRangeResponse {
389 url: url.to_string(),
390 details: format!("expected 200 or 206 for resume, got {status}"),
391 })?;
392 }
393 }
394 }
395
396 loop {
397 let chunk = match response.chunk().await {
398 Ok(Some(chunk)) => chunk,
399 Ok(None) => break 'download,
400 Err(e) => {
401 let err = Error::from(e);
402 if attempts_left > 1 && is_retryable(&err) {
403 attempts_left -= 1;
404 tokio::time::sleep(delay).await;
405 delay = delay.saturating_mul(2);
406 continue 'download;
407 }
408 Err(err)?;
409 unreachable!();
410 }
411 };
412
413 sink.write_chunk(&chunk).await?;
414 hasher.update(&chunk);
415 received += chunk.len() as u64;
416 attempts_left = http.max_attempts();
417 delay = http.backoff();
418
419 let emit = match last_progress {
420 None => true,
421 Some(t) => t.elapsed() >= PROGRESS_INTERVAL,
422 };
423 if emit {
424 yield Outcome::Progress {
425 file: transfer.meta.clone(),
426 received,
427 total: transfer.size,
428 };
429 last_progress = Some(Instant::now());
430 }
431 }
432 }
433
434 if received != transfer.size {
435 Err(Error::SizeMismatch {
436 url: url.to_string(),
437 expected: transfer.size,
438 actual: received,
439 })?;
440 }
441
442 let verified = match (transfer.checksum.as_ref(), hasher.finalize_hex()) {
443 (Some(expected), Some(actual)) => {
444 if actual != expected.hex() {
445 Err(Error::ChecksumMismatch {
446 algorithm: expected.algorithm(),
447 url: url.to_string(),
448 expected: expected.hex().to_owned(),
449 actual,
450 })?;
451 }
452 true
453 }
454 _ => false,
455 };
456
457 let location = sink.finalize().await?;
458 yield Outcome::Downloaded { file: transfer.meta, location, verified };
459 }
460}