Skip to main content

sanitize_engine/
scanner.rs

1//! Streaming scanner for detecting and replacing sensitive data.
2//!
3//! # Architecture
4//!
5//! The streaming scanner processes input data in configurable chunks,
6//! detecting secret patterns (regex or literal) and applying one-way
7//! replacements via the [`MappingStore`].
8//! This design supports files of 20–100 GB+ without requiring the entire
9//! content to fit in memory.
10//!
11//! ```text
12//! ┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
13//! │  Input (Read) │ ──▶ │  StreamScanner  │ ──▶ │  Output (Write)  │
14//! │  (chunked)    │     │  (pattern match │     │  (sanitized)     │
15//! └──────────────┘     │   + replace)    │     └──────────────────┘
16//!                       └────────┬────────┘
17//!                                │
18//!                       ┌────────▼────────┐
19//!                       │  MappingStore   │
20//!                       │  (dedup cache)  │
21//!                       └─────────────────┘
22//! ```
23//!
24//! # Chunk Overlap Strategy
25//!
26//! To avoid missing matches that span chunk boundaries, the scanner
27//! maintains an overlap window between consecutive chunks:
28//!
29//! 1. Read `chunk_size` bytes of new data.
30//! 2. Prepend the `carry` buffer (tail of previous window).
31//! 3. Scan the combined `window` for all pattern matches.
32//! 4. Compute `commit_point = window.len() - overlap_size` (adjusted
33//!    upward if a match straddles the boundary).
34//! 5. Emit output for `window[..commit_point]` with replacements applied.
35//! 6. Set `carry = window[commit_point..]` for the next iteration.
36//!
37//! The `overlap_size` should be ≥ the maximum expected match length to
38//! guarantee no matches are missed at boundaries.
39//!
40//! # Thread Safety
41//!
42//! [`StreamScanner`] is `Send + Sync`. Multiple files can be scanned
43//! concurrently using a shared `Arc<StreamScanner>`, all backed by the
44//! same [`MappingStore`] for per-run dedup
45//! consistency.
46//!
47//! # Performance
48//!
49//! - **Chunk-based I/O**: only `chunk_size + overlap_size` bytes in
50//!   memory per active scan.
51//! - **Compiled regex**: patterns are compiled once at construction and
52//!   reused across all chunks and files.
53//! - **Lock-free reads**: the `DashMap` inside `MappingStore` provides
54//!   lock-free reads for already-seen values.
55//! - **File-level parallelism**: share `Arc<StreamScanner>` across
56//!   threads to scan multiple files concurrently.
57
58use crate::category::Category;
59use crate::error::{Result, SanitizeError};
60use crate::store::MappingStore;
61use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder};
62use std::collections::HashMap;
63use std::io::{self, Read, Write};
64use std::sync::Arc;
65
66// ---------------------------------------------------------------------------
67// Configuration
68// ---------------------------------------------------------------------------
69
70/// Default chunk size: 1 MiB.
71const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;
72
73/// Default overlap size: 4 KiB.
74const DEFAULT_OVERLAP_SIZE: usize = 4096;
75
76/// Maximum compiled regex automaton size (bytes). Prevents DoS via
77/// pathologically complex user-supplied patterns.
78const REGEX_SIZE_LIMIT: usize = 1 << 20; // 1 MiB
79
80/// Maximum DFA cache size (bytes) per regex.
81const REGEX_DFA_SIZE_LIMIT: usize = 1 << 20; // 1 MiB
82
83/// Maximum number of patterns allowed in a single scanner (F-05 fix).
84/// The `RegexSet` automaton memory scales linearly with pattern count.
85/// With 1 MiB size/DFA limits per pattern, 10 000 patterns could
86/// allocate up to ~20 GiB of automaton memory.  This cap prevents
87/// accidental resource exhaustion.  Override via
88/// [`StreamScanner::new_with_max_patterns`] if needed.
89const DEFAULT_MAX_PATTERNS: usize = 10_000;
90
91/// Configuration for the streaming scanner.
92///
93/// # Tuning Guide
94///
95/// | Workload               | `chunk_size` | `overlap_size` |
96/// |------------------------|--------------|----------------|
97/// | Small files (< 10 MB)  | 256 KiB      | 1 KiB          |
98/// | General purpose        | 1 MiB        | 4 KiB          |
99/// | Large files (> 1 GB)   | 4–8 MiB      | 8 KiB          |
100/// | Memory-constrained     | 64 KiB       | 1 KiB          |
101///
102/// `overlap_size` should be ≥ the longest expected match. Most secret
103/// patterns (API keys, emails, SSNs) are well under 256 bytes, so the
104/// 4 KiB default provides ample margin.
105#[derive(Debug, Clone)]
106pub struct ScanConfig {
107    /// Size of each chunk read from the input (bytes).
108    ///
109    /// Larger chunks improve throughput (fewer syscalls) but use more
110    /// memory. Default: 1 MiB.
111    pub chunk_size: usize,
112
113    /// Overlap between consecutive chunks (bytes).
114    ///
115    /// Must be ≥ the maximum expected match length. Patterns whose
116    /// matches can exceed this length risk being missed at chunk
117    /// boundaries. Default: 4 KiB.
118    pub overlap_size: usize,
119}
120
121impl Default for ScanConfig {
122    fn default() -> Self {
123        Self {
124            chunk_size: DEFAULT_CHUNK_SIZE,
125            overlap_size: DEFAULT_OVERLAP_SIZE,
126        }
127    }
128}
129
130impl ScanConfig {
131    /// Create a new configuration with explicit values.
132    #[must_use]
133    pub fn new(chunk_size: usize, overlap_size: usize) -> Self {
134        Self {
135            chunk_size,
136            overlap_size,
137        }
138    }
139
140    /// Validate the configuration, returning an error if invalid.
141    ///
142    /// # Errors
143    ///
144    /// Returns [`SanitizeError::InvalidConfig`] if `chunk_size` is zero
145    /// or `overlap_size >= chunk_size`.
146    pub fn validate(&self) -> Result<()> {
147        if self.chunk_size == 0 {
148            return Err(SanitizeError::InvalidConfig(
149                "chunk_size must be > 0".into(),
150            ));
151        }
152        if self.overlap_size >= self.chunk_size {
153            return Err(SanitizeError::InvalidConfig(
154                "overlap_size must be < chunk_size".into(),
155            ));
156        }
157        Ok(())
158    }
159}
160
161// ---------------------------------------------------------------------------
162// Scan pattern
163// ---------------------------------------------------------------------------
164
165/// A pattern rule defining what to scan for and how to categorize matches.
166///
167/// Wraps a compiled [`regex::bytes::Regex`] with a [`Category`] for
168/// replacement lookups and a human-readable label for reporting.
169///
170/// Both regex and literal patterns are supported. Literals are escaped
171/// and compiled as regex for uniform handling.
172pub struct ScanPattern {
173    /// Compiled regex matcher.
174    regex: Regex,
175    /// Category for replacement lookups.
176    category: Category,
177    /// Human-readable label for reporting / stats.
178    label: String,
179}
180
181impl std::fmt::Debug for ScanPattern {
182    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183        f.debug_struct("ScanPattern")
184            .field("pattern", &self.regex.as_str())
185            .field("category", &self.category)
186            .field("label", &self.label)
187            .finish()
188    }
189}
190
191impl ScanPattern {
192    /// Create a pattern from a regex string.
193    ///
194    /// # Errors
195    ///
196    /// Returns [`SanitizeError::PatternCompileError`] if the regex is invalid.
197    ///
198    /// # Examples
199    ///
200    /// ```
201    /// use sanitize_engine::scanner::ScanPattern;
202    /// use sanitize_engine::category::Category;
203    ///
204    /// let pat = ScanPattern::from_regex(
205    ///     r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
206    ///     Category::Email,
207    ///     "email_address",
208    /// ).unwrap();
209    /// ```
210    pub fn from_regex(pattern: &str, category: Category, label: impl Into<String>) -> Result<Self> {
211        let regex = RegexBuilder::new(pattern)
212            .size_limit(REGEX_SIZE_LIMIT)
213            .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
214            .build()
215            .map_err(|e| SanitizeError::PatternCompileError(e.to_string()))?;
216        Ok(Self {
217            regex,
218            category,
219            label: label.into(),
220        })
221    }
222
223    /// Create a pattern from a literal string.
224    ///
225    /// The literal is escaped so that regex metacharacters are matched
226    /// verbatim.
227    ///
228    /// # Errors
229    ///
230    /// Returns [`SanitizeError::PatternCompileError`] if regex compilation fails.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// use sanitize_engine::scanner::ScanPattern;
236    /// use sanitize_engine::category::Category;
237    ///
238    /// let pat = ScanPattern::from_literal(
239    ///     "sk-proj-abc123secret",
240    ///     Category::Custom("api_key".into()),
241    ///     "openai_key",
242    /// ).unwrap();
243    /// ```
244    pub fn from_literal(
245        literal: &str,
246        category: Category,
247        label: impl Into<String>,
248    ) -> Result<Self> {
249        let escaped = regex::escape(literal);
250        let regex = RegexBuilder::new(&escaped)
251            .size_limit(REGEX_SIZE_LIMIT)
252            .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
253            .build()
254            .map_err(|e| SanitizeError::PatternCompileError(e.to_string()))?;
255        Ok(Self {
256            regex,
257            category,
258            label: label.into(),
259        })
260    }
261
262    /// The category this pattern maps to.
263    #[must_use]
264    pub fn category(&self) -> &Category {
265        &self.category
266    }
267
268    /// The human-readable label.
269    #[must_use]
270    pub fn label(&self) -> &str {
271        &self.label
272    }
273
274    /// Return the raw regex pattern string for RegexSet construction.
275    #[must_use]
276    pub fn regex_pattern(&self) -> &str {
277        self.regex.as_str()
278    }
279}
280
281// ScanPattern is Send + Sync because:
282// - regex::bytes::Regex is Send + Sync
283// - Category is Send + Sync (it's an enum of primitives + CompactString)
284// - String is Send + Sync
285
286// ---------------------------------------------------------------------------
287// Internal: raw match descriptor
288// ---------------------------------------------------------------------------
289
290/// A single match found during scanning (internal).
291#[derive(Debug, Clone)]
292struct RawMatch {
293    /// Start byte offset within the scan window.
294    start: usize,
295    /// End byte offset (exclusive) within the scan window.
296    end: usize,
297    /// Index into the `StreamScanner::patterns` vector.
298    pattern_idx: usize,
299}
300
301// ---------------------------------------------------------------------------
302// Scan statistics
303// ---------------------------------------------------------------------------
304
305/// Statistics collected during a scan operation.
306///
307/// Returned by [`StreamScanner::scan_reader`] and
308/// [`StreamScanner::scan_bytes`] to provide visibility into what
309/// the scanner did.
310#[derive(Debug, Clone, Default)]
311pub struct ScanStats {
312    /// Total bytes read from the input.
313    pub bytes_processed: u64,
314    /// Total bytes written to the output (may differ from `bytes_processed`
315    /// when replacements have different lengths than the originals).
316    pub bytes_output: u64,
317    /// Total number of matches found across all patterns.
318    pub matches_found: u64,
319    /// Total number of replacements applied (always == `matches_found`
320    /// in one-way mode).
321    pub replacements_applied: u64,
322    /// Per-pattern match counts, keyed by pattern label.
323    pub pattern_counts: HashMap<String, u64>,
324}
325
326// ---------------------------------------------------------------------------
327// StreamScanner
328// ---------------------------------------------------------------------------
329
330/// Streaming scanner that detects and replaces sensitive patterns.
331///
332/// Thread-safe: can be shared via `Arc<StreamScanner>` for concurrent
333/// scanning of multiple files. Each call to [`scan_reader`](Self::scan_reader)
334/// is independent and maintains its own chunking state.
335///
336/// # Usage
337///
338/// ```rust
339/// use sanitize_engine::scanner::{StreamScanner, ScanPattern, ScanConfig};
340/// use sanitize_engine::category::Category;
341/// use sanitize_engine::generator::HmacGenerator;
342/// use sanitize_engine::store::MappingStore;
343/// use std::sync::Arc;
344///
345/// // 1. Build the replacement store.
346/// let gen = Arc::new(HmacGenerator::new([42u8; 32]));
347/// let store = Arc::new(MappingStore::new(gen, None));
348///
349/// // 2. Define patterns.
350/// let patterns = vec![
351///     ScanPattern::from_regex(
352///         r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
353///         Category::Email,
354///         "email",
355///     ).unwrap(),
356/// ];
357///
358/// // 3. Create the scanner.
359/// let scanner = StreamScanner::new(patterns, store, ScanConfig::default()).unwrap();
360///
361/// // 4. Scan.
362/// let input = b"Contact alice@corp.com for details.";
363/// let (output, stats) = scanner.scan_bytes(input).unwrap();
364/// assert_eq!(stats.matches_found, 1);
365/// assert!(!output.windows(b"alice@corp.com".len())
366///     .any(|w| w == b"alice@corp.com"));
367/// ```
368pub struct StreamScanner {
369    /// Compiled scan patterns.
370    patterns: Vec<ScanPattern>,
371    /// Pre-compiled set for fast multi-pattern pre-filtering.
372    /// `matches()` returns which pattern indices matched, avoiding
373    /// running every individual regex on each chunk (R-3 optimisation).
374    regex_set: RegexSet,
375    /// Thread-safe dedup replacement store.
376    store: Arc<MappingStore>,
377    /// Scanner configuration.
378    config: ScanConfig,
379}
380
381impl StreamScanner {
382    /// Create a new streaming scanner.
383    ///
384    /// # Arguments
385    ///
386    /// - `patterns` — the set of patterns to scan for.
387    /// - `store` — the mapping store for dedup-consistent replacements.
388    /// - `config` — chunking / overlap configuration.
389    ///
390    /// # Errors
391    ///
392    /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
393    /// invalid (e.g. `chunk_size == 0` or `overlap_size >= chunk_size`).
394    pub fn new(
395        patterns: Vec<ScanPattern>,
396        store: Arc<MappingStore>,
397        config: ScanConfig,
398    ) -> Result<Self> {
399        Self::new_with_max_patterns(patterns, store, config, DEFAULT_MAX_PATTERNS)
400    }
401
402    /// Create a new streaming scanner with a custom pattern limit.
403    ///
404    /// This is identical to [`new`](Self::new) but allows overriding the
405    /// default pattern cap (10 000).  Use this
406    /// when you have a legitimate need for more patterns and have
407    /// verified that your system has enough memory for the resulting
408    /// `RegexSet`.
409    ///
410    /// # Errors
411    ///
412    /// Returns [`SanitizeError::InvalidConfig`] if the configuration is
413    /// invalid or the pattern count exceeds `max_patterns`.
414    pub fn new_with_max_patterns(
415        patterns: Vec<ScanPattern>,
416        store: Arc<MappingStore>,
417        config: ScanConfig,
418        max_patterns: usize,
419    ) -> Result<Self> {
420        config.validate()?;
421
422        // F-05 fix: enforce maximum pattern count to bound RegexSet memory.
423        if patterns.len() > max_patterns {
424            return Err(SanitizeError::InvalidConfig(format!(
425                "pattern count ({}) exceeds maximum allowed ({}) — \
426                 RegexSet memory scales linearly with pattern count",
427                patterns.len(),
428                max_patterns
429            )));
430        }
431
432        // Build a RegexSet from all pattern strings for fast pre-filtering.
433        let regex_set = if patterns.is_empty() {
434            RegexSetBuilder::new(Vec::<&str>::new())
435                .size_limit(REGEX_SIZE_LIMIT)
436                .dfa_size_limit(REGEX_DFA_SIZE_LIMIT)
437                .build()
438                .map_err(|e| SanitizeError::PatternCompileError(e.to_string()))?
439        } else {
440            let pattern_strs: Vec<&str> = patterns.iter().map(|p| p.regex_pattern()).collect();
441            RegexSetBuilder::new(&pattern_strs)
442                .size_limit(REGEX_SIZE_LIMIT * pattern_strs.len().max(1))
443                .dfa_size_limit(REGEX_DFA_SIZE_LIMIT * pattern_strs.len().max(1))
444                .build()
445                .map_err(|e| SanitizeError::PatternCompileError(e.to_string()))?
446        };
447
448        Ok(Self {
449            patterns,
450            regex_set,
451            store,
452            config,
453        })
454    }
455
456    /// Scan a reader and write sanitized output to a writer.
457    ///
458    /// Processes the input in chunks of `config.chunk_size` bytes,
459    /// maintaining an overlap window of `config.overlap_size` bytes to
460    /// catch matches spanning chunk boundaries. All detected matches
461    /// are replaced one-way via the [`MappingStore`].
462    ///
463    /// # Arguments
464    ///
465    /// - `reader` — input source (file, network stream, `&[u8]`, …).
466    /// - `writer` — output sink (file, `Vec<u8>`, …).
467    ///
468    /// # Returns
469    ///
470    /// [`ScanStats`] with counters for bytes processed, matches found, etc.
471    ///
472    /// # Errors
473    ///
474    /// Returns [`SanitizeError`] on I/O failures or if a replacement
475    /// cannot be generated (e.g. store capacity exceeded).
476    pub fn scan_reader<R: Read, W: Write>(
477        &self,
478        mut reader: R,
479        mut writer: W,
480    ) -> Result<ScanStats> {
481        let mut stats = ScanStats::default();
482
483        // Carry buffer: the tail of the previous window that needs
484        // to be re-scanned with the next chunk.
485        let mut carry: Vec<u8> = Vec::new();
486
487        // Read buffer (reused across iterations to avoid re-allocation).
488        let mut read_buf = vec![0u8; self.config.chunk_size];
489
490        // Scan window (reused across iterations — grows to peak size then
491        // stays there, avoiding per-chunk allocation).
492        let mut window: Vec<u8> =
493            Vec::with_capacity(self.config.chunk_size + self.config.overlap_size);
494
495        loop {
496            // Read the next chunk.
497            let bytes_read = read_fully(&mut reader, &mut read_buf)?;
498            let is_eof = bytes_read < read_buf.len();
499
500            // Track only genuinely new bytes (carry was already counted).
501            stats.bytes_processed += bytes_read as u64;
502
503            if bytes_read == 0 && carry.is_empty() {
504                break;
505            }
506
507            // Build the scan window: carry ++ new_data.
508            // Reuse the window buffer to avoid per-chunk allocation.
509            let new_data = &read_buf[..bytes_read];
510            window.clear();
511            window.extend_from_slice(&carry);
512            window.extend_from_slice(new_data);
513
514            if window.is_empty() {
515                break;
516            }
517
518            // Find all non-overlapping matches in the window.
519            let matches = self.find_matches(&window);
520
521            // Determine the commit point — how much of the window we can
522            // safely emit this iteration.
523            let base_commit = if is_eof {
524                window.len()
525            } else {
526                window.len().saturating_sub(self.config.overlap_size)
527            };
528
529            let commit_point =
530                self.adjusted_commit_point(&matches, base_commit, window.len(), is_eof);
531
532            // Select matches that fall entirely within the committed region.
533            let committed_matches: Vec<&RawMatch> = matches
534                .iter()
535                .filter(|m| m.start < commit_point && m.end <= commit_point)
536                .collect();
537
538            // Apply replacements and write the committed output.
539            let output =
540                self.apply_replacements(&window[..commit_point], &committed_matches, &mut stats)?;
541            writer
542                .write_all(&output)
543                .map_err(|e| SanitizeError::IoError(e.to_string()))?;
544            stats.bytes_output += output.len() as u64;
545
546            // Update carry for next iteration. Reuse the carry buffer
547            // by copying remaining bytes down.
548            if is_eof {
549                carry.clear();
550                break;
551            }
552            carry.clear();
553            carry.extend_from_slice(&window[commit_point..]);
554        }
555
556        Ok(stats)
557    }
558
559    /// Convenience: scan byte slice in-memory and return sanitized output.
560    ///
561    /// Equivalent to `scan_reader(input, Vec::new())` but returns the
562    /// output buffer directly.
563    ///
564    /// # Errors
565    ///
566    /// Returns [`SanitizeError`] if a replacement cannot be generated
567    /// (e.g. store capacity exceeded).
568    pub fn scan_bytes(&self, input: &[u8]) -> Result<(Vec<u8>, ScanStats)> {
569        let mut output = Vec::with_capacity(input.len());
570        let stats = self.scan_reader(input, &mut output)?;
571        Ok((output, stats))
572    }
573
574    // ---- Accessors ----
575
576    /// Access the scanner's configuration.
577    #[must_use]
578    pub fn config(&self) -> &ScanConfig {
579        &self.config
580    }
581
582    /// Access the underlying mapping store.
583    #[must_use]
584    pub fn store(&self) -> &Arc<MappingStore> {
585        &self.store
586    }
587
588    /// Number of patterns registered in this scanner.
589    #[must_use]
590    pub fn pattern_count(&self) -> usize {
591        self.patterns.len()
592    }
593
594    /// Create a scanner from an encrypted secrets file.
595    ///
596    /// Decrypts the file in memory, parses the entries, compiles
597    /// patterns, and returns the scanner ready to scan. Decrypted
598    /// plaintext is scrubbed from memory after parsing.
599    ///
600    /// # Arguments
601    ///
602    /// - `encrypted_bytes` — raw bytes of the `.enc` file.
603    /// - `password` — user password.
604    /// - `format` — optional format override for the plaintext.
605    /// - `store` — mapping store for dedup-consistent replacements.
606    /// - `config` — chunking / overlap configuration.
607    /// - `extra_patterns` — additional patterns to merge in.
608    ///
609    /// # Returns
610    ///
611    /// `(scanner, warnings)` where `warnings` lists entries that
612    /// failed to compile (index + error).
613    ///
614    /// # Errors
615    ///
616    /// Returns [`SanitizeError::SecretsError`] on decryption failure
617    /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
618    pub fn from_encrypted_secrets(
619        encrypted_bytes: &[u8],
620        password: &str,
621        format: Option<crate::secrets::SecretsFormat>,
622        store: Arc<MappingStore>,
623        config: ScanConfig,
624        extra_patterns: Vec<ScanPattern>,
625    ) -> Result<(Self, Vec<(usize, SanitizeError)>)> {
626        let (mut patterns, warnings) =
627            crate::secrets::load_encrypted_secrets(encrypted_bytes, password, format)?;
628        patterns.extend(extra_patterns);
629        let scanner = Self::new(patterns, store, config)?;
630        Ok((scanner, warnings))
631    }
632
633    /// Create a scanner from a plaintext secrets file.
634    ///
635    /// Convenience for development / testing without encryption.
636    ///
637    /// # Errors
638    ///
639    /// Returns [`SanitizeError::SecretsError`] on parse failure
640    /// or [`SanitizeError::InvalidConfig`] on invalid scanner config.
641    pub fn from_plaintext_secrets(
642        plaintext: &[u8],
643        format: Option<crate::secrets::SecretsFormat>,
644        store: Arc<MappingStore>,
645        config: ScanConfig,
646        extra_patterns: Vec<ScanPattern>,
647    ) -> Result<(Self, Vec<(usize, SanitizeError)>)> {
648        let (mut patterns, warnings) = crate::secrets::load_plaintext_secrets(plaintext, format)?;
649        patterns.extend(extra_patterns);
650        let scanner = Self::new(patterns, store, config)?;
651        Ok((scanner, warnings))
652    }
653
654    // ---- Internal helpers ----
655
656    /// Find all non-overlapping matches across all patterns.
657    ///
658    /// Strategy: use the `RegexSet` for a fast check of which patterns
659    /// have *any* match in the window, then run only those individual
660    /// regexes for precise match positions.  This avoids running every
661    /// pattern on every chunk (R-3 optimisation).
662    fn find_matches(&self, window: &[u8]) -> Vec<RawMatch> {
663        let mut all_matches = Vec::new();
664
665        // Fast pre-filter: which patterns have at least one match?
666        let active: Vec<usize> = self.regex_set.matches(window).into_iter().collect();
667
668        // Only run individual regexes for patterns that matched.
669        for &idx in &active {
670            let pattern = &self.patterns[idx];
671            for m in pattern.regex.find_iter(window) {
672                all_matches.push(RawMatch {
673                    start: m.start(),
674                    end: m.end(),
675                    pattern_idx: idx,
676                });
677            }
678        }
679
680        // Sort: primary by start (ascending), secondary by length
681        // (descending — prefer longer matches when they start at the
682        // same position).
683        all_matches.sort_by(|a, b| {
684            a.start
685                .cmp(&b.start)
686                .then_with(|| (b.end - b.start).cmp(&(a.end - a.start)))
687        });
688
689        // Greedily select non-overlapping matches.
690        let mut selected = Vec::new();
691        let mut last_end = 0;
692        for m in all_matches {
693            if m.start >= last_end {
694                last_end = m.end;
695                selected.push(m);
696            }
697        }
698
699        selected
700    }
701
702    /// Adjust the commit point to avoid splitting a match across the
703    /// commit / carry boundary.
704    ///
705    /// If any match straddles `base_commit` (starts before, ends after),
706    /// the commit point is moved to after that match so it is emitted
707    /// in full this iteration.
708    #[allow(clippy::unused_self)] // keep &self for API consistency with other scanner methods
709    fn adjusted_commit_point(
710        &self,
711        matches: &[RawMatch],
712        base_commit: usize,
713        window_len: usize,
714        is_eof: bool,
715    ) -> usize {
716        if is_eof {
717            return window_len;
718        }
719
720        let mut commit = base_commit;
721
722        for m in matches {
723            if m.start < commit && m.end > commit {
724                // Match straddles the boundary — extend commit to include it.
725                commit = m.end;
726            }
727        }
728
729        // Never exceed window length.
730        commit.min(window_len)
731    }
732
733    /// Build the output buffer for the committed region by splicing in
734    /// replacements for every match.
735    fn apply_replacements(
736        &self,
737        committed: &[u8],
738        matches: &[&RawMatch],
739        stats: &mut ScanStats,
740    ) -> Result<Vec<u8>> {
741        if matches.is_empty() {
742            return Ok(committed.to_vec());
743        }
744
745        let mut output = Vec::with_capacity(committed.len());
746        let mut last_end = 0;
747
748        for m in matches {
749            // Emit the non-matching region before this match.
750            output.extend_from_slice(&committed[last_end..m.start]);
751
752            // Extract the matched text (lossy UTF-8 for binary safety).
753            let matched_bytes = &committed[m.start..m.end];
754            let matched_text = String::from_utf8_lossy(matched_bytes);
755
756            // Look up or create the one-way replacement.
757            let pattern = &self.patterns[m.pattern_idx];
758            let replacement = self.store.get_or_insert(&pattern.category, &matched_text)?;
759
760            output.extend_from_slice(replacement.as_bytes());
761            last_end = m.end;
762
763            // Accumulate per-match stats.
764            stats.matches_found += 1;
765            stats.replacements_applied += 1;
766            *stats
767                .pattern_counts
768                .entry(pattern.label.clone())
769                .or_insert(0) += 1;
770        }
771
772        // Emit trailing non-matching region.
773        output.extend_from_slice(&committed[last_end..]);
774
775        Ok(output)
776    }
777}
778
779// ---------------------------------------------------------------------------
780// Send + Sync compile-time assertion
781// ---------------------------------------------------------------------------
782
783const _: fn() = || {
784    fn assert_send<T: Send>() {}
785    fn assert_sync<T: Sync>() {}
786    assert_send::<StreamScanner>();
787    assert_sync::<StreamScanner>();
788};
789
790// ---------------------------------------------------------------------------
791// I/O helper
792// ---------------------------------------------------------------------------
793
794/// Read up to `buf.len()` bytes from `reader`, retrying on `Interrupted`.
795///
796/// Returns the number of bytes actually read (< `buf.len()` only at EOF).
797fn read_fully<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
798    let mut total = 0;
799    while total < buf.len() {
800        match reader.read(&mut buf[total..]) {
801            Ok(0) => break, // EOF
802            Ok(n) => total += n,
803            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
804            Err(e) => return Err(SanitizeError::IoError(e.to_string())),
805        }
806    }
807    Ok(total)
808}
809
810// ---------------------------------------------------------------------------
811// Unit tests
812// ---------------------------------------------------------------------------
813
814#[cfg(test)]
815mod tests {
816    use super::*;
817    use crate::generator::HmacGenerator;
818
819    /// Helper: build a scanner with given patterns and small chunk config.
820    fn test_scanner(patterns: Vec<ScanPattern>) -> StreamScanner {
821        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
822        let store = Arc::new(MappingStore::new(gen, None));
823        StreamScanner::new(
824            patterns,
825            store,
826            ScanConfig {
827                chunk_size: 64,
828                overlap_size: 16,
829            },
830        )
831        .unwrap()
832    }
833
834    /// Helper: email pattern.
835    fn email_pattern() -> ScanPattern {
836        ScanPattern::from_regex(
837            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
838            Category::Email,
839            "email",
840        )
841        .unwrap()
842    }
843
844    /// Helper: IPv4 pattern.
845    fn ipv4_pattern() -> ScanPattern {
846        ScanPattern::from_regex(
847            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
848            Category::IpV4,
849            "ipv4",
850        )
851        .unwrap()
852    }
853
854    // ---- Construction ----
855
856    #[test]
857    fn scanner_creation() {
858        let scanner = test_scanner(vec![email_pattern()]);
859        assert_eq!(scanner.pattern_count(), 1);
860    }
861
862    #[test]
863    fn invalid_config_zero_chunk() {
864        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
865        let store = Arc::new(MappingStore::new(gen, None));
866        let result = StreamScanner::new(vec![], store, ScanConfig::new(0, 0));
867        assert!(result.is_err());
868    }
869
870    #[test]
871    fn invalid_config_overlap_ge_chunk() {
872        let gen = Arc::new(HmacGenerator::new([0u8; 32]));
873        let store = Arc::new(MappingStore::new(gen, None));
874        let result = StreamScanner::new(vec![], store, ScanConfig::new(100, 100));
875        assert!(result.is_err());
876    }
877
878    // ---- Empty / no-match cases ----
879
880    #[test]
881    fn empty_input() {
882        let scanner = test_scanner(vec![email_pattern()]);
883        let (output, stats) = scanner.scan_bytes(b"").unwrap();
884        assert!(output.is_empty());
885        assert_eq!(stats.matches_found, 0);
886        assert_eq!(stats.bytes_processed, 0);
887    }
888
889    #[test]
890    fn no_matches() {
891        let scanner = test_scanner(vec![email_pattern()]);
892        let input = b"There are no email addresses here.";
893        let (output, stats) = scanner.scan_bytes(input).unwrap();
894        assert_eq!(output, input.as_slice());
895        assert_eq!(stats.matches_found, 0);
896    }
897
898    // ---- Single match ----
899
900    #[test]
901    fn single_email_replaced() {
902        let scanner = test_scanner(vec![email_pattern()]);
903        let input = b"Contact alice@corp.com for help.";
904        let (output, stats) = scanner.scan_bytes(input).unwrap();
905        assert_eq!(stats.matches_found, 1);
906        assert_eq!(stats.replacements_applied, 1);
907        // Original must not appear in output.
908        assert!(!output
909            .windows(b"alice@corp.com".len())
910            .any(|w| w == b"alice@corp.com"));
911        // Replacement should contain the @ from the domain-preserving email.
912        let output_str = String::from_utf8_lossy(&output);
913        assert!(output_str.contains("@corp.com"));
914        // Length preserved: output is same total length as input.
915        assert_eq!(output.len(), input.len(), "length must be preserved");
916        // Surrounding text preserved.
917        assert!(output_str.starts_with("Contact "));
918        assert!(output_str.ends_with(" for help."));
919    }
920
921    // ---- Multiple matches ----
922
923    #[test]
924    fn multiple_emails_replaced() {
925        let scanner = test_scanner(vec![email_pattern()]);
926        let input = b"From alice@corp.com to bob@corp.com cc admin@corp.com";
927        let (output, stats) = scanner.scan_bytes(input).unwrap();
928        assert_eq!(stats.matches_found, 3);
929        let out_str = String::from_utf8_lossy(&output);
930        assert!(!out_str.contains("alice@corp.com"));
931        assert!(!out_str.contains("bob@corp.com"));
932        assert!(!out_str.contains("admin@corp.com"));
933    }
934
935    // ---- Same secret gets same replacement ----
936
937    #[test]
938    fn same_secret_same_replacement() {
939        let scanner = test_scanner(vec![email_pattern()]);
940        let input = b"First alice@corp.com then alice@corp.com again.";
941        let (output, stats) = scanner.scan_bytes(input).unwrap();
942        assert_eq!(stats.matches_found, 2);
943        let out_str = String::from_utf8_lossy(&output);
944        // Both occurrences should be replaced with the same value.
945        // With length-preserving replacements, look for the preserved domain.
946        let parts: Vec<&str> = out_str.split("@corp.com").collect();
947        // 3 parts = 2 occurrences of the replacement.
948        assert_eq!(parts.len(), 3);
949    }
950
951    // ---- Literal pattern ----
952
953    #[test]
954    fn literal_pattern_matched() {
955        let pat = ScanPattern::from_literal(
956            "SECRET_API_KEY_12345",
957            Category::Custom("api_key".into()),
958            "api_key",
959        )
960        .unwrap();
961        let scanner = test_scanner(vec![pat]);
962        let input = b"key=SECRET_API_KEY_12345&foo=bar";
963        let (output, stats) = scanner.scan_bytes(input).unwrap();
964        assert_eq!(stats.matches_found, 1);
965        assert!(!output
966            .windows(b"SECRET_API_KEY_12345".len())
967            .any(|w| w == b"SECRET_API_KEY_12345"));
968    }
969
970    // ---- Multiple pattern types ----
971
972    #[test]
973    fn multiple_pattern_types() {
974        let scanner = test_scanner(vec![email_pattern(), ipv4_pattern()]);
975        let input = b"Server 192.168.1.100 contact admin@server.com";
976        let (output, stats) = scanner.scan_bytes(input).unwrap();
977        assert_eq!(stats.matches_found, 2);
978        let out_str = String::from_utf8_lossy(&output);
979        assert!(!out_str.contains("192.168.1.100"));
980        assert!(!out_str.contains("admin@server.com"));
981        assert_eq!(*stats.pattern_counts.get("email").unwrap(), 1);
982        assert_eq!(*stats.pattern_counts.get("ipv4").unwrap(), 1);
983    }
984
985    // ---- Chunk boundary: match spans two chunks ----
986
987    #[test]
988    fn match_at_chunk_boundary() {
989        // Use a very small chunk size so the email straddles a boundary.
990        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
991        let store = Arc::new(MappingStore::new(gen, None));
992        let scanner = StreamScanner::new(
993            vec![email_pattern()],
994            store,
995            ScanConfig {
996                chunk_size: 20, // very small
997                overlap_size: 16,
998            },
999        )
1000        .unwrap();
1001
1002        // Place an email address that will definitely straddle a boundary.
1003        let input = b"AAAAAAAAAAAAAAAA alice@corp.com BBBBBBBBBBBBB";
1004        let (output, stats) = scanner.scan_bytes(input).unwrap();
1005        assert_eq!(stats.matches_found, 1);
1006        let out_str = String::from_utf8_lossy(&output);
1007        assert!(!out_str.contains("alice@corp.com"));
1008        assert!(out_str.contains("@corp.com"), "domain must be preserved");
1009    }
1010
1011    // ---- Large input requiring many chunks ----
1012
1013    #[test]
1014    fn large_input_many_chunks() {
1015        let scanner = test_scanner(vec![email_pattern()]);
1016
1017        // Build a ~2 KiB input with emails sprinkled in.
1018        let mut input = Vec::new();
1019        let filler = b"Lorem ipsum dolor sit amet. ";
1020        for i in 0..20 {
1021            input.extend_from_slice(filler);
1022            let email = format!("user{}@example.com ", i);
1023            input.extend_from_slice(email.as_bytes());
1024        }
1025
1026        let (output, stats) = scanner.scan_bytes(&input).unwrap();
1027        assert_eq!(stats.matches_found, 20);
1028        let out_str = String::from_utf8_lossy(&output);
1029        for i in 0..20 {
1030            let email = format!("user{}@example.com", i);
1031            assert!(!out_str.contains(&email));
1032        }
1033    }
1034
1035    // ---- Scan via Read/Write interface ----
1036
1037    #[test]
1038    fn scan_reader_writer() {
1039        let scanner = test_scanner(vec![email_pattern()]);
1040        let input = b"hello alice@corp.com world";
1041        let mut output = Vec::new();
1042        let stats = scanner.scan_reader(&input[..], &mut output).unwrap();
1043        assert_eq!(stats.matches_found, 1);
1044        let out_str = String::from_utf8_lossy(&output);
1045        assert!(out_str.contains("@corp.com"), "domain must be preserved");
1046    }
1047
1048    // ---- Pattern compile error ----
1049
1050    #[test]
1051    fn invalid_regex_pattern() {
1052        let result = ScanPattern::from_regex("[invalid(", Category::Email, "bad");
1053        assert!(result.is_err());
1054    }
1055
1056    // ---- Default config ----
1057
1058    #[test]
1059    fn default_config_valid() {
1060        ScanConfig::default().validate().unwrap();
1061    }
1062
1063    // ---- Config edge cases ----
1064
1065    #[test]
1066    fn config_chunk_1_overlap_0() {
1067        // Extreme but valid: 1-byte chunks, no overlap.
1068        // Won't catch multi-byte patterns, but should not crash.
1069        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
1070        let store = Arc::new(MappingStore::new(gen, None));
1071        let scanner = StreamScanner::new(vec![], store, ScanConfig::new(1, 0)).unwrap();
1072        let (output, _) = scanner.scan_bytes(b"hello").unwrap();
1073        assert_eq!(output, b"hello");
1074    }
1075
1076    // ---- Bytes output tracking ----
1077
1078    #[test]
1079    fn bytes_output_preserved_on_replacement() {
1080        let scanner = test_scanner(vec![email_pattern()]);
1081        let input = b"a@b.cc"; // short email
1082        let (output, stats) = scanner.scan_bytes(input).unwrap();
1083        assert_eq!(stats.bytes_processed, input.len() as u64);
1084        assert_eq!(stats.bytes_output, output.len() as u64);
1085        // Length-preserving: output length matches input length.
1086        assert_eq!(output.len(), input.len());
1087    }
1088}