1use std::path::{Path, PathBuf};
2use std::pin::pin;
3
4use aws_sdk_s3::Client as AwsS3Client;
5use futures_core::Stream;
6use futures_util::{StreamExt, TryStreamExt};
7use http_ferry::Downloader;
8use http_ferry::local::{LocalDir, LocalPath};
9use http_ferry::s3::{S3Dest, S3Location};
10use serde::Serialize;
11use url::Url;
12
13use crate::http::Transport;
14use crate::models::wasapi::{Page, WasapiFile};
15use crate::{Config, DownloadOutcome, Error, USER_AGENT};
16
/// Default URL prefix that identifies a file's primary download location among the
/// `locations` WASAPI reports for it. Override per client with
/// [`WasapiClient::with_primary_location_src`].
pub const PRIMARY_LOCATION_SRC: &str = "https://warcs.archive-it.org";
/// `page_size` that [`WebdataQuery::default`] requests from the `webdata` endpoint.
pub const DEFAULT_WEBDATA_PAGE_SIZE: u32 = 50;
19
/// Filters and pagination for a WASAPI `webdata` listing.
///
/// Serialized with serde and passed to `Transport::get_json` alongside the `webdata` path
/// (presumably encoded as the request's query string — confirm in `Transport`). Fields that
/// are `None` are omitted entirely. The crawl time/start filters use the hyphenated parameter
/// names given in their `rename` attributes. `WebdataQuery::default()` leaves every filter
/// unset and sets `page_size` to [`DEFAULT_WEBDATA_PAGE_SIZE`].
#[derive(Debug, Clone, Serialize)]
pub struct WebdataQuery {
    /// Value for the `filename` parameter (matching semantics are defined by the server).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub filename: Option<String>,
    /// Value for the `filetype` parameter.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub filetype: Option<String>,
    /// Collection id to restrict the listing to.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub collection: Option<u64>,
    /// Crawl id to restrict the listing to.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub crawl: Option<u64>,
    /// Sent as `crawl-time-after`; passed through verbatim (expected timestamp format is
    /// server-defined — TODO confirm).
    #[serde(rename = "crawl-time-after", skip_serializing_if = "Option::is_none")]
    pub crawl_time_after: Option<String>,
    /// Sent as `crawl-time-before`; passed through verbatim.
    #[serde(rename = "crawl-time-before", skip_serializing_if = "Option::is_none")]
    pub crawl_time_before: Option<String>,
    /// Sent as `crawl-start-after`; passed through verbatim.
    #[serde(rename = "crawl-start-after", skip_serializing_if = "Option::is_none")]
    pub crawl_start_after: Option<String>,
    /// Sent as `crawl-start-before`; passed through verbatim.
    #[serde(rename = "crawl-start-before", skip_serializing_if = "Option::is_none")]
    pub crawl_start_before: Option<String>,
    /// Page number to request; omitted from the request when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub page: Option<u32>,
    /// Results per page; omitted from the request when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub page_size: Option<u32>,
}
43
44impl Default for WebdataQuery {
45 fn default() -> Self {
46 Self {
47 filename: None,
48 filetype: None,
49 collection: None,
50 crawl: None,
51 crawl_time_after: None,
52 crawl_time_before: None,
53 crawl_start_after: None,
54 crawl_start_before: None,
55 page: None,
56 page_size: Some(DEFAULT_WEBDATA_PAGE_SIZE),
57 }
58 }
59}
60
/// Client for the WASAPI `webdata` API: lists files and downloads them to local disk or S3.
pub struct WasapiClient {
    /// Authenticated transport used for the JSON listing requests.
    transport: Transport,
    /// Downloader configured with retry settings and a basic-auth request hook.
    downloader: Downloader,
    /// URL prefix that selects a file's primary (downloadable) location.
    primary_location_src: String,
}
66
67impl WasapiClient {
68 pub fn new(username: impl Into<String>, password: impl Into<String>) -> Result<Self, Error> {
69 Self::with_config(username, password, Config::wasapi())
70 }
71
72 pub fn with_config(
73 username: impl Into<String>,
74 password: impl Into<String>,
75 cfg: Config,
76 ) -> Result<Self, Error> {
77 let creds = (username.into(), password.into());
78 let transport = Transport::new(cfg.clone(), Some(creds.clone()))?;
79 let download_client = reqwest::Client::builder()
80 .user_agent(USER_AGENT)
81 .read_timeout(cfg.download_timeout)
82 .build()?;
83 let downloader = Downloader::builder(download_client)
84 .max_attempts(cfg.max_attempts)
85 .backoff(cfg.backoff)
86 .customize_request(move |req| req.basic_auth(&creds.0, Some(&creds.1)))
87 .build();
88 Ok(Self {
89 transport,
90 downloader,
91 primary_location_src: PRIMARY_LOCATION_SRC.into(),
92 })
93 }
94
95 pub fn with_primary_location_src(mut self, src: impl Into<String>) -> Self {
96 self.primary_location_src = src.into();
97 self
98 }
99
100 pub fn download(
101 &self,
102 file: WasapiFile,
103 path: impl AsRef<Path>,
104 ) -> impl Stream<Item = DownloadOutcome> + Send + '_ {
105 let path = path.as_ref().to_path_buf();
106 http_ferry::drive(
107 &self.downloader,
108 single_file(file).map_err(to_ferry),
109 self.resolver(),
110 LocalPath { path },
111 )
112 }
113
114 pub fn download_collection(
115 &self,
116 query: WebdataQuery,
117 dir: impl Into<PathBuf>,
118 ) -> impl Stream<Item = DownloadOutcome> + Send + '_ {
119 let dir = dir.into();
120 async_stream::stream! {
121 if let Err(error) = tokio::fs::create_dir_all(&dir).await {
125 yield DownloadOutcome::StreamFailed {
126 error: http_ferry::Error::Io(error),
127 };
128 return;
129 }
130 let inner = http_ferry::drive(
131 &self.downloader,
132 self.webdata(query).map_err(to_ferry),
133 self.resolver(),
134 LocalDir { dir },
135 );
136 let mut inner = pin!(inner);
137 while let Some(outcome) = inner.next().await {
138 yield outcome;
139 }
140 }
141 }
142
143 pub fn download_to_s3(
144 &self,
145 file: WasapiFile,
146 s3: AwsS3Client,
147 bucket: impl Into<String>,
148 prefix: Option<String>,
149 ) -> impl Stream<Item = DownloadOutcome<S3Location>> + Send + '_ {
150 http_ferry::drive(
151 &self.downloader,
152 single_file(file).map_err(to_ferry),
153 self.resolver(),
154 S3Dest {
155 client: s3,
156 bucket: bucket.into(),
157 prefix,
158 },
159 )
160 }
161
162 pub fn download_collection_to_s3(
163 &self,
164 query: WebdataQuery,
165 s3: AwsS3Client,
166 bucket: impl Into<String>,
167 prefix: Option<String>,
168 ) -> impl Stream<Item = DownloadOutcome<S3Location>> + Send + '_ {
169 http_ferry::drive(
170 &self.downloader,
171 self.webdata(query).map_err(to_ferry),
172 self.resolver(),
173 S3Dest {
174 client: s3,
175 bucket: bucket.into(),
176 prefix,
177 },
178 )
179 }
180
181 pub async fn list_webdata(&self, query: &WebdataQuery) -> Result<Page<WasapiFile>, Error> {
182 self.transport.get_json("webdata", query).await
183 }
184
185 pub async fn list_webdata_next(
186 &self,
187 prev: &Page<WasapiFile>,
188 ) -> Result<Option<Page<WasapiFile>>, Error> {
189 match prev.next.as_deref() {
190 None => Ok(None),
191 Some(url) => self.transport.get_json(url, &()).await.map(Some),
192 }
193 }
194
195 pub fn primary_location<'a>(&self, file: &'a WasapiFile) -> Option<&'a str> {
196 file.locations
197 .iter()
198 .find(|location| location.starts_with(&self.primary_location_src))
199 .map(String::as_str)
200 }
201
202 pub fn webdata(
203 &self,
204 query: WebdataQuery,
205 ) -> impl Stream<Item = Result<WasapiFile, Error>> + Send + '_ {
206 async_stream::try_stream! {
207 let mut page = self.list_webdata(&query).await?;
208 loop {
209 let files = std::mem::take(&mut page.files);
210 for f in files { yield f; }
211 match self.list_webdata_next(&page).await? {
212 Some(next) => page = next,
213 None => break,
214 }
215 }
216 }
217 }
218
219 fn resolver(&self) -> impl FnMut(&WasapiFile) -> Result<Url, http_ferry::Error> + Send + '_ {
224 let primary = self.primary_location_src.as_str();
225 move |file| primary_location_url(primary, file).map_err(to_ferry)
226 }
227}
228
229fn primary_location_url(primary_src: &str, file: &WasapiFile) -> Result<Url, Error> {
232 let location = file
233 .locations
234 .iter()
235 .find(|loc| loc.starts_with(primary_src))
236 .ok_or_else(|| Error::PrimaryLocationMissing {
237 filename: file.filename.clone(),
238 })?;
239 Ok(Url::parse(location)?)
240}
241
242fn single_file(file: WasapiFile) -> impl Stream<Item = Result<WasapiFile, Error>> + Send {
243 async_stream::try_stream! { yield file; }
244}
245
246fn to_ferry(err: Error) -> http_ferry::Error {
252 use http_ferry::Error as F;
253 match err {
254 Error::Io(e) => F::Io(e),
255 Error::Request(e) => F::Request(e),
256 Error::Url(e) => F::Url(e),
257 Error::Status(s) => F::Status(s),
258 Error::NotFound(s) => F::NotFound(s),
259 other => F::from_source(other),
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::wasapi::Checksums;

    /// The location under the configured source wins, even when it is not listed first.
    #[test]
    fn primary_location_uses_configured_source_prefix() {
        let timestamp = "2020-01-01T00:00:00Z";
        let expected = "https://example.invalid/warcs/foo.warc.gz";
        let file = WasapiFile {
            filename: "ARCHIVEIT-1.warc.gz".into(),
            filetype: "warc".into(),
            checksums: Checksums {
                sha1: Some("sha1".into()),
                md5: Some("md5".into()),
            },
            account: 1,
            size: 1,
            collection: 1,
            crawl: Some(1),
            crawl_time: Some(timestamp.into()),
            crawl_start: Some(timestamp.into()),
            store_time: timestamp.into(),
            locations: vec![
                "https://other.example.com/warcs/foo.warc.gz".into(),
                expected.into(),
            ],
        };
        let client = WasapiClient::new("u", "p")
            .unwrap()
            .with_primary_location_src("https://example.invalid");

        let chosen = client.primary_location(&file);
        assert_eq!(chosen, Some(expected));
    }
}