reasonkit_web/
buffer.rs

//! In-memory capture buffer for web content
//!
//! This module provides a thread-safe, bounded buffer for storing captured web content
//! with automatic cleanup of stale entries and FIFO eviction when capacity is reached.
//!
//! # Features
//!
//! - **Bounded size**: Maximum 1000 captures by default
//! - **Time-based expiry**: Entries expire after 1 hour by default
//! - **Thread-safe**: Uses `RwLock` for concurrent read access
//! - **Memory efficient**: Optional content compression (currently a placeholder hook; see `compress_record`)
//! - **Background cleanup**: Automatic removal of expired entries
//!
//! # Example
//!
//! ```rust,no_run
//! use reasonkit_web::buffer::{CaptureBuffer, CaptureRecord};
//! use std::time::Duration;
//! use std::sync::Arc;
//!
//! #[tokio::main]
//! async fn main() {
//!     // Create buffer with defaults
//!     let buffer = Arc::new(CaptureBuffer::new());
//!
//!     // Or with custom settings
//!     let buffer = Arc::new(CaptureBuffer::builder()
//!         .max_size(500)
//!         .max_age(Duration::from_secs(1800)) // 30 minutes
//!         .enable_compression(true)
//!         .build());
//!
//!     // Start background cleanup
//!     buffer.start_cleanup_task();
//!
//!     // Add a capture
//!     let record = CaptureRecord::new(
//!         "https://example.com".to_string(),
//!         "<html>...</html>".to_string(),
//!         "Extracted content...".to_string(),
//!         1234,
//!     );
//!     buffer.push(record).await;
//!
//!     // Retrieve captures
//!     let recent = buffer.get_recent(10).await;
//!     println!("Recent captures: {}", recent.len());
//! }
//! ```

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{debug, info, instrument};
use uuid::Uuid;

/// Default maximum number of captures in the buffer
pub const DEFAULT_MAX_SIZE: usize = 1000;

/// Default maximum age of captures (1 hour)
pub const DEFAULT_MAX_AGE_SECS: u64 = 3600;

/// Default cleanup interval (5 minutes)
pub const DEFAULT_CLEANUP_INTERVAL_SECS: u64 = 300;

/// A single captured page record
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaptureRecord {
    /// Unique identifier for this capture
    pub id: Uuid,
    /// The URL that was captured
    pub url: String,
    /// Raw captured content (HTML, etc.)
    pub content: String,
    /// Processed/extracted content
    pub processed_content: String,
    /// Timestamp when capture occurred
    pub captured_at: DateTime<Utc>,
    /// Time taken to process in microseconds
    pub processing_time_us: u64,
    /// Whether the content is compressed
    #[serde(default)]
    pub is_compressed: bool,
    /// Original content size before compression
    #[serde(default)]
    pub original_size: usize,
}

impl CaptureRecord {
    /// Create a new capture record with current timestamp
    pub fn new(
        url: String,
        content: String,
        processed_content: String,
        processing_time_us: u64,
    ) -> Self {
        Self::with_id(
            Uuid::new_v4(),
            url,
            content,
            processed_content,
            processing_time_us,
        )
    }

    /// Create a new capture record with a specific ID
    pub fn with_id(
        id: Uuid,
        url: String,
        content: String,
        processed_content: String,
        processing_time_us: u64,
    ) -> Self {
        let original_size = content.len() + processed_content.len();
        Self {
            id,
            url,
            content,
            processed_content,
            captured_at: Utc::now(),
            processing_time_us,
            is_compressed: false,
            original_size,
        }
    }

    /// Get the total size of this record in bytes
    pub fn size_bytes(&self) -> usize {
        self.content.len() + self.processed_content.len() + self.url.len()
    }

    /// Get age of this record
    pub fn age(&self) -> chrono::Duration {
        Utc::now() - self.captured_at
    }

    /// Check if this record has expired based on max age
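    ///
    /// # Example
    ///
    /// A minimal sketch of the expiry check (values are illustrative):
    ///
    /// ```rust,no_run
    /// # use reasonkit_web::buffer::CaptureRecord;
    /// # use std::time::Duration;
    /// let record = CaptureRecord::new(
    ///     "https://example.com".to_string(),
    ///     "<html>...</html>".to_string(),
    ///     "text".to_string(),
    ///     250,
    /// );
    /// // A freshly created record is well under a 1-hour TTL.
    /// assert!(!record.is_expired(Duration::from_secs(3600)));
    /// ```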
    pub fn is_expired(&self, max_age: Duration) -> bool {
        let age_secs = self.age().num_seconds();
        age_secs >= 0 && (age_secs as u64) > max_age.as_secs()
    }
}

/// Statistics about buffer usage
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BufferStats {
    /// Current number of captures in buffer
    pub count: usize,
    /// Maximum buffer size
    pub max_size: usize,
    /// Total bytes stored
    pub total_bytes: usize,
    /// Number of captures pushed since creation
    pub total_pushed: u64,
    /// Number of captures evicted due to size limit
    pub evictions_size: u64,
    /// Number of captures evicted due to age
    pub evictions_age: u64,
    /// Number of gets by ID
    pub gets_by_id: u64,
    /// Number of successful gets by ID
    pub gets_by_id_hits: u64,
    /// Average processing time in microseconds
    pub avg_processing_time_us: u64,
}

/// Configuration for the capture buffer
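///
/// # Example
///
/// A short sketch of overriding a single field via struct-update syntax
/// (the builder below offers the same knobs fluently):
///
/// ```rust,no_run
/// # use reasonkit_web::buffer::BufferConfig;
/// # use std::time::Duration;
/// let config = BufferConfig {
///     max_size: 200,
///     ..Default::default()
/// };
/// // Unspecified fields keep their defaults, e.g. the 1-hour max age.
/// assert_eq!(config.max_age, Duration::from_secs(3600));
/// ```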
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Maximum number of captures to store
    pub max_size: usize,
    /// Maximum age of captures before expiry
    pub max_age: Duration,
    /// Cleanup interval for background task
    pub cleanup_interval: Duration,
    /// Whether to compress content
    pub enable_compression: bool,
    /// Minimum size for compression (bytes)
    pub compression_threshold: usize,
}

impl Default for BufferConfig {
    fn default() -> Self {
        Self {
            max_size: DEFAULT_MAX_SIZE,
            max_age: Duration::from_secs(DEFAULT_MAX_AGE_SECS),
            cleanup_interval: Duration::from_secs(DEFAULT_CLEANUP_INTERVAL_SECS),
            enable_compression: false,
            compression_threshold: 4096, // 4KB minimum for compression
        }
    }
}

/// Builder for CaptureBuffer
#[derive(Debug, Clone, Default)]
pub struct CaptureBufferBuilder {
    config: BufferConfig,
}

impl CaptureBufferBuilder {
    /// Create a new builder with default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Set maximum buffer size
    pub fn max_size(mut self, size: usize) -> Self {
        self.config.max_size = size;
        self
    }

    /// Set maximum age of captures
    pub fn max_age(mut self, age: Duration) -> Self {
        self.config.max_age = age;
        self
    }

    /// Set cleanup interval
    pub fn cleanup_interval(mut self, interval: Duration) -> Self {
        self.config.cleanup_interval = interval;
        self
    }

    /// Enable or disable compression
    pub fn enable_compression(mut self, enable: bool) -> Self {
        self.config.enable_compression = enable;
        self
    }

    /// Set compression threshold
    pub fn compression_threshold(mut self, threshold: usize) -> Self {
        self.config.compression_threshold = threshold;
        self
    }

    /// Build the CaptureBuffer
    pub fn build(self) -> CaptureBuffer {
        CaptureBuffer::with_config(self.config)
    }
}

/// Thread-safe in-memory buffer for captured web content
///
/// Uses a `VecDeque` internally for efficient FIFO operations with
/// `RwLock` for thread-safe concurrent access.
pub struct CaptureBuffer {
    /// Internal storage for captures
    captures: RwLock<VecDeque<CaptureRecord>>,
    /// Configuration
    config: BufferConfig,
    /// Statistics counters
    stats: BufferStatsCounters,
    /// Handle to cleanup task (if running)
    cleanup_handle: RwLock<Option<JoinHandle<()>>>,
}

/// Atomic counters for statistics
struct BufferStatsCounters {
    total_pushed: AtomicU64,
    evictions_size: AtomicU64,
    evictions_age: AtomicU64,
    gets_by_id: AtomicU64,
    gets_by_id_hits: AtomicU64,
    total_bytes: AtomicUsize,
    total_processing_time: AtomicU64,
}

impl Default for BufferStatsCounters {
    fn default() -> Self {
        Self {
            total_pushed: AtomicU64::new(0),
            evictions_size: AtomicU64::new(0),
            evictions_age: AtomicU64::new(0),
            gets_by_id: AtomicU64::new(0),
            gets_by_id_hits: AtomicU64::new(0),
            total_bytes: AtomicUsize::new(0),
            total_processing_time: AtomicU64::new(0),
        }
    }
}

impl CaptureBuffer {
    /// Create a new capture buffer with default settings
    pub fn new() -> Self {
        Self::with_config(BufferConfig::default())
    }

    /// Create a new capture buffer builder
    pub fn builder() -> CaptureBufferBuilder {
        CaptureBufferBuilder::new()
    }

    /// Create a new capture buffer with custom configuration
    pub fn with_config(config: BufferConfig) -> Self {
        info!(
            "Creating capture buffer: max_size={}, max_age={}s",
            config.max_size,
            config.max_age.as_secs()
        );
        Self {
            captures: RwLock::new(VecDeque::with_capacity(config.max_size)),
            config,
            stats: BufferStatsCounters::default(),
            cleanup_handle: RwLock::new(None),
        }
    }

    /// Push a new capture record into the buffer
    ///
    /// If the buffer is at capacity, the oldest record is evicted (FIFO).
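    ///
    /// # Example
    ///
    /// A minimal sketch of FIFO eviction at capacity:
    ///
    /// ```rust,no_run
    /// # use reasonkit_web::buffer::{CaptureBuffer, CaptureRecord};
    /// # #[tokio::main]
    /// # async fn main() {
    /// let buffer = CaptureBuffer::builder().max_size(1).build();
    /// let first = CaptureRecord::new("https://a.com".into(), "<html/>".into(), "a".into(), 1);
    /// let first_id = first.id;
    /// buffer.push(first).await;
    /// buffer.push(CaptureRecord::new("https://b.com".into(), "<html/>".into(), "b".into(), 1)).await;
    /// // Capacity is 1, so the first record was evicted.
    /// assert!(buffer.get(first_id).await.is_none());
    /// # }
    /// ```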
    #[instrument(skip(self, record), fields(url = %record.url, id = %record.id))]
    pub async fn push(&self, mut record: CaptureRecord) {
        let record_size = record.size_bytes();

        // Apply compression if enabled and record is large enough
        if self.config.enable_compression
            && record_size >= self.config.compression_threshold
            && !record.is_compressed
        {
            record = Self::compress_record(record);
        }

        let mut captures = self.captures.write().await;

        // Evict oldest if at capacity
        if captures.len() >= self.config.max_size {
            if let Some(evicted) = captures.pop_front() {
                debug!("Evicting capture {} due to size limit", evicted.id);
                self.stats.evictions_size.fetch_add(1, Ordering::Relaxed);
                self.stats
                    .total_bytes
                    .fetch_sub(evicted.size_bytes(), Ordering::Relaxed);
            }
        }

        // Update stats
        self.stats.total_pushed.fetch_add(1, Ordering::Relaxed);
        self.stats
            .total_bytes
            .fetch_add(record.size_bytes(), Ordering::Relaxed);
        self.stats
            .total_processing_time
            .fetch_add(record.processing_time_us, Ordering::Relaxed);

        debug!(
            "Pushing capture {} (size: {} bytes)",
            record.id, record_size
        );
        captures.push_back(record);
    }

    /// Get a capture by its ID
    #[instrument(skip(self))]
    pub async fn get(&self, id: Uuid) -> Option<CaptureRecord> {
        self.stats.gets_by_id.fetch_add(1, Ordering::Relaxed);

        let captures = self.captures.read().await;
        let result = captures.iter().find(|r| r.id == id).cloned();

        if result.is_some() {
            self.stats.gets_by_id_hits.fetch_add(1, Ordering::Relaxed);
        }

        // Decompress if needed
        result.map(|r| {
            if r.is_compressed {
                Self::decompress_record(r)
            } else {
                r
            }
        })
    }

    /// Get the most recent N captures
    ///
    /// Returns captures in reverse chronological order (newest first).
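    ///
    /// # Example
    ///
    /// A short sketch: push two captures, then read back the newest one.
    ///
    /// ```rust,no_run
    /// # use reasonkit_web::buffer::{CaptureBuffer, CaptureRecord};
    /// # #[tokio::main]
    /// # async fn main() {
    /// let buffer = CaptureBuffer::new();
    /// buffer.push(CaptureRecord::new("https://a.com".into(), "<html/>".into(), "a".into(), 1)).await;
    /// buffer.push(CaptureRecord::new("https://b.com".into(), "<html/>".into(), "b".into(), 1)).await;
    /// let recent = buffer.get_recent(1).await;
    /// // Newest first.
    /// assert_eq!(recent[0].url, "https://b.com");
    /// # }
    /// ```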
    #[instrument(skip(self))]
    pub async fn get_recent(&self, limit: usize) -> Vec<CaptureRecord> {
        let captures = self.captures.read().await;
        let mut result: Vec<_> = captures.iter().rev().take(limit).cloned().collect();

        // Decompress if needed
        for record in result.iter_mut() {
            if record.is_compressed {
                *record = Self::decompress_record(record.clone());
            }
        }

        debug!("Retrieved {} recent captures", result.len());
        result
    }

    /// Get all captures since a given timestamp
    #[instrument(skip(self))]
    pub async fn get_since(&self, timestamp: DateTime<Utc>) -> Vec<CaptureRecord> {
        let captures = self.captures.read().await;
        let mut result: Vec<_> = captures
            .iter()
            .filter(|r| r.captured_at >= timestamp)
            .cloned()
            .collect();

        // Decompress if needed
        for record in result.iter_mut() {
            if record.is_compressed {
                *record = Self::decompress_record(record.clone());
            }
        }

        // Sort by timestamp descending
        result.sort_by(|a, b| b.captured_at.cmp(&a.captured_at));

        debug!("Retrieved {} captures since {}", result.len(), timestamp);
        result
    }

    /// Get all captures for a specific URL
    #[instrument(skip(self))]
    pub async fn get_by_url(&self, url: &str) -> Vec<CaptureRecord> {
        let captures = self.captures.read().await;
        let mut result: Vec<_> = captures.iter().filter(|r| r.url == url).cloned().collect();

        // Decompress if needed
        for record in result.iter_mut() {
            if record.is_compressed {
                *record = Self::decompress_record(record.clone());
            }
        }

        // Sort by timestamp descending
        result.sort_by(|a, b| b.captured_at.cmp(&a.captured_at));

        debug!("Retrieved {} captures for URL {}", result.len(), url);
        result
    }

    /// Clear all captures from the buffer
    #[instrument(skip(self))]
    pub async fn clear(&self) {
        let mut captures = self.captures.write().await;
        let count = captures.len();
        captures.clear();
        self.stats.total_bytes.store(0, Ordering::Relaxed);
        info!("Cleared {} captures from buffer", count);
    }

    /// Get the current number of captures in the buffer
    pub async fn len(&self) -> usize {
        self.captures.read().await.len()
    }

    /// Check if the buffer is empty
    pub async fn is_empty(&self) -> bool {
        self.captures.read().await.is_empty()
    }

    /// Get buffer statistics
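    ///
    /// # Example
    ///
    /// A sketch of deriving a lookup hit rate from the returned counters:
    ///
    /// ```rust,no_run
    /// # use reasonkit_web::buffer::CaptureBuffer;
    /// # #[tokio::main]
    /// # async fn main() {
    /// let buffer = CaptureBuffer::new();
    /// let stats = buffer.stats().await;
    /// let hit_rate = if stats.gets_by_id > 0 {
    ///     stats.gets_by_id_hits as f64 / stats.gets_by_id as f64
    /// } else {
    ///     0.0
    /// };
    /// println!("hit rate: {:.1}%", hit_rate * 100.0);
    /// # }
    /// ```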
    pub async fn stats(&self) -> BufferStats {
        let captures = self.captures.read().await;
        let total_pushed = self.stats.total_pushed.load(Ordering::Relaxed);
        let total_processing_time = self.stats.total_processing_time.load(Ordering::Relaxed);

        BufferStats {
            count: captures.len(),
            max_size: self.config.max_size,
            total_bytes: self.stats.total_bytes.load(Ordering::Relaxed),
            total_pushed,
            evictions_size: self.stats.evictions_size.load(Ordering::Relaxed),
            evictions_age: self.stats.evictions_age.load(Ordering::Relaxed),
            gets_by_id: self.stats.gets_by_id.load(Ordering::Relaxed),
            gets_by_id_hits: self.stats.gets_by_id_hits.load(Ordering::Relaxed),
            avg_processing_time_us: if total_pushed > 0 {
                total_processing_time / total_pushed
            } else {
                0
            },
        }
    }

    /// Remove expired captures from the buffer
    ///
    /// Returns the number of captures removed.
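    ///
    /// # Example
    ///
    /// A minimal sketch of an on-demand sweep (note that expiry requires
    /// the record's age in whole seconds to exceed `max_age`):
    ///
    /// ```rust,no_run
    /// # use reasonkit_web::buffer::CaptureBuffer;
    /// # use std::time::Duration;
    /// # #[tokio::main]
    /// # async fn main() {
    /// let buffer = CaptureBuffer::builder()
    ///     .max_age(Duration::from_secs(60))
    ///     .build();
    /// let removed = buffer.cleanup_expired().await;
    /// println!("removed {} stale captures", removed);
    /// # }
    /// ```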
    #[instrument(skip(self))]
    pub async fn cleanup_expired(&self) -> usize {
        let mut captures = self.captures.write().await;
        let initial_len = captures.len();

        let max_age = self.config.max_age;
        let mut removed_bytes = 0usize;

        captures.retain(|record| {
            let should_keep = !record.is_expired(max_age);
            if !should_keep {
                removed_bytes += record.size_bytes();
            }
            should_keep
        });

        let removed = initial_len - captures.len();

        if removed > 0 {
            self.stats
                .evictions_age
                .fetch_add(removed as u64, Ordering::Relaxed);
            self.stats
                .total_bytes
                .fetch_sub(removed_bytes, Ordering::Relaxed);
            info!("Cleaned up {} expired captures", removed);
        }

        removed
    }

    /// Start the background cleanup task
    ///
    /// This spawns a task that periodically removes expired captures.
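    ///
    /// # Example
    ///
    /// A brief sketch of the task lifecycle; the returned handle can be
    /// aborted directly, or use `start_cleanup` / `stop_cleanup` below to
    /// let the buffer manage the handle internally:
    ///
    /// ```rust,no_run
    /// # use reasonkit_web::buffer::CaptureBuffer;
    /// # use std::sync::Arc;
    /// # #[tokio::main]
    /// # async fn main() {
    /// let buffer = Arc::new(CaptureBuffer::new());
    /// let handle = buffer.start_cleanup_task();
    /// // ... capture traffic ...
    /// handle.abort();
    /// # }
    /// ```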
    pub fn start_cleanup_task(self: &Arc<Self>) -> JoinHandle<()> {
        let buffer = Arc::clone(self);
        let interval = self.config.cleanup_interval;

        info!(
            "Starting cleanup task with interval {}s",
            interval.as_secs()
        );

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;
                let removed = buffer.cleanup_expired().await;
                if removed > 0 {
                    debug!("Cleanup task removed {} expired captures", removed);
                }
            }
        })
    }

    /// Start cleanup task and store handle internally
    pub async fn start_cleanup(self: &Arc<Self>) {
        let handle = self.start_cleanup_task();
        let mut guard = self.cleanup_handle.write().await;
        *guard = Some(handle);
    }

    /// Stop the cleanup task if running
    pub async fn stop_cleanup(&self) {
        let mut guard = self.cleanup_handle.write().await;
        if let Some(handle) = guard.take() {
            handle.abort();
            info!("Stopped cleanup task");
        }
    }

    /// Get the buffer configuration
    pub fn config(&self) -> &BufferConfig {
        &self.config
    }

    /// Compress a capture record's content
    ///
    /// Currently a placeholder that only marks the record as compressed;
    /// no bytes are actually transformed. For production, consider lz4 or zstd.
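    ///
    /// A rough sketch of what a real codec could look like, assuming the
    /// `lz4_flex` crate were added as a dependency (compressed output is
    /// binary, so the record would also need a byte-oriented field or an
    /// encoding step such as base64):
    ///
    /// ```rust,ignore
    /// use lz4_flex::{compress_prepend_size, decompress_size_prepended};
    ///
    /// // Compress the raw content; the original length is prepended so
    /// // decompression can size its output buffer.
    /// let compressed: Vec<u8> = compress_prepend_size(record.content.as_bytes());
    ///
    /// // Later, recover the original bytes and rebuild the string.
    /// let restored: Vec<u8> = decompress_size_prepended(&compressed)?;
    /// let content = String::from_utf8(restored)?;
    /// ```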
    fn compress_record(mut record: CaptureRecord) -> CaptureRecord {
        // Simple placeholder - in production use lz4 or zstd
        // For now, we'll just mark it as compressed to demonstrate the interface
        record.is_compressed = true;
        record.original_size = record.content.len() + record.processed_content.len();
        record
    }

    /// Decompress a capture record's content
    fn decompress_record(mut record: CaptureRecord) -> CaptureRecord {
        // Matches compress_record - in production implement real decompression
        record.is_compressed = false;
        record
    }
}

impl Default for CaptureBuffer {
    fn default() -> Self {
        Self::new()
    }
}

/// Type alias for a `CaptureBuffer` shared via `Arc`
pub type SharedCaptureBuffer = Arc<CaptureBuffer>;

/// Create a new shared capture buffer
pub fn shared_buffer() -> SharedCaptureBuffer {
    Arc::new(CaptureBuffer::new())
}

/// Create a new shared capture buffer with custom config
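///
/// # Example
///
/// A brief sketch of sharing one buffer across tasks:
///
/// ```rust,no_run
/// # use reasonkit_web::buffer::{shared_buffer_with_config, BufferConfig};
/// # use std::sync::Arc;
/// # #[tokio::main]
/// # async fn main() {
/// let buffer = shared_buffer_with_config(BufferConfig {
///     max_size: 100,
///     ..Default::default()
/// });
/// let worker = Arc::clone(&buffer);
/// tokio::spawn(async move {
///     println!("buffered captures: {}", worker.len().await);
/// });
/// # }
/// ```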
pub fn shared_buffer_with_config(config: BufferConfig) -> SharedCaptureBuffer {
    Arc::new(CaptureBuffer::with_config(config))
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    fn create_test_record(url: &str) -> CaptureRecord {
        CaptureRecord::new(
            url.to_string(),
            "<html><body>Test</body></html>".to_string(),
            "Test content".to_string(),
            100,
        )
    }

    #[tokio::test]
    async fn test_buffer_push_and_get() {
        let buffer = CaptureBuffer::new();
        let record = create_test_record("https://example.com");
        let id = record.id;

        buffer.push(record).await;

        let retrieved = buffer.get(id).await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().url, "https://example.com");
    }

    #[tokio::test]
    async fn test_buffer_get_recent() {
        let buffer = CaptureBuffer::new();

        for i in 0..5 {
            buffer
                .push(create_test_record(&format!("https://example{}.com", i)))
                .await;
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let recent = buffer.get_recent(3).await;
        assert_eq!(recent.len(), 3);
        // Most recent should be first
        assert_eq!(recent[0].url, "https://example4.com");
    }

    #[tokio::test]
    async fn test_buffer_fifo_eviction() {
        let buffer = CaptureBuffer::builder().max_size(3).build();

        let first_record = create_test_record("https://first.com");
        let first_id = first_record.id;

        buffer.push(first_record).await;
        buffer.push(create_test_record("https://second.com")).await;
        buffer.push(create_test_record("https://third.com")).await;

        // Buffer is now at capacity
        assert_eq!(buffer.len().await, 3);

        // Push one more - first should be evicted
        buffer.push(create_test_record("https://fourth.com")).await;

        assert_eq!(buffer.len().await, 3);
        assert!(buffer.get(first_id).await.is_none());

        let stats = buffer.stats().await;
        assert_eq!(stats.evictions_size, 1);
    }

    #[tokio::test]
    async fn test_buffer_get_since() {
        let buffer = CaptureBuffer::new();

        // Push some records
        buffer.push(create_test_record("https://old.com")).await;
        tokio::time::sleep(Duration::from_millis(50)).await;

        let cutoff = Utc::now();
        tokio::time::sleep(Duration::from_millis(50)).await;

        buffer.push(create_test_record("https://new1.com")).await;
        buffer.push(create_test_record("https://new2.com")).await;

        let since_cutoff = buffer.get_since(cutoff).await;
        assert_eq!(since_cutoff.len(), 2);
    }

    #[tokio::test]
    async fn test_buffer_get_by_url() {
        let buffer = CaptureBuffer::new();

        buffer.push(create_test_record("https://example.com")).await;
        buffer.push(create_test_record("https://other.com")).await;
        buffer.push(create_test_record("https://example.com")).await;

        let by_url = buffer.get_by_url("https://example.com").await;
        assert_eq!(by_url.len(), 2);
    }

    #[tokio::test]
    async fn test_buffer_clear() {
        let buffer = CaptureBuffer::new();

        buffer.push(create_test_record("https://example.com")).await;
        buffer.push(create_test_record("https://other.com")).await;

        assert_eq!(buffer.len().await, 2);
        buffer.clear().await;
        assert_eq!(buffer.len().await, 0);
        assert!(buffer.is_empty().await);
    }

    #[tokio::test]
    async fn test_buffer_expired_cleanup() {
        let buffer = CaptureBuffer::builder()
            .max_age(Duration::from_secs(1)) // 1 second max age
            .build();

        buffer.push(create_test_record("https://example.com")).await;

        // Wait for expiry (is_expired uses seconds, so need age > max_age in seconds)
        tokio::time::sleep(Duration::from_millis(2100)).await;

        let removed = buffer.cleanup_expired().await;
        assert_eq!(removed, 1);
        assert!(buffer.is_empty().await);
    }

    #[tokio::test]
    async fn test_buffer_stats() {
        let buffer = CaptureBuffer::new();

        buffer.push(create_test_record("https://example.com")).await;
        let record = create_test_record("https://other.com");
        let id = record.id;
        buffer.push(record).await;

        // Perform some gets
        buffer.get(id).await;
        buffer.get(Uuid::new_v4()).await; // Miss

        let stats = buffer.stats().await;
        assert_eq!(stats.count, 2);
        assert_eq!(stats.total_pushed, 2);
        assert_eq!(stats.gets_by_id, 2);
        assert_eq!(stats.gets_by_id_hits, 1);
    }

    #[tokio::test]
    async fn test_capture_record_is_expired() {
        let record = create_test_record("https://example.com");

        // Should not be expired immediately with long TTL
        assert!(!record.is_expired(Duration::from_secs(3600)));

        // Should not be expired immediately even with 0 TTL (age=0 is not > 0)
        assert!(!record.is_expired(Duration::from_millis(0)));

        // Wait 1.1 seconds then check if expired with 0 TTL
        tokio::time::sleep(Duration::from_millis(1100)).await;
        assert!(
            record.is_expired(Duration::from_millis(0)),
            "Should be expired after 1 second with 0 TTL"
        );
    }

    #[tokio::test]
    async fn test_buffer_builder() {
        let buffer = CaptureBuffer::builder()
            .max_size(500)
            .max_age(Duration::from_secs(1800))
            .cleanup_interval(Duration::from_secs(60))
            .enable_compression(true)
            .compression_threshold(1024)
            .build();

        assert_eq!(buffer.config().max_size, 500);
        assert_eq!(buffer.config().max_age, Duration::from_secs(1800));
        assert!(buffer.config().enable_compression);
    }

    #[tokio::test]
    async fn test_shared_buffer() {
        let buffer = shared_buffer();

        // Clone for multiple uses
        let buffer_clone = Arc::clone(&buffer);

        buffer.push(create_test_record("https://example.com")).await;

        assert_eq!(buffer_clone.len().await, 1);
    }

    #[test]
    fn test_capture_record_size_bytes() {
        let record = CaptureRecord::new(
            "https://example.com".to_string(),
            "content".to_string(),
            "processed".to_string(),
            100,
        );

        // URL + content + processed_content
        assert_eq!(
            record.size_bytes(),
            "https://example.com".len() + "content".len() + "processed".len()
        );
    }

    #[test]
    fn test_capture_record_with_id() {
        let custom_id = Uuid::new_v4();
        let record = CaptureRecord::with_id(
            custom_id,
            "https://example.com".to_string(),
            "content".to_string(),
            "processed".to_string(),
            100,
        );

        assert_eq!(record.id, custom_id);
    }

    #[tokio::test]
    async fn test_concurrent_access() {
        let buffer = shared_buffer();

        // Spawn multiple tasks that read and write concurrently
        let mut handles = vec![];

        for i in 0..10 {
            let buffer_clone = Arc::clone(&buffer);
            handles.push(tokio::spawn(async move {
                buffer_clone
                    .push(create_test_record(&format!("https://site{}.com", i)))
                    .await;
            }));
        }

        for i in 0..5 {
            let buffer_clone = Arc::clone(&buffer);
            handles.push(tokio::spawn(async move {
                let _ = buffer_clone.get_recent(i + 1).await;
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(buffer.len().await, 10);
    }

    #[tokio::test]
    async fn test_cleanup_task_start_stop() {
        let buffer = shared_buffer_with_config(BufferConfig {
            cleanup_interval: Duration::from_secs(1),
            max_age: Duration::from_secs(1), // 1 second max age
            ..Default::default()
        });

        // Start cleanup task
        buffer.start_cleanup().await;

        // Push a record
        buffer.push(create_test_record("https://example.com")).await;
        assert_eq!(buffer.len().await, 1);

        // Wait for expiry and cleanup to run (need age > 1 second, plus time for cleanup)
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Record should be cleaned up
        assert_eq!(buffer.len().await, 0);

        // Stop cleanup
        buffer.stop_cleanup().await;
    }
}