1use std::collections::HashSet;
2use std::fmt::Write as FmtWrite;
3use std::io::{BufWriter, Read, Write};
4use std::{
5 fs::File,
6 path::{Path, PathBuf},
7};
8
9use crate::parser::ParseResult;
10use chrono::prelude::*;
11use flate2::{write::GzEncoder, Compression};
12use reqwest::Response;
13use serde::{Deserialize, Serialize};
14use warc::{BufferedBody, RawRecordHeader, Record, RecordType, WarcHeader, WarcReader, WarcWriter};
15
/// File name of the raw (uncompressed) WARC archive written by `Archiver`.
pub const ARCHIVE_FILE: &str = "archive.warc";
/// File name of the gzipped, pre-parsed (one RON document per line) archive.
pub const PARSED_ARCHIVE_FILE: &str = "parsed.gz";
18
/// Writes crawl responses into a WARC file on disk; `finish` gzips the result.
pub struct Archiver {
    // Location of the uncompressed WARC file (ARCHIVE_FILE inside storage).
    path: PathBuf,
    // Buffered WARC writer over that file.
    writer: WarcWriter<BufWriter<File>>,
}
23
/// A single archived HTTP response, serializable to/from RON.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ArchiveRecord {
    // HTTP status code of the response (e.g. 200).
    pub status: u16,
    // URL the content was fetched from (or a synthetic/override URL).
    pub url: String,
    // Response header name/value pairs.
    pub headers: Vec<(String, String)>,
    // Response body as text.
    pub content: String,
}
31
32impl ArchiveRecord {
33 pub async fn from_file(file: &Path, base_url: &str) -> anyhow::Result<Self> {
34 let content = std::fs::read_to_string(file)?;
35 let name = file
36 .file_name()
37 .unwrap_or_default()
38 .to_string_lossy()
39 .replace(".html", "");
40
41 let url = format!("{base_url}/{name}");
42
43 Ok(ArchiveRecord {
44 status: 200,
45 url,
46 headers: Vec::new(),
47 content,
48 })
49 }
50
51 pub async fn from_response(
52 resp: Response,
53 url_override: Option<String>,
54 ) -> anyhow::Result<Self> {
55 let headers: Vec<(String, String)> = resp
56 .headers()
57 .into_iter()
58 .filter_map(|(name, value)| {
59 if let Ok(value) = value.to_str() {
60 Some((name.to_string(), value.to_string()))
61 } else {
62 None
63 }
64 })
65 .collect();
66
67 let status = resp.status().as_u16();
68 let url = if let Some(url_override) = url_override {
69 url_override
70 } else {
71 resp.url().as_str().to_string()
72 };
73
74 let content = resp.text().await?;
75 Ok(ArchiveRecord {
76 status,
77 url,
78 headers,
79 content,
80 })
81 }
82}
83
84impl Archiver {
85 fn parse_body(body: &str) -> (Vec<(String, String)>, String) {
86 let mut headers = Vec::new();
87 let mut content = String::new();
88
89 let mut headers_finished = false;
90 for line in body.lines() {
91 let trimmed = line.trim();
92 if trimmed.is_empty() {
93 headers_finished = true;
94 } else {
95 match headers_finished {
96 true => content.push_str(trimmed),
97 false => {
98 if let Some((key, value)) = trimmed.split_once(':') {
99 headers.push((key.trim().to_string(), value.trim().to_string()));
100 }
101 }
102 }
103 }
104 }
105
106 (headers, content)
107 }
108
109 pub fn new(storage: &Path) -> anyhow::Result<Self> {
110 let path = storage.join(ARCHIVE_FILE);
111 Ok(Self {
112 path: path.clone(),
113 writer: WarcWriter::from_path(path)?,
114 })
115 }
116
117 pub fn read(path: &Path) -> anyhow::Result<Vec<ArchiveRecord>> {
118 let warc_path = if !path.ends_with(format!("{ARCHIVE_FILE}.gz")) {
119 let mut warc_path = path.join(ARCHIVE_FILE);
120 warc_path.set_extension("warc.gz");
121 warc_path
122 } else {
123 path.to_path_buf()
124 };
125
126 log::info!("Reading archive: {}", warc_path.display());
127
128 let warc = WarcReader::from_path_gzip(&warc_path)?;
130 let mut records = Vec::new();
131 for record in warc.iter_records().flatten() {
132 let url = record
133 .header(WarcHeader::TargetURI)
134 .expect("TargetURI not set")
135 .to_string();
136
137 if let Ok(body) = String::from_utf8(record.body().into()) {
138 let (headers, content) = Archiver::parse_body(&body);
139 records.push(ArchiveRecord {
140 status: 200u16,
141 url,
142 headers,
143 content,
144 });
145 }
146 }
147
148 log::info!("Found {} records", records.len());
149 Ok(records)
150 }
151
152 pub fn finish(self) -> anyhow::Result<PathBuf> {
153 if let Ok(mut inner_writer) = self.writer.into_inner() {
155 let _ = inner_writer.flush();
156 }
157
158 let file = std::fs::read(&self.path)?;
160 let before = file.len();
161 log::debug!(
162 "compressing data from {} | {} bytes",
163 self.path.display(),
164 before
165 );
166 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
167 encoder.write_all(&file)?;
168
169 let mut compressed = self.path;
171 compressed.set_extension("warc.gz");
172 if compressed.exists() {
173 log::warn!("{} exists, removing!", compressed.display());
174 std::fs::remove_file(compressed.clone())?;
175 }
176
177 let contents = encoder.finish()?;
178 let after = contents.len();
179 let compresion_percentage = (before as f64 - after as f64) / before as f64 * 100.0;
180 std::fs::write(compressed.clone(), contents)?;
181 log::info!(
182 "saved to: {} | {} -> {} bytes ({:0.2}%)",
183 compressed.display(),
184 file.len(),
185 after,
186 compresion_percentage
187 );
188
189 Ok(compressed.clone())
190 }
191
192 pub fn generate_header(url: &str, content_length: usize) -> RawRecordHeader {
193 let date = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
194 RawRecordHeader {
195 version: "1.1".to_owned(),
196 headers: vec![
197 (
198 WarcHeader::RecordID,
201 Record::<BufferedBody>::generate_record_id().into_bytes(),
202 ),
203 (
204 WarcHeader::ContentLength,
207 content_length.to_string().into_bytes(),
208 ),
209 (
210 WarcHeader::Date,
214 date.into_bytes(),
215 ),
216 (
217 WarcHeader::WarcType,
220 RecordType::Response.to_string().into_bytes(),
221 ),
222 (
224 WarcHeader::ContentType,
225 "text/html".to_string().into_bytes(),
226 ),
227 (WarcHeader::TargetURI, url.to_owned().into_bytes()),
228 ]
229 .into_iter()
230 .collect(),
231 }
232 }
233
234 pub async fn archive_record(&mut self, record: &ArchiveRecord) -> anyhow::Result<usize> {
235 let url = record.url.clone();
236 log::debug!("archiving {}", url);
237
238 let mut headers = "HTTP/1.1 200 OK\n".to_string();
240 for (name, value) in record.headers.iter() {
241 let _ = writeln!(headers, "{name}: {value}");
242 }
243
244 let body = record.content.clone();
245 let content = format!("{headers}\n{body}");
246 let warc_header = Self::generate_header(&url, content.len());
247
248 let bytes_written = self.writer.write_raw(warc_header, &content)?;
249 log::debug!("wrote {} bytes", bytes_written);
250 Ok(bytes_written)
251 }
252}
253
/// Paths of the two outputs produced by `create_archives`.
pub struct ArchiveFiles {
    // Gzipped WARC archive of raw responses.
    pub warc: PathBuf,
    // Gzipped, RON-per-line archive of parsed documents.
    pub parsed: PathBuf,
}
258
259pub fn preprocess_warc_archive(warc: &Path) -> anyhow::Result<PathBuf> {
261 let parent_dir = warc.parent().expect("Unable to get parent folder");
262 log::info!("Saving preprocessed archive to: {}", parent_dir.display());
263
264 let warc = WarcReader::from_path_gzip(warc)?;
265 let path = parent_dir.join(PARSED_ARCHIVE_FILE);
266 if path.exists() {
267 let _ = std::fs::remove_file(&path);
268 }
269
270 let archive_path = std::fs::File::create(path.clone()).expect("Unable to create file");
271 let mut gz = GzEncoder::new(&archive_path, Compression::default());
272
273 let mut buffer = String::new();
274 let mut archived_urls = HashSet::new();
275 let mut duplicate_count = 0;
276
277 for record in warc.iter_records().flatten() {
278 buffer.clear();
279 if record.body().read_to_string(&mut buffer).is_ok() {
280 if let Some(url) = record.header(WarcHeader::TargetURI) {
281 let (_, content) = Archiver::parse_body(&buffer);
282 let parsed = crate::parser::html::html_to_text(&url, &content);
283 let ser = ron::ser::to_string(&parsed).unwrap();
284
285 if let Some(canonical) = parsed.canonical_url {
286 if let Ok(canonical) = url::Url::parse(&canonical) {
287 let url_str = canonical.to_string();
288 if !archived_urls.contains(&url_str) {
289 gz.write_fmt(format_args!("{ser}\n"))?;
290 archived_urls.insert(url_str);
291 } else {
292 duplicate_count += 1;
293 }
294 }
295 }
296 }
297 }
298 }
299
300 gz.finish()?;
301 log::info!("Found {duplicate_count} duplicates");
302 log::info!("Preprocess {} docs", archived_urls.len());
303 log::info!("Saved parsed results to: {}", path.display());
304
305 Ok(path)
306}
307
308pub fn validate_preprocessed_archive(path: &Path) -> anyhow::Result<()> {
309 let mut urls = HashSet::new();
310 for res in ParseResult::iter_from_gz(path)? {
311 let url = res.canonical_url.expect("Must have canonical URL");
312 if urls.contains(&url) {
313 return Err(anyhow::anyhow!(
314 "Duplicate URL found in preprocessed archive: {:?}",
315 url
316 ));
317 } else {
318 urls.insert(url);
319 }
320 }
321
322 Ok(())
323}
324
325pub fn warc_to_iterator(
326 warc: &Path,
327) -> anyhow::Result<impl Iterator<Item = Option<ArchiveRecord>>> {
328 let parent_dir = warc.parent().expect("Unable to get parent folder");
329 log::info!("Saving preprocessed archive to: {}", parent_dir.display());
330
331 let warc = WarcReader::from_path_gzip(warc)?;
332
333 let record_itr = warc.iter_records().map(move |record_rslt| {
334 if let Ok(record) = record_rslt {
335 let url = record
336 .header(WarcHeader::TargetURI)
337 .expect("TargetURI not set")
338 .to_string();
339
340 if let Ok(body) = String::from_utf8(record.body().into()) {
341 let (headers, content) = Archiver::parse_body(&body);
342 return Option::Some(ArchiveRecord {
343 status: 200u16,
344 url,
345 headers,
346 content,
347 });
348 }
349 }
350 Option::None
351 });
352
353 Ok(record_itr)
354}
355
356pub async fn create_archives(
358 storage: &Path,
359 records: &[(String, PathBuf)],
360) -> anyhow::Result<ArchiveFiles> {
361 log::info!("Archiving responses & pre-processed");
362 if !storage.exists() {
363 let _ = std::fs::create_dir_all(storage);
364 }
365
366 let mut archiver = Archiver::new(storage).expect("Unable to create archive");
367 let parsed_archive_path = storage.join(PARSED_ARCHIVE_FILE);
368 let parsed_archive =
369 std::fs::File::create(parsed_archive_path.clone()).expect("Unable to create file");
370 let mut gz = GzEncoder::new(&parsed_archive, Compression::default());
371 let mut archived_urls = HashSet::new();
372 for content in records
373 .iter()
374 .filter_map(|(_, path)| std::fs::read_to_string(path).ok())
375 {
376 if let Ok(rec) = ron::from_str::<ArchiveRecord>(&content) {
377 if rec.status >= 200 && rec.status <= 299 && !archived_urls.contains(&rec.url) {
379 let parsed = crate::parser::html::html_to_text(&rec.url, &rec.content);
380 let canonical_url = parsed.canonical_url.clone();
381 archiver.archive_record(&rec).await?;
382
383 if let Some(Ok(canonical)) = canonical_url.map(|x| url::Url::parse(&x)) {
385 if !archived_urls.contains(&canonical.to_string()) {
386 let ser = ron::ser::to_string(&parsed).unwrap();
387 gz.write_fmt(format_args!("{ser}\n"))?;
388 archived_urls.insert(canonical.to_string());
389 }
390 }
391 }
392 }
393 }
394 gz.finish()?;
395 let archive_file = archiver.finish()?;
396 log::info!("Saved parsed results to: {}", parsed_archive_path.display());
397 log::info!("Finished crawl");
398
399 Ok(ArchiveFiles {
400 warc: archive_file,
401 parsed: parsed_archive_path,
402 })
403}