// archive_it_client/downloads/mod.rs

1use std::fmt;
2use std::future::Future;
3use std::path::PathBuf;
4use std::pin::pin;
5use std::time::{Duration, Instant};
6
7use futures_core::Stream;
8use futures_util::TryStreamExt;
9use sha1::{Digest, Sha1};
10use url::Url;
11
12use crate::Error;
13use crate::http::{Transport, is_retryable};
14use crate::models::wasapi::WasapiFile;
15
16pub(crate) mod local;
17mod range;
18pub(crate) mod s3;
19
/// Minimum time that must elapse after one `Progress` event before
/// `run_download` emits the next one for the same file.
const PROGRESS_INTERVAL: Duration = Duration::from_millis(500);
21
22pub(crate) trait Sink: Send {
23    type Location: Send + 'static;
24
25    fn prepare(
26        &mut self,
27        file: &WasapiFile,
28    ) -> impl Future<Output = Result<Prepared<Self::Location>, Error>> + Send;
29
30    fn write_chunk(&mut self, chunk: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
31
32    fn restart(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
33
34    fn finalize(self) -> impl Future<Output = Result<Self::Location, Error>> + Send;
35}
36
37/// Builds a per-file `Sink`. One factory drives a whole stream of files
38/// (singular call sites pass a one-element file stream).
39pub(crate) trait SinkFactory: Send {
40    type Sink: Sink<Location = Self::Location> + 'static;
41    type Location: DownloadLocation;
42
43    fn make(&mut self, file: &WasapiFile)
44    -> impl Future<Output = Result<Self::Sink, Error>> + Send;
45}
46
/// Formats a download destination for log output.
///
/// Location types used with `DownloadOutcome<L>` implement this; the
/// `Display` impl of `DownloadOutcome` calls it to print the destination.
pub trait DownloadLocation: Send + 'static {
    /// Writes a human-readable form of the location into `f`.
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result;
}

/// Local filesystem destinations are rendered through `Path::display`.
impl DownloadLocation for PathBuf {
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = self.display();
        f.write_fmt(format_args!("{}", rendered))
    }
}
60
61/// Public per-file outcome surfaced to callers of every `WasapiClient::download*`
62/// method. `Failed` carries per-file errors so a single bad file in a
63/// collection doesn't tear down the whole stream. `StreamFailed` carries
64/// errors that happen before a file is available, such as a failed collection
65/// listing request or destination preflight.
66#[derive(Debug)]
67pub enum DownloadOutcome<L = PathBuf> {
68    Downloaded {
69        file: WasapiFile,
70        location: L,
71        verified: bool,
72    },
73    Failed {
74        file: WasapiFile,
75        error: Error,
76    },
77    Progress {
78        file: WasapiFile,
79        received: u64,
80        total: u64,
81    },
82    Skipped {
83        file: WasapiFile,
84        location: L,
85    },
86    StreamFailed {
87        error: Error,
88    },
89}
90
91impl<L: DownloadLocation> fmt::Display for DownloadOutcome<L> {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        match self {
94            Self::Progress {
95                file,
96                received,
97                total,
98            } => {
99                let pct = if *total == 0 {
100                    100.0
101                } else {
102                    (*received as f64 / *total as f64) * 100.0
103                };
104                write!(
105                    f,
106                    "{}: {pct:.1}% ({received} / {total} bytes)",
107                    file.filename
108                )
109            }
110            Self::Downloaded {
111                file,
112                location,
113                verified,
114            } => {
115                write!(f, "downloaded ")?;
116                location.fmt_location(f)?;
117                if *verified {
118                    write!(f, " ({} bytes)", file.size)
119                } else {
120                    write!(f, " ({} bytes, unverified)", file.size)
121                }
122            }
123            Self::Failed { file, error } => {
124                write!(f, "failed {}: {error}", file.filename)
125            }
126            Self::Skipped { location, .. } => {
127                write!(f, "skipped ")?;
128                location.fmt_location(f)?;
129                write!(f, " (already present)")
130            }
131            Self::StreamFailed { error } => write!(f, "stream failed: {error}"),
132        }
133    }
134}
135
136pub(crate) enum Prepared<L> {
137    /// Destination already holds this file. Engine yields `Skipped` and stops.
138    Skip { location: L },
139    /// Begin or resume; engine should fetch starting at `received` and continue
140    /// hashing from `partial_sha1`. `received == file.size` is valid and means
141    /// the engine skips the byte fetch and goes straight to finalize.
142    Resume { received: u64, partial_sha1: Sha1 },
143}
144
145/// Resolve the WARC URL for `file`. Free fn so the driver doesn't need to
146/// borrow `WasapiClient`; it just gets the configured primary location prefix.
147pub(crate) fn primary_location_url(primary_src: &str, file: &WasapiFile) -> Result<Url, Error> {
148    let location = file
149        .locations
150        .iter()
151        .find(|loc| loc.starts_with(primary_src))
152        .ok_or_else(|| Error::PrimaryLocationMissing {
153            filename: file.filename.clone(),
154        })?;
155    Ok(Url::parse(location)?)
156}
157
158/// One driver for every download path. Pulls files from the input stream,
159/// asks the factory for a per-file sink, and runs `run_download`. Per-file
160/// errors (sink build, url resolution, transport failure) yield `Failed` and
161/// the loop continues to the next file — a one-element file stream therefore
162/// yields exactly one terminal outcome.
163pub(crate) fn drive<'a, F>(
164    transport: &'a Transport,
165    primary_src: &'a str,
166    files: impl Stream<Item = Result<WasapiFile, Error>> + Send + 'a,
167    factory: F,
168) -> impl Stream<Item = DownloadOutcome<F::Location>> + Send + 'a
169where
170    F: SinkFactory + 'a,
171{
172    async_stream::stream! {
173        let mut factory = factory;
174        let mut files = pin!(files);
175        loop {
176            let file = match files.try_next().await {
177                Ok(Some(file)) => file,
178                Ok(None) => break,
179                Err(error) => {
180                    yield DownloadOutcome::StreamFailed { error };
181                    return;
182                }
183            };
184            let sink = match factory.make(&file).await {
185                Ok(s) => s,
186                Err(error) => {
187                    yield DownloadOutcome::Failed { file, error };
188                    continue;
189                }
190            };
191            let url = match primary_location_url(primary_src, &file) {
192                Ok(u) => u,
193                Err(error) => {
194                    yield DownloadOutcome::Failed { file, error };
195                    continue;
196                }
197            };
198            let file_for_err = file.clone();
199            let mut events = pin!(run_download(transport, url, file, sink));
200            loop {
201                match events.try_next().await {
202                    Ok(Some(outcome)) => yield outcome,
203                    Ok(None) => break,
204                    Err(error) => {
205                        yield DownloadOutcome::Failed {
206                            file: file_for_err,
207                            error,
208                        };
209                        break;
210                    }
211                }
212            }
213        }
214    }
215}
216
217/// Streams one file's download. Only emits the happy-path `DownloadOutcome`
218/// variants (`Progress`, `Skipped`, `Downloaded`); per-file faults bubble out
219/// as `Err` and `drive` turns them into `Failed`. `StreamFailed` is never
220/// produced here — it's reserved for pre-file errors at the `drive` layer.
221pub(crate) fn run_download<S>(
222    transport: &Transport,
223    url: Url,
224    file: WasapiFile,
225    sink: S,
226) -> impl Stream<Item = Result<DownloadOutcome<S::Location>, Error>> + Send + '_
227where
228    S: Sink + Send + 'static,
229{
230    async_stream::try_stream! {
231        let expected_sha1 = file.checksums.sha1.clone();
232        let mut sink = sink;
233
234        let (mut received, mut hasher) = match sink.prepare(&file).await? {
235            Prepared::Skip { location } => {
236                yield DownloadOutcome::Skipped { file, location };
237                return;
238            }
239            Prepared::Resume { received, partial_sha1 } => (received, partial_sha1),
240        };
241
242        let mut last_progress: Option<Instant> = None;
243        let mut attempts_left = transport.max_attempts();
244        let mut delay = transport.backoff();
245
246        if received > 0 && received < file.size {
247            yield DownloadOutcome::Progress {
248                file: file.clone(),
249                received,
250                total: file.size,
251            };
252            last_progress = Some(Instant::now());
253        }
254
255        'download: while received < file.size {
256            let mut response = transport.get_response_range(url.clone(), received).await?;
257
258            if received > 0 {
259                match response.status() {
260                    reqwest::StatusCode::OK => {
261                        sink.restart().await?;
262                        received = 0;
263                        hasher = Sha1::new();
264                        attempts_left = transport.max_attempts();
265                        delay = transport.backoff();
266                    }
267                    reqwest::StatusCode::PARTIAL_CONTENT => {
268                        range::validate_content_range(&response, received, file.size, &url)?;
269                    }
270                    status => {
271                        Err(Error::InvalidRangeResponse {
272                            url: url.to_string(),
273                            details: format!("expected 200 or 206 for resume, got {status}"),
274                        })?;
275                    }
276                }
277            }
278
279            loop {
280                let chunk = match response.chunk().await {
281                    Ok(Some(chunk)) => chunk,
282                    Ok(None) => break 'download,
283                    Err(e) => {
284                        let err = Error::from(e);
285                        if attempts_left > 1 && is_retryable(&err) {
286                            attempts_left -= 1;
287                            tokio::time::sleep(delay).await;
288                            delay = delay.saturating_mul(2);
289                            continue 'download;
290                        }
291                        Err(err)?;
292                        unreachable!();
293                    }
294                };
295
296                sink.write_chunk(&chunk).await?;
297                hasher.update(&chunk);
298                received += chunk.len() as u64;
299                attempts_left = transport.max_attempts();
300                delay = transport.backoff();
301
302                let emit = match last_progress {
303                    None => true,
304                    Some(t) => t.elapsed() >= PROGRESS_INTERVAL,
305                };
306                if emit {
307                    yield DownloadOutcome::Progress {
308                        file: file.clone(),
309                        received,
310                        total: file.size,
311                    };
312                    last_progress = Some(Instant::now());
313                }
314            }
315        }
316
317        if received != file.size {
318            Err(Error::SizeMismatch {
319                url: url.to_string(),
320                expected: file.size,
321                actual: received,
322            })?;
323        }
324
325        let verified = if let Some(expected) = expected_sha1.as_deref() {
326            let actual = crate::sha1_hex(hasher.finalize());
327            if actual != expected {
328                Err(Error::ChecksumMismatch {
329                    url: url.to_string(),
330                    expected: expected.to_owned(),
331                    actual,
332                })?;
333            }
334            true
335        } else {
336            false
337        };
338
339        let location = sink.finalize().await?;
340        yield DownloadOutcome::Downloaded { file, location, verified };
341    }
342}