use super::{
    cdx::{self, IndexClient},
    digest::compute_digest,
    downloader::Downloader,
    Item,
};
use bytes::Buf;
use chrono::Utc;
use csv::{ReaderBuilder, WriterBuilder};
use flate2::{Compression, GzBuilder};
use futures::{StreamExt, TryStreamExt};
use std::collections::HashSet;
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Read, Write};
use std::path::{Path, PathBuf};

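/// Errors that can occur during a harvesting session.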
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("I/O error: {0:?}")]
    Io(#[from] std::io::Error),
    #[error("CDX error: {0:?}")]
    IndexClient(#[from] cdx::Error),
    #[error("HTTP client error: {0:?}")]
    Client(#[from] reqwest::Error),
    // Covers both CSV reading (see `read_csv`) and writing.
    #[error("CSV error: {0:?}")]
    Csv(#[from] csv::Error),
    #[error("Item parsing error: {0:?}")]
    Item(#[from] super::item::Error),
}

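/// A harvesting session rooted at a base directory, optionally deduplicating
/// against a file of already-known digests (one digest per line).
///
/// A typical run might look like the sketch below. The paths are hypothetical
/// and an async context is assumed, so the example is marked `ignore` rather
/// than compiled as a doctest:
///
/// ```ignore
/// let session = Session::new_timestamped(Some("known-digests.txt"), 8)?;
/// session.save_cdx_results(&["example.com/*".to_string()]).await?;
/// session.resolve_redirects().await?;
/// let (downloaded, invalid, skipped, failed) = session.download_items().await?;
/// ```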
pub struct Session {
    base: PathBuf,
    known_digests: Option<PathBuf>,
    parallelism: usize,
    index_client: IndexClient,
    client: Downloader,
}

impl Session {
    const TIMESTAMP_FMT: &'static str = "%Y%m%d%H%M%S";

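    /// Creates a session rooted at `base`, optionally skipping items whose
    /// digests are listed in the file at `known_digests`.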
    pub fn new<P1: AsRef<Path>, P2: AsRef<Path>>(
        base: P1,
        known_digests: Option<P2>,
        parallelism: usize,
    ) -> Result<Session, Error> {
        Ok(Session {
            base: base.as_ref().to_path_buf(),
            known_digests: known_digests.map(|path| path.as_ref().to_path_buf()),
            parallelism,
            index_client: IndexClient::default(),
            client: Downloader::default(),
        })
    }

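    /// Creates a session whose base directory is named after the current UTC
    /// timestamp, formatted as `%Y%m%d%H%M%S`.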
    pub fn new_timestamped<P: AsRef<Path>>(
        known_digests: Option<P>,
        parallelism: usize,
    ) -> Result<Session, Error> {
        Self::new(
            Utc::now().format(Self::TIMESTAMP_FMT).to_string(),
            known_digests,
            parallelism,
        )
    }

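    /// Runs each CDX query, logging blocked queries to `blocked.txt` and
    /// splitting the resulting items into `originals.csv` and `redirects.csv`
    /// (the latter holding 302 captures, which are resolved separately).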
    pub async fn save_cdx_results(&self, queries: &[String]) -> Result<(), Error> {
        create_dir_all(&self.base)?;
        let mut query_log = File::create(self.base.join("queries.txt"))?;
        query_log.write_all(format!("{}\n", queries.join("\n")).as_bytes())?;

        // Run the searches concurrently, treating a blocked query as a
        // recoverable per-query failure rather than a fatal error.
        let results: Vec<Result<Vec<Item>, String>> = futures::stream::iter(queries.iter())
            .map(|query| Ok(self.index_client.search(query, None, None)))
            .try_buffer_unordered(self.parallelism)
            .map(|result| match result {
                Err(cdx::Error::BlockedQuery(query)) => Ok(Err(query)),
                Err(other) => Err(other),
                Ok(items) => Ok(Ok(items)),
            })
            .err_into::<Error>()
            .try_collect()
            .await?;

        let mut blocked: Vec<String> = vec![];
        let mut items: Vec<Item> = Vec::with_capacity(results.len());

        for result in results {
            match result {
                Ok(batch) => items.extend(batch),
                Err(query) => blocked.push(query),
            }
        }

        if !blocked.is_empty() {
            let mut blocked_log = File::create(self.base.join("blocked.txt"))?;
            blocked.sort();
            blocked_log.write_all(format!("{}\n", blocked.join("\n")).as_bytes())?;
        }

        items.sort();
        items.dedup();

        let originals_item_log = File::create(self.base.join("originals.csv"))?;
        let redirects_item_log = File::create(self.base.join("redirects.csv"))?;

        let mut originals_csv = WriterBuilder::new().from_writer(originals_item_log);
        let mut redirects_csv = WriterBuilder::new().from_writer(redirects_item_log);

        for item in &items {
            if item.status == Some(302) {
                redirects_csv.write_record(item.to_record())?;
            } else {
                originals_csv.write_record(item.to_record())?;
            }
        }

        // Flush explicitly so that write errors surface here instead of being
        // silently dropped when the writers go out of scope.
        originals_csv.flush()?;
        redirects_csv.flush()?;

        Ok(())
    }

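    /// Resolves the 302 captures recorded in `redirects.csv`, writing content
    /// with a valid digest to `data/` and recording the redirect target in
    /// `extras.csv`; items that fail at any step are logged to
    /// `errors/redirects.csv`.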
    pub async fn resolve_redirects(&self) -> Result<(), Error> {
        let redirects_item_log = File::open(self.base.join("redirects.csv"))?;
        let mut items = Self::read_csv(redirects_item_log)?;

        items.sort();

        create_dir_all(self.base.join("data"))?;
        create_dir_all(self.base.join("invalid"))?;

        let mut digests = HashSet::new();

        // Keep only the first item for each digest.
        items.retain(|item| digests.insert(item.digest.clone()));

        // Drop digests we've already seen in a previous session.
        if let Some(path) = &self.known_digests {
            let file = File::open(path)?;
            for line in BufReader::new(file).lines() {
                digests.remove(line?.trim());
            }
        }

        items.retain(|item| digests.remove(&item.digest));

        log::info!("Resolving {} items", items.len());

        let results = futures::stream::iter(items.iter())
            .map(|item| async move {
                log::info!("Resolving: {}", item.url);
                (
                    item,
                    self.client
                        .resolve_redirect(&item.url, &item.timestamp(), &item.digest)
                        .await,
                )
            })
            .buffer_unordered(self.parallelism)
            .map(|(item, result)| async move {
                // On any failure the original item is returned as the error
                // value so that it can be logged below.
                let resolution = result.map_err(|_| item)?;

                if resolution.valid_digest {
                    // Look up the redirect target in the index so the actual
                    // item can be recorded in extras.csv.
                    let mut items = self
                        .index_client
                        .search(&resolution.url, Some(&resolution.timestamp), None)
                        .await
                        .map_err(|_| item)?;

                    let actual_item = items.pop().ok_or(item)?;

                    let output =
                        File::create(self.base.join("data").join(format!("{}.gz", item.digest)))
                            .map_err(|_| item)?;
                    let mut gz = GzBuilder::new()
                        .filename(item.make_filename())
                        .write(output, Compression::default());
                    gz.write_all(&resolution.content).map_err(|_| item)?;
                    gz.finish().map_err(|_| item)?;

                    Ok(actual_item)
                } else {
                    Err(item)
                }
            })
            .buffer_unordered(self.parallelism)
            .collect::<Vec<_>>()
            .await;

        create_dir_all(self.base.join("errors"))?;

        let redirects_error_log = File::create(self.base.join("errors").join("redirects.csv"))?;
        let mut redirects_error_csv = WriterBuilder::new().from_writer(redirects_error_log);

        let extras_item_log = File::create(self.base.join("extras.csv"))?;
        let mut extras_item_csv = WriterBuilder::new().from_writer(extras_item_log);

        for result in results {
            match result {
                Ok(item) => {
                    extras_item_csv.write_record(item.to_record())?;
                }
                Err(item) => {
                    redirects_error_csv.write_record(item.to_record())?;
                }
            }
        }

        // Flush explicitly so that write errors are not silently dropped.
        extras_item_csv.flush()?;
        redirects_error_csv.flush()?;

        Ok(())
    }

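    /// Downloads every item in `originals.csv` and `extras.csv`, verifying the
    /// content digest of each download. Returns counts of
    /// `(successful, invalid, skipped, failed)` items, where skipped items are
    /// those removed by deduplication or the known-digests list.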
    pub async fn download_items(&self) -> Result<(usize, usize, usize, usize), Error> {
        let originals_file = File::open(self.base.join("originals.csv"))?;
        let mut items = Self::read_csv(originals_file)?;

        let extras_file = File::open(self.base.join("extras.csv"))?;
        items.extend(Self::read_csv(extras_file)?);
        items.sort();

        let total_count = items.len();

        let mut digests = HashSet::new();

        // Keep only the first item for each digest.
        items.retain(|item| digests.insert(item.digest.clone()));

        // Drop digests we've already seen in a previous session.
        if let Some(path) = &self.known_digests {
            let file = File::open(path)?;
            for line in BufReader::new(file).lines() {
                digests.remove(line?.trim());
            }
        }

        items.retain(|item| digests.remove(&item.digest));

        log::info!("Downloading {} items", items.len());

        let results = futures::stream::iter(items)
            .map(|item| async {
                let content = self
                    .client
                    .download_item(&item)
                    .await
                    .map_err(|_| item.clone())?;

                // Verify the download against the digest reported by the
                // index; a failed digest computation is treated like a
                // download error rather than panicking.
                let expected = item.digest.clone();
                let computed = compute_digest(&mut content.clone().reader())
                    .map_err(|_| item.clone())?;

                if computed == expected {
                    let output =
                        File::create(self.base.join("data").join(format!("{}.gz", expected)))
                            .map_err(|_| item.clone())?;
                    let mut gz = GzBuilder::new()
                        .filename(item.make_filename())
                        .write(output, Compression::default());
                    gz.write_all(&content).map_err(|_| item.clone())?;
                    gz.finish().map_err(|_| item)?;

                    Ok(None)
                } else {
                    // Save content that doesn't match its expected digest
                    // separately, under the digest it actually hashes to.
                    let output =
                        File::create(self.base.join("invalid").join(format!("{}.gz", computed)))
                            .map_err(|_| item.clone())?;
                    let mut gz = GzBuilder::new()
                        .filename(item.make_filename())
                        .write(output, Compression::default());
                    gz.write_all(&content).map_err(|_| item.clone())?;
                    gz.finish().map_err(|_| item)?;

                    Ok(Some((expected, computed)))
                }
            })
            .buffer_unordered(self.parallelism)
            .collect::<Vec<Result<Option<(String, String)>, Item>>>()
            .await;

        // A no-op if `resolve_redirects` already created this directory.
        create_dir_all(self.base.join("errors"))?;

        let error_log = File::create(self.base.join("errors").join("items.csv"))?;
        let mut error_csv = WriterBuilder::new().from_writer(error_log);

        let invalid_log = File::create(self.base.join("errors").join("invalid.csv"))?;
        let mut invalid_csv = WriterBuilder::new().from_writer(invalid_log);

        let mut success_count = 0;
        let mut invalid_count = 0;
        let mut error_count = 0;

        for result in results {
            match result {
                Ok(None) => {
                    success_count += 1;
                }
                Ok(Some((expected, computed))) => {
                    invalid_count += 1;
                    invalid_csv.write_record(vec![expected, computed])?;
                }
                Err(item) => {
                    error_count += 1;
                    error_csv.write_record(item.to_record())?;
                }
            }
        }

        error_csv.flush()?;
        invalid_csv.flush()?;

        Ok((
            success_count,
            invalid_count,
            // Anything neither downloaded nor logged was skipped by
            // deduplication or the known-digests filter.
            total_count - success_count - error_count - invalid_count,
            error_count,
        ))
    }

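    /// Reads items from a headerless CSV file, one `Item` per record.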
    fn read_csv<R: Read>(reader: R) -> Result<Vec<Item>, Error> {
        let mut csv_reader = ReaderBuilder::new().has_headers(false).from_reader(reader);

        csv_reader
            .records()
            .map(|record| {
                let row = record?;
                Ok(Item::parse_optional_record(
                    row.get(0),
                    row.get(1),
                    row.get(2),
                    row.get(3),
                    row.get(4),
                    row.get(5),
                )?)
            })
            .collect()
    }
}