Skip to main content

archive_it_client/
wasapi.rs

1use std::path::{Path, PathBuf};
2use std::pin::pin;
3
4use aws_sdk_s3::Client as AwsS3Client;
5use futures_core::Stream;
6use futures_util::{StreamExt, TryStreamExt};
7use http_ferry::Downloader;
8use http_ferry::local::{LocalDir, LocalPath};
9use http_ferry::s3::{S3Dest, S3Location};
10use serde::Serialize;
11use url::Url;
12
13use crate::http::Transport;
14use crate::models::wasapi::{Page, WasapiFile};
15use crate::{Config, DownloadOutcome, Error, USER_AGENT};
16
/// Default URL prefix used to pick a file's primary location out of its
/// `locations` list (matched with `starts_with`).
pub const PRIMARY_LOCATION_SRC: &str = "https://warcs.archive-it.org";

/// Page size that [`WebdataQuery::default`] places in `page_size`.
pub const DEFAULT_WEBDATA_PAGE_SIZE: u32 = 50;
19
20#[derive(Debug, Clone, Serialize)]
21pub struct WebdataQuery {
22    #[serde(skip_serializing_if = "Option::is_none")]
23    pub filename: Option<String>,
24    #[serde(skip_serializing_if = "Option::is_none")]
25    pub filetype: Option<String>,
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub collection: Option<u64>,
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub crawl: Option<u64>,
30    #[serde(rename = "crawl-time-after", skip_serializing_if = "Option::is_none")]
31    pub crawl_time_after: Option<String>,
32    #[serde(rename = "crawl-time-before", skip_serializing_if = "Option::is_none")]
33    pub crawl_time_before: Option<String>,
34    #[serde(rename = "crawl-start-after", skip_serializing_if = "Option::is_none")]
35    pub crawl_start_after: Option<String>,
36    #[serde(rename = "crawl-start-before", skip_serializing_if = "Option::is_none")]
37    pub crawl_start_before: Option<String>,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub page: Option<u32>,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub page_size: Option<u32>,
42}
43
44impl Default for WebdataQuery {
45    fn default() -> Self {
46        Self {
47            filename: None,
48            filetype: None,
49            collection: None,
50            crawl: None,
51            crawl_time_after: None,
52            crawl_time_before: None,
53            crawl_start_after: None,
54            crawl_start_before: None,
55            page: None,
56            page_size: Some(DEFAULT_WEBDATA_PAGE_SIZE),
57        }
58    }
59}
60
61pub struct WasapiClient {
62    transport: Transport,
63    downloader: Downloader,
64    primary_location_src: String,
65}
66
67impl WasapiClient {
68    pub fn new(username: impl Into<String>, password: impl Into<String>) -> Result<Self, Error> {
69        Self::with_config(username, password, Config::wasapi())
70    }
71
72    pub fn with_config(
73        username: impl Into<String>,
74        password: impl Into<String>,
75        cfg: Config,
76    ) -> Result<Self, Error> {
77        let creds = (username.into(), password.into());
78        let transport = Transport::new(cfg.clone(), Some(creds.clone()))?;
79        let download_client = reqwest::Client::builder()
80            .user_agent(USER_AGENT)
81            .read_timeout(cfg.download_timeout)
82            .build()?;
83        let downloader = Downloader::builder(download_client)
84            .max_attempts(cfg.max_attempts)
85            .backoff(cfg.backoff)
86            .customize_request(move |req| req.basic_auth(&creds.0, Some(&creds.1)))
87            .build();
88        Ok(Self {
89            transport,
90            downloader,
91            primary_location_src: PRIMARY_LOCATION_SRC.into(),
92        })
93    }
94
95    pub fn with_primary_location_src(mut self, src: impl Into<String>) -> Self {
96        self.primary_location_src = src.into();
97        self
98    }
99
100    pub fn download(
101        &self,
102        file: WasapiFile,
103        path: impl AsRef<Path>,
104    ) -> impl Stream<Item = DownloadOutcome> + Send + '_ {
105        let path = path.as_ref().to_path_buf();
106        http_ferry::drive(
107            &self.downloader,
108            single_file(file).map_err(to_ferry),
109            self.resolver(),
110            LocalPath { path },
111        )
112    }
113
114    pub fn download_collection(
115        &self,
116        query: WebdataQuery,
117        dir: impl Into<PathBuf>,
118    ) -> impl Stream<Item = DownloadOutcome> + Send + '_ {
119        let dir = dir.into();
120        async_stream::stream! {
121            // Preflight the destination once so a bad output path fails the
122            // stream upfront instead of yielding one Failed per file. Also
123            // ensures the directory exists when the collection is empty.
124            if let Err(error) = tokio::fs::create_dir_all(&dir).await {
125                yield DownloadOutcome::StreamFailed {
126                    error: http_ferry::Error::Io(error),
127                };
128                return;
129            }
130            let inner = http_ferry::drive(
131                &self.downloader,
132                self.webdata(query).map_err(to_ferry),
133                self.resolver(),
134                LocalDir { dir },
135            );
136            let mut inner = pin!(inner);
137            while let Some(outcome) = inner.next().await {
138                yield outcome;
139            }
140        }
141    }
142
143    pub fn download_to_s3(
144        &self,
145        file: WasapiFile,
146        s3: AwsS3Client,
147        bucket: impl Into<String>,
148        prefix: Option<String>,
149    ) -> impl Stream<Item = DownloadOutcome<S3Location>> + Send + '_ {
150        http_ferry::drive(
151            &self.downloader,
152            single_file(file).map_err(to_ferry),
153            self.resolver(),
154            S3Dest {
155                client: s3,
156                bucket: bucket.into(),
157                prefix,
158            },
159        )
160    }
161
162    pub fn download_collection_to_s3(
163        &self,
164        query: WebdataQuery,
165        s3: AwsS3Client,
166        bucket: impl Into<String>,
167        prefix: Option<String>,
168    ) -> impl Stream<Item = DownloadOutcome<S3Location>> + Send + '_ {
169        http_ferry::drive(
170            &self.downloader,
171            self.webdata(query).map_err(to_ferry),
172            self.resolver(),
173            S3Dest {
174                client: s3,
175                bucket: bucket.into(),
176                prefix,
177            },
178        )
179    }
180
181    pub async fn list_webdata(&self, query: &WebdataQuery) -> Result<Page<WasapiFile>, Error> {
182        self.transport.get_json("webdata", query).await
183    }
184
185    pub async fn list_webdata_next(
186        &self,
187        prev: &Page<WasapiFile>,
188    ) -> Result<Option<Page<WasapiFile>>, Error> {
189        match prev.next.as_deref() {
190            None => Ok(None),
191            Some(url) => self.transport.get_json(url, &()).await.map(Some),
192        }
193    }
194
195    pub fn primary_location<'a>(&self, file: &'a WasapiFile) -> Option<&'a str> {
196        file.locations
197            .iter()
198            .find(|location| location.starts_with(&self.primary_location_src))
199            .map(String::as_str)
200    }
201
202    pub fn webdata(
203        &self,
204        query: WebdataQuery,
205    ) -> impl Stream<Item = Result<WasapiFile, Error>> + Send + '_ {
206        async_stream::try_stream! {
207            let mut page = self.list_webdata(&query).await?;
208            loop {
209                let files = std::mem::take(&mut page.files);
210                for f in files { yield f; }
211                match self.list_webdata_next(&page).await? {
212                    Some(next) => page = next,
213                    None => break,
214                }
215            }
216        }
217    }
218
219    /// Per-file resolver from a WASAPI item to its primary WARC URL. Runs
220    /// in-loop inside the engine so a file with no primary location yields a
221    /// non-fatal `Failed` rather than tearing down the stream. Host errors are
222    /// mapped into the engine's error type via [`to_ferry`].
223    fn resolver(&self) -> impl FnMut(&WasapiFile) -> Result<Url, http_ferry::Error> + Send + '_ {
224        let primary = self.primary_location_src.as_str();
225        move |file| primary_location_url(primary, file).map_err(to_ferry)
226    }
227}
228
229/// Resolve the WARC URL for `file` from its `locations`, picking the one under
230/// the configured primary source prefix.
231fn primary_location_url(primary_src: &str, file: &WasapiFile) -> Result<Url, Error> {
232    let location = file
233        .locations
234        .iter()
235        .find(|loc| loc.starts_with(primary_src))
236        .ok_or_else(|| Error::PrimaryLocationMissing {
237            filename: file.filename.clone(),
238        })?;
239    Ok(Url::parse(location)?)
240}
241
242fn single_file(file: WasapiFile) -> impl Stream<Item = Result<WasapiFile, Error>> + Send {
243    async_stream::try_stream! { yield file; }
244}
245
246/// Map a host error into the engine's error type at the resolver / item-stream
247/// boundary. Shared infrastructure variants pass through 1:1 so callers can
248/// still match on them (e.g. `http_ferry::Error::Status`); WASAPI-specific
249/// errors like `PrimaryLocationMissing` are boxed into
250/// [`http_ferry::Error::Source`].
251fn to_ferry(err: Error) -> http_ferry::Error {
252    use http_ferry::Error as F;
253    match err {
254        Error::Io(e) => F::Io(e),
255        Error::Request(e) => F::Request(e),
256        Error::Url(e) => F::Url(e),
257        Error::Status(s) => F::Status(s),
258        Error::NotFound(s) => F::NotFound(s),
259        other => F::from_source(other),
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::wasapi::Checksums;

    /// Builds a fixture file whose only varying part is its `locations`.
    fn file_with_locations(locations: Vec<String>) -> WasapiFile {
        let timestamp = "2020-01-01T00:00:00Z";
        WasapiFile {
            filename: "ARCHIVEIT-1.warc.gz".into(),
            filetype: "warc".into(),
            checksums: Checksums {
                sha1: Some("sha1".into()),
                md5: Some("md5".into()),
            },
            account: 1,
            size: 1,
            collection: 1,
            crawl: Some(1),
            crawl_time: Some(timestamp.into()),
            crawl_start: Some(timestamp.into()),
            store_time: timestamp.into(),
            locations,
        }
    }

    /// The location under the overridden prefix is chosen even though another
    /// location comes first in the list.
    #[test]
    fn primary_location_uses_configured_source_prefix() {
        let expected = "https://example.invalid/warcs/foo.warc.gz";
        let file = file_with_locations(vec![
            "https://other.example.com/warcs/foo.warc.gz".into(),
            expected.into(),
        ]);

        let client = WasapiClient::new("u", "p")
            .unwrap()
            .with_primary_location_src("https://example.invalid");

        assert_eq!(client.primary_location(&file), Some(expected));
    }
}