1use chrono::Utc;
4use flate2::write::GzEncoder;
5use flate2::Compression;
6use std::fs::{File, OpenOptions};
7use std::io::{BufWriter, Write};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicU64, Ordering};
10use url::Url;
11use uuid::Uuid;
12
13use crate::types::error::Result;
14
/// Serializes WARC records into an underlying byte sink.
///
/// All output goes through a [`BufWriter`] wrapped around `W`.
pub struct WarcWriter<W: Write> {
    /// Buffered wrapper around the destination writer.
    writer: BufWriter<W>,
    /// Version line that opens every record header (`new` sets it to `"WARC/1.1"`).
    warc_version: String,
    /// Running total of bytes passed to `writer` across all records written so far.
    bytes_written: AtomicU64,
}
21
22impl<W: Write> WarcWriter<W> {
23 pub fn new(writer: W) -> Self {
25 Self {
26 writer: BufWriter::new(writer),
27 warc_version: "WARC/1.1".to_string(),
28 bytes_written: AtomicU64::new(0),
29 }
30 }
31
32 pub fn bytes_written(&self) -> u64 {
34 self.bytes_written.load(Ordering::Relaxed)
35 }
36
37 pub fn write_warcinfo(&mut self, info: &WarcInfo) -> Result<()> {
39 let record_id = Self::generate_record_id();
40 let date = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
41
42 let content = format!(
43 "software: {}\r\nformat: {}\r\nconformsTo: http://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/\r\n",
44 info.software, info.format
45 );
46 let content_bytes = content.as_bytes();
47
48 let header = format!(
49 "{}\r\nWARC-Type: warcinfo\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\nContent-Type: application/warc-fields\r\nContent-Length: {}\r\n\r\n",
50 self.warc_version, date, record_id, content_bytes.len()
51 );
52
53 let bytes = header.len() + content_bytes.len() + 4; self.writer.write_all(header.as_bytes())?;
55 self.writer.write_all(content_bytes)?;
56 self.writer.write_all(b"\r\n\r\n")?;
57
58 self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
59
60 Ok(())
61 }
62
63 pub fn write_request(&mut self, request: &WarcRequest) -> Result<String> {
65 let record_id = Self::generate_record_id();
66 let date = request.date.format("%Y-%m-%dT%H:%M:%SZ").to_string();
67
68 let mut http_request = format!(
69 "{} {} HTTP/1.1\r\n",
70 request.method.as_deref().unwrap_or("GET"),
71 request.url.path()
72 );
73 http_request.push_str(&format!("Host: {}\r\n", request.url.host_str().unwrap_or("")));
74 for (key, value) in &request.headers {
75 http_request.push_str(&format!("{}: {}\r\n", key, value));
76 }
77 http_request.push_str("\r\n");
78
79 let content_bytes = http_request.as_bytes();
80
81 let header = format!(
82 "{}\r\nWARC-Type: request\r\nWARC-Target-URI: {}\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\nContent-Type: application/http;msgtype=request\r\nContent-Length: {}\r\n\r\n",
83 self.warc_version, request.url, date, record_id, content_bytes.len()
84 );
85
86 let bytes = header.len() + content_bytes.len() + 4;
87 self.writer.write_all(header.as_bytes())?;
88 self.writer.write_all(content_bytes)?;
89 self.writer.write_all(b"\r\n\r\n")?;
90
91 self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
92
93 Ok(record_id)
94 }
95
96 pub fn write_response(&mut self, response: &WarcResponse, concurrent_to: Option<&str>) -> Result<String> {
98 let record_id = Self::generate_record_id();
99 let date = response.date.format("%Y-%m-%dT%H:%M:%SZ").to_string();
100
101 let status_text = http_status_text(response.status_code);
103 let mut http_response = format!(
104 "HTTP/1.1 {} {}\r\n",
105 response.status_code, status_text
106 );
107 for (key, value) in &response.headers {
108 http_response.push_str(&format!("{}: {}\r\n", key, value));
109 }
110 http_response.push_str("\r\n");
111
112 let mut content_bytes = http_response.into_bytes();
113 content_bytes.extend_from_slice(&response.body);
114
115 let mut header = format!(
116 "{}\r\nWARC-Type: response\r\nWARC-Target-URI: {}\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\n",
117 self.warc_version, response.url, date, record_id
118 );
119 if let Some(req_id) = concurrent_to {
120 header.push_str(&format!("WARC-Concurrent-To: <{}>\r\n", req_id));
121 }
122 if let Some(ip) = &response.ip_address {
123 header.push_str(&format!("WARC-IP-Address: {}\r\n", ip));
124 }
125 header.push_str(&format!(
126 "Content-Type: application/http;msgtype=response\r\nContent-Length: {}\r\n\r\n",
127 content_bytes.len()
128 ));
129
130 let bytes = header.len() + content_bytes.len() + 4;
131 self.writer.write_all(header.as_bytes())?;
132 self.writer.write_all(&content_bytes)?;
133 self.writer.write_all(b"\r\n\r\n")?;
134
135 self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
136
137 Ok(record_id)
138 }
139
140 pub fn write_metadata(&mut self, url: &Url, metadata: &WarcMetadata) -> Result<String> {
142 let record_id = Self::generate_record_id();
143 let date = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
144
145 let content = serde_json::to_string(&metadata.data).unwrap_or_default();
146 let content_bytes = content.as_bytes();
147
148 let mut header = format!(
149 "{}\r\nWARC-Type: metadata\r\nWARC-Target-URI: {}\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\n",
150 self.warc_version, url, date, record_id
151 );
152 if let Some(refers_to) = &metadata.refers_to {
153 header.push_str(&format!("WARC-Refers-To: <{}>\r\n", refers_to));
154 }
155 header.push_str(&format!(
156 "Content-Type: application/json\r\nContent-Length: {}\r\n\r\n",
157 content_bytes.len()
158 ));
159
160 let bytes = header.len() + content_bytes.len() + 4;
161 self.writer.write_all(header.as_bytes())?;
162 self.writer.write_all(content_bytes)?;
163 self.writer.write_all(b"\r\n\r\n")?;
164
165 self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
166
167 Ok(record_id)
168 }
169
170 fn generate_record_id() -> String {
172 format!("urn:uuid:{}", Uuid::new_v4())
173 }
174
175 pub fn flush(&mut self) -> Result<()> {
177 self.writer.flush()?;
178 Ok(())
179 }
180}
181
/// Configuration for creating a numbered series of WARC files in one directory.
pub struct RotatingWarcWriter {
    /// Directory the files are created in (`create_file` creates it if missing).
    base_dir: PathBuf,
    /// Leading component of every generated file name.
    prefix: String,
    /// Byte threshold at which `needs_rotation` reports that a file is full.
    max_size: u64,
    /// Whether new files are gzip-compressed (`.warc.gz`) or plain (`.warc`).
    compress: bool,
    /// Sequence number embedded in file names; incremented by `rotate`.
    current_index: AtomicU64,
    /// Metadata written as the `warcinfo` record at the start of each new file.
    warc_info: WarcInfo,
}
197
198impl RotatingWarcWriter {
199 pub fn new(base_dir: impl AsRef<Path>, prefix: impl Into<String>) -> Self {
201 Self {
202 base_dir: base_dir.as_ref().to_path_buf(),
203 prefix: prefix.into(),
204 max_size: 1024 * 1024 * 1024, compress: true,
206 current_index: AtomicU64::new(0),
207 warc_info: WarcInfo::default(),
208 }
209 }
210
211 pub fn with_max_size(mut self, max_size: u64) -> Self {
213 self.max_size = max_size;
214 self
215 }
216
217 pub fn with_compression(mut self, compress: bool) -> Self {
219 self.compress = compress;
220 self
221 }
222
223 pub fn with_info(mut self, info: WarcInfo) -> Self {
225 self.warc_info = info;
226 self
227 }
228
229 pub fn current_filename(&self) -> PathBuf {
231 let index = self.current_index.load(Ordering::Relaxed);
232 let timestamp = Utc::now().format("%Y%m%d%H%M%S");
233 let ext = if self.compress { "warc.gz" } else { "warc" };
234 self.base_dir.join(format!("{}-{}-{:05}.{}", self.prefix, timestamp, index, ext))
235 }
236
237 pub fn create_file(&self) -> Result<WarcFileHandle> {
239 std::fs::create_dir_all(&self.base_dir)?;
240
241 let path = self.current_filename();
242 let file = OpenOptions::new()
243 .create(true)
244 .write(true)
245 .truncate(true)
246 .open(&path)?;
247
248 if self.compress {
249 let encoder = GzEncoder::new(file, Compression::default());
250 let mut writer = WarcWriter::new(encoder);
251 writer.write_warcinfo(&self.warc_info)?;
252 Ok(WarcFileHandle::Compressed(writer, path))
253 } else {
254 let mut writer = WarcWriter::new(file);
255 writer.write_warcinfo(&self.warc_info)?;
256 Ok(WarcFileHandle::Uncompressed(writer, path))
257 }
258 }
259
260 pub fn rotate(&self) -> Result<WarcFileHandle> {
262 self.current_index.fetch_add(1, Ordering::SeqCst);
263 self.create_file()
264 }
265
266 pub fn needs_rotation(&self, handle: &WarcFileHandle) -> bool {
268 handle.bytes_written() >= self.max_size
269 }
270}
271
/// An open WARC file together with the path it was created at.
pub enum WarcFileHandle {
    /// Writer whose output passes through a gzip encoder before reaching the file.
    Compressed(WarcWriter<GzEncoder<File>>, PathBuf),
    /// Writer that writes records straight to the file.
    Uncompressed(WarcWriter<File>, PathBuf),
}
279
280impl WarcFileHandle {
281 pub fn bytes_written(&self) -> u64 {
283 match self {
284 WarcFileHandle::Compressed(w, _) => w.bytes_written(),
285 WarcFileHandle::Uncompressed(w, _) => w.bytes_written(),
286 }
287 }
288
289 pub fn path(&self) -> &Path {
291 match self {
292 WarcFileHandle::Compressed(_, p) => p,
293 WarcFileHandle::Uncompressed(_, p) => p,
294 }
295 }
296
297 pub fn write_request(&mut self, request: &WarcRequest) -> Result<String> {
299 match self {
300 WarcFileHandle::Compressed(w, _) => w.write_request(request),
301 WarcFileHandle::Uncompressed(w, _) => w.write_request(request),
302 }
303 }
304
305 pub fn write_response(&mut self, response: &WarcResponse, concurrent_to: Option<&str>) -> Result<String> {
307 match self {
308 WarcFileHandle::Compressed(w, _) => w.write_response(response, concurrent_to),
309 WarcFileHandle::Uncompressed(w, _) => w.write_response(response, concurrent_to),
310 }
311 }
312
313 pub fn write_metadata(&mut self, url: &Url, metadata: &WarcMetadata) -> Result<String> {
315 match self {
316 WarcFileHandle::Compressed(w, _) => w.write_metadata(url, metadata),
317 WarcFileHandle::Uncompressed(w, _) => w.write_metadata(url, metadata),
318 }
319 }
320
321 pub fn flush(&mut self) -> Result<()> {
323 match self {
324 WarcFileHandle::Compressed(w, _) => w.flush(),
325 WarcFileHandle::Uncompressed(w, _) => w.flush(),
326 }
327 }
328}
329
/// Descriptive metadata for the `warcinfo` record of a WARC file.
#[derive(Debug, Clone)]
pub struct WarcInfo {
    /// Name/version of the software producing the archive (`software` field).
    pub software: String,
    /// Human-readable format identifier (`format` field).
    pub format: String,
    /// Optional operator of the crawl.
    pub operator: Option<String>,
    /// Optional free-text description of the archive.
    pub description: Option<String>,
}
341
342impl Default for WarcInfo {
343 fn default() -> Self {
344 Self {
345 software: "Halldyll/1.0".to_string(),
346 format: "WARC File Format 1.1".to_string(),
347 operator: None,
348 description: None,
349 }
350 }
351}
352
353pub struct WarcRequest {
355 pub url: Url,
357 pub date: chrono::DateTime<chrono::Utc>,
359 pub method: Option<String>,
361 pub headers: Vec<(String, String)>,
363}
364
365impl WarcRequest {
366 pub fn new(url: Url) -> Self {
368 Self {
369 url,
370 date: Utc::now(),
371 method: None,
372 headers: Vec::new(),
373 }
374 }
375}
376
377pub struct WarcResponse {
379 pub url: Url,
381 pub date: chrono::DateTime<chrono::Utc>,
383 pub status_code: u16,
385 pub headers: Vec<(String, String)>,
387 pub body: Vec<u8>,
389 pub ip_address: Option<String>,
391}
392
393impl WarcResponse {
394 pub fn new(url: Url, status_code: u16, body: Vec<u8>) -> Self {
396 Self {
397 url,
398 date: Utc::now(),
399 status_code,
400 headers: Vec::new(),
401 body,
402 ip_address: None,
403 }
404 }
405}
406
407pub struct WarcMetadata {
409 pub data: serde_json::Value,
411 pub refers_to: Option<String>,
413}
414
/// Returns the standard reason phrase for an HTTP status code.
///
/// Covers the commonly registered 1xx–5xx codes; any other value yields
/// `"Unknown"`.
fn http_status_text(code: u16) -> &'static str {
    match code {
        // 1xx informational
        100 => "Continue",
        101 => "Switching Protocols",
        102 => "Processing",
        103 => "Early Hints",
        // 2xx success
        200 => "OK",
        201 => "Created",
        202 => "Accepted",
        203 => "Non-Authoritative Information",
        204 => "No Content",
        205 => "Reset Content",
        206 => "Partial Content",
        207 => "Multi-Status",
        // 3xx redirection
        300 => "Multiple Choices",
        301 => "Moved Permanently",
        302 => "Found",
        303 => "See Other",
        304 => "Not Modified",
        305 => "Use Proxy",
        307 => "Temporary Redirect",
        308 => "Permanent Redirect",
        // 4xx client errors
        400 => "Bad Request",
        401 => "Unauthorized",
        402 => "Payment Required",
        403 => "Forbidden",
        404 => "Not Found",
        405 => "Method Not Allowed",
        406 => "Not Acceptable",
        407 => "Proxy Authentication Required",
        408 => "Request Timeout",
        409 => "Conflict",
        410 => "Gone",
        411 => "Length Required",
        412 => "Precondition Failed",
        413 => "Content Too Large",
        414 => "URI Too Long",
        415 => "Unsupported Media Type",
        416 => "Range Not Satisfiable",
        417 => "Expectation Failed",
        421 => "Misdirected Request",
        422 => "Unprocessable Content",
        425 => "Too Early",
        426 => "Upgrade Required",
        428 => "Precondition Required",
        429 => "Too Many Requests",
        431 => "Request Header Fields Too Large",
        451 => "Unavailable For Legal Reasons",
        // 5xx server errors
        500 => "Internal Server Error",
        501 => "Not Implemented",
        502 => "Bad Gateway",
        503 => "Service Unavailable",
        504 => "Gateway Timeout",
        505 => "HTTP Version Not Supported",
        511 => "Network Authentication Required",
        _ => "Unknown",
    }
}
447