Skip to main content

archive_it_client/
wasapi.rs

1use std::path::{Path, PathBuf};
2use std::pin::pin;
3
4use aws_sdk_s3::Client as AwsS3Client;
5use futures_core::Stream;
6use futures_util::StreamExt;
7use serde::Serialize;
8
9use crate::downloads::local::{LocalDir, LocalPath};
10use crate::downloads::s3::{S3Dest, S3Location};
11use crate::downloads::{self, DownloadOutcome};
12use crate::http::Transport;
13use crate::models::wasapi::{Page, WasapiFile};
14use crate::{Config, Error};
15
/// Page size requested from the WASAPI `webdata` listing when a
/// [`WebdataQuery`] is created via `Default`.
pub const DEFAULT_WEBDATA_PAGE_SIZE: u32 = 50;

/// URL prefix that marks a file location as the preferred ("primary") one;
/// this is the initial value of a `WasapiClient`'s primary location source.
pub const PRIMARY_LOCATION_SRC: &str = "https://warcs.archive-it.org";
18
19#[derive(Debug, Clone, Serialize)]
20pub struct WebdataQuery {
21    #[serde(skip_serializing_if = "Option::is_none")]
22    pub filename: Option<String>,
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub filetype: Option<String>,
25    #[serde(skip_serializing_if = "Option::is_none")]
26    pub collection: Option<u64>,
27    #[serde(skip_serializing_if = "Option::is_none")]
28    pub crawl: Option<u64>,
29    #[serde(rename = "crawl-time-after", skip_serializing_if = "Option::is_none")]
30    pub crawl_time_after: Option<String>,
31    #[serde(rename = "crawl-time-before", skip_serializing_if = "Option::is_none")]
32    pub crawl_time_before: Option<String>,
33    #[serde(rename = "crawl-start-after", skip_serializing_if = "Option::is_none")]
34    pub crawl_start_after: Option<String>,
35    #[serde(rename = "crawl-start-before", skip_serializing_if = "Option::is_none")]
36    pub crawl_start_before: Option<String>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub page: Option<u32>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub page_size: Option<u32>,
41}
42
43impl Default for WebdataQuery {
44    fn default() -> Self {
45        Self {
46            filename: None,
47            filetype: None,
48            collection: None,
49            crawl: None,
50            crawl_time_after: None,
51            crawl_time_before: None,
52            crawl_start_after: None,
53            crawl_start_before: None,
54            page: None,
55            page_size: Some(DEFAULT_WEBDATA_PAGE_SIZE),
56        }
57    }
58}
59
60pub struct WasapiClient {
61    transport: Transport,
62    primary_location_src: String,
63}
64
65impl WasapiClient {
66    pub fn new(username: impl Into<String>, password: impl Into<String>) -> Result<Self, Error> {
67        Self::with_config(username, password, Config::wasapi())
68    }
69
70    pub fn with_config(
71        username: impl Into<String>,
72        password: impl Into<String>,
73        cfg: Config,
74    ) -> Result<Self, Error> {
75        Ok(Self {
76            transport: Transport::new(cfg, Some((username.into(), password.into())))?,
77            primary_location_src: PRIMARY_LOCATION_SRC.into(),
78        })
79    }
80
81    pub fn with_primary_location_src(mut self, src: impl Into<String>) -> Self {
82        self.primary_location_src = src.into();
83        self
84    }
85
86    pub fn download(
87        &self,
88        file: WasapiFile,
89        path: impl AsRef<Path>,
90    ) -> impl Stream<Item = DownloadOutcome> + Send + '_ {
91        let path = path.as_ref().to_path_buf();
92        downloads::drive(
93            &self.transport,
94            &self.primary_location_src,
95            single_file(file),
96            LocalPath { path },
97        )
98    }
99
100    pub fn download_collection(
101        &self,
102        query: WebdataQuery,
103        dir: impl Into<PathBuf>,
104    ) -> impl Stream<Item = DownloadOutcome> + Send + '_ {
105        let dir = dir.into();
106        async_stream::stream! {
107            // Preflight the destination once so a bad output path fails the
108            // stream upfront instead of yielding one Failed per file. Also
109            // ensures the directory exists when the collection is empty.
110            if let Err(error) = tokio::fs::create_dir_all(&dir).await {
111                yield DownloadOutcome::StreamFailed {
112                    error: Error::from(error),
113                };
114                return;
115            }
116            let inner = downloads::drive(
117                &self.transport,
118                &self.primary_location_src,
119                self.webdata(query),
120                LocalDir { dir },
121            );
122            let mut inner = pin!(inner);
123            while let Some(outcome) = inner.next().await {
124                yield outcome;
125            }
126        }
127    }
128
129    pub fn download_to_s3(
130        &self,
131        file: WasapiFile,
132        s3: AwsS3Client,
133        bucket: impl Into<String>,
134        prefix: Option<String>,
135    ) -> impl Stream<Item = DownloadOutcome<S3Location>> + Send + '_ {
136        downloads::drive(
137            &self.transport,
138            &self.primary_location_src,
139            single_file(file),
140            S3Dest {
141                client: s3,
142                bucket: bucket.into(),
143                prefix,
144            },
145        )
146    }
147
148    pub fn download_collection_to_s3(
149        &self,
150        query: WebdataQuery,
151        s3: AwsS3Client,
152        bucket: impl Into<String>,
153        prefix: Option<String>,
154    ) -> impl Stream<Item = DownloadOutcome<S3Location>> + Send + '_ {
155        downloads::drive(
156            &self.transport,
157            &self.primary_location_src,
158            self.webdata(query),
159            S3Dest {
160                client: s3,
161                bucket: bucket.into(),
162                prefix,
163            },
164        )
165    }
166
167    pub async fn list_webdata(&self, query: &WebdataQuery) -> Result<Page<WasapiFile>, Error> {
168        self.transport.get_json("webdata", query).await
169    }
170
171    pub async fn list_webdata_next(
172        &self,
173        prev: &Page<WasapiFile>,
174    ) -> Result<Option<Page<WasapiFile>>, Error> {
175        match prev.next.as_deref() {
176            None => Ok(None),
177            Some(url) => self.transport.get_json(url, &()).await.map(Some),
178        }
179    }
180
181    pub fn webdata(
182        &self,
183        query: WebdataQuery,
184    ) -> impl Stream<Item = Result<WasapiFile, Error>> + Send + '_ {
185        async_stream::try_stream! {
186            let mut page = self.list_webdata(&query).await?;
187            loop {
188                let files = std::mem::take(&mut page.files);
189                for f in files { yield f; }
190                match self.list_webdata_next(&page).await? {
191                    Some(next) => page = next,
192                    None => break,
193                }
194            }
195        }
196    }
197
198    pub fn primary_location<'a>(&self, file: &'a WasapiFile) -> Option<&'a str> {
199        file.locations
200            .iter()
201            .find(|location| location.starts_with(&self.primary_location_src))
202            .map(String::as_str)
203    }
204}
205
206fn single_file(file: WasapiFile) -> impl Stream<Item = Result<WasapiFile, Error>> + Send {
207    async_stream::try_stream! { yield file; }
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::wasapi::Checksums;

    /// Timestamp shared by every time field of the fixture.
    const FIXTURE_TIME: &str = "2020-01-01T00:00:00Z";

    /// Builds a minimal WARC record; only `locations` varies between tests.
    fn warc_with_locations(locations: &[&str]) -> WasapiFile {
        WasapiFile {
            filename: "ARCHIVEIT-1.warc.gz".into(),
            filetype: "warc".into(),
            checksums: Checksums {
                sha1: Some("sha1".into()),
                md5: Some("md5".into()),
            },
            account: 1,
            size: 1,
            collection: 1,
            crawl: Some(1),
            crawl_time: Some(FIXTURE_TIME.into()),
            crawl_start: Some(FIXTURE_TIME.into()),
            store_time: FIXTURE_TIME.into(),
            locations: locations.iter().map(|loc| (*loc).to_owned()).collect(),
        }
    }

    #[test]
    fn primary_location_uses_configured_source_prefix() {
        let client = WasapiClient::new("u", "p")
            .expect("client construction should succeed")
            .with_primary_location_src("https://example.invalid");
        let file = warc_with_locations(&[
            "https://other.example.com/warcs/foo.warc.gz",
            "https://example.invalid/warcs/foo.warc.gz",
        ]);

        let chosen = client.primary_location(&file);
        assert_eq!(chosen, Some("https://example.invalid/warcs/foo.warc.gz"));
    }
}