use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use chrono::{DateTime, Utc};
use metrics::{counter, histogram};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{debug, error, info, instrument, warn};
use url::Url;
use uuid::Uuid;
/// Default upper bound, in bytes, on the raw content accepted per capture (10 MiB).
pub const DEFAULT_MAX_CONTENT_LENGTH: usize = 10 * 1024 * 1024;
/// Default channel capacity for the capture buffer.
///
/// NOTE(review): not referenced elsewhere in this file; presumably intended to be
/// passed to `create_capture_buffer` by the caller — confirm.
pub const DEFAULT_BUFFER_CAPACITY: usize = 1000;

/// Tunable knobs controlling how captured content is validated and cleaned.
#[derive(Debug, Clone)]
pub struct CaptureConfig {
    /// Largest raw request content, in bytes, that will be accepted.
    pub max_content_length: usize,
    /// Byte length above which processed content is truncated; `None` disables truncation.
    pub truncate_length: Option<usize>,
    /// Remove `<script>…</script>` elements before extracting text.
    pub strip_scripts: bool,
    /// Remove `<style>…</style>` elements before extracting text.
    pub strip_styles: bool,
    /// Decode HTML entities after tags have been stripped.
    pub decode_entities: bool,
    /// Collapse runs of spaces/tabs and blank lines, and trim every line.
    pub normalize_whitespace: bool,
}

impl Default for CaptureConfig {
    /// Returns the stock configuration: every clean-up step enabled, a 10 MiB input
    /// cap and a 1 MiB truncation threshold.
    fn default() -> Self {
        const ONE_MIB: usize = 1024 * 1024;

        CaptureConfig {
            max_content_length: DEFAULT_MAX_CONTENT_LENGTH,
            truncate_length: Some(ONE_MIB),
            strip_scripts: true,
            strip_styles: true,
            decode_entities: true,
            normalize_whitespace: true,
        }
    }
}
/// JSON body accepted by `capture_handler`.
///
/// Only `url` and `content` are required; every other field falls back to `None`
/// when it is absent from the payload.
#[derive(Debug, Clone, Deserialize)]
pub struct CaptureRequest {
/// Address of the captured page; checked by `validate_url` (http/https only).
pub url: String,
/// Raw page content (HTML) to be cleaned by `ContentProcessor::process`.
pub content: String,
/// Optional page title, copied unchanged into the stored capture.
#[serde(default)]
pub title: Option<String>,
/// Optional page description, copied unchanged into the stored capture.
#[serde(default)]
pub description: Option<String>,
/// When the page was captured; the handler substitutes the processing time if absent.
#[serde(default)]
pub captured_at: Option<DateTime<Utc>>,
/// Arbitrary caller-supplied JSON, copied unchanged into the stored capture.
#[serde(default)]
pub metadata: Option<serde_json::Value>,
}
/// JSON body returned by `capture_handler` after a capture has been queued.
#[derive(Debug, Clone, Serialize)]
pub struct CaptureResponse {
/// Identifier generated for this capture (random v4 UUID).
pub id: Uuid,
/// The request URL as re-serialized by the `url` crate after validation.
pub url: String,
/// Timestamp taken by the handler once processing finished.
pub processed_at: DateTime<Utc>,
/// Byte length of the content as received.
pub original_size: usize,
/// Byte length of the content after clean-up and optional truncation.
pub processed_size: usize,
/// `true` when the processed content exceeded the configured truncation length.
pub truncated: bool,
/// Wall-clock time spent in the handler, in whole milliseconds.
pub processing_time_ms: u64,
}
/// A cleaned capture as it is pushed onto the capture buffer channel.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessedCapture {
/// Identifier generated for this capture.
pub id: Uuid,
/// Validated URL of the captured page.
pub url: String,
/// Title supplied by the client, if any.
pub title: Option<String>,
/// Description supplied by the client, if any.
pub description: Option<String>,
/// Extracted (and possibly truncated) text content.
pub content: String,
/// Unprocessed markup; the handler in this file always stores `None` here.
pub original_html: Option<String>,
/// Client-reported capture time, or the processing time when none was given.
pub captured_at: DateTime<Utc>,
/// Time at which the handler processed the capture.
pub processed_at: DateTime<Utc>,
/// Client-supplied metadata, passed through untouched.
pub metadata: Option<serde_json::Value>,
}
/// Failures that the capture endpoint can report to a client.
///
/// The `IntoResponse` impl in this file maps each variant to an HTTP status and a
/// machine-readable `type` string.
#[derive(Debug, thiserror::Error)]
pub enum CaptureError {
/// The request was malformed (mapped to 400). Not constructed in the code visible here.
#[error("Invalid request: {0}")]
InvalidRequest(String),
/// The URL was empty, unparsable, non-http(s) or host-less (mapped to 400).
#[error("Invalid URL: {0}")]
InvalidUrl(String),
/// The raw content exceeded `CaptureConfig::max_content_length` (mapped to 413).
#[error("Content too large: {size} bytes exceeds maximum of {max} bytes")]
ContentTooLarge {
/// Size of the rejected content in bytes.
size: usize,
/// Configured maximum in bytes.
max: usize,
},
/// Content clean-up failed (mapped to 500). Not constructed in the code visible here.
#[error("Processing error: {0}")]
ProcessingError(String),
/// The capture could not be handed to the buffer channel (mapped to 500).
#[error("Storage error: {0}")]
StorageError(String),
/// Any other server-side failure (mapped to 500). Not constructed in the code visible here.
#[error("Internal error: {0}")]
InternalError(String),
}
impl IntoResponse for CaptureError {
    /// Renders the error as `{"error": {"type": ..., "message": ...}}` with the
    /// matching HTTP status, and bumps the `capture_errors_total` counter labelled
    /// with the error type.
    fn into_response(self) -> Response {
        // Status code and machine-readable label depend only on the variant.
        let (status, error_type) = match &self {
            CaptureError::InvalidRequest(_) => (StatusCode::BAD_REQUEST, "invalid_request"),
            CaptureError::InvalidUrl(_) => (StatusCode::BAD_REQUEST, "invalid_url"),
            CaptureError::ContentTooLarge { .. } => {
                (StatusCode::PAYLOAD_TOO_LARGE, "content_too_large")
            }
            CaptureError::ProcessingError(_) => {
                (StatusCode::INTERNAL_SERVER_ERROR, "processing_error")
            }
            CaptureError::StorageError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "storage_error"),
            CaptureError::InternalError(_) => {
                (StatusCode::INTERNAL_SERVER_ERROR, "internal_error")
            }
        };

        // The human-readable text is the carried string, except for the size error
        // which is formatted from its two fields.
        let message = match self {
            CaptureError::ContentTooLarge { size, max } => {
                format!("Content size {} exceeds maximum {}", size, max)
            }
            CaptureError::InvalidRequest(text)
            | CaptureError::InvalidUrl(text)
            | CaptureError::ProcessingError(text)
            | CaptureError::StorageError(text)
            | CaptureError::InternalError(text) => text,
        };

        counter!("capture_errors_total", "type" => error_type).increment(1);

        let payload = serde_json::json!({
            "error": {
                "type": error_type,
                "message": message,
            }
        });
        (status, Json(payload)).into_response()
    }
}
/// Shared state handed to the capture routes.
#[derive(Clone)]
pub struct CaptureState {
/// Processing limits and clean-up switches applied to every request.
pub config: CaptureConfig,
/// Producer side of the capture buffer; the handler sends each processed capture here.
pub sender: mpsc::Sender<ProcessedCapture>,
}
impl CaptureState {
pub fn new(config: CaptureConfig, sender: mpsc::Sender<ProcessedCapture>) -> Self {
Self { config, sender }
}
pub fn with_defaults(sender: mpsc::Sender<ProcessedCapture>) -> Self {
Self::new(CaptureConfig::default(), sender)
}
}
/// Creates the bounded channel that carries processed captures from the HTTP handler
/// to whatever consumes them.
///
/// Returns the `(sender, receiver)` pair of a `tokio::sync::mpsc` channel holding at
/// most `capacity` queued captures.
///
/// NOTE(review): tokio documents that `mpsc::channel` panics for a capacity of 0;
/// callers should pass a positive value.
pub fn create_capture_buffer(
    capacity: usize,
) -> (
    mpsc::Sender<ProcessedCapture>,
    mpsc::Receiver<ProcessedCapture>,
) {
    let (sender, receiver) = mpsc::channel::<ProcessedCapture>(capacity);
    (sender, receiver)
}
/// Holds the pre-compiled regular expressions used to turn HTML into plain text.
pub struct ContentProcessor {
// Matches a whole `<script>…</script>` element, case-insensitively.
script_regex: Regex,
// Matches a whole `<style>…</style>` element, case-insensitively.
style_regex: Regex,
// Matches any remaining `<…>` tag.
tag_regex: Regex,
// Matches a run of spaces and/or tabs.
whitespace_regex: Regex,
// Matches three or more consecutive newlines.
newline_regex: Regex,
}
impl ContentProcessor {
pub fn new() -> Self {
Self {
script_regex: Regex::new(r"(?is)<script[^>]*>[\s\S]*?</script>").unwrap(),
style_regex: Regex::new(r"(?is)<style[^>]*>[\s\S]*?</style>").unwrap(),
tag_regex: Regex::new(r"<[^>]+>").unwrap(),
whitespace_regex: Regex::new(r"[ \t]+").unwrap(),
newline_regex: Regex::new(r"\n{3,}").unwrap(),
}
}
#[instrument(skip(self, html, config))]
pub fn process(&self, html: &str, config: &CaptureConfig) -> Result<String, CaptureError> {
let mut content = html.to_string();
if config.strip_scripts {
content = self.script_regex.replace_all(&content, "").to_string();
debug!("Stripped script tags");
}
if config.strip_styles {
content = self.style_regex.replace_all(&content, "").to_string();
debug!("Stripped style tags");
}
content = content
.replace("</p>", "\n")
.replace("</div>", "\n")
.replace("</li>", "\n")
.replace("</tr>", "\n")
.replace("<br>", "\n")
.replace("<br/>", "\n")
.replace("<br />", "\n");
content = self.tag_regex.replace_all(&content, "").to_string();
if config.decode_entities {
content = Self::decode_html_entities(&content);
debug!("Decoded HTML entities");
}
if config.normalize_whitespace {
content = self.whitespace_regex.replace_all(&content, " ").to_string();
content = self.newline_regex.replace_all(&content, "\n\n").to_string();
content = content
.lines()
.map(|l| l.trim())
.collect::<Vec<_>>()
.join("\n");
debug!("Normalized whitespace");
}
content = content.trim().to_string();
Ok(content)
}
fn decode_html_entities(text: &str) -> String {
match htmlescape::decode_html(text) {
Ok(decoded) => decoded,
Err(_) => {
text.replace(" ", " ")
.replace("<", "<")
.replace(">", ">")
.replace("&", "&")
.replace(""", "\"")
.replace("'", "'")
.replace("'", "'")
.replace("—", "\u{2014}")
.replace("–", "\u{2013}")
.replace("…", "\u{2026}")
.replace("‘", "\u{2018}")
.replace("’", "\u{2019}")
.replace("“", "\u{201C}")
.replace("”", "\u{201D}")
.replace("©", "\u{00A9}")
.replace("®", "\u{00AE}")
.replace("™", "\u{2122}")
}
}
}
}
impl Default for ContentProcessor {
fn default() -> Self {
Self::new()
}
}
/// Parses `url_str` and accepts it only if it is a non-empty `http` or `https` URL
/// with a host.
///
/// # Errors
///
/// Returns [`CaptureError::InvalidUrl`] when the string is empty, cannot be parsed,
/// uses any other scheme, or has no host.
fn validate_url(url_str: &str) -> Result<Url, CaptureError> {
    if url_str.is_empty() {
        return Err(CaptureError::InvalidUrl("URL cannot be empty".to_string()));
    }

    let parsed = match Url::parse(url_str) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(CaptureError::InvalidUrl(format!(
                "Failed to parse URL: {}",
                e
            )));
        }
    };

    let scheme = parsed.scheme();
    if !matches!(scheme, "http" | "https") {
        return Err(CaptureError::InvalidUrl(format!(
            "Invalid URL scheme '{}': only http and https are allowed",
            scheme
        )));
    }

    if parsed.host().is_some() {
        Ok(parsed)
    } else {
        Err(CaptureError::InvalidUrl(
            "URL must have a valid host".to_string(),
        ))
    }
}
#[instrument(skip(state, request), fields(url = %request.url))]
pub async fn capture_handler(
State(state): State<Arc<CaptureState>>,
Json(request): Json<CaptureRequest>,
) -> Result<Json<CaptureResponse>, CaptureError> {
let start_time = Instant::now();
info!("Processing capture request for URL: {}", request.url);
let validated_url = validate_url(&request.url)?;
debug!("URL validated: {}", validated_url);
let original_size = request.content.len();
if original_size > state.config.max_content_length {
warn!(
"Content too large: {} bytes (max: {})",
original_size, state.config.max_content_length
);
return Err(CaptureError::ContentTooLarge {
size: original_size,
max: state.config.max_content_length,
});
}
let processor = ContentProcessor::new();
let processed_content = processor.process(&request.content, &state.config)?;
let (final_content, truncated) = match state.config.truncate_length {
Some(max_len) if processed_content.len() > max_len => {
let truncated_content =
if let Some(last_space) = processed_content[..max_len].rfind(' ') {
format!("{}...", &processed_content[..last_space])
} else {
format!("{}...", &processed_content[..max_len])
};
info!(
"Content truncated from {} to {} bytes",
processed_content.len(),
truncated_content.len()
);
(truncated_content, true)
}
_ => (processed_content, false),
};
let processed_size = final_content.len();
let id = Uuid::new_v4();
let now = Utc::now();
let capture = ProcessedCapture {
id,
url: validated_url.to_string(),
title: request.title,
description: request.description,
content: final_content,
original_html: None, captured_at: request.captured_at.unwrap_or(now),
processed_at: now,
metadata: request.metadata,
};
state.sender.send(capture).await.map_err(|e| {
error!("Failed to store capture in buffer: {}", e);
CaptureError::StorageError(format!("Buffer full or closed: {}", e))
})?;
let processing_time = start_time.elapsed();
let processing_time_ms = processing_time.as_millis() as u64;
counter!("captures_total").increment(1);
histogram!("capture_processing_time_seconds").record(processing_time.as_secs_f64());
histogram!("capture_original_size_bytes").record(original_size as f64);
histogram!("capture_processed_size_bytes").record(processed_size as f64);
if truncated {
counter!("captures_truncated_total").increment(1);
}
info!(
"Capture processed successfully: id={}, original_size={}, processed_size={}, time={}ms",
id, original_size, processed_size, processing_time_ms
);
Ok(Json(CaptureResponse {
id,
url: validated_url.to_string(),
processed_at: now,
original_size,
processed_size,
truncated,
processing_time_ms,
}))
}
pub async fn capture_health() -> impl IntoResponse {
Json(serde_json::json!({
"status": "healthy",
"service": "capture",
"timestamp": Utc::now().to_rfc3339()
}))
}
use axum::{routing::post, Router};
/// Builds the router exposing `POST /capture` and `GET /capture/health`, bound to
/// the given shared state.
pub fn capture_router(state: Arc<CaptureState>) -> Router {
    let capture_route = post(capture_handler);
    let health_route = axum::routing::get(capture_health);

    Router::new()
        .route("/capture", capture_route)
        .route("/capture/health", health_route)
        .with_state(state)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validate_url_valid_http() {
let result = validate_url("http://example.com/page");
assert!(result.is_ok());
}
#[test]
fn test_validate_url_valid_https() {
let result = validate_url("https://example.com/page?query=1");
assert!(result.is_ok());
}
#[test]
fn test_validate_url_empty() {
let result = validate_url("");
assert!(matches!(result, Err(CaptureError::InvalidUrl(_))));
}
#[test]
fn test_validate_url_invalid_scheme() {
let result = validate_url("ftp://example.com/file");
assert!(matches!(result, Err(CaptureError::InvalidUrl(_))));
}
#[test]
fn test_validate_url_no_host() {
let result = validate_url("file:///path");
assert!(matches!(result, Err(CaptureError::InvalidUrl(_))));
let result = validate_url("http:///");
assert!(result.is_err() || result.unwrap().host().is_some());
}
#[test]
fn test_validate_url_malformed() {
let result = validate_url("not a url");
assert!(matches!(result, Err(CaptureError::InvalidUrl(_))));
}
#[test]
fn test_content_processor_strips_scripts() {
let processor = ContentProcessor::new();
let config = CaptureConfig::default();
let html = "<p>Hello</p><script>evil();</script><p>World</p>";
let result = processor.process(html, &config).unwrap();
assert!(!result.contains("script"));
assert!(!result.contains("evil"));
assert!(result.contains("Hello"));
assert!(result.contains("World"));
}
#[test]
fn test_content_processor_strips_styles() {
let processor = ContentProcessor::new();
let config = CaptureConfig::default();
let html = "<p>Content</p><style>.hidden { display: none; }</style>";
let result = processor.process(html, &config).unwrap();
assert!(!result.contains("style"));
assert!(!result.contains("display"));
assert!(result.contains("Content"));
}
#[test]
fn test_content_processor_decodes_entities() {
let processor = ContentProcessor::new();
let config = CaptureConfig::default();
let html = "<p>Hello & World <test></p>";
let result = processor.process(html, &config).unwrap();
assert!(result.contains("Hello & World <test>"));
}
#[test]
fn test_content_processor_normalizes_whitespace() {
let processor = ContentProcessor::new();
let config = CaptureConfig::default();
let html = "<p>Hello World</p>\n\n\n\n<p>Next</p>";
let result = processor.process(html, &config).unwrap();
assert!(!result.contains(" "));
assert!(!result.contains("\n\n\n"));
}
#[test]
fn test_content_processor_strips_tags() {
let processor = ContentProcessor::new();
let config = CaptureConfig::default();
let html = "<div class=\"container\"><p>Text</p></div>";
let result = processor.process(html, &config).unwrap();
assert!(!result.contains("<"));
assert!(!result.contains(">"));
assert!(result.contains("Text"));
}
#[test]
fn test_capture_request_deserialization() {
let json = r#"{
"url": "https://example.com",
"content": "<p>Hello</p>",
"title": "Test Page"
}"#;
let request: CaptureRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.url, "https://example.com");
assert_eq!(request.content, "<p>Hello</p>");
assert_eq!(request.title, Some("Test Page".to_string()));
assert!(request.description.is_none());
}
#[test]
fn test_capture_response_serialization() {
let response = CaptureResponse {
id: Uuid::new_v4(),
url: "https://example.com".to_string(),
processed_at: Utc::now(),
original_size: 1000,
processed_size: 500,
truncated: false,
processing_time_ms: 10,
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("\"id\""));
assert!(json.contains("\"url\""));
assert!(json.contains("\"processed_size\""));
}
#[test]
fn test_capture_config_default() {
let config = CaptureConfig::default();
assert_eq!(config.max_content_length, DEFAULT_MAX_CONTENT_LENGTH);
assert!(config.strip_scripts);
assert!(config.strip_styles);
assert!(config.decode_entities);
assert!(config.normalize_whitespace);
}
#[tokio::test]
async fn test_capture_buffer_channel() {
let (tx, mut rx) = create_capture_buffer(10);
let capture = ProcessedCapture {
id: Uuid::new_v4(),
url: "https://example.com".to_string(),
title: Some("Test".to_string()),
description: None,
content: "Hello World".to_string(),
original_html: None,
captured_at: Utc::now(),
processed_at: Utc::now(),
metadata: None,
};
tx.send(capture.clone()).await.unwrap();
let received = rx.recv().await.unwrap();
assert_eq!(received.url, "https://example.com");
assert_eq!(received.content, "Hello World");
}
#[test]
fn test_capture_error_into_response() {
let error = CaptureError::InvalidRequest("test error".to_string());
let response = error.into_response();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let error = CaptureError::ContentTooLarge { size: 100, max: 50 };
let response = error.into_response();
assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
let error = CaptureError::ProcessingError("failed".to_string());
let response = error.into_response();
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
#[test]
fn test_html_entity_decoding_comprehensive() {
let text = " <>&"''—–…";
let decoded = ContentProcessor::decode_html_entities(text);
assert!(decoded.contains('<'));
assert!(decoded.contains('>'));
assert!(decoded.contains('&'));
assert!(decoded.contains('"'));
assert!(decoded.contains('\''));
}
}