Skip to main content

sanitize_engine/
scanner.rs

1//! Streaming scanner for detecting and replacing sensitive data.
2//!
3//! # Architecture
4//!
5//! The streaming scanner processes input data in configurable chunks,
6//! detecting secret patterns (regex or literal) and applying one-way
7//! replacements via the [`MappingStore`].
8//! This design supports files of 20–100 GB+ without requiring the entire
9//! content to fit in memory.
10//!
11//! ```text
12//! ┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
13//! │  Input (Read) │ ──▶ │  StreamScanner  │ ──▶ │  Output (Write)  │
14//! │  (chunked)    │     │  (pattern match │     │  (sanitized)     │
15//! └──────────────┘     │   + replace)    │     └──────────────────┘
16//!                       └────────┬────────┘
17//!                                │
18//!                       ┌────────▼────────┐
19//!                       │  MappingStore   │
20//!                       │  (dedup cache)  │
21//!                       └─────────────────┘
22//! ```
23//!
24//! # Chunk Overlap Strategy
25//!
26//! To avoid missing matches that span chunk boundaries, the scanner
27//! maintains an overlap window between consecutive chunks:
28//!
29//! 1. Read `chunk_size` bytes of new data.
30//! 2. Prepend the `carry` buffer (tail of previous window).
31//! 3. Scan the combined `window` for all pattern matches.
32//! 4. Compute `commit_point = window.len() - overlap_size` (adjusted
33//!    upward if a match straddles the boundary).
34//! 5. Emit output for `window[..commit_point]` with replacements applied.
35//! 6. Set `carry = window[commit_point..]` for the next iteration.
36//!
37//! The `overlap_size` should be ≥ the maximum expected match length to
38//! guarantee no matches are missed at boundaries.
39//!
40//! # Thread Safety
41//!
42//! [`StreamScanner`] is `Send + Sync`. Multiple files can be scanned
43//! concurrently using a shared `Arc<StreamScanner>`, all backed by the
44//! same [`MappingStore`] for per-run dedup
45//! consistency.
46//!
47//! # Performance
48//!
49//! - **Chunk-based I/O**: only `chunk_size + overlap_size` bytes in
50//!   memory per active scan.
51//! - **Compiled regex**: patterns are compiled once at construction and
52//!   reused across all chunks and files.
53//! - **Lock-free reads**: the `DashMap` inside `MappingStore` provides
54//!   lock-free reads for already-seen values.
55//! - **File-level parallelism**: share `Arc<StreamScanner>` across
56//!   threads to scan multiple files concurrently.
57
58use crate::category::Category;
59use crate::error::{Result, SanitizeError};
60use crate::store::MappingStore;
61use aho_corasick::AhoCorasick;
62use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder};
63use serde::Serialize;
64use std::collections::HashMap;
65use std::io::{self, Read, Write};
66use std::sync::Arc;
67
68// ---------------------------------------------------------------------------
69// Configuration
70// ---------------------------------------------------------------------------
71
/// Bytes read from the input per chunk when no explicit size is given: 1 MiB.
const DEFAULT_CHUNK_SIZE: usize = 1 << 20;

/// Bytes of overlap kept between consecutive chunks by default: 4 KiB.
const DEFAULT_OVERLAP_SIZE: usize = 4 * 1024;

/// Upper bound (bytes) on the compiled automaton of a single regex: 1 MiB.
///
/// Guards against DoS via pathologically complex user-supplied patterns.
const REGEX_SIZE_LIMIT: usize = 1024 * 1024;

/// Upper bound (bytes) on the DFA cache of a single regex: 1 MiB.
const REGEX_DFA_SIZE_LIMIT: usize = 1024 * 1024;

/// Absolute ceiling (256 MiB) on the combined `RegexSet` automaton budget.
///
/// The set budget is the per-pattern limit multiplied by the pattern count,
/// so that large pattern sets can still compile. Without this ceiling a
/// pathological secrets file with 10 000 patterns could claim up to ~20 GiB
/// of automaton memory.
const REGEX_SET_SIZE_CAP: usize = 256 << 20;

/// Default cap on the number of patterns in a single scanner (F-05 fix).
///
/// `RegexSet` automaton memory grows linearly with the pattern count; with
/// 1 MiB size/DFA limits per pattern, 10 000 patterns could allocate up to
/// ~20 GiB. The cap prevents accidental resource exhaustion. Callers with a
/// legitimate need for more patterns can raise it through
/// [`StreamScanner::new_with_max_patterns`].
const DEFAULT_MAX_PATTERNS: usize = 10_000;

/// Label suffix identifying key-value-only patterns.
///
/// A pattern whose label ends with this suffix is left out of the scanner
/// built by `for_structured_pass`: the key-value processor resolves such
/// values structurally, and scanning for them as well would produce spurious
/// duplicate replacements on the surrounding syntax.
pub const KV_LABEL_SUFFIX: &str = "_kv";
106
/// Chunking parameters for the streaming scanner.
///
/// # Tuning Guide
///
/// | Workload               | `chunk_size` | `overlap_size` |
/// |------------------------|--------------|----------------|
/// | Small files (< 10 MB)  | 256 KiB      | 1 KiB          |
/// | General purpose        | 1 MiB        | 4 KiB          |
/// | Large files (> 1 GB)   | 4–8 MiB      | 8 KiB          |
/// | Memory-constrained     | 64 KiB       | 1 KiB          |
///
/// Pick an `overlap_size` at least as large as the longest match you expect.
/// Typical secret patterns (API keys, emails, SSNs) are far shorter than
/// 256 bytes, so the 4 KiB default leaves a comfortable margin.
#[derive(Clone, Debug)]
pub struct ScanConfig {
    /// Number of bytes read from the input per chunk.
    ///
    /// Bigger chunks mean fewer syscalls and better throughput, at the cost
    /// of more memory. Default: 1 MiB.
    pub chunk_size: usize,

    /// Number of bytes shared between consecutive chunks.
    ///
    /// Has to be ≥ the maximum expected match length; a pattern whose
    /// matches can be longer than this may be missed when a match straddles
    /// a chunk boundary. Default: 4 KiB.
    pub overlap_size: usize,
}
136
137impl Default for ScanConfig {
138    fn default() -> Self {
139        Self {
140            chunk_size: DEFAULT_CHUNK_SIZE,
141            overlap_size: DEFAULT_OVERLAP_SIZE,
142        }
143    }
144}
145
146impl ScanConfig {
147    /// Create a new configuration with explicit values.
148    #[must_use]
149    pub fn new(chunk_size: usize, overlap_size: usize) -> Self {
150        Self {
151            chunk_size,
152            overlap_size,
153        }
154    }
155
156    /// Validate the configuration, returning an error if invalid.
157    ///
158    /// # Errors
159    ///
160    /// Returns [`SanitizeError::InvalidConfig`] if `chunk_size` is zero
161    /// or `overlap_size >= chunk_size`.
162    pub fn validate(&self) -> Result<()> {
163        if self.chunk_size == 0 {
164            return Err(SanitizeError::InvalidConfig(
165                "chunk_size must be > 0".into(),
166            ));
167        }
168        if self.overlap_size >= self.chunk_size {
169            return Err(SanitizeError::InvalidConfig(
170                "overlap_size must be < chunk_size".into(),
171            ));
172        }
173        Ok(())
174    }
175}
176
177// ---------------------------------------------------------------------------
178// Internal helpers
179// ---------------------------------------------------------------------------
180
181/// Convert any compile-time pattern error into [`SanitizeError::PatternCompileError`].
182#[inline]
183fn compile_err(e: impl std::fmt::Display) -> SanitizeError {
184    SanitizeError::PatternCompileError(e.to_string())
185}
186
187// ---------------------------------------------------------------------------
188// Scan pattern
189// ---------------------------------------------------------------------------
190
191/// A pattern rule defining what to scan for and how to categorize matches.
192///
193/// Wraps a compiled [`regex::bytes::Regex`] with a [`Category`] for
194/// replacement lookups and a human-readable label for reporting.
195///
196/// Both regex and literal patterns are supported. Literal patterns keep
197/// their original text and are matched by the scanner's Aho-Corasick
198/// automaton for fast multi-literal scanning.
199pub struct ScanPattern {
200    /// Compiled regex matcher (used for non-literal patterns and as a
201    /// fallback; literal patterns are matched via Aho-Corasick instead).
202    regex: Regex,
203    /// Category for replacement lookups.
204    category: Category,
205    /// Human-readable label for reporting / stats.
206    label: String,
207    /// Original (unescaped) literal string when created via `from_literal`.
208    /// `None` for patterns created via `from_regex`.
209    /// Stored so `StreamScanner` can build an Aho-Corasick automaton for
210    /// fast SIMD literal matching instead of running the regex engine.
211    literal: Option<String>,
212    /// Minimum window size (bytes) required to produce a match.
213    /// For literal patterns this equals the byte length of the literal itself.
214    /// For regex patterns this is `0` (no guaranteed minimum).
215    /// Used to skip `captures_iter` when the window is provably too short.
216    pub min_length: usize,
217}
218
219impl std::fmt::Debug for ScanPattern {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("ScanPattern")
222            .field("pattern", &self.regex.as_str())
223            .field("category", &self.category)
224            .field("label", &self.label)
225            .field("literal", &self.literal.as_deref())
226            .field("min_length", &self.min_length)
227            .finish()
228    }
229}
230
231impl Clone for ScanPattern {
232    fn clone(&self) -> Self {
233        Self {
234            regex: self.regex.clone(),
235            category: self.category.clone(),
236            label: self.label.clone(),
237            literal: self.literal.clone(),
238            min_length: self.min_length,
239        }
240    }
241}
242
243impl ScanPattern {
244    /// Create a pattern from a regex string.
245    ///
246    /// ## Capture group 1 — partial replacement
247    ///
248    /// If the regex contains a capture group 1 (`(...)`), only the bytes
249    /// matched by that group are replaced; the bytes before and after it
250    /// within the full match are emitted verbatim. This lets you write
251    /// context-anchored patterns without redacting the prefix/suffix:
252    ///
253    /// ```text
254    /// pattern: glpat-([A-Za-z0-9_-]{20})
255    ///           ^^^^^^ prefix preserved
256    ///                  ^^^^^^^^^^^^^^^^^^^^ group 1 → replaced
257    /// ```
258    ///
259    /// Patterns **without** a capture group replace the entire match.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`SanitizeError::PatternCompileError`] if the regex is invalid.
264    ///
265    /// # Examples
266    ///
267    /// ```
268    /// use sanitize_engine::scanner::ScanPattern;
269    /// use sanitize_engine::category::Category;
270    ///
271    /// // No capture group — full match replaced:
272    /// let email = ScanPattern::from_regex(
273    ///     r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
274    ///     Category::Email,
275    ///     "email_address",
276    /// ).unwrap();
277    ///
278    /// // Capture group 1 — prefix preserved, only the token value replaced:
279    /// let token = ScanPattern::from_regex(
280    ///     r"glpat-([A-Za-z0-9_-]{20})",
281    ///     Category::AuthToken,
282    ///     "gitlab_pat",
283    /// ).unwrap();
284    /// ```
285    pub fn from_regex(pattern: &str, category: Category, label: impl Into<String>) -> Result<Self> {
286        let regex = RegexBuilder::new(pattern)
287            .size_limit(REGEX_SIZE_LIMIT)
288            .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
289            .build()
290            .map_err(compile_err)?;
291        Ok(Self {
292            regex,
293            category,
294            label: label.into(),
295            literal: None,
296            min_length: 0,
297        })
298    }
299
300    /// Create a pattern from a literal string.
301    ///
302    /// The literal is escaped so that regex metacharacters are matched
303    /// verbatim.
304    ///
305    /// # Errors
306    ///
307    /// Returns [`SanitizeError::PatternCompileError`] if regex compilation fails.
308    ///
309    /// # Examples
310    ///
311    /// ```
312    /// use sanitize_engine::scanner::ScanPattern;
313    /// use sanitize_engine::category::Category;
314    ///
315    /// let pat = ScanPattern::from_literal(
316    ///     "sk-proj-abc123secret",
317    ///     Category::Custom("api_key".into()),
318    ///     "openai_key",
319    /// ).unwrap();
320    /// ```
321    pub fn from_literal(
322        literal: &str,
323        category: Category,
324        label: impl Into<String>,
325    ) -> Result<Self> {
326        let escaped = regex::escape(literal);
327        let regex = RegexBuilder::new(&escaped)
328            .size_limit(REGEX_SIZE_LIMIT)
329            .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
330            .build()
331            .map_err(compile_err)?;
332        Ok(Self {
333            regex,
334            category,
335            label: label.into(),
336            min_length: literal.len(),
337            literal: Some(literal.to_owned()),
338        })
339    }
340
341    /// The category this pattern maps to.
342    #[must_use]
343    pub fn category(&self) -> &Category {
344        &self.category
345    }
346
347    /// The human-readable label.
348    #[must_use]
349    pub fn label(&self) -> &str {
350        &self.label
351    }
352
353    /// Return the raw regex pattern string for RegexSet construction.
354    #[must_use]
355    pub fn regex_pattern(&self) -> &str {
356        self.regex.as_str()
357    }
358}
359
360// ScanPattern is Send + Sync because:
361// - regex::bytes::Regex is Send + Sync
362// - Category is Send + Sync (it's an enum of primitives + CompactString)
363// - String is Send + Sync
364
365// ---------------------------------------------------------------------------
366// Internal: raw match descriptor
367// ---------------------------------------------------------------------------
368
/// Internal descriptor of one match located in the scan window.
#[derive(Clone, Copy, Debug)]
struct RawMatch {
    /// Byte offset in the scan window where the match begins.
    start: usize,
    /// Byte offset in the scan window just past the match (exclusive).
    end: usize,
    /// Position of the matching pattern in `StreamScanner::patterns`.
    pattern_idx: usize,
    /// Window byte range of capture group 1, when the pattern has one.
    /// If set, only this sub-range is replaced; the bytes in
    /// `start..capture_start` and `capture_end..end` are emitted verbatim so
    /// that surrounding context (delimiters, key names, prefixes) survives.
    capture: Option<(usize, usize)>,
}
384
385// ---------------------------------------------------------------------------
386// Per-scan scratch buffers
387// ---------------------------------------------------------------------------
388
389/// Scratch buffers reused across chunks within a single scan call.
390///
391/// Allocating these once per `scan_reader_with_progress` invocation
392/// and reusing them each chunk eliminates the per-chunk heap pressure
393/// that would otherwise come from `Vec` allocations in `find_matches`
394/// and `apply_replacements`.
395struct ScanScratch {
396    /// Accumulates raw matches from all patterns before deduplication.
397    all_matches: Vec<RawMatch>,
398    /// Non-overlapping matches selected for the current window
399    /// (populated by `find_matches`, consumed by `apply_replacements`).
400    selected: Vec<RawMatch>,
401    /// Output bytes for the committed region, written by `apply_replacements`.
402    output: Vec<u8>,
403    /// Per-pattern match counts indexed by `pattern_idx`.
404    /// Reset to zero after each chunk's counts are folded into `ScanStats`.
405    pattern_counts: Vec<u64>,
406}
407
408impl ScanScratch {
409    fn new(pattern_count: usize, chunk_size: usize, overlap_size: usize) -> Self {
410        Self {
411            all_matches: Vec::with_capacity(64),
412            selected: Vec::with_capacity(64),
413            output: Vec::with_capacity(chunk_size + overlap_size),
414            pattern_counts: vec![0u64; pattern_count],
415        }
416    }
417}
418
419// ---------------------------------------------------------------------------
420// Scan statistics
421// ---------------------------------------------------------------------------
422
423/// The file-level position of a single scanner match.
424///
425/// Emitted via the `on_match` callback in
426/// [`StreamScanner::scan_reader_with_callbacks`]. Line numbers are
427/// 1-based and count `\n` bytes only (Unix line endings). For files with
428/// Windows line endings (`\r\n`), `line` is still correct because `\n` is
429/// the canonical line separator — `\r` bytes do not affect the count.
430///
431/// `byte_offset` is the absolute byte position of the first byte of the
432/// matched region within the file (0-based). Both fields refer to the
433/// *input* file, not the sanitized output.
434#[derive(Debug, Clone, Serialize)]
435pub struct MatchLocation {
436    /// 1-based line number of the match within the file.
437    pub line: u64,
438    /// 0-based byte offset of the match start within the file.
439    pub byte_offset: u64,
440    /// Pattern label that triggered this match.
441    pub pattern: String,
442}
443
/// Counters gathered over the course of one scan.
///
/// [`StreamScanner::scan_reader`] and [`StreamScanner::scan_bytes`] return
/// this so callers can see what the scanner did.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ScanStats {
    /// Number of bytes read from the input.
    pub bytes_processed: u64,
    /// Number of bytes written to the output. Can differ from
    /// `bytes_processed` when a replacement is not the same length as the
    /// text it replaces.
    pub bytes_output: u64,
    /// Number of matches found, summed over all patterns.
    pub matches_found: u64,
    /// Number of replacements applied (always == `matches_found` in one-way
    /// mode).
    pub replacements_applied: u64,
    /// Match count per pattern, keyed by the pattern's label.
    pub pattern_counts: HashMap<String, u64>,
}
464
/// Point-in-time progress report produced while a streaming scan runs.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ScanProgress {
    /// Bytes read from the input up to this point.
    pub bytes_processed: u64,
    /// Bytes written to the output up to this point.
    pub bytes_output: u64,
    /// Size of the whole input, if the caller knows it.
    pub total_bytes: Option<u64>,
    /// Matches found up to this point.
    pub matches_found: u64,
    /// Replacements applied up to this point.
    pub replacements_applied: u64,
}
479
480// ---------------------------------------------------------------------------
481// StreamScanner
482// ---------------------------------------------------------------------------
483
484/// Streaming scanner that detects and replaces sensitive patterns.
485///
486/// Thread-safe: can be shared via `Arc<StreamScanner>` for concurrent
487/// scanning of multiple files. Each call to [`scan_reader`](Self::scan_reader)
488/// is independent and maintains its own chunking state.
489///
490/// # Usage
491///
492/// ```rust
493/// use sanitize_engine::scanner::{StreamScanner, ScanPattern, ScanConfig};
494/// use sanitize_engine::category::Category;
495/// use sanitize_engine::generator::HmacGenerator;
496/// use sanitize_engine::store::MappingStore;
497/// use std::sync::Arc;
498///
499/// // 1. Build the replacement store.
500/// let gen = Arc::new(HmacGenerator::new([42u8; 32]));
501/// let store = Arc::new(MappingStore::new(gen, None));
502///
503/// // 2. Define patterns.
504/// let patterns = vec![
505///     ScanPattern::from_regex(
506///         r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
507///         Category::Email,
508///         "email",
509///     ).unwrap(),
510/// ];
511///
512/// // 3. Create the scanner.
513/// let scanner = StreamScanner::new(patterns, store, ScanConfig::default()).unwrap();
514///
515/// // 4. Scan.
516/// let input = b"Contact alice@corp.com for details.";
517/// let (output, stats) = scanner.scan_bytes(input).unwrap();
518/// assert_eq!(stats.matches_found, 1);
519/// assert!(!output.windows(b"alice@corp.com".len())
520///     .any(|w| w == b"alice@corp.com"));
521/// ```
522pub struct StreamScanner {
523    /// Compiled scan patterns (both literal and regex).
524    patterns: Vec<ScanPattern>,
525    /// Pre-compiled set for fast multi-pattern pre-filtering of **regex**
526    /// (non-literal) patterns only.  `matches()` returns which regex-pattern
527    /// indices matched, avoiding running every individual regex on each chunk
528    /// (R-3 optimisation).
529    regex_set: RegexSet,
530    /// Maps a `RegexSet` index → index into `self.patterns`.
531    /// Only non-literal patterns are in the `RegexSet`.
532    regex_indices: Vec<usize>,
533    /// Aho-Corasick automaton for fast SIMD literal matching.
534    /// `None` when there are no literal patterns.
535    aho_corasick: Option<AhoCorasick>,
536    /// Maps an Aho-Corasick pattern index → index into `self.patterns`.
537    /// Only literal patterns appear here.
538    literal_indices: Vec<usize>,
539    /// Thread-safe dedup replacement store.
540    store: Arc<MappingStore>,
541    /// Scanner configuration.
542    config: ScanConfig,
543}
544
545/// Return type for scanner factory methods that load a secrets file.
546///
547/// Contains `(scanner, warnings, allow_patterns)` where `warnings` are
548/// non-fatal parse errors and `allow_patterns` are raw strings from
549/// `kind: allow` entries.
550type SecretsLoadResult = Result<(StreamScanner, Vec<(usize, SanitizeError)>, Vec<String>)>;
551
552impl StreamScanner {
553    /// Create a new streaming scanner.
554    ///
555    /// # Arguments
556    ///
557    /// - `patterns` — the set of patterns to scan for.
558    /// - `store` — the mapping store for dedup-consistent replacements.
559    /// - `config` — chunking / overlap configuration.
560    ///
561    /// # Errors
562    ///
563    /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
564    /// invalid (e.g. `chunk_size == 0` or `overlap_size >= chunk_size`).
565    pub fn new(
566        patterns: Vec<ScanPattern>,
567        store: Arc<MappingStore>,
568        config: ScanConfig,
569    ) -> Result<Self> {
570        Self::new_with_max_patterns(patterns, store, config, DEFAULT_MAX_PATTERNS)
571    }
572
573    /// Create a new streaming scanner with a custom pattern limit.
574    ///
575    /// This is identical to [`new`](Self::new) but allows overriding the
576    /// default pattern cap (10 000).  Use this
577    /// when you have a legitimate need for more patterns and have
578    /// verified that your system has enough memory for the resulting
579    /// `RegexSet`.
580    ///
581    /// # Errors
582    ///
583    /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
584    /// invalid or the pattern count exceeds `max_patterns`.
585    pub fn new_with_max_patterns(
586        patterns: Vec<ScanPattern>,
587        store: Arc<MappingStore>,
588        config: ScanConfig,
589        max_patterns: usize,
590    ) -> Result<Self> {
591        config.validate()?;
592
593        // F-05 fix: enforce maximum pattern count to bound RegexSet memory.
594        if patterns.len() > max_patterns {
595            return Err(SanitizeError::InvalidConfig(format!(
596                "pattern count ({}) exceeds maximum allowed ({}) — \
597                 RegexSet memory scales linearly with pattern count",
598                patterns.len(),
599                max_patterns
600            )));
601        }
602
603        // Partition patterns into literal (Aho-Corasick) and regex (RegexSet)
604        // so each is matched by the most efficient engine.
605        let mut literal_bytes: Vec<Vec<u8>> = Vec::new();
606        let mut literal_indices: Vec<usize> = Vec::new();
607        let mut regex_strs: Vec<&str> = Vec::new();
608        let mut regex_indices: Vec<usize> = Vec::new();
609
610        for (i, pattern) in patterns.iter().enumerate() {
611            if let Some(lit) = &pattern.literal {
612                literal_bytes.push(lit.as_bytes().to_vec());
613                literal_indices.push(i);
614            } else {
615                regex_strs.push(pattern.regex_pattern());
616                regex_indices.push(i);
617            }
618        }
619
620        // Build Aho-Corasick automaton for literal patterns (SIMD-accelerated,
621        // single O(n) pass over the input per chunk).
622        let aho_corasick = if literal_bytes.is_empty() {
623            None
624        } else {
625            Some(AhoCorasick::new(&literal_bytes).map_err(compile_err)?)
626        };
627
628        // Build RegexSet from non-literal patterns only (R-3 pre-filter).
629        let regex_set = if regex_strs.is_empty() {
630            RegexSetBuilder::new(Vec::<&str>::new())
631                .size_limit(REGEX_SIZE_LIMIT)
632                .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
633                .build()
634                .map_err(compile_err)?
635        } else {
636            RegexSetBuilder::new(&regex_strs)
637                .size_limit((REGEX_SIZE_LIMIT * regex_strs.len().max(1)).min(REGEX_SET_SIZE_CAP))
638                .dfa_size_limit(
639                    (REGEX_DFA_SIZE_LIMIT * regex_strs.len().max(1)).min(REGEX_SET_SIZE_CAP),
640                )
641                .build()
642                .map_err(compile_err)?
643        };
644
645        Ok(Self {
646            patterns,
647            regex_set,
648            regex_indices,
649            aho_corasick,
650            literal_indices,
651            store,
652            config,
653        })
654    }
655
656    /// Create a copy of this scanner extended with additional literal patterns.
657    ///
658    /// Clones the existing pattern set and appends `extra`, then rebuilds
659    /// the internal Aho-Corasick and RegexSet automata. Used by the
660    /// format-preserving structured pass to scan original bytes with
661    /// discovered field-value literals added to the base pattern set.
662    ///
663    /// # Errors
664    ///
665    /// Returns [`SanitizeError`] if automaton construction fails or the
666    /// combined pattern count exceeds the default limit.
667    pub fn with_extra_literals(&self, extra: Vec<ScanPattern>) -> Result<Self> {
668        let mut patterns = self.patterns.clone();
669        patterns.extend(extra);
670        Self::new(patterns, Arc::clone(&self.store), self.config.clone())
671    }
672
673    /// Build a scanner suitable for format-preserving structured-file passes.
674    ///
675    /// Patterns whose labels end with `"_kv"` are excluded from the base set.
676    /// Those patterns match both a key name and its value (e.g. `password: s3cr3t`)
677    /// as a single unit; in a structured pass the key must survive untouched so
678    /// only the discovered field-value literals are safe to replace.
679    ///
680    /// `extra` (the profile-discovered literals) are always included.
681    ///
682    /// # Errors
683    ///
684    /// Returns [`SanitizeError`] if Aho-Corasick or RegexSet construction fails
685    /// or the combined pattern count exceeds the default limit.
686    pub fn for_structured_pass(&self, extra: Vec<ScanPattern>) -> Result<Self> {
687        let mut patterns: Vec<ScanPattern> = self
688            .patterns
689            .iter()
690            .filter(|p| !p.label.ends_with(KV_LABEL_SUFFIX))
691            .cloned()
692            .collect();
693        patterns.extend(extra);
694        Self::new(patterns, Arc::clone(&self.store), self.config.clone())
695    }
696
697    /// Scan a reader and write sanitized output to a writer.
698    ///
699    /// Processes the input in chunks of `config.chunk_size` bytes,
700    /// maintaining an overlap window of `config.overlap_size` bytes to
701    /// catch matches spanning chunk boundaries. All detected matches
702    /// are replaced one-way via the [`MappingStore`].
703    ///
704    /// # Arguments
705    ///
706    /// - `reader` — input source (file, network stream, `&[u8]`, …).
707    /// - `writer` — output sink (file, `Vec<u8>`, …).
708    ///
709    /// # Returns
710    ///
711    /// [`ScanStats`] with counters for bytes processed, matches found, etc.
712    ///
713    /// # Errors
714    ///
715    /// Returns [`SanitizeError`] on I/O failures or if a replacement
716    /// cannot be generated (e.g. store capacity exceeded).
717    pub fn scan_reader<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ScanStats> {
718        self.scan_reader_with_callbacks(reader, writer, None, |_| {}, |_| {})
719    }
720
721    /// Scan a reader and emit progress snapshots after each committed chunk.
722    ///
723    /// `total_bytes` should be provided when the caller knows the full input
724    /// size. When omitted, progress consumers should avoid percentages/ETA.
725    ///
726    /// This is a convenience wrapper around [`scan_reader_with_callbacks`](Self::scan_reader_with_callbacks)
727    /// that discards per-match location information. Use that method directly
728    /// when you need line numbers or byte offsets for individual matches.
729    ///
730    /// # Errors
731    ///
732    /// Returns [`SanitizeError`] on I/O failures or if a replacement
733    /// cannot be generated (e.g. store capacity exceeded).
734    pub fn scan_reader_with_progress<R: Read, W: Write, F>(
735        &self,
736        reader: R,
737        writer: W,
738        total_bytes: Option<u64>,
739        on_progress: F,
740    ) -> Result<ScanStats>
741    where
742        F: FnMut(&ScanProgress),
743    {
744        self.scan_reader_with_callbacks(reader, writer, total_bytes, on_progress, |_| {})
745    }
746
747    /// Scan a reader, emit progress snapshots, and call `on_match` for every
748    /// committed match with its 1-based line number and byte offset.
749    ///
750    /// `on_match` is called synchronously in the scanning thread, once per
751    /// committed match, in document order. The callback receives a
752    /// [`MatchLocation`] describing the pattern label, 1-based line number,
753    /// and 0-based byte offset within the input file. Callers that only need
754    /// aggregate counts (no per-match positions) should prefer
755    /// [`scan_reader_with_progress`](Self::scan_reader_with_progress), which
756    /// skips the per-byte newline counting entirely.
757    ///
758    /// # Performance note
759    ///
760    /// Enabling `on_match` adds an O(committed_bytes_between_matches)
761    /// newline-counting pass inside each chunk. For files with sparse matches
762    /// this overhead is proportional to file size; for dense matches (e.g. one
763    /// secret per line) it is negligible. On 10–15 GiB log files with typical
764    /// match densities the overhead is roughly 10–20 % of total scan time.
765    ///
766    /// # Errors
767    ///
768    /// Returns [`SanitizeError`] on I/O failures or if a replacement
769    /// cannot be generated (e.g. store capacity exceeded).
770    pub fn scan_reader_with_callbacks<R: Read, W: Write, F, M>(
771        &self,
772        mut reader: R,
773        mut writer: W,
774        total_bytes: Option<u64>,
775        mut on_progress: F,
776        mut on_match: M,
777    ) -> Result<ScanStats>
778    where
779        F: FnMut(&ScanProgress),
780        M: FnMut(MatchLocation),
781    {
782        let mut stats = ScanStats::default();
783
784        // Carry buffer: the tail of the previous window that needs
785        // to be re-scanned with the next chunk.
786        let mut carry: Vec<u8> = Vec::new();
787
788        // Read buffer (reused across iterations to avoid re-allocation).
789        let mut read_buf = vec![0u8; self.config.chunk_size];
790
791        // Scan window (reused across iterations — grows to peak size then
792        // stays there, avoiding per-chunk allocation).
793        let mut window: Vec<u8> =
794            Vec::with_capacity(self.config.chunk_size + self.config.overlap_size);
795
796        // Scratch buffers reused every chunk to eliminate per-chunk heap
797        // pressure from match collection, output building, and stats tracking.
798        let mut scratch = ScanScratch::new(
799            self.patterns.len(),
800            self.config.chunk_size,
801            self.config.overlap_size,
802        );
803
804        // Absolute file byte offset of window[0] for this iteration.
805        let mut window_file_offset: u64 = 0;
806        // Cumulative newline count in the file before window[0].
807        let mut newlines_before_window: u64 = 0;
808
809        loop {
810            // Read the next chunk.
811            let bytes_read = read_fully(&mut reader, &mut read_buf)?;
812            let is_eof = bytes_read < read_buf.len();
813
814            // Track only genuinely new bytes (carry was already counted).
815            stats.bytes_processed += bytes_read as u64;
816
817            if bytes_read == 0 && carry.is_empty() {
818                break;
819            }
820
821            // Build the scan window: carry ++ new_data.
822            // Reuse the window buffer to avoid per-chunk allocation.
823            window.clear();
824            window.extend_from_slice(&carry);
825            window.extend_from_slice(&read_buf[..bytes_read]);
826
827            if window.is_empty() {
828                break;
829            }
830
831            // Scan the window: find matches, determine commit point, apply
832            // replacements, and flush the committed region to the writer.
833            // Returns the commit_point so we can slice the carry for next iter.
834            let commit_point = self.process_committed_window(
835                &window,
836                is_eof,
837                &mut scratch,
838                &mut writer,
839                &mut stats,
840                window_file_offset,
841                newlines_before_window,
842                &mut on_match,
843            )?;
844
845            // Advance file-level position counters for the next iteration.
846            // window[commit_point] is where the next window's carry starts,
847            // so that byte is at file offset (window_file_offset + commit_point).
848            newlines_before_window += count_newlines(&window[..commit_point]);
849            window_file_offset += commit_point as u64;
850
851            // Fold per-chunk pattern hit counts into the cumulative stats map,
852            // then emit a progress snapshot to the caller.
853            self.fold_chunk_counts(&mut scratch.pattern_counts, &mut stats);
854            on_progress(&ScanProgress {
855                bytes_processed: stats.bytes_processed,
856                bytes_output: stats.bytes_output,
857                total_bytes,
858                matches_found: stats.matches_found,
859                replacements_applied: stats.replacements_applied,
860            });
861
862            // Update carry for next iteration.
863            if is_eof {
864                carry.clear();
865                break;
866            }
867            carry.clear();
868            carry.extend_from_slice(&window[commit_point..]);
869        }
870
871        Ok(stats)
872    }
873
874    /// Scan one window, apply replacements up to the commit point, and flush
875    /// the result to `writer`. Returns the commit point so the caller can
876    /// slice the carry for the next iteration.
877    #[allow(clippy::too_many_arguments)]
878    fn process_committed_window(
879        &self,
880        window: &[u8],
881        is_eof: bool,
882        scratch: &mut ScanScratch,
883        writer: &mut dyn io::Write,
884        stats: &mut ScanStats,
885        window_file_offset: u64,
886        newlines_before_window: u64,
887        on_match: &mut dyn FnMut(MatchLocation),
888    ) -> Result<usize> {
889        // Find all non-overlapping matches in the window.
890        self.find_matches(window, scratch);
891
892        // Determine how much of the window can be safely committed this iteration.
893        let base_commit = if is_eof {
894            window.len()
895        } else {
896            window.len().saturating_sub(self.config.overlap_size)
897        };
898        let commit_point =
899            self.adjusted_commit_point(&scratch.selected, base_commit, window.len(), is_eof);
900
901        // Build output for the committed region (fills scratch.output).
902        self.apply_replacements(
903            &window[..commit_point],
904            &scratch.selected,
905            stats,
906            &mut scratch.output,
907            &mut scratch.pattern_counts,
908            window_file_offset,
909            newlines_before_window,
910            on_match,
911        )?;
912
913        writer.write_all(&scratch.output)?;
914        stats.bytes_output += scratch.output.len() as u64;
915
916        Ok(commit_point)
917    }
918
919    /// Fold per-chunk pattern hit counts into the cumulative `stats.pattern_counts`
920    /// map, then reset `counts` to zero for the next chunk.
921    ///
922    /// `label.clone()` is called at most once per distinct pattern per chunk,
923    /// not once per match hit, which keeps cost proportional to pattern count.
924    fn fold_chunk_counts(&self, counts: &mut [u64], stats: &mut ScanStats) {
925        for (idx, count) in counts.iter_mut().enumerate() {
926            if *count > 0 {
927                *stats
928                    .pattern_counts
929                    .entry(self.patterns[idx].label.clone())
930                    .or_insert(0) += *count;
931                *count = 0;
932            }
933        }
934    }
935
936    /// Convenience: scan byte slice in-memory and return sanitized output.
937    ///
938    /// Equivalent to `scan_reader(input, Vec::new())` but returns the
939    /// output buffer directly.
940    ///
941    /// # Errors
942    ///
943    /// Returns [`SanitizeError`] if a replacement cannot be generated
944    /// (e.g. store capacity exceeded).
945    pub fn scan_bytes(&self, input: &[u8]) -> Result<(Vec<u8>, ScanStats)> {
946        self.scan_bytes_with_progress(input, |_| {})
947    }
948
949    /// Scan a byte slice in memory and emit progress snapshots.
950    ///
951    /// # Errors
952    ///
953    /// Returns [`SanitizeError`] if a replacement cannot be generated
954    /// (e.g. store capacity exceeded).
955    pub fn scan_bytes_with_progress<F>(
956        &self,
957        input: &[u8],
958        on_progress: F,
959    ) -> Result<(Vec<u8>, ScanStats)>
960    where
961        F: FnMut(&ScanProgress),
962    {
963        let mut output = Vec::with_capacity(input.len());
964        let stats = self.scan_reader_with_callbacks(
965            input,
966            &mut output,
967            Some(input.len() as u64),
968            on_progress,
969            |_| {},
970        )?;
971        Ok((output, stats))
972    }
973
974    // ---- Accessors ----
975
976    /// Access the scanner's configuration.
977    #[must_use]
978    pub fn config(&self) -> &ScanConfig {
979        &self.config
980    }
981
982    /// Access the underlying mapping store.
983    #[must_use]
984    pub fn store(&self) -> &Arc<MappingStore> {
985        &self.store
986    }
987
988    /// Number of patterns registered in this scanner.
989    #[must_use]
990    pub fn pattern_count(&self) -> usize {
991        self.patterns.len()
992    }
993
994    /// Create a scanner from an encrypted secrets file.
995    ///
996    /// Decrypts the file in memory, parses the entries, compiles
997    /// patterns, and returns the scanner ready to scan. Decrypted
998    /// plaintext is scrubbed from memory after parsing.
999    ///
1000    /// # Arguments
1001    ///
1002    /// - `encrypted_bytes` — raw bytes of the `.enc` file.
1003    /// - `password` — user password.
1004    /// - `format` — optional format override for the plaintext.
1005    /// - `store` — mapping store for dedup-consistent replacements.
1006    /// - `config` — chunking / overlap configuration.
1007    /// - `extra_patterns` — additional patterns to merge in.
1008    ///
1009    /// # Returns
1010    ///
1011    /// `(scanner, warnings, allow_patterns)` where `warnings` lists entries
1012    /// that failed to compile (index + error) and `allow_patterns` are the
1013    /// raw strings from `kind: allow` entries — pass these to
1014    /// [`AllowlistMatcher::new`](crate::allowlist::AllowlistMatcher) to
1015    /// suppress replacements for known-safe values.
1016    ///
1017    /// # Errors
1018    ///
1019    /// Returns a secrets-related [`SanitizeError`] on decryption failure
1020    /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
1021    pub fn from_encrypted_secrets(
1022        encrypted_bytes: &[u8],
1023        password: &str,
1024        format: Option<crate::secrets::SecretsFormat>,
1025        store: Arc<MappingStore>,
1026        config: ScanConfig,
1027        extra_patterns: Vec<ScanPattern>,
1028    ) -> SecretsLoadResult {
1029        let ((mut patterns, warnings), allow) =
1030            crate::secrets::load_encrypted_secrets(encrypted_bytes, password, format)?;
1031        patterns.extend(extra_patterns);
1032        let scanner = Self::new(patterns, store, config)?;
1033        Ok((scanner, warnings, allow))
1034    }
1035
1036    /// Create a scanner from a plaintext secrets file.
1037    ///
1038    /// Convenience for development / testing without encryption.
1039    ///
1040    /// # Returns
1041    ///
1042    /// `(scanner, warnings, allow_patterns)` where `allow_patterns` are the
1043    /// raw strings from `kind: allow` entries — pass these to
1044    /// [`AllowlistMatcher::new`](crate::allowlist::AllowlistMatcher) to
1045    /// suppress replacements for known-safe values.
1046    ///
1047    /// # Errors
1048    ///
1049    /// Returns a secrets-related [`SanitizeError`] on parse failure
1050    /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
1051    pub fn from_plaintext_secrets(
1052        plaintext: &[u8],
1053        format: Option<crate::secrets::SecretsFormat>,
1054        store: Arc<MappingStore>,
1055        config: ScanConfig,
1056        extra_patterns: Vec<ScanPattern>,
1057    ) -> SecretsLoadResult {
1058        let ((mut patterns, warnings), allow) =
1059            crate::secrets::load_plaintext_secrets(plaintext, format)?;
1060        patterns.extend(extra_patterns);
1061        let scanner = Self::new(patterns, store, config)?;
1062        Ok((scanner, warnings, allow))
1063    }
1064
1065    // ---- Internal helpers ----
1066
1067    /// Find all non-overlapping matches across all patterns.
1068    ///
1069    /// Fills `scratch.selected` with the winning non-overlapping matches
1070    /// for the given `window`.  All three scratch `Vec`s are cleared and
1071    /// repopulated on each call so callers can freely reuse the same
1072    /// `ScanScratch` instance across chunks.
1073    ///
1074    /// ## Strategy
1075    ///
1076    /// 1. **Aho-Corasick** (`aho_corasick`): single O(n) SIMD pass over the
1077    ///    window reporting every occurrence of every literal pattern,
1078    ///    including overlapping ones.  This replaces O(k·n) individual regex
1079    ///    scans for the literal subset.
1080    /// 2. **RegexSet pre-filter** (R-3 optimisation): fast check of which
1081    ///    *non-literal* regex patterns have any match in the window.
1082    /// 3. **Individual regex `find_iter`**: only for regex patterns flagged
1083    ///    by step 2.
1084    /// 4. **Sort + greedy dedup**: all raw matches are sorted by start
1085    ///    (ascending), then length (descending), and a single greedy pass
1086    ///    selects the final non-overlapping set.
1087    fn find_matches(&self, window: &[u8], scratch: &mut ScanScratch) {
1088        scratch.all_matches.clear();
1089        scratch.selected.clear();
1090
1091        // Step 1: Aho-Corasick overlapping scan for all literal patterns.
1092        // find_overlapping_iter reports every match position including
1093        // overlapping ones, so the sort+greedy step below correctly resolves
1094        // ambiguities between literals (e.g. "abc" vs "abcd" at same offset).
1095        // Literals never have capture groups — capture is always None.
1096        if let Some(ac) = &self.aho_corasick {
1097            for mat in ac.find_overlapping_iter(window) {
1098                scratch.all_matches.push(RawMatch {
1099                    start: mat.start(),
1100                    end: mat.end(),
1101                    pattern_idx: self.literal_indices[mat.pattern().as_usize()],
1102                    capture: None,
1103                });
1104            }
1105        }
1106
1107        // Steps 2+3: RegexSet pre-filter then individual scan for non-literal
1108        // patterns.  regex_set only contains non-literal pattern strings, so
1109        // literals are never scanned twice.
1110        // Use captures_iter so that patterns with a capture group 1 record
1111        // the sub-range to replace, while patterns without one fall back to
1112        // replacing the full match.
1113        for rs_idx in self.regex_set.matches(window) {
1114            let pattern_idx = self.regex_indices[rs_idx];
1115            if window.len() < self.patterns[pattern_idx].min_length {
1116                continue;
1117            }
1118            for cap in self.patterns[pattern_idx].regex.captures_iter(window) {
1119                let full = cap.get(0).expect("group 0 always exists");
1120                let capture = cap.get(1).map(|g| (g.start(), g.end()));
1121                scratch.all_matches.push(RawMatch {
1122                    start: full.start(),
1123                    end: full.end(),
1124                    pattern_idx,
1125                    capture,
1126                });
1127            }
1128        }
1129
1130        // Step 4: sort then greedy non-overlapping selection.
1131        // Skip entirely when no matches were found (the common case for
1132        // clean data), avoiding an unnecessary sort of an empty Vec.
1133        if scratch.all_matches.is_empty() {
1134            return;
1135        }
1136
1137        // Primary: start ascending. Secondary: length descending (longer
1138        // match wins when two matches begin at the same position).
1139        scratch.all_matches.sort_unstable_by(|a, b| {
1140            a.start
1141                .cmp(&b.start)
1142                .then_with(|| (b.end - b.start).cmp(&(a.end - a.start)))
1143        });
1144
1145        let mut last_end = 0;
1146        for m in scratch.all_matches.drain(..) {
1147            if m.start >= last_end {
1148                last_end = m.end;
1149                scratch.selected.push(m);
1150            }
1151        }
1152    }
1153
1154    /// Adjust the commit point to avoid splitting a match across the
1155    /// commit / carry boundary.
1156    ///
1157    /// If any match straddles `base_commit` (starts before, ends after),
1158    /// the commit point is moved to after that match so it is emitted
1159    /// in full this iteration.
1160    #[allow(clippy::unused_self)] // keep &self for API consistency with other scanner methods
1161    fn adjusted_commit_point(
1162        &self,
1163        matches: &[RawMatch],
1164        base_commit: usize,
1165        window_len: usize,
1166        is_eof: bool,
1167    ) -> usize {
1168        if is_eof {
1169            return window_len;
1170        }
1171
1172        let mut commit = base_commit;
1173
1174        for m in matches {
1175            if m.start < commit && m.end > commit {
1176                // Match straddles the boundary — extend commit to include it.
1177                commit = m.end;
1178            }
1179        }
1180
1181        // Never exceed window length.
1182        commit.min(window_len)
1183    }
1184
1185    /// Build the output for the committed region by splicing in replacements.
1186    ///
1187    /// Writes into `output_buf` (cleared on entry) and increments
1188    /// `stats.matches_found` / `stats.replacements_applied` for each applied
1189    /// replacement.  Per-pattern hit counts are written to `pattern_counts`
1190    /// (indexed by `pattern_idx`); the caller is responsible for folding
1191    /// these into `ScanStats::pattern_counts` and resetting them.
1192    ///
1193    /// `matches` is the full selected set for the window (may include matches
1194    /// in the carry region beyond `committed`).  Because `adjusted_commit_point`
1195    /// guarantees no match straddles the boundary, any match with
1196    /// `start < committed.len()` also has `end <= committed.len()`.  The
1197    /// loop breaks early once `m.start >= committed.len()` since matches are
1198    /// sorted by start.
1199    ///
1200    /// `window_file_offset` and `newlines_before_window` are used to compute
1201    /// the absolute byte offset and 1-based line number for each committed
1202    /// match, which are delivered to `on_match`. The newline scan is
1203    /// incremental: we scan only the bytes between consecutive matches, not
1204    /// the full committed region.
1205    ///
1206    /// # Note on `from_utf8_lossy`
1207    ///
1208    /// `String::from_utf8_lossy` returns `Cow::Borrowed(&str)` for valid
1209    /// UTF-8 input (the common case for ASCII secrets) — no heap allocation
1210    /// on the hot path.
1211    #[allow(clippy::too_many_arguments)]
1212    fn apply_replacements(
1213        &self,
1214        committed: &[u8],
1215        matches: &[RawMatch],
1216        stats: &mut ScanStats,
1217        output_buf: &mut Vec<u8>,
1218        pattern_counts: &mut [u64],
1219        window_file_offset: u64,
1220        newlines_before_window: u64,
1221        on_match: &mut dyn FnMut(MatchLocation),
1222    ) -> Result<()> {
1223        output_buf.clear();
1224
1225        let mut last_end = 0;
1226        // Running newline count within the committed region, advanced
1227        // incrementally so we only scan the bytes between matches.
1228        let mut newlines_in_committed: u64 = 0;
1229        let mut newline_scan_pos: usize = 0;
1230
1231        for &m in matches {
1232            // Matches are sorted by start; those at or beyond the committed
1233            // region belong to the carry window — stop here.
1234            if m.start >= committed.len() {
1235                break;
1236            }
1237
1238            // Emit bytes before this match verbatim.
1239            output_buf.extend_from_slice(&committed[last_end..m.start]);
1240
1241            // Advance newline counter from previous scan position to match start,
1242            // then emit the match location to the caller.
1243            newlines_in_committed += count_newlines(&committed[newline_scan_pos..m.start]);
1244            newline_scan_pos = m.start;
1245            on_match(MatchLocation {
1246                line: newlines_before_window + newlines_in_committed + 1,
1247                byte_offset: window_file_offset + m.start as u64,
1248                pattern: self.patterns[m.pattern_idx].label.clone(),
1249            });
1250
1251            let pattern = &self.patterns[m.pattern_idx];
1252
1253            if let Some((cap_start, cap_end)) = m.capture {
1254                // Pattern has a capture group: replace only the capture group,
1255                // emitting the surrounding context bytes of the full match verbatim.
1256                // This preserves delimiters, key names, and prefixes that the
1257                // pattern uses as anchors to reduce false positives.
1258                if cap_start < m.start || cap_end > m.end || cap_start > cap_end {
1259                    // Capture bounds outside match bounds — skip rather than panic.
1260                    // This should not happen with correct regex patterns; log it so it
1261                    // surfaces during testing without crashing production runs.
1262                    tracing::warn!(
1263                        pattern = %pattern.label,
1264                        m_start = m.start,
1265                        m_end = m.end,
1266                        cap_start,
1267                        cap_end,
1268                        "capture group bounds outside match bounds — emitting full match unreplaced"
1269                    );
1270                    output_buf.extend_from_slice(&committed[m.start..m.end]);
1271                    last_end = m.end;
1272                    continue;
1273                }
1274                output_buf.extend_from_slice(&committed[m.start..cap_start]);
1275                let secret = String::from_utf8_lossy(&committed[cap_start..cap_end]);
1276                let replacement = self.store.get_or_insert(&pattern.category, &secret)?;
1277                output_buf.extend_from_slice(replacement.as_bytes());
1278                output_buf.extend_from_slice(&committed[cap_end..m.end]);
1279            } else {
1280                // No capture group — replace the full match (e.g. token-prefix
1281                // patterns like `glpat-[...]` where the full match IS the secret).
1282                let matched_text = String::from_utf8_lossy(&committed[m.start..m.end]);
1283                let replacement = self.store.get_or_insert(&pattern.category, &matched_text)?;
1284                output_buf.extend_from_slice(replacement.as_bytes());
1285            }
1286
1287            last_end = m.end;
1288
1289            stats.matches_found += 1;
1290            stats.replacements_applied += 1;
1291            pattern_counts[m.pattern_idx] += 1;
1292        }
1293
1294        // Emit the trailing non-matching tail.
1295        output_buf.extend_from_slice(&committed[last_end..]);
1296
1297        Ok(())
1298    }
1299}
1300
1301// ---------------------------------------------------------------------------
1302// Send + Sync compile-time assertion
1303// ---------------------------------------------------------------------------
1304
1305const _: fn() = || {
1306    fn assert_send<T: Send>() {}
1307    fn assert_sync<T: Sync>() {}
1308    assert_send::<StreamScanner>();
1309    assert_sync::<StreamScanner>();
1310};
1311
1312// ---------------------------------------------------------------------------
1313// I/O helper
1314// ---------------------------------------------------------------------------
1315
1316/// Count the number of `\n` bytes in `data`.
1317///
1318/// Used to advance the cumulative newline counter between consecutive
1319/// match positions so we can compute 1-based line numbers without
1320/// pre-scanning the entire committed region.
1321#[inline]
1322fn count_newlines(data: &[u8]) -> u64 {
1323    bytecount::count(data, b'\n') as u64
1324}
1325
1326/// Read up to `buf.len()` bytes from `reader`, retrying on `Interrupted`.
1327///
1328/// Returns the number of bytes actually read (< `buf.len()` only at EOF).
1329fn read_fully<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
1330    let mut total = 0;
1331    while total < buf.len() {
1332        match reader.read(&mut buf[total..]) {
1333            Ok(0) => break, // EOF
1334            Ok(n) => total += n,
1335            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
1336            Err(e) => return Err(SanitizeError::from(e)),
1337        }
1338    }
1339    Ok(total)
1340}
1341
1342// ---------------------------------------------------------------------------
1343// Unit tests
1344// ---------------------------------------------------------------------------
1345
1346#[cfg(test)]
1347mod tests {
1348    use super::*;
1349    use crate::generator::HmacGenerator;
1350
1351    /// Helper: build a scanner with given patterns and small chunk config.
1352    fn test_scanner(patterns: Vec<ScanPattern>) -> StreamScanner {
1353        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
1354        let store = Arc::new(MappingStore::new(gen, None));
1355        StreamScanner::new(
1356            patterns,
1357            store,
1358            ScanConfig {
1359                chunk_size: 64,
1360                overlap_size: 16,
1361            },
1362        )
1363        .unwrap()
1364    }
1365
1366    /// Helper: email pattern.
1367    fn email_pattern() -> ScanPattern {
1368        ScanPattern::from_regex(
1369            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
1370            Category::Email,
1371            "email",
1372        )
1373        .unwrap()
1374    }
1375
1376    /// Helper: IPv4 pattern.
1377    fn ipv4_pattern() -> ScanPattern {
1378        ScanPattern::from_regex(
1379            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
1380            Category::IpV4,
1381            "ipv4",
1382        )
1383        .unwrap()
1384    }
1385
1386    // ---- Construction ----
1387
1388    #[test]
1389    fn scanner_creation() {
1390        let scanner = test_scanner(vec![email_pattern()]);
1391        assert_eq!(scanner.pattern_count(), 1);
1392    }
1393
1394    #[test]
1395    fn invalid_config_zero_chunk() {
1396        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
1397        let store = Arc::new(MappingStore::new(gen, None));
1398        let result = StreamScanner::new(vec![], store, ScanConfig::new(0, 0));
1399        assert!(result.is_err());
1400    }
1401
1402    #[test]
1403    fn invalid_config_overlap_ge_chunk() {
1404        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
1405        let store = Arc::new(MappingStore::new(gen, None));
1406        let result = StreamScanner::new(vec![], store, ScanConfig::new(100, 100));
1407        assert!(result.is_err());
1408    }
1409
1410    // ---- Empty / no-match cases ----
1411
1412    #[test]
1413    fn empty_input() {
1414        let scanner = test_scanner(vec![email_pattern()]);
1415        let (output, stats) = scanner.scan_bytes(b"").unwrap();
1416        assert!(output.is_empty());
1417        assert_eq!(stats.matches_found, 0);
1418        assert_eq!(stats.bytes_processed, 0);
1419    }
1420
1421    #[test]
1422    fn no_matches() {
1423        let scanner = test_scanner(vec![email_pattern()]);
1424        let input = b"There are no email addresses here.";
1425        let (output, stats) = scanner.scan_bytes(input).unwrap();
1426        assert_eq!(output, input.as_slice());
1427        assert_eq!(stats.matches_found, 0);
1428    }
1429
1430    // ---- Single match ----
1431
1432    #[test]
1433    fn single_email_replaced() {
1434        let scanner = test_scanner(vec![email_pattern()]);
1435        let input = b"Contact alice@corp.com for help.";
1436        let (output, stats) = scanner.scan_bytes(input).unwrap();
1437        assert_eq!(stats.matches_found, 1);
1438        assert_eq!(stats.replacements_applied, 1);
1439        // Original must not appear in output.
1440        assert!(!output
1441            .windows(b"alice@corp.com".len())
1442            .any(|w| w == b"alice@corp.com"));
1443        // Replacement should contain the @ from the domain-preserving email.
1444        let output_str = String::from_utf8_lossy(&output);
1445        assert!(output_str.contains("@corp.com"));
1446        // Length preserved: output is same total length as input.
1447        assert_eq!(output.len(), input.len(), "length must be preserved");
1448        // Surrounding text preserved.
1449        assert!(output_str.starts_with("Contact "));
1450        assert!(output_str.ends_with(" for help."));
1451    }
1452
1453    // ---- Multiple matches ----
1454
1455    #[test]
1456    fn multiple_emails_replaced() {
1457        let scanner = test_scanner(vec![email_pattern()]);
1458        let input = b"From alice@corp.com to bob@corp.com cc admin@corp.com";
1459        let (output, stats) = scanner.scan_bytes(input).unwrap();
1460        assert_eq!(stats.matches_found, 3);
1461        let out_str = String::from_utf8_lossy(&output);
1462        assert!(!out_str.contains("alice@corp.com"));
1463        assert!(!out_str.contains("bob@corp.com"));
1464        assert!(!out_str.contains("admin@corp.com"));
1465    }
1466
1467    // ---- Same secret gets same replacement ----
1468
1469    #[test]
1470    fn same_secret_same_replacement() {
1471        let scanner = test_scanner(vec![email_pattern()]);
1472        let input = b"First alice@corp.com then alice@corp.com again.";
1473        let (output, stats) = scanner.scan_bytes(input).unwrap();
1474        assert_eq!(stats.matches_found, 2);
1475        let out_str = String::from_utf8_lossy(&output);
1476        // Both occurrences should be replaced with the same value.
1477        // With length-preserving replacements, look for the preserved domain.
1478        let parts: Vec<&str> = out_str.split("@corp.com").collect();
1479        // 3 parts = 2 occurrences of the replacement.
1480        assert_eq!(parts.len(), 3);
1481    }
1482
1483    // ---- Literal pattern ----
1484
1485    #[test]
1486    fn literal_pattern_matched() {
1487        let pat = ScanPattern::from_literal(
1488            "SECRET_API_KEY_12345",
1489            Category::Custom("api_key".into()),
1490            "api_key",
1491        )
1492        .unwrap();
1493        let scanner = test_scanner(vec![pat]);
1494        let input = b"key=SECRET_API_KEY_12345&foo=bar";
1495        let (output, stats) = scanner.scan_bytes(input).unwrap();
1496        assert_eq!(stats.matches_found, 1);
1497        assert!(!output
1498            .windows(b"SECRET_API_KEY_12345".len())
1499            .any(|w| w == b"SECRET_API_KEY_12345"));
1500    }
1501
1502    // ---- Multiple pattern types ----
1503
1504    #[test]
1505    fn multiple_pattern_types() {
1506        let scanner = test_scanner(vec![email_pattern(), ipv4_pattern()]);
1507        let input = b"Server 192.168.1.100 contact admin@server.com";
1508        let (output, stats) = scanner.scan_bytes(input).unwrap();
1509        assert_eq!(stats.matches_found, 2);
1510        let out_str = String::from_utf8_lossy(&output);
1511        assert!(!out_str.contains("192.168.1.100"));
1512        assert!(!out_str.contains("admin@server.com"));
1513        assert_eq!(*stats.pattern_counts.get("email").unwrap(), 1);
1514        assert_eq!(*stats.pattern_counts.get("ipv4").unwrap(), 1);
1515    }
1516
1517    // ---- Chunk boundary: match spans two chunks ----
1518
1519    #[test]
1520    fn match_at_chunk_boundary() {
1521        // Use a very small chunk size so the email straddles a boundary.
1522        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
1523        let store = Arc::new(MappingStore::new(gen, None));
1524        let scanner = StreamScanner::new(
1525            vec![email_pattern()],
1526            store,
1527            ScanConfig {
1528                chunk_size: 20, // very small
1529                overlap_size: 16,
1530            },
1531        )
1532        .unwrap();
1533
1534        // Place an email address that will definitely straddle a boundary.
1535        let input = b"AAAAAAAAAAAAAAAA alice@corp.com BBBBBBBBBBBBB";
1536        let (output, stats) = scanner.scan_bytes(input).unwrap();
1537        assert_eq!(stats.matches_found, 1);
1538        let out_str = String::from_utf8_lossy(&output);
1539        assert!(!out_str.contains("alice@corp.com"));
1540        assert!(out_str.contains("@corp.com"), "domain must be preserved");
1541    }
1542
1543    // ---- Large input requiring many chunks ----
1544
1545    #[test]
1546    fn large_input_many_chunks() {
1547        let scanner = test_scanner(vec![email_pattern()]);
1548
1549        // Build a ~2 KiB input with emails sprinkled in.
1550        let mut input = Vec::new();
1551        let filler = b"Lorem ipsum dolor sit amet. ";
1552        for i in 0..20 {
1553            input.extend_from_slice(filler);
1554            let email = format!("user{}@example.com ", i);
1555            input.extend_from_slice(email.as_bytes());
1556        }
1557
1558        let (output, stats) = scanner.scan_bytes(&input).unwrap();
1559        assert_eq!(stats.matches_found, 20);
1560        let out_str = String::from_utf8_lossy(&output);
1561        for i in 0..20 {
1562            let email = format!("user{}@example.com", i);
1563            assert!(!out_str.contains(&email));
1564        }
1565    }
1566
1567    #[test]
1568    fn scan_bytes_with_progress_preserves_output_and_stats() {
1569        let scanner = test_scanner(vec![email_pattern()]);
1570        let input = b"Contact alice@corp.com and bob@corp.com for help.";
1571
1572        let (baseline_output, baseline_stats) = scanner.scan_bytes(input).unwrap();
1573
1574        let mut updates = Vec::new();
1575        let (progress_output, progress_stats) = scanner
1576            .scan_bytes_with_progress(input, |progress| updates.push(progress.clone()))
1577            .unwrap();
1578
1579        assert_eq!(progress_output, baseline_output);
1580        assert_eq!(
1581            progress_stats.bytes_processed,
1582            baseline_stats.bytes_processed
1583        );
1584        assert_eq!(progress_stats.bytes_output, baseline_stats.bytes_output);
1585        assert_eq!(progress_stats.matches_found, baseline_stats.matches_found);
1586        assert_eq!(
1587            progress_stats.replacements_applied,
1588            baseline_stats.replacements_applied
1589        );
1590        assert!(!updates.is_empty());
1591        assert_eq!(updates.last().unwrap().bytes_processed, input.len() as u64);
1592        assert_eq!(
1593            updates.last().unwrap().total_bytes,
1594            Some(input.len() as u64)
1595        );
1596        assert_eq!(updates.last().unwrap().matches_found, 2);
1597    }
1598
1599    #[test]
1600    fn scan_reader_with_progress_reports_multiple_updates_for_multi_chunk_input() {
1601        let scanner = test_scanner(vec![email_pattern()]);
1602        let mut input = Vec::new();
1603        for i in 0..8 {
1604            input.extend_from_slice(b"padding padding padding ");
1605            input.extend_from_slice(format!("user{i}@example.com ").as_bytes());
1606        }
1607
1608        let mut output = Vec::new();
1609        let mut updates = Vec::new();
1610        let stats = scanner
1611            .scan_reader_with_callbacks(
1612                &input[..],
1613                &mut output,
1614                Some(input.len() as u64),
1615                |progress| {
1616                    updates.push(progress.clone());
1617                },
1618                |_| {},
1619            )
1620            .unwrap();
1621
1622        assert!(updates.len() >= 2);
1623        assert_eq!(
1624            updates.last().unwrap().bytes_processed,
1625            stats.bytes_processed
1626        );
1627        assert_eq!(updates.last().unwrap().bytes_output, stats.bytes_output);
1628        assert_eq!(
1629            updates.last().unwrap().total_bytes,
1630            Some(input.len() as u64)
1631        );
1632    }
1633
1634    // ---- Scan via Read/Write interface ----
1635
1636    #[test]
1637    fn scan_reader_writer() {
1638        let scanner = test_scanner(vec![email_pattern()]);
1639        let input = b"hello alice@corp.com world";
1640        let mut output = Vec::new();
1641        let stats = scanner.scan_reader(&input[..], &mut output).unwrap();
1642        assert_eq!(stats.matches_found, 1);
1643        let out_str = String::from_utf8_lossy(&output);
1644        assert!(out_str.contains("@corp.com"), "domain must be preserved");
1645    }
1646
1647    // ---- Pattern compile error ----
1648
1649    #[test]
1650    fn invalid_regex_pattern() {
1651        let result = ScanPattern::from_regex("[invalid(", Category::Email, "bad");
1652        assert!(result.is_err());
1653    }
1654
1655    // ---- Default config ----
1656
1657    #[test]
1658    fn default_config_valid() {
1659        ScanConfig::default().validate().unwrap();
1660    }
1661
1662    // ---- Config edge cases ----
1663
1664    #[test]
1665    fn config_chunk_1_overlap_0() {
1666        // Extreme but valid: 1-byte chunks, no overlap.
1667        // Won't catch multi-byte patterns, but should not crash.
1668        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
1669        let store = Arc::new(MappingStore::new(gen, None));
1670        let scanner = StreamScanner::new(vec![], store, ScanConfig::new(1, 0)).unwrap();
1671        let (output, _) = scanner.scan_bytes(b"hello").unwrap();
1672        assert_eq!(output, b"hello");
1673    }
1674
1675    // ---- ScanStats equality (exercises the PartialEq derive) ----
1676
1677    #[test]
1678    fn scan_stats_equality() {
1679        let scanner = test_scanner(vec![email_pattern()]);
1680        let input = b"hello alice@corp.com world";
1681        let (_, stats_a) = scanner.scan_bytes(input).unwrap();
1682        let (_, stats_b) = scanner.scan_bytes(input).unwrap();
1683        // Identical inputs produce identical stats.
1684        assert_eq!(
1685            stats_a, stats_b,
1686            "identical inputs must produce identical stats"
1687        );
1688        // Values are correct — not just equal to each other.
1689        assert_eq!(stats_a.matches_found, 1, "one email in input");
1690        assert_eq!(stats_a.replacements_applied, 1);
1691        assert_eq!(stats_a.bytes_processed, input.len() as u64);
1692        assert_eq!(*stats_a.pattern_counts.get("email").unwrap_or(&0), 1);
1693        // No-match run produces zeroed counters.
1694        let (_, stats_empty) = scanner.scan_bytes(b"no matches here").unwrap();
1695        assert_ne!(stats_a, stats_empty);
1696        assert_eq!(stats_empty.matches_found, 0);
1697        assert_eq!(stats_empty.replacements_applied, 0);
1698    }
1699
1700    // ---- on_match line number and byte offset accuracy ----
1701
1702    #[test]
1703    fn on_match_reports_correct_line_and_byte_offset() {
1704        // alice@corp.com starts after "line one\n" (9 bytes) → byte 9, line 2.
1705        // bob@corp.com starts after "line one\nalice@corp.com\nline three\n"
1706        //   = 9 + 14 + 1 + 10 + 1 = 35 bytes → byte 35, line 4.
1707        let scanner = test_scanner(vec![email_pattern()]);
1708        let input = b"line one\nalice@corp.com\nline three\nbob@corp.com\n";
1709        let mut locations = Vec::new();
1710        let mut output = Vec::new();
1711        scanner
1712            .scan_reader_with_callbacks(
1713                &input[..],
1714                &mut output,
1715                None,
1716                |_| {},
1717                |loc| locations.push(loc),
1718            )
1719            .unwrap();
1720        assert_eq!(locations.len(), 2);
1721        assert_eq!(locations[0].line, 2, "alice must be on line 2");
1722        assert_eq!(locations[0].byte_offset, 9, "alice must start at byte 9");
1723        assert_eq!(locations[1].line, 4, "bob must be on line 4");
1724        assert_eq!(locations[1].byte_offset, 35, "bob must start at byte 35");
1725    }
1726
1727    // ---- Cross-chunk newline accumulation ----
1728
1729    #[test]
1730    fn on_match_line_numbers_stable_across_chunk_sizes() {
1731        // alice@corp.com starts after "line one\n" (9 bytes) → byte 9, line 2.
1732        // bob@corp.com starts after "line one\nalice@corp.com\nline three\n"
1733        //   = 9 + 14 + 1 + 10 + 1 = 35 bytes → byte 35, line 4.
1734        // Running the same input through different chunk sizes exercises
1735        // newlines_before_window accumulation across chunk boundaries.
1736        let input = b"line one\nalice@corp.com\nline three\nbob@corp.com\n";
1737        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
1738        let store = Arc::new(MappingStore::new(gen, None));
1739
1740        for chunk_size in [16usize, 20, 24, 32, 64] {
1741            let scanner = StreamScanner::new(
1742                vec![email_pattern()],
1743                Arc::clone(&store),
1744                ScanConfig::new(chunk_size, 14),
1745            )
1746            .unwrap();
1747
1748            let mut locations = Vec::new();
1749            let mut output = Vec::new();
1750            scanner
1751                .scan_reader_with_callbacks(
1752                    &input[..],
1753                    &mut output,
1754                    None,
1755                    |_| {},
1756                    |loc| locations.push(loc),
1757                )
1758                .unwrap();
1759
1760            assert_eq!(
1761                locations.len(),
1762                2,
1763                "chunk_size={chunk_size}: expected 2 matches"
1764            );
1765            assert_eq!(
1766                locations[0].line, 2,
1767                "chunk_size={chunk_size}: alice must be on line 2"
1768            );
1769            assert_eq!(
1770                locations[0].byte_offset, 9,
1771                "chunk_size={chunk_size}: alice must start at byte 9"
1772            );
1773            assert_eq!(
1774                locations[1].line, 4,
1775                "chunk_size={chunk_size}: bob must be on line 4"
1776            );
1777            assert_eq!(
1778                locations[1].byte_offset, 35,
1779                "chunk_size={chunk_size}: bob must start at byte 35"
1780            );
1781        }
1782    }
1783
1784    // ---- Bytes output tracking ----
1785
1786    #[test]
1787    fn bytes_output_preserved_on_replacement() {
1788        let scanner = test_scanner(vec![email_pattern()]);
1789        let input = b"a@b.cc"; // short email
1790        let (output, stats) = scanner.scan_bytes(input).unwrap();
1791        assert_eq!(stats.bytes_processed, input.len() as u64);
1792        assert_eq!(stats.bytes_output, output.len() as u64);
1793        // Length-preserving: output length matches input length.
1794        assert_eq!(output.len(), input.len());
1795    }
1796}