Skip to main content

sanitize_engine/
scanner.rs

1//! Streaming scanner for detecting and replacing sensitive data.
2//!
3//! # Architecture
4//!
5//! The streaming scanner processes input data in configurable chunks,
6//! detecting secret patterns (regex or literal) and applying one-way
7//! replacements via the [`MappingStore`].
8//! This design supports files of 20–100 GB+ without requiring the entire
9//! content to fit in memory.
10//!
11//! ```text
12//! ┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
13//! │  Input (Read) │ ──▶ │  StreamScanner  │ ──▶ │  Output (Write)  │
14//! │  (chunked)    │     │  (pattern match │     │  (sanitized)     │
15//! └──────────────┘     │   + replace)    │     └──────────────────┘
16//!                       └────────┬────────┘
17//!                                │
18//!                       ┌────────▼────────┐
19//!                       │  MappingStore   │
20//!                       │  (dedup cache)  │
21//!                       └─────────────────┘
22//! ```
23//!
24//! # Chunk Overlap Strategy
25//!
26//! To avoid missing matches that span chunk boundaries, the scanner
27//! maintains an overlap window between consecutive chunks:
28//!
29//! 1. Read `chunk_size` bytes of new data.
30//! 2. Prepend the `carry` buffer (tail of previous window).
31//! 3. Scan the combined `window` for all pattern matches.
32//! 4. Compute `commit_point = window.len() - overlap_size` (adjusted
33//!    upward if a match straddles the boundary).
34//! 5. Emit output for `window[..commit_point]` with replacements applied.
35//! 6. Set `carry = window[commit_point..]` for the next iteration.
36//!
37//! The `overlap_size` should be ≥ the maximum expected match length to
38//! guarantee no matches are missed at boundaries.
39//!
40//! # Thread Safety
41//!
42//! [`StreamScanner`] is `Send + Sync`. Multiple files can be scanned
43//! concurrently using a shared `Arc<StreamScanner>`, all backed by the
44//! same [`MappingStore`] for per-run dedup
45//! consistency.
46//!
47//! # Performance
48//!
//! - **Chunk-based I/O**: memory per active scan is a small constant
//!   multiple of `chunk_size + overlap_size` (read buffer, scan window,
//!   and output buffer), independent of the input size.
51//! - **Compiled regex**: patterns are compiled once at construction and
52//!   reused across all chunks and files.
//! - **Low-contention reads**: the `DashMap` inside `MappingStore` is
//!   sharded, so lookups of already-seen values rarely block each other.
55//! - **File-level parallelism**: share `Arc<StreamScanner>` across
56//!   threads to scan multiple files concurrently.
57
58use crate::category::Category;
59use crate::error::{Result, SanitizeError};
60use crate::store::MappingStore;
61use aho_corasick::AhoCorasick;
62use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder};
63use serde::Serialize;
64use std::collections::HashMap;
65use std::io::{self, Read, Write};
66use std::sync::Arc;
67
68// ---------------------------------------------------------------------------
69// Configuration
70// ---------------------------------------------------------------------------
71
/// Bytes read from the input per chunk unless overridden: 1 MiB.
const DEFAULT_CHUNK_SIZE: usize = 1 << 20;

/// Bytes of overlap kept between consecutive chunks unless overridden: 4 KiB.
const DEFAULT_OVERLAP_SIZE: usize = 4 << 10;

/// Upper bound (bytes) on the compiled program of a single regex: 1 MiB.
/// Guards against DoS via pathologically complex user-supplied patterns.
const REGEX_SIZE_LIMIT: usize = 1024 * 1024;

/// Upper bound (bytes) on the DFA cache of a single regex: 1 MiB.
const REGEX_DFA_SIZE_LIMIT: usize = 1024 * 1024;

/// Absolute ceiling on the combined `RegexSet` automaton budget: 256 MiB.
///
/// The `RegexSet` budget is the per-pattern limit multiplied by the number
/// of patterns, so that large pattern sets can still compile. Without this
/// ceiling a pathological secrets file with 10 000 patterns could claim up
/// to ~20 GiB of automaton memory.
const REGEX_SET_SIZE_CAP: usize = 256 << 20;

/// Default limit on the number of patterns in one scanner (F-05 fix).
///
/// `RegexSet` automaton memory grows linearly with the pattern count: at
/// 1 MiB size and 1 MiB DFA limits per pattern, 10 000 patterns could
/// allocate up to ~20 GiB. The limit prevents accidental resource
/// exhaustion. Use [`StreamScanner::new_with_max_patterns`] to override it.
const DEFAULT_MAX_PATTERNS: usize = 10_000;

/// Label suffix identifying key-value-only patterns.
///
/// A pattern whose label ends with this suffix is left out of the scanner
/// built by `for_structured_pass`: the key-value processor resolves such
/// values structurally, and scanning for them as well would produce
/// spurious duplicate replacements on the surrounding syntax.
pub const KV_LABEL_SUFFIX: &str = "_kv";
106
/// Configuration for the streaming scanner.
///
/// Both sizes are expressed in bytes. [`ScanConfig::validate`] rejects a
/// zero `chunk_size` and any `overlap_size` that is not strictly smaller
/// than `chunk_size`.
///
/// # Tuning Guide
///
/// | Workload               | `chunk_size` | `overlap_size` |
/// |------------------------|--------------|----------------|
/// | Small files (< 10 MB)  | 256 KiB      | 1 KiB          |
/// | General purpose        | 1 MiB        | 4 KiB          |
/// | Large files (> 1 GB)   | 4–8 MiB      | 8 KiB          |
/// | Memory-constrained     | 64 KiB       | 1 KiB          |
///
/// `overlap_size` should be ≥ the longest expected match. Most secret
/// patterns (API keys, emails, SSNs) are well under 256 bytes, so the
/// 4 KiB default provides ample margin.
#[derive(Debug, Clone)]
pub struct ScanConfig {
    /// Size of each chunk read from the input (bytes).
    ///
    /// Larger chunks improve throughput (fewer syscalls) but use more
    /// memory. Must be non-zero. Default: 1 MiB.
    pub chunk_size: usize,

    /// Overlap between consecutive chunks (bytes).
    ///
    /// Must be ≥ the maximum expected match length and strictly smaller
    /// than `chunk_size`. Patterns whose matches can exceed this length
    /// risk being missed at chunk boundaries. Default: 4 KiB.
    pub overlap_size: usize,
}
136
137impl Default for ScanConfig {
138    fn default() -> Self {
139        Self {
140            chunk_size: DEFAULT_CHUNK_SIZE,
141            overlap_size: DEFAULT_OVERLAP_SIZE,
142        }
143    }
144}
145
146impl ScanConfig {
147    /// Create a new configuration with explicit values.
148    #[must_use]
149    pub fn new(chunk_size: usize, overlap_size: usize) -> Self {
150        Self {
151            chunk_size,
152            overlap_size,
153        }
154    }
155
156    /// Validate the configuration, returning an error if invalid.
157    ///
158    /// # Errors
159    ///
160    /// Returns [`SanitizeError::InvalidConfig`] if `chunk_size` is zero
161    /// or `overlap_size >= chunk_size`.
162    pub fn validate(&self) -> Result<()> {
163        if self.chunk_size == 0 {
164            return Err(SanitizeError::InvalidConfig(
165                "chunk_size must be > 0".into(),
166            ));
167        }
168        if self.overlap_size >= self.chunk_size {
169            return Err(SanitizeError::InvalidConfig(
170                "overlap_size must be < chunk_size".into(),
171            ));
172        }
173        Ok(())
174    }
175}
176
177// ---------------------------------------------------------------------------
178// Internal helpers
179// ---------------------------------------------------------------------------
180
181/// Convert any compile-time pattern error into [`SanitizeError::PatternCompileError`].
182#[inline]
183fn compile_err(e: impl std::fmt::Display) -> SanitizeError {
184    SanitizeError::PatternCompileError(e.to_string())
185}
186
187// ---------------------------------------------------------------------------
188// Scan pattern
189// ---------------------------------------------------------------------------
190
191/// A pattern rule defining what to scan for and how to categorize matches.
192///
193/// Wraps a compiled [`regex::bytes::Regex`] with a [`Category`] for
194/// replacement lookups and a human-readable label for reporting.
195///
196/// Both regex and literal patterns are supported. Literal patterns keep
197/// their original text and are matched by the scanner's Aho-Corasick
198/// automaton for fast multi-literal scanning.
199pub struct ScanPattern {
200    /// Compiled regex matcher (used for non-literal patterns and as a
201    /// fallback; literal patterns are matched via Aho-Corasick instead).
202    regex: Regex,
203    /// Category for replacement lookups.
204    category: Category,
205    /// Human-readable label for reporting / stats.
206    label: String,
207    /// Original (unescaped) literal string when created via `from_literal`.
208    /// `None` for patterns created via `from_regex`.
209    /// Stored so `StreamScanner` can build an Aho-Corasick automaton for
210    /// fast SIMD literal matching instead of running the regex engine.
211    literal: Option<String>,
212    /// Minimum window size (bytes) required to produce a match.
213    /// For literal patterns this equals the byte length of the literal itself.
214    /// For regex patterns this is `0` (no guaranteed minimum).
215    /// Used to skip `captures_iter` when the window is provably too short.
216    pub min_length: usize,
217}
218
219impl std::fmt::Debug for ScanPattern {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("ScanPattern")
222            .field("pattern", &self.regex.as_str())
223            .field("category", &self.category)
224            .field("label", &self.label)
225            .field("literal", &self.literal.as_deref())
226            .field("min_length", &self.min_length)
227            .finish()
228    }
229}
230
231impl Clone for ScanPattern {
232    fn clone(&self) -> Self {
233        Self {
234            regex: self.regex.clone(),
235            category: self.category.clone(),
236            label: self.label.clone(),
237            literal: self.literal.clone(),
238            min_length: self.min_length,
239        }
240    }
241}
242
243impl ScanPattern {
244    /// Create a pattern from a regex string.
245    ///
246    /// ## Capture group 1 — partial replacement
247    ///
248    /// If the regex contains a capture group 1 (`(...)`), only the bytes
249    /// matched by that group are replaced; the bytes before and after it
250    /// within the full match are emitted verbatim. This lets you write
251    /// context-anchored patterns without redacting the prefix/suffix:
252    ///
253    /// ```text
254    /// pattern: glpat-([A-Za-z0-9_-]{20})
255    ///           ^^^^^^ prefix preserved
256    ///                  ^^^^^^^^^^^^^^^^^^^^ group 1 → replaced
257    /// ```
258    ///
259    /// Patterns **without** a capture group replace the entire match.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`SanitizeError::PatternCompileError`] if the regex is invalid.
264    ///
265    /// # Examples
266    ///
267    /// ```
268    /// use sanitize_engine::scanner::ScanPattern;
269    /// use sanitize_engine::category::Category;
270    ///
271    /// // No capture group — full match replaced:
272    /// let email = ScanPattern::from_regex(
273    ///     r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
274    ///     Category::Email,
275    ///     "email_address",
276    /// ).unwrap();
277    ///
278    /// // Capture group 1 — prefix preserved, only the token value replaced:
279    /// let token = ScanPattern::from_regex(
280    ///     r"glpat-([A-Za-z0-9_-]{20})",
281    ///     Category::AuthToken,
282    ///     "gitlab_pat",
283    /// ).unwrap();
284    /// ```
285    pub fn from_regex(pattern: &str, category: Category, label: impl Into<String>) -> Result<Self> {
286        let regex = RegexBuilder::new(pattern)
287            .size_limit(REGEX_SIZE_LIMIT)
288            .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
289            .build()
290            .map_err(compile_err)?;
291        Ok(Self {
292            regex,
293            category,
294            label: label.into(),
295            literal: None,
296            min_length: 0,
297        })
298    }
299
300    /// Create a pattern from a literal string.
301    ///
302    /// The literal is escaped so that regex metacharacters are matched
303    /// verbatim.
304    ///
305    /// # Errors
306    ///
307    /// Returns [`SanitizeError::PatternCompileError`] if regex compilation fails.
308    ///
309    /// # Examples
310    ///
311    /// ```
312    /// use sanitize_engine::scanner::ScanPattern;
313    /// use sanitize_engine::category::Category;
314    ///
315    /// let pat = ScanPattern::from_literal(
316    ///     "sk-proj-abc123secret",
317    ///     Category::Custom("api_key".into()),
318    ///     "openai_key",
319    /// ).unwrap();
320    /// ```
321    pub fn from_literal(
322        literal: &str,
323        category: Category,
324        label: impl Into<String>,
325    ) -> Result<Self> {
326        let escaped = regex::escape(literal);
327        let regex = RegexBuilder::new(&escaped)
328            .size_limit(REGEX_SIZE_LIMIT)
329            .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
330            .build()
331            .map_err(compile_err)?;
332        Ok(Self {
333            regex,
334            category,
335            label: label.into(),
336            min_length: literal.len(),
337            literal: Some(literal.to_owned()),
338        })
339    }
340
341    /// The category this pattern maps to.
342    #[must_use]
343    pub fn category(&self) -> &Category {
344        &self.category
345    }
346
347    /// The human-readable label.
348    #[must_use]
349    pub fn label(&self) -> &str {
350        &self.label
351    }
352
353    /// Return the raw regex pattern string for RegexSet construction.
354    #[must_use]
355    pub fn regex_pattern(&self) -> &str {
356        self.regex.as_str()
357    }
358}
359
360// ScanPattern is Send + Sync because:
361// - regex::bytes::Regex is Send + Sync
362// - Category is Send + Sync (it's an enum of primitives + CompactString)
363// - String is Send + Sync
364
365// ---------------------------------------------------------------------------
366// Internal: raw match descriptor
367// ---------------------------------------------------------------------------
368
/// A single match found during scanning (internal).
///
/// All offsets are byte positions relative to the current scan window,
/// not to the input as a whole.
#[derive(Debug, Clone, Copy)]
struct RawMatch {
    /// Start byte offset within the scan window.
    start: usize,
    /// End byte offset (exclusive) within the scan window.
    end: usize,
    /// Index into the `StreamScanner::patterns` vector.
    pattern_idx: usize,
    /// Byte range of capture group 1 within the window, if the pattern has one.
    /// When present, only this sub-range is replaced; the bytes between
    /// `start..capture_start` and `capture_end..end` are emitted verbatim,
    /// preserving surrounding context (delimiters, key names, prefixes).
    capture: Option<(usize, usize)>,
}
384
385// ---------------------------------------------------------------------------
386// Per-scan scratch buffers
387// ---------------------------------------------------------------------------
388
/// Scratch buffers reused across chunks within a single scan call.
///
/// One instance is created per `scan_reader_with_callbacks` invocation
/// (which the other `scan_reader*` entry points delegate to) and reused
/// for every chunk. This eliminates the per-chunk heap pressure that
/// would otherwise come from `Vec` allocations in `find_matches` and
/// `apply_replacements`.
struct ScanScratch {
    /// Accumulates raw matches from all patterns before deduplication.
    all_matches: Vec<RawMatch>,
    /// Non-overlapping matches selected for the current window
    /// (populated by `find_matches`, consumed by `apply_replacements`).
    selected: Vec<RawMatch>,
    /// Output bytes for the committed region, written by `apply_replacements`.
    output: Vec<u8>,
    /// Per-pattern match counts indexed by `pattern_idx`.
    /// Reset to zero after each chunk's counts are folded into `ScanStats`.
    pattern_counts: Vec<u64>,
}
407
408impl ScanScratch {
409    fn new(pattern_count: usize, chunk_size: usize, overlap_size: usize) -> Self {
410        Self {
411            all_matches: Vec::new(),
412            selected: Vec::new(),
413            output: Vec::with_capacity(chunk_size + overlap_size),
414            pattern_counts: vec![0u64; pattern_count],
415        }
416    }
417}
418
419// ---------------------------------------------------------------------------
420// Scan statistics
421// ---------------------------------------------------------------------------
422
/// The file-level position of a single scanner match.
///
/// Emitted via the `on_match` callback in
/// [`StreamScanner::scan_reader_with_callbacks`].
///
/// Line numbers are 1-based and are derived by counting `\n` bytes only.
/// Files with Windows line endings (`\r\n`) still get correct line
/// numbers, because each such line ending contains exactly one `\n`;
/// the `\r` bytes do not affect the count.
///
/// `byte_offset` is the absolute byte position of the first byte of the
/// matched region within the file (0-based). Both fields refer to the
/// *input* file, not the sanitized output.
#[derive(Debug, Clone, Serialize)]
pub struct MatchLocation {
    /// 1-based line number of the match within the file.
    pub line: u64,
    /// 0-based byte offset of the match start within the file.
    pub byte_offset: u64,
    /// Label of the pattern that triggered this match.
    pub pattern: String,
}
443
/// Statistics collected during a scan operation.
///
/// Returned by [`StreamScanner::scan_reader`] and
/// [`StreamScanner::scan_bytes`] to provide visibility into what
/// the scanner did. All counters start at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct ScanStats {
    /// Total bytes read from the input. Bytes carried over between
    /// chunks for overlap are counted once, when first read.
    pub bytes_processed: u64,
    /// Total bytes written to the output (may differ from `bytes_processed`
    /// when replacements have different lengths than the originals).
    pub bytes_output: u64,
    /// Total number of matches found across all patterns.
    pub matches_found: u64,
    /// Total number of replacements applied (always == `matches_found`
    /// in one-way mode).
    pub replacements_applied: u64,
    /// Per-pattern match counts, keyed by pattern label.
    pub pattern_counts: HashMap<String, u64>,
}
464
/// Progress snapshot emitted during streaming scans.
///
/// Passed by reference to the `on_progress` callback of
/// [`StreamScanner::scan_reader_with_progress`] and
/// [`StreamScanner::scan_reader_with_callbacks`]. All counters are
/// cumulative for the scan so far.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct ScanProgress {
    /// Total bytes read from the input so far.
    pub bytes_processed: u64,
    /// Total bytes written to the output so far.
    pub bytes_output: u64,
    /// Total input size when known; `None` when the caller did not
    /// supply one, in which case percentages/ETA cannot be derived.
    pub total_bytes: Option<u64>,
    /// Total number of matches found so far.
    pub matches_found: u64,
    /// Total replacements applied so far.
    pub replacements_applied: u64,
}
479
480// ---------------------------------------------------------------------------
481// StreamScanner
482// ---------------------------------------------------------------------------
483
/// Streaming scanner that detects and replaces sensitive patterns.
///
/// Thread-safe: can be shared via `Arc<StreamScanner>` for concurrent
/// scanning of multiple files. Each call to [`scan_reader`](Self::scan_reader)
/// is independent and maintains its own chunking state.
///
/// Patterns are split at construction time: literal patterns go into an
/// Aho-Corasick automaton, all others into a `RegexSet`. The two index
/// tables map each engine's pattern index back into `patterns`.
///
/// # Usage
///
/// ```rust
/// use sanitize_engine::scanner::{StreamScanner, ScanPattern, ScanConfig};
/// use sanitize_engine::category::Category;
/// use sanitize_engine::generator::HmacGenerator;
/// use sanitize_engine::store::MappingStore;
/// use std::sync::Arc;
///
/// // 1. Build the replacement store.
/// let gen = Arc::new(HmacGenerator::new([42u8; 32]));
/// let store = Arc::new(MappingStore::new(gen, None));
///
/// // 2. Define patterns.
/// let patterns = vec![
///     ScanPattern::from_regex(
///         r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
///         Category::Email,
///         "email",
///     ).unwrap(),
/// ];
///
/// // 3. Create the scanner.
/// let scanner = StreamScanner::new(patterns, store, ScanConfig::default()).unwrap();
///
/// // 4. Scan.
/// let input = b"Contact alice@corp.com for details.";
/// let (output, stats) = scanner.scan_bytes(input).unwrap();
/// assert_eq!(stats.matches_found, 1);
/// assert!(!output.windows(b"alice@corp.com".len())
///     .any(|w| w == b"alice@corp.com"));
/// ```
pub struct StreamScanner {
    /// Compiled scan patterns (both literal and regex), in the order
    /// they were supplied to the constructor.
    patterns: Vec<ScanPattern>,
    /// Pre-compiled set for fast multi-pattern pre-filtering of **regex**
    /// (non-literal) patterns only.  `matches()` returns which regex-pattern
    /// indices matched, avoiding running every individual regex on each chunk
    /// (R-3 optimisation).
    regex_set: RegexSet,
    /// Maps a `RegexSet` index → index into `self.patterns`.
    /// Only non-literal patterns are in the `RegexSet`.
    regex_indices: Vec<usize>,
    /// Aho-Corasick automaton for fast SIMD literal matching.
    /// `None` when there are no literal patterns.
    aho_corasick: Option<AhoCorasick>,
    /// Maps an Aho-Corasick pattern index → index into `self.patterns`.
    /// Only literal patterns appear here.
    literal_indices: Vec<usize>,
    /// Thread-safe dedup replacement store, shared with any scanners
    /// derived from this one.
    store: Arc<MappingStore>,
    /// Scanner configuration (validated at construction).
    config: ScanConfig,
}
544
/// Return type for scanner factory methods that load a secrets file.
///
/// Contains `(scanner, warnings, allow_patterns)` where `warnings` are
/// non-fatal parse errors and `allow_patterns` are raw strings from
/// `kind: allow` entries.
///
/// NOTE(review): the `usize` paired with each warning presumably
/// identifies the offending entry (index or line) — confirm against the
/// loader that produces this value.
type SecretsLoadResult = Result<(StreamScanner, Vec<(usize, SanitizeError)>, Vec<String>)>;
551
552impl StreamScanner {
553    /// Create a new streaming scanner.
554    ///
555    /// # Arguments
556    ///
557    /// - `patterns` — the set of patterns to scan for.
558    /// - `store` — the mapping store for dedup-consistent replacements.
559    /// - `config` — chunking / overlap configuration.
560    ///
561    /// # Errors
562    ///
563    /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
564    /// invalid (e.g. `chunk_size == 0` or `overlap_size >= chunk_size`).
565    pub fn new(
566        patterns: Vec<ScanPattern>,
567        store: Arc<MappingStore>,
568        config: ScanConfig,
569    ) -> Result<Self> {
570        Self::new_with_max_patterns(patterns, store, config, DEFAULT_MAX_PATTERNS)
571    }
572
573    /// Create a new streaming scanner with a custom pattern limit.
574    ///
575    /// This is identical to [`new`](Self::new) but allows overriding the
576    /// default pattern cap (10 000).  Use this
577    /// when you have a legitimate need for more patterns and have
578    /// verified that your system has enough memory for the resulting
579    /// `RegexSet`.
580    ///
581    /// # Errors
582    ///
583    /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
584    /// invalid or the pattern count exceeds `max_patterns`.
585    pub fn new_with_max_patterns(
586        patterns: Vec<ScanPattern>,
587        store: Arc<MappingStore>,
588        config: ScanConfig,
589        max_patterns: usize,
590    ) -> Result<Self> {
591        config.validate()?;
592
593        // F-05 fix: enforce maximum pattern count to bound RegexSet memory.
594        if patterns.len() > max_patterns {
595            return Err(SanitizeError::InvalidConfig(format!(
596                "pattern count ({}) exceeds maximum allowed ({}) — \
597                 RegexSet memory scales linearly with pattern count",
598                patterns.len(),
599                max_patterns
600            )));
601        }
602
603        // Partition patterns into literal (Aho-Corasick) and regex (RegexSet)
604        // so each is matched by the most efficient engine.
605        let mut literal_bytes: Vec<Vec<u8>> = Vec::new();
606        let mut literal_indices: Vec<usize> = Vec::new();
607        let mut regex_strs: Vec<&str> = Vec::new();
608        let mut regex_indices: Vec<usize> = Vec::new();
609
610        for (i, pattern) in patterns.iter().enumerate() {
611            if let Some(lit) = &pattern.literal {
612                literal_bytes.push(lit.as_bytes().to_vec());
613                literal_indices.push(i);
614            } else {
615                regex_strs.push(pattern.regex_pattern());
616                regex_indices.push(i);
617            }
618        }
619
620        // Build Aho-Corasick automaton for literal patterns (SIMD-accelerated,
621        // single O(n) pass over the input per chunk).
622        let aho_corasick = if literal_bytes.is_empty() {
623            None
624        } else {
625            Some(AhoCorasick::new(&literal_bytes).map_err(compile_err)?)
626        };
627
628        // Build RegexSet from non-literal patterns only (R-3 pre-filter).
629        let regex_set = if regex_strs.is_empty() {
630            RegexSetBuilder::new(Vec::<&str>::new())
631                .size_limit(REGEX_SIZE_LIMIT)
632                .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
633                .build()
634                .map_err(compile_err)?
635        } else {
636            RegexSetBuilder::new(&regex_strs)
637                .size_limit((REGEX_SIZE_LIMIT * regex_strs.len().max(1)).min(REGEX_SET_SIZE_CAP))
638                .dfa_size_limit(
639                    (REGEX_DFA_SIZE_LIMIT * regex_strs.len().max(1)).min(REGEX_SET_SIZE_CAP),
640                )
641                .build()
642                .map_err(compile_err)?
643        };
644
645        Ok(Self {
646            patterns,
647            regex_set,
648            regex_indices,
649            aho_corasick,
650            literal_indices,
651            store,
652            config,
653        })
654    }
655
656    /// Create a copy of this scanner extended with additional literal patterns.
657    ///
658    /// Clones the existing pattern set and appends `extra`, then rebuilds
659    /// the internal Aho-Corasick and RegexSet automata. Used by the
660    /// format-preserving structured pass to scan original bytes with
661    /// discovered field-value literals added to the base pattern set.
662    ///
663    /// # Errors
664    ///
665    /// Returns [`SanitizeError`] if automaton construction fails or the
666    /// combined pattern count exceeds the default limit.
667    pub fn with_extra_literals(&self, extra: Vec<ScanPattern>) -> Result<Self> {
668        let mut patterns = self.patterns.clone();
669        patterns.extend(extra);
670        Self::new(patterns, Arc::clone(&self.store), self.config.clone())
671    }
672
673    /// Build a scanner suitable for format-preserving structured-file passes.
674    ///
675    /// Patterns whose labels end with `"_kv"` are excluded from the base set.
676    /// Those patterns match both a key name and its value (e.g. `password: s3cr3t`)
677    /// as a single unit; in a structured pass the key must survive untouched so
678    /// only the discovered field-value literals are safe to replace.
679    ///
680    /// `extra` (the profile-discovered literals) are always included.
681    ///
682    /// # Errors
683    ///
684    /// Returns [`SanitizeError`] if Aho-Corasick or RegexSet construction fails
685    /// or the combined pattern count exceeds the default limit.
686    pub fn for_structured_pass(&self, extra: Vec<ScanPattern>) -> Result<Self> {
687        let mut patterns: Vec<ScanPattern> = self
688            .patterns
689            .iter()
690            .filter(|p| !p.label.ends_with(KV_LABEL_SUFFIX))
691            .cloned()
692            .collect();
693        patterns.extend(extra);
694        Self::new(patterns, Arc::clone(&self.store), self.config.clone())
695    }
696
697    /// Scan a reader and write sanitized output to a writer.
698    ///
699    /// Processes the input in chunks of `config.chunk_size` bytes,
700    /// maintaining an overlap window of `config.overlap_size` bytes to
701    /// catch matches spanning chunk boundaries. All detected matches
702    /// are replaced one-way via the [`MappingStore`].
703    ///
704    /// # Arguments
705    ///
706    /// - `reader` — input source (file, network stream, `&[u8]`, …).
707    /// - `writer` — output sink (file, `Vec<u8>`, …).
708    ///
709    /// # Returns
710    ///
711    /// [`ScanStats`] with counters for bytes processed, matches found, etc.
712    ///
713    /// # Errors
714    ///
715    /// Returns [`SanitizeError`] on I/O failures or if a replacement
716    /// cannot be generated (e.g. store capacity exceeded).
717    pub fn scan_reader<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ScanStats> {
718        self.scan_reader_with_callbacks(reader, writer, None, |_| {}, |_| {})
719    }
720
721    /// Scan a reader and emit progress snapshots after each committed chunk.
722    ///
723    /// `total_bytes` should be provided when the caller knows the full input
724    /// size. When omitted, progress consumers should avoid percentages/ETA.
725    ///
726    /// This is a convenience wrapper around [`scan_reader_with_callbacks`](Self::scan_reader_with_callbacks)
727    /// that discards per-match location information. Use that method directly
728    /// when you need line numbers or byte offsets for individual matches.
729    ///
730    /// # Errors
731    ///
732    /// Returns [`SanitizeError`] on I/O failures or if a replacement
733    /// cannot be generated (e.g. store capacity exceeded).
734    pub fn scan_reader_with_progress<R: Read, W: Write, F>(
735        &self,
736        reader: R,
737        writer: W,
738        total_bytes: Option<u64>,
739        on_progress: F,
740    ) -> Result<ScanStats>
741    where
742        F: FnMut(&ScanProgress),
743    {
744        self.scan_reader_with_callbacks(reader, writer, total_bytes, on_progress, |_| {})
745    }
746
747    /// Scan a reader, emit progress snapshots, and call `on_match` for every
748    /// committed match with its 1-based line number and byte offset.
749    ///
750    /// `on_match` is called synchronously in the scanning thread, once per
751    /// committed match, in document order. The callback receives a
752    /// [`MatchLocation`] describing the pattern label, 1-based line number,
753    /// and 0-based byte offset within the input file. Callers that only need
754    /// aggregate counts (no per-match positions) should prefer
755    /// [`scan_reader_with_progress`](Self::scan_reader_with_progress), which
756    /// skips the per-byte newline counting entirely.
757    ///
758    /// # Performance note
759    ///
760    /// Enabling `on_match` adds an O(committed_bytes_between_matches)
761    /// newline-counting pass inside each chunk. For files with sparse matches
762    /// this overhead is proportional to file size; for dense matches (e.g. one
763    /// secret per line) it is negligible. On 10–15 GiB log files with typical
764    /// match densities the overhead is roughly 10–20 % of total scan time.
765    ///
766    /// # Errors
767    ///
768    /// Returns [`SanitizeError`] on I/O failures or if a replacement
769    /// cannot be generated (e.g. store capacity exceeded).
770    pub fn scan_reader_with_callbacks<R: Read, W: Write, F, M>(
771        &self,
772        mut reader: R,
773        mut writer: W,
774        total_bytes: Option<u64>,
775        mut on_progress: F,
776        mut on_match: M,
777    ) -> Result<ScanStats>
778    where
779        F: FnMut(&ScanProgress),
780        M: FnMut(MatchLocation),
781    {
782        let mut stats = ScanStats::default();
783
784        // Carry buffer: the tail of the previous window that needs
785        // to be re-scanned with the next chunk.
786        let mut carry: Vec<u8> = Vec::new();
787
788        // Read buffer (reused across iterations to avoid re-allocation).
789        let mut read_buf = vec![0u8; self.config.chunk_size];
790
791        // Scan window (reused across iterations — grows to peak size then
792        // stays there, avoiding per-chunk allocation).
793        let mut window: Vec<u8> =
794            Vec::with_capacity(self.config.chunk_size + self.config.overlap_size);
795
796        // Scratch buffers reused every chunk to eliminate per-chunk heap
797        // pressure from match collection, output building, and stats tracking.
798        let mut scratch = ScanScratch::new(
799            self.patterns.len(),
800            self.config.chunk_size,
801            self.config.overlap_size,
802        );
803
804        // Absolute file byte offset of window[0] for this iteration.
805        let mut window_file_offset: u64 = 0;
806        // Cumulative newline count in the file before window[0].
807        let mut newlines_before_window: u64 = 0;
808
809        loop {
810            // Read the next chunk.
811            let bytes_read = read_fully(&mut reader, &mut read_buf)?;
812            let is_eof = bytes_read < read_buf.len();
813
814            // Track only genuinely new bytes (carry was already counted).
815            stats.bytes_processed += bytes_read as u64;
816
817            if bytes_read == 0 && carry.is_empty() {
818                break;
819            }
820
821            // Build the scan window: carry ++ new_data.
822            // Reuse the window buffer to avoid per-chunk allocation.
823            window.clear();
824            window.extend_from_slice(&carry);
825            window.extend_from_slice(&read_buf[..bytes_read]);
826
827            if window.is_empty() {
828                break;
829            }
830
831            // Scan the window: find matches, determine commit point, apply
832            // replacements, and flush the committed region to the writer.
833            // Returns the commit_point so we can slice the carry for next iter.
834            let commit_point = self.process_committed_window(
835                &window,
836                is_eof,
837                &mut scratch,
838                &mut writer,
839                &mut stats,
840                window_file_offset,
841                newlines_before_window,
842                &mut on_match,
843            )?;
844
845            // Advance file-level position counters for the next iteration.
846            // window[commit_point] is where the next window's carry starts,
847            // so that byte is at file offset (window_file_offset + commit_point).
848            newlines_before_window += count_newlines(&window[..commit_point]);
849            window_file_offset += commit_point as u64;
850
851            // Fold per-chunk pattern hit counts into the cumulative stats map,
852            // then emit a progress snapshot to the caller.
853            self.fold_chunk_counts(&mut scratch.pattern_counts, &mut stats);
854            on_progress(&ScanProgress {
855                bytes_processed: stats.bytes_processed,
856                bytes_output: stats.bytes_output,
857                total_bytes,
858                matches_found: stats.matches_found,
859                replacements_applied: stats.replacements_applied,
860            });
861
862            // Update carry for next iteration.
863            if is_eof {
864                carry.clear();
865                break;
866            }
867            carry.clear();
868            carry.extend_from_slice(&window[commit_point..]);
869        }
870
871        Ok(stats)
872    }
873
874    /// Scan one window, apply replacements up to the commit point, and flush
875    /// the result to `writer`. Returns the commit point so the caller can
876    /// slice the carry for the next iteration.
877    #[allow(clippy::too_many_arguments)]
878    fn process_committed_window(
879        &self,
880        window: &[u8],
881        is_eof: bool,
882        scratch: &mut ScanScratch,
883        writer: &mut dyn io::Write,
884        stats: &mut ScanStats,
885        window_file_offset: u64,
886        newlines_before_window: u64,
887        on_match: &mut dyn FnMut(MatchLocation),
888    ) -> Result<usize> {
889        // Find all non-overlapping matches in the window.
890        self.find_matches(window, scratch);
891
892        // Determine how much of the window can be safely committed this iteration.
893        let base_commit = if is_eof {
894            window.len()
895        } else {
896            window.len().saturating_sub(self.config.overlap_size)
897        };
898        let commit_point =
899            self.adjusted_commit_point(&scratch.selected, base_commit, window.len(), is_eof);
900
901        // Build output for the committed region (fills scratch.output).
902        self.apply_replacements(
903            &window[..commit_point],
904            &scratch.selected,
905            stats,
906            &mut scratch.output,
907            &mut scratch.pattern_counts,
908            window_file_offset,
909            newlines_before_window,
910            on_match,
911        )?;
912
913        writer
914            .write_all(&scratch.output)
915            .map_err(|e| SanitizeError::IoError(e.to_string()))?;
916        stats.bytes_output += scratch.output.len() as u64;
917
918        Ok(commit_point)
919    }
920
921    /// Fold per-chunk pattern hit counts into the cumulative `stats.pattern_counts`
922    /// map, then reset `counts` to zero for the next chunk.
923    ///
924    /// `label.clone()` is called at most once per distinct pattern per chunk,
925    /// not once per match hit, which keeps cost proportional to pattern count.
926    fn fold_chunk_counts(&self, counts: &mut [u64], stats: &mut ScanStats) {
927        for (idx, count) in counts.iter_mut().enumerate() {
928            if *count > 0 {
929                *stats
930                    .pattern_counts
931                    .entry(self.patterns[idx].label.clone())
932                    .or_insert(0) += *count;
933                *count = 0;
934            }
935        }
936    }
937
938    /// Convenience: scan byte slice in-memory and return sanitized output.
939    ///
940    /// Equivalent to `scan_reader(input, Vec::new())` but returns the
941    /// output buffer directly.
942    ///
943    /// # Errors
944    ///
945    /// Returns [`SanitizeError`] if a replacement cannot be generated
946    /// (e.g. store capacity exceeded).
947    pub fn scan_bytes(&self, input: &[u8]) -> Result<(Vec<u8>, ScanStats)> {
948        self.scan_bytes_with_progress(input, |_| {})
949    }
950
951    /// Scan a byte slice in memory and emit progress snapshots.
952    ///
953    /// # Errors
954    ///
955    /// Returns [`SanitizeError`] if a replacement cannot be generated
956    /// (e.g. store capacity exceeded).
957    pub fn scan_bytes_with_progress<F>(
958        &self,
959        input: &[u8],
960        on_progress: F,
961    ) -> Result<(Vec<u8>, ScanStats)>
962    where
963        F: FnMut(&ScanProgress),
964    {
965        let mut output = Vec::with_capacity(input.len());
966        let stats = self.scan_reader_with_callbacks(
967            input,
968            &mut output,
969            Some(input.len() as u64),
970            on_progress,
971            |_| {},
972        )?;
973        Ok((output, stats))
974    }
975
976    // ---- Accessors ----
977
978    /// Access the scanner's configuration.
979    #[must_use]
980    pub fn config(&self) -> &ScanConfig {
981        &self.config
982    }
983
984    /// Access the underlying mapping store.
985    #[must_use]
986    pub fn store(&self) -> &Arc<MappingStore> {
987        &self.store
988    }
989
990    /// Number of patterns registered in this scanner.
991    #[must_use]
992    pub fn pattern_count(&self) -> usize {
993        self.patterns.len()
994    }
995
996    /// Create a scanner from an encrypted secrets file.
997    ///
998    /// Decrypts the file in memory, parses the entries, compiles
999    /// patterns, and returns the scanner ready to scan. Decrypted
1000    /// plaintext is scrubbed from memory after parsing.
1001    ///
1002    /// # Arguments
1003    ///
1004    /// - `encrypted_bytes` — raw bytes of the `.enc` file.
1005    /// - `password` — user password.
1006    /// - `format` — optional format override for the plaintext.
1007    /// - `store` — mapping store for dedup-consistent replacements.
1008    /// - `config` — chunking / overlap configuration.
1009    /// - `extra_patterns` — additional patterns to merge in.
1010    ///
1011    /// # Returns
1012    ///
1013    /// `(scanner, warnings, allow_patterns)` where `warnings` lists entries
1014    /// that failed to compile (index + error) and `allow_patterns` are the
1015    /// raw strings from `kind: allow` entries — pass these to
1016    /// [`AllowlistMatcher::new`](crate::allowlist::AllowlistMatcher) to
1017    /// suppress replacements for known-safe values.
1018    ///
1019    /// # Errors
1020    ///
1021    /// Returns a secrets-related [`SanitizeError`] on decryption failure
1022    /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
1023    pub fn from_encrypted_secrets(
1024        encrypted_bytes: &[u8],
1025        password: &str,
1026        format: Option<crate::secrets::SecretsFormat>,
1027        store: Arc<MappingStore>,
1028        config: ScanConfig,
1029        extra_patterns: Vec<ScanPattern>,
1030    ) -> SecretsLoadResult {
1031        let ((mut patterns, warnings), allow) =
1032            crate::secrets::load_encrypted_secrets(encrypted_bytes, password, format)?;
1033        patterns.extend(extra_patterns);
1034        let scanner = Self::new(patterns, store, config)?;
1035        Ok((scanner, warnings, allow))
1036    }
1037
1038    /// Create a scanner from a plaintext secrets file.
1039    ///
1040    /// Convenience for development / testing without encryption.
1041    ///
1042    /// # Returns
1043    ///
1044    /// `(scanner, warnings, allow_patterns)` where `allow_patterns` are the
1045    /// raw strings from `kind: allow` entries — pass these to
1046    /// [`AllowlistMatcher::new`](crate::allowlist::AllowlistMatcher) to
1047    /// suppress replacements for known-safe values.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns a secrets-related [`SanitizeError`] on parse failure
1052    /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
1053    pub fn from_plaintext_secrets(
1054        plaintext: &[u8],
1055        format: Option<crate::secrets::SecretsFormat>,
1056        store: Arc<MappingStore>,
1057        config: ScanConfig,
1058        extra_patterns: Vec<ScanPattern>,
1059    ) -> SecretsLoadResult {
1060        let ((mut patterns, warnings), allow) =
1061            crate::secrets::load_plaintext_secrets(plaintext, format)?;
1062        patterns.extend(extra_patterns);
1063        let scanner = Self::new(patterns, store, config)?;
1064        Ok((scanner, warnings, allow))
1065    }
1066
1067    // ---- Internal helpers ----
1068
1069    /// Find all non-overlapping matches across all patterns.
1070    ///
1071    /// Fills `scratch.selected` with the winning non-overlapping matches
1072    /// for the given `window`.  All three scratch `Vec`s are cleared and
1073    /// repopulated on each call so callers can freely reuse the same
1074    /// `ScanScratch` instance across chunks.
1075    ///
1076    /// ## Strategy
1077    ///
1078    /// 1. **Aho-Corasick** (`aho_corasick`): single O(n) SIMD pass over the
1079    ///    window reporting every occurrence of every literal pattern,
1080    ///    including overlapping ones.  This replaces O(k·n) individual regex
1081    ///    scans for the literal subset.
1082    /// 2. **RegexSet pre-filter** (R-3 optimisation): fast check of which
1083    ///    *non-literal* regex patterns have any match in the window.
1084    /// 3. **Individual regex `find_iter`**: only for regex patterns flagged
1085    ///    by step 2.
1086    /// 4. **Sort + greedy dedup**: all raw matches are sorted by start
1087    ///    (ascending), then length (descending), and a single greedy pass
1088    ///    selects the final non-overlapping set.
1089    fn find_matches(&self, window: &[u8], scratch: &mut ScanScratch) {
1090        scratch.all_matches.clear();
1091        scratch.selected.clear();
1092
1093        // Step 1: Aho-Corasick overlapping scan for all literal patterns.
1094        // find_overlapping_iter reports every match position including
1095        // overlapping ones, so the sort+greedy step below correctly resolves
1096        // ambiguities between literals (e.g. "abc" vs "abcd" at same offset).
1097        // Literals never have capture groups — capture is always None.
1098        if let Some(ac) = &self.aho_corasick {
1099            for mat in ac.find_overlapping_iter(window) {
1100                scratch.all_matches.push(RawMatch {
1101                    start: mat.start(),
1102                    end: mat.end(),
1103                    pattern_idx: self.literal_indices[mat.pattern().as_usize()],
1104                    capture: None,
1105                });
1106            }
1107        }
1108
1109        // Steps 2+3: RegexSet pre-filter then individual scan for non-literal
1110        // patterns.  regex_set only contains non-literal pattern strings, so
1111        // literals are never scanned twice.
1112        // Use captures_iter so that patterns with a capture group 1 record
1113        // the sub-range to replace, while patterns without one fall back to
1114        // replacing the full match.
1115        for rs_idx in self.regex_set.matches(window) {
1116            let pattern_idx = self.regex_indices[rs_idx];
1117            if window.len() < self.patterns[pattern_idx].min_length {
1118                continue;
1119            }
1120            for cap in self.patterns[pattern_idx].regex.captures_iter(window) {
1121                let full = cap.get(0).expect("group 0 always exists");
1122                let capture = cap.get(1).map(|g| (g.start(), g.end()));
1123                scratch.all_matches.push(RawMatch {
1124                    start: full.start(),
1125                    end: full.end(),
1126                    pattern_idx,
1127                    capture,
1128                });
1129            }
1130        }
1131
1132        // Step 4: sort then greedy non-overlapping selection.
1133        // Skip entirely when no matches were found (the common case for
1134        // clean data), avoiding an unnecessary sort of an empty Vec.
1135        if scratch.all_matches.is_empty() {
1136            return;
1137        }
1138
1139        // Primary: start ascending. Secondary: length descending (longer
1140        // match wins when two matches begin at the same position).
1141        scratch.all_matches.sort_unstable_by(|a, b| {
1142            a.start
1143                .cmp(&b.start)
1144                .then_with(|| (b.end - b.start).cmp(&(a.end - a.start)))
1145        });
1146
1147        let mut last_end = 0;
1148        for m in scratch.all_matches.drain(..) {
1149            if m.start >= last_end {
1150                last_end = m.end;
1151                scratch.selected.push(m);
1152            }
1153        }
1154    }
1155
1156    /// Adjust the commit point to avoid splitting a match across the
1157    /// commit / carry boundary.
1158    ///
1159    /// If any match straddles `base_commit` (starts before, ends after),
1160    /// the commit point is moved to after that match so it is emitted
1161    /// in full this iteration.
1162    #[allow(clippy::unused_self)] // keep &self for API consistency with other scanner methods
1163    fn adjusted_commit_point(
1164        &self,
1165        matches: &[RawMatch],
1166        base_commit: usize,
1167        window_len: usize,
1168        is_eof: bool,
1169    ) -> usize {
1170        if is_eof {
1171            return window_len;
1172        }
1173
1174        let mut commit = base_commit;
1175
1176        for m in matches {
1177            if m.start < commit && m.end > commit {
1178                // Match straddles the boundary — extend commit to include it.
1179                commit = m.end;
1180            }
1181        }
1182
1183        // Never exceed window length.
1184        commit.min(window_len)
1185    }
1186
1187    /// Build the output for the committed region by splicing in replacements.
1188    ///
1189    /// Writes into `output_buf` (cleared on entry) and increments
1190    /// `stats.matches_found` / `stats.replacements_applied` for each applied
1191    /// replacement.  Per-pattern hit counts are written to `pattern_counts`
1192    /// (indexed by `pattern_idx`); the caller is responsible for folding
1193    /// these into `ScanStats::pattern_counts` and resetting them.
1194    ///
1195    /// `matches` is the full selected set for the window (may include matches
1196    /// in the carry region beyond `committed`).  Because `adjusted_commit_point`
1197    /// guarantees no match straddles the boundary, any match with
1198    /// `start < committed.len()` also has `end <= committed.len()`.  The
1199    /// loop breaks early once `m.start >= committed.len()` since matches are
1200    /// sorted by start.
1201    ///
1202    /// `window_file_offset` and `newlines_before_window` are used to compute
1203    /// the absolute byte offset and 1-based line number for each committed
1204    /// match, which are delivered to `on_match`. The newline scan is
1205    /// incremental: we scan only the bytes between consecutive matches, not
1206    /// the full committed region.
1207    ///
1208    /// # Note on `from_utf8_lossy`
1209    ///
1210    /// `String::from_utf8_lossy` returns `Cow::Borrowed(&str)` for valid
1211    /// UTF-8 input (the common case for ASCII secrets) — no heap allocation
1212    /// on the hot path.
1213    #[allow(clippy::too_many_arguments)]
1214    fn apply_replacements(
1215        &self,
1216        committed: &[u8],
1217        matches: &[RawMatch],
1218        stats: &mut ScanStats,
1219        output_buf: &mut Vec<u8>,
1220        pattern_counts: &mut [u64],
1221        window_file_offset: u64,
1222        newlines_before_window: u64,
1223        on_match: &mut dyn FnMut(MatchLocation),
1224    ) -> Result<()> {
1225        output_buf.clear();
1226
1227        let mut last_end = 0;
1228        // Running newline count within the committed region, advanced
1229        // incrementally so we only scan the bytes between matches.
1230        let mut newlines_in_committed: u64 = 0;
1231        let mut newline_scan_pos: usize = 0;
1232
1233        for &m in matches {
1234            // Matches are sorted by start; those at or beyond the committed
1235            // region belong to the carry window — stop here.
1236            if m.start >= committed.len() {
1237                break;
1238            }
1239
1240            // Emit bytes before this match verbatim.
1241            output_buf.extend_from_slice(&committed[last_end..m.start]);
1242
1243            // Advance newline counter from previous scan position to match start,
1244            // then emit the match location to the caller.
1245            newlines_in_committed += count_newlines(&committed[newline_scan_pos..m.start]);
1246            newline_scan_pos = m.start;
1247            on_match(MatchLocation {
1248                line: newlines_before_window + newlines_in_committed + 1,
1249                byte_offset: window_file_offset + m.start as u64,
1250                pattern: self.patterns[m.pattern_idx].label.clone(),
1251            });
1252
1253            let pattern = &self.patterns[m.pattern_idx];
1254
1255            if let Some((cap_start, cap_end)) = m.capture {
1256                // Pattern has a capture group: replace only the capture group,
1257                // emitting the surrounding context bytes of the full match verbatim.
1258                // This preserves delimiters, key names, and prefixes that the
1259                // pattern uses as anchors to reduce false positives.
1260                if cap_start < m.start || cap_end > m.end || cap_start > cap_end {
1261                    // Capture bounds outside match bounds — skip rather than panic.
1262                    // This should not happen with correct regex patterns; log it so it
1263                    // surfaces during testing without crashing production runs.
1264                    tracing::warn!(
1265                        pattern = %pattern.label,
1266                        m_start = m.start,
1267                        m_end = m.end,
1268                        cap_start,
1269                        cap_end,
1270                        "capture group bounds outside match bounds — emitting full match unreplaced"
1271                    );
1272                    output_buf.extend_from_slice(&committed[m.start..m.end]);
1273                    last_end = m.end;
1274                    continue;
1275                }
1276                output_buf.extend_from_slice(&committed[m.start..cap_start]);
1277                let secret = String::from_utf8_lossy(&committed[cap_start..cap_end]);
1278                let replacement = self.store.get_or_insert(&pattern.category, &secret)?;
1279                output_buf.extend_from_slice(replacement.as_bytes());
1280                output_buf.extend_from_slice(&committed[cap_end..m.end]);
1281            } else {
1282                // No capture group — replace the full match (e.g. token-prefix
1283                // patterns like `glpat-[...]` where the full match IS the secret).
1284                let matched_text = String::from_utf8_lossy(&committed[m.start..m.end]);
1285                let replacement = self.store.get_or_insert(&pattern.category, &matched_text)?;
1286                output_buf.extend_from_slice(replacement.as_bytes());
1287            }
1288
1289            last_end = m.end;
1290
1291            stats.matches_found += 1;
1292            stats.replacements_applied += 1;
1293            pattern_counts[m.pattern_idx] += 1;
1294        }
1295
1296        // Emit the trailing non-matching tail.
1297        output_buf.extend_from_slice(&committed[last_end..]);
1298
1299        Ok(())
1300    }
1301}
1302
1303// ---------------------------------------------------------------------------
1304// Send + Sync compile-time assertion
1305// ---------------------------------------------------------------------------
1306
1307const _: fn() = || {
1308    fn assert_send<T: Send>() {}
1309    fn assert_sync<T: Sync>() {}
1310    assert_send::<StreamScanner>();
1311    assert_sync::<StreamScanner>();
1312};
1313
1314// ---------------------------------------------------------------------------
1315// I/O helper
1316// ---------------------------------------------------------------------------
1317
/// Return how many bytes in `data` are `\n`.
///
/// The scanner calls this on the stretch of bytes between two positions
/// to advance its running line counter, so line numbers can be derived
/// without re-counting from the start of the input.
#[inline]
#[allow(clippy::naive_bytecount)] // plain std-only count; no extra dependency
fn count_newlines(data: &[u8]) -> u64 {
    data.iter().map(|&byte| u64::from(byte == b'\n')).sum()
}
1328
1329/// Read up to `buf.len()` bytes from `reader`, retrying on `Interrupted`.
1330///
1331/// Returns the number of bytes actually read (< `buf.len()` only at EOF).
1332fn read_fully<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
1333    let mut total = 0;
1334    while total < buf.len() {
1335        match reader.read(&mut buf[total..]) {
1336            Ok(0) => break, // EOF
1337            Ok(n) => total += n,
1338            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
1339            Err(e) => return Err(SanitizeError::IoError(e.to_string())),
1340        }
1341    }
1342    Ok(total)
1343}
1344
1345// ---------------------------------------------------------------------------
1346// Unit tests
1347// ---------------------------------------------------------------------------
1348
1349#[cfg(test)]
1350mod tests {
1351    use super::*;
1352    use crate::generator::HmacGenerator;
1353
1354    /// Helper: build a scanner with given patterns and small chunk config.
1355    fn test_scanner(patterns: Vec<ScanPattern>) -> StreamScanner {
1356        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
1357        let store = Arc::new(MappingStore::new(gen, None));
1358        StreamScanner::new(
1359            patterns,
1360            store,
1361            ScanConfig {
1362                chunk_size: 64,
1363                overlap_size: 16,
1364            },
1365        )
1366        .unwrap()
1367    }
1368
1369    /// Helper: email pattern.
1370    fn email_pattern() -> ScanPattern {
1371        ScanPattern::from_regex(
1372            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
1373            Category::Email,
1374            "email",
1375        )
1376        .unwrap()
1377    }
1378
1379    /// Helper: IPv4 pattern.
1380    fn ipv4_pattern() -> ScanPattern {
1381        ScanPattern::from_regex(
1382            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
1383            Category::IpV4,
1384            "ipv4",
1385        )
1386        .unwrap()
1387    }
1388
1389    // ---- Construction ----
1390
1391    #[test]
1392    fn scanner_creation() {
1393        let scanner = test_scanner(vec![email_pattern()]);
1394        assert_eq!(scanner.pattern_count(), 1);
1395    }
1396
1397    #[test]
1398    fn invalid_config_zero_chunk() {
1399        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
1400        let store = Arc::new(MappingStore::new(gen, None));
1401        let result = StreamScanner::new(vec![], store, ScanConfig::new(0, 0));
1402        assert!(result.is_err());
1403    }
1404
1405    #[test]
1406    fn invalid_config_overlap_ge_chunk() {
1407        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
1408        let store = Arc::new(MappingStore::new(gen, None));
1409        let result = StreamScanner::new(vec![], store, ScanConfig::new(100, 100));
1410        assert!(result.is_err());
1411    }
1412
1413    // ---- Empty / no-match cases ----
1414
1415    #[test]
1416    fn empty_input() {
1417        let scanner = test_scanner(vec![email_pattern()]);
1418        let (output, stats) = scanner.scan_bytes(b"").unwrap();
1419        assert!(output.is_empty());
1420        assert_eq!(stats.matches_found, 0);
1421        assert_eq!(stats.bytes_processed, 0);
1422    }
1423
1424    #[test]
1425    fn no_matches() {
1426        let scanner = test_scanner(vec![email_pattern()]);
1427        let input = b"There are no email addresses here.";
1428        let (output, stats) = scanner.scan_bytes(input).unwrap();
1429        assert_eq!(output, input.as_slice());
1430        assert_eq!(stats.matches_found, 0);
1431    }
1432
1433    // ---- Single match ----
1434
1435    #[test]
1436    fn single_email_replaced() {
1437        let scanner = test_scanner(vec![email_pattern()]);
1438        let input = b"Contact alice@corp.com for help.";
1439        let (output, stats) = scanner.scan_bytes(input).unwrap();
1440        assert_eq!(stats.matches_found, 1);
1441        assert_eq!(stats.replacements_applied, 1);
1442        // Original must not appear in output.
1443        assert!(!output
1444            .windows(b"alice@corp.com".len())
1445            .any(|w| w == b"alice@corp.com"));
1446        // Replacement should contain the @ from the domain-preserving email.
1447        let output_str = String::from_utf8_lossy(&output);
1448        assert!(output_str.contains("@corp.com"));
1449        // Length preserved: output is same total length as input.
1450        assert_eq!(output.len(), input.len(), "length must be preserved");
1451        // Surrounding text preserved.
1452        assert!(output_str.starts_with("Contact "));
1453        assert!(output_str.ends_with(" for help."));
1454    }
1455
1456    // ---- Multiple matches ----
1457
1458    #[test]
1459    fn multiple_emails_replaced() {
1460        let scanner = test_scanner(vec![email_pattern()]);
1461        let input = b"From alice@corp.com to bob@corp.com cc admin@corp.com";
1462        let (output, stats) = scanner.scan_bytes(input).unwrap();
1463        assert_eq!(stats.matches_found, 3);
1464        let out_str = String::from_utf8_lossy(&output);
1465        assert!(!out_str.contains("alice@corp.com"));
1466        assert!(!out_str.contains("bob@corp.com"));
1467        assert!(!out_str.contains("admin@corp.com"));
1468    }
1469
1470    // ---- Same secret gets same replacement ----
1471
1472    #[test]
1473    fn same_secret_same_replacement() {
1474        let scanner = test_scanner(vec![email_pattern()]);
1475        let input = b"First alice@corp.com then alice@corp.com again.";
1476        let (output, stats) = scanner.scan_bytes(input).unwrap();
1477        assert_eq!(stats.matches_found, 2);
1478        let out_str = String::from_utf8_lossy(&output);
1479        // Both occurrences should be replaced with the same value.
1480        // With length-preserving replacements, look for the preserved domain.
1481        let parts: Vec<&str> = out_str.split("@corp.com").collect();
1482        // 3 parts = 2 occurrences of the replacement.
1483        assert_eq!(parts.len(), 3);
1484    }
1485
1486    // ---- Literal pattern ----
1487
1488    #[test]
1489    fn literal_pattern_matched() {
1490        let pat = ScanPattern::from_literal(
1491            "SECRET_API_KEY_12345",
1492            Category::Custom("api_key".into()),
1493            "api_key",
1494        )
1495        .unwrap();
1496        let scanner = test_scanner(vec![pat]);
1497        let input = b"key=SECRET_API_KEY_12345&foo=bar";
1498        let (output, stats) = scanner.scan_bytes(input).unwrap();
1499        assert_eq!(stats.matches_found, 1);
1500        assert!(!output
1501            .windows(b"SECRET_API_KEY_12345".len())
1502            .any(|w| w == b"SECRET_API_KEY_12345"));
1503    }
1504
1505    // ---- Multiple pattern types ----
1506
1507    #[test]
1508    fn multiple_pattern_types() {
1509        let scanner = test_scanner(vec![email_pattern(), ipv4_pattern()]);
1510        let input = b"Server 192.168.1.100 contact admin@server.com";
1511        let (output, stats) = scanner.scan_bytes(input).unwrap();
1512        assert_eq!(stats.matches_found, 2);
1513        let out_str = String::from_utf8_lossy(&output);
1514        assert!(!out_str.contains("192.168.1.100"));
1515        assert!(!out_str.contains("admin@server.com"));
1516        assert_eq!(*stats.pattern_counts.get("email").unwrap(), 1);
1517        assert_eq!(*stats.pattern_counts.get("ipv4").unwrap(), 1);
1518    }
1519
1520    // ---- Chunk boundary: match spans two chunks ----
1521
1522    #[test]
1523    fn match_at_chunk_boundary() {
1524        // Use a very small chunk size so the email straddles a boundary.
1525        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
1526        let store = Arc::new(MappingStore::new(gen, None));
1527        let scanner = StreamScanner::new(
1528            vec![email_pattern()],
1529            store,
1530            ScanConfig {
1531                chunk_size: 20, // very small
1532                overlap_size: 16,
1533            },
1534        )
1535        .unwrap();
1536
1537        // Place an email address that will definitely straddle a boundary.
1538        let input = b"AAAAAAAAAAAAAAAA alice@corp.com BBBBBBBBBBBBB";
1539        let (output, stats) = scanner.scan_bytes(input).unwrap();
1540        assert_eq!(stats.matches_found, 1);
1541        let out_str = String::from_utf8_lossy(&output);
1542        assert!(!out_str.contains("alice@corp.com"));
1543        assert!(out_str.contains("@corp.com"), "domain must be preserved");
1544    }
1545
1546    // ---- Large input requiring many chunks ----
1547
1548    #[test]
1549    fn large_input_many_chunks() {
1550        let scanner = test_scanner(vec![email_pattern()]);
1551
1552        // Build a ~2 KiB input with emails sprinkled in.
1553        let mut input = Vec::new();
1554        let filler = b"Lorem ipsum dolor sit amet. ";
1555        for i in 0..20 {
1556            input.extend_from_slice(filler);
1557            let email = format!("user{}@example.com ", i);
1558            input.extend_from_slice(email.as_bytes());
1559        }
1560
1561        let (output, stats) = scanner.scan_bytes(&input).unwrap();
1562        assert_eq!(stats.matches_found, 20);
1563        let out_str = String::from_utf8_lossy(&output);
1564        for i in 0..20 {
1565            let email = format!("user{}@example.com", i);
1566            assert!(!out_str.contains(&email));
1567        }
1568    }
1569
1570    #[test]
1571    fn scan_bytes_with_progress_preserves_output_and_stats() {
1572        let scanner = test_scanner(vec![email_pattern()]);
1573        let input = b"Contact alice@corp.com and bob@corp.com for help.";
1574
1575        let (baseline_output, baseline_stats) = scanner.scan_bytes(input).unwrap();
1576
1577        let mut updates = Vec::new();
1578        let (progress_output, progress_stats) = scanner
1579            .scan_bytes_with_progress(input, |progress| updates.push(progress.clone()))
1580            .unwrap();
1581
1582        assert_eq!(progress_output, baseline_output);
1583        assert_eq!(
1584            progress_stats.bytes_processed,
1585            baseline_stats.bytes_processed
1586        );
1587        assert_eq!(progress_stats.bytes_output, baseline_stats.bytes_output);
1588        assert_eq!(progress_stats.matches_found, baseline_stats.matches_found);
1589        assert_eq!(
1590            progress_stats.replacements_applied,
1591            baseline_stats.replacements_applied
1592        );
1593        assert!(!updates.is_empty());
1594        assert_eq!(updates.last().unwrap().bytes_processed, input.len() as u64);
1595        assert_eq!(
1596            updates.last().unwrap().total_bytes,
1597            Some(input.len() as u64)
1598        );
1599        assert_eq!(updates.last().unwrap().matches_found, 2);
1600    }
1601
1602    #[test]
1603    fn scan_reader_with_progress_reports_multiple_updates_for_multi_chunk_input() {
1604        let scanner = test_scanner(vec![email_pattern()]);
1605        let mut input = Vec::new();
1606        for i in 0..8 {
1607            input.extend_from_slice(b"padding padding padding ");
1608            input.extend_from_slice(format!("user{i}@example.com ").as_bytes());
1609        }
1610
1611        let mut output = Vec::new();
1612        let mut updates = Vec::new();
1613        let stats = scanner
1614            .scan_reader_with_callbacks(
1615                &input[..],
1616                &mut output,
1617                Some(input.len() as u64),
1618                |progress| {
1619                    updates.push(progress.clone());
1620                },
1621                |_| {},
1622            )
1623            .unwrap();
1624
1625        assert!(updates.len() >= 2);
1626        assert_eq!(
1627            updates.last().unwrap().bytes_processed,
1628            stats.bytes_processed
1629        );
1630        assert_eq!(updates.last().unwrap().bytes_output, stats.bytes_output);
1631        assert_eq!(
1632            updates.last().unwrap().total_bytes,
1633            Some(input.len() as u64)
1634        );
1635    }
1636
1637    // ---- Scan via Read/Write interface ----
1638
1639    #[test]
1640    fn scan_reader_writer() {
1641        let scanner = test_scanner(vec![email_pattern()]);
1642        let input = b"hello alice@corp.com world";
1643        let mut output = Vec::new();
1644        let stats = scanner.scan_reader(&input[..], &mut output).unwrap();
1645        assert_eq!(stats.matches_found, 1);
1646        let out_str = String::from_utf8_lossy(&output);
1647        assert!(out_str.contains("@corp.com"), "domain must be preserved");
1648    }
1649
1650    // ---- Pattern compile error ----
1651
1652    #[test]
1653    fn invalid_regex_pattern() {
1654        let result = ScanPattern::from_regex("[invalid(", Category::Email, "bad");
1655        assert!(result.is_err());
1656    }
1657
1658    // ---- Default config ----
1659
1660    #[test]
1661    fn default_config_valid() {
1662        ScanConfig::default().validate().unwrap();
1663    }
1664
1665    // ---- Config edge cases ----
1666
1667    #[test]
1668    fn config_chunk_1_overlap_0() {
1669        // Extreme but valid: 1-byte chunks, no overlap.
1670        // Won't catch multi-byte patterns, but should not crash.
1671        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
1672        let store = Arc::new(MappingStore::new(gen, None));
1673        let scanner = StreamScanner::new(vec![], store, ScanConfig::new(1, 0)).unwrap();
1674        let (output, _) = scanner.scan_bytes(b"hello").unwrap();
1675        assert_eq!(output, b"hello");
1676    }
1677
1678    // ---- Bytes output tracking ----
1679
1680    #[test]
1681    fn bytes_output_preserved_on_replacement() {
1682        let scanner = test_scanner(vec![email_pattern()]);
1683        let input = b"a@b.cc"; // short email
1684        let (output, stats) = scanner.scan_bytes(input).unwrap();
1685        assert_eq!(stats.bytes_processed, input.len() as u64);
1686        assert_eq!(stats.bytes_output, output.len() as u64);
1687        // Length-preserving: output length matches input length.
1688        assert_eq!(output.len(), input.len());
1689    }
1690}