sanitize_engine/scanner.rs
1//! Streaming scanner for detecting and replacing sensitive data.
2//!
3//! # Architecture
4//!
5//! The streaming scanner processes input data in configurable chunks,
6//! detecting secret patterns (regex or literal) and applying one-way
7//! replacements via the [`MappingStore`].
8//! This design supports files of 20–100 GB+ without requiring the entire
9//! content to fit in memory.
10//!
11//! ```text
//! ┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
//! │ Input (Read) │ ──▶ │  StreamScanner  │ ──▶ │  Output (Write)  │
//! │  (chunked)   │     │ (pattern match  │     │   (sanitized)    │
//! └──────────────┘     │   + replace)    │     └──────────────────┘
//!                      └────────┬────────┘
//!                               │
//!                      ┌────────▼────────┐
//!                      │  MappingStore   │
//!                      │  (dedup cache)  │
//!                      └─────────────────┘
22//! ```
23//!
24//! # Chunk Overlap Strategy
25//!
26//! To avoid missing matches that span chunk boundaries, the scanner
27//! maintains an overlap window between consecutive chunks:
28//!
29//! 1. Read `chunk_size` bytes of new data.
30//! 2. Prepend the `carry` buffer (tail of previous window).
31//! 3. Scan the combined `window` for all pattern matches.
32//! 4. Compute `commit_point = window.len() - overlap_size` (adjusted
33//! upward if a match straddles the boundary).
34//! 5. Emit output for `window[..commit_point]` with replacements applied.
35//! 6. Set `carry = window[commit_point..]` for the next iteration.
36//!
37//! The `overlap_size` should be ≥ the maximum expected match length to
38//! guarantee no matches are missed at boundaries.
39//!
40//! # Thread Safety
41//!
42//! [`StreamScanner`] is `Send + Sync`. Multiple files can be scanned
43//! concurrently using a shared `Arc<StreamScanner>`, all backed by the
44//! same [`MappingStore`] for per-run dedup
45//! consistency.
46//!
47//! # Performance
48//!
49//! - **Chunk-based I/O**: only `chunk_size + overlap_size` bytes in
50//! memory per active scan.
51//! - **Compiled regex**: patterns are compiled once at construction and
52//! reused across all chunks and files.
53//! - **Lock-free reads**: the `DashMap` inside `MappingStore` provides
54//! lock-free reads for already-seen values.
55//! - **File-level parallelism**: share `Arc<StreamScanner>` across
56//! threads to scan multiple files concurrently.
57
58use crate::category::Category;
59use crate::error::{Result, SanitizeError};
60use crate::store::MappingStore;
61use aho_corasick::AhoCorasick;
62use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder};
63use serde::Serialize;
64use std::collections::HashMap;
65use std::io::{self, Read, Write};
66use std::sync::Arc;
67
68// ---------------------------------------------------------------------------
69// Configuration
70// ---------------------------------------------------------------------------
71
/// Bytes read from the input per chunk when no explicit size is given: 1 MiB.
const DEFAULT_CHUNK_SIZE: usize = 1 << 20;

/// Bytes of overlap kept between consecutive chunks by default: 4 KiB.
const DEFAULT_OVERLAP_SIZE: usize = 4 * 1024;

/// Size limit (bytes) applied to each individually compiled regex: 1 MiB.
/// Bounds the memory a pathologically complex user-supplied pattern can
/// claim at compile time.
const REGEX_SIZE_LIMIT: usize = 1024 * 1024;

/// DFA cache limit (bytes) applied to each individually compiled regex: 1 MiB.
const REGEX_DFA_SIZE_LIMIT: usize = 1024 * 1024;

/// Ceiling on the size / DFA budgets handed to the combined `RegexSet`: 256 MiB.
///
/// The set's budget is the per-pattern limit multiplied by the number of
/// regex patterns so that large pattern sets still compile; this cap stops
/// that product from growing without bound (10 000 patterns would otherwise
/// be allowed ~20 GiB across both limits).
const REGEX_SET_SIZE_CAP: usize = 256 << 20;

/// Pattern-count limit enforced by [`StreamScanner::new`] (F-05 fix).
///
/// `RegexSet` automaton memory grows with the number of patterns, so an
/// unbounded pattern list is a resource-exhaustion risk. Callers that really
/// need more can pass their own limit to
/// [`StreamScanner::new_with_max_patterns`].
const DEFAULT_MAX_PATTERNS: usize = 10_000;

/// Label suffix that marks a pattern as key-value-only.
///
/// `StreamScanner::for_structured_pass` drops every pattern whose label ends
/// with this suffix: in a structured pass the key-value processor resolves
/// those values structurally, and scanning for them as well would produce
/// spurious duplicate replacements on the surrounding syntax.
pub const KV_LABEL_SUFFIX: &str = "_kv";
106
/// Configuration for the streaming scanner.
///
/// Holds the two byte sizes that control chunked scanning. Constructing a
/// value performs no checks; [`ScanConfig::validate`] rejects a zero
/// `chunk_size` and any `overlap_size` that is not strictly smaller than
/// `chunk_size`.
///
/// # Tuning Guide
///
/// | Workload               | `chunk_size` | `overlap_size` |
/// |------------------------|--------------|----------------|
/// | Small files (< 10 MB)  | 256 KiB      | 1 KiB          |
/// | General purpose        | 1 MiB        | 4 KiB          |
/// | Large files (> 1 GB)   | 4–8 MiB      | 8 KiB          |
/// | Memory-constrained     | 64 KiB       | 1 KiB          |
///
/// `overlap_size` should be ≥ the longest expected match. Most secret
/// patterns (API keys, emails, SSNs) are well under 256 bytes, so the
/// 4 KiB default provides ample margin.
#[derive(Debug, Clone)]
pub struct ScanConfig {
    /// Size of each chunk read from the input (bytes).
    ///
    /// Larger chunks improve throughput (fewer syscalls) but use more
    /// memory. Default: 1 MiB.
    pub chunk_size: usize,

    /// Overlap between consecutive chunks (bytes).
    ///
    /// Must be ≥ the maximum expected match length. Patterns whose
    /// matches can exceed this length risk being missed at chunk
    /// boundaries. Default: 4 KiB.
    pub overlap_size: usize,
}
136
137impl Default for ScanConfig {
138 fn default() -> Self {
139 Self {
140 chunk_size: DEFAULT_CHUNK_SIZE,
141 overlap_size: DEFAULT_OVERLAP_SIZE,
142 }
143 }
144}
145
146impl ScanConfig {
147 /// Create a new configuration with explicit values.
148 #[must_use]
149 pub fn new(chunk_size: usize, overlap_size: usize) -> Self {
150 Self {
151 chunk_size,
152 overlap_size,
153 }
154 }
155
156 /// Validate the configuration, returning an error if invalid.
157 ///
158 /// # Errors
159 ///
160 /// Returns [`SanitizeError::InvalidConfig`] if `chunk_size` is zero
161 /// or `overlap_size >= chunk_size`.
162 pub fn validate(&self) -> Result<()> {
163 if self.chunk_size == 0 {
164 return Err(SanitizeError::InvalidConfig(
165 "chunk_size must be > 0".into(),
166 ));
167 }
168 if self.overlap_size >= self.chunk_size {
169 return Err(SanitizeError::InvalidConfig(
170 "overlap_size must be < chunk_size".into(),
171 ));
172 }
173 Ok(())
174 }
175}
176
177// ---------------------------------------------------------------------------
178// Internal helpers
179// ---------------------------------------------------------------------------
180
181/// Convert any compile-time pattern error into [`SanitizeError::PatternCompileError`].
182#[inline]
183fn compile_err(e: impl std::fmt::Display) -> SanitizeError {
184 SanitizeError::PatternCompileError(e.to_string())
185}
186
187// ---------------------------------------------------------------------------
188// Scan pattern
189// ---------------------------------------------------------------------------
190
/// A pattern rule defining what to scan for and how to categorize matches.
///
/// Wraps a compiled [`regex::bytes::Regex`] with a [`Category`] for
/// replacement lookups and a human-readable label for reporting.
///
/// Both regex and literal patterns are supported. Literal patterns keep
/// their original text and are matched by the scanner's Aho-Corasick
/// automaton for fast multi-literal scanning.
///
/// `Debug` and `Clone` are implemented by hand below rather than derived.
pub struct ScanPattern {
    /// Compiled regex matcher (used for non-literal patterns and as a
    /// fallback; literal patterns are matched via Aho-Corasick instead).
    /// For literals this is compiled from the regex-escaped text.
    regex: Regex,
    /// Category for replacement lookups.
    category: Category,
    /// Human-readable label for reporting / stats.
    label: String,
    /// Original (unescaped) literal string when created via `from_literal`.
    /// `None` for patterns created via `from_regex`.
    /// Stored so `StreamScanner` can build an Aho-Corasick automaton for
    /// fast SIMD literal matching instead of running the regex engine.
    literal: Option<String>,
    /// Minimum window size (bytes) required to produce a match.
    /// For literal patterns this equals the byte length of the literal itself.
    /// For regex patterns this is `0` (no guaranteed minimum).
    /// Used to skip `captures_iter` when the window is provably too short.
    pub min_length: usize,
}
218
219impl std::fmt::Debug for ScanPattern {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("ScanPattern")
222 .field("pattern", &self.regex.as_str())
223 .field("category", &self.category)
224 .field("label", &self.label)
225 .field("literal", &self.literal.as_deref())
226 .field("min_length", &self.min_length)
227 .finish()
228 }
229}
230
231impl Clone for ScanPattern {
232 fn clone(&self) -> Self {
233 Self {
234 regex: self.regex.clone(),
235 category: self.category.clone(),
236 label: self.label.clone(),
237 literal: self.literal.clone(),
238 min_length: self.min_length,
239 }
240 }
241}
242
243impl ScanPattern {
244 /// Create a pattern from a regex string.
245 ///
246 /// ## Capture group 1 — partial replacement
247 ///
248 /// If the regex contains a capture group 1 (`(...)`), only the bytes
249 /// matched by that group are replaced; the bytes before and after it
250 /// within the full match are emitted verbatim. This lets you write
251 /// context-anchored patterns without redacting the prefix/suffix:
252 ///
253 /// ```text
254 /// pattern: glpat-([A-Za-z0-9_-]{20})
255 /// ^^^^^^ prefix preserved
256 /// ^^^^^^^^^^^^^^^^^^^^ group 1 → replaced
257 /// ```
258 ///
259 /// Patterns **without** a capture group replace the entire match.
260 ///
261 /// # Errors
262 ///
263 /// Returns [`SanitizeError::PatternCompileError`] if the regex is invalid.
264 ///
265 /// # Examples
266 ///
267 /// ```
268 /// use sanitize_engine::scanner::ScanPattern;
269 /// use sanitize_engine::category::Category;
270 ///
271 /// // No capture group — full match replaced:
272 /// let email = ScanPattern::from_regex(
273 /// r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
274 /// Category::Email,
275 /// "email_address",
276 /// ).unwrap();
277 ///
278 /// // Capture group 1 — prefix preserved, only the token value replaced:
279 /// let token = ScanPattern::from_regex(
280 /// r"glpat-([A-Za-z0-9_-]{20})",
281 /// Category::AuthToken,
282 /// "gitlab_pat",
283 /// ).unwrap();
284 /// ```
285 pub fn from_regex(pattern: &str, category: Category, label: impl Into<String>) -> Result<Self> {
286 let regex = RegexBuilder::new(pattern)
287 .size_limit(REGEX_SIZE_LIMIT)
288 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
289 .build()
290 .map_err(compile_err)?;
291 Ok(Self {
292 regex,
293 category,
294 label: label.into(),
295 literal: None,
296 min_length: 0,
297 })
298 }
299
300 /// Create a pattern from a literal string.
301 ///
302 /// The literal is escaped so that regex metacharacters are matched
303 /// verbatim.
304 ///
305 /// # Errors
306 ///
307 /// Returns [`SanitizeError::PatternCompileError`] if regex compilation fails.
308 ///
309 /// # Examples
310 ///
311 /// ```
312 /// use sanitize_engine::scanner::ScanPattern;
313 /// use sanitize_engine::category::Category;
314 ///
315 /// let pat = ScanPattern::from_literal(
316 /// "sk-proj-abc123secret",
317 /// Category::Custom("api_key".into()),
318 /// "openai_key",
319 /// ).unwrap();
320 /// ```
321 pub fn from_literal(
322 literal: &str,
323 category: Category,
324 label: impl Into<String>,
325 ) -> Result<Self> {
326 let escaped = regex::escape(literal);
327 let regex = RegexBuilder::new(&escaped)
328 .size_limit(REGEX_SIZE_LIMIT)
329 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
330 .build()
331 .map_err(compile_err)?;
332 Ok(Self {
333 regex,
334 category,
335 label: label.into(),
336 min_length: literal.len(),
337 literal: Some(literal.to_owned()),
338 })
339 }
340
341 /// The category this pattern maps to.
342 #[must_use]
343 pub fn category(&self) -> &Category {
344 &self.category
345 }
346
347 /// The human-readable label.
348 #[must_use]
349 pub fn label(&self) -> &str {
350 &self.label
351 }
352
353 /// Return the raw regex pattern string for RegexSet construction.
354 #[must_use]
355 pub fn regex_pattern(&self) -> &str {
356 self.regex.as_str()
357 }
358}
359
360// ScanPattern is Send + Sync because:
361// - regex::bytes::Regex is Send + Sync
362// - Category is Send + Sync (it's an enum of primitives + CompactString)
363// - String is Send + Sync
364
365// ---------------------------------------------------------------------------
366// Internal: raw match descriptor
367// ---------------------------------------------------------------------------
368
/// A single match found during scanning (internal).
///
/// All offsets are relative to the current scan window, not to the file.
/// The type is `Copy`, so match lists can be moved around cheaply.
#[derive(Debug, Clone, Copy)]
struct RawMatch {
    /// Start byte offset within the scan window.
    start: usize,
    /// End byte offset (exclusive) within the scan window.
    end: usize,
    /// Index into the `StreamScanner::patterns` vector.
    pattern_idx: usize,
    /// Byte range of capture group 1 within the window, if the pattern has one.
    /// When present, only this sub-range is replaced; the bytes between
    /// `start..capture_start` and `capture_end..end` are emitted verbatim,
    /// preserving surrounding context (delimiters, key names, prefixes).
    capture: Option<(usize, usize)>,
}
384
385// ---------------------------------------------------------------------------
386// Per-scan scratch buffers
387// ---------------------------------------------------------------------------
388
/// Scratch buffers reused across chunks within a single scan call.
///
/// Allocating these once per `scan_reader_with_callbacks` invocation
/// and reusing them each chunk eliminates the per-chunk heap pressure
/// that would otherwise come from `Vec` allocations in `find_matches`
/// and `apply_replacements`.
struct ScanScratch {
    /// Accumulates raw matches from all patterns before deduplication.
    all_matches: Vec<RawMatch>,
    /// Non-overlapping matches selected for the current window
    /// (populated by `find_matches`, consumed by `apply_replacements`).
    selected: Vec<RawMatch>,
    /// Output bytes for the committed region, written by `apply_replacements`.
    output: Vec<u8>,
    /// Per-pattern match counts indexed by `pattern_idx`.
    /// Reset to zero after each chunk's counts are folded into `ScanStats`.
    pattern_counts: Vec<u64>,
}
407
408impl ScanScratch {
409 fn new(pattern_count: usize, chunk_size: usize, overlap_size: usize) -> Self {
410 Self {
411 all_matches: Vec::with_capacity(64),
412 selected: Vec::with_capacity(64),
413 output: Vec::with_capacity(chunk_size + overlap_size),
414 pattern_counts: vec![0u64; pattern_count],
415 }
416 }
417}
418
419// ---------------------------------------------------------------------------
420// Scan statistics
421// ---------------------------------------------------------------------------
422
/// The file-level position of a single scanner match.
///
/// Emitted via the `on_match` callback in
/// [`StreamScanner::scan_reader_with_callbacks`]. Line numbers are
/// 1-based and count `\n` bytes only (Unix line endings). For files with
/// Windows line endings (`\r\n`), `line` is still correct because `\n` is
/// the canonical line separator — `\r` bytes do not affect the count.
///
/// `byte_offset` is the absolute byte position of the first byte of the
/// matched region within the file (0-based). Both fields refer to the
/// *input* file, not the sanitized output.
///
/// Derives `Serialize`, so a location can be emitted directly by any
/// serde serializer.
#[derive(Debug, Clone, Serialize)]
pub struct MatchLocation {
    /// 1-based line number of the match within the file.
    pub line: u64,
    /// 0-based byte offset of the match start within the file.
    pub byte_offset: u64,
    /// Pattern label that triggered this match.
    pub pattern: String,
}
443
/// Statistics collected during a scan operation.
///
/// Returned by [`StreamScanner::scan_reader`] and
/// [`StreamScanner::scan_bytes`] to provide visibility into what
/// the scanner did. `Default` yields all-zero counters and an empty
/// per-pattern map, which is the state every scan starts from.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ScanStats {
    /// Total bytes read from the input.
    pub bytes_processed: u64,
    /// Total bytes written to the output (may differ from `bytes_processed`
    /// when replacements have different lengths than the originals).
    pub bytes_output: u64,
    /// Total number of matches found across all patterns.
    pub matches_found: u64,
    /// Total number of replacements applied (always == `matches_found`
    /// in one-way mode).
    pub replacements_applied: u64,
    /// Per-pattern match counts, keyed by pattern label.
    pub pattern_counts: HashMap<String, u64>,
}
464
/// Progress snapshot emitted during streaming scans.
///
/// One snapshot is passed to the progress callback after each processed
/// window; the counters are cumulative for the scan so far.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct ScanProgress {
    /// Total bytes read from the input so far.
    pub bytes_processed: u64,
    /// Total bytes written to the output so far.
    pub bytes_output: u64,
    /// Total input size when known (passed through from the caller).
    pub total_bytes: Option<u64>,
    /// Total number of matches found so far.
    pub matches_found: u64,
    /// Total replacements applied so far.
    pub replacements_applied: u64,
}
479
480// ---------------------------------------------------------------------------
481// StreamScanner
482// ---------------------------------------------------------------------------
483
/// Streaming scanner that detects and replaces sensitive patterns.
///
/// Thread-safe: can be shared via `Arc<StreamScanner>` for concurrent
/// scanning of multiple files. Each call to [`scan_reader`](Self::scan_reader)
/// is independent and maintains its own chunking state.
///
/// Patterns are split at construction time: literal patterns feed an
/// Aho-Corasick automaton, all others feed a `RegexSet`. The two index
/// vectors map each engine's pattern index back into `patterns`.
///
/// # Usage
///
/// ```rust
/// use sanitize_engine::scanner::{StreamScanner, ScanPattern, ScanConfig};
/// use sanitize_engine::category::Category;
/// use sanitize_engine::generator::HmacGenerator;
/// use sanitize_engine::store::MappingStore;
/// use std::sync::Arc;
///
/// // 1. Build the replacement store.
/// let gen = Arc::new(HmacGenerator::new([42u8; 32]));
/// let store = Arc::new(MappingStore::new(gen, None));
///
/// // 2. Define patterns.
/// let patterns = vec![
///     ScanPattern::from_regex(
///         r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
///         Category::Email,
///         "email",
///     ).unwrap(),
/// ];
///
/// // 3. Create the scanner.
/// let scanner = StreamScanner::new(patterns, store, ScanConfig::default()).unwrap();
///
/// // 4. Scan.
/// let input = b"Contact alice@corp.com for details.";
/// let (output, stats) = scanner.scan_bytes(input).unwrap();
/// assert_eq!(stats.matches_found, 1);
/// assert!(!output.windows(b"alice@corp.com".len())
///     .any(|w| w == b"alice@corp.com"));
/// ```
pub struct StreamScanner {
    /// Compiled scan patterns (both literal and regex).
    patterns: Vec<ScanPattern>,
    /// Pre-compiled set for fast multi-pattern pre-filtering of **regex**
    /// (non-literal) patterns only. `matches()` returns which regex-pattern
    /// indices matched, avoiding running every individual regex on each chunk
    /// (R-3 optimisation).
    regex_set: RegexSet,
    /// Maps a `RegexSet` index → index into `self.patterns`.
    /// Only non-literal patterns are in the `RegexSet`.
    regex_indices: Vec<usize>,
    /// Aho-Corasick automaton for fast SIMD literal matching.
    /// `None` when there are no literal patterns.
    aho_corasick: Option<AhoCorasick>,
    /// Maps an Aho-Corasick pattern index → index into `self.patterns`.
    /// Only literal patterns appear here.
    literal_indices: Vec<usize>,
    /// Thread-safe dedup replacement store.
    store: Arc<MappingStore>,
    /// Scanner configuration.
    config: ScanConfig,
}
544
/// Return type for scanner factory methods that load a secrets file.
///
/// Contains `(scanner, warnings, allow_patterns)` where `warnings` are
/// non-fatal parse errors and `allow_patterns` are raw strings from
/// `kind: allow` entries.
///
/// NOTE(review): the `usize` paired with each warning is presumably the
/// position (entry index or line) of the offending entry — confirm against
/// the loader that produces it.
type SecretsLoadResult = Result<(StreamScanner, Vec<(usize, SanitizeError)>, Vec<String>)>;
551
552impl StreamScanner {
553 /// Create a new streaming scanner.
554 ///
555 /// # Arguments
556 ///
557 /// - `patterns` — the set of patterns to scan for.
558 /// - `store` — the mapping store for dedup-consistent replacements.
559 /// - `config` — chunking / overlap configuration.
560 ///
561 /// # Errors
562 ///
563 /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
564 /// invalid (e.g. `chunk_size == 0` or `overlap_size >= chunk_size`).
565 pub fn new(
566 patterns: Vec<ScanPattern>,
567 store: Arc<MappingStore>,
568 config: ScanConfig,
569 ) -> Result<Self> {
570 Self::new_with_max_patterns(patterns, store, config, DEFAULT_MAX_PATTERNS)
571 }
572
573 /// Create a new streaming scanner with a custom pattern limit.
574 ///
575 /// This is identical to [`new`](Self::new) but allows overriding the
576 /// default pattern cap (10 000). Use this
577 /// when you have a legitimate need for more patterns and have
578 /// verified that your system has enough memory for the resulting
579 /// `RegexSet`.
580 ///
581 /// # Errors
582 ///
583 /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
584 /// invalid or the pattern count exceeds `max_patterns`.
585 pub fn new_with_max_patterns(
586 patterns: Vec<ScanPattern>,
587 store: Arc<MappingStore>,
588 config: ScanConfig,
589 max_patterns: usize,
590 ) -> Result<Self> {
591 config.validate()?;
592
593 // F-05 fix: enforce maximum pattern count to bound RegexSet memory.
594 if patterns.len() > max_patterns {
595 return Err(SanitizeError::InvalidConfig(format!(
596 "pattern count ({}) exceeds maximum allowed ({}) — \
597 RegexSet memory scales linearly with pattern count",
598 patterns.len(),
599 max_patterns
600 )));
601 }
602
603 // Partition patterns into literal (Aho-Corasick) and regex (RegexSet)
604 // so each is matched by the most efficient engine.
605 let mut literal_bytes: Vec<Vec<u8>> = Vec::new();
606 let mut literal_indices: Vec<usize> = Vec::new();
607 let mut regex_strs: Vec<&str> = Vec::new();
608 let mut regex_indices: Vec<usize> = Vec::new();
609
610 for (i, pattern) in patterns.iter().enumerate() {
611 if let Some(lit) = &pattern.literal {
612 literal_bytes.push(lit.as_bytes().to_vec());
613 literal_indices.push(i);
614 } else {
615 regex_strs.push(pattern.regex_pattern());
616 regex_indices.push(i);
617 }
618 }
619
620 // Build Aho-Corasick automaton for literal patterns (SIMD-accelerated,
621 // single O(n) pass over the input per chunk).
622 let aho_corasick = if literal_bytes.is_empty() {
623 None
624 } else {
625 Some(AhoCorasick::new(&literal_bytes).map_err(compile_err)?)
626 };
627
628 // Build RegexSet from non-literal patterns only (R-3 pre-filter).
629 let regex_set = if regex_strs.is_empty() {
630 RegexSetBuilder::new(Vec::<&str>::new())
631 .size_limit(REGEX_SIZE_LIMIT)
632 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
633 .build()
634 .map_err(compile_err)?
635 } else {
636 RegexSetBuilder::new(®ex_strs)
637 .size_limit((REGEX_SIZE_LIMIT * regex_strs.len().max(1)).min(REGEX_SET_SIZE_CAP))
638 .dfa_size_limit(
639 (REGEX_DFA_SIZE_LIMIT * regex_strs.len().max(1)).min(REGEX_SET_SIZE_CAP),
640 )
641 .build()
642 .map_err(compile_err)?
643 };
644
645 Ok(Self {
646 patterns,
647 regex_set,
648 regex_indices,
649 aho_corasick,
650 literal_indices,
651 store,
652 config,
653 })
654 }
655
656 /// Create a copy of this scanner extended with additional literal patterns.
657 ///
658 /// Clones the existing pattern set and appends `extra`, then rebuilds
659 /// the internal Aho-Corasick and RegexSet automata. Used by the
660 /// format-preserving structured pass to scan original bytes with
661 /// discovered field-value literals added to the base pattern set.
662 ///
663 /// # Errors
664 ///
665 /// Returns [`SanitizeError`] if automaton construction fails or the
666 /// combined pattern count exceeds the default limit.
667 pub fn with_extra_literals(&self, extra: Vec<ScanPattern>) -> Result<Self> {
668 let mut patterns = self.patterns.clone();
669 patterns.extend(extra);
670 Self::new(patterns, Arc::clone(&self.store), self.config.clone())
671 }
672
673 /// Build a scanner suitable for format-preserving structured-file passes.
674 ///
675 /// Patterns whose labels end with `"_kv"` are excluded from the base set.
676 /// Those patterns match both a key name and its value (e.g. `password: s3cr3t`)
677 /// as a single unit; in a structured pass the key must survive untouched so
678 /// only the discovered field-value literals are safe to replace.
679 ///
680 /// `extra` (the profile-discovered literals) are always included.
681 ///
682 /// # Errors
683 ///
684 /// Returns [`SanitizeError`] if Aho-Corasick or RegexSet construction fails
685 /// or the combined pattern count exceeds the default limit.
686 pub fn for_structured_pass(&self, extra: Vec<ScanPattern>) -> Result<Self> {
687 let mut patterns: Vec<ScanPattern> = self
688 .patterns
689 .iter()
690 .filter(|p| !p.label.ends_with(KV_LABEL_SUFFIX))
691 .cloned()
692 .collect();
693 patterns.extend(extra);
694 Self::new(patterns, Arc::clone(&self.store), self.config.clone())
695 }
696
697 /// Scan a reader and write sanitized output to a writer.
698 ///
699 /// Processes the input in chunks of `config.chunk_size` bytes,
700 /// maintaining an overlap window of `config.overlap_size` bytes to
701 /// catch matches spanning chunk boundaries. All detected matches
702 /// are replaced one-way via the [`MappingStore`].
703 ///
704 /// # Arguments
705 ///
706 /// - `reader` — input source (file, network stream, `&[u8]`, …).
707 /// - `writer` — output sink (file, `Vec<u8>`, …).
708 ///
709 /// # Returns
710 ///
711 /// [`ScanStats`] with counters for bytes processed, matches found, etc.
712 ///
713 /// # Errors
714 ///
715 /// Returns [`SanitizeError`] on I/O failures or if a replacement
716 /// cannot be generated (e.g. store capacity exceeded).
717 pub fn scan_reader<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ScanStats> {
718 self.scan_reader_with_callbacks(reader, writer, None, |_| {}, |_| {})
719 }
720
721 /// Scan a reader and emit progress snapshots after each committed chunk.
722 ///
723 /// `total_bytes` should be provided when the caller knows the full input
724 /// size. When omitted, progress consumers should avoid percentages/ETA.
725 ///
726 /// This is a convenience wrapper around [`scan_reader_with_callbacks`](Self::scan_reader_with_callbacks)
727 /// that discards per-match location information. Use that method directly
728 /// when you need line numbers or byte offsets for individual matches.
729 ///
730 /// # Errors
731 ///
732 /// Returns [`SanitizeError`] on I/O failures or if a replacement
733 /// cannot be generated (e.g. store capacity exceeded).
734 pub fn scan_reader_with_progress<R: Read, W: Write, F>(
735 &self,
736 reader: R,
737 writer: W,
738 total_bytes: Option<u64>,
739 on_progress: F,
740 ) -> Result<ScanStats>
741 where
742 F: FnMut(&ScanProgress),
743 {
744 self.scan_reader_with_callbacks(reader, writer, total_bytes, on_progress, |_| {})
745 }
746
    /// Scan a reader, emit progress snapshots, and call `on_match` for every
    /// committed match with its 1-based line number and byte offset.
    ///
    /// `on_match` is called synchronously in the scanning thread, once per
    /// committed match, in document order. The callback receives a
    /// [`MatchLocation`] describing the pattern label, 1-based line number,
    /// and 0-based byte offset within the input file. Callers that only need
    /// aggregate counts (no per-match positions) should prefer
    /// [`scan_reader_with_progress`](Self::scan_reader_with_progress).
    ///
    /// `on_progress` is called once per loop iteration, after the window's
    /// committed region has been written and its per-pattern counts folded
    /// into the cumulative stats. It is never called for an empty input.
    ///
    /// # Performance note
    ///
    /// Enabling `on_match` adds an O(committed_bytes_between_matches)
    /// newline-counting pass inside each chunk. For files with sparse matches
    /// this overhead is proportional to file size; for dense matches (e.g. one
    /// secret per line) it is negligible. On 10–15 GiB log files with typical
    /// match densities the overhead is roughly 10–20 % of total scan time.
    ///
    /// NOTE(review): this loop counts newlines over every committed region
    /// unconditionally, and `scan_reader_with_progress` merely passes a no-op
    /// `on_match`, so nothing visible here skips newline counting for
    /// callers that do not need positions — confirm whether
    /// `apply_replacements` has its own fast path.
    ///
    /// # Errors
    ///
    /// Returns [`SanitizeError`] on I/O failures or if a replacement
    /// cannot be generated (e.g. store capacity exceeded).
    pub fn scan_reader_with_callbacks<R: Read, W: Write, F, M>(
        &self,
        mut reader: R,
        mut writer: W,
        total_bytes: Option<u64>,
        mut on_progress: F,
        mut on_match: M,
    ) -> Result<ScanStats>
    where
        F: FnMut(&ScanProgress),
        M: FnMut(MatchLocation),
    {
        let mut stats = ScanStats::default();

        // Carry buffer: the tail of the previous window that needs
        // to be re-scanned with the next chunk.
        let mut carry: Vec<u8> = Vec::new();

        // Read buffer (reused across iterations to avoid re-allocation).
        let mut read_buf = vec![0u8; self.config.chunk_size];

        // Scan window (reused across iterations — grows to peak size then
        // stays there, avoiding per-chunk allocation).
        let mut window: Vec<u8> =
            Vec::with_capacity(self.config.chunk_size + self.config.overlap_size);

        // Scratch buffers reused every chunk to eliminate per-chunk heap
        // pressure from match collection, output building, and stats tracking.
        let mut scratch = ScanScratch::new(
            self.patterns.len(),
            self.config.chunk_size,
            self.config.overlap_size,
        );

        // Absolute file byte offset of window[0] for this iteration.
        let mut window_file_offset: u64 = 0;
        // Cumulative newline count in the file before window[0].
        let mut newlines_before_window: u64 = 0;

        loop {
            // Read the next chunk. A short read (fewer bytes than the buffer
            // holds) is treated as end of input — this presumes `read_fully`
            // only returns short at EOF; confirm against its definition.
            let bytes_read = read_fully(&mut reader, &mut read_buf)?;
            let is_eof = bytes_read < read_buf.len();

            // Track only genuinely new bytes (carry was already counted).
            stats.bytes_processed += bytes_read as u64;

            // Nothing new and nothing pending: the input is exhausted.
            if bytes_read == 0 && carry.is_empty() {
                break;
            }

            // Build the scan window: carry ++ new_data.
            // Reuse the window buffer to avoid per-chunk allocation.
            window.clear();
            window.extend_from_slice(&carry);
            window.extend_from_slice(&read_buf[..bytes_read]);

            // Defensive only: the window is empty exactly when both the carry
            // and the new data are empty, which the check above already
            // handled.
            if window.is_empty() {
                break;
            }

            // Scan the window: find matches, determine commit point, apply
            // replacements, and flush the committed region to the writer.
            // Returns the commit_point so we can slice the carry for next iter.
            let commit_point = self.process_committed_window(
                &window,
                is_eof,
                &mut scratch,
                &mut writer,
                &mut stats,
                window_file_offset,
                newlines_before_window,
                &mut on_match,
            )?;

            // Advance file-level position counters for the next iteration.
            // window[commit_point] is where the next window's carry starts,
            // so that byte is at file offset (window_file_offset + commit_point).
            newlines_before_window += count_newlines(&window[..commit_point]);
            window_file_offset += commit_point as u64;

            // Fold per-chunk pattern hit counts into the cumulative stats map,
            // then emit a progress snapshot to the caller.
            self.fold_chunk_counts(&mut scratch.pattern_counts, &mut stats);
            on_progress(&ScanProgress {
                bytes_processed: stats.bytes_processed,
                bytes_output: stats.bytes_output,
                total_bytes,
                matches_found: stats.matches_found,
                replacements_applied: stats.replacements_applied,
            });

            // Update carry for next iteration. At EOF the whole window was
            // eligible for commit, so there is nothing left to carry.
            if is_eof {
                carry.clear();
                break;
            }
            carry.clear();
            carry.extend_from_slice(&window[commit_point..]);
        }

        Ok(stats)
    }
873
874 /// Scan one window, apply replacements up to the commit point, and flush
875 /// the result to `writer`. Returns the commit point so the caller can
876 /// slice the carry for the next iteration.
877 #[allow(clippy::too_many_arguments)]
878 fn process_committed_window(
879 &self,
880 window: &[u8],
881 is_eof: bool,
882 scratch: &mut ScanScratch,
883 writer: &mut dyn io::Write,
884 stats: &mut ScanStats,
885 window_file_offset: u64,
886 newlines_before_window: u64,
887 on_match: &mut dyn FnMut(MatchLocation),
888 ) -> Result<usize> {
889 // Find all non-overlapping matches in the window.
890 self.find_matches(window, scratch);
891
892 // Determine how much of the window can be safely committed this iteration.
893 let base_commit = if is_eof {
894 window.len()
895 } else {
896 window.len().saturating_sub(self.config.overlap_size)
897 };
898 let commit_point =
899 self.adjusted_commit_point(&scratch.selected, base_commit, window.len(), is_eof);
900
901 // Build output for the committed region (fills scratch.output).
902 self.apply_replacements(
903 &window[..commit_point],
904 &scratch.selected,
905 stats,
906 &mut scratch.output,
907 &mut scratch.pattern_counts,
908 window_file_offset,
909 newlines_before_window,
910 on_match,
911 )?;
912
913 writer.write_all(&scratch.output)?;
914 stats.bytes_output += scratch.output.len() as u64;
915
916 Ok(commit_point)
917 }
918
919 /// Fold per-chunk pattern hit counts into the cumulative `stats.pattern_counts`
920 /// map, then reset `counts` to zero for the next chunk.
921 ///
922 /// `label.clone()` is called at most once per distinct pattern per chunk,
923 /// not once per match hit, which keeps cost proportional to pattern count.
924 fn fold_chunk_counts(&self, counts: &mut [u64], stats: &mut ScanStats) {
925 for (idx, count) in counts.iter_mut().enumerate() {
926 if *count > 0 {
927 *stats
928 .pattern_counts
929 .entry(self.patterns[idx].label.clone())
930 .or_insert(0) += *count;
931 *count = 0;
932 }
933 }
934 }
935
/// Convenience: scan byte slice in-memory and return sanitized output.
///
/// Equivalent to `scan_reader(input, Vec::new())` but returns the
/// output buffer directly.
///
/// This is a thin wrapper that forwards to
/// [`scan_bytes_with_progress`](Self::scan_bytes_with_progress) with a
/// progress callback that does nothing.
///
/// # Returns
///
/// `(output, stats)`: the sanitized bytes and the statistics gathered
/// during the scan.
///
/// # Errors
///
/// Returns [`SanitizeError`] if a replacement cannot be generated
/// (e.g. store capacity exceeded).
pub fn scan_bytes(&self, input: &[u8]) -> Result<(Vec<u8>, ScanStats)> {
    // No progress reporting requested: pass a no-op closure.
    self.scan_bytes_with_progress(input, |_| {})
}
948
949 /// Scan a byte slice in memory and emit progress snapshots.
950 ///
951 /// # Errors
952 ///
953 /// Returns [`SanitizeError`] if a replacement cannot be generated
954 /// (e.g. store capacity exceeded).
955 pub fn scan_bytes_with_progress<F>(
956 &self,
957 input: &[u8],
958 on_progress: F,
959 ) -> Result<(Vec<u8>, ScanStats)>
960 where
961 F: FnMut(&ScanProgress),
962 {
963 let mut output = Vec::with_capacity(input.len());
964 let stats = self.scan_reader_with_callbacks(
965 input,
966 &mut output,
967 Some(input.len() as u64),
968 on_progress,
969 |_| {},
970 )?;
971 Ok((output, stats))
972 }
973
974 // ---- Accessors ----
975
/// Access the scanner's configuration.
///
/// Returns a shared reference to the [`ScanConfig`] held by this
/// scanner; nothing is cloned.
#[must_use]
pub fn config(&self) -> &ScanConfig {
    &self.config
}
981
/// Access the underlying mapping store.
///
/// Returns a reference to the scanner's `Arc<MappingStore>` handle.
/// Callers that need an owned handle can `Arc::clone` the result.
#[must_use]
pub fn store(&self) -> &Arc<MappingStore> {
    &self.store
}
987
/// Number of patterns registered in this scanner.
///
/// This is the length of the scanner's internal pattern list.
#[must_use]
pub fn pattern_count(&self) -> usize {
    self.patterns.len()
}
993
994 /// Create a scanner from an encrypted secrets file.
995 ///
996 /// Decrypts the file in memory, parses the entries, compiles
997 /// patterns, and returns the scanner ready to scan. Decrypted
998 /// plaintext is scrubbed from memory after parsing.
999 ///
1000 /// # Arguments
1001 ///
1002 /// - `encrypted_bytes` — raw bytes of the `.enc` file.
1003 /// - `password` — user password.
1004 /// - `format` — optional format override for the plaintext.
1005 /// - `store` — mapping store for dedup-consistent replacements.
1006 /// - `config` — chunking / overlap configuration.
1007 /// - `extra_patterns` — additional patterns to merge in.
1008 ///
1009 /// # Returns
1010 ///
1011 /// `(scanner, warnings, allow_patterns)` where `warnings` lists entries
1012 /// that failed to compile (index + error) and `allow_patterns` are the
1013 /// raw strings from `kind: allow` entries — pass these to
1014 /// [`AllowlistMatcher::new`](crate::allowlist::AllowlistMatcher) to
1015 /// suppress replacements for known-safe values.
1016 ///
1017 /// # Errors
1018 ///
1019 /// Returns a secrets-related [`SanitizeError`] on decryption failure
1020 /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
1021 pub fn from_encrypted_secrets(
1022 encrypted_bytes: &[u8],
1023 password: &str,
1024 format: Option<crate::secrets::SecretsFormat>,
1025 store: Arc<MappingStore>,
1026 config: ScanConfig,
1027 extra_patterns: Vec<ScanPattern>,
1028 ) -> SecretsLoadResult {
1029 let ((mut patterns, warnings), allow) =
1030 crate::secrets::load_encrypted_secrets(encrypted_bytes, password, format)?;
1031 patterns.extend(extra_patterns);
1032 let scanner = Self::new(patterns, store, config)?;
1033 Ok((scanner, warnings, allow))
1034 }
1035
1036 /// Create a scanner from a plaintext secrets file.
1037 ///
1038 /// Convenience for development / testing without encryption.
1039 ///
1040 /// # Returns
1041 ///
1042 /// `(scanner, warnings, allow_patterns)` where `allow_patterns` are the
1043 /// raw strings from `kind: allow` entries — pass these to
1044 /// [`AllowlistMatcher::new`](crate::allowlist::AllowlistMatcher) to
1045 /// suppress replacements for known-safe values.
1046 ///
1047 /// # Errors
1048 ///
1049 /// Returns a secrets-related [`SanitizeError`] on parse failure
1050 /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
1051 pub fn from_plaintext_secrets(
1052 plaintext: &[u8],
1053 format: Option<crate::secrets::SecretsFormat>,
1054 store: Arc<MappingStore>,
1055 config: ScanConfig,
1056 extra_patterns: Vec<ScanPattern>,
1057 ) -> SecretsLoadResult {
1058 let ((mut patterns, warnings), allow) =
1059 crate::secrets::load_plaintext_secrets(plaintext, format)?;
1060 patterns.extend(extra_patterns);
1061 let scanner = Self::new(patterns, store, config)?;
1062 Ok((scanner, warnings, allow))
1063 }
1064
1065 // ---- Internal helpers ----
1066
1067 /// Find all non-overlapping matches across all patterns.
1068 ///
1069 /// Fills `scratch.selected` with the winning non-overlapping matches
1070 /// for the given `window`. All three scratch `Vec`s are cleared and
1071 /// repopulated on each call so callers can freely reuse the same
1072 /// `ScanScratch` instance across chunks.
1073 ///
1074 /// ## Strategy
1075 ///
1076 /// 1. **Aho-Corasick** (`aho_corasick`): single O(n) SIMD pass over the
1077 /// window reporting every occurrence of every literal pattern,
1078 /// including overlapping ones. This replaces O(k·n) individual regex
1079 /// scans for the literal subset.
1080 /// 2. **RegexSet pre-filter** (R-3 optimisation): fast check of which
1081 /// *non-literal* regex patterns have any match in the window.
1082 /// 3. **Individual regex `find_iter`**: only for regex patterns flagged
1083 /// by step 2.
1084 /// 4. **Sort + greedy dedup**: all raw matches are sorted by start
1085 /// (ascending), then length (descending), and a single greedy pass
1086 /// selects the final non-overlapping set.
1087 fn find_matches(&self, window: &[u8], scratch: &mut ScanScratch) {
1088 scratch.all_matches.clear();
1089 scratch.selected.clear();
1090
1091 // Step 1: Aho-Corasick overlapping scan for all literal patterns.
1092 // find_overlapping_iter reports every match position including
1093 // overlapping ones, so the sort+greedy step below correctly resolves
1094 // ambiguities between literals (e.g. "abc" vs "abcd" at same offset).
1095 // Literals never have capture groups — capture is always None.
1096 if let Some(ac) = &self.aho_corasick {
1097 for mat in ac.find_overlapping_iter(window) {
1098 scratch.all_matches.push(RawMatch {
1099 start: mat.start(),
1100 end: mat.end(),
1101 pattern_idx: self.literal_indices[mat.pattern().as_usize()],
1102 capture: None,
1103 });
1104 }
1105 }
1106
1107 // Steps 2+3: RegexSet pre-filter then individual scan for non-literal
1108 // patterns. regex_set only contains non-literal pattern strings, so
1109 // literals are never scanned twice.
1110 // Use captures_iter so that patterns with a capture group 1 record
1111 // the sub-range to replace, while patterns without one fall back to
1112 // replacing the full match.
1113 for rs_idx in self.regex_set.matches(window) {
1114 let pattern_idx = self.regex_indices[rs_idx];
1115 if window.len() < self.patterns[pattern_idx].min_length {
1116 continue;
1117 }
1118 for cap in self.patterns[pattern_idx].regex.captures_iter(window) {
1119 let full = cap.get(0).expect("group 0 always exists");
1120 let capture = cap.get(1).map(|g| (g.start(), g.end()));
1121 scratch.all_matches.push(RawMatch {
1122 start: full.start(),
1123 end: full.end(),
1124 pattern_idx,
1125 capture,
1126 });
1127 }
1128 }
1129
1130 // Step 4: sort then greedy non-overlapping selection.
1131 // Skip entirely when no matches were found (the common case for
1132 // clean data), avoiding an unnecessary sort of an empty Vec.
1133 if scratch.all_matches.is_empty() {
1134 return;
1135 }
1136
1137 // Primary: start ascending. Secondary: length descending (longer
1138 // match wins when two matches begin at the same position).
1139 scratch.all_matches.sort_unstable_by(|a, b| {
1140 a.start
1141 .cmp(&b.start)
1142 .then_with(|| (b.end - b.start).cmp(&(a.end - a.start)))
1143 });
1144
1145 let mut last_end = 0;
1146 for m in scratch.all_matches.drain(..) {
1147 if m.start >= last_end {
1148 last_end = m.end;
1149 scratch.selected.push(m);
1150 }
1151 }
1152 }
1153
1154 /// Adjust the commit point to avoid splitting a match across the
1155 /// commit / carry boundary.
1156 ///
1157 /// If any match straddles `base_commit` (starts before, ends after),
1158 /// the commit point is moved to after that match so it is emitted
1159 /// in full this iteration.
1160 #[allow(clippy::unused_self)] // keep &self for API consistency with other scanner methods
1161 fn adjusted_commit_point(
1162 &self,
1163 matches: &[RawMatch],
1164 base_commit: usize,
1165 window_len: usize,
1166 is_eof: bool,
1167 ) -> usize {
1168 if is_eof {
1169 return window_len;
1170 }
1171
1172 let mut commit = base_commit;
1173
1174 for m in matches {
1175 if m.start < commit && m.end > commit {
1176 // Match straddles the boundary — extend commit to include it.
1177 commit = m.end;
1178 }
1179 }
1180
1181 // Never exceed window length.
1182 commit.min(window_len)
1183 }
1184
/// Build the output for the committed region by splicing in replacements.
///
/// Writes into `output_buf` (cleared on entry) and increments
/// `stats.matches_found` / `stats.replacements_applied` for each applied
/// replacement. Per-pattern hit counts are written to `pattern_counts`
/// (indexed by `pattern_idx`); the caller is responsible for folding
/// these into `ScanStats::pattern_counts` and resetting them.
///
/// `matches` is the full selected set for the window (may include matches
/// in the carry region beyond `committed`). Because `adjusted_commit_point`
/// guarantees no match straddles the boundary, any match with
/// `start < committed.len()` also has `end <= committed.len()`. The
/// loop breaks early once `m.start >= committed.len()` since matches are
/// sorted by start.
///
/// `window_file_offset` and `newlines_before_window` are used to compute
/// the absolute byte offset and 1-based line number for each committed
/// match, which are delivered to `on_match`. The newline scan is
/// incremental: we scan only the bytes between consecutive matches, not
/// the full committed region.
///
/// For a match with a recorded capture range, only the captured bytes are
/// replaced and the rest of the match is copied through unchanged. For a
/// match without one, the whole match is replaced.
///
/// # Errors
///
/// Propagates any error returned by `self.store.get_or_insert`. Output
/// already appended to `output_buf` and callbacks already delivered are
/// not rolled back.
///
/// # Note on `from_utf8_lossy`
///
/// `String::from_utf8_lossy` returns `Cow::Borrowed(&str)` for valid
/// UTF-8 input (the common case for ASCII secrets) — no heap allocation
/// on the hot path. For matched bytes that are not valid UTF-8, the text
/// handed to the store has the invalid sequences replaced by U+FFFD.
#[allow(clippy::too_many_arguments)]
fn apply_replacements(
    &self,
    committed: &[u8],
    matches: &[RawMatch],
    stats: &mut ScanStats,
    output_buf: &mut Vec<u8>,
    pattern_counts: &mut [u64],
    window_file_offset: u64,
    newlines_before_window: u64,
    on_match: &mut dyn FnMut(MatchLocation),
) -> Result<()> {
    output_buf.clear();

    // End offset (within `committed`) of the last match already emitted.
    let mut last_end = 0;
    // Running newline count within the committed region, advanced
    // incrementally so we only scan the bytes between matches.
    let mut newlines_in_committed: u64 = 0;
    let mut newline_scan_pos: usize = 0;

    for &m in matches {
        // Matches are sorted by start; those at or beyond the committed
        // region belong to the carry window — stop here.
        if m.start >= committed.len() {
            break;
        }

        // Emit bytes before this match verbatim.
        output_buf.extend_from_slice(&committed[last_end..m.start]);

        // Advance newline counter from previous scan position to match start,
        // then emit the match location to the caller.
        newlines_in_committed += count_newlines(&committed[newline_scan_pos..m.start]);
        newline_scan_pos = m.start;
        // `line` is 1-based: newlines seen before the match, plus one.
        on_match(MatchLocation {
            line: newlines_before_window + newlines_in_committed + 1,
            byte_offset: window_file_offset + m.start as u64,
            pattern: self.patterns[m.pattern_idx].label.clone(),
        });

        let pattern = &self.patterns[m.pattern_idx];

        if let Some((cap_start, cap_end)) = m.capture {
            // Pattern has a capture group: replace only the capture group,
            // emitting the surrounding context bytes of the full match verbatim.
            // This preserves delimiters, key names, and prefixes that the
            // pattern uses as anchors to reduce false positives.
            if cap_start < m.start || cap_end > m.end || cap_start > cap_end {
                // Capture bounds outside match bounds — skip rather than panic.
                // This should not happen with correct regex patterns; log it so it
                // surfaces during testing without crashing production runs.
                //
                // NOTE(review): `on_match` has already fired for this match,
                // but the `continue` below skips the stats counters, so
                // location callbacks can outnumber `stats.matches_found` —
                // confirm this is intended.
                tracing::warn!(
                    pattern = %pattern.label,
                    m_start = m.start,
                    m_end = m.end,
                    cap_start,
                    cap_end,
                    "capture group bounds outside match bounds — emitting full match unreplaced"
                );
                output_buf.extend_from_slice(&committed[m.start..m.end]);
                last_end = m.end;
                continue;
            }
            // Prefix of the match before the capture, copied as-is.
            output_buf.extend_from_slice(&committed[m.start..cap_start]);
            let secret = String::from_utf8_lossy(&committed[cap_start..cap_end]);
            let replacement = self.store.get_or_insert(&pattern.category, &secret)?;
            output_buf.extend_from_slice(replacement.as_bytes());
            // Suffix of the match after the capture, copied as-is.
            output_buf.extend_from_slice(&committed[cap_end..m.end]);
        } else {
            // No capture group — replace the full match (e.g. token-prefix
            // patterns like `glpat-[...]` where the full match IS the secret).
            let matched_text = String::from_utf8_lossy(&committed[m.start..m.end]);
            let replacement = self.store.get_or_insert(&pattern.category, &matched_text)?;
            output_buf.extend_from_slice(replacement.as_bytes());
        }

        last_end = m.end;

        // Counters are only bumped for matches that were actually replaced.
        stats.matches_found += 1;
        stats.replacements_applied += 1;
        pattern_counts[m.pattern_idx] += 1;
    }

    // Emit the trailing non-matching tail.
    output_buf.extend_from_slice(&committed[last_end..]);

    Ok(())
}
1299}
1300
1301// ---------------------------------------------------------------------------
1302// Send + Sync compile-time assertion
1303// ---------------------------------------------------------------------------
1304
1305const _: fn() = || {
1306 fn assert_send<T: Send>() {}
1307 fn assert_sync<T: Sync>() {}
1308 assert_send::<StreamScanner>();
1309 assert_sync::<StreamScanner>();
1310};
1311
1312// ---------------------------------------------------------------------------
1313// I/O helper
1314// ---------------------------------------------------------------------------
1315
/// Count the number of `\n` bytes in `data`.
///
/// Used to advance the cumulative newline counter between consecutive
/// match positions so we can compute 1-based line numbers without
/// pre-scanning the entire committed region.
///
/// Delegates to `bytecount::count` and widens the resulting `usize` to
/// `u64` so it can be added directly to the file-level `u64` counters.
/// An empty slice yields `0`.
#[inline]
fn count_newlines(data: &[u8]) -> u64 {
    bytecount::count(data, b'\n') as u64
}
1325
1326/// Read up to `buf.len()` bytes from `reader`, retrying on `Interrupted`.
1327///
1328/// Returns the number of bytes actually read (< `buf.len()` only at EOF).
1329fn read_fully<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
1330 let mut total = 0;
1331 while total < buf.len() {
1332 match reader.read(&mut buf[total..]) {
1333 Ok(0) => break, // EOF
1334 Ok(n) => total += n,
1335 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
1336 Err(e) => return Err(SanitizeError::from(e)),
1337 }
1338 }
1339 Ok(total)
1340}
1341
1342// ---------------------------------------------------------------------------
1343// Unit tests
1344// ---------------------------------------------------------------------------
1345
// Unit tests for `StreamScanner`: construction and config validation,
// replacement behaviour, chunk-boundary handling, progress reporting, and
// the line / byte-offset data delivered to the `on_match` callback.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::HmacGenerator;

    /// Helper: build a scanner with given patterns and small chunk config.
    ///
    /// Uses 64-byte chunks with a 16-byte overlap so that even short test
    /// inputs can span several chunks.
    fn test_scanner(patterns: Vec<ScanPattern>) -> StreamScanner {
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        StreamScanner::new(
            patterns,
            store,
            ScanConfig {
                chunk_size: 64,
                overlap_size: 16,
            },
        )
        .unwrap()
    }

    /// Helper: email pattern.
    fn email_pattern() -> ScanPattern {
        ScanPattern::from_regex(
            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            Category::Email,
            "email",
        )
        .unwrap()
    }

    /// Helper: IPv4 pattern.
    fn ipv4_pattern() -> ScanPattern {
        ScanPattern::from_regex(
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            Category::IpV4,
            "ipv4",
        )
        .unwrap()
    }

    // ---- Construction ----

    #[test]
    fn scanner_creation() {
        let scanner = test_scanner(vec![email_pattern()]);
        assert_eq!(scanner.pattern_count(), 1);
    }

    #[test]
    fn invalid_config_zero_chunk() {
        // A zero chunk size must be rejected at construction.
        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let result = StreamScanner::new(vec![], store, ScanConfig::new(0, 0));
        assert!(result.is_err());
    }

    #[test]
    fn invalid_config_overlap_ge_chunk() {
        // Overlap equal to the chunk size must be rejected at construction.
        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let result = StreamScanner::new(vec![], store, ScanConfig::new(100, 100));
        assert!(result.is_err());
    }

    // ---- Empty / no-match cases ----

    #[test]
    fn empty_input() {
        let scanner = test_scanner(vec![email_pattern()]);
        let (output, stats) = scanner.scan_bytes(b"").unwrap();
        assert!(output.is_empty());
        assert_eq!(stats.matches_found, 0);
        assert_eq!(stats.bytes_processed, 0);
    }

    #[test]
    fn no_matches() {
        // Clean input must pass through byte-for-byte.
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"There are no email addresses here.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(output, input.as_slice());
        assert_eq!(stats.matches_found, 0);
    }

    // ---- Single match ----

    #[test]
    fn single_email_replaced() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"Contact alice@corp.com for help.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        assert_eq!(stats.replacements_applied, 1);
        // Original must not appear in output.
        assert!(!output
            .windows(b"alice@corp.com".len())
            .any(|w| w == b"alice@corp.com"));
        // Replacement should contain the @ from the domain-preserving email.
        let output_str = String::from_utf8_lossy(&output);
        assert!(output_str.contains("@corp.com"));
        // Length preserved: output is same total length as input.
        assert_eq!(output.len(), input.len(), "length must be preserved");
        // Surrounding text preserved.
        assert!(output_str.starts_with("Contact "));
        assert!(output_str.ends_with(" for help."));
    }

    // ---- Multiple matches ----

    #[test]
    fn multiple_emails_replaced() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"From alice@corp.com to bob@corp.com cc admin@corp.com";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 3);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("alice@corp.com"));
        assert!(!out_str.contains("bob@corp.com"));
        assert!(!out_str.contains("admin@corp.com"));
    }

    // ---- Same secret gets same replacement ----

    #[test]
    fn same_secret_same_replacement() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"First alice@corp.com then alice@corp.com again.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 2);
        let out_str = String::from_utf8_lossy(&output);
        // Both occurrences should be replaced with the same value.
        // With length-preserving replacements, look for the preserved domain.
        // NOTE(review): the assertion below only counts occurrences of the
        // preserved domain; it never compares the two replaced local parts,
        // so it would still pass if the replacements differed.
        let parts: Vec<&str> = out_str.split("@corp.com").collect();
        // 3 parts = 2 occurrences of the replacement.
        assert_eq!(parts.len(), 3);
    }

    // ---- Literal pattern ----

    #[test]
    fn literal_pattern_matched() {
        let pat = ScanPattern::from_literal(
            "SECRET_API_KEY_12345",
            Category::Custom("api_key".into()),
            "api_key",
        )
        .unwrap();
        let scanner = test_scanner(vec![pat]);
        let input = b"key=SECRET_API_KEY_12345&foo=bar";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        // The literal must not survive anywhere in the output.
        assert!(!output
            .windows(b"SECRET_API_KEY_12345".len())
            .any(|w| w == b"SECRET_API_KEY_12345"));
    }

    // ---- Multiple pattern types ----

    #[test]
    fn multiple_pattern_types() {
        let scanner = test_scanner(vec![email_pattern(), ipv4_pattern()]);
        let input = b"Server 192.168.1.100 contact admin@server.com";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 2);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("192.168.1.100"));
        assert!(!out_str.contains("admin@server.com"));
        // Per-pattern counters are keyed by pattern label.
        assert_eq!(*stats.pattern_counts.get("email").unwrap(), 1);
        assert_eq!(*stats.pattern_counts.get("ipv4").unwrap(), 1);
    }

    // ---- Chunk boundary: match spans two chunks ----

    #[test]
    fn match_at_chunk_boundary() {
        // Use a very small chunk size so the email straddles a boundary.
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let scanner = StreamScanner::new(
            vec![email_pattern()],
            store,
            ScanConfig {
                chunk_size: 20, // very small
                overlap_size: 16,
            },
        )
        .unwrap();

        // Place an email address that will definitely straddle a boundary:
        // it occupies bytes 17..31, while the first read ends at byte 20.
        let input = b"AAAAAAAAAAAAAAAA alice@corp.com BBBBBBBBBBBBB";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("alice@corp.com"));
        assert!(out_str.contains("@corp.com"), "domain must be preserved");
    }

    // ---- Large input requiring many chunks ----

    #[test]
    fn large_input_many_chunks() {
        let scanner = test_scanner(vec![email_pattern()]);

        // Build a 930-byte input with emails sprinkled in (20 x 28-byte
        // filler plus 20 emails) — many 64-byte chunks.
        let mut input = Vec::new();
        let filler = b"Lorem ipsum dolor sit amet. ";
        for i in 0..20 {
            input.extend_from_slice(filler);
            let email = format!("user{}@example.com ", i);
            input.extend_from_slice(email.as_bytes());
        }

        let (output, stats) = scanner.scan_bytes(&input).unwrap();
        assert_eq!(stats.matches_found, 20);
        let out_str = String::from_utf8_lossy(&output);
        for i in 0..20 {
            let email = format!("user{}@example.com", i);
            assert!(!out_str.contains(&email));
        }
    }

    #[test]
    fn scan_bytes_with_progress_preserves_output_and_stats() {
        // The progress-reporting entry point must produce exactly the same
        // output and counters as the plain one.
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"Contact alice@corp.com and bob@corp.com for help.";

        let (baseline_output, baseline_stats) = scanner.scan_bytes(input).unwrap();

        let mut updates = Vec::new();
        let (progress_output, progress_stats) = scanner
            .scan_bytes_with_progress(input, |progress| updates.push(progress.clone()))
            .unwrap();

        assert_eq!(progress_output, baseline_output);
        assert_eq!(
            progress_stats.bytes_processed,
            baseline_stats.bytes_processed
        );
        assert_eq!(progress_stats.bytes_output, baseline_stats.bytes_output);
        assert_eq!(progress_stats.matches_found, baseline_stats.matches_found);
        assert_eq!(
            progress_stats.replacements_applied,
            baseline_stats.replacements_applied
        );
        // The final snapshot must reflect the completed scan.
        assert!(!updates.is_empty());
        assert_eq!(updates.last().unwrap().bytes_processed, input.len() as u64);
        assert_eq!(
            updates.last().unwrap().total_bytes,
            Some(input.len() as u64)
        );
        assert_eq!(updates.last().unwrap().matches_found, 2);
    }

    #[test]
    fn scan_reader_with_progress_reports_multiple_updates_for_multi_chunk_input() {
        let scanner = test_scanner(vec![email_pattern()]);
        // Input well beyond one 64-byte chunk, so several snapshots are due.
        let mut input = Vec::new();
        for i in 0..8 {
            input.extend_from_slice(b"padding padding padding ");
            input.extend_from_slice(format!("user{i}@example.com ").as_bytes());
        }

        let mut output = Vec::new();
        let mut updates = Vec::new();
        let stats = scanner
            .scan_reader_with_callbacks(
                &input[..],
                &mut output,
                Some(input.len() as u64),
                |progress| {
                    updates.push(progress.clone());
                },
                |_| {},
            )
            .unwrap();

        assert!(updates.len() >= 2);
        assert_eq!(
            updates.last().unwrap().bytes_processed,
            stats.bytes_processed
        );
        assert_eq!(updates.last().unwrap().bytes_output, stats.bytes_output);
        assert_eq!(
            updates.last().unwrap().total_bytes,
            Some(input.len() as u64)
        );
    }

    // ---- Scan via Read/Write interface ----

    #[test]
    fn scan_reader_writer() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"hello alice@corp.com world";
        let mut output = Vec::new();
        let stats = scanner.scan_reader(&input[..], &mut output).unwrap();
        assert_eq!(stats.matches_found, 1);
        let out_str = String::from_utf8_lossy(&output);
        assert!(out_str.contains("@corp.com"), "domain must be preserved");
    }

    // ---- Pattern compile error ----

    #[test]
    fn invalid_regex_pattern() {
        let result = ScanPattern::from_regex("[invalid(", Category::Email, "bad");
        assert!(result.is_err());
    }

    // ---- Default config ----

    #[test]
    fn default_config_valid() {
        ScanConfig::default().validate().unwrap();
    }

    // ---- Config edge cases ----

    #[test]
    fn config_chunk_1_overlap_0() {
        // Extreme but valid: 1-byte chunks, no overlap.
        // Won't catch multi-byte patterns, but should not crash.
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let scanner = StreamScanner::new(vec![], store, ScanConfig::new(1, 0)).unwrap();
        let (output, _) = scanner.scan_bytes(b"hello").unwrap();
        assert_eq!(output, b"hello");
    }

    // ---- ScanStats equality (exercises the PartialEq derive) ----

    #[test]
    fn scan_stats_equality() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"hello alice@corp.com world";
        let (_, stats_a) = scanner.scan_bytes(input).unwrap();
        let (_, stats_b) = scanner.scan_bytes(input).unwrap();
        // Identical inputs produce identical stats.
        assert_eq!(
            stats_a, stats_b,
            "identical inputs must produce identical stats"
        );
        // Values are correct — not just equal to each other.
        assert_eq!(stats_a.matches_found, 1, "one email in input");
        assert_eq!(stats_a.replacements_applied, 1);
        assert_eq!(stats_a.bytes_processed, input.len() as u64);
        assert_eq!(*stats_a.pattern_counts.get("email").unwrap_or(&0), 1);
        // No-match run produces zeroed counters.
        let (_, stats_empty) = scanner.scan_bytes(b"no matches here").unwrap();
        assert_ne!(stats_a, stats_empty);
        assert_eq!(stats_empty.matches_found, 0);
        assert_eq!(stats_empty.replacements_applied, 0);
    }

    // ---- on_match line number and byte offset accuracy ----

    #[test]
    fn on_match_reports_correct_line_and_byte_offset() {
        // alice@corp.com starts after "line one\n" (9 bytes) → byte 9, line 2.
        // bob@corp.com starts after "line one\nalice@corp.com\nline three\n"
        // = 9 + 14 + 1 + 10 + 1 = 35 bytes → byte 35, line 4.
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"line one\nalice@corp.com\nline three\nbob@corp.com\n";
        let mut locations = Vec::new();
        let mut output = Vec::new();
        scanner
            .scan_reader_with_callbacks(
                &input[..],
                &mut output,
                None,
                |_| {},
                |loc| locations.push(loc),
            )
            .unwrap();
        assert_eq!(locations.len(), 2);
        assert_eq!(locations[0].line, 2, "alice must be on line 2");
        assert_eq!(locations[0].byte_offset, 9, "alice must start at byte 9");
        assert_eq!(locations[1].line, 4, "bob must be on line 4");
        assert_eq!(locations[1].byte_offset, 35, "bob must start at byte 35");
    }

    // ---- Cross-chunk newline accumulation ----

    #[test]
    fn on_match_line_numbers_stable_across_chunk_sizes() {
        // alice@corp.com starts after "line one\n" (9 bytes) → byte 9, line 2.
        // bob@corp.com starts after "line one\nalice@corp.com\nline three\n"
        // = 9 + 14 + 1 + 10 + 1 = 35 bytes → byte 35, line 4.
        // Running the same input through different chunk sizes exercises
        // newlines_before_window accumulation across chunk boundaries.
        let input = b"line one\nalice@corp.com\nline three\nbob@corp.com\n";
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));

        // The overlap is fixed at 14 bytes, the length of the longest email
        // in the input; only the chunk size varies.
        for chunk_size in [16usize, 20, 24, 32, 64] {
            let scanner = StreamScanner::new(
                vec![email_pattern()],
                Arc::clone(&store),
                ScanConfig::new(chunk_size, 14),
            )
            .unwrap();

            let mut locations = Vec::new();
            let mut output = Vec::new();
            scanner
                .scan_reader_with_callbacks(
                    &input[..],
                    &mut output,
                    None,
                    |_| {},
                    |loc| locations.push(loc),
                )
                .unwrap();

            assert_eq!(
                locations.len(),
                2,
                "chunk_size={chunk_size}: expected 2 matches"
            );
            assert_eq!(
                locations[0].line, 2,
                "chunk_size={chunk_size}: alice must be on line 2"
            );
            assert_eq!(
                locations[0].byte_offset, 9,
                "chunk_size={chunk_size}: alice must start at byte 9"
            );
            assert_eq!(
                locations[1].line, 4,
                "chunk_size={chunk_size}: bob must be on line 4"
            );
            assert_eq!(
                locations[1].byte_offset, 35,
                "chunk_size={chunk_size}: bob must start at byte 35"
            );
        }
    }

    // ---- Bytes output tracking ----

    #[test]
    fn bytes_output_preserved_on_replacement() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"a@b.cc"; // short email
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.bytes_processed, input.len() as u64);
        assert_eq!(stats.bytes_output, output.len() as u64);
        // Length-preserving: output length matches input length.
        assert_eq!(output.len(), input.len());
    }
}