archive_it_client/downloads/
mod.rs1use std::fmt;
2use std::future::Future;
3use std::path::PathBuf;
4use std::pin::pin;
5use std::time::{Duration, Instant};
6
7use futures_core::Stream;
8use futures_util::TryStreamExt;
9use sha1::{Digest, Sha1};
10use url::Url;
11
12use crate::Error;
13use crate::http::{Transport, is_retryable};
14use crate::models::wasapi::WasapiFile;
15
16pub(crate) mod local;
17mod range;
18pub(crate) mod s3;
19
/// Minimum time between two consecutive `Progress` events for the same file.
///
/// `run_download` emits a progress event for a chunk only when no event has
/// been emitted yet or at least this much time has elapsed since the last one.
const PROGRESS_INTERVAL: Duration = Duration::from_millis(500);
21
22pub(crate) trait Sink: Send {
23 type Location: Send + 'static;
24
25 fn prepare(
26 &mut self,
27 file: &WasapiFile,
28 ) -> impl Future<Output = Result<Prepared<Self::Location>, Error>> + Send;
29
30 fn write_chunk(&mut self, chunk: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
31
32 fn restart(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
33
34 fn finalize(self) -> impl Future<Output = Result<Self::Location, Error>> + Send;
35}
36
37pub(crate) trait SinkFactory: Send {
40 type Sink: Sink<Location = Self::Location> + 'static;
41 type Location: DownloadLocation;
42
43 fn make(&mut self, file: &WasapiFile)
44 -> impl Future<Output = Result<Self::Sink, Error>> + Send;
45}
46
/// A place a downloaded file can end up at, printable for humans.
///
/// The `Display` implementation of `DownloadOutcome` uses
/// [`DownloadLocation::fmt_location`] to render the location of downloaded
/// and skipped files.
pub trait DownloadLocation: Send + 'static {
    /// Writes a human-readable form of this location into `f`.
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result;
}

/// Local filesystem paths are rendered through [`std::path::Path::display`].
impl DownloadLocation for PathBuf {
    fn fmt_location(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shown = self.display();
        f.write_fmt(format_args!("{}", shown))
    }
}
60
61#[derive(Debug)]
67pub enum DownloadOutcome<L = PathBuf> {
68 Downloaded {
69 file: WasapiFile,
70 location: L,
71 verified: bool,
72 },
73 Failed {
74 file: WasapiFile,
75 error: Error,
76 },
77 Progress {
78 file: WasapiFile,
79 received: u64,
80 total: u64,
81 },
82 Skipped {
83 file: WasapiFile,
84 location: L,
85 },
86 StreamFailed {
87 error: Error,
88 },
89}
90
91impl<L: DownloadLocation> fmt::Display for DownloadOutcome<L> {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 match self {
94 Self::Progress {
95 file,
96 received,
97 total,
98 } => {
99 let pct = if *total == 0 {
100 100.0
101 } else {
102 (*received as f64 / *total as f64) * 100.0
103 };
104 write!(
105 f,
106 "{}: {pct:.1}% ({received} / {total} bytes)",
107 file.filename
108 )
109 }
110 Self::Downloaded {
111 file,
112 location,
113 verified,
114 } => {
115 write!(f, "downloaded ")?;
116 location.fmt_location(f)?;
117 if *verified {
118 write!(f, " ({} bytes)", file.size)
119 } else {
120 write!(f, " ({} bytes, unverified)", file.size)
121 }
122 }
123 Self::Failed { file, error } => {
124 write!(f, "failed {}: {error}", file.filename)
125 }
126 Self::Skipped { location, .. } => {
127 write!(f, "skipped ")?;
128 location.fmt_location(f)?;
129 write!(f, " (already present)")
130 }
131 Self::StreamFailed { error } => write!(f, "stream failed: {error}"),
132 }
133 }
134}
135
136pub(crate) enum Prepared<L> {
137 Skip { location: L },
139 Resume { received: u64, partial_sha1: Sha1 },
143}
144
145pub(crate) fn primary_location_url(primary_src: &str, file: &WasapiFile) -> Result<Url, Error> {
148 let location = file
149 .locations
150 .iter()
151 .find(|loc| loc.starts_with(primary_src))
152 .ok_or_else(|| Error::PrimaryLocationMissing {
153 filename: file.filename.clone(),
154 })?;
155 Ok(Url::parse(location)?)
156}
157
158pub(crate) fn drive<'a, F>(
164 transport: &'a Transport,
165 primary_src: &'a str,
166 files: impl Stream<Item = Result<WasapiFile, Error>> + Send + 'a,
167 factory: F,
168) -> impl Stream<Item = DownloadOutcome<F::Location>> + Send + 'a
169where
170 F: SinkFactory + 'a,
171{
172 async_stream::stream! {
173 let mut factory = factory;
174 let mut files = pin!(files);
175 loop {
176 let file = match files.try_next().await {
177 Ok(Some(file)) => file,
178 Ok(None) => break,
179 Err(error) => {
180 yield DownloadOutcome::StreamFailed { error };
181 return;
182 }
183 };
184 let sink = match factory.make(&file).await {
185 Ok(s) => s,
186 Err(error) => {
187 yield DownloadOutcome::Failed { file, error };
188 continue;
189 }
190 };
191 let url = match primary_location_url(primary_src, &file) {
192 Ok(u) => u,
193 Err(error) => {
194 yield DownloadOutcome::Failed { file, error };
195 continue;
196 }
197 };
198 let file_for_err = file.clone();
199 let mut events = pin!(run_download(transport, url, file, sink));
200 loop {
201 match events.try_next().await {
202 Ok(Some(outcome)) => yield outcome,
203 Ok(None) => break,
204 Err(error) => {
205 yield DownloadOutcome::Failed {
206 file: file_for_err,
207 error,
208 };
209 break;
210 }
211 }
212 }
213 }
214 }
215}
216
217pub(crate) fn run_download<S>(
222 transport: &Transport,
223 url: Url,
224 file: WasapiFile,
225 sink: S,
226) -> impl Stream<Item = Result<DownloadOutcome<S::Location>, Error>> + Send + '_
227where
228 S: Sink + Send + 'static,
229{
230 async_stream::try_stream! {
231 let expected_sha1 = file.checksums.sha1.clone();
232 let mut sink = sink;
233
234 let (mut received, mut hasher) = match sink.prepare(&file).await? {
235 Prepared::Skip { location } => {
236 yield DownloadOutcome::Skipped { file, location };
237 return;
238 }
239 Prepared::Resume { received, partial_sha1 } => (received, partial_sha1),
240 };
241
242 let mut last_progress: Option<Instant> = None;
243 let mut attempts_left = transport.max_attempts();
244 let mut delay = transport.backoff();
245
246 if received > 0 && received < file.size {
247 yield DownloadOutcome::Progress {
248 file: file.clone(),
249 received,
250 total: file.size,
251 };
252 last_progress = Some(Instant::now());
253 }
254
255 'download: while received < file.size {
256 let mut response = transport.get_response_range(url.clone(), received).await?;
257
258 if received > 0 {
259 match response.status() {
260 reqwest::StatusCode::OK => {
261 sink.restart().await?;
262 received = 0;
263 hasher = Sha1::new();
264 attempts_left = transport.max_attempts();
265 delay = transport.backoff();
266 }
267 reqwest::StatusCode::PARTIAL_CONTENT => {
268 range::validate_content_range(&response, received, file.size, &url)?;
269 }
270 status => {
271 Err(Error::InvalidRangeResponse {
272 url: url.to_string(),
273 details: format!("expected 200 or 206 for resume, got {status}"),
274 })?;
275 }
276 }
277 }
278
279 loop {
280 let chunk = match response.chunk().await {
281 Ok(Some(chunk)) => chunk,
282 Ok(None) => break 'download,
283 Err(e) => {
284 let err = Error::from(e);
285 if attempts_left > 1 && is_retryable(&err) {
286 attempts_left -= 1;
287 tokio::time::sleep(delay).await;
288 delay = delay.saturating_mul(2);
289 continue 'download;
290 }
291 Err(err)?;
292 unreachable!();
293 }
294 };
295
296 sink.write_chunk(&chunk).await?;
297 hasher.update(&chunk);
298 received += chunk.len() as u64;
299 attempts_left = transport.max_attempts();
300 delay = transport.backoff();
301
302 let emit = match last_progress {
303 None => true,
304 Some(t) => t.elapsed() >= PROGRESS_INTERVAL,
305 };
306 if emit {
307 yield DownloadOutcome::Progress {
308 file: file.clone(),
309 received,
310 total: file.size,
311 };
312 last_progress = Some(Instant::now());
313 }
314 }
315 }
316
317 if received != file.size {
318 Err(Error::SizeMismatch {
319 url: url.to_string(),
320 expected: file.size,
321 actual: received,
322 })?;
323 }
324
325 let verified = if let Some(expected) = expected_sha1.as_deref() {
326 let actual = crate::sha1_hex(hasher.finalize());
327 if actual != expected {
328 Err(Error::ChecksumMismatch {
329 url: url.to_string(),
330 expected: expected.to_owned(),
331 actual,
332 })?;
333 }
334 true
335 } else {
336 false
337 };
338
339 let location = sink.finalize().await?;
340 yield DownloadOutcome::Downloaded { file, location, verified };
341 }
342}