Skip to main content

sanitize_engine/
scanner.rs

1//! Streaming scanner for detecting and replacing sensitive data.
2//!
3//! # Architecture
4//!
5//! The streaming scanner processes input data in configurable chunks,
6//! detecting secret patterns (regex or literal) and applying one-way
7//! replacements via the [`MappingStore`].
8//! This design supports files of 20–100 GB+ without requiring the entire
9//! content to fit in memory.
10//!
11//! ```text
12//! ┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
13//! │  Input (Read) │ ──▶ │  StreamScanner  │ ──▶ │  Output (Write)  │
14//! │  (chunked)    │     │  (pattern match │     │  (sanitized)     │
15//! └──────────────┘     │   + replace)    │     └──────────────────┘
16//!                       └────────┬────────┘
17//!                                │
18//!                       ┌────────▼────────┐
19//!                       │  MappingStore   │
20//!                       │  (dedup cache)  │
21//!                       └─────────────────┘
22//! ```
23//!
24//! # Chunk Overlap Strategy
25//!
26//! To avoid missing matches that span chunk boundaries, the scanner
27//! maintains an overlap window between consecutive chunks:
28//!
29//! 1. Read `chunk_size` bytes of new data.
30//! 2. Prepend the `carry` buffer (tail of previous window).
31//! 3. Scan the combined `window` for all pattern matches.
32//! 4. Compute `commit_point = window.len() - overlap_size` (adjusted
33//!    upward if a match straddles the boundary).
34//! 5. Emit output for `window[..commit_point]` with replacements applied.
35//! 6. Set `carry = window[commit_point..]` for the next iteration.
36//!
37//! The `overlap_size` should be ≥ the maximum expected match length to
38//! guarantee no matches are missed at boundaries.
39//!
40//! # Thread Safety
41//!
42//! [`StreamScanner`] is `Send + Sync`. Multiple files can be scanned
43//! concurrently using a shared `Arc<StreamScanner>`, all backed by the
44//! same [`MappingStore`] for per-run dedup
45//! consistency.
46//!
47//! # Performance
48//!
49//! - **Chunk-based I/O**: only `chunk_size + overlap_size` bytes in
50//!   memory per active scan.
51//! - **Compiled regex**: patterns are compiled once at construction and
52//!   reused across all chunks and files.
53//! - **Lock-free reads**: the `DashMap` inside `MappingStore` provides
54//!   lock-free reads for already-seen values.
55//! - **File-level parallelism**: share `Arc<StreamScanner>` across
56//!   threads to scan multiple files concurrently.
57
58use crate::category::Category;
59use crate::error::{Result, SanitizeError};
60use crate::store::MappingStore;
61use aho_corasick::AhoCorasick;
62use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder};
63use std::collections::HashMap;
64use std::io::{self, Read, Write};
65use std::sync::Arc;
66
67// ---------------------------------------------------------------------------
68// Configuration
69// ---------------------------------------------------------------------------
70
/// Default number of bytes read from the input per chunk (1 MiB).
const DEFAULT_CHUNK_SIZE: usize = 1 << 20;

/// Default number of bytes carried over between consecutive chunks (4 KiB).
const DEFAULT_OVERLAP_SIZE: usize = 4 * 1024;

/// Upper bound, in bytes, on the size of a compiled regex automaton (1 MiB).
/// Guards against DoS via pathologically complex user-supplied patterns.
const REGEX_SIZE_LIMIT: usize = 1024 * 1024;

/// Upper bound, in bytes, on the DFA cache of each regex (1 MiB).
const REGEX_DFA_SIZE_LIMIT: usize = 1024 * 1024;

/// Default cap on the number of patterns a single scanner accepts (F-05 fix).
///
/// `RegexSet` automaton memory grows linearly with the pattern count: at
/// 1 MiB size limit plus 1 MiB DFA limit per pattern, 10 000 patterns may
/// reach roughly 20 GiB.  The cap protects against accidental resource
/// exhaustion; callers that really need more can raise it through
/// [`StreamScanner::new_with_max_patterns`].
const DEFAULT_MAX_PATTERNS: usize = 10_000;
91
/// Chunking parameters for the streaming scanner.
///
/// # Tuning Guide
///
/// | Workload               | `chunk_size` | `overlap_size` |
/// |------------------------|--------------|----------------|
/// | Small files (< 10 MB)  | 256 KiB      | 1 KiB          |
/// | General purpose        | 1 MiB        | 4 KiB          |
/// | Large files (> 1 GB)   | 4–8 MiB      | 8 KiB          |
/// | Memory-constrained     | 64 KiB       | 1 KiB          |
///
/// Pick an `overlap_size` that is at least as long as the longest match
/// you expect. Typical secret patterns (API keys, emails, SSNs) are far
/// shorter than 256 bytes, so the 4 KiB default leaves plenty of headroom.
#[derive(Debug, Clone)]
pub struct ScanConfig {
    /// Number of bytes read from the input per chunk.
    ///
    /// Bigger chunks mean fewer reads and better throughput at the cost
    /// of more memory. Default: 1 MiB.
    pub chunk_size: usize,

    /// Number of bytes shared between consecutive chunks.
    ///
    /// Should be at least the maximum expected match length; a match
    /// longer than this may be missed when it crosses a chunk boundary.
    /// Default: 4 KiB.
    pub overlap_size: usize,
}
121
122impl Default for ScanConfig {
123    fn default() -> Self {
124        Self {
125            chunk_size: DEFAULT_CHUNK_SIZE,
126            overlap_size: DEFAULT_OVERLAP_SIZE,
127        }
128    }
129}
130
131impl ScanConfig {
132    /// Create a new configuration with explicit values.
133    #[must_use]
134    pub fn new(chunk_size: usize, overlap_size: usize) -> Self {
135        Self {
136            chunk_size,
137            overlap_size,
138        }
139    }
140
141    /// Validate the configuration, returning an error if invalid.
142    ///
143    /// # Errors
144    ///
145    /// Returns [`SanitizeError::InvalidConfig`] if `chunk_size` is zero
146    /// or `overlap_size >= chunk_size`.
147    pub fn validate(&self) -> Result<()> {
148        if self.chunk_size == 0 {
149            return Err(SanitizeError::InvalidConfig(
150                "chunk_size must be > 0".into(),
151            ));
152        }
153        if self.overlap_size >= self.chunk_size {
154            return Err(SanitizeError::InvalidConfig(
155                "overlap_size must be < chunk_size".into(),
156            ));
157        }
158        Ok(())
159    }
160}
161
162// ---------------------------------------------------------------------------
163// Internal helpers
164// ---------------------------------------------------------------------------
165
166/// Convert any compile-time pattern error into [`SanitizeError::PatternCompileError`].
167#[inline]
168fn compile_err(e: impl std::fmt::Display) -> SanitizeError {
169    SanitizeError::PatternCompileError(e.to_string())
170}
171
172// ---------------------------------------------------------------------------
173// Scan pattern
174// ---------------------------------------------------------------------------
175
176/// A pattern rule defining what to scan for and how to categorize matches.
177///
178/// Wraps a compiled [`regex::bytes::Regex`] with a [`Category`] for
179/// replacement lookups and a human-readable label for reporting.
180///
181/// Both regex and literal patterns are supported. Literal patterns keep
182/// their original text and are matched by the scanner's Aho-Corasick
183/// automaton for fast multi-literal scanning.
184pub struct ScanPattern {
185    /// Compiled regex matcher (used for non-literal patterns and as a
186    /// fallback; literal patterns are matched via Aho-Corasick instead).
187    regex: Regex,
188    /// Category for replacement lookups.
189    category: Category,
190    /// Human-readable label for reporting / stats.
191    label: String,
192    /// Original (unescaped) literal string when created via `from_literal`.
193    /// `None` for patterns created via `from_regex`.
194    /// Stored so `StreamScanner` can build an Aho-Corasick automaton for
195    /// fast SIMD literal matching instead of running the regex engine.
196    literal: Option<String>,
197}
198
199impl std::fmt::Debug for ScanPattern {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        f.debug_struct("ScanPattern")
202            .field("pattern", &self.regex.as_str())
203            .field("category", &self.category)
204            .field("label", &self.label)
205            .field("literal", &self.literal.as_deref())
206            .finish()
207    }
208}
209
210impl Clone for ScanPattern {
211    fn clone(&self) -> Self {
212        Self {
213            regex: self.regex.clone(),
214            category: self.category.clone(),
215            label: self.label.clone(),
216            literal: self.literal.clone(),
217        }
218    }
219}
220
221impl ScanPattern {
222    /// Create a pattern from a regex string.
223    ///
224    /// # Errors
225    ///
226    /// Returns [`SanitizeError::PatternCompileError`] if the regex is invalid.
227    ///
228    /// # Examples
229    ///
230    /// ```
231    /// use sanitize_engine::scanner::ScanPattern;
232    /// use sanitize_engine::category::Category;
233    ///
234    /// let pat = ScanPattern::from_regex(
235    ///     r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
236    ///     Category::Email,
237    ///     "email_address",
238    /// ).unwrap();
239    /// ```
240    pub fn from_regex(pattern: &str, category: Category, label: impl Into<String>) -> Result<Self> {
241        let regex = RegexBuilder::new(pattern)
242            .size_limit(REGEX_SIZE_LIMIT)
243            .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
244            .build()
245            .map_err(compile_err)?;
246        Ok(Self {
247            regex,
248            category,
249            label: label.into(),
250            literal: None,
251        })
252    }
253
254    /// Create a pattern from a literal string.
255    ///
256    /// The literal is escaped so that regex metacharacters are matched
257    /// verbatim.
258    ///
259    /// # Errors
260    ///
261    /// Returns [`SanitizeError::PatternCompileError`] if regex compilation fails.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// use sanitize_engine::scanner::ScanPattern;
267    /// use sanitize_engine::category::Category;
268    ///
269    /// let pat = ScanPattern::from_literal(
270    ///     "sk-proj-abc123secret",
271    ///     Category::Custom("api_key".into()),
272    ///     "openai_key",
273    /// ).unwrap();
274    /// ```
275    pub fn from_literal(
276        literal: &str,
277        category: Category,
278        label: impl Into<String>,
279    ) -> Result<Self> {
280        let escaped = regex::escape(literal);
281        let regex = RegexBuilder::new(&escaped)
282            .size_limit(REGEX_SIZE_LIMIT)
283            .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
284            .build()
285            .map_err(compile_err)?;
286        Ok(Self {
287            regex,
288            category,
289            label: label.into(),
290            literal: Some(literal.to_owned()),
291        })
292    }
293
294    /// The category this pattern maps to.
295    #[must_use]
296    pub fn category(&self) -> &Category {
297        &self.category
298    }
299
300    /// The human-readable label.
301    #[must_use]
302    pub fn label(&self) -> &str {
303        &self.label
304    }
305
306    /// Return the raw regex pattern string for RegexSet construction.
307    #[must_use]
308    pub fn regex_pattern(&self) -> &str {
309        self.regex.as_str()
310    }
311}
312
313// ScanPattern is Send + Sync because:
314// - regex::bytes::Regex is Send + Sync
315// - Category is Send + Sync (it's an enum of primitives + CompactString)
316// - String is Send + Sync
317
318// ---------------------------------------------------------------------------
319// Internal: raw match descriptor
320// ---------------------------------------------------------------------------
321
/// One match located while scanning a window (internal bookkeeping).
#[derive(Debug, Clone, Copy)]
struct RawMatch {
    /// Byte offset, relative to the scan window, where the match begins.
    start: usize,
    /// Byte offset, relative to the scan window, one past the last matched byte.
    end: usize,
    /// Position of the matching pattern in `StreamScanner::patterns`.
    pattern_idx: usize,
}
332
333// ---------------------------------------------------------------------------
334// Per-scan scratch buffers
335// ---------------------------------------------------------------------------
336
337/// Scratch buffers reused across chunks within a single scan call.
338///
339/// Allocating these once per `scan_reader_with_progress` invocation
340/// and reusing them each chunk eliminates the per-chunk heap pressure
341/// that would otherwise come from `Vec` allocations in `find_matches`
342/// and `apply_replacements`.
343struct ScanScratch {
344    /// Accumulates raw matches from all patterns before deduplication.
345    all_matches: Vec<RawMatch>,
346    /// Non-overlapping matches selected for the current window
347    /// (populated by `find_matches`, consumed by `apply_replacements`).
348    selected: Vec<RawMatch>,
349    /// Output bytes for the committed region, written by `apply_replacements`.
350    output: Vec<u8>,
351    /// Per-pattern match counts indexed by `pattern_idx`.
352    /// Reset to zero after each chunk's counts are folded into `ScanStats`.
353    pattern_counts: Vec<u64>,
354}
355
356impl ScanScratch {
357    fn new(pattern_count: usize, chunk_size: usize, overlap_size: usize) -> Self {
358        Self {
359            all_matches: Vec::new(),
360            selected: Vec::new(),
361            output: Vec::with_capacity(chunk_size + overlap_size),
362            pattern_counts: vec![0u64; pattern_count],
363        }
364    }
365}
366
367// ---------------------------------------------------------------------------
368// Scan statistics
369// ---------------------------------------------------------------------------
370
/// Counters describing what a scan did.
///
/// Produced by [`StreamScanner::scan_reader`] and
/// [`StreamScanner::scan_bytes`] (and their `_with_progress` variants).
#[derive(Debug, Clone, Default)]
pub struct ScanStats {
    /// Number of input bytes read.
    pub bytes_processed: u64,
    /// Number of bytes written to the output. Differs from
    /// `bytes_processed` whenever a replacement is longer or shorter than
    /// the text it replaces.
    pub bytes_output: u64,
    /// Number of matches found, summed over all patterns.
    pub matches_found: u64,
    /// Number of replacements applied (always equal to `matches_found`
    /// in one-way mode).
    pub replacements_applied: u64,
    /// Match count per pattern, keyed by the pattern's label.
    pub pattern_counts: HashMap<String, u64>,
}
391
/// Snapshot of scan progress handed to the progress callback of a
/// streaming scan.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct ScanProgress {
    /// Input bytes read up to this point.
    pub bytes_processed: u64,
    /// Output bytes written up to this point.
    pub bytes_output: u64,
    /// Size of the whole input, if the caller supplied it.
    pub total_bytes: Option<u64>,
    /// Matches found up to this point.
    pub matches_found: u64,
    /// Replacements applied up to this point.
    pub replacements_applied: u64,
}
406
407// ---------------------------------------------------------------------------
408// StreamScanner
409// ---------------------------------------------------------------------------
410
411/// Streaming scanner that detects and replaces sensitive patterns.
412///
413/// Thread-safe: can be shared via `Arc<StreamScanner>` for concurrent
414/// scanning of multiple files. Each call to [`scan_reader`](Self::scan_reader)
415/// is independent and maintains its own chunking state.
416///
417/// # Usage
418///
419/// ```rust
420/// use sanitize_engine::scanner::{StreamScanner, ScanPattern, ScanConfig};
421/// use sanitize_engine::category::Category;
422/// use sanitize_engine::generator::HmacGenerator;
423/// use sanitize_engine::store::MappingStore;
424/// use std::sync::Arc;
425///
426/// // 1. Build the replacement store.
427/// let gen = Arc::new(HmacGenerator::new([42u8; 32]));
428/// let store = Arc::new(MappingStore::new(gen, None));
429///
430/// // 2. Define patterns.
431/// let patterns = vec![
432///     ScanPattern::from_regex(
433///         r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
434///         Category::Email,
435///         "email",
436///     ).unwrap(),
437/// ];
438///
439/// // 3. Create the scanner.
440/// let scanner = StreamScanner::new(patterns, store, ScanConfig::default()).unwrap();
441///
442/// // 4. Scan.
443/// let input = b"Contact alice@corp.com for details.";
444/// let (output, stats) = scanner.scan_bytes(input).unwrap();
445/// assert_eq!(stats.matches_found, 1);
446/// assert!(!output.windows(b"alice@corp.com".len())
447///     .any(|w| w == b"alice@corp.com"));
448/// ```
449pub struct StreamScanner {
450    /// Compiled scan patterns (both literal and regex).
451    patterns: Vec<ScanPattern>,
452    /// Pre-compiled set for fast multi-pattern pre-filtering of **regex**
453    /// (non-literal) patterns only.  `matches()` returns which regex-pattern
454    /// indices matched, avoiding running every individual regex on each chunk
455    /// (R-3 optimisation).
456    regex_set: RegexSet,
457    /// Maps a `RegexSet` index → index into `self.patterns`.
458    /// Only non-literal patterns are in the `RegexSet`.
459    regex_indices: Vec<usize>,
460    /// Aho-Corasick automaton for fast SIMD literal matching.
461    /// `None` when there are no literal patterns.
462    aho_corasick: Option<AhoCorasick>,
463    /// Maps an Aho-Corasick pattern index → index into `self.patterns`.
464    /// Only literal patterns appear here.
465    literal_indices: Vec<usize>,
466    /// Thread-safe dedup replacement store.
467    store: Arc<MappingStore>,
468    /// Scanner configuration.
469    config: ScanConfig,
470}
471
472impl StreamScanner {
473    /// Create a new streaming scanner.
474    ///
475    /// # Arguments
476    ///
477    /// - `patterns` — the set of patterns to scan for.
478    /// - `store` — the mapping store for dedup-consistent replacements.
479    /// - `config` — chunking / overlap configuration.
480    ///
481    /// # Errors
482    ///
483    /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
484    /// invalid (e.g. `chunk_size == 0` or `overlap_size >= chunk_size`).
485    pub fn new(
486        patterns: Vec<ScanPattern>,
487        store: Arc<MappingStore>,
488        config: ScanConfig,
489    ) -> Result<Self> {
490        Self::new_with_max_patterns(patterns, store, config, DEFAULT_MAX_PATTERNS)
491    }
492
493    /// Create a new streaming scanner with a custom pattern limit.
494    ///
495    /// This is identical to [`new`](Self::new) but allows overriding the
496    /// default pattern cap (10 000).  Use this
497    /// when you have a legitimate need for more patterns and have
498    /// verified that your system has enough memory for the resulting
499    /// `RegexSet`.
500    ///
501    /// # Errors
502    ///
503    /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
504    /// invalid or the pattern count exceeds `max_patterns`.
505    pub fn new_with_max_patterns(
506        patterns: Vec<ScanPattern>,
507        store: Arc<MappingStore>,
508        config: ScanConfig,
509        max_patterns: usize,
510    ) -> Result<Self> {
511        config.validate()?;
512
513        // F-05 fix: enforce maximum pattern count to bound RegexSet memory.
514        if patterns.len() > max_patterns {
515            return Err(SanitizeError::InvalidConfig(format!(
516                "pattern count ({}) exceeds maximum allowed ({}) — \
517                 RegexSet memory scales linearly with pattern count",
518                patterns.len(),
519                max_patterns
520            )));
521        }
522
523        // Partition patterns into literal (Aho-Corasick) and regex (RegexSet)
524        // so each is matched by the most efficient engine.
525        let mut literal_bytes: Vec<Vec<u8>> = Vec::new();
526        let mut literal_indices: Vec<usize> = Vec::new();
527        let mut regex_strs: Vec<&str> = Vec::new();
528        let mut regex_indices: Vec<usize> = Vec::new();
529
530        for (i, pattern) in patterns.iter().enumerate() {
531            if let Some(lit) = &pattern.literal {
532                literal_bytes.push(lit.as_bytes().to_vec());
533                literal_indices.push(i);
534            } else {
535                regex_strs.push(pattern.regex_pattern());
536                regex_indices.push(i);
537            }
538        }
539
540        // Build Aho-Corasick automaton for literal patterns (SIMD-accelerated,
541        // single O(n) pass over the input per chunk).
542        let aho_corasick = if literal_bytes.is_empty() {
543            None
544        } else {
545            Some(
546                AhoCorasick::new(&literal_bytes)
547                    .map_err(compile_err)?,
548            )
549        };
550
551        // Build RegexSet from non-literal patterns only (R-3 pre-filter).
552        let regex_set = if regex_strs.is_empty() {
553            RegexSetBuilder::new(Vec::<&str>::new())
554                .size_limit(REGEX_SIZE_LIMIT)
555                .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
556                .build()
557                .map_err(compile_err)?
558        } else {
559            RegexSetBuilder::new(&regex_strs)
560                .size_limit(REGEX_SIZE_LIMIT * regex_strs.len().max(1))
561                .dfa_size_limit(REGEX_DFA_SIZE_LIMIT * regex_strs.len().max(1))
562                .build()
563                .map_err(compile_err)?
564        };
565
566        Ok(Self {
567            patterns,
568            regex_set,
569            regex_indices,
570            aho_corasick,
571            literal_indices,
572            store,
573            config,
574        })
575    }
576
577    /// Create a copy of this scanner extended with additional literal patterns.
578    ///
579    /// Clones the existing pattern set and appends `extra`, then rebuilds
580    /// the internal Aho-Corasick and RegexSet automata. Used by the
581    /// format-preserving structured pass to scan original bytes with
582    /// discovered field-value literals added to the base pattern set.
583    ///
584    /// # Errors
585    ///
586    /// Returns [`SanitizeError`] if automaton construction fails or the
587    /// combined pattern count exceeds the default limit.
588    pub fn with_extra_literals(&self, extra: Vec<ScanPattern>) -> Result<Self> {
589        let mut patterns = self.patterns.clone();
590        patterns.extend(extra);
591        Self::new(patterns, Arc::clone(&self.store), self.config.clone())
592    }
593
594    /// Scan a reader and write sanitized output to a writer.
595    ///
596    /// Processes the input in chunks of `config.chunk_size` bytes,
597    /// maintaining an overlap window of `config.overlap_size` bytes to
598    /// catch matches spanning chunk boundaries. All detected matches
599    /// are replaced one-way via the [`MappingStore`].
600    ///
601    /// # Arguments
602    ///
603    /// - `reader` — input source (file, network stream, `&[u8]`, …).
604    /// - `writer` — output sink (file, `Vec<u8>`, …).
605    ///
606    /// # Returns
607    ///
608    /// [`ScanStats`] with counters for bytes processed, matches found, etc.
609    ///
610    /// # Errors
611    ///
612    /// Returns [`SanitizeError`] on I/O failures or if a replacement
613    /// cannot be generated (e.g. store capacity exceeded).
614    pub fn scan_reader<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ScanStats> {
615        self.scan_reader_with_progress(reader, writer, None, |_| {})
616    }
617
618    /// Scan a reader and emit progress snapshots after each committed chunk.
619    ///
620    /// `total_bytes` should be provided when the caller knows the full input
621    /// size. When omitted, progress consumers should avoid percentages/ETA.
622    ///
623    /// # Errors
624    ///
625    /// Returns [`SanitizeError`] on I/O failures or if a replacement
626    /// cannot be generated (e.g. store capacity exceeded).
627    pub fn scan_reader_with_progress<R: Read, W: Write, F>(
628        &self,
629        mut reader: R,
630        mut writer: W,
631        total_bytes: Option<u64>,
632        mut on_progress: F,
633    ) -> Result<ScanStats>
634    where
635        F: FnMut(&ScanProgress),
636    {
637        let mut stats = ScanStats::default();
638
639        // Carry buffer: the tail of the previous window that needs
640        // to be re-scanned with the next chunk.
641        let mut carry: Vec<u8> = Vec::new();
642
643        // Read buffer (reused across iterations to avoid re-allocation).
644        let mut read_buf = vec![0u8; self.config.chunk_size];
645
646        // Scan window (reused across iterations — grows to peak size then
647        // stays there, avoiding per-chunk allocation).
648        let mut window: Vec<u8> =
649            Vec::with_capacity(self.config.chunk_size + self.config.overlap_size);
650
651        // Scratch buffers reused every chunk to eliminate per-chunk heap
652        // pressure from match collection, output building, and stats tracking.
653        let mut scratch = ScanScratch::new(
654            self.patterns.len(),
655            self.config.chunk_size,
656            self.config.overlap_size,
657        );
658
659        loop {
660            // Read the next chunk.
661            let bytes_read = read_fully(&mut reader, &mut read_buf)?;
662            let is_eof = bytes_read < read_buf.len();
663
664            // Track only genuinely new bytes (carry was already counted).
665            stats.bytes_processed += bytes_read as u64;
666
667            if bytes_read == 0 && carry.is_empty() {
668                break;
669            }
670
671            // Build the scan window: carry ++ new_data.
672            // Reuse the window buffer to avoid per-chunk allocation.
673            let new_data = &read_buf[..bytes_read];
674            window.clear();
675            window.extend_from_slice(&carry);
676            window.extend_from_slice(new_data);
677
678            if window.is_empty() {
679                break;
680            }
681
682            // Find all non-overlapping matches in the window (fills scratch.selected).
683            self.find_matches(&window, &mut scratch);
684
685            // Determine the commit point — how much of the window we can
686            // safely emit this iteration.
687            let base_commit = if is_eof {
688                window.len()
689            } else {
690                window.len().saturating_sub(self.config.overlap_size)
691            };
692
693            let commit_point =
694                self.adjusted_commit_point(&scratch.selected, base_commit, window.len(), is_eof);
695
696            // Build output into scratch.output and update stats counters.
697            // Matches beyond commit_point are filtered inside apply_replacements.
698            self.apply_replacements(
699                &window[..commit_point],
700                &scratch.selected,
701                &mut stats,
702                &mut scratch.output,
703                &mut scratch.pattern_counts,
704            )?;
705
706            writer
707                .write_all(&scratch.output)
708                .map_err(|e| SanitizeError::IoError(e.to_string()))?;
709            stats.bytes_output += scratch.output.len() as u64;
710
711            // Fold per-chunk pattern counts into stats.
712            // label.clone() is called at most once per distinct pattern per
713            // chunk (not once per match hit), which is far cheaper at scale.
714            for (idx, count) in scratch.pattern_counts.iter_mut().enumerate() {
715                if *count > 0 {
716                    *stats
717                        .pattern_counts
718                        .entry(self.patterns[idx].label.clone())
719                        .or_insert(0) += *count;
720                    *count = 0; // reset for next chunk
721                }
722            }
723
724            on_progress(&ScanProgress {
725                bytes_processed: stats.bytes_processed,
726                bytes_output: stats.bytes_output,
727                total_bytes,
728                matches_found: stats.matches_found,
729                replacements_applied: stats.replacements_applied,
730            });
731
732            // Update carry for next iteration. Reuse the carry buffer
733            // by copying remaining bytes down.
734            if is_eof {
735                carry.clear();
736                break;
737            }
738            carry.clear();
739            carry.extend_from_slice(&window[commit_point..]);
740        }
741
742        Ok(stats)
743    }
744
745    /// Convenience: scan byte slice in-memory and return sanitized output.
746    ///
747    /// Equivalent to `scan_reader(input, Vec::new())` but returns the
748    /// output buffer directly.
749    ///
750    /// # Errors
751    ///
752    /// Returns [`SanitizeError`] if a replacement cannot be generated
753    /// (e.g. store capacity exceeded).
754    pub fn scan_bytes(&self, input: &[u8]) -> Result<(Vec<u8>, ScanStats)> {
755        self.scan_bytes_with_progress(input, |_| {})
756    }
757
758    /// Scan a byte slice in memory and emit progress snapshots.
759    ///
760    /// # Errors
761    ///
762    /// Returns [`SanitizeError`] if a replacement cannot be generated
763    /// (e.g. store capacity exceeded).
764    pub fn scan_bytes_with_progress<F>(
765        &self,
766        input: &[u8],
767        on_progress: F,
768    ) -> Result<(Vec<u8>, ScanStats)>
769    where
770        F: FnMut(&ScanProgress),
771    {
772        let mut output = Vec::with_capacity(input.len());
773        let stats = self.scan_reader_with_progress(
774            input,
775            &mut output,
776            Some(input.len() as u64),
777            on_progress,
778        )?;
779        Ok((output, stats))
780    }
781
782    // ---- Accessors ----
783
784    /// Access the scanner's configuration.
785    #[must_use]
786    pub fn config(&self) -> &ScanConfig {
787        &self.config
788    }
789
790    /// Access the underlying mapping store.
791    #[must_use]
792    pub fn store(&self) -> &Arc<MappingStore> {
793        &self.store
794    }
795
796    /// Number of patterns registered in this scanner.
797    #[must_use]
798    pub fn pattern_count(&self) -> usize {
799        self.patterns.len()
800    }
801
802    /// Create a scanner from an encrypted secrets file.
803    ///
804    /// Decrypts the file in memory, parses the entries, compiles
805    /// patterns, and returns the scanner ready to scan. Decrypted
806    /// plaintext is scrubbed from memory after parsing.
807    ///
808    /// # Arguments
809    ///
810    /// - `encrypted_bytes` — raw bytes of the `.enc` file.
811    /// - `password` — user password.
812    /// - `format` — optional format override for the plaintext.
813    /// - `store` — mapping store for dedup-consistent replacements.
814    /// - `config` — chunking / overlap configuration.
815    /// - `extra_patterns` — additional patterns to merge in.
816    ///
817    /// # Returns
818    ///
819    /// `(scanner, warnings)` where `warnings` lists entries that
820    /// failed to compile (index + error).
821    ///
822    /// # Errors
823    ///
824    /// Returns [`SanitizeError::SecretsError`] on decryption failure
825    /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
826    pub fn from_encrypted_secrets(
827        encrypted_bytes: &[u8],
828        password: &str,
829        format: Option<crate::secrets::SecretsFormat>,
830        store: Arc<MappingStore>,
831        config: ScanConfig,
832        extra_patterns: Vec<ScanPattern>,
833    ) -> Result<(Self, Vec<(usize, SanitizeError)>)> {
834        let (mut patterns, warnings) =
835            crate::secrets::load_encrypted_secrets(encrypted_bytes, password, format)?;
836        patterns.extend(extra_patterns);
837        let scanner = Self::new(patterns, store, config)?;
838        Ok((scanner, warnings))
839    }
840
841    /// Create a scanner from a plaintext secrets file.
842    ///
843    /// Convenience for development / testing without encryption.
844    ///
845    /// # Errors
846    ///
847    /// Returns [`SanitizeError::SecretsError`] on parse failure
848    /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
849    pub fn from_plaintext_secrets(
850        plaintext: &[u8],
851        format: Option<crate::secrets::SecretsFormat>,
852        store: Arc<MappingStore>,
853        config: ScanConfig,
854        extra_patterns: Vec<ScanPattern>,
855    ) -> Result<(Self, Vec<(usize, SanitizeError)>)> {
856        let (mut patterns, warnings) = crate::secrets::load_plaintext_secrets(plaintext, format)?;
857        patterns.extend(extra_patterns);
858        let scanner = Self::new(patterns, store, config)?;
859        Ok((scanner, warnings))
860    }
861
862    // ---- Internal helpers ----
863
864    /// Find all non-overlapping matches across all patterns.
865    ///
866    /// Fills `scratch.selected` with the winning non-overlapping matches
867    /// for the given `window`.  All three scratch `Vec`s are cleared and
868    /// repopulated on each call so callers can freely reuse the same
869    /// `ScanScratch` instance across chunks.
870    ///
871    /// ## Strategy
872    ///
873    /// 1. **Aho-Corasick** (`aho_corasick`): single O(n) SIMD pass over the
874    ///    window reporting every occurrence of every literal pattern,
875    ///    including overlapping ones.  This replaces O(k·n) individual regex
876    ///    scans for the literal subset.
877    /// 2. **RegexSet pre-filter** (R-3 optimisation): fast check of which
878    ///    *non-literal* regex patterns have any match in the window.
879    /// 3. **Individual regex `find_iter`**: only for regex patterns flagged
880    ///    by step 2.
881    /// 4. **Sort + greedy dedup**: all raw matches are sorted by start
882    ///    (ascending), then length (descending), and a single greedy pass
883    ///    selects the final non-overlapping set.
884    fn find_matches(&self, window: &[u8], scratch: &mut ScanScratch) {
885        scratch.all_matches.clear();
886        scratch.selected.clear();
887
888        // Step 1: Aho-Corasick overlapping scan for all literal patterns.
889        // find_overlapping_iter reports every match position including
890        // overlapping ones, so the sort+greedy step below correctly resolves
891        // ambiguities between literals (e.g. "abc" vs "abcd" at same offset).
892        if let Some(ac) = &self.aho_corasick {
893            for mat in ac.find_overlapping_iter(window) {
894                scratch.all_matches.push(RawMatch {
895                    start: mat.start(),
896                    end: mat.end(),
897                    pattern_idx: self.literal_indices[mat.pattern().as_usize()],
898                });
899            }
900        }
901
902        // Steps 2+3: RegexSet pre-filter then individual scan for non-literal
903        // patterns.  regex_set only contains non-literal pattern strings, so
904        // literals are never scanned twice.
905        for rs_idx in self.regex_set.matches(window) {
906            let pattern_idx = self.regex_indices[rs_idx];
907            for m in self.patterns[pattern_idx].regex.find_iter(window) {
908                scratch.all_matches.push(RawMatch {
909                    start: m.start(),
910                    end: m.end(),
911                    pattern_idx,
912                });
913            }
914        }
915
916        // Step 4: sort then greedy non-overlapping selection.
917        // Skip entirely when no matches were found (the common case for
918        // clean data), avoiding an unnecessary sort of an empty Vec.
919        if scratch.all_matches.is_empty() {
920            return;
921        }
922
923        // Primary: start ascending. Secondary: length descending (longer
924        // match wins when two matches begin at the same position).
925        scratch.all_matches.sort_unstable_by(|a, b| {
926            a.start
927                .cmp(&b.start)
928                .then_with(|| (b.end - b.start).cmp(&(a.end - a.start)))
929        });
930
931        let mut last_end = 0;
932        for m in scratch.all_matches.drain(..) {
933            if m.start >= last_end {
934                last_end = m.end;
935                scratch.selected.push(m);
936            }
937        }
938    }
939
940    /// Adjust the commit point to avoid splitting a match across the
941    /// commit / carry boundary.
942    ///
943    /// If any match straddles `base_commit` (starts before, ends after),
944    /// the commit point is moved to after that match so it is emitted
945    /// in full this iteration.
946    #[allow(clippy::unused_self)] // keep &self for API consistency with other scanner methods
947    fn adjusted_commit_point(
948        &self,
949        matches: &[RawMatch],
950        base_commit: usize,
951        window_len: usize,
952        is_eof: bool,
953    ) -> usize {
954        if is_eof {
955            return window_len;
956        }
957
958        let mut commit = base_commit;
959
960        for m in matches {
961            if m.start < commit && m.end > commit {
962                // Match straddles the boundary — extend commit to include it.
963                commit = m.end;
964            }
965        }
966
967        // Never exceed window length.
968        commit.min(window_len)
969    }
970
971    /// Build the output for the committed region by splicing in replacements.
972    ///
973    /// Writes into `output_buf` (cleared on entry) and increments
974    /// `stats.matches_found` / `stats.replacements_applied` for each applied
975    /// replacement.  Per-pattern hit counts are written to `pattern_counts`
976    /// (indexed by `pattern_idx`); the caller is responsible for folding
977    /// these into `ScanStats::pattern_counts` and resetting them.
978    ///
979    /// `matches` is the full selected set for the window (may include matches
980    /// in the carry region beyond `committed`).  Because `adjusted_commit_point`
981    /// guarantees no match straddles the boundary, any match with
982    /// `start < committed.len()` also has `end <= committed.len()`.  The
983    /// loop breaks early once `m.start >= committed.len()` since matches are
984    /// sorted by start.
985    ///
986    /// # Note on `from_utf8_lossy`
987    ///
988    /// `String::from_utf8_lossy` returns `Cow::Borrowed(&str)` for valid
989    /// UTF-8 input (the common case for ASCII secrets) — no heap allocation
990    /// on the hot path.
991    fn apply_replacements(
992        &self,
993        committed: &[u8],
994        matches: &[RawMatch],
995        stats: &mut ScanStats,
996        output_buf: &mut Vec<u8>,
997        pattern_counts: &mut [u64],
998    ) -> Result<()> {
999        output_buf.clear();
1000
1001        let mut last_end = 0;
1002
1003        for &m in matches {
1004            // Matches are sorted by start; those at or beyond the committed
1005            // region belong to the carry window — stop here.
1006            if m.start >= committed.len() {
1007                break;
1008            }
1009
1010            // Emit bytes before this match verbatim.
1011            output_buf.extend_from_slice(&committed[last_end..m.start]);
1012
1013            // Decode matched bytes.  from_utf8_lossy is zero-copy (Cow::Borrowed)
1014            // for valid UTF-8, which covers all ASCII secrets.
1015            let matched_text = String::from_utf8_lossy(&committed[m.start..m.end]);
1016
1017            // One-way deterministic replacement via the MappingStore.
1018            let pattern = &self.patterns[m.pattern_idx];
1019            let replacement = self.store.get_or_insert(&pattern.category, &matched_text)?;
1020
1021            output_buf.extend_from_slice(replacement.as_bytes());
1022            last_end = m.end;
1023
1024            stats.matches_found += 1;
1025            stats.replacements_applied += 1;
1026            pattern_counts[m.pattern_idx] += 1;
1027        }
1028
1029        // Emit the trailing non-matching tail.
1030        output_buf.extend_from_slice(&committed[last_end..]);
1031
1032        Ok(())
1033    }
1034}
1035
1036// ---------------------------------------------------------------------------
1037// Send + Sync compile-time assertion
1038// ---------------------------------------------------------------------------
1039
1040const _: fn() = || {
1041    fn assert_send<T: Send>() {}
1042    fn assert_sync<T: Sync>() {}
1043    assert_send::<StreamScanner>();
1044    assert_sync::<StreamScanner>();
1045};
1046
1047// ---------------------------------------------------------------------------
1048// I/O helper
1049// ---------------------------------------------------------------------------
1050
1051/// Read up to `buf.len()` bytes from `reader`, retrying on `Interrupted`.
1052///
1053/// Returns the number of bytes actually read (< `buf.len()` only at EOF).
1054fn read_fully<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
1055    let mut total = 0;
1056    while total < buf.len() {
1057        match reader.read(&mut buf[total..]) {
1058            Ok(0) => break, // EOF
1059            Ok(n) => total += n,
1060            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
1061            Err(e) => return Err(SanitizeError::IoError(e.to_string())),
1062        }
1063    }
1064    Ok(total)
1065}
1066
1067// ---------------------------------------------------------------------------
1068// Unit tests
1069// ---------------------------------------------------------------------------
1070
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::HmacGenerator;

    /// Helper: fresh mapping store backed by a fixed-key HMAC generator.
    ///
    /// The local is deliberately not called `gen`, which is a reserved
    /// keyword from the Rust 2024 edition onwards.
    fn test_store() -> Arc<MappingStore> {
        let generator = Arc::new(HmacGenerator::new([42u8; 32]));
        Arc::new(MappingStore::new(generator, None))
    }

    /// Helper: build a scanner with given patterns and small chunk config.
    fn test_scanner(patterns: Vec<ScanPattern>) -> StreamScanner {
        StreamScanner::new(
            patterns,
            test_store(),
            ScanConfig {
                chunk_size: 64,
                overlap_size: 16,
            },
        )
        .unwrap()
    }

    /// Helper: email pattern.
    fn email_pattern() -> ScanPattern {
        ScanPattern::from_regex(
            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            Category::Email,
            "email",
        )
        .unwrap()
    }

    /// Helper: IPv4 pattern.
    fn ipv4_pattern() -> ScanPattern {
        ScanPattern::from_regex(
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            Category::IpV4,
            "ipv4",
        )
        .unwrap()
    }

    // ---- Construction ----

    /// A scanner built from one pattern reports one registered pattern.
    #[test]
    fn scanner_creation() {
        let scanner = test_scanner(vec![email_pattern()]);
        assert_eq!(scanner.pattern_count(), 1);
    }

    /// A zero chunk size is rejected at construction time.
    #[test]
    fn invalid_config_zero_chunk() {
        let result = StreamScanner::new(vec![], test_store(), ScanConfig::new(0, 0));
        assert!(result.is_err());
    }

    /// An overlap equal to the chunk size is rejected at construction time.
    #[test]
    fn invalid_config_overlap_ge_chunk() {
        let result = StreamScanner::new(vec![], test_store(), ScanConfig::new(100, 100));
        assert!(result.is_err());
    }

    // ---- Empty / no-match cases ----

    /// Empty input yields empty output and zeroed counters.
    #[test]
    fn empty_input() {
        let scanner = test_scanner(vec![email_pattern()]);
        let (output, stats) = scanner.scan_bytes(b"").unwrap();
        assert!(output.is_empty());
        assert_eq!(stats.matches_found, 0);
        assert_eq!(stats.bytes_processed, 0);
    }

    /// Input without any match passes through unchanged.
    #[test]
    fn no_matches() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"There are no email addresses here.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(output, input.as_slice());
        assert_eq!(stats.matches_found, 0);
    }

    // ---- Single match ----

    /// One email is replaced; domain, length and surrounding text survive.
    #[test]
    fn single_email_replaced() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"Contact alice@corp.com for help.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        assert_eq!(stats.replacements_applied, 1);
        // Original must not appear in output.
        assert!(!output
            .windows(b"alice@corp.com".len())
            .any(|w| w == b"alice@corp.com"));
        // Replacement should contain the @ from the domain-preserving email.
        let output_str = String::from_utf8_lossy(&output);
        assert!(output_str.contains("@corp.com"));
        // Length preserved: output is same total length as input.
        assert_eq!(output.len(), input.len(), "length must be preserved");
        // Surrounding text preserved.
        assert!(output_str.starts_with("Contact "));
        assert!(output_str.ends_with(" for help."));
    }

    // ---- Multiple matches ----

    /// Every email in the input is replaced.
    #[test]
    fn multiple_emails_replaced() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"From alice@corp.com to bob@corp.com cc admin@corp.com";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 3);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("alice@corp.com"));
        assert!(!out_str.contains("bob@corp.com"));
        assert!(!out_str.contains("admin@corp.com"));
    }

    // ---- Same secret gets same replacement ----

    /// Two occurrences of the same email map to the same replacement.
    #[test]
    fn same_secret_same_replacement() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"First alice@corp.com then alice@corp.com again.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 2);
        let out_str = String::from_utf8_lossy(&output);
        // With length-preserving replacements, look for the preserved domain.
        let parts: Vec<&str> = out_str.split("@corp.com").collect();
        // 3 parts = 2 occurrences of the replacement.
        assert_eq!(parts.len(), 3);
        // Strip the verbatim text in front of each occurrence so only the
        // replaced local parts remain, then require them to be identical.
        let first = parts[0]
            .strip_prefix("First ")
            .expect("text before the first email must be preserved");
        let second = parts[1]
            .strip_prefix(" then ")
            .expect("text between the emails must be preserved");
        assert_eq!(
            first, second,
            "same secret must map to the same replacement"
        );
    }

    // ---- Literal pattern ----

    /// A literal pattern is detected and removed from the output.
    #[test]
    fn literal_pattern_matched() {
        let pat = ScanPattern::from_literal(
            "SECRET_API_KEY_12345",
            Category::Custom("api_key".into()),
            "api_key",
        )
        .unwrap();
        let scanner = test_scanner(vec![pat]);
        let input = b"key=SECRET_API_KEY_12345&foo=bar";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        assert!(!output
            .windows(b"SECRET_API_KEY_12345".len())
            .any(|w| w == b"SECRET_API_KEY_12345"));
    }

    // ---- Multiple pattern types ----

    /// Different patterns match independently and are counted per label.
    #[test]
    fn multiple_pattern_types() {
        let scanner = test_scanner(vec![email_pattern(), ipv4_pattern()]);
        let input = b"Server 192.168.1.100 contact admin@server.com";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 2);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("192.168.1.100"));
        assert!(!out_str.contains("admin@server.com"));
        assert_eq!(*stats.pattern_counts.get("email").unwrap(), 1);
        assert_eq!(*stats.pattern_counts.get("ipv4").unwrap(), 1);
    }

    // ---- Chunk boundary: match spans two chunks ----

    /// A match that straddles a chunk boundary is still replaced once.
    #[test]
    fn match_at_chunk_boundary() {
        // Use a very small chunk size so the email straddles a boundary.
        let scanner = StreamScanner::new(
            vec![email_pattern()],
            test_store(),
            ScanConfig {
                chunk_size: 20, // very small
                overlap_size: 16,
            },
        )
        .unwrap();

        // Place an email address that will definitely straddle a boundary.
        let input = b"AAAAAAAAAAAAAAAA alice@corp.com BBBBBBBBBBBBB";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("alice@corp.com"));
        assert!(out_str.contains("@corp.com"), "domain must be preserved");
    }

    // ---- Large input requiring many chunks ----

    /// All matches are found when the input spans many chunks.
    #[test]
    fn large_input_many_chunks() {
        let scanner = test_scanner(vec![email_pattern()]);

        // Build a ~1 KiB input (many times the 64-byte chunk size) with
        // emails sprinkled in.
        let mut input = Vec::new();
        let filler = b"Lorem ipsum dolor sit amet. ";
        for i in 0..20 {
            input.extend_from_slice(filler);
            let email = format!("user{i}@example.com ");
            input.extend_from_slice(email.as_bytes());
        }

        let (output, stats) = scanner.scan_bytes(&input).unwrap();
        assert_eq!(stats.matches_found, 20);
        let out_str = String::from_utf8_lossy(&output);
        for i in 0..20 {
            let email = format!("user{i}@example.com");
            assert!(!out_str.contains(&email));
        }
    }

    /// The progress-reporting byte API produces the same output and stats
    /// as the plain one, and its last update reflects the whole input.
    #[test]
    fn scan_bytes_with_progress_preserves_output_and_stats() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"Contact alice@corp.com and bob@corp.com for help.";

        let (baseline_output, baseline_stats) = scanner.scan_bytes(input).unwrap();

        let mut updates = Vec::new();
        let (progress_output, progress_stats) = scanner
            .scan_bytes_with_progress(input, |progress| updates.push(progress.clone()))
            .unwrap();

        assert_eq!(progress_output, baseline_output);
        assert_eq!(
            progress_stats.bytes_processed,
            baseline_stats.bytes_processed
        );
        assert_eq!(progress_stats.bytes_output, baseline_stats.bytes_output);
        assert_eq!(progress_stats.matches_found, baseline_stats.matches_found);
        assert_eq!(
            progress_stats.replacements_applied,
            baseline_stats.replacements_applied
        );
        assert!(!updates.is_empty());
        assert_eq!(updates.last().unwrap().bytes_processed, input.len() as u64);
        assert_eq!(
            updates.last().unwrap().total_bytes,
            Some(input.len() as u64)
        );
        assert_eq!(updates.last().unwrap().matches_found, 2);
    }

    /// Multi-chunk input triggers more than one progress callback, and the
    /// final callback agrees with the returned stats.
    #[test]
    fn scan_reader_with_progress_reports_multiple_updates_for_multi_chunk_input() {
        let scanner = test_scanner(vec![email_pattern()]);
        let mut input = Vec::new();
        for i in 0..8 {
            input.extend_from_slice(b"padding padding padding ");
            input.extend_from_slice(format!("user{i}@example.com ").as_bytes());
        }

        let mut output = Vec::new();
        let mut updates = Vec::new();
        let stats = scanner
            .scan_reader_with_progress(
                &input[..],
                &mut output,
                Some(input.len() as u64),
                |progress| {
                    updates.push(progress.clone());
                },
            )
            .unwrap();

        assert!(updates.len() >= 2);
        assert_eq!(
            updates.last().unwrap().bytes_processed,
            stats.bytes_processed
        );
        assert_eq!(updates.last().unwrap().bytes_output, stats.bytes_output);
        assert_eq!(
            updates.last().unwrap().total_bytes,
            Some(input.len() as u64)
        );
    }

    // ---- Scan via Read/Write interface ----

    /// The reader/writer entry point replaces matches like the byte API.
    #[test]
    fn scan_reader_writer() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"hello alice@corp.com world";
        let mut output = Vec::new();
        let stats = scanner.scan_reader(&input[..], &mut output).unwrap();
        assert_eq!(stats.matches_found, 1);
        let out_str = String::from_utf8_lossy(&output);
        assert!(out_str.contains("@corp.com"), "domain must be preserved");
    }

    // ---- Pattern compile error ----

    /// A malformed regex is reported as an error, not a panic.
    #[test]
    fn invalid_regex_pattern() {
        let result = ScanPattern::from_regex("[invalid(", Category::Email, "bad");
        assert!(result.is_err());
    }

    // ---- Default config ----

    /// The default configuration passes its own validation.
    #[test]
    fn default_config_valid() {
        ScanConfig::default().validate().unwrap();
    }

    // ---- Config edge cases ----

    /// One-byte chunks with no overlap are accepted and pass data through.
    #[test]
    fn config_chunk_1_overlap_0() {
        // Extreme but valid: 1-byte chunks, no overlap.
        // Won't catch multi-byte patterns, but should not crash.
        let scanner = StreamScanner::new(vec![], test_store(), ScanConfig::new(1, 0)).unwrap();
        let (output, _) = scanner.scan_bytes(b"hello").unwrap();
        assert_eq!(output, b"hello");
    }

    // ---- Bytes output tracking ----

    /// Byte counters match the real input/output sizes after a replacement.
    #[test]
    fn bytes_output_preserved_on_replacement() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"a@b.cc"; // short email
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.bytes_processed, input.len() as u64);
        assert_eq!(stats.bytes_output, output.len() as u64);
        // Length-preserving: output length matches input length.
        assert_eq!(output.len(), input.len());
    }
}