sanitize_engine/scanner.rs
1//! Streaming scanner for detecting and replacing sensitive data.
2//!
3//! # Architecture
4//!
5//! The streaming scanner processes input data in configurable chunks,
6//! detecting secret patterns (regex or literal) and applying one-way
7//! replacements via the [`MappingStore`].
8//! This design supports files of 20–100 GB+ without requiring the entire
9//! content to fit in memory.
10//!
//! ```text
//! ┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
//! │ Input (Read) │ ──▶ │  StreamScanner  │ ──▶ │  Output (Write)  │
//! │  (chunked)   │     │ (pattern match  │     │   (sanitized)    │
//! └──────────────┘     │   + replace)    │     └──────────────────┘
//!                      └────────┬────────┘
//!                               │
//!                      ┌────────▼────────┐
//!                      │  MappingStore   │
//!                      │  (dedup cache)  │
//!                      └─────────────────┘
//! ```
23//!
24//! # Chunk Overlap Strategy
25//!
26//! To avoid missing matches that span chunk boundaries, the scanner
27//! maintains an overlap window between consecutive chunks:
28//!
29//! 1. Read `chunk_size` bytes of new data.
30//! 2. Prepend the `carry` buffer (tail of previous window).
31//! 3. Scan the combined `window` for all pattern matches.
32//! 4. Compute `commit_point = window.len() - overlap_size` (adjusted
33//! upward if a match straddles the boundary).
34//! 5. Emit output for `window[..commit_point]` with replacements applied.
35//! 6. Set `carry = window[commit_point..]` for the next iteration.
36//!
37//! The `overlap_size` should be ≥ the maximum expected match length to
38//! guarantee no matches are missed at boundaries.
39//!
40//! # Thread Safety
41//!
42//! [`StreamScanner`] is `Send + Sync`. Multiple files can be scanned
43//! concurrently using a shared `Arc<StreamScanner>`, all backed by the
44//! same [`MappingStore`] for per-run dedup
45//! consistency.
46//!
47//! # Performance
48//!
49//! - **Chunk-based I/O**: only `chunk_size + overlap_size` bytes in
50//! memory per active scan.
51//! - **Compiled regex**: patterns are compiled once at construction and
52//! reused across all chunks and files.
//! - **Low-contention reads**: the `DashMap` inside `MappingStore` is
//!   sharded, so lookups of already-seen values take only a per-shard
//!   read lock and rarely contend with one another.
55//! - **File-level parallelism**: share `Arc<StreamScanner>` across
56//! threads to scan multiple files concurrently.
57
58use crate::category::Category;
59use crate::error::{Result, SanitizeError};
60use crate::store::MappingStore;
61use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder};
62use std::collections::HashMap;
63use std::io::{self, Read, Write};
64use std::sync::Arc;
65
66// ---------------------------------------------------------------------------
67// Configuration
68// ---------------------------------------------------------------------------
69
/// Default chunk size: 1 MiB.
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;

/// Default overlap size: 4 KiB.
const DEFAULT_OVERLAP_SIZE: usize = 4096;

/// Maximum compiled regex automaton size (bytes). Prevents DoS via
/// pathologically complex user-supplied patterns.
///
/// Applied to every individual pattern; the combined `RegexSet` uses this
/// value multiplied by the number of patterns.
const REGEX_SIZE_LIMIT: usize = 1 << 20; // 1 MiB

/// Maximum DFA cache size (bytes) per regex.
///
/// Scaled by the pattern count for the combined `RegexSet`, like
/// [`REGEX_SIZE_LIMIT`].
const REGEX_DFA_SIZE_LIMIT: usize = 1 << 20; // 1 MiB

/// Maximum number of patterns allowed in a single scanner (F-05 fix).
/// The `RegexSet` automaton memory scales linearly with pattern count.
/// With 1 MiB size/DFA limits per pattern, 10 000 patterns could
/// allocate up to ~20 GiB of automaton memory. This cap prevents
/// accidental resource exhaustion. Override via
/// [`StreamScanner::new_with_max_patterns`] if needed.
const DEFAULT_MAX_PATTERNS: usize = 10_000;
90
/// Configuration for the streaming scanner.
///
/// Use [`ScanConfig::validate`] to check a hand-built value;
/// [`StreamScanner::new`] validates it for you.
///
/// # Tuning Guide
///
/// | Workload               | `chunk_size` | `overlap_size` |
/// |------------------------|--------------|----------------|
/// | Small files (< 10 MB)  | 256 KiB      | 1 KiB          |
/// | General purpose        | 1 MiB        | 4 KiB          |
/// | Large files (> 1 GB)   | 4–8 MiB      | 8 KiB          |
/// | Memory-constrained     | 64 KiB       | 1 KiB          |
///
/// `overlap_size` should be ≥ the longest expected match. Most secret
/// patterns (API keys, emails, SSNs) are well under 256 bytes, so the
/// 4 KiB default provides ample margin.
#[derive(Debug, Clone)]
pub struct ScanConfig {
    /// Size of each chunk read from the input (bytes).
    ///
    /// Larger chunks improve throughput (fewer syscalls) but use more
    /// memory. Must be non-zero. Default: 1 MiB.
    pub chunk_size: usize,

    /// Overlap between consecutive chunks (bytes).
    ///
    /// Must be ≥ the maximum expected match length and strictly smaller
    /// than `chunk_size`. Patterns whose matches can exceed this length
    /// risk being missed at chunk boundaries. Default: 4 KiB.
    pub overlap_size: usize,
}
120
121impl Default for ScanConfig {
122 fn default() -> Self {
123 Self {
124 chunk_size: DEFAULT_CHUNK_SIZE,
125 overlap_size: DEFAULT_OVERLAP_SIZE,
126 }
127 }
128}
129
130impl ScanConfig {
131 /// Create a new configuration with explicit values.
132 #[must_use]
133 pub fn new(chunk_size: usize, overlap_size: usize) -> Self {
134 Self {
135 chunk_size,
136 overlap_size,
137 }
138 }
139
140 /// Validate the configuration, returning an error if invalid.
141 ///
142 /// # Errors
143 ///
144 /// Returns [`SanitizeError::InvalidConfig`] if `chunk_size` is zero
145 /// or `overlap_size >= chunk_size`.
146 pub fn validate(&self) -> Result<()> {
147 if self.chunk_size == 0 {
148 return Err(SanitizeError::InvalidConfig(
149 "chunk_size must be > 0".into(),
150 ));
151 }
152 if self.overlap_size >= self.chunk_size {
153 return Err(SanitizeError::InvalidConfig(
154 "overlap_size must be < chunk_size".into(),
155 ));
156 }
157 Ok(())
158 }
159}
160
161// ---------------------------------------------------------------------------
162// Scan pattern
163// ---------------------------------------------------------------------------
164
/// A pattern rule defining what to scan for and how to categorize matches.
///
/// Wraps a compiled [`regex::bytes::Regex`] with a [`Category`] for
/// replacement lookups and a human-readable label for reporting.
///
/// Both regex and literal patterns are supported. Literals are escaped
/// and compiled as regex for uniform handling.
///
/// Fields are private; build values with [`ScanPattern::from_regex`] or
/// [`ScanPattern::from_literal`].
pub struct ScanPattern {
    /// Compiled regex matcher (byte-oriented, so non-UTF-8 input can be scanned).
    regex: Regex,
    /// Category for replacement lookups.
    category: Category,
    /// Human-readable label for reporting / stats.
    label: String,
}
180
181impl std::fmt::Debug for ScanPattern {
182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 f.debug_struct("ScanPattern")
184 .field("pattern", &self.regex.as_str())
185 .field("category", &self.category)
186 .field("label", &self.label)
187 .finish()
188 }
189}
190
191impl ScanPattern {
192 /// Create a pattern from a regex string.
193 ///
194 /// # Errors
195 ///
196 /// Returns [`SanitizeError::PatternCompileError`] if the regex is invalid.
197 ///
198 /// # Examples
199 ///
200 /// ```
201 /// use sanitize_engine::scanner::ScanPattern;
202 /// use sanitize_engine::category::Category;
203 ///
204 /// let pat = ScanPattern::from_regex(
205 /// r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
206 /// Category::Email,
207 /// "email_address",
208 /// ).unwrap();
209 /// ```
210 pub fn from_regex(pattern: &str, category: Category, label: impl Into<String>) -> Result<Self> {
211 let regex = RegexBuilder::new(pattern)
212 .size_limit(REGEX_SIZE_LIMIT)
213 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
214 .build()
215 .map_err(|e| SanitizeError::PatternCompileError(e.to_string()))?;
216 Ok(Self {
217 regex,
218 category,
219 label: label.into(),
220 })
221 }
222
223 /// Create a pattern from a literal string.
224 ///
225 /// The literal is escaped so that regex metacharacters are matched
226 /// verbatim.
227 ///
228 /// # Errors
229 ///
230 /// Returns [`SanitizeError::PatternCompileError`] if regex compilation fails.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use sanitize_engine::scanner::ScanPattern;
236 /// use sanitize_engine::category::Category;
237 ///
238 /// let pat = ScanPattern::from_literal(
239 /// "sk-proj-abc123secret",
240 /// Category::Custom("api_key".into()),
241 /// "openai_key",
242 /// ).unwrap();
243 /// ```
244 pub fn from_literal(
245 literal: &str,
246 category: Category,
247 label: impl Into<String>,
248 ) -> Result<Self> {
249 let escaped = regex::escape(literal);
250 let regex = RegexBuilder::new(&escaped)
251 .size_limit(REGEX_SIZE_LIMIT)
252 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
253 .build()
254 .map_err(|e| SanitizeError::PatternCompileError(e.to_string()))?;
255 Ok(Self {
256 regex,
257 category,
258 label: label.into(),
259 })
260 }
261
262 /// The category this pattern maps to.
263 #[must_use]
264 pub fn category(&self) -> &Category {
265 &self.category
266 }
267
268 /// The human-readable label.
269 #[must_use]
270 pub fn label(&self) -> &str {
271 &self.label
272 }
273
274 /// Return the raw regex pattern string for RegexSet construction.
275 #[must_use]
276 pub fn regex_pattern(&self) -> &str {
277 self.regex.as_str()
278 }
279}
280
281// ScanPattern is Send + Sync because:
282// - regex::bytes::Regex is Send + Sync
283// - Category is Send + Sync (it's an enum of primitives + CompactString)
284// - String is Send + Sync
285
286// ---------------------------------------------------------------------------
287// Internal: raw match descriptor
288// ---------------------------------------------------------------------------
289
/// A single match found during scanning (internal).
///
/// Offsets are relative to the scan window (carry + freshly read chunk),
/// not to the overall input stream.
#[derive(Debug, Clone)]
struct RawMatch {
    /// Start byte offset within the scan window.
    start: usize,
    /// End byte offset (exclusive) within the scan window.
    end: usize,
    /// Index into the `StreamScanner::patterns` vector.
    pattern_idx: usize,
}
300
301// ---------------------------------------------------------------------------
302// Scan statistics
303// ---------------------------------------------------------------------------
304
/// Statistics collected during a scan operation.
///
/// Returned by [`StreamScanner::scan_reader`] and
/// [`StreamScanner::scan_bytes`] to provide visibility into what
/// the scanner did.
///
/// Match counters are updated as replacements are applied, so each match
/// is counted once even if its bytes were re-scanned as part of the
/// overlap window.
#[derive(Debug, Clone, Default)]
pub struct ScanStats {
    /// Total bytes read from the input.
    pub bytes_processed: u64,
    /// Total bytes written to the output (may differ from `bytes_processed`
    /// when replacements have different lengths than the originals).
    pub bytes_output: u64,
    /// Total number of matches found across all patterns.
    pub matches_found: u64,
    /// Total number of replacements applied (always == `matches_found`
    /// in one-way mode).
    pub replacements_applied: u64,
    /// Per-pattern match counts, keyed by pattern label.
    ///
    /// Patterns that share a label share a counter.
    pub pattern_counts: HashMap<String, u64>,
}
325
326// ---------------------------------------------------------------------------
327// StreamScanner
328// ---------------------------------------------------------------------------
329
/// Streaming scanner that detects and replaces sensitive patterns.
///
/// Thread-safe: can be shared via `Arc<StreamScanner>` for concurrent
/// scanning of multiple files. Each call to [`scan_reader`](Self::scan_reader)
/// is independent and maintains its own chunking state.
///
/// # Usage
///
/// ```rust
/// use sanitize_engine::scanner::{StreamScanner, ScanPattern, ScanConfig};
/// use sanitize_engine::category::Category;
/// use sanitize_engine::generator::HmacGenerator;
/// use sanitize_engine::store::MappingStore;
/// use std::sync::Arc;
///
/// // 1. Build the replacement store.
/// let generator = Arc::new(HmacGenerator::new([42u8; 32]));
/// let store = Arc::new(MappingStore::new(generator, None));
///
/// // 2. Define patterns.
/// let patterns = vec![
///     ScanPattern::from_regex(
///         r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
///         Category::Email,
///         "email",
///     ).unwrap(),
/// ];
///
/// // 3. Create the scanner.
/// let scanner = StreamScanner::new(patterns, store, ScanConfig::default()).unwrap();
///
/// // 4. Scan.
/// let input = b"Contact alice@corp.com for details.";
/// let (output, stats) = scanner.scan_bytes(input).unwrap();
/// assert_eq!(stats.matches_found, 1);
/// assert!(!output.windows(b"alice@corp.com".len())
///     .any(|w| w == b"alice@corp.com"));
/// ```
pub struct StreamScanner {
    /// Compiled scan patterns.
    ///
    /// Index `i` here corresponds to index `i` in `regex_set`.
    patterns: Vec<ScanPattern>,
    /// Pre-compiled set for fast multi-pattern pre-filtering.
    /// `matches()` returns which pattern indices matched, avoiding
    /// running every individual regex on each chunk (R-3 optimisation).
    regex_set: RegexSet,
    /// Thread-safe dedup replacement store.
    store: Arc<MappingStore>,
    /// Scanner configuration (validated at construction).
    config: ScanConfig,
}
380
381impl StreamScanner {
    /// Create a new streaming scanner.
    ///
    /// Delegates to [`new_with_max_patterns`](Self::new_with_max_patterns)
    /// using the default pattern cap (`DEFAULT_MAX_PATTERNS`).
    ///
    /// # Arguments
    ///
    /// - `patterns` — the set of patterns to scan for.
    /// - `store` — the mapping store for dedup-consistent replacements.
    /// - `config` — chunking / overlap configuration.
    ///
    /// # Errors
    ///
    /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
    /// invalid (e.g. `chunk_size == 0` or `overlap_size >= chunk_size`)
    /// or if more patterns are supplied than the default cap allows, and
    /// [`SanitizeError::PatternCompileError`] if the combined `RegexSet`
    /// cannot be built.
    pub fn new(
        patterns: Vec<ScanPattern>,
        store: Arc<MappingStore>,
        config: ScanConfig,
    ) -> Result<Self> {
        Self::new_with_max_patterns(patterns, store, config, DEFAULT_MAX_PATTERNS)
    }
401
402 /// Create a new streaming scanner with a custom pattern limit.
403 ///
404 /// This is identical to [`new`](Self::new) but allows overriding the
405 /// default pattern cap (10 000). Use this
406 /// when you have a legitimate need for more patterns and have
407 /// verified that your system has enough memory for the resulting
408 /// `RegexSet`.
409 ///
410 /// # Errors
411 ///
412 /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
413 /// invalid or the pattern count exceeds `max_patterns`.
414 pub fn new_with_max_patterns(
415 patterns: Vec<ScanPattern>,
416 store: Arc<MappingStore>,
417 config: ScanConfig,
418 max_patterns: usize,
419 ) -> Result<Self> {
420 config.validate()?;
421
422 // F-05 fix: enforce maximum pattern count to bound RegexSet memory.
423 if patterns.len() > max_patterns {
424 return Err(SanitizeError::InvalidConfig(format!(
425 "pattern count ({}) exceeds maximum allowed ({}) — \
426 RegexSet memory scales linearly with pattern count",
427 patterns.len(),
428 max_patterns
429 )));
430 }
431
432 // Build a RegexSet from all pattern strings for fast pre-filtering.
433 let regex_set = if patterns.is_empty() {
434 RegexSetBuilder::new(Vec::<&str>::new())
435 .size_limit(REGEX_SIZE_LIMIT)
436 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
437 .build()
438 .map_err(|e| SanitizeError::PatternCompileError(e.to_string()))?
439 } else {
440 let pattern_strs: Vec<&str> = patterns.iter().map(|p| p.regex_pattern()).collect();
441 RegexSetBuilder::new(&pattern_strs)
442 .size_limit(REGEX_SIZE_LIMIT * pattern_strs.len().max(1))
443 .dfa_size_limit(REGEX_DFA_SIZE_LIMIT * pattern_strs.len().max(1))
444 .build()
445 .map_err(|e| SanitizeError::PatternCompileError(e.to_string()))?
446 };
447
448 Ok(Self {
449 patterns,
450 regex_set,
451 store,
452 config,
453 })
454 }
455
456 /// Scan a reader and write sanitized output to a writer.
457 ///
458 /// Processes the input in chunks of `config.chunk_size` bytes,
459 /// maintaining an overlap window of `config.overlap_size` bytes to
460 /// catch matches spanning chunk boundaries. All detected matches
461 /// are replaced one-way via the [`MappingStore`].
462 ///
463 /// # Arguments
464 ///
465 /// - `reader` — input source (file, network stream, `&[u8]`, …).
466 /// - `writer` — output sink (file, `Vec<u8>`, …).
467 ///
468 /// # Returns
469 ///
470 /// [`ScanStats`] with counters for bytes processed, matches found, etc.
471 ///
472 /// # Errors
473 ///
474 /// Returns [`SanitizeError`] on I/O failures or if a replacement
475 /// cannot be generated (e.g. store capacity exceeded).
476 pub fn scan_reader<R: Read, W: Write>(
477 &self,
478 mut reader: R,
479 mut writer: W,
480 ) -> Result<ScanStats> {
481 let mut stats = ScanStats::default();
482
483 // Carry buffer: the tail of the previous window that needs
484 // to be re-scanned with the next chunk.
485 let mut carry: Vec<u8> = Vec::new();
486
487 // Read buffer (reused across iterations to avoid re-allocation).
488 let mut read_buf = vec![0u8; self.config.chunk_size];
489
490 // Scan window (reused across iterations — grows to peak size then
491 // stays there, avoiding per-chunk allocation).
492 let mut window: Vec<u8> =
493 Vec::with_capacity(self.config.chunk_size + self.config.overlap_size);
494
495 loop {
496 // Read the next chunk.
497 let bytes_read = read_fully(&mut reader, &mut read_buf)?;
498 let is_eof = bytes_read < read_buf.len();
499
500 // Track only genuinely new bytes (carry was already counted).
501 stats.bytes_processed += bytes_read as u64;
502
503 if bytes_read == 0 && carry.is_empty() {
504 break;
505 }
506
507 // Build the scan window: carry ++ new_data.
508 // Reuse the window buffer to avoid per-chunk allocation.
509 let new_data = &read_buf[..bytes_read];
510 window.clear();
511 window.extend_from_slice(&carry);
512 window.extend_from_slice(new_data);
513
514 if window.is_empty() {
515 break;
516 }
517
518 // Find all non-overlapping matches in the window.
519 let matches = self.find_matches(&window);
520
521 // Determine the commit point — how much of the window we can
522 // safely emit this iteration.
523 let base_commit = if is_eof {
524 window.len()
525 } else {
526 window.len().saturating_sub(self.config.overlap_size)
527 };
528
529 let commit_point =
530 self.adjusted_commit_point(&matches, base_commit, window.len(), is_eof);
531
532 // Select matches that fall entirely within the committed region.
533 let committed_matches: Vec<&RawMatch> = matches
534 .iter()
535 .filter(|m| m.start < commit_point && m.end <= commit_point)
536 .collect();
537
538 // Apply replacements and write the committed output.
539 let output =
540 self.apply_replacements(&window[..commit_point], &committed_matches, &mut stats)?;
541 writer
542 .write_all(&output)
543 .map_err(|e| SanitizeError::IoError(e.to_string()))?;
544 stats.bytes_output += output.len() as u64;
545
546 // Update carry for next iteration. Reuse the carry buffer
547 // by copying remaining bytes down.
548 if is_eof {
549 carry.clear();
550 break;
551 }
552 carry.clear();
553 carry.extend_from_slice(&window[commit_point..]);
554 }
555
556 Ok(stats)
557 }
558
559 /// Convenience: scan byte slice in-memory and return sanitized output.
560 ///
561 /// Equivalent to `scan_reader(input, Vec::new())` but returns the
562 /// output buffer directly.
563 ///
564 /// # Errors
565 ///
566 /// Returns [`SanitizeError`] if a replacement cannot be generated
567 /// (e.g. store capacity exceeded).
568 pub fn scan_bytes(&self, input: &[u8]) -> Result<(Vec<u8>, ScanStats)> {
569 let mut output = Vec::with_capacity(input.len());
570 let stats = self.scan_reader(input, &mut output)?;
571 Ok((output, stats))
572 }
573
574 // ---- Accessors ----
575
    /// Access the scanner's configuration (the value passed at
    /// construction, after validation).
    #[must_use]
    pub fn config(&self) -> &ScanConfig {
        &self.config
    }
581
    /// Access the underlying mapping store.
    ///
    /// Returns the shared handle, so callers can `Arc::clone` it if they
    /// need their own reference.
    #[must_use]
    pub fn store(&self) -> &Arc<MappingStore> {
        &self.store
    }
587
    /// Number of patterns registered in this scanner.
    #[must_use]
    pub fn pattern_count(&self) -> usize {
        self.patterns.len()
    }
593
594 /// Create a scanner from an encrypted secrets file.
595 ///
596 /// Decrypts the file in memory, parses the entries, compiles
597 /// patterns, and returns the scanner ready to scan. Decrypted
598 /// plaintext is scrubbed from memory after parsing.
599 ///
600 /// # Arguments
601 ///
602 /// - `encrypted_bytes` — raw bytes of the `.enc` file.
603 /// - `password` — user password.
604 /// - `format` — optional format override for the plaintext.
605 /// - `store` — mapping store for dedup-consistent replacements.
606 /// - `config` — chunking / overlap configuration.
607 /// - `extra_patterns` — additional patterns to merge in.
608 ///
609 /// # Returns
610 ///
611 /// `(scanner, warnings)` where `warnings` lists entries that
612 /// failed to compile (index + error).
613 ///
614 /// # Errors
615 ///
616 /// Returns [`SanitizeError::SecretsError`] on decryption failure
617 /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
618 pub fn from_encrypted_secrets(
619 encrypted_bytes: &[u8],
620 password: &str,
621 format: Option<crate::secrets::SecretsFormat>,
622 store: Arc<MappingStore>,
623 config: ScanConfig,
624 extra_patterns: Vec<ScanPattern>,
625 ) -> Result<(Self, Vec<(usize, SanitizeError)>)> {
626 let (mut patterns, warnings) =
627 crate::secrets::load_encrypted_secrets(encrypted_bytes, password, format)?;
628 patterns.extend(extra_patterns);
629 let scanner = Self::new(patterns, store, config)?;
630 Ok((scanner, warnings))
631 }
632
633 /// Create a scanner from a plaintext secrets file.
634 ///
635 /// Convenience for development / testing without encryption.
636 ///
637 /// # Errors
638 ///
639 /// Returns [`SanitizeError::SecretsError`] on parse failure
640 /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
641 pub fn from_plaintext_secrets(
642 plaintext: &[u8],
643 format: Option<crate::secrets::SecretsFormat>,
644 store: Arc<MappingStore>,
645 config: ScanConfig,
646 extra_patterns: Vec<ScanPattern>,
647 ) -> Result<(Self, Vec<(usize, SanitizeError)>)> {
648 let (mut patterns, warnings) = crate::secrets::load_plaintext_secrets(plaintext, format)?;
649 patterns.extend(extra_patterns);
650 let scanner = Self::new(patterns, store, config)?;
651 Ok((scanner, warnings))
652 }
653
654 // ---- Internal helpers ----
655
656 /// Find all non-overlapping matches across all patterns.
657 ///
658 /// Strategy: use the `RegexSet` for a fast check of which patterns
659 /// have *any* match in the window, then run only those individual
660 /// regexes for precise match positions. This avoids running every
661 /// pattern on every chunk (R-3 optimisation).
662 fn find_matches(&self, window: &[u8]) -> Vec<RawMatch> {
663 let mut all_matches = Vec::new();
664
665 // Fast pre-filter: which patterns have at least one match?
666 let active: Vec<usize> = self.regex_set.matches(window).into_iter().collect();
667
668 // Only run individual regexes for patterns that matched.
669 for &idx in &active {
670 let pattern = &self.patterns[idx];
671 for m in pattern.regex.find_iter(window) {
672 all_matches.push(RawMatch {
673 start: m.start(),
674 end: m.end(),
675 pattern_idx: idx,
676 });
677 }
678 }
679
680 // Sort: primary by start (ascending), secondary by length
681 // (descending — prefer longer matches when they start at the
682 // same position).
683 all_matches.sort_by(|a, b| {
684 a.start
685 .cmp(&b.start)
686 .then_with(|| (b.end - b.start).cmp(&(a.end - a.start)))
687 });
688
689 // Greedily select non-overlapping matches.
690 let mut selected = Vec::new();
691 let mut last_end = 0;
692 for m in all_matches {
693 if m.start >= last_end {
694 last_end = m.end;
695 selected.push(m);
696 }
697 }
698
699 selected
700 }
701
702 /// Adjust the commit point to avoid splitting a match across the
703 /// commit / carry boundary.
704 ///
705 /// If any match straddles `base_commit` (starts before, ends after),
706 /// the commit point is moved to after that match so it is emitted
707 /// in full this iteration.
708 #[allow(clippy::unused_self)] // keep &self for API consistency with other scanner methods
709 fn adjusted_commit_point(
710 &self,
711 matches: &[RawMatch],
712 base_commit: usize,
713 window_len: usize,
714 is_eof: bool,
715 ) -> usize {
716 if is_eof {
717 return window_len;
718 }
719
720 let mut commit = base_commit;
721
722 for m in matches {
723 if m.start < commit && m.end > commit {
724 // Match straddles the boundary — extend commit to include it.
725 commit = m.end;
726 }
727 }
728
729 // Never exceed window length.
730 commit.min(window_len)
731 }
732
733 /// Build the output buffer for the committed region by splicing in
734 /// replacements for every match.
735 fn apply_replacements(
736 &self,
737 committed: &[u8],
738 matches: &[&RawMatch],
739 stats: &mut ScanStats,
740 ) -> Result<Vec<u8>> {
741 if matches.is_empty() {
742 return Ok(committed.to_vec());
743 }
744
745 let mut output = Vec::with_capacity(committed.len());
746 let mut last_end = 0;
747
748 for m in matches {
749 // Emit the non-matching region before this match.
750 output.extend_from_slice(&committed[last_end..m.start]);
751
752 // Extract the matched text (lossy UTF-8 for binary safety).
753 let matched_bytes = &committed[m.start..m.end];
754 let matched_text = String::from_utf8_lossy(matched_bytes);
755
756 // Look up or create the one-way replacement.
757 let pattern = &self.patterns[m.pattern_idx];
758 let replacement = self.store.get_or_insert(&pattern.category, &matched_text)?;
759
760 output.extend_from_slice(replacement.as_bytes());
761 last_end = m.end;
762
763 // Accumulate per-match stats.
764 stats.matches_found += 1;
765 stats.replacements_applied += 1;
766 *stats
767 .pattern_counts
768 .entry(pattern.label.clone())
769 .or_insert(0) += 1;
770 }
771
772 // Emit trailing non-matching region.
773 output.extend_from_slice(&committed[last_end..]);
774
775 Ok(output)
776 }
777}
778
779// ---------------------------------------------------------------------------
780// Send + Sync compile-time assertion
781// ---------------------------------------------------------------------------
782
783const _: fn() = || {
784 fn assert_send<T: Send>() {}
785 fn assert_sync<T: Sync>() {}
786 assert_send::<StreamScanner>();
787 assert_sync::<StreamScanner>();
788};
789
790// ---------------------------------------------------------------------------
791// I/O helper
792// ---------------------------------------------------------------------------
793
794/// Read up to `buf.len()` bytes from `reader`, retrying on `Interrupted`.
795///
796/// Returns the number of bytes actually read (< `buf.len()` only at EOF).
797fn read_fully<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
798 let mut total = 0;
799 while total < buf.len() {
800 match reader.read(&mut buf[total..]) {
801 Ok(0) => break, // EOF
802 Ok(n) => total += n,
803 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
804 Err(e) => return Err(SanitizeError::IoError(e.to_string())),
805 }
806 }
807 Ok(total)
808}
809
810// ---------------------------------------------------------------------------
811// Unit tests
812// ---------------------------------------------------------------------------
813
814#[cfg(test)]
815mod tests {
816 use super::*;
817 use crate::generator::HmacGenerator;
818
819 /// Helper: build a scanner with given patterns and small chunk config.
820 fn test_scanner(patterns: Vec<ScanPattern>) -> StreamScanner {
821 let gen = Arc::new(HmacGenerator::new([42u8; 32]));
822 let store = Arc::new(MappingStore::new(gen, None));
823 StreamScanner::new(
824 patterns,
825 store,
826 ScanConfig {
827 chunk_size: 64,
828 overlap_size: 16,
829 },
830 )
831 .unwrap()
832 }
833
834 /// Helper: email pattern.
835 fn email_pattern() -> ScanPattern {
836 ScanPattern::from_regex(
837 r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
838 Category::Email,
839 "email",
840 )
841 .unwrap()
842 }
843
844 /// Helper: IPv4 pattern.
845 fn ipv4_pattern() -> ScanPattern {
846 ScanPattern::from_regex(
847 r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
848 Category::IpV4,
849 "ipv4",
850 )
851 .unwrap()
852 }
853
854 // ---- Construction ----
855
856 #[test]
857 fn scanner_creation() {
858 let scanner = test_scanner(vec![email_pattern()]);
859 assert_eq!(scanner.pattern_count(), 1);
860 }
861
862 #[test]
863 fn invalid_config_zero_chunk() {
864 let gen = Arc::new(HmacGenerator::new([0u8; 32]));
865 let store = Arc::new(MappingStore::new(gen, None));
866 let result = StreamScanner::new(vec![], store, ScanConfig::new(0, 0));
867 assert!(result.is_err());
868 }
869
870 #[test]
871 fn invalid_config_overlap_ge_chunk() {
872 let gen = Arc::new(HmacGenerator::new([0u8; 32]));
873 let store = Arc::new(MappingStore::new(gen, None));
874 let result = StreamScanner::new(vec![], store, ScanConfig::new(100, 100));
875 assert!(result.is_err());
876 }
877
878 // ---- Empty / no-match cases ----
879
880 #[test]
881 fn empty_input() {
882 let scanner = test_scanner(vec![email_pattern()]);
883 let (output, stats) = scanner.scan_bytes(b"").unwrap();
884 assert!(output.is_empty());
885 assert_eq!(stats.matches_found, 0);
886 assert_eq!(stats.bytes_processed, 0);
887 }
888
889 #[test]
890 fn no_matches() {
891 let scanner = test_scanner(vec![email_pattern()]);
892 let input = b"There are no email addresses here.";
893 let (output, stats) = scanner.scan_bytes(input).unwrap();
894 assert_eq!(output, input.as_slice());
895 assert_eq!(stats.matches_found, 0);
896 }
897
898 // ---- Single match ----
899
900 #[test]
901 fn single_email_replaced() {
902 let scanner = test_scanner(vec![email_pattern()]);
903 let input = b"Contact alice@corp.com for help.";
904 let (output, stats) = scanner.scan_bytes(input).unwrap();
905 assert_eq!(stats.matches_found, 1);
906 assert_eq!(stats.replacements_applied, 1);
907 // Original must not appear in output.
908 assert!(!output
909 .windows(b"alice@corp.com".len())
910 .any(|w| w == b"alice@corp.com"));
911 // Replacement should contain the @ from the domain-preserving email.
912 let output_str = String::from_utf8_lossy(&output);
913 assert!(output_str.contains("@corp.com"));
914 // Length preserved: output is same total length as input.
915 assert_eq!(output.len(), input.len(), "length must be preserved");
916 // Surrounding text preserved.
917 assert!(output_str.starts_with("Contact "));
918 assert!(output_str.ends_with(" for help."));
919 }
920
921 // ---- Multiple matches ----
922
923 #[test]
924 fn multiple_emails_replaced() {
925 let scanner = test_scanner(vec![email_pattern()]);
926 let input = b"From alice@corp.com to bob@corp.com cc admin@corp.com";
927 let (output, stats) = scanner.scan_bytes(input).unwrap();
928 assert_eq!(stats.matches_found, 3);
929 let out_str = String::from_utf8_lossy(&output);
930 assert!(!out_str.contains("alice@corp.com"));
931 assert!(!out_str.contains("bob@corp.com"));
932 assert!(!out_str.contains("admin@corp.com"));
933 }
934
935 // ---- Same secret gets same replacement ----
936
937 #[test]
938 fn same_secret_same_replacement() {
939 let scanner = test_scanner(vec![email_pattern()]);
940 let input = b"First alice@corp.com then alice@corp.com again.";
941 let (output, stats) = scanner.scan_bytes(input).unwrap();
942 assert_eq!(stats.matches_found, 2);
943 let out_str = String::from_utf8_lossy(&output);
944 // Both occurrences should be replaced with the same value.
945 // With length-preserving replacements, look for the preserved domain.
946 let parts: Vec<&str> = out_str.split("@corp.com").collect();
947 // 3 parts = 2 occurrences of the replacement.
948 assert_eq!(parts.len(), 3);
949 }
950
951 // ---- Literal pattern ----
952
953 #[test]
954 fn literal_pattern_matched() {
955 let pat = ScanPattern::from_literal(
956 "SECRET_API_KEY_12345",
957 Category::Custom("api_key".into()),
958 "api_key",
959 )
960 .unwrap();
961 let scanner = test_scanner(vec![pat]);
962 let input = b"key=SECRET_API_KEY_12345&foo=bar";
963 let (output, stats) = scanner.scan_bytes(input).unwrap();
964 assert_eq!(stats.matches_found, 1);
965 assert!(!output
966 .windows(b"SECRET_API_KEY_12345".len())
967 .any(|w| w == b"SECRET_API_KEY_12345"));
968 }
969
970 // ---- Multiple pattern types ----
971
/// An email pattern and an IPv4 pattern active in the same scan each report
/// exactly one match, and both raw values are gone from the output.
#[test]
fn multiple_pattern_types() {
    let scanner = test_scanner(vec![email_pattern(), ipv4_pattern()]);
    let (sanitized, stats) = scanner
        .scan_bytes(b"Server 192.168.1.100 contact admin@server.com")
        .unwrap();
    assert_eq!(stats.matches_found, 2);

    let text = String::from_utf8_lossy(&sanitized);
    for raw in ["192.168.1.100", "admin@server.com"] {
        assert!(!text.contains(raw));
    }

    // Per-pattern counters are keyed by the pattern label.
    for label in ["email", "ipv4"] {
        assert_eq!(*stats.pattern_counts.get(label).unwrap(), 1);
    }
}
984
985 // ---- Chunk boundary: match spans two chunks ----
986
/// An address that crosses a chunk boundary is still found and replaced
/// exactly once.
#[test]
fn match_at_chunk_boundary() {
    // Deliberately tiny chunks so that a short input spans several of them.
    let config = ScanConfig {
        chunk_size: 20, // very small
        overlap_size: 16,
    };
    let generator = Arc::new(HmacGenerator::new([42u8; 32]));
    let store = Arc::new(MappingStore::new(generator, None));
    let scanner = StreamScanner::new(vec![email_pattern()], store, config).unwrap();

    // 17 bytes precede the address, which occupies bytes 17..31 of the input
    // and therefore crosses the 20-byte mark.
    let (sanitized, stats) = scanner
        .scan_bytes(b"AAAAAAAAAAAAAAAA alice@corp.com BBBBBBBBBBBBB")
        .unwrap();
    assert_eq!(stats.matches_found, 1);

    let text = String::from_utf8_lossy(&sanitized);
    assert!(!text.contains("alice@corp.com"));
    assert!(text.contains("@corp.com"), "domain must be preserved");
}
1010
1011 // ---- Large input requiring many chunks ----
1012
/// Twenty distinct addresses interleaved with filler text are all found and
/// none survives in the output.
#[test]
fn large_input_many_chunks() {
    let scanner = test_scanner(vec![email_pattern()]);

    // 20 repetitions of 28 filler bytes plus an 18- or 19-byte address entry:
    // 930 bytes in total.
    // NOTE(review): whether this spans several chunks depends on the chunk
    // size `test_scanner` configures — confirm against that helper.
    let mut input = Vec::new();
    for i in 0..20 {
        input.extend_from_slice(b"Lorem ipsum dolor sit amet. ");
        input.extend(format!("user{}@example.com ", i).bytes());
    }

    let (sanitized, stats) = scanner.scan_bytes(&input).unwrap();
    assert_eq!(stats.matches_found, 20);

    let text = String::from_utf8_lossy(&sanitized);
    for email in (0..20).map(|i| format!("user{}@example.com", i)) {
        assert!(!text.contains(&email));
    }
}
1034
1035 // ---- Scan via Read/Write interface ----
1036
/// The reader/writer entry point (`scan_reader`) finds the address in the
/// input stream and writes a sanitized copy to the output sink.
#[test]
fn scan_reader_writer() {
    let scanner = test_scanner(vec![email_pattern()]);
    let input = b"hello alice@corp.com world";
    let mut output = Vec::new();
    // A byte slice serves as the reader; the `Vec<u8>` collects the output.
    let stats = scanner.scan_reader(&input[..], &mut output).unwrap();
    assert_eq!(stats.matches_found, 1);
    let out_str = String::from_utf8_lossy(&output);
    // The domain check alone would also pass on an unmodified copy of the
    // input, so verify that the address itself was rewritten.
    assert!(
        !out_str.contains("alice@corp.com"),
        "original address must not leak"
    );
    assert!(out_str.contains("@corp.com"), "domain must be preserved");
}
1047
1048 // ---- Pattern compile error ----
1049
/// Building a pattern from a malformed regular expression yields an error
/// instead of a usable pattern.
#[test]
fn invalid_regex_pattern() {
    assert!(ScanPattern::from_regex("[invalid(", Category::Email, "bad").is_err());
}
1055
1056 // ---- Default config ----
1057
/// The `Default` configuration must pass its own validation.
#[test]
fn default_config_valid() {
    let config = ScanConfig::default();
    config.validate().unwrap();
}
1062
1063 // ---- Config edge cases ----
1064
/// Edge-case configuration: with no patterns registered, the input must come
/// back unchanged and the scan must not fail.
#[test]
fn config_chunk_1_overlap_0() {
    // Extreme but valid: 1-byte chunks, no overlap. Multi-byte patterns could
    // not be caught this way, but scanning should still complete cleanly.
    let generator = Arc::new(HmacGenerator::new([42u8; 32]));
    let store = Arc::new(MappingStore::new(generator, None));
    let config = ScanConfig::new(1, 0);
    let scanner = StreamScanner::new(Vec::new(), store, config).unwrap();

    let (echoed, _) = scanner.scan_bytes(b"hello").unwrap();
    assert_eq!(echoed, b"hello");
}
1075
1076 // ---- Bytes output tracking ----
1077
/// The byte counters in the stats agree with the real input and output
/// sizes, and a replacement leaves the overall length unchanged.
#[test]
fn bytes_output_preserved_on_replacement() {
    let scanner = test_scanner(vec![email_pattern()]);
    let input = b"a@b.cc"; // short email
    let (sanitized, stats) = scanner.scan_bytes(input).unwrap();

    let (in_len, out_len) = (input.len(), sanitized.len());
    assert_eq!(stats.bytes_processed, in_len as u64);
    assert_eq!(stats.bytes_output, out_len as u64);
    // Length-preserving: output length matches input length.
    assert_eq!(out_len, in_len);
}
1088}