1use axum::{
23 extract::State,
24 http::StatusCode,
25 response::{IntoResponse, Response},
26 Json,
27};
28use chrono::{DateTime, Utc};
29use metrics::{counter, histogram};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use std::sync::Arc;
33use std::time::Instant;
34use tokio::sync::mpsc;
35use tracing::{debug, error, info, instrument, warn};
36use url::Url;
37use uuid::Uuid;
38
/// Default upper bound on the size of submitted content, in bytes (10 MiB).
///
/// Used as the default for `CaptureConfig::max_content_length`.
pub const DEFAULT_MAX_CONTENT_LENGTH: usize = 10 * 1024 * 1024;

/// Default capacity for the capture buffer.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// meant to be passed as `capacity` to `create_capture_buffer` — confirm
/// against callers.
pub const DEFAULT_BUFFER_CAPACITY: usize = 1000;
48
49#[derive(Debug, Clone)]
51pub struct CaptureConfig {
52 pub max_content_length: usize,
54 pub truncate_length: Option<usize>,
56 pub strip_scripts: bool,
58 pub strip_styles: bool,
60 pub decode_entities: bool,
62 pub normalize_whitespace: bool,
64}
65
66impl Default for CaptureConfig {
67 fn default() -> Self {
68 Self {
69 max_content_length: DEFAULT_MAX_CONTENT_LENGTH,
70 truncate_length: Some(1024 * 1024), strip_scripts: true,
72 strip_styles: true,
73 decode_entities: true,
74 normalize_whitespace: true,
75 }
76 }
77}
78
/// JSON payload accepted by `capture_handler`.
#[derive(Debug, Clone, Deserialize)]
pub struct CaptureRequest {
    /// Address of the captured page; the handler only accepts `http`/`https` URLs.
    pub url: String,

    /// Raw page content (HTML) to be cleaned into text.
    pub content: String,

    /// Optional page title, copied unchanged into the stored capture.
    #[serde(default)]
    pub title: Option<String>,

    /// Optional page description, copied unchanged into the stored capture.
    #[serde(default)]
    pub description: Option<String>,

    /// Optional capture time; the handler substitutes its own processing
    /// timestamp when this is absent.
    #[serde(default)]
    pub captured_at: Option<DateTime<Utc>>,

    /// Optional free-form JSON, copied unchanged into the stored capture.
    #[serde(default)]
    pub metadata: Option<serde_json::Value>,
}
108
/// JSON body returned by `capture_handler` on success.
#[derive(Debug, Clone, Serialize)]
pub struct CaptureResponse {
    /// Identifier assigned to the capture by the handler (random v4 UUID).
    pub id: Uuid,

    /// The validated URL, rendered back to a string after parsing.
    pub url: String,

    /// Timestamp taken by the handler while processing the request.
    pub processed_at: DateTime<Utc>,

    /// Byte length of the submitted `content`.
    pub original_size: usize,

    /// Byte length of the text after processing and any truncation.
    pub processed_size: usize,

    /// Whether the processed text was shortened to fit `truncate_length`.
    pub truncated: bool,

    /// Wall-clock time spent in the handler, in milliseconds.
    pub processing_time_ms: u64,
}
133
/// A fully processed capture, sent through the capture buffer channel.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessedCapture {
    /// Identifier assigned by the handler.
    pub id: Uuid,

    /// The validated URL, rendered back to a string after parsing.
    pub url: String,

    /// Title supplied with the request, if any.
    pub title: Option<String>,

    /// Description supplied with the request, if any.
    pub description: Option<String>,

    /// Cleaned (and possibly truncated) text content.
    pub content: String,

    /// Original HTML; `capture_handler` always sets this to `None`.
    pub original_html: Option<String>,

    /// Capture time from the request, or the processing time when not supplied.
    pub captured_at: DateTime<Utc>,

    /// Timestamp taken by the handler while processing the request.
    pub processed_at: DateTime<Utc>,

    /// Free-form JSON supplied with the request, if any.
    pub metadata: Option<serde_json::Value>,
}
164
165#[derive(Debug, thiserror::Error)]
171pub enum CaptureError {
172 #[error("Invalid request: {0}")]
174 InvalidRequest(String),
175
176 #[error("Invalid URL: {0}")]
178 InvalidUrl(String),
179
180 #[error("Content too large: {size} bytes exceeds maximum of {max} bytes")]
182 ContentTooLarge {
183 size: usize,
185 max: usize,
187 },
188
189 #[error("Processing error: {0}")]
191 ProcessingError(String),
192
193 #[error("Storage error: {0}")]
195 StorageError(String),
196
197 #[error("Internal error: {0}")]
199 InternalError(String),
200}
201
202impl IntoResponse for CaptureError {
203 fn into_response(self) -> Response {
204 let (status, error_type, message) = match &self {
205 CaptureError::InvalidRequest(msg) => {
206 (StatusCode::BAD_REQUEST, "invalid_request", msg.clone())
207 }
208 CaptureError::InvalidUrl(msg) => (StatusCode::BAD_REQUEST, "invalid_url", msg.clone()),
209 CaptureError::ContentTooLarge { size, max } => (
210 StatusCode::PAYLOAD_TOO_LARGE,
211 "content_too_large",
212 format!("Content size {} exceeds maximum {}", size, max),
213 ),
214 CaptureError::ProcessingError(msg) => (
215 StatusCode::INTERNAL_SERVER_ERROR,
216 "processing_error",
217 msg.clone(),
218 ),
219 CaptureError::StorageError(msg) => (
220 StatusCode::INTERNAL_SERVER_ERROR,
221 "storage_error",
222 msg.clone(),
223 ),
224 CaptureError::InternalError(msg) => (
225 StatusCode::INTERNAL_SERVER_ERROR,
226 "internal_error",
227 msg.clone(),
228 ),
229 };
230
231 counter!("capture_errors_total", "type" => error_type).increment(1);
233
234 let body = serde_json::json!({
235 "error": {
236 "type": error_type,
237 "message": message,
238 }
239 });
240
241 (status, Json(body)).into_response()
242 }
243}
244
245#[derive(Clone)]
251pub struct CaptureState {
252 pub config: CaptureConfig,
254 pub sender: mpsc::Sender<ProcessedCapture>,
256}
257
258impl CaptureState {
259 pub fn new(config: CaptureConfig, sender: mpsc::Sender<ProcessedCapture>) -> Self {
261 Self { config, sender }
262 }
263
264 pub fn with_defaults(sender: mpsc::Sender<ProcessedCapture>) -> Self {
266 Self::new(CaptureConfig::default(), sender)
267 }
268}
269
270pub fn create_capture_buffer(
272 capacity: usize,
273) -> (
274 mpsc::Sender<ProcessedCapture>,
275 mpsc::Receiver<ProcessedCapture>,
276) {
277 mpsc::channel(capacity)
278}
279
280pub struct ContentProcessor {
286 script_regex: Regex,
288 style_regex: Regex,
290 tag_regex: Regex,
292 whitespace_regex: Regex,
294 newline_regex: Regex,
296}
297
298impl ContentProcessor {
299 pub fn new() -> Self {
301 Self {
302 script_regex: Regex::new(r"(?is)<script[^>]*>[\s\S]*?</script>").unwrap(),
303 style_regex: Regex::new(r"(?is)<style[^>]*>[\s\S]*?</style>").unwrap(),
304 tag_regex: Regex::new(r"<[^>]+>").unwrap(),
305 whitespace_regex: Regex::new(r"[ \t]+").unwrap(),
306 newline_regex: Regex::new(r"\n{3,}").unwrap(),
307 }
308 }
309
310 #[instrument(skip(self, html, config))]
312 pub fn process(&self, html: &str, config: &CaptureConfig) -> Result<String, CaptureError> {
313 let mut content = html.to_string();
314
315 if config.strip_scripts {
317 content = self.script_regex.replace_all(&content, "").to_string();
318 debug!("Stripped script tags");
319 }
320
321 if config.strip_styles {
323 content = self.style_regex.replace_all(&content, "").to_string();
324 debug!("Stripped style tags");
325 }
326
327 content = content
329 .replace("</p>", "\n")
330 .replace("</div>", "\n")
331 .replace("</li>", "\n")
332 .replace("</tr>", "\n")
333 .replace("<br>", "\n")
334 .replace("<br/>", "\n")
335 .replace("<br />", "\n");
336
337 content = self.tag_regex.replace_all(&content, "").to_string();
339
340 if config.decode_entities {
342 content = Self::decode_html_entities(&content);
343 debug!("Decoded HTML entities");
344 }
345
346 if config.normalize_whitespace {
348 content = self.whitespace_regex.replace_all(&content, " ").to_string();
350 content = self.newline_regex.replace_all(&content, "\n\n").to_string();
352 content = content
354 .lines()
355 .map(|l| l.trim())
356 .collect::<Vec<_>>()
357 .join("\n");
358 debug!("Normalized whitespace");
359 }
360
361 content = content.trim().to_string();
363
364 Ok(content)
365 }
366
367 fn decode_html_entities(text: &str) -> String {
369 match htmlescape::decode_html(text) {
371 Ok(decoded) => decoded,
372 Err(_) => {
373 text.replace(" ", " ")
375 .replace("<", "<")
376 .replace(">", ">")
377 .replace("&", "&")
378 .replace(""", "\"")
379 .replace("'", "'")
380 .replace("'", "'")
381 .replace("—", "\u{2014}")
382 .replace("–", "\u{2013}")
383 .replace("…", "\u{2026}")
384 .replace("‘", "\u{2018}")
385 .replace("’", "\u{2019}")
386 .replace("“", "\u{201C}")
387 .replace("”", "\u{201D}")
388 .replace("©", "\u{00A9}")
389 .replace("®", "\u{00AE}")
390 .replace("™", "\u{2122}")
391 }
392 }
393 }
394}
395
396impl Default for ContentProcessor {
397 fn default() -> Self {
398 Self::new()
399 }
400}
401
402fn validate_url(url_str: &str) -> Result<Url, CaptureError> {
408 if url_str.is_empty() {
410 return Err(CaptureError::InvalidUrl("URL cannot be empty".to_string()));
411 }
412
413 let url = Url::parse(url_str)
415 .map_err(|e| CaptureError::InvalidUrl(format!("Failed to parse URL: {}", e)))?;
416
417 match url.scheme() {
419 "http" | "https" => {}
420 scheme => {
421 return Err(CaptureError::InvalidUrl(format!(
422 "Invalid URL scheme '{}': only http and https are allowed",
423 scheme
424 )));
425 }
426 }
427
428 if url.host().is_none() {
430 return Err(CaptureError::InvalidUrl(
431 "URL must have a valid host".to_string(),
432 ));
433 }
434
435 Ok(url)
436}
437
438#[instrument(skip(state, request), fields(url = %request.url))]
480pub async fn capture_handler(
481 State(state): State<Arc<CaptureState>>,
482 Json(request): Json<CaptureRequest>,
483) -> Result<Json<CaptureResponse>, CaptureError> {
484 let start_time = Instant::now();
485 info!("Processing capture request for URL: {}", request.url);
486
487 let validated_url = validate_url(&request.url)?;
489 debug!("URL validated: {}", validated_url);
490
491 let original_size = request.content.len();
493 if original_size > state.config.max_content_length {
494 warn!(
495 "Content too large: {} bytes (max: {})",
496 original_size, state.config.max_content_length
497 );
498 return Err(CaptureError::ContentTooLarge {
499 size: original_size,
500 max: state.config.max_content_length,
501 });
502 }
503
504 let processor = ContentProcessor::new();
506 let processed_content = processor.process(&request.content, &state.config)?;
507
508 let (final_content, truncated) = match state.config.truncate_length {
510 Some(max_len) if processed_content.len() > max_len => {
511 let truncated_content =
513 if let Some(last_space) = processed_content[..max_len].rfind(' ') {
514 format!("{}...", &processed_content[..last_space])
515 } else {
516 format!("{}...", &processed_content[..max_len])
517 };
518 info!(
519 "Content truncated from {} to {} bytes",
520 processed_content.len(),
521 truncated_content.len()
522 );
523 (truncated_content, true)
524 }
525 _ => (processed_content, false),
526 };
527
528 let processed_size = final_content.len();
529
530 let id = Uuid::new_v4();
532 let now = Utc::now();
533
534 let capture = ProcessedCapture {
536 id,
537 url: validated_url.to_string(),
538 title: request.title,
539 description: request.description,
540 content: final_content,
541 original_html: None, captured_at: request.captured_at.unwrap_or(now),
543 processed_at: now,
544 metadata: request.metadata,
545 };
546
547 state.sender.send(capture).await.map_err(|e| {
549 error!("Failed to store capture in buffer: {}", e);
550 CaptureError::StorageError(format!("Buffer full or closed: {}", e))
551 })?;
552
553 let processing_time = start_time.elapsed();
554 let processing_time_ms = processing_time.as_millis() as u64;
555
556 counter!("captures_total").increment(1);
558 histogram!("capture_processing_time_seconds").record(processing_time.as_secs_f64());
559 histogram!("capture_original_size_bytes").record(original_size as f64);
560 histogram!("capture_processed_size_bytes").record(processed_size as f64);
561
562 if truncated {
563 counter!("captures_truncated_total").increment(1);
564 }
565
566 info!(
567 "Capture processed successfully: id={}, original_size={}, processed_size={}, time={}ms",
568 id, original_size, processed_size, processing_time_ms
569 );
570
571 Ok(Json(CaptureResponse {
572 id,
573 url: validated_url.to_string(),
574 processed_at: now,
575 original_size,
576 processed_size,
577 truncated,
578 processing_time_ms,
579 }))
580}
581
582pub async fn capture_health() -> impl IntoResponse {
584 Json(serde_json::json!({
585 "status": "healthy",
586 "service": "capture",
587 "timestamp": Utc::now().to_rfc3339()
588 }))
589}
590
591use axum::{routing::post, Router};
596
597pub fn capture_router(state: Arc<CaptureState>) -> Router {
599 Router::new()
600 .route("/capture", post(capture_handler))
601 .route("/capture/health", axum::routing::get(capture_health))
602 .with_state(state)
603}
604
605#[cfg(test)]
610mod tests {
611 use super::*;
612
613 #[test]
614 fn test_validate_url_valid_http() {
615 let result = validate_url("http://example.com/page");
616 assert!(result.is_ok());
617 }
618
619 #[test]
620 fn test_validate_url_valid_https() {
621 let result = validate_url("https://example.com/page?query=1");
622 assert!(result.is_ok());
623 }
624
625 #[test]
626 fn test_validate_url_empty() {
627 let result = validate_url("");
628 assert!(matches!(result, Err(CaptureError::InvalidUrl(_))));
629 }
630
631 #[test]
632 fn test_validate_url_invalid_scheme() {
633 let result = validate_url("ftp://example.com/file");
634 assert!(matches!(result, Err(CaptureError::InvalidUrl(_))));
635 }
636
637 #[test]
638 fn test_validate_url_no_host() {
639 let result = validate_url("file:///path");
644 assert!(matches!(result, Err(CaptureError::InvalidUrl(_))));
645
646 let result = validate_url("http:///");
648 assert!(result.is_err() || result.unwrap().host().is_some());
651 }
652
653 #[test]
654 fn test_validate_url_malformed() {
655 let result = validate_url("not a url");
656 assert!(matches!(result, Err(CaptureError::InvalidUrl(_))));
657 }
658
659 #[test]
660 fn test_content_processor_strips_scripts() {
661 let processor = ContentProcessor::new();
662 let config = CaptureConfig::default();
663
664 let html = "<p>Hello</p><script>evil();</script><p>World</p>";
665 let result = processor.process(html, &config).unwrap();
666
667 assert!(!result.contains("script"));
668 assert!(!result.contains("evil"));
669 assert!(result.contains("Hello"));
670 assert!(result.contains("World"));
671 }
672
673 #[test]
674 fn test_content_processor_strips_styles() {
675 let processor = ContentProcessor::new();
676 let config = CaptureConfig::default();
677
678 let html = "<p>Content</p><style>.hidden { display: none; }</style>";
679 let result = processor.process(html, &config).unwrap();
680
681 assert!(!result.contains("style"));
682 assert!(!result.contains("display"));
683 assert!(result.contains("Content"));
684 }
685
686 #[test]
687 fn test_content_processor_decodes_entities() {
688 let processor = ContentProcessor::new();
689 let config = CaptureConfig::default();
690
691 let html = "<p>Hello & World <test></p>";
692 let result = processor.process(html, &config).unwrap();
693
694 assert!(result.contains("Hello & World <test>"));
695 }
696
697 #[test]
698 fn test_content_processor_normalizes_whitespace() {
699 let processor = ContentProcessor::new();
700 let config = CaptureConfig::default();
701
702 let html = "<p>Hello World</p>\n\n\n\n<p>Next</p>";
703 let result = processor.process(html, &config).unwrap();
704
705 assert!(!result.contains(" "));
707 assert!(!result.contains("\n\n\n"));
708 }
709
710 #[test]
711 fn test_content_processor_strips_tags() {
712 let processor = ContentProcessor::new();
713 let config = CaptureConfig::default();
714
715 let html = "<div class=\"container\"><p>Text</p></div>";
716 let result = processor.process(html, &config).unwrap();
717
718 assert!(!result.contains("<"));
719 assert!(!result.contains(">"));
720 assert!(result.contains("Text"));
721 }
722
723 #[test]
724 fn test_capture_request_deserialization() {
725 let json = r#"{
726 "url": "https://example.com",
727 "content": "<p>Hello</p>",
728 "title": "Test Page"
729 }"#;
730
731 let request: CaptureRequest = serde_json::from_str(json).unwrap();
732 assert_eq!(request.url, "https://example.com");
733 assert_eq!(request.content, "<p>Hello</p>");
734 assert_eq!(request.title, Some("Test Page".to_string()));
735 assert!(request.description.is_none());
736 }
737
738 #[test]
739 fn test_capture_response_serialization() {
740 let response = CaptureResponse {
741 id: Uuid::new_v4(),
742 url: "https://example.com".to_string(),
743 processed_at: Utc::now(),
744 original_size: 1000,
745 processed_size: 500,
746 truncated: false,
747 processing_time_ms: 10,
748 };
749
750 let json = serde_json::to_string(&response).unwrap();
751 assert!(json.contains("\"id\""));
752 assert!(json.contains("\"url\""));
753 assert!(json.contains("\"processed_size\""));
754 }
755
756 #[test]
757 fn test_capture_config_default() {
758 let config = CaptureConfig::default();
759 assert_eq!(config.max_content_length, DEFAULT_MAX_CONTENT_LENGTH);
760 assert!(config.strip_scripts);
761 assert!(config.strip_styles);
762 assert!(config.decode_entities);
763 assert!(config.normalize_whitespace);
764 }
765
766 #[tokio::test]
767 async fn test_capture_buffer_channel() {
768 let (tx, mut rx) = create_capture_buffer(10);
769
770 let capture = ProcessedCapture {
771 id: Uuid::new_v4(),
772 url: "https://example.com".to_string(),
773 title: Some("Test".to_string()),
774 description: None,
775 content: "Hello World".to_string(),
776 original_html: None,
777 captured_at: Utc::now(),
778 processed_at: Utc::now(),
779 metadata: None,
780 };
781
782 tx.send(capture.clone()).await.unwrap();
783
784 let received = rx.recv().await.unwrap();
785 assert_eq!(received.url, "https://example.com");
786 assert_eq!(received.content, "Hello World");
787 }
788
789 #[test]
790 fn test_capture_error_into_response() {
791 let error = CaptureError::InvalidRequest("test error".to_string());
792 let response = error.into_response();
793 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
794
795 let error = CaptureError::ContentTooLarge { size: 100, max: 50 };
796 let response = error.into_response();
797 assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
798
799 let error = CaptureError::ProcessingError("failed".to_string());
800 let response = error.into_response();
801 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
802 }
803
804 #[test]
805 fn test_html_entity_decoding_comprehensive() {
806 let text = " <>&"''—–…";
807 let decoded = ContentProcessor::decode_html_entities(text);
808
809 assert!(decoded.contains('<'));
810 assert!(decoded.contains('>'));
811 assert!(decoded.contains('&'));
812 assert!(decoded.contains('"'));
813 assert!(decoded.contains('\''));
814 }
815}