1use chrono::{DateTime, Utc};
52use serde::{Deserialize, Serialize};
53use std::collections::VecDeque;
54use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
55use std::sync::Arc;
56use std::time::Duration;
57use tokio::sync::RwLock;
58use tokio::task::JoinHandle;
59use tracing::{debug, info, instrument};
60use uuid::Uuid;
61
/// Default maximum number of records retained before FIFO eviction.
pub const DEFAULT_MAX_SIZE: usize = 1000;

/// Default record time-to-live, in seconds (1 hour).
pub const DEFAULT_MAX_AGE_SECS: u64 = 3600;

/// Default interval between background cleanup sweeps, in seconds (5 minutes).
pub const DEFAULT_CLEANUP_INTERVAL_SECS: u64 = 300;
/// A single captured document plus its processing metadata.
///
/// Serializable for export/persistence; `is_compressed` and `original_size`
/// carry `#[serde(default)]` so records serialized before those fields
/// existed still deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaptureRecord {
    /// Unique identifier for this capture.
    pub id: Uuid,
    /// URL the content was captured from.
    pub url: String,
    /// Raw captured content.
    pub content: String,
    /// Output derived from `content` by some processing step.
    pub processed_content: String,
    /// UTC timestamp taken when the record was constructed.
    pub captured_at: DateTime<Utc>,
    /// Time spent processing, in microseconds.
    pub processing_time_us: u64,
    /// Whether the content fields are currently stored compressed.
    #[serde(default)]
    pub is_compressed: bool,
    /// content + processed_content length captured at construction time.
    #[serde(default)]
    pub original_size: usize,
}
93
94impl CaptureRecord {
95 pub fn new(
97 url: String,
98 content: String,
99 processed_content: String,
100 processing_time_us: u64,
101 ) -> Self {
102 let original_size = content.len() + processed_content.len();
103 Self {
104 id: Uuid::new_v4(),
105 url,
106 content,
107 processed_content,
108 captured_at: Utc::now(),
109 processing_time_us,
110 is_compressed: false,
111 original_size,
112 }
113 }
114
115 pub fn with_id(
117 id: Uuid,
118 url: String,
119 content: String,
120 processed_content: String,
121 processing_time_us: u64,
122 ) -> Self {
123 let original_size = content.len() + processed_content.len();
124 Self {
125 id,
126 url,
127 content,
128 processed_content,
129 captured_at: Utc::now(),
130 processing_time_us,
131 is_compressed: false,
132 original_size,
133 }
134 }
135
136 pub fn size_bytes(&self) -> usize {
138 self.content.len() + self.processed_content.len() + self.url.len()
139 }
140
141 pub fn age(&self) -> chrono::Duration {
143 Utc::now() - self.captured_at
144 }
145
146 pub fn is_expired(&self, max_age: Duration) -> bool {
148 let age_secs = self.age().num_seconds();
149 age_secs >= 0 && (age_secs as u64) > max_age.as_secs()
150 }
151}
152
/// Point-in-time snapshot of buffer metrics, assembled by `CaptureBuffer::stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BufferStats {
    /// Records currently held.
    pub count: usize,
    /// Configured capacity limit.
    pub max_size: usize,
    /// Sum of `size_bytes()` over the records currently held.
    pub total_bytes: usize,
    /// Total records ever pushed.
    pub total_pushed: u64,
    /// Records evicted because the buffer was at capacity (FIFO).
    pub evictions_size: u64,
    /// Records removed because they exceeded the max age.
    pub evictions_age: u64,
    /// Number of id lookups performed.
    pub gets_by_id: u64,
    /// Number of id lookups that found a record.
    pub gets_by_id_hits: u64,
    /// Mean processing time across all pushed records, in microseconds.
    pub avg_processing_time_us: u64,
}
175
/// Tunable parameters for a [`CaptureBuffer`].
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Maximum records kept before FIFO eviction on push.
    pub max_size: usize,
    /// Record time-to-live; older records are removed by cleanup.
    pub max_age: Duration,
    /// Cadence of the background cleanup task.
    pub cleanup_interval: Duration,
    /// Whether to compress large records on push.
    pub enable_compression: bool,
    /// Minimum record size in bytes eligible for compression.
    pub compression_threshold: usize,
}
190
191impl Default for BufferConfig {
192 fn default() -> Self {
193 Self {
194 max_size: DEFAULT_MAX_SIZE,
195 max_age: Duration::from_secs(DEFAULT_MAX_AGE_SECS),
196 cleanup_interval: Duration::from_secs(DEFAULT_CLEANUP_INTERVAL_SECS),
197 enable_compression: false,
198 compression_threshold: 4096, }
200 }
201}
202
/// Fluent builder for [`CaptureBuffer`]; obtain one via `CaptureBuffer::builder()`.
#[derive(Debug, Clone, Default)]
pub struct CaptureBufferBuilder {
    // Accumulated configuration, starting from `BufferConfig::default()`.
    config: BufferConfig,
}
208
209impl CaptureBufferBuilder {
210 pub fn new() -> Self {
212 Self::default()
213 }
214
215 pub fn max_size(mut self, size: usize) -> Self {
217 self.config.max_size = size;
218 self
219 }
220
221 pub fn max_age(mut self, age: Duration) -> Self {
223 self.config.max_age = age;
224 self
225 }
226
227 pub fn cleanup_interval(mut self, interval: Duration) -> Self {
229 self.config.cleanup_interval = interval;
230 self
231 }
232
233 pub fn enable_compression(mut self, enable: bool) -> Self {
235 self.config.enable_compression = enable;
236 self
237 }
238
239 pub fn compression_threshold(mut self, threshold: usize) -> Self {
241 self.config.compression_threshold = threshold;
242 self
243 }
244
245 pub fn build(self) -> CaptureBuffer {
247 CaptureBuffer::with_config(self.config)
248 }
249}
250
/// In-memory FIFO buffer of [`CaptureRecord`]s with a bounded size,
/// age-based expiry, and lock-free statistics counters.
pub struct CaptureBuffer {
    // Records in insertion order; the front is the oldest (evicted first).
    captures: RwLock<VecDeque<CaptureRecord>>,
    // Immutable after construction.
    config: BufferConfig,
    // Atomic metric counters updated by every operation.
    stats: BufferStatsCounters,
    // Handle to the background cleanup task, once `start_cleanup` is called.
    cleanup_handle: RwLock<Option<JoinHandle<()>>>,
}
265
/// Internal atomic counters backing [`BufferStats`] snapshots.
struct BufferStatsCounters {
    total_pushed: AtomicU64,
    evictions_size: AtomicU64,
    evictions_age: AtomicU64,
    gets_by_id: AtomicU64,
    gets_by_id_hits: AtomicU64,
    // Kept in sync with live contents on push/evict/cleanup/clear.
    total_bytes: AtomicUsize,
    // Cumulative processing time (µs) across all pushes; used for the average.
    total_processing_time: AtomicU64,
}
276
277impl Default for BufferStatsCounters {
278 fn default() -> Self {
279 Self {
280 total_pushed: AtomicU64::new(0),
281 evictions_size: AtomicU64::new(0),
282 evictions_age: AtomicU64::new(0),
283 gets_by_id: AtomicU64::new(0),
284 gets_by_id_hits: AtomicU64::new(0),
285 total_bytes: AtomicUsize::new(0),
286 total_processing_time: AtomicU64::new(0),
287 }
288 }
289}
290
291impl CaptureBuffer {
292 pub fn new() -> Self {
294 Self::with_config(BufferConfig::default())
295 }
296
297 pub fn builder() -> CaptureBufferBuilder {
299 CaptureBufferBuilder::new()
300 }
301
302 pub fn with_config(config: BufferConfig) -> Self {
304 info!(
305 "Creating capture buffer: max_size={}, max_age={}s",
306 config.max_size,
307 config.max_age.as_secs()
308 );
309 Self {
310 captures: RwLock::new(VecDeque::with_capacity(config.max_size)),
311 config,
312 stats: BufferStatsCounters::default(),
313 cleanup_handle: RwLock::new(None),
314 }
315 }
316
317 #[instrument(skip(self, record), fields(url = %record.url, id = %record.id))]
321 pub async fn push(&self, mut record: CaptureRecord) {
322 let record_size = record.size_bytes();
323
324 if self.config.enable_compression
326 && record_size >= self.config.compression_threshold
327 && !record.is_compressed
328 {
329 record = Self::compress_record(record);
330 }
331
332 let mut captures = self.captures.write().await;
333
334 if captures.len() >= self.config.max_size {
336 if let Some(evicted) = captures.pop_front() {
337 debug!("Evicting capture {} due to size limit", evicted.id);
338 self.stats.evictions_size.fetch_add(1, Ordering::Relaxed);
339 self.stats
340 .total_bytes
341 .fetch_sub(evicted.size_bytes(), Ordering::Relaxed);
342 }
343 }
344
345 self.stats.total_pushed.fetch_add(1, Ordering::Relaxed);
347 self.stats
348 .total_bytes
349 .fetch_add(record.size_bytes(), Ordering::Relaxed);
350 self.stats
351 .total_processing_time
352 .fetch_add(record.processing_time_us, Ordering::Relaxed);
353
354 debug!(
355 "Pushing capture {} (size: {} bytes)",
356 record.id, record_size
357 );
358 captures.push_back(record);
359 }
360
361 #[instrument(skip(self))]
363 pub async fn get(&self, id: Uuid) -> Option<CaptureRecord> {
364 self.stats.gets_by_id.fetch_add(1, Ordering::Relaxed);
365
366 let captures = self.captures.read().await;
367 let result = captures.iter().find(|r| r.id == id).cloned();
368
369 if result.is_some() {
370 self.stats.gets_by_id_hits.fetch_add(1, Ordering::Relaxed);
371 }
372
373 result.map(|r| {
375 if r.is_compressed {
376 Self::decompress_record(r)
377 } else {
378 r
379 }
380 })
381 }
382
383 #[instrument(skip(self))]
387 pub async fn get_recent(&self, limit: usize) -> Vec<CaptureRecord> {
388 let captures = self.captures.read().await;
389 let mut result: Vec<_> = captures.iter().rev().take(limit).cloned().collect();
390
391 for record in result.iter_mut() {
393 if record.is_compressed {
394 *record = Self::decompress_record(record.clone());
395 }
396 }
397
398 debug!("Retrieved {} recent captures", result.len());
399 result
400 }
401
402 #[instrument(skip(self))]
404 pub async fn get_since(&self, timestamp: DateTime<Utc>) -> Vec<CaptureRecord> {
405 let captures = self.captures.read().await;
406 let mut result: Vec<_> = captures
407 .iter()
408 .filter(|r| r.captured_at >= timestamp)
409 .cloned()
410 .collect();
411
412 for record in result.iter_mut() {
414 if record.is_compressed {
415 *record = Self::decompress_record(record.clone());
416 }
417 }
418
419 result.sort_by(|a, b| b.captured_at.cmp(&a.captured_at));
421
422 debug!("Retrieved {} captures since {}", result.len(), timestamp);
423 result
424 }
425
426 #[instrument(skip(self))]
428 pub async fn get_by_url(&self, url: &str) -> Vec<CaptureRecord> {
429 let captures = self.captures.read().await;
430 let mut result: Vec<_> = captures.iter().filter(|r| r.url == url).cloned().collect();
431
432 for record in result.iter_mut() {
434 if record.is_compressed {
435 *record = Self::decompress_record(record.clone());
436 }
437 }
438
439 result.sort_by(|a, b| b.captured_at.cmp(&a.captured_at));
441
442 debug!("Retrieved {} captures for URL {}", result.len(), url);
443 result
444 }
445
446 #[instrument(skip(self))]
448 pub async fn clear(&self) {
449 let mut captures = self.captures.write().await;
450 let count = captures.len();
451 captures.clear();
452 self.stats.total_bytes.store(0, Ordering::Relaxed);
453 info!("Cleared {} captures from buffer", count);
454 }
455
456 pub async fn len(&self) -> usize {
458 self.captures.read().await.len()
459 }
460
461 pub async fn is_empty(&self) -> bool {
463 self.captures.read().await.is_empty()
464 }
465
466 pub async fn stats(&self) -> BufferStats {
468 let captures = self.captures.read().await;
469 let total_pushed = self.stats.total_pushed.load(Ordering::Relaxed);
470 let total_processing_time = self.stats.total_processing_time.load(Ordering::Relaxed);
471
472 BufferStats {
473 count: captures.len(),
474 max_size: self.config.max_size,
475 total_bytes: self.stats.total_bytes.load(Ordering::Relaxed),
476 total_pushed,
477 evictions_size: self.stats.evictions_size.load(Ordering::Relaxed),
478 evictions_age: self.stats.evictions_age.load(Ordering::Relaxed),
479 gets_by_id: self.stats.gets_by_id.load(Ordering::Relaxed),
480 gets_by_id_hits: self.stats.gets_by_id_hits.load(Ordering::Relaxed),
481 avg_processing_time_us: if total_pushed > 0 {
482 total_processing_time / total_pushed
483 } else {
484 0
485 },
486 }
487 }
488
489 #[instrument(skip(self))]
493 pub async fn cleanup_expired(&self) -> usize {
494 let mut captures = self.captures.write().await;
495 let initial_len = captures.len();
496
497 let max_age = self.config.max_age;
498 let mut removed_bytes = 0usize;
499
500 captures.retain(|record| {
501 let should_keep = !record.is_expired(max_age);
502 if !should_keep {
503 removed_bytes += record.size_bytes();
504 }
505 should_keep
506 });
507
508 let removed = initial_len - captures.len();
509
510 if removed > 0 {
511 self.stats
512 .evictions_age
513 .fetch_add(removed as u64, Ordering::Relaxed);
514 self.stats
515 .total_bytes
516 .fetch_sub(removed_bytes, Ordering::Relaxed);
517 info!("Cleaned up {} expired captures", removed);
518 }
519
520 removed
521 }
522
523 pub fn start_cleanup_task(self: &Arc<Self>) -> JoinHandle<()> {
527 let buffer = Arc::clone(self);
528 let interval = self.config.cleanup_interval;
529
530 info!(
531 "Starting cleanup task with interval {}s",
532 interval.as_secs()
533 );
534
535 tokio::spawn(async move {
536 let mut interval_timer = tokio::time::interval(interval);
537
538 loop {
539 interval_timer.tick().await;
540 let removed = buffer.cleanup_expired().await;
541 if removed > 0 {
542 debug!("Cleanup task removed {} expired captures", removed);
543 }
544 }
545 })
546 }
547
548 pub async fn start_cleanup(self: &Arc<Self>) {
550 let handle = self.start_cleanup_task();
551 let mut guard = self.cleanup_handle.write().await;
552 *guard = Some(handle);
553 }
554
555 pub async fn stop_cleanup(&self) {
557 let mut guard = self.cleanup_handle.write().await;
558 if let Some(handle) = guard.take() {
559 handle.abort();
560 info!("Stopped cleanup task");
561 }
562 }
563
564 pub fn config(&self) -> &BufferConfig {
566 &self.config
567 }
568
569 fn compress_record(mut record: CaptureRecord) -> CaptureRecord {
574 record.is_compressed = true;
577 record.original_size = record.content.len() + record.processed_content.len();
578 record
579 }
580
581 fn decompress_record(mut record: CaptureRecord) -> CaptureRecord {
583 record.is_compressed = false;
585 record
586 }
587}
588
589impl Default for CaptureBuffer {
590 fn default() -> Self {
591 Self::new()
592 }
593}
594
595pub type SharedCaptureBuffer = Arc<CaptureBuffer>;
597
598pub fn shared_buffer() -> SharedCaptureBuffer {
600 Arc::new(CaptureBuffer::new())
601}
602
603pub fn shared_buffer_with_config(config: BufferConfig) -> SharedCaptureBuffer {
605 Arc::new(CaptureBuffer::with_config(config))
606}
607
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a record with fixed content and a 100µs processing time.
    fn create_test_record(url: &str) -> CaptureRecord {
        CaptureRecord::new(
            url.to_string(),
            "<html><body>Test</body></html>".to_string(),
            "Test content".to_string(),
            100,
        )
    }

    // Round-trip: a pushed record is retrievable by its id.
    #[tokio::test]
    async fn test_buffer_push_and_get() {
        let buffer = CaptureBuffer::new();
        let record = create_test_record("https://example.com");
        let id = record.id;

        buffer.push(record).await;

        let retrieved = buffer.get(id).await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().url, "https://example.com");
    }

    // get_recent honors the limit and returns newest-first.
    #[tokio::test]
    async fn test_buffer_get_recent() {
        let buffer = CaptureBuffer::new();

        for i in 0..5 {
            buffer
                .push(create_test_record(&format!("https://example{}.com", i)))
                .await;
            // Small delay so each record gets a distinct timestamp.
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let recent = buffer.get_recent(3).await;
        assert_eq!(recent.len(), 3);
        // The most recently pushed record must come back first.
        assert_eq!(recent[0].url, "https://example4.com");
    }

    // Pushing past capacity evicts the oldest record and counts the eviction.
    #[tokio::test]
    async fn test_buffer_fifo_eviction() {
        let buffer = CaptureBuffer::builder().max_size(3).build();

        let first_record = create_test_record("https://first.com");
        let first_id = first_record.id;

        buffer.push(first_record).await;
        buffer.push(create_test_record("https://second.com")).await;
        buffer.push(create_test_record("https://third.com")).await;

        assert_eq!(buffer.len().await, 3);

        // Fourth push exceeds max_size=3: the oldest record is dropped.
        buffer.push(create_test_record("https://fourth.com")).await;

        assert_eq!(buffer.len().await, 3);
        assert!(buffer.get(first_id).await.is_none());

        let stats = buffer.stats().await;
        assert_eq!(stats.evictions_size, 1);
    }

    // get_since filters by timestamp: only records pushed after the cutoff.
    #[tokio::test]
    async fn test_buffer_get_since() {
        let buffer = CaptureBuffer::new();

        buffer.push(create_test_record("https://old.com")).await;
        // Sleeps pad the timeline so the cutoff falls strictly between pushes.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let cutoff = Utc::now();
        tokio::time::sleep(Duration::from_millis(50)).await;

        buffer.push(create_test_record("https://new1.com")).await;
        buffer.push(create_test_record("https://new2.com")).await;

        let since_cutoff = buffer.get_since(cutoff).await;
        assert_eq!(since_cutoff.len(), 2);
    }

    // get_by_url matches exact URLs only.
    #[tokio::test]
    async fn test_buffer_get_by_url() {
        let buffer = CaptureBuffer::new();

        buffer.push(create_test_record("https://example.com")).await;
        buffer.push(create_test_record("https://other.com")).await;
        buffer.push(create_test_record("https://example.com")).await;

        let by_url = buffer.get_by_url("https://example.com").await;
        assert_eq!(by_url.len(), 2);
    }

    // clear empties the buffer entirely.
    #[tokio::test]
    async fn test_buffer_clear() {
        let buffer = CaptureBuffer::new();

        buffer.push(create_test_record("https://example.com")).await;
        buffer.push(create_test_record("https://other.com")).await;

        assert_eq!(buffer.len().await, 2);
        buffer.clear().await;
        assert_eq!(buffer.len().await, 0);
        assert!(buffer.is_empty().await);
    }

    // Records older than max_age are removed by cleanup_expired.
    #[tokio::test]
    async fn test_buffer_expired_cleanup() {
        let buffer = CaptureBuffer::builder()
            .max_age(Duration::from_secs(1))
            .build();

        buffer.push(create_test_record("https://example.com")).await;

        // Sleep comfortably past the 1s TTL.
        tokio::time::sleep(Duration::from_millis(2100)).await;

        let removed = buffer.cleanup_expired().await;
        assert_eq!(removed, 1);
        assert!(buffer.is_empty().await);
    }

    // Stats track pushes plus lookup hit/miss counts.
    #[tokio::test]
    async fn test_buffer_stats() {
        let buffer = CaptureBuffer::new();

        buffer.push(create_test_record("https://example.com")).await;
        let record = create_test_record("https://other.com");
        let id = record.id;
        buffer.push(record).await;

        buffer.get(id).await; // hit
        buffer.get(Uuid::new_v4()).await; // miss

        let stats = buffer.stats().await;
        assert_eq!(stats.count, 2);
        assert_eq!(stats.total_pushed, 2);
        assert_eq!(stats.gets_by_id, 2);
        assert_eq!(stats.gets_by_id_hits, 1);
    }

    // is_expired: strict comparison, so a fresh record survives a 0 TTL.
    #[tokio::test]
    async fn test_capture_record_is_expired() {
        let record = create_test_record("https://example.com");

        assert!(!record.is_expired(Duration::from_secs(3600)));

        // Age is ~0 immediately after creation; strict `>` keeps it alive.
        assert!(!record.is_expired(Duration::from_millis(0)));

        tokio::time::sleep(Duration::from_millis(1100)).await;
        assert!(
            record.is_expired(Duration::from_millis(0)),
            "Should be expired after 1 second with 0 TTL"
        );
    }

    // Builder values propagate into the buffer's config.
    #[tokio::test]
    async fn test_buffer_builder() {
        let buffer = CaptureBuffer::builder()
            .max_size(500)
            .max_age(Duration::from_secs(1800))
            .cleanup_interval(Duration::from_secs(60))
            .enable_compression(true)
            .compression_threshold(1024)
            .build();

        assert_eq!(buffer.config().max_size, 500);
        assert_eq!(buffer.config().max_age, Duration::from_secs(1800));
        assert!(buffer.config().enable_compression);
    }

    // Arc clones observe pushes made through any other handle.
    #[tokio::test]
    async fn test_shared_buffer() {
        let buffer = shared_buffer();

        let buffer_clone = Arc::clone(&buffer);

        buffer.push(create_test_record("https://example.com")).await;

        assert_eq!(buffer_clone.len().await, 1);
    }

    // size_bytes = url + content + processed_content lengths.
    #[test]
    fn test_capture_record_size_bytes() {
        let record = CaptureRecord::new(
            "https://example.com".to_string(),
            "content".to_string(),
            "processed".to_string(),
            100,
        );

        assert_eq!(
            record.size_bytes(),
            "https://example.com".len() + "content".len() + "processed".len()
        );
    }

    // with_id preserves the caller-supplied id.
    #[test]
    fn test_capture_record_with_id() {
        let custom_id = Uuid::new_v4();
        let record = CaptureRecord::with_id(
            custom_id,
            "https://example.com".to_string(),
            "content".to_string(),
            "processed".to_string(),
            100,
        );

        assert_eq!(record.id, custom_id);
    }

    // Concurrent writers and readers must not lose records or deadlock.
    #[tokio::test]
    async fn test_concurrent_access() {
        let buffer = shared_buffer();

        let mut handles = vec![];

        // 10 concurrent writers.
        for i in 0..10 {
            let buffer_clone = Arc::clone(&buffer);
            handles.push(tokio::spawn(async move {
                buffer_clone
                    .push(create_test_record(&format!("https://site{}.com", i)))
                    .await;
            }));
        }

        // 5 concurrent readers interleaved with the writers.
        for i in 0..5 {
            let buffer_clone = Arc::clone(&buffer);
            handles.push(tokio::spawn(async move {
                let _ = buffer_clone.get_recent(i + 1).await;
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(buffer.len().await, 10);
    }

    // The background cleanup task removes expired records on its own.
    #[tokio::test]
    async fn test_cleanup_task_start_stop() {
        let buffer = shared_buffer_with_config(BufferConfig {
            cleanup_interval: Duration::from_secs(1),
            max_age: Duration::from_secs(1),
            ..Default::default()
        });

        buffer.start_cleanup().await;

        buffer.push(create_test_record("https://example.com")).await;
        assert_eq!(buffer.len().await, 1);

        // 3s covers the 1s TTL plus at least one 1s cleanup tick.
        tokio::time::sleep(Duration::from_secs(3)).await;

        assert_eq!(buffer.len().await, 0);

        buffer.stop_cleanup().await;
    }
}