Skip to main content

http_ferry/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::fmt;
4use std::fmt::Write as _;
5use std::future::Future;
6use std::path::PathBuf;
7use std::pin::pin;
8use std::time::{Duration, Instant};
9
10use futures_core::Stream;
11use futures_util::TryStreamExt;
12use md5::Md5;
13use sha1::{Digest, Sha1};
14use url::Url;
15
16mod error;
17mod http;
18pub mod local;
19mod range;
20#[cfg(feature = "s3")]
21pub mod s3;
22
23pub use error::Error;
24pub use http::Downloader;
25use http::is_retryable;
26
/// Minimum time that must elapse between two throttled `Progress` outcomes
/// emitted while a download is streaming chunks.
const PROGRESS_INTERVAL: Duration = Duration::from_millis(500);
28
/// Integrity digest the caller expects a transfer to match.
///
/// The engine picks the hashing algorithm from the variant, hashes the byte
/// stream as it arrives and compares the result against the carried value;
/// sinks consult it when deciding whether an item can be skipped.
#[derive(Debug, Clone)]
pub enum Checksum {
    /// Expected SHA-1 digest as a hex string.
    Sha1(String),
    /// Expected MD5 digest as a hex string.
    Md5(String),
}

impl Checksum {
    /// Hex digest carried by this checksum (lowercase by convention), returned
    /// exactly as stored regardless of the algorithm.
    pub fn hex(&self) -> &str {
        // Both variants wrap a single `String`, so one irrefutable or-pattern
        // covers the whole enum.
        let (Self::Sha1(digest) | Self::Md5(digest)) = self;
        digest
    }

    /// Short, stable algorithm name; used as an object-metadata key.
    pub fn algorithm(&self) -> &'static str {
        match *self {
            Self::Md5(_) => "md5",
            Self::Sha1(_) => "sha1",
        }
    }

    /// Builds a checksum with the same algorithm as `self` but carrying
    /// `value`, so a sink can turn a stored metadata string back into the
    /// expected variant.
    pub fn with_value(&self, value: String) -> Checksum {
        let rebuild: fn(String) -> Checksum = match self {
            Self::Sha1(_) => Checksum::Sha1,
            Self::Md5(_) => Checksum::Md5,
        };
        rebuild(value)
    }
}
63
64/// Streaming hasher selected from the caller's expected [`Checksum`]. `None`
65/// means no checksum was supplied: the engine still counts bytes but reports
66/// `verified: false`.
67pub enum Hasher {
68    None,
69    Sha1(Sha1),
70    Md5(Md5),
71}
72
73impl Hasher {
74    pub fn for_checksum(checksum: Option<&Checksum>) -> Self {
75        match checksum {
76            Some(Checksum::Sha1(_)) => Hasher::Sha1(Sha1::new()),
77            Some(Checksum::Md5(_)) => Hasher::Md5(Md5::new()),
78            None => Hasher::None,
79        }
80    }
81
82    pub fn update(&mut self, bytes: &[u8]) {
83        match self {
84            Hasher::Sha1(h) => h.update(bytes),
85            Hasher::Md5(h) => h.update(bytes),
86            Hasher::None => {}
87        }
88    }
89
90    pub fn finalize_hex(self) -> Option<String> {
91        match self {
92            Hasher::Sha1(h) => Some(to_hex(&h.finalize())),
93            Hasher::Md5(h) => Some(to_hex(&h.finalize())),
94            Hasher::None => None,
95        }
96    }
97}
98
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
fn to_hex(bytes: &[u8]) -> String {
    // Two output characters per input byte, so the buffer never reallocates.
    let buffer = String::with_capacity(bytes.len() * 2);
    bytes.iter().fold(buffer, |mut hex, byte| {
        write!(hex, "{byte:02x}").expect("writing to String cannot fail");
        hex
    })
}
106
107/// One unit of work for the engine. `meta` is the caller's own item type,
108/// carried opaquely and handed straight back via [`Outcome`]; the engine reads
109/// only `size`, `checksum`, and `name`.
110pub struct Transfer<M> {
111    pub size: u64,
112    pub checksum: Option<Checksum>,
113    pub name: String,
114    pub meta: M,
115}
116
117/// Borrowed view of a [`Transfer`] handed to sinks at prepare time. The source
118/// URL is resolved by the engine and `meta` is the caller's concern, so neither
119/// appears here — sinks are domain-agnostic.
120#[derive(Clone, Copy)]
121pub struct Target<'a> {
122    pub name: &'a str,
123    pub size: u64,
124    pub checksum: Option<&'a Checksum>,
125}
126
127pub trait Sink: Send {
128    type Location: Send + 'static;
129
130    fn prepare(
131        &mut self,
132        target: Target<'_>,
133    ) -> impl Future<Output = Result<Prepared<Self::Location>, Error>> + Send;
134
135    fn write_chunk(&mut self, chunk: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
136
137    fn restart(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
138
139    fn finalize(self) -> impl Future<Output = Result<Self::Location, Error>> + Send;
140}
141
142/// Builds a per-item `Sink`. One factory drives a whole stream of items
143/// (singular call sites pass a one-element stream).
144pub trait SinkFactory: Send {
145    type Sink: Sink<Location = Self::Location> + 'static;
146    type Location: DownloadLocation;
147
148    fn make(
149        &mut self,
150        target: Target<'_>,
151    ) -> impl Future<Output = Result<Self::Sink, Error>> + Send;
152}
153
/// Formats a transfer destination for log output.
///
/// Location types that appear in [`Outcome`] implement this, for example
/// local paths and S3 object locations.
pub trait DownloadLocation: Send + 'static {
    /// Writes the human-readable form of this location into `f`.
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result;
}

/// Local filesystem paths render through [`std::path::Path::display`].
impl DownloadLocation for PathBuf {
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Formatting via fresh arguments keeps the output independent of any
        // width/fill options set on `f`.
        f.write_fmt(format_args!("{}", self.display()))
    }
}

/// Plain strings are written out verbatim.
impl DownloadLocation for String {
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
173
/// Supplies a short, human-readable label for a transfer item, such as a
/// filename.
///
/// [`Outcome`]'s `Display` impl uses the label when rendering log lines, so
/// implementing this on your `meta` type `M` is what makes outcomes carrying
/// it displayable.
pub trait Label {
    /// Returns the label to show for this item.
    fn label(&self) -> &str;
}
180
181/// Per-item outcome of a transfer stream, generic over the caller's item type
182/// `M`. `Failed` carries per-item errors so a single bad item in a batch
183/// doesn't tear down the whole stream. `StreamFailed` carries errors that
184/// happen before an item is available, such as a failed listing request or
185/// destination preflight.
186#[derive(Debug)]
187pub enum Outcome<M, L = PathBuf> {
188    Downloaded {
189        file: M,
190        location: L,
191        verified: bool,
192    },
193    Failed {
194        file: M,
195        error: Error,
196    },
197    Progress {
198        file: M,
199        received: u64,
200        total: u64,
201    },
202    Skipped {
203        file: M,
204        location: L,
205    },
206    StreamFailed {
207        error: Error,
208    },
209}
210
211impl<M: Label, L: DownloadLocation> fmt::Display for Outcome<M, L> {
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        match self {
214            Self::Progress {
215                file,
216                received,
217                total,
218            } => {
219                let pct = if *total == 0 {
220                    100.0
221                } else {
222                    (*received as f64 / *total as f64) * 100.0
223                };
224                write!(
225                    f,
226                    "{}: {pct:.1}% ({received} / {total} bytes)",
227                    file.label()
228                )
229            }
230            Self::Downloaded {
231                location, verified, ..
232            } => {
233                write!(f, "downloaded ")?;
234                location.fmt_location(f)?;
235                write!(
236                    f,
237                    " ({})",
238                    if *verified { "verified" } else { "unverified" }
239                )
240            }
241            Self::Failed { file, error } => write!(f, "failed {}: {error}", file.label()),
242            Self::Skipped { location, .. } => {
243                write!(f, "skipped ")?;
244                location.fmt_location(f)?;
245                write!(f, " (already present)")
246            }
247            Self::StreamFailed { error } => write!(f, "stream failed: {error}"),
248        }
249    }
250}
251
252pub enum Prepared<L> {
253    /// Destination already holds this file. Engine yields `Skipped` and stops.
254    Skip { location: L },
255    /// Begin or resume; engine should fetch starting at `received` and continue
256    /// hashing from `partial`. `received == size` is valid and means the engine
257    /// skips the byte fetch and goes straight to finalize.
258    Resume { received: u64, partial: Hasher },
259}
260
261/// One driver for every download path. Pulls items from the input stream,
262/// resolves each item's source URL, asks the factory for a per-item sink, and
263/// runs `run_download`. Per-item errors (url resolution, sink build, transport
264/// failure) yield `Failed` and the loop continues to the next item — a
265/// one-element input stream therefore yields exactly one terminal outcome.
266pub fn drive<'a, M, F, R>(
267    http: &'a Downloader,
268    items: impl Stream<Item = Result<Transfer<M>, Error>> + Send + 'a,
269    mut resolve: R,
270    factory: F,
271) -> impl Stream<Item = Outcome<M, F::Location>> + Send + 'a
272where
273    M: Clone + Send + 'static,
274    F: SinkFactory + 'a,
275    R: FnMut(&Transfer<M>) -> Result<Url, Error> + Send + 'a,
276{
277    async_stream::stream! {
278        let mut factory = factory;
279        let mut items = pin!(items);
280        loop {
281            let transfer = match items.try_next().await {
282                Ok(Some(transfer)) => transfer,
283                Ok(None) => break,
284                Err(error) => {
285                    yield Outcome::StreamFailed { error };
286                    return;
287                }
288            };
289            let url = match resolve(&transfer) {
290                Ok(u) => u,
291                Err(error) => {
292                    yield Outcome::Failed { file: transfer.meta, error };
293                    continue;
294                }
295            };
296            let target = Target {
297                name: &transfer.name,
298                size: transfer.size,
299                checksum: transfer.checksum.as_ref(),
300            };
301            let sink = match factory.make(target).await {
302                Ok(s) => s,
303                Err(error) => {
304                    yield Outcome::Failed { file: transfer.meta, error };
305                    continue;
306                }
307            };
308            let meta_for_err = transfer.meta.clone();
309            let mut events = pin!(run_download(http, url, transfer, sink));
310            loop {
311                match events.try_next().await {
312                    Ok(Some(outcome)) => yield outcome,
313                    Ok(None) => break,
314                    Err(error) => {
315                        yield Outcome::Failed {
316                            file: meta_for_err,
317                            error,
318                        };
319                        break;
320                    }
321                }
322            }
323        }
324    }
325}
326
327/// Streams one item's download. Only emits the happy-path `Outcome` variants
328/// (`Progress`, `Skipped`, `Downloaded`); per-item faults bubble out as `Err`
329/// and `drive` turns them into `Failed`. `StreamFailed` is never produced
330/// here — it's reserved for pre-item errors at the `drive` layer.
331pub fn run_download<M, S>(
332    http: &Downloader,
333    url: Url,
334    transfer: Transfer<M>,
335    sink: S,
336) -> impl Stream<Item = Result<Outcome<M, S::Location>, Error>> + Send + '_
337where
338    M: Clone + Send + 'static,
339    S: Sink + Send + 'static,
340{
341    async_stream::try_stream! {
342        let mut sink = sink;
343
344        let (mut received, mut hasher) = match sink
345            .prepare(Target {
346                name: &transfer.name,
347                size: transfer.size,
348                checksum: transfer.checksum.as_ref(),
349            })
350            .await?
351        {
352            Prepared::Skip { location } => {
353                yield Outcome::Skipped { file: transfer.meta, location };
354                return;
355            }
356            Prepared::Resume { received, partial } => (received, partial),
357        };
358
359        let mut last_progress: Option<Instant> = None;
360        let mut attempts_left = http.max_attempts();
361        let mut delay = http.backoff();
362
363        if received > 0 && received < transfer.size {
364            yield Outcome::Progress {
365                file: transfer.meta.clone(),
366                received,
367                total: transfer.size,
368            };
369            last_progress = Some(Instant::now());
370        }
371
372        'download: while received < transfer.size {
373            let mut response = http.get_response_range(url.clone(), received).await?;
374
375            if received > 0 {
376                match response.status() {
377                    reqwest::StatusCode::OK => {
378                        sink.restart().await?;
379                        received = 0;
380                        hasher = Hasher::for_checksum(transfer.checksum.as_ref());
381                        attempts_left = http.max_attempts();
382                        delay = http.backoff();
383                    }
384                    reqwest::StatusCode::PARTIAL_CONTENT => {
385                        range::validate_content_range(&response, received, transfer.size, &url)?;
386                    }
387                    status => {
388                        Err(Error::InvalidRangeResponse {
389                            url: url.to_string(),
390                            details: format!("expected 200 or 206 for resume, got {status}"),
391                        })?;
392                    }
393                }
394            }
395
396            loop {
397                let chunk = match response.chunk().await {
398                    Ok(Some(chunk)) => chunk,
399                    Ok(None) => break 'download,
400                    Err(e) => {
401                        let err = Error::from(e);
402                        if attempts_left > 1 && is_retryable(&err) {
403                            attempts_left -= 1;
404                            tokio::time::sleep(delay).await;
405                            delay = delay.saturating_mul(2);
406                            continue 'download;
407                        }
408                        Err(err)?;
409                        unreachable!();
410                    }
411                };
412
413                sink.write_chunk(&chunk).await?;
414                hasher.update(&chunk);
415                received += chunk.len() as u64;
416                attempts_left = http.max_attempts();
417                delay = http.backoff();
418
419                let emit = match last_progress {
420                    None => true,
421                    Some(t) => t.elapsed() >= PROGRESS_INTERVAL,
422                };
423                if emit {
424                    yield Outcome::Progress {
425                        file: transfer.meta.clone(),
426                        received,
427                        total: transfer.size,
428                    };
429                    last_progress = Some(Instant::now());
430                }
431            }
432        }
433
434        if received != transfer.size {
435            Err(Error::SizeMismatch {
436                url: url.to_string(),
437                expected: transfer.size,
438                actual: received,
439            })?;
440        }
441
442        let verified = match (transfer.checksum.as_ref(), hasher.finalize_hex()) {
443            (Some(expected), Some(actual)) => {
444                if actual != expected.hex() {
445                    Err(Error::ChecksumMismatch {
446                        algorithm: expected.algorithm(),
447                        url: url.to_string(),
448                        expected: expected.hex().to_owned(),
449                        actual,
450                    })?;
451                }
452                true
453            }
454            _ => false,
455        };
456
457        let location = sink.finalize().await?;
458        yield Outcome::Downloaded { file: transfer.meta, location, verified };
459    }
460}