1use std::path::{Path, PathBuf};
2use std::pin::pin;
3
4use aws_sdk_s3::Client as AwsS3Client;
5use futures_core::Stream;
6use futures_util::StreamExt;
7use serde::Serialize;
8
9use crate::downloads::local::{LocalDir, LocalPath};
10use crate::downloads::s3::{S3Dest, S3Location};
11use crate::downloads::{self, DownloadOutcome};
12use crate::http::Transport;
13use crate::models::wasapi::{Page, WasapiFile};
14use crate::{Config, Error};
15
/// URL prefix that a [`WasapiClient`] uses by default to recognise a file's
/// primary location (see [`WasapiClient::primary_location`]); it can be
/// overridden with [`WasapiClient::with_primary_location_src`].
pub const PRIMARY_LOCATION_SRC: &str = "https://warcs.archive-it.org";
/// Value that [`WebdataQuery::default`] puts in `page_size`.
pub const DEFAULT_WEBDATA_PAGE_SIZE: u32 = 50;
18
19#[derive(Debug, Clone, Serialize)]
20pub struct WebdataQuery {
21 #[serde(skip_serializing_if = "Option::is_none")]
22 pub filename: Option<String>,
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub filetype: Option<String>,
25 #[serde(skip_serializing_if = "Option::is_none")]
26 pub collection: Option<u64>,
27 #[serde(skip_serializing_if = "Option::is_none")]
28 pub crawl: Option<u64>,
29 #[serde(rename = "crawl-time-after", skip_serializing_if = "Option::is_none")]
30 pub crawl_time_after: Option<String>,
31 #[serde(rename = "crawl-time-before", skip_serializing_if = "Option::is_none")]
32 pub crawl_time_before: Option<String>,
33 #[serde(rename = "crawl-start-after", skip_serializing_if = "Option::is_none")]
34 pub crawl_start_after: Option<String>,
35 #[serde(rename = "crawl-start-before", skip_serializing_if = "Option::is_none")]
36 pub crawl_start_before: Option<String>,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub page: Option<u32>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub page_size: Option<u32>,
41}
42
43impl Default for WebdataQuery {
44 fn default() -> Self {
45 Self {
46 filename: None,
47 filetype: None,
48 collection: None,
49 crawl: None,
50 crawl_time_after: None,
51 crawl_time_before: None,
52 crawl_start_after: None,
53 crawl_start_before: None,
54 page: None,
55 page_size: Some(DEFAULT_WEBDATA_PAGE_SIZE),
56 }
57 }
58}
59
/// Authenticated WASAPI client that lists `webdata` files and downloads them
/// to the local filesystem or to S3.
pub struct WasapiClient {
    // Transport built with the username/password given at construction.
    transport: Transport,
    // URL prefix marking a file location as "primary". Defaults to
    // `PRIMARY_LOCATION_SRC` and is forwarded to `downloads::drive`.
    primary_location_src: String,
}
64
65impl WasapiClient {
66 pub fn new(username: impl Into<String>, password: impl Into<String>) -> Result<Self, Error> {
67 Self::with_config(username, password, Config::wasapi())
68 }
69
70 pub fn with_config(
71 username: impl Into<String>,
72 password: impl Into<String>,
73 cfg: Config,
74 ) -> Result<Self, Error> {
75 Ok(Self {
76 transport: Transport::new(cfg, Some((username.into(), password.into())))?,
77 primary_location_src: PRIMARY_LOCATION_SRC.into(),
78 })
79 }
80
81 pub fn with_primary_location_src(mut self, src: impl Into<String>) -> Self {
82 self.primary_location_src = src.into();
83 self
84 }
85
86 pub fn download(
87 &self,
88 file: WasapiFile,
89 path: impl AsRef<Path>,
90 ) -> impl Stream<Item = DownloadOutcome> + Send + '_ {
91 let path = path.as_ref().to_path_buf();
92 downloads::drive(
93 &self.transport,
94 &self.primary_location_src,
95 single_file(file),
96 LocalPath { path },
97 )
98 }
99
100 pub fn download_collection(
101 &self,
102 query: WebdataQuery,
103 dir: impl Into<PathBuf>,
104 ) -> impl Stream<Item = DownloadOutcome> + Send + '_ {
105 let dir = dir.into();
106 async_stream::stream! {
107 if let Err(error) = tokio::fs::create_dir_all(&dir).await {
111 yield DownloadOutcome::StreamFailed {
112 error: Error::from(error),
113 };
114 return;
115 }
116 let inner = downloads::drive(
117 &self.transport,
118 &self.primary_location_src,
119 self.webdata(query),
120 LocalDir { dir },
121 );
122 let mut inner = pin!(inner);
123 while let Some(outcome) = inner.next().await {
124 yield outcome;
125 }
126 }
127 }
128
129 pub fn download_to_s3(
130 &self,
131 file: WasapiFile,
132 s3: AwsS3Client,
133 bucket: impl Into<String>,
134 prefix: Option<String>,
135 ) -> impl Stream<Item = DownloadOutcome<S3Location>> + Send + '_ {
136 downloads::drive(
137 &self.transport,
138 &self.primary_location_src,
139 single_file(file),
140 S3Dest {
141 client: s3,
142 bucket: bucket.into(),
143 prefix,
144 },
145 )
146 }
147
148 pub fn download_collection_to_s3(
149 &self,
150 query: WebdataQuery,
151 s3: AwsS3Client,
152 bucket: impl Into<String>,
153 prefix: Option<String>,
154 ) -> impl Stream<Item = DownloadOutcome<S3Location>> + Send + '_ {
155 downloads::drive(
156 &self.transport,
157 &self.primary_location_src,
158 self.webdata(query),
159 S3Dest {
160 client: s3,
161 bucket: bucket.into(),
162 prefix,
163 },
164 )
165 }
166
167 pub async fn list_webdata(&self, query: &WebdataQuery) -> Result<Page<WasapiFile>, Error> {
168 self.transport.get_json("webdata", query).await
169 }
170
171 pub async fn list_webdata_next(
172 &self,
173 prev: &Page<WasapiFile>,
174 ) -> Result<Option<Page<WasapiFile>>, Error> {
175 match prev.next.as_deref() {
176 None => Ok(None),
177 Some(url) => self.transport.get_json(url, &()).await.map(Some),
178 }
179 }
180
181 pub fn webdata(
182 &self,
183 query: WebdataQuery,
184 ) -> impl Stream<Item = Result<WasapiFile, Error>> + Send + '_ {
185 async_stream::try_stream! {
186 let mut page = self.list_webdata(&query).await?;
187 loop {
188 let files = std::mem::take(&mut page.files);
189 for f in files { yield f; }
190 match self.list_webdata_next(&page).await? {
191 Some(next) => page = next,
192 None => break,
193 }
194 }
195 }
196 }
197
198 pub fn primary_location<'a>(&self, file: &'a WasapiFile) -> Option<&'a str> {
199 file.locations
200 .iter()
201 .find(|location| location.starts_with(&self.primary_location_src))
202 .map(String::as_str)
203 }
204}
205
206fn single_file(file: WasapiFile) -> impl Stream<Item = Result<WasapiFile, Error>> + Send {
207 async_stream::try_stream! { yield file; }
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::wasapi::Checksums;

    /// Builds a fixture `WasapiFile` whose only varying field is `locations`.
    fn file_with_locations(locations: &[&str]) -> WasapiFile {
        let timestamp = "2020-01-01T00:00:00Z";
        WasapiFile {
            filename: "ARCHIVEIT-1.warc.gz".into(),
            filetype: "warc".into(),
            checksums: Checksums {
                sha1: Some("sha1".into()),
                md5: Some("md5".into()),
            },
            account: 1,
            size: 1,
            collection: 1,
            crawl: Some(1),
            crawl_time: Some(timestamp.into()),
            crawl_start: Some(timestamp.into()),
            store_time: timestamp.into(),
            locations: locations.iter().map(|loc| String::from(*loc)).collect(),
        }
    }

    #[test]
    fn primary_location_uses_configured_source_prefix() {
        let wasapi = WasapiClient::new("u", "p")
            .unwrap()
            .with_primary_location_src("https://example.invalid");
        let file = file_with_locations(&[
            "https://other.example.com/warcs/foo.warc.gz",
            "https://example.invalid/warcs/foo.warc.gz",
        ]);

        let found = wasapi.primary_location(&file);
        assert_eq!(found, Some("https://example.invalid/warcs/foo.warc.gz"));
    }
}