sanitize_engine/scanner.rs
1//! Streaming scanner for detecting and replacing sensitive data.
2//!
3//! # Architecture
4//!
5//! The streaming scanner processes input data in configurable chunks,
6//! detecting secret patterns (regex or literal) and applying one-way
7//! replacements via the [`MappingStore`].
8//! This design supports files of 20–100 GB+ without requiring the entire
9//! content to fit in memory.
10//!
11//! ```text
//! ┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
//! │ Input (Read) │ ──▶ │  StreamScanner  │ ──▶ │  Output (Write)  │
//! │  (chunked)   │     │ (pattern match  │     │   (sanitized)    │
//! └──────────────┘     │   + replace)    │     └──────────────────┘
//!                      └────────┬────────┘
//!                               │
//!                      ┌────────▼────────┐
//!                      │  MappingStore   │
//!                      │  (dedup cache)  │
//!                      └─────────────────┘
22//! ```
23//!
24//! # Chunk Overlap Strategy
25//!
26//! To avoid missing matches that span chunk boundaries, the scanner
27//! maintains an overlap window between consecutive chunks:
28//!
29//! 1. Read `chunk_size` bytes of new data.
30//! 2. Prepend the `carry` buffer (tail of previous window).
31//! 3. Scan the combined `window` for all pattern matches.
32//! 4. Compute `commit_point = window.len() - overlap_size` (adjusted
33//! upward if a match straddles the boundary).
34//! 5. Emit output for `window[..commit_point]` with replacements applied.
35//! 6. Set `carry = window[commit_point..]` for the next iteration.
36//!
37//! The `overlap_size` should be ≥ the maximum expected match length to
38//! guarantee no matches are missed at boundaries.
39//!
40//! # Thread Safety
41//!
42//! [`StreamScanner`] is `Send + Sync`. Multiple files can be scanned
43//! concurrently using a shared `Arc<StreamScanner>`, all backed by the
44//! same [`MappingStore`] for per-run dedup
45//! consistency.
46//!
47//! # Performance
48//!
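//! - **Chunk-based I/O**: memory per active scan is roughly proportional
//!   to `chunk_size + overlap_size` (read buffer, scan window, carry, and
//!   output scratch), independent of the total input size.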
51//! - **Compiled regex**: patterns are compiled once at construction and
52//! reused across all chunks and files.
53//! - **Lock-free reads**: the `DashMap` inside `MappingStore` provides
54//! lock-free reads for already-seen values.
55//! - **File-level parallelism**: share `Arc<StreamScanner>` across
56//! threads to scan multiple files concurrently.
57
58use crate::category::Category;
59use crate::error::{Result, SanitizeError};
60use crate::store::MappingStore;
61use aho_corasick::AhoCorasick;
62use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder};
63use std::collections::HashMap;
64use std::io::{self, Read, Write};
65use std::sync::Arc;
66
67// ---------------------------------------------------------------------------
68// Configuration
69// ---------------------------------------------------------------------------
70
/// Bytes read from the input per chunk when no explicit size is given (1 MiB).
const DEFAULT_CHUNK_SIZE: usize = 1 << 20;

/// Bytes of overlap kept between consecutive chunks by default (4 KiB).
const DEFAULT_OVERLAP_SIZE: usize = 4 * 1024;

/// Upper bound, in bytes, on the compiled program of a single regex (1 MiB).
/// Guards against DoS through pathologically complex user-supplied patterns.
const REGEX_SIZE_LIMIT: usize = 1024 * 1024;

/// Upper bound, in bytes, on the DFA cache of a single regex (1 MiB).
const REGEX_DFA_SIZE_LIMIT: usize = 1024 * 1024;

/// Default cap on the number of patterns one scanner accepts (F-05 fix).
///
/// `RegexSet` automaton memory grows linearly with the pattern count: with
/// the 1 MiB size and DFA limits above, 10 000 patterns may already reserve
/// on the order of ~20 GiB. The cap prevents accidental resource exhaustion;
/// callers that really need more can raise it through
/// [`StreamScanner::new_with_max_patterns`].
const DEFAULT_MAX_PATTERNS: usize = 10_000;
91
/// Chunking parameters for a streaming scan.
///
/// # Tuning Guide
///
/// | Workload               | `chunk_size` | `overlap_size` |
/// |------------------------|--------------|----------------|
/// | Small files (< 10 MB)  | 256 KiB      | 1 KiB          |
/// | General purpose        | 1 MiB        | 4 KiB          |
/// | Large files (> 1 GB)   | 4–8 MiB      | 8 KiB          |
/// | Memory-constrained     | 64 KiB       | 1 KiB          |
///
/// Choose an `overlap_size` at least as large as the longest match you
/// expect. Typical secrets (API keys, emails, SSNs) stay well below
/// 256 bytes, so the 4 KiB default leaves plenty of headroom.
#[derive(Clone, Debug)]
pub struct ScanConfig {
    /// Number of bytes requested from the input per read.
    ///
    /// Bigger chunks mean fewer reads and better throughput at the cost of
    /// memory. Default: 1 MiB.
    pub chunk_size: usize,

    /// Number of trailing bytes re-scanned together with the next chunk.
    ///
    /// Should be ≥ the longest expected match; a pattern whose matches can
    /// be longer than this may be missed when it crosses a chunk boundary.
    /// Default: 4 KiB.
    pub overlap_size: usize,
}
121
122impl Default for ScanConfig {
123 fn default() -> Self {
124 Self {
125 chunk_size: DEFAULT_CHUNK_SIZE,
126 overlap_size: DEFAULT_OVERLAP_SIZE,
127 }
128 }
129}
130
131impl ScanConfig {
132 /// Create a new configuration with explicit values.
133 #[must_use]
134 pub fn new(chunk_size: usize, overlap_size: usize) -> Self {
135 Self {
136 chunk_size,
137 overlap_size,
138 }
139 }
140
141 /// Validate the configuration, returning an error if invalid.
142 ///
143 /// # Errors
144 ///
145 /// Returns [`SanitizeError::InvalidConfig`] if `chunk_size` is zero
146 /// or `overlap_size >= chunk_size`.
147 pub fn validate(&self) -> Result<()> {
148 if self.chunk_size == 0 {
149 return Err(SanitizeError::InvalidConfig(
150 "chunk_size must be > 0".into(),
151 ));
152 }
153 if self.overlap_size >= self.chunk_size {
154 return Err(SanitizeError::InvalidConfig(
155 "overlap_size must be < chunk_size".into(),
156 ));
157 }
158 Ok(())
159 }
160}
161
162// ---------------------------------------------------------------------------
163// Internal helpers
164// ---------------------------------------------------------------------------
165
166/// Convert any compile-time pattern error into [`SanitizeError::PatternCompileError`].
167#[inline]
168fn compile_err(e: impl std::fmt::Display) -> SanitizeError {
169 SanitizeError::PatternCompileError(e.to_string())
170}
171
172// ---------------------------------------------------------------------------
173// Scan pattern
174// ---------------------------------------------------------------------------
175
176/// A pattern rule defining what to scan for and how to categorize matches.
177///
178/// Wraps a compiled [`regex::bytes::Regex`] with a [`Category`] for
179/// replacement lookups and a human-readable label for reporting.
180///
181/// Both regex and literal patterns are supported. Literal patterns keep
182/// their original text and are matched by the scanner's Aho-Corasick
183/// automaton for fast multi-literal scanning.
184pub struct ScanPattern {
185 /// Compiled regex matcher (used for non-literal patterns and as a
186 /// fallback; literal patterns are matched via Aho-Corasick instead).
187 regex: Regex,
188 /// Category for replacement lookups.
189 category: Category,
190 /// Human-readable label for reporting / stats.
191 label: String,
192 /// Original (unescaped) literal string when created via `from_literal`.
193 /// `None` for patterns created via `from_regex`.
194 /// Stored so `StreamScanner` can build an Aho-Corasick automaton for
195 /// fast SIMD literal matching instead of running the regex engine.
196 literal: Option<String>,
197}
198
199impl std::fmt::Debug for ScanPattern {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 f.debug_struct("ScanPattern")
202 .field("pattern", &self.regex.as_str())
203 .field("category", &self.category)
204 .field("label", &self.label)
205 .field("literal", &self.literal.as_deref())
206 .finish()
207 }
208}
209
210impl Clone for ScanPattern {
211 fn clone(&self) -> Self {
212 Self {
213 regex: self.regex.clone(),
214 category: self.category.clone(),
215 label: self.label.clone(),
216 literal: self.literal.clone(),
217 }
218 }
219}
220
221impl ScanPattern {
222 /// Create a pattern from a regex string.
223 ///
224 /// # Errors
225 ///
226 /// Returns [`SanitizeError::PatternCompileError`] if the regex is invalid.
227 ///
228 /// # Examples
229 ///
230 /// ```
231 /// use sanitize_engine::scanner::ScanPattern;
232 /// use sanitize_engine::category::Category;
233 ///
234 /// let pat = ScanPattern::from_regex(
235 /// r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
236 /// Category::Email,
237 /// "email_address",
238 /// ).unwrap();
239 /// ```
240 pub fn from_regex(pattern: &str, category: Category, label: impl Into<String>) -> Result<Self> {
241 let regex = RegexBuilder::new(pattern)
242 .size_limit(REGEX_SIZE_LIMIT)
243 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
244 .build()
245 .map_err(compile_err)?;
246 Ok(Self {
247 regex,
248 category,
249 label: label.into(),
250 literal: None,
251 })
252 }
253
254 /// Create a pattern from a literal string.
255 ///
256 /// The literal is escaped so that regex metacharacters are matched
257 /// verbatim.
258 ///
259 /// # Errors
260 ///
261 /// Returns [`SanitizeError::PatternCompileError`] if regex compilation fails.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use sanitize_engine::scanner::ScanPattern;
267 /// use sanitize_engine::category::Category;
268 ///
269 /// let pat = ScanPattern::from_literal(
270 /// "sk-proj-abc123secret",
271 /// Category::Custom("api_key".into()),
272 /// "openai_key",
273 /// ).unwrap();
274 /// ```
275 pub fn from_literal(
276 literal: &str,
277 category: Category,
278 label: impl Into<String>,
279 ) -> Result<Self> {
280 let escaped = regex::escape(literal);
281 let regex = RegexBuilder::new(&escaped)
282 .size_limit(REGEX_SIZE_LIMIT)
283 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
284 .build()
285 .map_err(compile_err)?;
286 Ok(Self {
287 regex,
288 category,
289 label: label.into(),
290 literal: Some(literal.to_owned()),
291 })
292 }
293
294 /// The category this pattern maps to.
295 #[must_use]
296 pub fn category(&self) -> &Category {
297 &self.category
298 }
299
300 /// The human-readable label.
301 #[must_use]
302 pub fn label(&self) -> &str {
303 &self.label
304 }
305
306 /// Return the raw regex pattern string for RegexSet construction.
307 #[must_use]
308 pub fn regex_pattern(&self) -> &str {
309 self.regex.as_str()
310 }
311}
312
// ScanPattern is Send + Sync because every field is:
// - regex::bytes::Regex is Send + Sync
// - Category is Send + Sync (it's an enum of primitives + CompactString)
// - String and Option<String> are Send + Sync
317
318// ---------------------------------------------------------------------------
319// Internal: raw match descriptor
320// ---------------------------------------------------------------------------
321
/// Internal record of one pattern hit inside the current scan window.
#[derive(Clone, Copy, Debug)]
struct RawMatch {
    /// Byte offset of the first matched byte, relative to the scan window.
    start: usize,
    /// Byte offset one past the last matched byte, relative to the scan window.
    end: usize,
    /// Position of the matching rule in `StreamScanner::patterns`.
    pattern_idx: usize,
}
332
333// ---------------------------------------------------------------------------
334// Per-scan scratch buffers
335// ---------------------------------------------------------------------------
336
337/// Scratch buffers reused across chunks within a single scan call.
338///
339/// Allocating these once per `scan_reader_with_progress` invocation
340/// and reusing them each chunk eliminates the per-chunk heap pressure
341/// that would otherwise come from `Vec` allocations in `find_matches`
342/// and `apply_replacements`.
343struct ScanScratch {
344 /// Accumulates raw matches from all patterns before deduplication.
345 all_matches: Vec<RawMatch>,
346 /// Non-overlapping matches selected for the current window
347 /// (populated by `find_matches`, consumed by `apply_replacements`).
348 selected: Vec<RawMatch>,
349 /// Output bytes for the committed region, written by `apply_replacements`.
350 output: Vec<u8>,
351 /// Per-pattern match counts indexed by `pattern_idx`.
352 /// Reset to zero after each chunk's counts are folded into `ScanStats`.
353 pattern_counts: Vec<u64>,
354}
355
356impl ScanScratch {
357 fn new(pattern_count: usize, chunk_size: usize, overlap_size: usize) -> Self {
358 Self {
359 all_matches: Vec::new(),
360 selected: Vec::new(),
361 output: Vec::with_capacity(chunk_size + overlap_size),
362 pattern_counts: vec![0u64; pattern_count],
363 }
364 }
365}
366
367// ---------------------------------------------------------------------------
368// Scan statistics
369// ---------------------------------------------------------------------------
370
/// Counters describing what a finished scan did.
///
/// Returned by [`StreamScanner::scan_reader`] and
/// [`StreamScanner::scan_bytes`] so callers can see how much data was
/// processed and how many secrets were replaced.
#[derive(Clone, Debug, Default)]
pub struct ScanStats {
    /// Number of bytes read from the input.
    pub bytes_processed: u64,
    /// Number of bytes written to the output. Differs from
    /// `bytes_processed` whenever a replacement is longer or shorter than
    /// the text it replaces.
    pub bytes_output: u64,
    /// Number of matches found, summed over all patterns.
    pub matches_found: u64,
    /// Number of replacements applied (always == `matches_found` in
    /// one-way mode).
    pub replacements_applied: u64,
    /// Match count per pattern, keyed by the pattern's label.
    pub pattern_counts: HashMap<String, u64>,
}
391
/// Point-in-time counters handed to the progress callback of a streaming scan.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ScanProgress {
    /// Bytes read from the input up to this point.
    pub bytes_processed: u64,
    /// Bytes written to the output up to this point.
    pub bytes_output: u64,
    /// Size of the whole input, if the caller knows it.
    pub total_bytes: Option<u64>,
    /// Matches found up to this point.
    pub matches_found: u64,
    /// Replacements applied up to this point.
    pub replacements_applied: u64,
}
406
407// ---------------------------------------------------------------------------
408// StreamScanner
409// ---------------------------------------------------------------------------
410
411/// Streaming scanner that detects and replaces sensitive patterns.
412///
413/// Thread-safe: can be shared via `Arc<StreamScanner>` for concurrent
414/// scanning of multiple files. Each call to [`scan_reader`](Self::scan_reader)
415/// is independent and maintains its own chunking state.
416///
417/// # Usage
418///
419/// ```rust
420/// use sanitize_engine::scanner::{StreamScanner, ScanPattern, ScanConfig};
421/// use sanitize_engine::category::Category;
422/// use sanitize_engine::generator::HmacGenerator;
423/// use sanitize_engine::store::MappingStore;
424/// use std::sync::Arc;
425///
426/// // 1. Build the replacement store.
427/// let gen = Arc::new(HmacGenerator::new([42u8; 32]));
428/// let store = Arc::new(MappingStore::new(gen, None));
429///
430/// // 2. Define patterns.
431/// let patterns = vec![
432/// ScanPattern::from_regex(
433/// r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
434/// Category::Email,
435/// "email",
436/// ).unwrap(),
437/// ];
438///
439/// // 3. Create the scanner.
440/// let scanner = StreamScanner::new(patterns, store, ScanConfig::default()).unwrap();
441///
442/// // 4. Scan.
443/// let input = b"Contact alice@corp.com for details.";
444/// let (output, stats) = scanner.scan_bytes(input).unwrap();
445/// assert_eq!(stats.matches_found, 1);
446/// assert!(!output.windows(b"alice@corp.com".len())
447/// .any(|w| w == b"alice@corp.com"));
448/// ```
449pub struct StreamScanner {
450 /// Compiled scan patterns (both literal and regex).
451 patterns: Vec<ScanPattern>,
452 /// Pre-compiled set for fast multi-pattern pre-filtering of **regex**
453 /// (non-literal) patterns only. `matches()` returns which regex-pattern
454 /// indices matched, avoiding running every individual regex on each chunk
455 /// (R-3 optimisation).
456 regex_set: RegexSet,
457 /// Maps a `RegexSet` index → index into `self.patterns`.
458 /// Only non-literal patterns are in the `RegexSet`.
459 regex_indices: Vec<usize>,
460 /// Aho-Corasick automaton for fast SIMD literal matching.
461 /// `None` when there are no literal patterns.
462 aho_corasick: Option<AhoCorasick>,
463 /// Maps an Aho-Corasick pattern index → index into `self.patterns`.
464 /// Only literal patterns appear here.
465 literal_indices: Vec<usize>,
466 /// Thread-safe dedup replacement store.
467 store: Arc<MappingStore>,
468 /// Scanner configuration.
469 config: ScanConfig,
470}
471
472impl StreamScanner {
473 /// Create a new streaming scanner.
474 ///
475 /// # Arguments
476 ///
477 /// - `patterns` — the set of patterns to scan for.
478 /// - `store` — the mapping store for dedup-consistent replacements.
479 /// - `config` — chunking / overlap configuration.
480 ///
481 /// # Errors
482 ///
483 /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
484 /// invalid (e.g. `chunk_size == 0` or `overlap_size >= chunk_size`).
485 pub fn new(
486 patterns: Vec<ScanPattern>,
487 store: Arc<MappingStore>,
488 config: ScanConfig,
489 ) -> Result<Self> {
490 Self::new_with_max_patterns(patterns, store, config, DEFAULT_MAX_PATTERNS)
491 }
492
493 /// Create a new streaming scanner with a custom pattern limit.
494 ///
495 /// This is identical to [`new`](Self::new) but allows overriding the
496 /// default pattern cap (10 000). Use this
497 /// when you have a legitimate need for more patterns and have
498 /// verified that your system has enough memory for the resulting
499 /// `RegexSet`.
500 ///
501 /// # Errors
502 ///
503 /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
504 /// invalid or the pattern count exceeds `max_patterns`.
505 pub fn new_with_max_patterns(
506 patterns: Vec<ScanPattern>,
507 store: Arc<MappingStore>,
508 config: ScanConfig,
509 max_patterns: usize,
510 ) -> Result<Self> {
511 config.validate()?;
512
513 // F-05 fix: enforce maximum pattern count to bound RegexSet memory.
514 if patterns.len() > max_patterns {
515 return Err(SanitizeError::InvalidConfig(format!(
516 "pattern count ({}) exceeds maximum allowed ({}) — \
517 RegexSet memory scales linearly with pattern count",
518 patterns.len(),
519 max_patterns
520 )));
521 }
522
523 // Partition patterns into literal (Aho-Corasick) and regex (RegexSet)
524 // so each is matched by the most efficient engine.
525 let mut literal_bytes: Vec<Vec<u8>> = Vec::new();
526 let mut literal_indices: Vec<usize> = Vec::new();
527 let mut regex_strs: Vec<&str> = Vec::new();
528 let mut regex_indices: Vec<usize> = Vec::new();
529
530 for (i, pattern) in patterns.iter().enumerate() {
531 if let Some(lit) = &pattern.literal {
532 literal_bytes.push(lit.as_bytes().to_vec());
533 literal_indices.push(i);
534 } else {
535 regex_strs.push(pattern.regex_pattern());
536 regex_indices.push(i);
537 }
538 }
539
540 // Build Aho-Corasick automaton for literal patterns (SIMD-accelerated,
541 // single O(n) pass over the input per chunk).
542 let aho_corasick = if literal_bytes.is_empty() {
543 None
544 } else {
545 Some(
546 AhoCorasick::new(&literal_bytes)
547 .map_err(compile_err)?,
548 )
549 };
550
551 // Build RegexSet from non-literal patterns only (R-3 pre-filter).
552 let regex_set = if regex_strs.is_empty() {
553 RegexSetBuilder::new(Vec::<&str>::new())
554 .size_limit(REGEX_SIZE_LIMIT)
555 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
556 .build()
557 .map_err(compile_err)?
558 } else {
559 RegexSetBuilder::new(®ex_strs)
560 .size_limit(REGEX_SIZE_LIMIT * regex_strs.len().max(1))
561 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT * regex_strs.len().max(1))
562 .build()
563 .map_err(compile_err)?
564 };
565
566 Ok(Self {
567 patterns,
568 regex_set,
569 regex_indices,
570 aho_corasick,
571 literal_indices,
572 store,
573 config,
574 })
575 }
576
577 /// Create a copy of this scanner extended with additional literal patterns.
578 ///
579 /// Clones the existing pattern set and appends `extra`, then rebuilds
580 /// the internal Aho-Corasick and RegexSet automata. Used by the
581 /// format-preserving structured pass to scan original bytes with
582 /// discovered field-value literals added to the base pattern set.
583 ///
584 /// # Errors
585 ///
586 /// Returns [`SanitizeError`] if automaton construction fails or the
587 /// combined pattern count exceeds the default limit.
588 pub fn with_extra_literals(&self, extra: Vec<ScanPattern>) -> Result<Self> {
589 let mut patterns = self.patterns.clone();
590 patterns.extend(extra);
591 Self::new(patterns, Arc::clone(&self.store), self.config.clone())
592 }
593
594 /// Scan a reader and write sanitized output to a writer.
595 ///
596 /// Processes the input in chunks of `config.chunk_size` bytes,
597 /// maintaining an overlap window of `config.overlap_size` bytes to
598 /// catch matches spanning chunk boundaries. All detected matches
599 /// are replaced one-way via the [`MappingStore`].
600 ///
601 /// # Arguments
602 ///
603 /// - `reader` — input source (file, network stream, `&[u8]`, …).
604 /// - `writer` — output sink (file, `Vec<u8>`, …).
605 ///
606 /// # Returns
607 ///
608 /// [`ScanStats`] with counters for bytes processed, matches found, etc.
609 ///
610 /// # Errors
611 ///
612 /// Returns [`SanitizeError`] on I/O failures or if a replacement
613 /// cannot be generated (e.g. store capacity exceeded).
614 pub fn scan_reader<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ScanStats> {
615 self.scan_reader_with_progress(reader, writer, None, |_| {})
616 }
617
618 /// Scan a reader and emit progress snapshots after each committed chunk.
619 ///
620 /// `total_bytes` should be provided when the caller knows the full input
621 /// size. When omitted, progress consumers should avoid percentages/ETA.
622 ///
623 /// # Errors
624 ///
625 /// Returns [`SanitizeError`] on I/O failures or if a replacement
626 /// cannot be generated (e.g. store capacity exceeded).
627 pub fn scan_reader_with_progress<R: Read, W: Write, F>(
628 &self,
629 mut reader: R,
630 mut writer: W,
631 total_bytes: Option<u64>,
632 mut on_progress: F,
633 ) -> Result<ScanStats>
634 where
635 F: FnMut(&ScanProgress),
636 {
637 let mut stats = ScanStats::default();
638
639 // Carry buffer: the tail of the previous window that needs
640 // to be re-scanned with the next chunk.
641 let mut carry: Vec<u8> = Vec::new();
642
643 // Read buffer (reused across iterations to avoid re-allocation).
644 let mut read_buf = vec![0u8; self.config.chunk_size];
645
646 // Scan window (reused across iterations — grows to peak size then
647 // stays there, avoiding per-chunk allocation).
648 let mut window: Vec<u8> =
649 Vec::with_capacity(self.config.chunk_size + self.config.overlap_size);
650
651 // Scratch buffers reused every chunk to eliminate per-chunk heap
652 // pressure from match collection, output building, and stats tracking.
653 let mut scratch = ScanScratch::new(
654 self.patterns.len(),
655 self.config.chunk_size,
656 self.config.overlap_size,
657 );
658
659 loop {
660 // Read the next chunk.
661 let bytes_read = read_fully(&mut reader, &mut read_buf)?;
662 let is_eof = bytes_read < read_buf.len();
663
664 // Track only genuinely new bytes (carry was already counted).
665 stats.bytes_processed += bytes_read as u64;
666
667 if bytes_read == 0 && carry.is_empty() {
668 break;
669 }
670
671 // Build the scan window: carry ++ new_data.
672 // Reuse the window buffer to avoid per-chunk allocation.
673 let new_data = &read_buf[..bytes_read];
674 window.clear();
675 window.extend_from_slice(&carry);
676 window.extend_from_slice(new_data);
677
678 if window.is_empty() {
679 break;
680 }
681
682 // Find all non-overlapping matches in the window (fills scratch.selected).
683 self.find_matches(&window, &mut scratch);
684
685 // Determine the commit point — how much of the window we can
686 // safely emit this iteration.
687 let base_commit = if is_eof {
688 window.len()
689 } else {
690 window.len().saturating_sub(self.config.overlap_size)
691 };
692
693 let commit_point =
694 self.adjusted_commit_point(&scratch.selected, base_commit, window.len(), is_eof);
695
696 // Build output into scratch.output and update stats counters.
697 // Matches beyond commit_point are filtered inside apply_replacements.
698 self.apply_replacements(
699 &window[..commit_point],
700 &scratch.selected,
701 &mut stats,
702 &mut scratch.output,
703 &mut scratch.pattern_counts,
704 )?;
705
706 writer
707 .write_all(&scratch.output)
708 .map_err(|e| SanitizeError::IoError(e.to_string()))?;
709 stats.bytes_output += scratch.output.len() as u64;
710
711 // Fold per-chunk pattern counts into stats.
712 // label.clone() is called at most once per distinct pattern per
713 // chunk (not once per match hit), which is far cheaper at scale.
714 for (idx, count) in scratch.pattern_counts.iter_mut().enumerate() {
715 if *count > 0 {
716 *stats
717 .pattern_counts
718 .entry(self.patterns[idx].label.clone())
719 .or_insert(0) += *count;
720 *count = 0; // reset for next chunk
721 }
722 }
723
724 on_progress(&ScanProgress {
725 bytes_processed: stats.bytes_processed,
726 bytes_output: stats.bytes_output,
727 total_bytes,
728 matches_found: stats.matches_found,
729 replacements_applied: stats.replacements_applied,
730 });
731
732 // Update carry for next iteration. Reuse the carry buffer
733 // by copying remaining bytes down.
734 if is_eof {
735 carry.clear();
736 break;
737 }
738 carry.clear();
739 carry.extend_from_slice(&window[commit_point..]);
740 }
741
742 Ok(stats)
743 }
744
745 /// Convenience: scan byte slice in-memory and return sanitized output.
746 ///
747 /// Equivalent to `scan_reader(input, Vec::new())` but returns the
748 /// output buffer directly.
749 ///
750 /// # Errors
751 ///
752 /// Returns [`SanitizeError`] if a replacement cannot be generated
753 /// (e.g. store capacity exceeded).
754 pub fn scan_bytes(&self, input: &[u8]) -> Result<(Vec<u8>, ScanStats)> {
755 self.scan_bytes_with_progress(input, |_| {})
756 }
757
758 /// Scan a byte slice in memory and emit progress snapshots.
759 ///
760 /// # Errors
761 ///
762 /// Returns [`SanitizeError`] if a replacement cannot be generated
763 /// (e.g. store capacity exceeded).
764 pub fn scan_bytes_with_progress<F>(
765 &self,
766 input: &[u8],
767 on_progress: F,
768 ) -> Result<(Vec<u8>, ScanStats)>
769 where
770 F: FnMut(&ScanProgress),
771 {
772 let mut output = Vec::with_capacity(input.len());
773 let stats = self.scan_reader_with_progress(
774 input,
775 &mut output,
776 Some(input.len() as u64),
777 on_progress,
778 )?;
779 Ok((output, stats))
780 }
781
782 // ---- Accessors ----
783
784 /// Access the scanner's configuration.
785 #[must_use]
786 pub fn config(&self) -> &ScanConfig {
787 &self.config
788 }
789
790 /// Access the underlying mapping store.
791 #[must_use]
792 pub fn store(&self) -> &Arc<MappingStore> {
793 &self.store
794 }
795
796 /// Number of patterns registered in this scanner.
797 #[must_use]
798 pub fn pattern_count(&self) -> usize {
799 self.patterns.len()
800 }
801
802 /// Create a scanner from an encrypted secrets file.
803 ///
804 /// Decrypts the file in memory, parses the entries, compiles
805 /// patterns, and returns the scanner ready to scan. Decrypted
806 /// plaintext is scrubbed from memory after parsing.
807 ///
808 /// # Arguments
809 ///
810 /// - `encrypted_bytes` — raw bytes of the `.enc` file.
811 /// - `password` — user password.
812 /// - `format` — optional format override for the plaintext.
813 /// - `store` — mapping store for dedup-consistent replacements.
814 /// - `config` — chunking / overlap configuration.
815 /// - `extra_patterns` — additional patterns to merge in.
816 ///
817 /// # Returns
818 ///
819 /// `(scanner, warnings)` where `warnings` lists entries that
820 /// failed to compile (index + error).
821 ///
822 /// # Errors
823 ///
824 /// Returns [`SanitizeError::SecretsError`] on decryption failure
825 /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
826 pub fn from_encrypted_secrets(
827 encrypted_bytes: &[u8],
828 password: &str,
829 format: Option<crate::secrets::SecretsFormat>,
830 store: Arc<MappingStore>,
831 config: ScanConfig,
832 extra_patterns: Vec<ScanPattern>,
833 ) -> Result<(Self, Vec<(usize, SanitizeError)>)> {
834 let (mut patterns, warnings) =
835 crate::secrets::load_encrypted_secrets(encrypted_bytes, password, format)?;
836 patterns.extend(extra_patterns);
837 let scanner = Self::new(patterns, store, config)?;
838 Ok((scanner, warnings))
839 }
840
841 /// Create a scanner from a plaintext secrets file.
842 ///
843 /// Convenience for development / testing without encryption.
844 ///
845 /// # Errors
846 ///
847 /// Returns [`SanitizeError::SecretsError`] on parse failure
848 /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
849 pub fn from_plaintext_secrets(
850 plaintext: &[u8],
851 format: Option<crate::secrets::SecretsFormat>,
852 store: Arc<MappingStore>,
853 config: ScanConfig,
854 extra_patterns: Vec<ScanPattern>,
855 ) -> Result<(Self, Vec<(usize, SanitizeError)>)> {
856 let (mut patterns, warnings) = crate::secrets::load_plaintext_secrets(plaintext, format)?;
857 patterns.extend(extra_patterns);
858 let scanner = Self::new(patterns, store, config)?;
859 Ok((scanner, warnings))
860 }
861
862 // ---- Internal helpers ----
863
864 /// Find all non-overlapping matches across all patterns.
865 ///
866 /// Fills `scratch.selected` with the winning non-overlapping matches
867 /// for the given `window`. All three scratch `Vec`s are cleared and
868 /// repopulated on each call so callers can freely reuse the same
869 /// `ScanScratch` instance across chunks.
870 ///
871 /// ## Strategy
872 ///
873 /// 1. **Aho-Corasick** (`aho_corasick`): single O(n) SIMD pass over the
874 /// window reporting every occurrence of every literal pattern,
875 /// including overlapping ones. This replaces O(k·n) individual regex
876 /// scans for the literal subset.
877 /// 2. **RegexSet pre-filter** (R-3 optimisation): fast check of which
878 /// *non-literal* regex patterns have any match in the window.
879 /// 3. **Individual regex `find_iter`**: only for regex patterns flagged
880 /// by step 2.
881 /// 4. **Sort + greedy dedup**: all raw matches are sorted by start
882 /// (ascending), then length (descending), and a single greedy pass
883 /// selects the final non-overlapping set.
884 fn find_matches(&self, window: &[u8], scratch: &mut ScanScratch) {
885 scratch.all_matches.clear();
886 scratch.selected.clear();
887
888 // Step 1: Aho-Corasick overlapping scan for all literal patterns.
889 // find_overlapping_iter reports every match position including
890 // overlapping ones, so the sort+greedy step below correctly resolves
891 // ambiguities between literals (e.g. "abc" vs "abcd" at same offset).
892 if let Some(ac) = &self.aho_corasick {
893 for mat in ac.find_overlapping_iter(window) {
894 scratch.all_matches.push(RawMatch {
895 start: mat.start(),
896 end: mat.end(),
897 pattern_idx: self.literal_indices[mat.pattern().as_usize()],
898 });
899 }
900 }
901
902 // Steps 2+3: RegexSet pre-filter then individual scan for non-literal
903 // patterns. regex_set only contains non-literal pattern strings, so
904 // literals are never scanned twice.
905 for rs_idx in self.regex_set.matches(window) {
906 let pattern_idx = self.regex_indices[rs_idx];
907 for m in self.patterns[pattern_idx].regex.find_iter(window) {
908 scratch.all_matches.push(RawMatch {
909 start: m.start(),
910 end: m.end(),
911 pattern_idx,
912 });
913 }
914 }
915
916 // Step 4: sort then greedy non-overlapping selection.
917 // Skip entirely when no matches were found (the common case for
918 // clean data), avoiding an unnecessary sort of an empty Vec.
919 if scratch.all_matches.is_empty() {
920 return;
921 }
922
923 // Primary: start ascending. Secondary: length descending (longer
924 // match wins when two matches begin at the same position).
925 scratch.all_matches.sort_unstable_by(|a, b| {
926 a.start
927 .cmp(&b.start)
928 .then_with(|| (b.end - b.start).cmp(&(a.end - a.start)))
929 });
930
931 let mut last_end = 0;
932 for m in scratch.all_matches.drain(..) {
933 if m.start >= last_end {
934 last_end = m.end;
935 scratch.selected.push(m);
936 }
937 }
938 }
939
940 /// Adjust the commit point to avoid splitting a match across the
941 /// commit / carry boundary.
942 ///
943 /// If any match straddles `base_commit` (starts before, ends after),
944 /// the commit point is moved to after that match so it is emitted
945 /// in full this iteration.
946 #[allow(clippy::unused_self)] // keep &self for API consistency with other scanner methods
947 fn adjusted_commit_point(
948 &self,
949 matches: &[RawMatch],
950 base_commit: usize,
951 window_len: usize,
952 is_eof: bool,
953 ) -> usize {
954 if is_eof {
955 return window_len;
956 }
957
958 let mut commit = base_commit;
959
960 for m in matches {
961 if m.start < commit && m.end > commit {
962 // Match straddles the boundary — extend commit to include it.
963 commit = m.end;
964 }
965 }
966
967 // Never exceed window length.
968 commit.min(window_len)
969 }
970
971 /// Build the output for the committed region by splicing in replacements.
972 ///
973 /// Writes into `output_buf` (cleared on entry) and increments
974 /// `stats.matches_found` / `stats.replacements_applied` for each applied
975 /// replacement. Per-pattern hit counts are written to `pattern_counts`
976 /// (indexed by `pattern_idx`); the caller is responsible for folding
977 /// these into `ScanStats::pattern_counts` and resetting them.
978 ///
979 /// `matches` is the full selected set for the window (may include matches
980 /// in the carry region beyond `committed`). Because `adjusted_commit_point`
981 /// guarantees no match straddles the boundary, any match with
982 /// `start < committed.len()` also has `end <= committed.len()`. The
983 /// loop breaks early once `m.start >= committed.len()` since matches are
984 /// sorted by start.
985 ///
986 /// # Note on `from_utf8_lossy`
987 ///
988 /// `String::from_utf8_lossy` returns `Cow::Borrowed(&str)` for valid
989 /// UTF-8 input (the common case for ASCII secrets) — no heap allocation
990 /// on the hot path.
991 fn apply_replacements(
992 &self,
993 committed: &[u8],
994 matches: &[RawMatch],
995 stats: &mut ScanStats,
996 output_buf: &mut Vec<u8>,
997 pattern_counts: &mut [u64],
998 ) -> Result<()> {
999 output_buf.clear();
1000
1001 let mut last_end = 0;
1002
1003 for &m in matches {
1004 // Matches are sorted by start; those at or beyond the committed
1005 // region belong to the carry window — stop here.
1006 if m.start >= committed.len() {
1007 break;
1008 }
1009
1010 // Emit bytes before this match verbatim.
1011 output_buf.extend_from_slice(&committed[last_end..m.start]);
1012
1013 // Decode matched bytes. from_utf8_lossy is zero-copy (Cow::Borrowed)
1014 // for valid UTF-8, which covers all ASCII secrets.
1015 let matched_text = String::from_utf8_lossy(&committed[m.start..m.end]);
1016
1017 // One-way deterministic replacement via the MappingStore.
1018 let pattern = &self.patterns[m.pattern_idx];
1019 let replacement = self.store.get_or_insert(&pattern.category, &matched_text)?;
1020
1021 output_buf.extend_from_slice(replacement.as_bytes());
1022 last_end = m.end;
1023
1024 stats.matches_found += 1;
1025 stats.replacements_applied += 1;
1026 pattern_counts[m.pattern_idx] += 1;
1027 }
1028
1029 // Emit the trailing non-matching tail.
1030 output_buf.extend_from_slice(&committed[last_end..]);
1031
1032 Ok(())
1033 }
1034}
1035
1036// ---------------------------------------------------------------------------
1037// Send + Sync compile-time assertion
1038// ---------------------------------------------------------------------------
1039
1040const _: fn() = || {
1041 fn assert_send<T: Send>() {}
1042 fn assert_sync<T: Sync>() {}
1043 assert_send::<StreamScanner>();
1044 assert_sync::<StreamScanner>();
1045};
1046
1047// ---------------------------------------------------------------------------
1048// I/O helper
1049// ---------------------------------------------------------------------------
1050
1051/// Read up to `buf.len()` bytes from `reader`, retrying on `Interrupted`.
1052///
1053/// Returns the number of bytes actually read (< `buf.len()` only at EOF).
1054fn read_fully<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
1055 let mut total = 0;
1056 while total < buf.len() {
1057 match reader.read(&mut buf[total..]) {
1058 Ok(0) => break, // EOF
1059 Ok(n) => total += n,
1060 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
1061 Err(e) => return Err(SanitizeError::IoError(e.to_string())),
1062 }
1063 }
1064 Ok(total)
1065}
1066
1067// ---------------------------------------------------------------------------
1068// Unit tests
1069// ---------------------------------------------------------------------------
1070
/// Unit tests for the streaming scanner: construction and config
/// validation, match/replace behaviour, chunk-boundary handling, progress
/// reporting and byte accounting.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::HmacGenerator;

    /// Helper: build a scanner with given patterns and small chunk config.
    ///
    /// The small chunk (64) and overlap (16) sizes make even short inputs
    /// span several chunks.
    fn test_scanner(patterns: Vec<ScanPattern>) -> StreamScanner {
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        StreamScanner::new(
            patterns,
            store,
            ScanConfig {
                chunk_size: 64,
                overlap_size: 16,
            },
        )
        .unwrap()
    }

    /// Helper: email pattern.
    fn email_pattern() -> ScanPattern {
        ScanPattern::from_regex(
            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            Category::Email,
            "email",
        )
        .unwrap()
    }

    /// Helper: IPv4 pattern.
    fn ipv4_pattern() -> ScanPattern {
        ScanPattern::from_regex(
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            Category::IpV4,
            "ipv4",
        )
        .unwrap()
    }

    // ---- Construction ----

    #[test]
    fn scanner_creation() {
        let scanner = test_scanner(vec![email_pattern()]);
        assert_eq!(scanner.pattern_count(), 1);
    }

    #[test]
    fn invalid_config_zero_chunk() {
        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let result = StreamScanner::new(vec![], store, ScanConfig::new(0, 0));
        assert!(result.is_err());
    }

    #[test]
    fn invalid_config_overlap_ge_chunk() {
        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let result = StreamScanner::new(vec![], store, ScanConfig::new(100, 100));
        assert!(result.is_err());
    }

    // ---- Empty / no-match cases ----

    #[test]
    fn empty_input() {
        let scanner = test_scanner(vec![email_pattern()]);
        let (output, stats) = scanner.scan_bytes(b"").unwrap();
        assert!(output.is_empty());
        assert_eq!(stats.matches_found, 0);
        assert_eq!(stats.bytes_processed, 0);
    }

    #[test]
    fn no_matches() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"There are no email addresses here.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(output, input.as_slice());
        assert_eq!(stats.matches_found, 0);
    }

    // ---- Single match ----

    #[test]
    fn single_email_replaced() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"Contact alice@corp.com for help.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        assert_eq!(stats.replacements_applied, 1);
        // Original must not appear in output.
        assert!(!output
            .windows(b"alice@corp.com".len())
            .any(|w| w == b"alice@corp.com"));
        // Replacement should contain the @ from the domain-preserving email.
        let output_str = String::from_utf8_lossy(&output);
        assert!(output_str.contains("@corp.com"));
        // Length preserved: output is same total length as input.
        assert_eq!(output.len(), input.len(), "length must be preserved");
        // Surrounding text preserved.
        assert!(output_str.starts_with("Contact "));
        assert!(output_str.ends_with(" for help."));
    }

    // ---- Multiple matches ----

    #[test]
    fn multiple_emails_replaced() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"From alice@corp.com to bob@corp.com cc admin@corp.com";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 3);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("alice@corp.com"));
        assert!(!out_str.contains("bob@corp.com"));
        assert!(!out_str.contains("admin@corp.com"));
    }

    // ---- Same secret gets same replacement ----

    #[test]
    fn same_secret_same_replacement() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"First alice@corp.com then alice@corp.com again.";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 2);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("alice@corp.com"));

        // The domain is preserved by the replacement, so splitting on it
        // isolates the two replaced local parts.
        let parts: Vec<&str> = out_str.split("@corp.com").collect();
        // 3 parts = 2 occurrences of the replacement.
        assert_eq!(parts.len(), 3);

        // Strip the untouched surrounding text to get at each replacement.
        let first = parts[0]
            .strip_prefix("First ")
            .expect("text before the first match must be preserved");
        let second = parts[1]
            .strip_prefix(" then ")
            .expect("text between the matches must be preserved");
        assert!(!first.is_empty());
        // Both occurrences must have been replaced with the same value.
        assert_eq!(
            first, second,
            "same secret must map to the same replacement"
        );
        assert_eq!(parts[2], " again.");
    }

    // ---- Literal pattern ----

    #[test]
    fn literal_pattern_matched() {
        let pat = ScanPattern::from_literal(
            "SECRET_API_KEY_12345",
            Category::Custom("api_key".into()),
            "api_key",
        )
        .unwrap();
        let scanner = test_scanner(vec![pat]);
        let input = b"key=SECRET_API_KEY_12345&foo=bar";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        assert!(!output
            .windows(b"SECRET_API_KEY_12345".len())
            .any(|w| w == b"SECRET_API_KEY_12345"));
    }

    // ---- Multiple pattern types ----

    #[test]
    fn multiple_pattern_types() {
        let scanner = test_scanner(vec![email_pattern(), ipv4_pattern()]);
        let input = b"Server 192.168.1.100 contact admin@server.com";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 2);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("192.168.1.100"));
        assert!(!out_str.contains("admin@server.com"));
        assert_eq!(*stats.pattern_counts.get("email").unwrap(), 1);
        assert_eq!(*stats.pattern_counts.get("ipv4").unwrap(), 1);
    }

    // ---- Chunk boundary: match spans two chunks ----

    #[test]
    fn match_at_chunk_boundary() {
        // Use a very small chunk size so the email straddles a boundary.
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let scanner = StreamScanner::new(
            vec![email_pattern()],
            store,
            ScanConfig {
                chunk_size: 20, // very small
                overlap_size: 16,
            },
        )
        .unwrap();

        // Place an email address that will definitely straddle a boundary.
        let input = b"AAAAAAAAAAAAAAAA alice@corp.com BBBBBBBBBBBBB";
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.matches_found, 1);
        let out_str = String::from_utf8_lossy(&output);
        assert!(!out_str.contains("alice@corp.com"));
        assert!(out_str.contains("@corp.com"), "domain must be preserved");
    }

    // ---- Large input requiring many chunks ----

    #[test]
    fn large_input_many_chunks() {
        let scanner = test_scanner(vec![email_pattern()]);

        // Build a ~1 KiB input (930 bytes, many 64-byte chunks) with
        // emails sprinkled in.
        let mut input = Vec::new();
        let filler = b"Lorem ipsum dolor sit amet. ";
        for i in 0..20 {
            input.extend_from_slice(filler);
            let email = format!("user{i}@example.com ");
            input.extend_from_slice(email.as_bytes());
        }

        let (output, stats) = scanner.scan_bytes(&input).unwrap();
        assert_eq!(stats.matches_found, 20);
        let out_str = String::from_utf8_lossy(&output);
        for i in 0..20 {
            let email = format!("user{i}@example.com");
            assert!(!out_str.contains(&email));
        }
    }

    // ---- Progress reporting ----

    #[test]
    fn scan_bytes_with_progress_preserves_output_and_stats() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"Contact alice@corp.com and bob@corp.com for help.";

        let (baseline_output, baseline_stats) = scanner.scan_bytes(input).unwrap();

        let mut updates = Vec::new();
        let (progress_output, progress_stats) = scanner
            .scan_bytes_with_progress(input, |progress| updates.push(progress.clone()))
            .unwrap();

        // Reporting progress must not change what the scan produces.
        assert_eq!(progress_output, baseline_output);
        assert_eq!(
            progress_stats.bytes_processed,
            baseline_stats.bytes_processed
        );
        assert_eq!(progress_stats.bytes_output, baseline_stats.bytes_output);
        assert_eq!(progress_stats.matches_found, baseline_stats.matches_found);
        assert_eq!(
            progress_stats.replacements_applied,
            baseline_stats.replacements_applied
        );
        // The last update must describe the completed scan.
        assert!(!updates.is_empty());
        assert_eq!(updates.last().unwrap().bytes_processed, input.len() as u64);
        assert_eq!(
            updates.last().unwrap().total_bytes,
            Some(input.len() as u64)
        );
        assert_eq!(updates.last().unwrap().matches_found, 2);
    }

    #[test]
    fn scan_reader_with_progress_reports_multiple_updates_for_multi_chunk_input() {
        let scanner = test_scanner(vec![email_pattern()]);
        let mut input = Vec::new();
        for i in 0..8 {
            input.extend_from_slice(b"padding padding padding ");
            input.extend_from_slice(format!("user{i}@example.com ").as_bytes());
        }

        let mut output = Vec::new();
        let mut updates = Vec::new();
        let stats = scanner
            .scan_reader_with_progress(
                &input[..],
                &mut output,
                Some(input.len() as u64),
                |progress| {
                    updates.push(progress.clone());
                },
            )
            .unwrap();

        assert!(updates.len() >= 2);
        assert_eq!(
            updates.last().unwrap().bytes_processed,
            stats.bytes_processed
        );
        assert_eq!(updates.last().unwrap().bytes_output, stats.bytes_output);
        assert_eq!(
            updates.last().unwrap().total_bytes,
            Some(input.len() as u64)
        );
    }

    // ---- Scan via Read/Write interface ----

    #[test]
    fn scan_reader_writer() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"hello alice@corp.com world";
        let mut output = Vec::new();
        let stats = scanner.scan_reader(&input[..], &mut output).unwrap();
        assert_eq!(stats.matches_found, 1);
        let out_str = String::from_utf8_lossy(&output);
        assert!(out_str.contains("@corp.com"), "domain must be preserved");
    }

    // ---- Pattern compile error ----

    #[test]
    fn invalid_regex_pattern() {
        let result = ScanPattern::from_regex("[invalid(", Category::Email, "bad");
        assert!(result.is_err());
    }

    // ---- Default config ----

    #[test]
    fn default_config_valid() {
        ScanConfig::default().validate().unwrap();
    }

    // ---- Config edge cases ----

    #[test]
    fn config_chunk_1_overlap_0() {
        // Extreme but valid: 1-byte chunks, no overlap.
        // Won't catch multi-byte patterns, but should not crash.
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let scanner = StreamScanner::new(vec![], store, ScanConfig::new(1, 0)).unwrap();
        let (output, _) = scanner.scan_bytes(b"hello").unwrap();
        assert_eq!(output, b"hello");
    }

    // ---- Bytes output tracking ----

    #[test]
    fn bytes_output_preserved_on_replacement() {
        let scanner = test_scanner(vec![email_pattern()]);
        let input = b"a@b.cc"; // short email
        let (output, stats) = scanner.scan_bytes(input).unwrap();
        assert_eq!(stats.bytes_processed, input.len() as u64);
        assert_eq!(stats.bytes_output, output.len() as u64);
        // Length-preserving: output length matches input length.
        assert_eq!(output.len(), input.len());
    }
}