//! libnetrunner/archive.rs

1use std::collections::HashSet;
2use std::fmt::Write as FmtWrite;
3use std::io::{BufWriter, Read, Write};
4use std::{
5    fs::File,
6    path::{Path, PathBuf},
7};
8
9use crate::parser::ParseResult;
10use chrono::prelude::*;
11use flate2::{write::GzEncoder, Compression};
12use reqwest::Response;
13use serde::{Deserialize, Serialize};
14use warc::{BufferedBody, RawRecordHeader, Record, RecordType, WarcHeader, WarcReader, WarcWriter};
15
/// File name of the raw (uncompressed) WARC archive written by `Archiver`.
pub const ARCHIVE_FILE: &str = "archive.warc";
/// File name of the gzipped, preprocessed (parsed) archive.
pub const PARSED_ARCHIVE_FILE: &str = "parsed.gz";
18
/// Writes crawled pages into a WARC file on disk; `finish()` flushes the
/// writer and produces a gzipped copy of the archive.
pub struct Archiver {
    // Path of the uncompressed `.warc` file being written.
    path: PathBuf,
    // Buffered WARC writer; flushed and compressed by `finish()`.
    writer: WarcWriter<BufWriter<File>>,
}
23
/// A single archived HTTP response (or local file) stored in the WARC archive.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ArchiveRecord {
    /// HTTP status code (always 200 for records built from disk or read back
    /// out of a WARC file — see `from_file` / `Archiver::read`).
    pub status: u16,
    /// URL the content was fetched from (or synthesized for local files).
    pub url: String,
    /// Response header name/value pairs.
    pub headers: Vec<(String, String)>,
    /// Response body as text.
    pub content: String,
}
31
32impl ArchiveRecord {
33    pub async fn from_file(file: &Path, base_url: &str) -> anyhow::Result<Self> {
34        let content = std::fs::read_to_string(file)?;
35        let name = file
36            .file_name()
37            .unwrap_or_default()
38            .to_string_lossy()
39            .replace(".html", "");
40
41        let url = format!("{base_url}/{name}");
42
43        Ok(ArchiveRecord {
44            status: 200,
45            url,
46            headers: Vec::new(),
47            content,
48        })
49    }
50
51    pub async fn from_response(
52        resp: Response,
53        url_override: Option<String>,
54    ) -> anyhow::Result<Self> {
55        let headers: Vec<(String, String)> = resp
56            .headers()
57            .into_iter()
58            .filter_map(|(name, value)| {
59                if let Ok(value) = value.to_str() {
60                    Some((name.to_string(), value.to_string()))
61                } else {
62                    None
63                }
64            })
65            .collect();
66
67        let status = resp.status().as_u16();
68        let url = if let Some(url_override) = url_override {
69            url_override
70        } else {
71            resp.url().as_str().to_string()
72        };
73
74        let content = resp.text().await?;
75        Ok(ArchiveRecord {
76            status,
77            url,
78            headers,
79            content,
80        })
81    }
82}
83
84impl Archiver {
85    fn parse_body(body: &str) -> (Vec<(String, String)>, String) {
86        let mut headers = Vec::new();
87        let mut content = String::new();
88
89        let mut headers_finished = false;
90        for line in body.lines() {
91            let trimmed = line.trim();
92            if trimmed.is_empty() {
93                headers_finished = true;
94            } else {
95                match headers_finished {
96                    true => content.push_str(trimmed),
97                    false => {
98                        if let Some((key, value)) = trimmed.split_once(':') {
99                            headers.push((key.trim().to_string(), value.trim().to_string()));
100                        }
101                    }
102                }
103            }
104        }
105
106        (headers, content)
107    }
108
109    pub fn new(storage: &Path) -> anyhow::Result<Self> {
110        let path = storage.join(ARCHIVE_FILE);
111        Ok(Self {
112            path: path.clone(),
113            writer: WarcWriter::from_path(path)?,
114        })
115    }
116
117    pub fn read(path: &Path) -> anyhow::Result<Vec<ArchiveRecord>> {
118        let warc_path = if !path.ends_with(format!("{ARCHIVE_FILE}.gz")) {
119            let mut warc_path = path.join(ARCHIVE_FILE);
120            warc_path.set_extension("warc.gz");
121            warc_path
122        } else {
123            path.to_path_buf()
124        };
125
126        log::info!("Reading archive: {}", warc_path.display());
127
128        // Unzip
129        let warc = WarcReader::from_path_gzip(&warc_path)?;
130        let mut records = Vec::new();
131        for record in warc.iter_records().flatten() {
132            let url = record
133                .header(WarcHeader::TargetURI)
134                .expect("TargetURI not set")
135                .to_string();
136
137            if let Ok(body) = String::from_utf8(record.body().into()) {
138                let (headers, content) = Archiver::parse_body(&body);
139                records.push(ArchiveRecord {
140                    status: 200u16,
141                    url,
142                    headers,
143                    content,
144                });
145            }
146        }
147
148        log::info!("Found {} records", records.len());
149        Ok(records)
150    }
151
152    pub fn finish(self) -> anyhow::Result<PathBuf> {
153        // Make sure our buffer has been flushed to the filesystem.
154        if let Ok(mut inner_writer) = self.writer.into_inner() {
155            let _ = inner_writer.flush();
156        }
157
158        // Read file from filesystem & compress.
159        let file = std::fs::read(&self.path)?;
160        let before = file.len();
161        log::debug!(
162            "compressing data from {} | {} bytes",
163            self.path.display(),
164            before
165        );
166        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
167        encoder.write_all(&file)?;
168
169        // Check to see if we have an existing archive & remove it.
170        let mut compressed = self.path;
171        compressed.set_extension("warc.gz");
172        if compressed.exists() {
173            log::warn!("{} exists, removing!", compressed.display());
174            std::fs::remove_file(compressed.clone())?;
175        }
176
177        let contents = encoder.finish()?;
178        let after = contents.len();
179        let compresion_percentage = (before as f64 - after as f64) / before as f64 * 100.0;
180        std::fs::write(compressed.clone(), contents)?;
181        log::info!(
182            "saved to: {} | {} -> {} bytes ({:0.2}%)",
183            compressed.display(),
184            file.len(),
185            after,
186            compresion_percentage
187        );
188
189        Ok(compressed.clone())
190    }
191
192    pub fn generate_header(url: &str, content_length: usize) -> RawRecordHeader {
193        let date = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
194        RawRecordHeader {
195            version: "1.1".to_owned(),
196            headers: vec![
197                (
198                    // mandatory
199                    // Globally unique identifier for the current record
200                    WarcHeader::RecordID,
201                    Record::<BufferedBody>::generate_record_id().into_bytes(),
202                ),
203                (
204                    // mandatory
205                    // Number of octets in the block.
206                    WarcHeader::ContentLength,
207                    content_length.to_string().into_bytes(),
208                ),
209                (
210                    // mandatory
211                    // UTC timestamp that represents the instant the data
212                    // was captured.
213                    WarcHeader::Date,
214                    date.into_bytes(),
215                ),
216                (
217                    // mandatory
218                    // Type of record
219                    WarcHeader::WarcType,
220                    RecordType::Response.to_string().into_bytes(),
221                ),
222                // Optional attributes
223                (
224                    WarcHeader::ContentType,
225                    "text/html".to_string().into_bytes(),
226                ),
227                (WarcHeader::TargetURI, url.to_owned().into_bytes()),
228            ]
229            .into_iter()
230            .collect(),
231        }
232    }
233
234    pub async fn archive_record(&mut self, record: &ArchiveRecord) -> anyhow::Result<usize> {
235        let url = record.url.clone();
236        log::debug!("archiving {}", url);
237
238        // Output headers into HTTP format
239        let mut headers = "HTTP/1.1 200 OK\n".to_string();
240        for (name, value) in record.headers.iter() {
241            let _ = writeln!(headers, "{name}: {value}");
242        }
243
244        let body = record.content.clone();
245        let content = format!("{headers}\n{body}");
246        let warc_header = Self::generate_header(&url, content.len());
247
248        let bytes_written = self.writer.write_raw(warc_header, &content)?;
249        log::debug!("wrote {} bytes", bytes_written);
250        Ok(bytes_written)
251    }
252}
253
/// Paths produced by `create_archives`.
pub struct ArchiveFiles {
    /// Path to the compressed (`.warc.gz`) WARC archive.
    pub warc: PathBuf,
    /// Path to the gzipped preprocessed/parsed results (`parsed.gz`).
    pub parsed: PathBuf,
}
258
259/// Create a preproprocessed archive from an existing WARC file
260pub fn preprocess_warc_archive(warc: &Path) -> anyhow::Result<PathBuf> {
261    let parent_dir = warc.parent().expect("Unable to get parent folder");
262    log::info!("Saving preprocessed archive to: {}", parent_dir.display());
263
264    let warc = WarcReader::from_path_gzip(warc)?;
265    let path = parent_dir.join(PARSED_ARCHIVE_FILE);
266    if path.exists() {
267        let _ = std::fs::remove_file(&path);
268    }
269
270    let archive_path = std::fs::File::create(path.clone()).expect("Unable to create file");
271    let mut gz = GzEncoder::new(&archive_path, Compression::default());
272
273    let mut buffer = String::new();
274    let mut archived_urls = HashSet::new();
275    let mut duplicate_count = 0;
276
277    for record in warc.iter_records().flatten() {
278        buffer.clear();
279        if record.body().read_to_string(&mut buffer).is_ok() {
280            if let Some(url) = record.header(WarcHeader::TargetURI) {
281                let (_, content) = Archiver::parse_body(&buffer);
282                let parsed = crate::parser::html::html_to_text(&url, &content);
283                let ser = ron::ser::to_string(&parsed).unwrap();
284
285                if let Some(canonical) = parsed.canonical_url {
286                    if let Ok(canonical) = url::Url::parse(&canonical) {
287                        let url_str = canonical.to_string();
288                        if !archived_urls.contains(&url_str) {
289                            gz.write_fmt(format_args!("{ser}\n"))?;
290                            archived_urls.insert(url_str);
291                        } else {
292                            duplicate_count += 1;
293                        }
294                    }
295                }
296            }
297        }
298    }
299
300    gz.finish()?;
301    log::info!("Found {duplicate_count} duplicates");
302    log::info!("Preprocess {} docs", archived_urls.len());
303    log::info!("Saved parsed results to: {}", path.display());
304
305    Ok(path)
306}
307
308pub fn validate_preprocessed_archive(path: &Path) -> anyhow::Result<()> {
309    let mut urls = HashSet::new();
310    for res in ParseResult::iter_from_gz(path)? {
311        let url = res.canonical_url.expect("Must have canonical URL");
312        if urls.contains(&url) {
313            return Err(anyhow::anyhow!(
314                "Duplicate URL found in preprocessed archive: {:?}",
315                url
316            ));
317        } else {
318            urls.insert(url);
319        }
320    }
321
322    Ok(())
323}
324
325pub fn warc_to_iterator(
326    warc: &Path,
327) -> anyhow::Result<impl Iterator<Item = Option<ArchiveRecord>>> {
328    let parent_dir = warc.parent().expect("Unable to get parent folder");
329    log::info!("Saving preprocessed archive to: {}", parent_dir.display());
330
331    let warc = WarcReader::from_path_gzip(warc)?;
332
333    let record_itr = warc.iter_records().map(move |record_rslt| {
334        if let Ok(record) = record_rslt {
335            let url = record
336                .header(WarcHeader::TargetURI)
337                .expect("TargetURI not set")
338                .to_string();
339
340            if let Ok(body) = String::from_utf8(record.body().into()) {
341                let (headers, content) = Archiver::parse_body(&body);
342                return Option::Some(ArchiveRecord {
343                    status: 200u16,
344                    url,
345                    headers,
346                    content,
347                });
348            }
349        }
350        Option::None
351    });
352
353    Ok(record_itr)
354}
355
356/// Creates gzipped archives for all the crawls & preprocessed crawl content.
357pub async fn create_archives(
358    storage: &Path,
359    records: &[(String, PathBuf)],
360) -> anyhow::Result<ArchiveFiles> {
361    log::info!("Archiving responses & pre-processed");
362    if !storage.exists() {
363        let _ = std::fs::create_dir_all(storage);
364    }
365
366    let mut archiver = Archiver::new(storage).expect("Unable to create archive");
367    let parsed_archive_path = storage.join(PARSED_ARCHIVE_FILE);
368    let parsed_archive =
369        std::fs::File::create(parsed_archive_path.clone()).expect("Unable to create file");
370    let mut gz = GzEncoder::new(&parsed_archive, Compression::default());
371    let mut archived_urls = HashSet::new();
372    for content in records
373        .iter()
374        .filter_map(|(_, path)| std::fs::read_to_string(path).ok())
375    {
376        if let Ok(rec) = ron::from_str::<ArchiveRecord>(&content) {
377            // Only save successes to the archive
378            if rec.status >= 200 && rec.status <= 299 && !archived_urls.contains(&rec.url) {
379                let parsed = crate::parser::html::html_to_text(&rec.url, &rec.content);
380                let canonical_url = parsed.canonical_url.clone();
381                archiver.archive_record(&rec).await?;
382
383                // Only add to preprocessed file if canonical url is unique.
384                if let Some(Ok(canonical)) = canonical_url.map(|x| url::Url::parse(&x)) {
385                    if !archived_urls.contains(&canonical.to_string()) {
386                        let ser = ron::ser::to_string(&parsed).unwrap();
387                        gz.write_fmt(format_args!("{ser}\n"))?;
388                        archived_urls.insert(canonical.to_string());
389                    }
390                }
391            }
392        }
393    }
394    gz.finish()?;
395    let archive_file = archiver.finish()?;
396    log::info!("Saved parsed results to: {}", parsed_archive_path.display());
397    log::info!("Finished crawl");
398
399    Ok(ArchiveFiles {
400        warc: archive_file,
401        parsed: parsed_archive_path,
402    })
403}