#![doc = include_str!("../README.md")]
use std::fmt;
use std::fmt::Write as _;
use std::future::Future;
use std::path::PathBuf;
use std::pin::pin;
use std::time::{Duration, Instant};
use futures_core::Stream;
use futures_util::TryStreamExt;
use md5::Md5;
use sha1::{Digest, Sha1};
use url::Url;
mod error;
mod http;
pub mod local;
mod range;
#[cfg(feature = "s3")]
pub mod s3;
pub use error::Error;
use http::is_retryable;
pub use http::{Downloader, DownloaderBuilder};
const PROGRESS_INTERVAL: Duration = Duration::from_millis(500);
#[derive(Debug, Clone)]
pub enum Checksum {
Sha1(String),
Md5(String),
}
impl Checksum {
pub fn hex(&self) -> &str {
match self {
Checksum::Sha1(h) | Checksum::Md5(h) => h,
}
}
pub fn algorithm(&self) -> &'static str {
match self {
Checksum::Sha1(_) => "sha1",
Checksum::Md5(_) => "md5",
}
}
pub fn with_value(&self, value: String) -> Checksum {
match self {
Checksum::Sha1(_) => Checksum::Sha1(value),
Checksum::Md5(_) => Checksum::Md5(value),
}
}
}
pub enum Hasher {
None,
Sha1(Sha1),
Md5(Md5),
}
impl Hasher {
pub fn for_checksum(checksum: Option<&Checksum>) -> Self {
match checksum {
Some(Checksum::Sha1(_)) => Hasher::Sha1(Sha1::new()),
Some(Checksum::Md5(_)) => Hasher::Md5(Md5::new()),
None => Hasher::None,
}
}
pub fn update(&mut self, bytes: &[u8]) {
match self {
Hasher::Sha1(h) => h.update(bytes),
Hasher::Md5(h) => h.update(bytes),
Hasher::None => {}
}
}
pub fn finalize_hex(self) -> Option<String> {
match self {
Hasher::Sha1(h) => Some(to_hex(&h.finalize())),
Hasher::Md5(h) => Some(to_hex(&h.finalize())),
Hasher::None => None,
}
}
}
fn to_hex(bytes: &[u8]) -> String {
let mut hex = String::with_capacity(bytes.len() * 2);
for b in bytes {
write!(&mut hex, "{b:02x}").expect("writing to String cannot fail");
}
hex
}
pub trait Source: Clone + Send + 'static {
fn name(&self) -> &str;
fn size(&self) -> u64;
fn checksum(&self) -> Option<Checksum> {
None
}
}
#[derive(Debug, Clone)]
pub struct Download {
pub url: Url,
pub size: u64,
pub checksum: Option<Checksum>,
pub name: String,
}
impl Source for Download {
fn name(&self) -> &str {
&self.name
}
fn size(&self) -> u64 {
self.size
}
fn checksum(&self) -> Option<Checksum> {
self.checksum.clone()
}
}
#[derive(Clone, Copy)]
pub struct Target<'a> {
pub name: &'a str,
pub size: u64,
pub checksum: Option<&'a Checksum>,
}
pub trait Sink: Send {
type Location: Send + 'static;
fn prepare(
&mut self,
target: Target<'_>,
) -> impl Future<Output = Result<Prepared<Self::Location>, Error>> + Send;
fn write_chunk(&mut self, chunk: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
fn restart(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
fn finalize(self) -> impl Future<Output = Result<Self::Location, Error>> + Send;
}
pub trait SinkFactory: Send {
type Sink: Sink<Location = Self::Location> + 'static;
type Location: DownloadLocation;
fn make(
&mut self,
target: Target<'_>,
) -> impl Future<Output = Result<Self::Sink, Error>> + Send;
}
pub trait DownloadLocation: Send + 'static {
fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result;
}
impl DownloadLocation for PathBuf {
fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.display())
}
}
impl DownloadLocation for String {
fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self)
}
}
#[derive(Debug)]
pub enum Outcome<M, L = PathBuf> {
Downloaded {
file: M,
location: L,
verified: bool,
},
Failed {
file: M,
error: Error,
},
Progress {
file: M,
received: u64,
total: u64,
},
Skipped {
file: M,
location: L,
},
StreamFailed {
error: Error,
},
}
impl<M: Source, L: DownloadLocation> fmt::Display for Outcome<M, L> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Progress {
file,
received,
total,
} => {
let pct = if *total == 0 {
100.0
} else {
(*received as f64 / *total as f64) * 100.0
};
write!(f, "{}: {pct:.1}% ({received} / {total} bytes)", file.name())
}
Self::Downloaded {
location, verified, ..
} => {
write!(f, "downloaded ")?;
location.fmt_location(f)?;
write!(
f,
" ({})",
if *verified { "verified" } else { "unverified" }
)
}
Self::Failed { file, error } => write!(f, "failed {}: {error}", file.name()),
Self::Skipped { location, .. } => {
write!(f, "skipped ")?;
location.fmt_location(f)?;
write!(f, " (already present)")
}
Self::StreamFailed { error } => write!(f, "stream failed: {error}"),
}
}
}
pub enum Prepared<L> {
Skip { location: L },
Resume { received: u64, partial: Hasher },
}
pub fn drive<'a, M, F, R>(
http: &'a Downloader,
items: impl Stream<Item = Result<M, Error>> + Send + 'a,
mut resolve: R,
factory: F,
) -> impl Stream<Item = Outcome<M, F::Location>> + Send + 'a
where
M: Source,
F: SinkFactory + 'a,
R: FnMut(&M) -> Result<Url, Error> + Send + 'a,
{
async_stream::stream! {
let mut factory = factory;
let mut items = pin!(items);
loop {
let item = match items.try_next().await {
Ok(Some(item)) => item,
Ok(None) => break,
Err(error) => {
yield Outcome::StreamFailed { error };
return;
}
};
let url = match resolve(&item) {
Ok(u) => u,
Err(error) => {
yield Outcome::Failed { file: item, error };
continue;
}
};
let checksum = item.checksum();
let target = Target {
name: item.name(),
size: item.size(),
checksum: checksum.as_ref(),
};
let sink = match factory.make(target).await {
Ok(s) => s,
Err(error) => {
yield Outcome::Failed { file: item, error };
continue;
}
};
let item_for_err = item.clone();
let mut events = pin!(run_download(http, url, item, sink));
loop {
match events.try_next().await {
Ok(Some(outcome)) => yield outcome,
Ok(None) => break,
Err(error) => {
yield Outcome::Failed {
file: item_for_err,
error,
};
break;
}
}
}
}
}
}
pub fn drive_downloads<'a, F>(
http: &'a Downloader,
items: impl Stream<Item = Result<Download, Error>> + Send + 'a,
factory: F,
) -> impl Stream<Item = Outcome<Download, F::Location>> + Send + 'a
where
F: SinkFactory + 'a,
{
drive(http, items, |download| Ok(download.url.clone()), factory)
}
pub fn run_download<M, S>(
http: &Downloader,
url: Url,
item: M,
sink: S,
) -> impl Stream<Item = Result<Outcome<M, S::Location>, Error>> + Send + '_
where
M: Source,
S: Sink + Send + 'static,
{
async_stream::try_stream! {
let mut sink = sink;
let size = item.size();
let checksum = item.checksum();
let (mut received, mut hasher) = match sink
.prepare(Target {
name: item.name(),
size,
checksum: checksum.as_ref(),
})
.await?
{
Prepared::Skip { location } => {
yield Outcome::Skipped { file: item, location };
return;
}
Prepared::Resume { received, partial } => (received, partial),
};
let mut last_progress: Option<Instant> = None;
let mut attempts_left = http.max_attempts();
let mut delay = http.backoff();
if received > 0 && received < size {
yield Outcome::Progress {
file: item.clone(),
received,
total: size,
};
last_progress = Some(Instant::now());
}
'download: while received < size {
let mut response = http.get_response_range(url.clone(), received).await?;
if received > 0 {
match response.status() {
reqwest::StatusCode::OK => {
sink.restart().await?;
received = 0;
hasher = Hasher::for_checksum(checksum.as_ref());
attempts_left = http.max_attempts();
delay = http.backoff();
}
reqwest::StatusCode::PARTIAL_CONTENT => {
range::validate_content_range(&response, received, size, &url)?;
}
status => {
Err(Error::InvalidRangeResponse {
url: url.to_string(),
details: format!("expected 200 or 206 for resume, got {status}"),
})?;
}
}
}
loop {
let chunk = match response.chunk().await {
Ok(Some(chunk)) => chunk,
Ok(None) => break 'download,
Err(e) => {
let err = Error::from(e);
if attempts_left > 1 && is_retryable(&err) {
attempts_left -= 1;
tokio::time::sleep(delay).await;
delay = delay.saturating_mul(2);
continue 'download;
}
Err(err)?;
unreachable!();
}
};
sink.write_chunk(&chunk).await?;
hasher.update(&chunk);
received += chunk.len() as u64;
attempts_left = http.max_attempts();
delay = http.backoff();
let emit = match last_progress {
None => true,
Some(t) => t.elapsed() >= PROGRESS_INTERVAL,
};
if emit {
yield Outcome::Progress {
file: item.clone(),
received,
total: size,
};
last_progress = Some(Instant::now());
}
}
}
if received != size {
Err(Error::SizeMismatch {
url: url.to_string(),
expected: size,
actual: received,
})?;
}
let verified = match (checksum.as_ref(), hasher.finalize_hex()) {
(Some(expected), Some(actual)) => {
if actual != expected.hex() {
Err(Error::ChecksumMismatch {
algorithm: expected.algorithm(),
url: url.to_string(),
expected: expected.hex().to_owned(),
actual,
})?;
}
true
}
_ => false,
};
let location = sink.finalize().await?;
yield Outcome::Downloaded { file: item, location, verified };
}
}