sanitize_engine/scanner.rs
1//! Streaming scanner for detecting and replacing sensitive data.
2//!
3//! # Architecture
4//!
5//! The streaming scanner processes input data in configurable chunks,
6//! detecting secret patterns (regex or literal) and applying one-way
7//! replacements via the [`MappingStore`].
8//! This design supports files of 20–100 GB+ without requiring the entire
9//! content to fit in memory.
10//!
11//! ```text
//! ┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
//! │ Input (Read) │ ──▶ │  StreamScanner  │ ──▶ │  Output (Write)  │
//! │  (chunked)   │     │ (pattern match  │     │   (sanitized)    │
//! └──────────────┘     │   + replace)    │     └──────────────────┘
//!                      └────────┬────────┘
//!                               │
//!                      ┌────────▼────────┐
//!                      │  MappingStore   │
//!                      │  (dedup cache)  │
//!                      └─────────────────┘
22//! ```
23//!
24//! # Chunk Overlap Strategy
25//!
26//! To avoid missing matches that span chunk boundaries, the scanner
27//! maintains an overlap window between consecutive chunks:
28//!
29//! 1. Read `chunk_size` bytes of new data.
30//! 2. Prepend the `carry` buffer (tail of previous window).
31//! 3. Scan the combined `window` for all pattern matches.
32//! 4. Compute `commit_point = window.len() - overlap_size` (adjusted
33//! upward if a match straddles the boundary).
34//! 5. Emit output for `window[..commit_point]` with replacements applied.
35//! 6. Set `carry = window[commit_point..]` for the next iteration.
36//!
37//! The `overlap_size` should be ≥ the maximum expected match length to
38//! guarantee no matches are missed at boundaries.
39//!
40//! # Thread Safety
41//!
42//! [`StreamScanner`] is `Send + Sync`. Multiple files can be scanned
43//! concurrently using a shared `Arc<StreamScanner>`, all backed by the
44//! same [`MappingStore`] for per-run dedup
45//! consistency.
46//!
47//! # Performance
48//!
49//! - **Chunk-based I/O**: only `chunk_size + overlap_size` bytes in
50//! memory per active scan.
51//! - **Compiled regex**: patterns are compiled once at construction and
52//! reused across all chunks and files.
53//! - **Lock-free reads**: the `DashMap` inside `MappingStore` provides
54//! lock-free reads for already-seen values.
55//! - **File-level parallelism**: share `Arc<StreamScanner>` across
56//! threads to scan multiple files concurrently.
57
58use crate::category::Category;
59use crate::error::{Result, SanitizeError};
60use crate::store::MappingStore;
61use aho_corasick::AhoCorasick;
62use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder};
63use serde::Serialize;
64use std::collections::HashMap;
65use std::io::{self, Read, Write};
66use std::sync::Arc;
67
68// ---------------------------------------------------------------------------
69// Configuration
70// ---------------------------------------------------------------------------
71
/// Number of bytes read from the input per chunk when the caller does not
/// choose a size: 1 MiB.
const DEFAULT_CHUNK_SIZE: usize = 1 << 20;

/// Number of bytes of overlap kept between consecutive windows when the
/// caller does not choose a size: 4 KiB.
const DEFAULT_OVERLAP_SIZE: usize = 4 << 10;

/// Upper bound (bytes) on the compiled size of a single regex: 1 MiB.
///
/// Guards against denial of service through pathologically complex
/// user-supplied patterns.
const REGEX_SIZE_LIMIT: usize = 1024 * 1024;

/// Upper bound (bytes) on the DFA cache of a single regex: 1 MiB.
const REGEX_DFA_SIZE_LIMIT: usize = 1024 * 1024;

/// Absolute ceiling (bytes) applied to each combined `RegexSet` budget: 256 MiB.
///
/// The set budgets are the per-pattern limits multiplied by the number of
/// regex patterns, so that large pattern sets can still compile. Without
/// this ceiling a secrets file with 10 000 patterns could request roughly
/// 20 GiB of automaton memory.
const REGEX_SET_SIZE_CAP: usize = 256 << 20;

/// Default limit on the number of patterns in one scanner (F-05 fix).
///
/// `RegexSet` automaton memory grows linearly with the pattern count; at
/// 1 MiB of size limit plus 1 MiB of DFA limit per pattern, 10 000 patterns
/// correspond to roughly 20 GiB of uncapped budget. The limit prevents
/// accidental resource exhaustion and can be raised through
/// [`StreamScanner::new_with_max_patterns`].
const DEFAULT_MAX_PATTERNS: usize = 10_000;

/// Label suffix that marks a pattern as key-value-only.
///
/// [`StreamScanner::for_structured_pass`] drops every pattern whose label
/// ends with this suffix: the key-value processor resolves those values
/// structurally, and scanning for them as well would produce spurious
/// duplicate replacements on the surrounding syntax.
pub const KV_LABEL_SUFFIX: &str = "_kv";
106
/// Chunking parameters for the streaming scanner.
///
/// # Tuning Guide
///
/// | Workload               | `chunk_size` | `overlap_size` |
/// |------------------------|--------------|----------------|
/// | Small files (< 10 MB)  | 256 KiB      | 1 KiB          |
/// | General purpose        | 1 MiB        | 4 KiB          |
/// | Large files (> 1 GB)   | 4–8 MiB      | 8 KiB          |
/// | Memory-constrained     | 64 KiB       | 1 KiB          |
///
/// Pick an `overlap_size` at least as large as the longest match you
/// expect. Typical secret patterns (API keys, emails, SSNs) stay well
/// below 256 bytes, so the 4 KiB default leaves plenty of margin.
#[derive(Clone, Debug)]
pub struct ScanConfig {
    /// Number of bytes read from the input per chunk.
    ///
    /// Bigger chunks mean fewer reads and better throughput at the price
    /// of more memory. Default: 1 MiB.
    pub chunk_size: usize,

    /// Number of bytes shared between consecutive chunks.
    ///
    /// Has to be at least the maximum expected match length; a match
    /// longer than the overlap may be missed when it crosses a chunk
    /// boundary. Default: 4 KiB.
    pub overlap_size: usize,
}
136
137impl Default for ScanConfig {
138 fn default() -> Self {
139 Self {
140 chunk_size: DEFAULT_CHUNK_SIZE,
141 overlap_size: DEFAULT_OVERLAP_SIZE,
142 }
143 }
144}
145
146impl ScanConfig {
147 /// Create a new configuration with explicit values.
148 #[must_use]
149 pub fn new(chunk_size: usize, overlap_size: usize) -> Self {
150 Self {
151 chunk_size,
152 overlap_size,
153 }
154 }
155
156 /// Validate the configuration, returning an error if invalid.
157 ///
158 /// # Errors
159 ///
160 /// Returns [`SanitizeError::InvalidConfig`] if `chunk_size` is zero
161 /// or `overlap_size >= chunk_size`.
162 pub fn validate(&self) -> Result<()> {
163 if self.chunk_size == 0 {
164 return Err(SanitizeError::InvalidConfig(
165 "chunk_size must be > 0".into(),
166 ));
167 }
168 if self.overlap_size >= self.chunk_size {
169 return Err(SanitizeError::InvalidConfig(
170 "overlap_size must be < chunk_size".into(),
171 ));
172 }
173 Ok(())
174 }
175}
176
177// ---------------------------------------------------------------------------
178// Internal helpers
179// ---------------------------------------------------------------------------
180
181/// Convert any compile-time pattern error into [`SanitizeError::PatternCompileError`].
182#[inline]
183fn compile_err(e: impl std::fmt::Display) -> SanitizeError {
184 SanitizeError::PatternCompileError(e.to_string())
185}
186
187// ---------------------------------------------------------------------------
188// Scan pattern
189// ---------------------------------------------------------------------------
190
191/// A pattern rule defining what to scan for and how to categorize matches.
192///
193/// Wraps a compiled [`regex::bytes::Regex`] with a [`Category`] for
194/// replacement lookups and a human-readable label for reporting.
195///
196/// Both regex and literal patterns are supported. Literal patterns keep
197/// their original text and are matched by the scanner's Aho-Corasick
198/// automaton for fast multi-literal scanning.
199pub struct ScanPattern {
200 /// Compiled regex matcher (used for non-literal patterns and as a
201 /// fallback; literal patterns are matched via Aho-Corasick instead).
202 regex: Regex,
203 /// Category for replacement lookups.
204 category: Category,
205 /// Human-readable label for reporting / stats.
206 label: String,
207 /// Original (unescaped) literal string when created via `from_literal`.
208 /// `None` for patterns created via `from_regex`.
209 /// Stored so `StreamScanner` can build an Aho-Corasick automaton for
210 /// fast SIMD literal matching instead of running the regex engine.
211 literal: Option<String>,
212 /// Minimum window size (bytes) required to produce a match.
213 /// For literal patterns this equals the byte length of the literal itself.
214 /// For regex patterns this is `0` (no guaranteed minimum).
215 /// Used to skip `captures_iter` when the window is provably too short.
216 pub min_length: usize,
217}
218
219impl std::fmt::Debug for ScanPattern {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("ScanPattern")
222 .field("pattern", &self.regex.as_str())
223 .field("category", &self.category)
224 .field("label", &self.label)
225 .field("literal", &self.literal.as_deref())
226 .field("min_length", &self.min_length)
227 .finish()
228 }
229}
230
231impl Clone for ScanPattern {
232 fn clone(&self) -> Self {
233 Self {
234 regex: self.regex.clone(),
235 category: self.category.clone(),
236 label: self.label.clone(),
237 literal: self.literal.clone(),
238 min_length: self.min_length,
239 }
240 }
241}
242
243impl ScanPattern {
244 /// Create a pattern from a regex string.
245 ///
246 /// ## Capture group 1 — partial replacement
247 ///
248 /// If the regex contains a capture group 1 (`(...)`), only the bytes
249 /// matched by that group are replaced; the bytes before and after it
250 /// within the full match are emitted verbatim. This lets you write
251 /// context-anchored patterns without redacting the prefix/suffix:
252 ///
253 /// ```text
254 /// pattern: glpat-([A-Za-z0-9_-]{20})
255 /// ^^^^^^ prefix preserved
256 /// ^^^^^^^^^^^^^^^^^^^^ group 1 → replaced
257 /// ```
258 ///
259 /// Patterns **without** a capture group replace the entire match.
260 ///
261 /// # Errors
262 ///
263 /// Returns [`SanitizeError::PatternCompileError`] if the regex is invalid.
264 ///
265 /// # Examples
266 ///
267 /// ```
268 /// use sanitize_engine::scanner::ScanPattern;
269 /// use sanitize_engine::category::Category;
270 ///
271 /// // No capture group — full match replaced:
272 /// let email = ScanPattern::from_regex(
273 /// r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
274 /// Category::Email,
275 /// "email_address",
276 /// ).unwrap();
277 ///
278 /// // Capture group 1 — prefix preserved, only the token value replaced:
279 /// let token = ScanPattern::from_regex(
280 /// r"glpat-([A-Za-z0-9_-]{20})",
281 /// Category::AuthToken,
282 /// "gitlab_pat",
283 /// ).unwrap();
284 /// ```
285 pub fn from_regex(pattern: &str, category: Category, label: impl Into<String>) -> Result<Self> {
286 let regex = RegexBuilder::new(pattern)
287 .size_limit(REGEX_SIZE_LIMIT)
288 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
289 .build()
290 .map_err(compile_err)?;
291 Ok(Self {
292 regex,
293 category,
294 label: label.into(),
295 literal: None,
296 min_length: 0,
297 })
298 }
299
300 /// Create a pattern from a literal string.
301 ///
302 /// The literal is escaped so that regex metacharacters are matched
303 /// verbatim.
304 ///
305 /// # Errors
306 ///
307 /// Returns [`SanitizeError::PatternCompileError`] if regex compilation fails.
308 ///
309 /// # Examples
310 ///
311 /// ```
312 /// use sanitize_engine::scanner::ScanPattern;
313 /// use sanitize_engine::category::Category;
314 ///
315 /// let pat = ScanPattern::from_literal(
316 /// "sk-proj-abc123secret",
317 /// Category::Custom("api_key".into()),
318 /// "openai_key",
319 /// ).unwrap();
320 /// ```
321 pub fn from_literal(
322 literal: &str,
323 category: Category,
324 label: impl Into<String>,
325 ) -> Result<Self> {
326 let escaped = regex::escape(literal);
327 let regex = RegexBuilder::new(&escaped)
328 .size_limit(REGEX_SIZE_LIMIT)
329 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
330 .build()
331 .map_err(compile_err)?;
332 Ok(Self {
333 regex,
334 category,
335 label: label.into(),
336 min_length: literal.len(),
337 literal: Some(literal.to_owned()),
338 })
339 }
340
341 /// The category this pattern maps to.
342 #[must_use]
343 pub fn category(&self) -> &Category {
344 &self.category
345 }
346
347 /// The human-readable label.
348 #[must_use]
349 pub fn label(&self) -> &str {
350 &self.label
351 }
352
353 /// Return the raw regex pattern string for RegexSet construction.
354 #[must_use]
355 pub fn regex_pattern(&self) -> &str {
356 self.regex.as_str()
357 }
358}
359
360// ScanPattern is Send + Sync because:
361// - regex::bytes::Regex is Send + Sync
362// - Category is Send + Sync (it's an enum of primitives + CompactString)
363// - String is Send + Sync
364
365// ---------------------------------------------------------------------------
366// Internal: raw match descriptor
367// ---------------------------------------------------------------------------
368
/// One match located inside a scan window (internal bookkeeping type).
#[derive(Clone, Copy, Debug)]
struct RawMatch {
    /// Offset of the first matched byte, relative to the scan window.
    start: usize,
    /// Offset one past the last matched byte, relative to the scan window.
    end: usize,
    /// Position of the matching pattern in `StreamScanner::patterns`.
    pattern_idx: usize,
    /// Window-relative byte range of capture group 1, when the pattern has
    /// one. If set, only that sub-range is replaced, while
    /// `start..capture_start` and `capture_end..end` are written out
    /// unchanged so that surrounding context (delimiters, key names,
    /// prefixes) survives.
    capture: Option<(usize, usize)>,
}
384
385// ---------------------------------------------------------------------------
386// Per-scan scratch buffers
387// ---------------------------------------------------------------------------
388
389/// Scratch buffers reused across chunks within a single scan call.
390///
391/// Allocating these once per `scan_reader_with_progress` invocation
392/// and reusing them each chunk eliminates the per-chunk heap pressure
393/// that would otherwise come from `Vec` allocations in `find_matches`
394/// and `apply_replacements`.
395struct ScanScratch {
396 /// Accumulates raw matches from all patterns before deduplication.
397 all_matches: Vec<RawMatch>,
398 /// Non-overlapping matches selected for the current window
399 /// (populated by `find_matches`, consumed by `apply_replacements`).
400 selected: Vec<RawMatch>,
401 /// Output bytes for the committed region, written by `apply_replacements`.
402 output: Vec<u8>,
403 /// Per-pattern match counts indexed by `pattern_idx`.
404 /// Reset to zero after each chunk's counts are folded into `ScanStats`.
405 pattern_counts: Vec<u64>,
406}
407
408impl ScanScratch {
409 fn new(pattern_count: usize, chunk_size: usize, overlap_size: usize) -> Self {
410 Self {
411 all_matches: Vec::new(),
412 selected: Vec::new(),
413 output: Vec::with_capacity(chunk_size + overlap_size),
414 pattern_counts: vec![0u64; pattern_count],
415 }
416 }
417}
418
419// ---------------------------------------------------------------------------
420// Scan statistics
421// ---------------------------------------------------------------------------
422
423/// The file-level position of a single scanner match.
424///
425/// Emitted via the `on_match` callback in
426/// [`StreamScanner::scan_reader_with_callbacks`]. Line numbers are
427/// 1-based and count `\n` bytes only (Unix line endings). For files with
428/// Windows line endings (`\r\n`), `line` is still correct because `\n` is
429/// the canonical line separator — `\r` bytes do not affect the count.
430///
431/// `byte_offset` is the absolute byte position of the first byte of the
432/// matched region within the file (0-based). Both fields refer to the
433/// *input* file, not the sanitized output.
434#[derive(Debug, Clone, Serialize)]
435pub struct MatchLocation {
436 /// 1-based line number of the match within the file.
437 pub line: u64,
438 /// 0-based byte offset of the match start within the file.
439 pub byte_offset: u64,
440 /// Pattern label that triggered this match.
441 pub pattern: String,
442}
443
/// Counters gathered over one scan operation.
///
/// [`StreamScanner::scan_reader`] and [`StreamScanner::scan_bytes`] return
/// this so callers can see what the scanner did.
#[derive(Clone, Debug, Default)]
pub struct ScanStats {
    /// Number of bytes read from the input.
    pub bytes_processed: u64,
    /// Number of bytes written to the output. Can differ from
    /// `bytes_processed` because a replacement need not have the same
    /// length as the text it replaces.
    pub bytes_output: u64,
    /// Number of matches found, summed over all patterns.
    pub matches_found: u64,
    /// Number of replacements applied (always equal to `matches_found`
    /// in one-way mode).
    pub replacements_applied: u64,
    /// Match count per pattern, keyed by the pattern's label.
    pub pattern_counts: HashMap<String, u64>,
}
464
/// A point-in-time view of a streaming scan, handed to progress callbacks.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ScanProgress {
    /// Bytes read from the input up to this point.
    pub bytes_processed: u64,
    /// Bytes written to the output up to this point.
    pub bytes_output: u64,
    /// Size of the whole input, if the caller supplied it.
    pub total_bytes: Option<u64>,
    /// Matches found up to this point.
    pub matches_found: u64,
    /// Replacements applied up to this point.
    pub replacements_applied: u64,
}
479
480// ---------------------------------------------------------------------------
481// StreamScanner
482// ---------------------------------------------------------------------------
483
/// Streaming scanner that detects and replaces sensitive patterns.
///
/// Thread-safe: can be shared via `Arc<StreamScanner>` for concurrent
/// scanning of multiple files. Each call to [`scan_reader`](Self::scan_reader)
/// is independent and maintains its own chunking state (carry buffer,
/// read buffer and scratch buffers are all local to the call).
///
/// Literal patterns and regex patterns are matched by different engines:
/// literals go to an Aho-Corasick automaton, everything else to a
/// `RegexSet`. The two `*_indices` vectors translate each engine's own
/// pattern numbering back to positions in `patterns`.
///
/// # Usage
///
/// ```rust
/// use sanitize_engine::scanner::{StreamScanner, ScanPattern, ScanConfig};
/// use sanitize_engine::category::Category;
/// use sanitize_engine::generator::HmacGenerator;
/// use sanitize_engine::store::MappingStore;
/// use std::sync::Arc;
///
/// // 1. Build the replacement store.
/// let gen = Arc::new(HmacGenerator::new([42u8; 32]));
/// let store = Arc::new(MappingStore::new(gen, None));
///
/// // 2. Define patterns.
/// let patterns = vec![
///     ScanPattern::from_regex(
///         r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
///         Category::Email,
///         "email",
///     ).unwrap(),
/// ];
///
/// // 3. Create the scanner.
/// let scanner = StreamScanner::new(patterns, store, ScanConfig::default()).unwrap();
///
/// // 4. Scan.
/// let input = b"Contact alice@corp.com for details.";
/// let (output, stats) = scanner.scan_bytes(input).unwrap();
/// assert_eq!(stats.matches_found, 1);
/// assert!(!output.windows(b"alice@corp.com".len())
///     .any(|w| w == b"alice@corp.com"));
/// ```
pub struct StreamScanner {
    /// Compiled scan patterns (both literal and regex), in the order they
    /// were supplied to the constructor.
    patterns: Vec<ScanPattern>,
    /// Pre-compiled set for fast multi-pattern pre-filtering of **regex**
    /// (non-literal) patterns only. `matches()` returns which regex-pattern
    /// indices matched, avoiding running every individual regex on each chunk
    /// (R-3 optimisation).
    regex_set: RegexSet,
    /// Maps a `RegexSet` index → index into `self.patterns`.
    /// Only non-literal patterns are in the `RegexSet`, so entry `k` is the
    /// position in `patterns` of the `k`-th regex pattern.
    regex_indices: Vec<usize>,
    /// Aho-Corasick automaton for fast SIMD literal matching.
    /// `None` when there are no literal patterns.
    aho_corasick: Option<AhoCorasick>,
    /// Maps an Aho-Corasick pattern index → index into `self.patterns`.
    /// Only literal patterns appear here, so entry `k` is the position in
    /// `patterns` of the `k`-th literal pattern.
    literal_indices: Vec<usize>,
    /// Thread-safe dedup replacement store, shared with any scanner
    /// derived from this one.
    store: Arc<MappingStore>,
    /// Scanner configuration (chunk and overlap sizes).
    config: ScanConfig,
}
544
/// Return type for scanner factory methods that load a secrets file.
///
/// On success contains `(scanner, warnings, allow_patterns)`:
///
/// - `scanner` — the constructed [`StreamScanner`].
/// - `warnings` — non-fatal parse errors, each paired with a `usize`.
///   NOTE(review): the `usize` presumably identifies the offending entry
///   (index or line number) — confirm against the loader.
/// - `allow_patterns` — raw strings from `kind: allow` entries.
type SecretsLoadResult = Result<(StreamScanner, Vec<(usize, SanitizeError)>, Vec<String>)>;
551
552impl StreamScanner {
553 /// Create a new streaming scanner.
554 ///
555 /// # Arguments
556 ///
557 /// - `patterns` — the set of patterns to scan for.
558 /// - `store` — the mapping store for dedup-consistent replacements.
559 /// - `config` — chunking / overlap configuration.
560 ///
561 /// # Errors
562 ///
563 /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
564 /// invalid (e.g. `chunk_size == 0` or `overlap_size >= chunk_size`).
565 pub fn new(
566 patterns: Vec<ScanPattern>,
567 store: Arc<MappingStore>,
568 config: ScanConfig,
569 ) -> Result<Self> {
570 Self::new_with_max_patterns(patterns, store, config, DEFAULT_MAX_PATTERNS)
571 }
572
573 /// Create a new streaming scanner with a custom pattern limit.
574 ///
575 /// This is identical to [`new`](Self::new) but allows overriding the
576 /// default pattern cap (10 000). Use this
577 /// when you have a legitimate need for more patterns and have
578 /// verified that your system has enough memory for the resulting
579 /// `RegexSet`.
580 ///
581 /// # Errors
582 ///
583 /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
584 /// invalid or the pattern count exceeds `max_patterns`.
585 pub fn new_with_max_patterns(
586 patterns: Vec<ScanPattern>,
587 store: Arc<MappingStore>,
588 config: ScanConfig,
589 max_patterns: usize,
590 ) -> Result<Self> {
591 config.validate()?;
592
593 // F-05 fix: enforce maximum pattern count to bound RegexSet memory.
594 if patterns.len() > max_patterns {
595 return Err(SanitizeError::InvalidConfig(format!(
596 "pattern count ({}) exceeds maximum allowed ({}) — \
597 RegexSet memory scales linearly with pattern count",
598 patterns.len(),
599 max_patterns
600 )));
601 }
602
603 // Partition patterns into literal (Aho-Corasick) and regex (RegexSet)
604 // so each is matched by the most efficient engine.
605 let mut literal_bytes: Vec<Vec<u8>> = Vec::new();
606 let mut literal_indices: Vec<usize> = Vec::new();
607 let mut regex_strs: Vec<&str> = Vec::new();
608 let mut regex_indices: Vec<usize> = Vec::new();
609
610 for (i, pattern) in patterns.iter().enumerate() {
611 if let Some(lit) = &pattern.literal {
612 literal_bytes.push(lit.as_bytes().to_vec());
613 literal_indices.push(i);
614 } else {
615 regex_strs.push(pattern.regex_pattern());
616 regex_indices.push(i);
617 }
618 }
619
620 // Build Aho-Corasick automaton for literal patterns (SIMD-accelerated,
621 // single O(n) pass over the input per chunk).
622 let aho_corasick = if literal_bytes.is_empty() {
623 None
624 } else {
625 Some(AhoCorasick::new(&literal_bytes).map_err(compile_err)?)
626 };
627
628 // Build RegexSet from non-literal patterns only (R-3 pre-filter).
629 let regex_set = if regex_strs.is_empty() {
630 RegexSetBuilder::new(Vec::<&str>::new())
631 .size_limit(REGEX_SIZE_LIMIT)
632 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
633 .build()
634 .map_err(compile_err)?
635 } else {
636 RegexSetBuilder::new(®ex_strs)
637 .size_limit((REGEX_SIZE_LIMIT * regex_strs.len().max(1)).min(REGEX_SET_SIZE_CAP))
638 .dfa_size_limit(
639 (REGEX_DFA_SIZE_LIMIT * regex_strs.len().max(1)).min(REGEX_SET_SIZE_CAP),
640 )
641 .build()
642 .map_err(compile_err)?
643 };
644
645 Ok(Self {
646 patterns,
647 regex_set,
648 regex_indices,
649 aho_corasick,
650 literal_indices,
651 store,
652 config,
653 })
654 }
655
656 /// Create a copy of this scanner extended with additional literal patterns.
657 ///
658 /// Clones the existing pattern set and appends `extra`, then rebuilds
659 /// the internal Aho-Corasick and RegexSet automata. Used by the
660 /// format-preserving structured pass to scan original bytes with
661 /// discovered field-value literals added to the base pattern set.
662 ///
663 /// # Errors
664 ///
665 /// Returns [`SanitizeError`] if automaton construction fails or the
666 /// combined pattern count exceeds the default limit.
667 pub fn with_extra_literals(&self, extra: Vec<ScanPattern>) -> Result<Self> {
668 let mut patterns = self.patterns.clone();
669 patterns.extend(extra);
670 Self::new(patterns, Arc::clone(&self.store), self.config.clone())
671 }
672
673 /// Build a scanner suitable for format-preserving structured-file passes.
674 ///
675 /// Patterns whose labels end with `"_kv"` are excluded from the base set.
676 /// Those patterns match both a key name and its value (e.g. `password: s3cr3t`)
677 /// as a single unit; in a structured pass the key must survive untouched so
678 /// only the discovered field-value literals are safe to replace.
679 ///
680 /// `extra` (the profile-discovered literals) are always included.
681 ///
682 /// # Errors
683 ///
684 /// Returns [`SanitizeError`] if Aho-Corasick or RegexSet construction fails
685 /// or the combined pattern count exceeds the default limit.
686 pub fn for_structured_pass(&self, extra: Vec<ScanPattern>) -> Result<Self> {
687 let mut patterns: Vec<ScanPattern> = self
688 .patterns
689 .iter()
690 .filter(|p| !p.label.ends_with(KV_LABEL_SUFFIX))
691 .cloned()
692 .collect();
693 patterns.extend(extra);
694 Self::new(patterns, Arc::clone(&self.store), self.config.clone())
695 }
696
697 /// Scan a reader and write sanitized output to a writer.
698 ///
699 /// Processes the input in chunks of `config.chunk_size` bytes,
700 /// maintaining an overlap window of `config.overlap_size` bytes to
701 /// catch matches spanning chunk boundaries. All detected matches
702 /// are replaced one-way via the [`MappingStore`].
703 ///
704 /// # Arguments
705 ///
706 /// - `reader` — input source (file, network stream, `&[u8]`, …).
707 /// - `writer` — output sink (file, `Vec<u8>`, …).
708 ///
709 /// # Returns
710 ///
711 /// [`ScanStats`] with counters for bytes processed, matches found, etc.
712 ///
713 /// # Errors
714 ///
715 /// Returns [`SanitizeError`] on I/O failures or if a replacement
716 /// cannot be generated (e.g. store capacity exceeded).
717 pub fn scan_reader<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ScanStats> {
718 self.scan_reader_with_callbacks(reader, writer, None, |_| {}, |_| {})
719 }
720
721 /// Scan a reader and emit progress snapshots after each committed chunk.
722 ///
723 /// `total_bytes` should be provided when the caller knows the full input
724 /// size. When omitted, progress consumers should avoid percentages/ETA.
725 ///
726 /// This is a convenience wrapper around [`scan_reader_with_callbacks`](Self::scan_reader_with_callbacks)
727 /// that discards per-match location information. Use that method directly
728 /// when you need line numbers or byte offsets for individual matches.
729 ///
730 /// # Errors
731 ///
732 /// Returns [`SanitizeError`] on I/O failures or if a replacement
733 /// cannot be generated (e.g. store capacity exceeded).
734 pub fn scan_reader_with_progress<R: Read, W: Write, F>(
735 &self,
736 reader: R,
737 writer: W,
738 total_bytes: Option<u64>,
739 on_progress: F,
740 ) -> Result<ScanStats>
741 where
742 F: FnMut(&ScanProgress),
743 {
744 self.scan_reader_with_callbacks(reader, writer, total_bytes, on_progress, |_| {})
745 }
746
747 /// Scan a reader, emit progress snapshots, and call `on_match` for every
748 /// committed match with its 1-based line number and byte offset.
749 ///
750 /// `on_match` is called synchronously in the scanning thread, once per
751 /// committed match, in document order. The callback receives a
752 /// [`MatchLocation`] describing the pattern label, 1-based line number,
753 /// and 0-based byte offset within the input file. Callers that only need
754 /// aggregate counts (no per-match positions) should prefer
755 /// [`scan_reader_with_progress`](Self::scan_reader_with_progress), which
756 /// skips the per-byte newline counting entirely.
757 ///
758 /// # Performance note
759 ///
760 /// Enabling `on_match` adds an O(committed_bytes_between_matches)
761 /// newline-counting pass inside each chunk. For files with sparse matches
762 /// this overhead is proportional to file size; for dense matches (e.g. one
763 /// secret per line) it is negligible. On 10–15 GiB log files with typical
764 /// match densities the overhead is roughly 10–20 % of total scan time.
765 ///
766 /// # Errors
767 ///
768 /// Returns [`SanitizeError`] on I/O failures or if a replacement
769 /// cannot be generated (e.g. store capacity exceeded).
770 pub fn scan_reader_with_callbacks<R: Read, W: Write, F, M>(
771 &self,
772 mut reader: R,
773 mut writer: W,
774 total_bytes: Option<u64>,
775 mut on_progress: F,
776 mut on_match: M,
777 ) -> Result<ScanStats>
778 where
779 F: FnMut(&ScanProgress),
780 M: FnMut(MatchLocation),
781 {
782 let mut stats = ScanStats::default();
783
784 // Carry buffer: the tail of the previous window that needs
785 // to be re-scanned with the next chunk.
786 let mut carry: Vec<u8> = Vec::new();
787
788 // Read buffer (reused across iterations to avoid re-allocation).
789 let mut read_buf = vec![0u8; self.config.chunk_size];
790
791 // Scan window (reused across iterations — grows to peak size then
792 // stays there, avoiding per-chunk allocation).
793 let mut window: Vec<u8> =
794 Vec::with_capacity(self.config.chunk_size + self.config.overlap_size);
795
796 // Scratch buffers reused every chunk to eliminate per-chunk heap
797 // pressure from match collection, output building, and stats tracking.
798 let mut scratch = ScanScratch::new(
799 self.patterns.len(),
800 self.config.chunk_size,
801 self.config.overlap_size,
802 );
803
804 // Absolute file byte offset of window[0] for this iteration.
805 let mut window_file_offset: u64 = 0;
806 // Cumulative newline count in the file before window[0].
807 let mut newlines_before_window: u64 = 0;
808
809 loop {
810 // Read the next chunk.
811 let bytes_read = read_fully(&mut reader, &mut read_buf)?;
812 let is_eof = bytes_read < read_buf.len();
813
814 // Track only genuinely new bytes (carry was already counted).
815 stats.bytes_processed += bytes_read as u64;
816
817 if bytes_read == 0 && carry.is_empty() {
818 break;
819 }
820
821 // Build the scan window: carry ++ new_data.
822 // Reuse the window buffer to avoid per-chunk allocation.
823 window.clear();
824 window.extend_from_slice(&carry);
825 window.extend_from_slice(&read_buf[..bytes_read]);
826
827 if window.is_empty() {
828 break;
829 }
830
831 // Scan the window: find matches, determine commit point, apply
832 // replacements, and flush the committed region to the writer.
833 // Returns the commit_point so we can slice the carry for next iter.
834 let commit_point = self.process_committed_window(
835 &window,
836 is_eof,
837 &mut scratch,
838 &mut writer,
839 &mut stats,
840 window_file_offset,
841 newlines_before_window,
842 &mut on_match,
843 )?;
844
845 // Advance file-level position counters for the next iteration.
846 // window[commit_point] is where the next window's carry starts,
847 // so that byte is at file offset (window_file_offset + commit_point).
848 newlines_before_window += count_newlines(&window[..commit_point]);
849 window_file_offset += commit_point as u64;
850
851 // Fold per-chunk pattern hit counts into the cumulative stats map,
852 // then emit a progress snapshot to the caller.
853 self.fold_chunk_counts(&mut scratch.pattern_counts, &mut stats);
854 on_progress(&ScanProgress {
855 bytes_processed: stats.bytes_processed,
856 bytes_output: stats.bytes_output,
857 total_bytes,
858 matches_found: stats.matches_found,
859 replacements_applied: stats.replacements_applied,
860 });
861
862 // Update carry for next iteration.
863 if is_eof {
864 carry.clear();
865 break;
866 }
867 carry.clear();
868 carry.extend_from_slice(&window[commit_point..]);
869 }
870
871 Ok(stats)
872 }
873
874 /// Scan one window, apply replacements up to the commit point, and flush
875 /// the result to `writer`. Returns the commit point so the caller can
876 /// slice the carry for the next iteration.
877 #[allow(clippy::too_many_arguments)]
878 fn process_committed_window(
879 &self,
880 window: &[u8],
881 is_eof: bool,
882 scratch: &mut ScanScratch,
883 writer: &mut dyn io::Write,
884 stats: &mut ScanStats,
885 window_file_offset: u64,
886 newlines_before_window: u64,
887 on_match: &mut dyn FnMut(MatchLocation),
888 ) -> Result<usize> {
889 // Find all non-overlapping matches in the window.
890 self.find_matches(window, scratch);
891
892 // Determine how much of the window can be safely committed this iteration.
893 let base_commit = if is_eof {
894 window.len()
895 } else {
896 window.len().saturating_sub(self.config.overlap_size)
897 };
898 let commit_point =
899 self.adjusted_commit_point(&scratch.selected, base_commit, window.len(), is_eof);
900
901 // Build output for the committed region (fills scratch.output).
902 self.apply_replacements(
903 &window[..commit_point],
904 &scratch.selected,
905 stats,
906 &mut scratch.output,
907 &mut scratch.pattern_counts,
908 window_file_offset,
909 newlines_before_window,
910 on_match,
911 )?;
912
913 writer
914 .write_all(&scratch.output)
915 .map_err(|e| SanitizeError::IoError(e.to_string()))?;
916 stats.bytes_output += scratch.output.len() as u64;
917
918 Ok(commit_point)
919 }
920
921 /// Fold per-chunk pattern hit counts into the cumulative `stats.pattern_counts`
922 /// map, then reset `counts` to zero for the next chunk.
923 ///
924 /// `label.clone()` is called at most once per distinct pattern per chunk,
925 /// not once per match hit, which keeps cost proportional to pattern count.
926 fn fold_chunk_counts(&self, counts: &mut [u64], stats: &mut ScanStats) {
927 for (idx, count) in counts.iter_mut().enumerate() {
928 if *count > 0 {
929 *stats
930 .pattern_counts
931 .entry(self.patterns[idx].label.clone())
932 .or_insert(0) += *count;
933 *count = 0;
934 }
935 }
936 }
937
938 /// Convenience: scan byte slice in-memory and return sanitized output.
939 ///
940 /// Equivalent to `scan_reader(input, Vec::new())` but returns the
941 /// output buffer directly.
942 ///
943 /// # Errors
944 ///
945 /// Returns [`SanitizeError`] if a replacement cannot be generated
946 /// (e.g. store capacity exceeded).
947 pub fn scan_bytes(&self, input: &[u8]) -> Result<(Vec<u8>, ScanStats)> {
948 self.scan_bytes_with_progress(input, |_| {})
949 }
950
951 /// Scan a byte slice in memory and emit progress snapshots.
952 ///
953 /// # Errors
954 ///
955 /// Returns [`SanitizeError`] if a replacement cannot be generated
956 /// (e.g. store capacity exceeded).
957 pub fn scan_bytes_with_progress<F>(
958 &self,
959 input: &[u8],
960 on_progress: F,
961 ) -> Result<(Vec<u8>, ScanStats)>
962 where
963 F: FnMut(&ScanProgress),
964 {
965 let mut output = Vec::with_capacity(input.len());
966 let stats = self.scan_reader_with_callbacks(
967 input,
968 &mut output,
969 Some(input.len() as u64),
970 on_progress,
971 |_| {},
972 )?;
973 Ok((output, stats))
974 }
975
976 // ---- Accessors ----
977
978 /// Access the scanner's configuration.
979 #[must_use]
980 pub fn config(&self) -> &ScanConfig {
981 &self.config
982 }
983
984 /// Access the underlying mapping store.
985 #[must_use]
986 pub fn store(&self) -> &Arc<MappingStore> {
987 &self.store
988 }
989
990 /// Number of patterns registered in this scanner.
991 #[must_use]
992 pub fn pattern_count(&self) -> usize {
993 self.patterns.len()
994 }
995
996 /// Create a scanner from an encrypted secrets file.
997 ///
998 /// Decrypts the file in memory, parses the entries, compiles
999 /// patterns, and returns the scanner ready to scan. Decrypted
1000 /// plaintext is scrubbed from memory after parsing.
1001 ///
1002 /// # Arguments
1003 ///
1004 /// - `encrypted_bytes` — raw bytes of the `.enc` file.
1005 /// - `password` — user password.
1006 /// - `format` — optional format override for the plaintext.
1007 /// - `store` — mapping store for dedup-consistent replacements.
1008 /// - `config` — chunking / overlap configuration.
1009 /// - `extra_patterns` — additional patterns to merge in.
1010 ///
1011 /// # Returns
1012 ///
1013 /// `(scanner, warnings, allow_patterns)` where `warnings` lists entries
1014 /// that failed to compile (index + error) and `allow_patterns` are the
1015 /// raw strings from `kind: allow` entries — pass these to
1016 /// [`AllowlistMatcher::new`](crate::allowlist::AllowlistMatcher) to
1017 /// suppress replacements for known-safe values.
1018 ///
1019 /// # Errors
1020 ///
1021 /// Returns a secrets-related [`SanitizeError`] on decryption failure
1022 /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
1023 pub fn from_encrypted_secrets(
1024 encrypted_bytes: &[u8],
1025 password: &str,
1026 format: Option<crate::secrets::SecretsFormat>,
1027 store: Arc<MappingStore>,
1028 config: ScanConfig,
1029 extra_patterns: Vec<ScanPattern>,
1030 ) -> SecretsLoadResult {
1031 let ((mut patterns, warnings), allow) =
1032 crate::secrets::load_encrypted_secrets(encrypted_bytes, password, format)?;
1033 patterns.extend(extra_patterns);
1034 let scanner = Self::new(patterns, store, config)?;
1035 Ok((scanner, warnings, allow))
1036 }
1037
1038 /// Create a scanner from a plaintext secrets file.
1039 ///
1040 /// Convenience for development / testing without encryption.
1041 ///
1042 /// # Returns
1043 ///
1044 /// `(scanner, warnings, allow_patterns)` where `allow_patterns` are the
1045 /// raw strings from `kind: allow` entries — pass these to
1046 /// [`AllowlistMatcher::new`](crate::allowlist::AllowlistMatcher) to
1047 /// suppress replacements for known-safe values.
1048 ///
1049 /// # Errors
1050 ///
1051 /// Returns a secrets-related [`SanitizeError`] on parse failure
1052 /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
1053 pub fn from_plaintext_secrets(
1054 plaintext: &[u8],
1055 format: Option<crate::secrets::SecretsFormat>,
1056 store: Arc<MappingStore>,
1057 config: ScanConfig,
1058 extra_patterns: Vec<ScanPattern>,
1059 ) -> SecretsLoadResult {
1060 let ((mut patterns, warnings), allow) =
1061 crate::secrets::load_plaintext_secrets(plaintext, format)?;
1062 patterns.extend(extra_patterns);
1063 let scanner = Self::new(patterns, store, config)?;
1064 Ok((scanner, warnings, allow))
1065 }
1066
1067 // ---- Internal helpers ----
1068
1069 /// Find all non-overlapping matches across all patterns.
1070 ///
1071 /// Fills `scratch.selected` with the winning non-overlapping matches
1072 /// for the given `window`. All three scratch `Vec`s are cleared and
1073 /// repopulated on each call so callers can freely reuse the same
1074 /// `ScanScratch` instance across chunks.
1075 ///
1076 /// ## Strategy
1077 ///
1078 /// 1. **Aho-Corasick** (`aho_corasick`): single O(n) SIMD pass over the
1079 /// window reporting every occurrence of every literal pattern,
1080 /// including overlapping ones. This replaces O(k·n) individual regex
1081 /// scans for the literal subset.
1082 /// 2. **RegexSet pre-filter** (R-3 optimisation): fast check of which
1083 /// *non-literal* regex patterns have any match in the window.
1084 /// 3. **Individual regex `find_iter`**: only for regex patterns flagged
1085 /// by step 2.
1086 /// 4. **Sort + greedy dedup**: all raw matches are sorted by start
1087 /// (ascending), then length (descending), and a single greedy pass
1088 /// selects the final non-overlapping set.
1089 fn find_matches(&self, window: &[u8], scratch: &mut ScanScratch) {
1090 scratch.all_matches.clear();
1091 scratch.selected.clear();
1092
1093 // Step 1: Aho-Corasick overlapping scan for all literal patterns.
1094 // find_overlapping_iter reports every match position including
1095 // overlapping ones, so the sort+greedy step below correctly resolves
1096 // ambiguities between literals (e.g. "abc" vs "abcd" at same offset).
1097 // Literals never have capture groups — capture is always None.
1098 if let Some(ac) = &self.aho_corasick {
1099 for mat in ac.find_overlapping_iter(window) {
1100 scratch.all_matches.push(RawMatch {
1101 start: mat.start(),
1102 end: mat.end(),
1103 pattern_idx: self.literal_indices[mat.pattern().as_usize()],
1104 capture: None,
1105 });
1106 }
1107 }
1108
1109 // Steps 2+3: RegexSet pre-filter then individual scan for non-literal
1110 // patterns. regex_set only contains non-literal pattern strings, so
1111 // literals are never scanned twice.
1112 // Use captures_iter so that patterns with a capture group 1 record
1113 // the sub-range to replace, while patterns without one fall back to
1114 // replacing the full match.
1115 for rs_idx in self.regex_set.matches(window) {
1116 let pattern_idx = self.regex_indices[rs_idx];
1117 if window.len() < self.patterns[pattern_idx].min_length {
1118 continue;
1119 }
1120 for cap in self.patterns[pattern_idx].regex.captures_iter(window) {
1121 let full = cap.get(0).expect("group 0 always exists");
1122 let capture = cap.get(1).map(|g| (g.start(), g.end()));
1123 scratch.all_matches.push(RawMatch {
1124 start: full.start(),
1125 end: full.end(),
1126 pattern_idx,
1127 capture,
1128 });
1129 }
1130 }
1131
1132 // Step 4: sort then greedy non-overlapping selection.
1133 // Skip entirely when no matches were found (the common case for
1134 // clean data), avoiding an unnecessary sort of an empty Vec.
1135 if scratch.all_matches.is_empty() {
1136 return;
1137 }
1138
1139 // Primary: start ascending. Secondary: length descending (longer
1140 // match wins when two matches begin at the same position).
1141 scratch.all_matches.sort_unstable_by(|a, b| {
1142 a.start
1143 .cmp(&b.start)
1144 .then_with(|| (b.end - b.start).cmp(&(a.end - a.start)))
1145 });
1146
1147 let mut last_end = 0;
1148 for m in scratch.all_matches.drain(..) {
1149 if m.start >= last_end {
1150 last_end = m.end;
1151 scratch.selected.push(m);
1152 }
1153 }
1154 }
1155
1156 /// Adjust the commit point to avoid splitting a match across the
1157 /// commit / carry boundary.
1158 ///
1159 /// If any match straddles `base_commit` (starts before, ends after),
1160 /// the commit point is moved to after that match so it is emitted
1161 /// in full this iteration.
1162 #[allow(clippy::unused_self)] // keep &self for API consistency with other scanner methods
1163 fn adjusted_commit_point(
1164 &self,
1165 matches: &[RawMatch],
1166 base_commit: usize,
1167 window_len: usize,
1168 is_eof: bool,
1169 ) -> usize {
1170 if is_eof {
1171 return window_len;
1172 }
1173
1174 let mut commit = base_commit;
1175
1176 for m in matches {
1177 if m.start < commit && m.end > commit {
1178 // Match straddles the boundary — extend commit to include it.
1179 commit = m.end;
1180 }
1181 }
1182
1183 // Never exceed window length.
1184 commit.min(window_len)
1185 }
1186
1187 /// Build the output for the committed region by splicing in replacements.
1188 ///
1189 /// Writes into `output_buf` (cleared on entry) and increments
1190 /// `stats.matches_found` / `stats.replacements_applied` for each applied
1191 /// replacement. Per-pattern hit counts are written to `pattern_counts`
1192 /// (indexed by `pattern_idx`); the caller is responsible for folding
1193 /// these into `ScanStats::pattern_counts` and resetting them.
1194 ///
1195 /// `matches` is the full selected set for the window (may include matches
1196 /// in the carry region beyond `committed`). Because `adjusted_commit_point`
1197 /// guarantees no match straddles the boundary, any match with
1198 /// `start < committed.len()` also has `end <= committed.len()`. The
1199 /// loop breaks early once `m.start >= committed.len()` since matches are
1200 /// sorted by start.
1201 ///
1202 /// `window_file_offset` and `newlines_before_window` are used to compute
1203 /// the absolute byte offset and 1-based line number for each committed
1204 /// match, which are delivered to `on_match`. The newline scan is
1205 /// incremental: we scan only the bytes between consecutive matches, not
1206 /// the full committed region.
1207 ///
1208 /// # Note on `from_utf8_lossy`
1209 ///
1210 /// `String::from_utf8_lossy` returns `Cow::Borrowed(&str)` for valid
1211 /// UTF-8 input (the common case for ASCII secrets) — no heap allocation
1212 /// on the hot path.
1213 #[allow(clippy::too_many_arguments)]
1214 fn apply_replacements(
1215 &self,
1216 committed: &[u8],
1217 matches: &[RawMatch],
1218 stats: &mut ScanStats,
1219 output_buf: &mut Vec<u8>,
1220 pattern_counts: &mut [u64],
1221 window_file_offset: u64,
1222 newlines_before_window: u64,
1223 on_match: &mut dyn FnMut(MatchLocation),
1224 ) -> Result<()> {
1225 output_buf.clear();
1226
1227 let mut last_end = 0;
1228 // Running newline count within the committed region, advanced
1229 // incrementally so we only scan the bytes between matches.
1230 let mut newlines_in_committed: u64 = 0;
1231 let mut newline_scan_pos: usize = 0;
1232
1233 for &m in matches {
1234 // Matches are sorted by start; those at or beyond the committed
1235 // region belong to the carry window — stop here.
1236 if m.start >= committed.len() {
1237 break;
1238 }
1239
1240 // Emit bytes before this match verbatim.
1241 output_buf.extend_from_slice(&committed[last_end..m.start]);
1242
1243 // Advance newline counter from previous scan position to match start,
1244 // then emit the match location to the caller.
1245 newlines_in_committed += count_newlines(&committed[newline_scan_pos..m.start]);
1246 newline_scan_pos = m.start;
1247 on_match(MatchLocation {
1248 line: newlines_before_window + newlines_in_committed + 1,
1249 byte_offset: window_file_offset + m.start as u64,
1250 pattern: self.patterns[m.pattern_idx].label.clone(),
1251 });
1252
1253 let pattern = &self.patterns[m.pattern_idx];
1254
1255 if let Some((cap_start, cap_end)) = m.capture {
1256 // Pattern has a capture group: replace only the capture group,
1257 // emitting the surrounding context bytes of the full match verbatim.
1258 // This preserves delimiters, key names, and prefixes that the
1259 // pattern uses as anchors to reduce false positives.
1260 if cap_start < m.start || cap_end > m.end || cap_start > cap_end {
1261 // Capture bounds outside match bounds — skip rather than panic.
1262 // This should not happen with correct regex patterns; log it so it
1263 // surfaces during testing without crashing production runs.
1264 tracing::warn!(
1265 pattern = %pattern.label,
1266 m_start = m.start,
1267 m_end = m.end,
1268 cap_start,
1269 cap_end,
1270 "capture group bounds outside match bounds — emitting full match unreplaced"
1271 );
1272 output_buf.extend_from_slice(&committed[m.start..m.end]);
1273 last_end = m.end;
1274 continue;
1275 }
1276 output_buf.extend_from_slice(&committed[m.start..cap_start]);
1277 let secret = String::from_utf8_lossy(&committed[cap_start..cap_end]);
1278 let replacement = self.store.get_or_insert(&pattern.category, &secret)?;
1279 output_buf.extend_from_slice(replacement.as_bytes());
1280 output_buf.extend_from_slice(&committed[cap_end..m.end]);
1281 } else {
1282 // No capture group — replace the full match (e.g. token-prefix
1283 // patterns like `glpat-[...]` where the full match IS the secret).
1284 let matched_text = String::from_utf8_lossy(&committed[m.start..m.end]);
1285 let replacement = self.store.get_or_insert(&pattern.category, &matched_text)?;
1286 output_buf.extend_from_slice(replacement.as_bytes());
1287 }
1288
1289 last_end = m.end;
1290
1291 stats.matches_found += 1;
1292 stats.replacements_applied += 1;
1293 pattern_counts[m.pattern_idx] += 1;
1294 }
1295
1296 // Emit the trailing non-matching tail.
1297 output_buf.extend_from_slice(&committed[last_end..]);
1298
1299 Ok(())
1300 }
1301}
1302
1303// ---------------------------------------------------------------------------
1304// Send + Sync compile-time assertion
1305// ---------------------------------------------------------------------------
1306
1307const _: fn() = || {
1308 fn assert_send<T: Send>() {}
1309 fn assert_sync<T: Sync>() {}
1310 assert_send::<StreamScanner>();
1311 assert_sync::<StreamScanner>();
1312};
1313
1314// ---------------------------------------------------------------------------
1315// I/O helper
1316// ---------------------------------------------------------------------------
1317
/// Return how many bytes of `data` are `\n`.
///
/// Callers use this on the bytes between consecutive match positions to
/// keep a running newline total, from which 1-based line numbers are
/// derived without scanning a whole region up front.
#[inline]
fn count_newlines(data: &[u8]) -> u64 {
    data.iter()
        .fold(0u64, |total, &byte| total + u64::from(byte == b'\n'))
}
1328
1329/// Read up to `buf.len()` bytes from `reader`, retrying on `Interrupted`.
1330///
1331/// Returns the number of bytes actually read (< `buf.len()` only at EOF).
1332fn read_fully<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
1333 let mut total = 0;
1334 while total < buf.len() {
1335 match reader.read(&mut buf[total..]) {
1336 Ok(0) => break, // EOF
1337 Ok(n) => total += n,
1338 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
1339 Err(e) => return Err(SanitizeError::IoError(e.to_string())),
1340 }
1341 }
1342 Ok(total)
1343}
1344
1345// ---------------------------------------------------------------------------
1346// Unit tests
1347// ---------------------------------------------------------------------------
1348
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::HmacGenerator;

    /// Helper: build a scanner with given patterns and small chunk config
    /// (64-byte chunks, 16-byte overlap) so short inputs still exercise
    /// the chunking path.
    fn test_scanner(patterns: Vec<ScanPattern>) -> StreamScanner {
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        StreamScanner::new(
            patterns,
            store,
            ScanConfig {
                chunk_size: 64,
                overlap_size: 16,
            },
        )
        .unwrap()
    }

    /// Helper: email pattern.
    fn email_pattern() -> ScanPattern {
        ScanPattern::from_regex(
            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            Category::Email,
            "email",
        )
        .unwrap()
    }

    /// Helper: IPv4 pattern.
    fn ipv4_pattern() -> ScanPattern {
        ScanPattern::from_regex(
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            Category::IpV4,
            "ipv4",
        )
        .unwrap()
    }

    // ---- Construction ----

    #[test]
    fn scanner_creation() {
        let scanner = test_scanner(vec![email_pattern()]);
        assert_eq!(scanner.pattern_count(), 1);
    }

    #[test]
    fn invalid_config_zero_chunk() {
        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let result = StreamScanner::new(vec![], store, ScanConfig::new(0, 0));
        assert!(result.is_err());
    }

    #[test]
    fn invalid_config_overlap_ge_chunk() {
        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let result = StreamScanner::new(vec![], store, ScanConfig::new(100, 100));
        assert!(result.is_err());
    }

    // ---- Empty / no-match cases ----

    #[test]
    fn empty_input() {
        let scanner = test_scanner(vec![email_pattern()]);
        let (output, stats) = scanner.scan_bytes(b"").unwrap();
        assert!(output.is_empty());
        assert_eq!(stats.matches_found, 0);
        assert_eq!(stats.bytes_processed, 0);
    }

    #[test]
    fn no_matches() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"There are no email addresses here.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(output, input.as_slice());
        assert_eq!(stats.matches_found, 0);
    }

    // ---- Single match ----

    #[test]
    fn single_email_replaced() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"Contact alice@corp.com for help.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        assert_eq!(stats.replacements_applied, 1);
        // Original must not appear in output.
        assert!(!output
            .windows(b"alice@corp.com".len())
            .any(|w| w == b"alice@corp.com"));
        // Replacement should contain the @ from the domain-preserving email.
        let output_str = String::from_utf8_lossy(&output);
        assert!(output_str.contains("@corp.com"));
        // Length preserved: output is same total length as input.
        assert_eq!(output.len(), input.len(), "length must be preserved");
        // Surrounding text preserved.
        assert!(output_str.starts_with("Contact "));
        assert!(output_str.ends_with(" for help."));
    }

    // ---- Multiple matches ----

    #[test]
    fn multiple_emails_replaced() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"From alice@corp.com to bob@corp.com cc admin@corp.com";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 3);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("alice@corp.com"));
        assert!(!out_str.contains("bob@corp.com"));
        assert!(!out_str.contains("admin@corp.com"));
    }

    // ---- Same secret gets same replacement ----

    #[test]
    fn same_secret_same_replacement() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"First alice@corp.com then alice@corp.com again.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 2);
        let out_str = String::from_utf8_lossy(&output);
        // With length-preserving replacements, look for the preserved domain.
        let parts: Vec<&str> = out_str.split("@corp.com").collect();
        // 3 parts = 2 occurrences of the replacement.
        assert_eq!(parts.len(), 3);

        // Counting occurrences alone does not prove the two replacements are
        // equal. Obtain the replacement for the bare secret from the same
        // scanner (same store) and require both occurrences to be exactly it.
        let (single, single_stats) = scanner.scan_bytes(b"alice@corp.com").unwrap();
        assert_eq!(single_stats.matches_found, 1);
        let replacement = String::from_utf8_lossy(&single);
        let expected = format!("First {replacement} then {replacement} again.");
        assert_eq!(out_str.as_ref(), expected.as_str());
    }

    // ---- Literal pattern ----

    #[test]
    fn literal_pattern_matched() {
        let pat = ScanPattern::from_literal(
            "SECRET_API_KEY_12345",
            Category::Custom("api_key".into()),
            "api_key",
        )
        .unwrap();
        let scanner = test_scanner(vec![pat]);
        let input = b"key=SECRET_API_KEY_12345&foo=bar";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        assert!(!output
            .windows(b"SECRET_API_KEY_12345".len())
            .any(|w| w == b"SECRET_API_KEY_12345"));
    }

    // ---- Multiple pattern types ----

    #[test]
    fn multiple_pattern_types() {
        let scanner = test_scanner(vec![email_pattern(), ipv4_pattern()]);
        let input = b"Server 192.168.1.100 contact admin@server.com";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 2);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("192.168.1.100"));
        assert!(!out_str.contains("admin@server.com"));
        assert_eq!(*stats.pattern_counts.get("email").unwrap(), 1);
        assert_eq!(*stats.pattern_counts.get("ipv4").unwrap(), 1);
    }

    // ---- Chunk boundary: match spans two chunks ----

    #[test]
    fn match_at_chunk_boundary() {
        // Use a very small chunk size so the email straddles a boundary.
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let scanner = StreamScanner::new(
            vec![email_pattern()],
            store,
            ScanConfig {
                chunk_size: 20, // very small
                overlap_size: 16,
            },
        )
        .unwrap();

        // Place an email address that will definitely straddle a boundary.
        let input = b"AAAAAAAAAAAAAAAA alice@corp.com BBBBBBBBBBBBB";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("alice@corp.com"));
        assert!(out_str.contains("@corp.com"), "domain must be preserved");
    }

    // ---- Large input requiring many chunks ----

    #[test]
    fn large_input_many_chunks() {
        let scanner = test_scanner(vec![email_pattern()]);

        // Build a ~1 KiB input (930 bytes, i.e. many 64-byte chunks) with
        // emails sprinkled in.
        let mut input = Vec::new();
        let filler = b"Lorem ipsum dolor sit amet. ";
        for i in 0..20 {
            input.extend_from_slice(filler);
            let email = format!("user{}@example.com ", i);
            input.extend_from_slice(email.as_bytes());
        }

        let (output, stats) = scanner.scan_bytes(&input).unwrap();
        assert_eq!(stats.matches_found, 20);
        let out_str = String::from_utf8_lossy(&output);
        for i in 0..20 {
            let email = format!("user{}@example.com", i);
            assert!(!out_str.contains(&email));
        }
    }

    // ---- Progress reporting ----

    #[test]
    fn scan_bytes_with_progress_preserves_output_and_stats() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"Contact alice@corp.com and bob@corp.com for help.";

        let (baseline_output, baseline_stats) = scanner.scan_bytes(input).unwrap();

        let mut updates = Vec::new();
        let (progress_output, progress_stats) = scanner
            .scan_bytes_with_progress(input, |progress| updates.push(progress.clone()))
            .unwrap();

        assert_eq!(progress_output, baseline_output);
        assert_eq!(
            progress_stats.bytes_processed,
            baseline_stats.bytes_processed
        );
        assert_eq!(progress_stats.bytes_output, baseline_stats.bytes_output);
        assert_eq!(progress_stats.matches_found, baseline_stats.matches_found);
        assert_eq!(
            progress_stats.replacements_applied,
            baseline_stats.replacements_applied
        );
        assert!(!updates.is_empty());
        assert_eq!(updates.last().unwrap().bytes_processed, input.len() as u64);
        assert_eq!(
            updates.last().unwrap().total_bytes,
            Some(input.len() as u64)
        );
        assert_eq!(updates.last().unwrap().matches_found, 2);
    }

    #[test]
    fn scan_reader_with_progress_reports_multiple_updates_for_multi_chunk_input() {
        let scanner = test_scanner(vec![email_pattern()]);
        let mut input = Vec::new();
        for i in 0..8 {
            input.extend_from_slice(b"padding padding padding ");
            input.extend_from_slice(format!("user{i}@example.com ").as_bytes());
        }

        let mut output = Vec::new();
        let mut updates = Vec::new();
        let stats = scanner
            .scan_reader_with_callbacks(
                &input[..],
                &mut output,
                Some(input.len() as u64),
                |progress| {
                    updates.push(progress.clone());
                },
                |_| {},
            )
            .unwrap();

        assert!(updates.len() >= 2);
        assert_eq!(
            updates.last().unwrap().bytes_processed,
            stats.bytes_processed
        );
        assert_eq!(updates.last().unwrap().bytes_output, stats.bytes_output);
        assert_eq!(
            updates.last().unwrap().total_bytes,
            Some(input.len() as u64)
        );
    }

    // ---- Match location reporting ----

    #[test]
    fn on_match_reports_line_and_byte_offset() {
        let scanner = test_scanner(vec![email_pattern()]);
        // "line one\n" is 9 bytes and "line two " is 9 more, so the email
        // starts at byte offset 18 on the second line.
        let input = b"line one\nline two alice@corp.com\n";

        let mut output = Vec::new();
        let mut locations = Vec::new();
        let stats = scanner
            .scan_reader_with_callbacks(
                &input[..],
                &mut output,
                Some(input.len() as u64),
                |_| {},
                |loc| locations.push((loc.line, loc.byte_offset)),
            )
            .unwrap();

        assert_eq!(stats.matches_found, 1);
        assert_eq!(locations, vec![(2, 18)]);
    }

    // ---- Scan via Read/Write interface ----

    #[test]
    fn scan_reader_writer() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"hello alice@corp.com world";
        let mut output = Vec::new();
        let stats = scanner.scan_reader(&input[..], &mut output).unwrap();
        assert_eq!(stats.matches_found, 1);
        let out_str = String::from_utf8_lossy(&output);
        assert!(out_str.contains("@corp.com"), "domain must be preserved");
    }

    // ---- Pattern compile error ----

    #[test]
    fn invalid_regex_pattern() {
        let result = ScanPattern::from_regex("[invalid(", Category::Email, "bad");
        assert!(result.is_err());
    }

    // ---- Default config ----

    #[test]
    fn default_config_valid() {
        ScanConfig::default().validate().unwrap();
    }

    // ---- Config edge cases ----

    #[test]
    fn config_chunk_1_overlap_0() {
        // Extreme but valid: 1-byte chunks, no overlap.
        // Won't catch multi-byte patterns, but should not crash.
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let scanner = StreamScanner::new(vec![], store, ScanConfig::new(1, 0)).unwrap();
        let (output, _) = scanner.scan_bytes(b"hello").unwrap();
        assert_eq!(output, b"hello");
    }

    // ---- Bytes output tracking ----

    #[test]
    fn bytes_output_preserved_on_replacement() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"a@b.cc"; // short email
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.bytes_processed, input.len() as u64);
        assert_eq!(stats.bytes_output, output.len() as u64);
        // Length-preserving: output length matches input length.
        assert_eq!(output.len(), input.len());
    }
}