// wayback_rs/session.rs

1use super::{
2    cdx::{self, IndexClient},
3    digest::compute_digest,
4    downloader::Downloader,
5    Item,
6};
7use bytes::Buf;
8use chrono::Utc;
9use csv::{ReaderBuilder, WriterBuilder};
10use flate2::{Compression, GzBuilder};
11use futures::{StreamExt, TryStreamExt};
12use std::collections::HashSet;
13use std::fs::{create_dir_all, File};
14use std::io::{BufRead, BufReader, Read, Write};
15use std::path::{Path, PathBuf};
16
/// Errors produced while running a [`Session`].
///
/// Each variant wraps the error type of one collaborator and converts
/// automatically via `#[from]`, so `?` works throughout the session methods.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// File-system or other I/O failure.
    #[error("I/O error: {0:?}")]
    Io(#[from] std::io::Error),
    /// Failure reported by the CDX index client.
    #[error("CDX error: {0:?}")]
    IndexClient(#[from] cdx::Error),
    /// Failure in the underlying HTTP client.
    #[error("HTTP client error: {0:?}")]
    Client(#[from] reqwest::Error),
    /// Failure while reading or writing CSV records.
    #[error("CSV writing error: {0:?}")]
    Csv(#[from] csv::Error),
    /// Failure parsing a CSV row into an [`Item`].
    #[error("Item parsing error: {0:?}")]
    Item(#[from] super::item::Error),
}
30
/// A scraping session rooted at a base directory.
///
/// All output (query logs, CSV indexes, gzipped downloads, error reports)
/// is written beneath `base`.
pub struct Session {
    /// Directory all session output is written under.
    base: PathBuf,
    /// Optional file listing digests (one per line) to skip as already known.
    known_digests: Option<PathBuf>,
    /// Maximum number of concurrent index searches / downloads.
    parallelism: usize,
    /// Client for querying the CDX index.
    index_client: IndexClient,
    /// HTTP client used to resolve redirects and download items.
    client: Downloader,
}
38
impl Session {
    /// Format used for timestamped session directory names (e.g. `20240101123000`).
    const TIMESTAMP_FMT: &'static str = "%Y%m%d%H%M%S";

    /// Creates a session rooted at `base`, with default index and download
    /// clients.
    ///
    /// `known_digests`, when given, points at a file of digests to skip.
    /// `parallelism` bounds the number of concurrent requests.
    pub fn new<P1: AsRef<Path>, P2: AsRef<Path>>(
        base: P1,
        known_digests: Option<P2>,
        parallelism: usize,
    ) -> Result<Session, Error> {
        let base = base.as_ref().to_path_buf();
        let known_digests = known_digests.map(|path| path.as_ref().to_path_buf());

        Ok(Session {
            base,
            known_digests,
            parallelism,
            index_client: IndexClient::default(),
            client: Downloader::default(),
        })
    }
55
56    pub fn new_timestamped<P: AsRef<Path>>(
57        known_digests: Option<P>,
58        parallelism: usize,
59    ) -> Result<Session, Error> {
60        Self::new(
61            Utc::now().format(Self::TIMESTAMP_FMT).to_string(),
62            known_digests,
63            parallelism,
64        )
65    }
66
67    pub async fn save_cdx_results(&self, queries: &[String]) -> Result<(), Error> {
68        create_dir_all(&self.base)?;
69        let mut query_log = File::create(self.base.join("queries.txt"))?;
70        query_log.write_all(format!("{}\n", queries.join("\n")).as_bytes())?;
71
72        let results: Vec<Result<Vec<Item>, String>> = futures::stream::iter(queries.iter())
73            .map(|query| Ok(self.index_client.search(query, None, None)))
74            .try_buffer_unordered(self.parallelism)
75            .map(|result| match result {
76                Err(cdx::Error::BlockedQuery(query)) => Ok(Err(query)),
77                Err(other) => Err(other),
78                Ok(items) => Ok(Ok(items)),
79            })
80            .err_into::<Error>()
81            .try_collect()
82            .await?;
83
84        let mut blocked: Vec<String> = vec![];
85        let mut items: Vec<Item> = Vec::with_capacity(results.len());
86
87        for result in results {
88            match result {
89                Ok(batch) => items.extend(batch),
90                Err(query) => blocked.push(query),
91            }
92        }
93
94        if !blocked.is_empty() {
95            let mut blocked_log = File::create(self.base.join("blocked.txt"))?;
96            blocked.sort();
97            blocked_log.write_all(format!("{}\n", blocked.join("\n")).as_bytes())?;
98        }
99
100        items.sort();
101        items.dedup();
102
103        let originals_item_log = File::create(self.base.join("originals.csv"))?;
104        let redirects_item_log = File::create(self.base.join("redirects.csv"))?;
105
106        let mut originals_csv = WriterBuilder::new().from_writer(originals_item_log);
107        let mut redirects_csv = WriterBuilder::new().from_writer(redirects_item_log);
108
109        for item in &items {
110            if item.status == Some(302) {
111                redirects_csv.write_record(item.to_record())?;
112            } else {
113                originals_csv.write_record(item.to_record())?;
114            }
115        }
116
117        Ok(())
118    }
119
120    pub async fn resolve_redirects(&self) -> Result<(), Error> {
121        let redirects_item_log = File::open(self.base.join("redirects.csv"))?;
122        let mut items = Self::read_csv(redirects_item_log)?;
123
124        items.sort();
125
126        create_dir_all(self.base.join("data"))?;
127        create_dir_all(self.base.join("invalid"))?;
128
129        let mut digests = HashSet::new();
130
131        items.retain(|item| digests.insert(item.digest.clone()));
132
133        if let Some(path) = &self.known_digests {
134            let file = File::open(path)?;
135            for line in BufReader::new(file).lines() {
136                digests.remove(line?.trim());
137            }
138        }
139
140        items.retain(|item| digests.remove(&item.digest));
141
142        log::info!("Resolving {} items", items.len());
143
144        let results = futures::stream::iter(items.iter())
145            .map(|item| async move {
146                log::info!("Resolving: {}", item.url);
147                (
148                    item,
149                    self.client
150                        .resolve_redirect(&item.url, &item.timestamp(), &item.digest)
151                        .await,
152                )
153            })
154            .buffer_unordered(self.parallelism)
155            .map(|(item, result)| async move {
156                let resolution = result.map_err(|_| item)?;
157
158                if resolution.valid_digest {
159                    let mut items = self
160                        .index_client
161                        .search(&resolution.url, Some(&resolution.timestamp), None)
162                        .await
163                        .map_err(|_| item)?;
164
165                    let actual_item = items.pop().ok_or(item)?;
166
167                    let output =
168                        File::create(self.base.join("data").join(format!("{}.gz", item.digest)))
169                            .map_err(|_| item)?;
170                    let mut gz = GzBuilder::new()
171                        .filename(item.make_filename())
172                        .write(output, Compression::default());
173                    gz.write_all(&resolution.content).map_err(|_| item)?;
174                    gz.finish().map_err(|_| item)?;
175
176                    Ok(actual_item)
177                } else {
178                    Err(item)
179                }
180            })
181            .buffer_unordered(self.parallelism)
182            .collect::<Vec<_>>()
183            .await;
184
185        create_dir_all(self.base.join("errors"))?;
186
187        let redirects_error_log = File::create(self.base.join("errors").join("redirects.csv"))?;
188        let mut redirects_error_csv = WriterBuilder::new().from_writer(redirects_error_log);
189
190        let extras_item_log = File::create(self.base.join("extras.csv"))?;
191        let mut extras_item_csv = WriterBuilder::new().from_writer(extras_item_log);
192
193        for result in results {
194            match result {
195                Ok(item) => {
196                    extras_item_csv.write_record(item.to_record())?;
197                }
198                Err(item) => {
199                    redirects_error_csv.write_record(item.to_record())?;
200                }
201            }
202        }
203
204        Ok(())
205    }
206
207    pub async fn download_items(&self) -> Result<(usize, usize, usize, usize), Error> {
208        let originals_file = File::open(self.base.join("originals.csv"))?;
209        let mut items = Self::read_csv(originals_file)?;
210
211        let extras_file = File::open(self.base.join("extras.csv"))?;
212        items.extend(Self::read_csv(extras_file)?);
213        items.sort();
214
215        let total_count = items.len();
216
217        let mut digests = HashSet::new();
218
219        items.retain(|item| digests.insert(item.digest.clone()));
220
221        if let Some(path) = &self.known_digests {
222            let file = File::open(path)?;
223            for line in BufReader::new(file).lines() {
224                digests.remove(line?.trim());
225            }
226        }
227
228        items.retain(|item| digests.remove(&item.digest));
229
230        log::info!("Downloading {} items", items.len());
231
232        let results = futures::stream::iter(items)
233            .map(|item| async {
234                let content = self
235                    .client
236                    .download_item(&item)
237                    .await
238                    .map_err(|_| item.clone())?;
239
240                let expected = item.digest.clone();
241                let computed = compute_digest(&mut content.clone().reader()).unwrap();
242
243                if computed == expected {
244                    let output =
245                        File::create(self.base.join("data").join(format!("{}.gz", expected)))
246                            .map_err(|_| item.clone())?;
247                    let mut gz = GzBuilder::new()
248                        .filename(item.make_filename())
249                        .write(output, Compression::default());
250                    gz.write_all(&content).map_err(|_| item.clone())?;
251                    gz.finish().map_err(|_| item)?;
252
253                    Ok(None)
254                } else {
255                    let output =
256                        File::create(self.base.join("invalid").join(format!("{}.gz", computed)))
257                            .map_err(|_| item.clone())?;
258                    let mut gz = GzBuilder::new()
259                        .filename(item.make_filename())
260                        .write(output, Compression::default());
261                    gz.write_all(&content).map_err(|_| item.clone())?;
262                    gz.finish().map_err(|_| item)?;
263
264                    Ok(Some((expected, computed)))
265                }
266            })
267            .buffer_unordered(self.parallelism)
268            .collect::<Vec<Result<Option<(String, String)>, Item>>>()
269            .await;
270
271        let error_log = File::create(self.base.join("errors").join("items.csv"))?;
272        let mut error_csv = WriterBuilder::new().from_writer(error_log);
273
274        let invalid_log = File::create(self.base.join("errors").join("invalid.csv"))?;
275        let mut invalid_csv = WriterBuilder::new().from_writer(invalid_log);
276
277        let mut success_count = 0;
278        let mut invalid_count = 0;
279        let mut error_count = 0;
280
281        for result in results {
282            match result {
283                Ok(None) => {
284                    success_count += 1;
285                }
286                Ok(Some((expected, computed))) => {
287                    invalid_count += 1;
288                    invalid_csv.write_record(vec![expected, computed])?;
289                }
290                Err(item) => {
291                    error_count += 1;
292                    error_csv.write_record(item.to_record())?;
293                }
294            }
295        }
296
297        Ok((
298            success_count,
299            invalid_count,
300            total_count - success_count - error_count - invalid_count,
301            error_count,
302        ))
303    }
304
305    fn read_csv<R: Read>(reader: R) -> Result<Vec<Item>, Error> {
306        let mut csv_reader = ReaderBuilder::new().has_headers(false).from_reader(reader);
307
308        csv_reader
309            .records()
310            .map(|record| {
311                let row = record?;
312                Ok(Item::parse_optional_record(
313                    row.get(0),
314                    row.get(1),
315                    row.get(2),
316                    row.get(3),
317                    row.get(4),
318                    row.get(5),
319                )?)
320            })
321            .collect()
322    }
323}