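// halldyll_core/storage/warc.rs

//! WARC - Web ARChive format (ISO 28500)
//!
//! Writers for WARC 1.1 records (`warcinfo`, `request`, `response`, `metadata`),
//! with optional gzip compression and size-based file rotation.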
3use chrono::Utc;
4use flate2::write::GzEncoder;
5use flate2::Compression;
6use std::fs::{File, OpenOptions};
7use std::io::{BufWriter, Write};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicU64, Ordering};
10use url::Url;
11use uuid::Uuid;
12
13use crate::types::error::Result;
14
/// Streaming WARC record writer over any [`Write`] sink.
///
/// Output is buffered, and a running byte count is kept so callers can decide
/// when to rotate to a new file.
pub struct WarcWriter<W: Write> {
    /// Version line that opens every record (e.g. `WARC/1.1`).
    warc_version: String,
    /// Buffered output sink.
    writer: BufWriter<W>,
    /// Running total of serialized record bytes handed to `writer`.
    bytes_written: AtomicU64,
}
21
22impl<W: Write> WarcWriter<W> {
23    /// New WARC writer
24    pub fn new(writer: W) -> Self {
25        Self {
26            writer: BufWriter::new(writer),
27            warc_version: "WARC/1.1".to_string(),
28            bytes_written: AtomicU64::new(0),
29        }
30    }
31
32    /// Get bytes written so far
33    pub fn bytes_written(&self) -> u64 {
34        self.bytes_written.load(Ordering::Relaxed)
35    }
36
37    /// Write a warcinfo record
38    pub fn write_warcinfo(&mut self, info: &WarcInfo) -> Result<()> {
39        let record_id = Self::generate_record_id();
40        let date = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
41        
42        let content = format!(
43            "software: {}\r\nformat: {}\r\nconformsTo: http://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/\r\n",
44            info.software, info.format
45        );
46        let content_bytes = content.as_bytes();
47
48        let header = format!(
49            "{}\r\nWARC-Type: warcinfo\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\nContent-Type: application/warc-fields\r\nContent-Length: {}\r\n\r\n",
50            self.warc_version, date, record_id, content_bytes.len()
51        );
52
53        let bytes = header.len() + content_bytes.len() + 4; // +4 for \r\n\r\n
54        self.writer.write_all(header.as_bytes())?;
55        self.writer.write_all(content_bytes)?;
56        self.writer.write_all(b"\r\n\r\n")?;
57        
58        self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
59
60        Ok(())
61    }
62
63    /// Write a request record
64    pub fn write_request(&mut self, request: &WarcRequest) -> Result<String> {
65        let record_id = Self::generate_record_id();
66        let date = request.date.format("%Y-%m-%dT%H:%M:%SZ").to_string();
67        
68        let mut http_request = format!(
69            "{} {} HTTP/1.1\r\n",
70            request.method.as_deref().unwrap_or("GET"),
71            request.url.path()
72        );
73        http_request.push_str(&format!("Host: {}\r\n", request.url.host_str().unwrap_or("")));
74        for (key, value) in &request.headers {
75            http_request.push_str(&format!("{}: {}\r\n", key, value));
76        }
77        http_request.push_str("\r\n");
78        
79        let content_bytes = http_request.as_bytes();
80
81        let header = format!(
82            "{}\r\nWARC-Type: request\r\nWARC-Target-URI: {}\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\nContent-Type: application/http;msgtype=request\r\nContent-Length: {}\r\n\r\n",
83            self.warc_version, request.url, date, record_id, content_bytes.len()
84        );
85
86        let bytes = header.len() + content_bytes.len() + 4;
87        self.writer.write_all(header.as_bytes())?;
88        self.writer.write_all(content_bytes)?;
89        self.writer.write_all(b"\r\n\r\n")?;
90
91        self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
92
93        Ok(record_id)
94    }
95
96    /// Write a response record
97    pub fn write_response(&mut self, response: &WarcResponse, concurrent_to: Option<&str>) -> Result<String> {
98        let record_id = Self::generate_record_id();
99        let date = response.date.format("%Y-%m-%dT%H:%M:%SZ").to_string();
100        
101        // Build HTTP response
102        let status_text = http_status_text(response.status_code);
103        let mut http_response = format!(
104            "HTTP/1.1 {} {}\r\n",
105            response.status_code, status_text
106        );
107        for (key, value) in &response.headers {
108            http_response.push_str(&format!("{}: {}\r\n", key, value));
109        }
110        http_response.push_str("\r\n");
111        
112        let mut content_bytes = http_response.into_bytes();
113        content_bytes.extend_from_slice(&response.body);
114
115        let mut header = format!(
116            "{}\r\nWARC-Type: response\r\nWARC-Target-URI: {}\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\n",
117            self.warc_version, response.url, date, record_id
118        );
119        if let Some(req_id) = concurrent_to {
120            header.push_str(&format!("WARC-Concurrent-To: <{}>\r\n", req_id));
121        }
122        if let Some(ip) = &response.ip_address {
123            header.push_str(&format!("WARC-IP-Address: {}\r\n", ip));
124        }
125        header.push_str(&format!(
126            "Content-Type: application/http;msgtype=response\r\nContent-Length: {}\r\n\r\n",
127            content_bytes.len()
128        ));
129
130        let bytes = header.len() + content_bytes.len() + 4;
131        self.writer.write_all(header.as_bytes())?;
132        self.writer.write_all(&content_bytes)?;
133        self.writer.write_all(b"\r\n\r\n")?;
134
135        self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
136
137        Ok(record_id)
138    }
139
140    /// Write a metadata record
141    pub fn write_metadata(&mut self, url: &Url, metadata: &WarcMetadata) -> Result<String> {
142        let record_id = Self::generate_record_id();
143        let date = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
144        
145        let content = serde_json::to_string(&metadata.data).unwrap_or_default();
146        let content_bytes = content.as_bytes();
147
148        let mut header = format!(
149            "{}\r\nWARC-Type: metadata\r\nWARC-Target-URI: {}\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\n",
150            self.warc_version, url, date, record_id
151        );
152        if let Some(refers_to) = &metadata.refers_to {
153            header.push_str(&format!("WARC-Refers-To: <{}>\r\n", refers_to));
154        }
155        header.push_str(&format!(
156            "Content-Type: application/json\r\nContent-Length: {}\r\n\r\n",
157            content_bytes.len()
158        ));
159
160        let bytes = header.len() + content_bytes.len() + 4;
161        self.writer.write_all(header.as_bytes())?;
162        self.writer.write_all(content_bytes)?;
163        self.writer.write_all(b"\r\n\r\n")?;
164
165        self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
166
167        Ok(record_id)
168    }
169
170    /// Generate a unique Record-ID
171    fn generate_record_id() -> String {
172        format!("urn:uuid:{}", Uuid::new_v4())
173    }
174
175    /// Flush the writer
176    pub fn flush(&mut self) -> Result<()> {
177        self.writer.flush()?;
178        Ok(())
179    }
180}
181
182/// Rotating WARC writer with compression
183pub struct RotatingWarcWriter {
184    /// Base directory for WARC files
185    base_dir: PathBuf,
186    /// Prefix for filenames
187    prefix: String,
188    /// Max file size before rotation (bytes)
189    max_size: u64,
190    /// Use gzip compression
191    compress: bool,
192    /// Current file index
193    current_index: AtomicU64,
194    /// WARC info to write at start of each file
195    warc_info: WarcInfo,
196}
197
198impl RotatingWarcWriter {
199    /// Create a new rotating writer
200    pub fn new(base_dir: impl AsRef<Path>, prefix: impl Into<String>) -> Self {
201        Self {
202            base_dir: base_dir.as_ref().to_path_buf(),
203            prefix: prefix.into(),
204            max_size: 1024 * 1024 * 1024, // 1GB default
205            compress: true,
206            current_index: AtomicU64::new(0),
207            warc_info: WarcInfo::default(),
208        }
209    }
210
211    /// Set max file size
212    pub fn with_max_size(mut self, max_size: u64) -> Self {
213        self.max_size = max_size;
214        self
215    }
216
217    /// Enable/disable compression
218    pub fn with_compression(mut self, compress: bool) -> Self {
219        self.compress = compress;
220        self
221    }
222
223    /// Set WARC info
224    pub fn with_info(mut self, info: WarcInfo) -> Self {
225        self.warc_info = info;
226        self
227    }
228
229    /// Get the current filename
230    pub fn current_filename(&self) -> PathBuf {
231        let index = self.current_index.load(Ordering::Relaxed);
232        let timestamp = Utc::now().format("%Y%m%d%H%M%S");
233        let ext = if self.compress { "warc.gz" } else { "warc" };
234        self.base_dir.join(format!("{}-{}-{:05}.{}", self.prefix, timestamp, index, ext))
235    }
236
237    /// Create a new WARC file
238    pub fn create_file(&self) -> Result<WarcFileHandle> {
239        std::fs::create_dir_all(&self.base_dir)?;
240        
241        let path = self.current_filename();
242        let file = OpenOptions::new()
243            .create(true)
244            .write(true)
245            .truncate(true)
246            .open(&path)?;
247
248        if self.compress {
249            let encoder = GzEncoder::new(file, Compression::default());
250            let mut writer = WarcWriter::new(encoder);
251            writer.write_warcinfo(&self.warc_info)?;
252            Ok(WarcFileHandle::Compressed(writer, path))
253        } else {
254            let mut writer = WarcWriter::new(file);
255            writer.write_warcinfo(&self.warc_info)?;
256            Ok(WarcFileHandle::Uncompressed(writer, path))
257        }
258    }
259
260    /// Rotate to the next file
261    pub fn rotate(&self) -> Result<WarcFileHandle> {
262        self.current_index.fetch_add(1, Ordering::SeqCst);
263        self.create_file()
264    }
265
266    /// Check if rotation is needed based on size
267    pub fn needs_rotation(&self, handle: &WarcFileHandle) -> bool {
268        handle.bytes_written() >= self.max_size
269    }
270}
271
272/// Handle to an open WARC file
273pub enum WarcFileHandle {
274    /// Compressed file
275    Compressed(WarcWriter<GzEncoder<File>>, PathBuf),
276    /// Uncompressed file
277    Uncompressed(WarcWriter<File>, PathBuf),
278}
279
280impl WarcFileHandle {
281    /// Get bytes written
282    pub fn bytes_written(&self) -> u64 {
283        match self {
284            WarcFileHandle::Compressed(w, _) => w.bytes_written(),
285            WarcFileHandle::Uncompressed(w, _) => w.bytes_written(),
286        }
287    }
288
289    /// Get the file path
290    pub fn path(&self) -> &Path {
291        match self {
292            WarcFileHandle::Compressed(_, p) => p,
293            WarcFileHandle::Uncompressed(_, p) => p,
294        }
295    }
296
297    /// Write a request
298    pub fn write_request(&mut self, request: &WarcRequest) -> Result<String> {
299        match self {
300            WarcFileHandle::Compressed(w, _) => w.write_request(request),
301            WarcFileHandle::Uncompressed(w, _) => w.write_request(request),
302        }
303    }
304
305    /// Write a response
306    pub fn write_response(&mut self, response: &WarcResponse, concurrent_to: Option<&str>) -> Result<String> {
307        match self {
308            WarcFileHandle::Compressed(w, _) => w.write_response(response, concurrent_to),
309            WarcFileHandle::Uncompressed(w, _) => w.write_response(response, concurrent_to),
310        }
311    }
312
313    /// Write metadata
314    pub fn write_metadata(&mut self, url: &Url, metadata: &WarcMetadata) -> Result<String> {
315        match self {
316            WarcFileHandle::Compressed(w, _) => w.write_metadata(url, metadata),
317            WarcFileHandle::Uncompressed(w, _) => w.write_metadata(url, metadata),
318        }
319    }
320
321    /// Flush
322    pub fn flush(&mut self) -> Result<()> {
323        match self {
324            WarcFileHandle::Compressed(w, _) => w.flush(),
325            WarcFileHandle::Uncompressed(w, _) => w.flush(),
326        }
327    }
328}
329
/// Contents of the `warcinfo` record written at the start of each WARC file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WarcInfo {
    /// Software identifier (the `software` field).
    pub software: String,
    /// WARC format description (the `format` field).
    pub format: String,
    /// Operator/organization, if any.
    pub operator: Option<String>,
    /// Optional description.
    pub description: Option<String>,
}
341
342impl Default for WarcInfo {
343    fn default() -> Self {
344        Self {
345            software: "Halldyll/1.0".to_string(),
346            format: "WARC File Format 1.1".to_string(),
347            operator: None,
348            description: None,
349        }
350    }
351}
352
353/// WARC request record
354pub struct WarcRequest {
355    /// Request URL
356    pub url: Url,
357    /// Request timestamp
358    pub date: chrono::DateTime<chrono::Utc>,
359    /// HTTP method
360    pub method: Option<String>,
361    /// Request headers
362    pub headers: Vec<(String, String)>,
363}
364
365impl WarcRequest {
366    /// Create a new request record
367    pub fn new(url: Url) -> Self {
368        Self {
369            url,
370            date: Utc::now(),
371            method: None,
372            headers: Vec::new(),
373        }
374    }
375}
376
377/// WARC response record
378pub struct WarcResponse {
379    /// Response URL
380    pub url: Url,
381    /// Response timestamp
382    pub date: chrono::DateTime<chrono::Utc>,
383    /// HTTP status code
384    pub status_code: u16,
385    /// Response headers
386    pub headers: Vec<(String, String)>,
387    /// Response body
388    pub body: Vec<u8>,
389    /// Server IP address
390    pub ip_address: Option<String>,
391}
392
393impl WarcResponse {
394    /// Create a new response record
395    pub fn new(url: Url, status_code: u16, body: Vec<u8>) -> Self {
396        Self {
397            url,
398            date: Utc::now(),
399            status_code,
400            headers: Vec::new(),
401            body,
402            ip_address: None,
403        }
404    }
405}
406
407/// WARC metadata record
408pub struct WarcMetadata {
409    /// JSON data
410    pub data: serde_json::Value,
411    /// Record ID this metadata refers to
412    pub refers_to: Option<String>,
413}
414
/// Canonical reason phrase for an HTTP status code.
///
/// Used to rebuild the status line of archived responses. Covers the codes
/// defined by RFC 9110 plus common registered extensions:
/// - RFC 6585 (428, 429, 431, 511);
/// - RFC 7725 (451);
/// - RFC 8297 (103);
/// - RFC 8470 (425).
///
/// Any other code maps to `"Unknown"`.
///
/// NOTE(review): the server's original reason phrase is not stored in
/// `WarcResponse`, so this reconstruction can differ from what was actually
/// received.
fn http_status_text(code: u16) -> &'static str {
    match code {
        // 1xx informational
        100 => "Continue",
        101 => "Switching Protocols",
        103 => "Early Hints",
        // 2xx success
        200 => "OK",
        201 => "Created",
        202 => "Accepted",
        203 => "Non-Authoritative Information",
        204 => "No Content",
        205 => "Reset Content",
        206 => "Partial Content",
        // 3xx redirection
        300 => "Multiple Choices",
        301 => "Moved Permanently",
        302 => "Found",
        303 => "See Other",
        304 => "Not Modified",
        305 => "Use Proxy",
        307 => "Temporary Redirect",
        308 => "Permanent Redirect",
        // 4xx client error
        400 => "Bad Request",
        401 => "Unauthorized",
        402 => "Payment Required",
        403 => "Forbidden",
        404 => "Not Found",
        405 => "Method Not Allowed",
        406 => "Not Acceptable",
        407 => "Proxy Authentication Required",
        408 => "Request Timeout",
        409 => "Conflict",
        410 => "Gone",
        411 => "Length Required",
        412 => "Precondition Failed",
        413 => "Content Too Large",
        414 => "URI Too Long",
        415 => "Unsupported Media Type",
        416 => "Range Not Satisfiable",
        417 => "Expectation Failed",
        421 => "Misdirected Request",
        422 => "Unprocessable Content",
        425 => "Too Early",
        426 => "Upgrade Required",
        428 => "Precondition Required",
        429 => "Too Many Requests",
        431 => "Request Header Fields Too Large",
        451 => "Unavailable For Legal Reasons",
        // 5xx server error
        500 => "Internal Server Error",
        501 => "Not Implemented",
        502 => "Bad Gateway",
        503 => "Service Unavailable",
        504 => "Gateway Timeout",
        505 => "HTTP Version Not Supported",
        511 => "Network Authentication Required",
        _ => "Unknown",
    }
}
447