use chrono::Utc;
use flate2::write::GzEncoder;
use flate2::Compression;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use url::Url;
use uuid::Uuid;
use crate::types::error::Result;
/// Serializes WARC records into an underlying byte sink.
pub struct WarcWriter<W: Write> {
    /// Version line written at the start of every record header.
    warc_version: String,
    /// Running total of record bytes passed through `writer`.
    bytes_written: AtomicU64,
    /// Buffered destination for all record bytes.
    writer: BufWriter<W>,
}
impl<W: Write> WarcWriter<W> {
pub fn new(writer: W) -> Self {
Self {
writer: BufWriter::new(writer),
warc_version: "WARC/1.1".to_string(),
bytes_written: AtomicU64::new(0),
}
}
pub fn bytes_written(&self) -> u64 {
self.bytes_written.load(Ordering::Relaxed)
}
pub fn write_warcinfo(&mut self, info: &WarcInfo) -> Result<()> {
let record_id = Self::generate_record_id();
let date = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
let content = format!(
"software: {}\r\nformat: {}\r\nconformsTo: http://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/\r\n",
info.software, info.format
);
let content_bytes = content.as_bytes();
let header = format!(
"{}\r\nWARC-Type: warcinfo\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\nContent-Type: application/warc-fields\r\nContent-Length: {}\r\n\r\n",
self.warc_version, date, record_id, content_bytes.len()
);
let bytes = header.len() + content_bytes.len() + 4; self.writer.write_all(header.as_bytes())?;
self.writer.write_all(content_bytes)?;
self.writer.write_all(b"\r\n\r\n")?;
self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
Ok(())
}
pub fn write_request(&mut self, request: &WarcRequest) -> Result<String> {
let record_id = Self::generate_record_id();
let date = request.date.format("%Y-%m-%dT%H:%M:%SZ").to_string();
let mut http_request = format!(
"{} {} HTTP/1.1\r\n",
request.method.as_deref().unwrap_or("GET"),
request.url.path()
);
http_request.push_str(&format!("Host: {}\r\n", request.url.host_str().unwrap_or("")));
for (key, value) in &request.headers {
http_request.push_str(&format!("{}: {}\r\n", key, value));
}
http_request.push_str("\r\n");
let content_bytes = http_request.as_bytes();
let header = format!(
"{}\r\nWARC-Type: request\r\nWARC-Target-URI: {}\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\nContent-Type: application/http;msgtype=request\r\nContent-Length: {}\r\n\r\n",
self.warc_version, request.url, date, record_id, content_bytes.len()
);
let bytes = header.len() + content_bytes.len() + 4;
self.writer.write_all(header.as_bytes())?;
self.writer.write_all(content_bytes)?;
self.writer.write_all(b"\r\n\r\n")?;
self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
Ok(record_id)
}
pub fn write_response(&mut self, response: &WarcResponse, concurrent_to: Option<&str>) -> Result<String> {
let record_id = Self::generate_record_id();
let date = response.date.format("%Y-%m-%dT%H:%M:%SZ").to_string();
let status_text = http_status_text(response.status_code);
let mut http_response = format!(
"HTTP/1.1 {} {}\r\n",
response.status_code, status_text
);
for (key, value) in &response.headers {
http_response.push_str(&format!("{}: {}\r\n", key, value));
}
http_response.push_str("\r\n");
let mut content_bytes = http_response.into_bytes();
content_bytes.extend_from_slice(&response.body);
let mut header = format!(
"{}\r\nWARC-Type: response\r\nWARC-Target-URI: {}\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\n",
self.warc_version, response.url, date, record_id
);
if let Some(req_id) = concurrent_to {
header.push_str(&format!("WARC-Concurrent-To: <{}>\r\n", req_id));
}
if let Some(ip) = &response.ip_address {
header.push_str(&format!("WARC-IP-Address: {}\r\n", ip));
}
header.push_str(&format!(
"Content-Type: application/http;msgtype=response\r\nContent-Length: {}\r\n\r\n",
content_bytes.len()
));
let bytes = header.len() + content_bytes.len() + 4;
self.writer.write_all(header.as_bytes())?;
self.writer.write_all(&content_bytes)?;
self.writer.write_all(b"\r\n\r\n")?;
self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
Ok(record_id)
}
pub fn write_metadata(&mut self, url: &Url, metadata: &WarcMetadata) -> Result<String> {
let record_id = Self::generate_record_id();
let date = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
let content = serde_json::to_string(&metadata.data).unwrap_or_default();
let content_bytes = content.as_bytes();
let mut header = format!(
"{}\r\nWARC-Type: metadata\r\nWARC-Target-URI: {}\r\nWARC-Date: {}\r\nWARC-Record-ID: <{}>\r\n",
self.warc_version, url, date, record_id
);
if let Some(refers_to) = &metadata.refers_to {
header.push_str(&format!("WARC-Refers-To: <{}>\r\n", refers_to));
}
header.push_str(&format!(
"Content-Type: application/json\r\nContent-Length: {}\r\n\r\n",
content_bytes.len()
));
let bytes = header.len() + content_bytes.len() + 4;
self.writer.write_all(header.as_bytes())?;
self.writer.write_all(content_bytes)?;
self.writer.write_all(b"\r\n\r\n")?;
self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
Ok(record_id)
}
fn generate_record_id() -> String {
format!("urn:uuid:{}", Uuid::new_v4())
}
pub fn flush(&mut self) -> Result<()> {
self.writer.flush()?;
Ok(())
}
}
/// Factory for a sequence of WARC files in one directory, switching to a new
/// file once the current one passes a size threshold.
pub struct RotatingWarcWriter {
    /// Directory in which every WARC file is created.
    base_dir: PathBuf,
    /// Leading component of each generated file name.
    prefix: String,
    /// Byte threshold consulted by `needs_rotation`.
    max_size: u64,
    /// When true, files are gzip-compressed and use a `.warc.gz` extension.
    compress: bool,
    /// Sequence number embedded in the next file name.
    current_index: AtomicU64,
    /// Contents of the `warcinfo` record written at the head of each file.
    warc_info: WarcInfo,
}
impl RotatingWarcWriter {
pub fn new(base_dir: impl AsRef<Path>, prefix: impl Into<String>) -> Self {
Self {
base_dir: base_dir.as_ref().to_path_buf(),
prefix: prefix.into(),
max_size: 1024 * 1024 * 1024, compress: true,
current_index: AtomicU64::new(0),
warc_info: WarcInfo::default(),
}
}
pub fn with_max_size(mut self, max_size: u64) -> Self {
self.max_size = max_size;
self
}
pub fn with_compression(mut self, compress: bool) -> Self {
self.compress = compress;
self
}
pub fn with_info(mut self, info: WarcInfo) -> Self {
self.warc_info = info;
self
}
pub fn current_filename(&self) -> PathBuf {
let index = self.current_index.load(Ordering::Relaxed);
let timestamp = Utc::now().format("%Y%m%d%H%M%S");
let ext = if self.compress { "warc.gz" } else { "warc" };
self.base_dir.join(format!("{}-{}-{:05}.{}", self.prefix, timestamp, index, ext))
}
pub fn create_file(&self) -> Result<WarcFileHandle> {
std::fs::create_dir_all(&self.base_dir)?;
let path = self.current_filename();
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&path)?;
if self.compress {
let encoder = GzEncoder::new(file, Compression::default());
let mut writer = WarcWriter::new(encoder);
writer.write_warcinfo(&self.warc_info)?;
Ok(WarcFileHandle::Compressed(writer, path))
} else {
let mut writer = WarcWriter::new(file);
writer.write_warcinfo(&self.warc_info)?;
Ok(WarcFileHandle::Uncompressed(writer, path))
}
}
pub fn rotate(&self) -> Result<WarcFileHandle> {
self.current_index.fetch_add(1, Ordering::SeqCst);
self.create_file()
}
pub fn needs_rotation(&self, handle: &WarcFileHandle) -> bool {
handle.bytes_written() >= self.max_size
}
}
/// An open WARC file together with the path it was created at.
pub enum WarcFileHandle {
    /// File written through a gzip encoder (`.warc.gz`).
    Compressed(WarcWriter<GzEncoder<File>>, PathBuf),
    /// Plain, uncompressed file (`.warc`).
    Uncompressed(WarcWriter<File>, PathBuf),
}
impl WarcFileHandle {
pub fn bytes_written(&self) -> u64 {
match self {
WarcFileHandle::Compressed(w, _) => w.bytes_written(),
WarcFileHandle::Uncompressed(w, _) => w.bytes_written(),
}
}
pub fn path(&self) -> &Path {
match self {
WarcFileHandle::Compressed(_, p) => p,
WarcFileHandle::Uncompressed(_, p) => p,
}
}
pub fn write_request(&mut self, request: &WarcRequest) -> Result<String> {
match self {
WarcFileHandle::Compressed(w, _) => w.write_request(request),
WarcFileHandle::Uncompressed(w, _) => w.write_request(request),
}
}
pub fn write_response(&mut self, response: &WarcResponse, concurrent_to: Option<&str>) -> Result<String> {
match self {
WarcFileHandle::Compressed(w, _) => w.write_response(response, concurrent_to),
WarcFileHandle::Uncompressed(w, _) => w.write_response(response, concurrent_to),
}
}
pub fn write_metadata(&mut self, url: &Url, metadata: &WarcMetadata) -> Result<String> {
match self {
WarcFileHandle::Compressed(w, _) => w.write_metadata(url, metadata),
WarcFileHandle::Uncompressed(w, _) => w.write_metadata(url, metadata),
}
}
pub fn flush(&mut self) -> Result<()> {
match self {
WarcFileHandle::Compressed(w, _) => w.flush(),
WarcFileHandle::Uncompressed(w, _) => w.flush(),
}
}
}
/// Contents of the `warcinfo` record written at the start of each WARC file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WarcInfo {
    /// Value of the `software` field.
    pub software: String,
    /// Value of the `format` field.
    pub format: String,
    /// Optional `operator` field.
    pub operator: Option<String>,
    /// Optional `description` field.
    pub description: Option<String>,
}

impl Default for WarcInfo {
    /// Identifies the producer as `Halldyll/1.0` writing WARC 1.1, with no
    /// operator or description.
    fn default() -> Self {
        Self {
            software: "Halldyll/1.0".to_string(),
            format: "WARC File Format 1.1".to_string(),
            operator: None,
            description: None,
        }
    }
}
/// An HTTP request to be archived as a WARC `request` record.
#[derive(Debug, Clone)]
pub struct WarcRequest {
    /// Target URL; supplies the request line, `Host` and `WARC-Target-URI`.
    pub url: Url,
    /// Capture time, written as `WARC-Date`.
    pub date: chrono::DateTime<chrono::Utc>,
    /// HTTP method; `None` is written as `GET`.
    pub method: Option<String>,
    /// Request headers, written in order.
    pub headers: Vec<(String, String)>,
}

impl WarcRequest {
    /// Creates a request for `url` captured now, with no explicit method and
    /// no headers.
    pub fn new(url: Url) -> Self {
        Self {
            url,
            date: Utc::now(),
            method: None,
            headers: Vec::new(),
        }
    }
}
/// An HTTP response to be archived as a WARC `response` record.
#[derive(Debug, Clone)]
pub struct WarcResponse {
    /// URL the response was fetched from; written as `WARC-Target-URI`.
    pub url: Url,
    /// Capture time, written as `WARC-Date`.
    pub date: chrono::DateTime<chrono::Utc>,
    /// HTTP status code for the reconstructed status line.
    pub status_code: u16,
    /// Response headers, written in order.
    pub headers: Vec<(String, String)>,
    /// Response body bytes, written verbatim after the headers.
    pub body: Vec<u8>,
    /// Optional server address, written as `WARC-IP-Address`.
    pub ip_address: Option<String>,
}

impl WarcResponse {
    /// Creates a response captured now, with no headers and no IP address.
    pub fn new(url: Url, status_code: u16, body: Vec<u8>) -> Self {
        Self {
            url,
            date: Utc::now(),
            status_code,
            headers: Vec::new(),
            body,
            ip_address: None,
        }
    }
}
/// Payload of a WARC `metadata` record.
#[derive(Debug, Clone)]
pub struct WarcMetadata {
    /// JSON value serialized as the record block.
    pub data: serde_json::Value,
    /// Optional record ID this metadata describes (`WARC-Refers-To`).
    pub refers_to: Option<String>,
}
/// Returns the reason phrase for an HTTP status `code`, used when
/// reconstructing the status line of an archived response.
///
/// Covers the non-obsolete codes of the IANA HTTP Status Code Registry.
/// Phrases follow RFC 9110, e.g. "Content Too Large" for 413. Any other
/// code yields `"Unknown"`.
fn http_status_text(code: u16) -> &'static str {
    match code {
        100 => "Continue",
        101 => "Switching Protocols",
        102 => "Processing",
        103 => "Early Hints",
        200 => "OK",
        201 => "Created",
        202 => "Accepted",
        203 => "Non-Authoritative Information",
        204 => "No Content",
        205 => "Reset Content",
        206 => "Partial Content",
        207 => "Multi-Status",
        208 => "Already Reported",
        226 => "IM Used",
        300 => "Multiple Choices",
        301 => "Moved Permanently",
        302 => "Found",
        303 => "See Other",
        304 => "Not Modified",
        307 => "Temporary Redirect",
        308 => "Permanent Redirect",
        400 => "Bad Request",
        401 => "Unauthorized",
        402 => "Payment Required",
        403 => "Forbidden",
        404 => "Not Found",
        405 => "Method Not Allowed",
        406 => "Not Acceptable",
        407 => "Proxy Authentication Required",
        408 => "Request Timeout",
        409 => "Conflict",
        410 => "Gone",
        411 => "Length Required",
        412 => "Precondition Failed",
        413 => "Content Too Large",
        414 => "URI Too Long",
        415 => "Unsupported Media Type",
        416 => "Range Not Satisfiable",
        417 => "Expectation Failed",
        421 => "Misdirected Request",
        422 => "Unprocessable Content",
        423 => "Locked",
        424 => "Failed Dependency",
        425 => "Too Early",
        426 => "Upgrade Required",
        428 => "Precondition Required",
        429 => "Too Many Requests",
        431 => "Request Header Fields Too Large",
        451 => "Unavailable For Legal Reasons",
        500 => "Internal Server Error",
        501 => "Not Implemented",
        502 => "Bad Gateway",
        503 => "Service Unavailable",
        504 => "Gateway Timeout",
        505 => "HTTP Version Not Supported",
        506 => "Variant Also Negotiates",
        507 => "Insufficient Storage",
        508 => "Loop Detected",
        511 => "Network Authentication Required",
        _ => "Unknown",
    }
}