Skip to main content

sanitize_engine/processor/
archive.rs

1//! Archive processor for sanitizing files inside `.zip`, `.tar`, and `.tar.gz` archives.
2//!
3//! # Architecture
4//!
5//! ```text
6//! ┌───────────────────────┐
7//! │  Archive (zip/tar/gz) │
8//! └────────┬──────────────┘
9//!          │  for each entry
10//!          ▼
11//! ┌─────────────────────────────────────────────┐
12//! │  1. Match entry filename → FileTypeProfile  │
13//! │  2. Try ProcessorRegistry (structured)      │
14//! │  3. Fallback: StreamScanner (streaming)     │
15//! └────────┬────────────────────────────────────┘
16//!          │  sanitized bytes
17//!          ▼
18//! ┌───────────────────────┐
19//! │  Rebuilt archive       │
20//! │  (same format, meta   │
21//! │   preserved)          │
22//! └───────────────────────┘
23//! ```
24//!
25//! # Memory Efficiency
26//!
27//! Archives are processed **entry-by-entry**. Each entry is piped
28//! through either a structured processor (which must buffer the full
29//! entry) or the [`StreamScanner`]
30//! (which processes in configurable chunks). This means the maximum
31//! memory footprint is proportional to the largest *single entry*
32//! that uses a structured processor. Files without a profile match
33//! are streamed through the scanner without buffering the whole entry.
34//!
35//! For very large individual files inside archives, the streaming
36//! scanner path keeps only `chunk_size + overlap_size` bytes in memory.
37//!
38//! # Thread Safety
39//!
40//! [`ArchiveProcessor`] is `Send + Sync`. The underlying
41//! [`MappingStore`] provides lock-free
42//! reads for dedup consistency.
43//!
44//! # Metadata Preservation
45//!
46//! - **Tar**: modification time, permissions (mode), uid/gid, and
47//!   username/groupname are copied from the source entry.
48//! - **Zip**: modification time, compression method, and unix
49//!   permissions are preserved.
50//! - Symlinks, directories, and other non-regular entries are passed
51//!   through unchanged.
52
53use crate::error::{Result, SanitizeError};
54use crate::processor::profile::FileTypeProfile;
55use crate::processor::registry::ProcessorRegistry;
56use crate::scanner::{ScanStats, StreamScanner};
57use crate::store::MappingStore;
58
59use glob::MatchOptions;
60use rayon::prelude::*;
61use std::collections::HashMap;
62use std::io::{self, Read, Seek, Write};
63use std::sync::Arc;
64
65/// Maximum size (in bytes) for a single archive entry to be loaded into
66/// memory for structured processing. Entries larger than this are
67/// streamed through the scanner instead (M-3 fix).
68const MAX_STRUCTURED_ENTRY_SIZE: u64 = 256 * 1024 * 1024; // 256 MiB
69
70/// Maximum total uncompressed data size (bytes) across all zip entries
71/// before the parallel processing path is disabled.  Above this threshold
72/// the zip processor falls back to sequential (one entry at a time) to
73/// avoid loading the entire archive into memory simultaneously.
74const MAX_PARALLEL_ZIP_DATA_SIZE: u64 = 256 * 1024 * 1024; // 256 MiB
75
76/// Default maximum nesting depth for recursive archive processing.
77///
78/// Depth 0 is the top-level archive. Nested archives at depths 1
79/// through `DEFAULT_MAX_ARCHIVE_DEPTH` are recursively extracted and
80/// sanitized. Exceeding this limit returns
81/// [`SanitizeError::RecursionDepthExceeded`].
82///
83/// Each nesting level buffers the inner archive in memory (up to
84/// `MAX_STRUCTURED_ENTRY_SIZE` per level), so the hard maximum is
85/// capped at 10 to bound peak memory.
86pub const DEFAULT_MAX_ARCHIVE_DEPTH: u32 = 3;
87
88/// Absolute maximum allowed value for archive nesting depth.
89/// Guards against excessive memory usage (each level can buffer up to
90/// 256 MiB).
91const MAX_ALLOWED_ARCHIVE_DEPTH: u32 = 10;
92
93/// Minimum number of file entries in an archive before parallel entry
94/// processing is enabled. Below this threshold the overhead of spawning
95/// rayon tasks exceeds the savings.
96const PARALLEL_ENTRY_THRESHOLD: usize = 4;
97
98// ---------------------------------------------------------------------------
99// Archive format enum
100// ---------------------------------------------------------------------------
101
102/// Per-entry result from parallel zip processing: `(meta_index, sanitized_bytes_and_stats)`.
103type ZipEntryResult = (usize, Result<(Vec<u8>, ArchiveStats)>);
104
105// ---------------------------------------------------------------------------
106// ArchiveFilter
107// ---------------------------------------------------------------------------
108
109/// A compiled glob-based entry filter for archive processing.
110///
111/// Patterns are compiled once at construction time. At processing time
112/// `passes()` is called for each file entry path inside the archive.
113///
114/// ## Pattern semantics
115///
116/// - `*` matches any sequence of characters that does **not** contain `/`.
117/// - `**` matches any sequence of characters including `/`.
118/// - `?` matches any single character except `/`.
119/// - `[abc]` matches one of the listed characters.
120/// - A pattern ending with `/` is a *directory prefix* — it matches
121///   the directory itself and any path underneath it.
122///
123/// ## Filter logic
124///
125/// 1. If `--only` patterns are present: the entry path must match at
126///    least one pattern, otherwise it is dropped.
127/// 2. If `--exclude` patterns are present: if the entry path matches
128///    any pattern, it is dropped.
129/// 3. Only file entries are filtered; directory / symlink entries
130///    always pass through to preserve archive structure.
131#[derive(Default, Clone)]
132pub struct ArchiveFilter {
133    only: Vec<CompiledPattern>,
134    exclude: Vec<CompiledPattern>,
135}
136
137#[derive(Clone)]
138enum CompiledPattern {
139    /// Pattern that ended with `/` — matches the prefix directory and
140    /// everything inside it.
141    DirPrefix(String),
142    /// General glob pattern compiled with `require_literal_separator`.
143    Glob(glob::Pattern),
144}
145
146const GLOB_OPTS: MatchOptions = MatchOptions {
147    case_sensitive: true,
148    require_literal_separator: true,
149    require_literal_leading_dot: false,
150};
151
152impl CompiledPattern {
153    fn compile(raw: &str) -> std::result::Result<Self, String> {
154        if raw.ends_with('/') {
155            // Strip trailing slash; matching is done manually in `matches`.
156            Ok(CompiledPattern::DirPrefix(
157                raw.trim_end_matches('/').to_string(),
158            ))
159        } else {
160            glob::Pattern::new(raw)
161                .map(CompiledPattern::Glob)
162                .map_err(|e| format!("invalid glob pattern '{raw}': {e}"))
163        }
164    }
165
166    fn matches(&self, path: &str) -> bool {
167        match self {
168            CompiledPattern::DirPrefix(prefix) => {
169                path == prefix || path.starts_with(&format!("{prefix}/"))
170            }
171            CompiledPattern::Glob(pat) => pat.matches_with(path, GLOB_OPTS),
172        }
173    }
174}
175
176impl ArchiveFilter {
177    /// Compile `only` and `exclude` pattern lists into an `ArchiveFilter`.
178    ///
179    /// Returns an error if any pattern contains invalid glob syntax.
180    pub fn new(
181        only: Vec<String>,
182        exclude: Vec<String>,
183    ) -> std::result::Result<Self, String> {
184        let only = only
185            .iter()
186            .map(|p| CompiledPattern::compile(p))
187            .collect::<std::result::Result<Vec<_>, _>>()?;
188        let exclude = exclude
189            .iter()
190            .map(|p| CompiledPattern::compile(p))
191            .collect::<std::result::Result<Vec<_>, _>>()?;
192        Ok(Self { only, exclude })
193    }
194
195    /// Returns `true` when neither `--only` nor `--exclude` patterns are set.
196    pub fn is_empty(&self) -> bool {
197        self.only.is_empty() && self.exclude.is_empty()
198    }
199
200    /// Returns `true` if `path` should be included in the output archive.
201    ///
202    /// Only applies to file entries; directory entries bypass this check.
203    pub fn passes(&self, path: &str) -> bool {
204        if !self.only.is_empty() && !self.only.iter().any(|p| p.matches(path)) {
205            return false;
206        }
207        if self.exclude.iter().any(|p| p.matches(path)) {
208            return false;
209        }
210        true
211    }
212}
213
214// ---------------------------------------------------------------------------
215// Archive format enum
216// ---------------------------------------------------------------------------
217#[derive(Debug, Clone, Copy, PartialEq, Eq)]
218pub enum ArchiveFormat {
219    /// `.zip` archive.
220    Zip,
221    /// Uncompressed `.tar` archive.
222    Tar,
223    /// Gzip-compressed `.tar.gz` / `.tgz` archive.
224    TarGz,
225}
226
227impl ArchiveFormat {
228    /// Detect archive format from a file path / extension.
229    ///
230    /// Returns `None` for unrecognised extensions.
231    pub fn from_path(path: &str) -> Option<Self> {
232        let lower = path.to_ascii_lowercase();
233        if lower.ends_with(".tar.gz")
234            || std::path::Path::new(&lower)
235                .extension()
236                .is_some_and(|ext| ext.eq_ignore_ascii_case("tgz"))
237        {
238            Some(Self::TarGz)
239        } else if std::path::Path::new(&lower)
240            .extension()
241            .is_some_and(|ext| ext.eq_ignore_ascii_case("tar"))
242        {
243            Some(Self::Tar)
244        } else if std::path::Path::new(&lower)
245            .extension()
246            .is_some_and(|ext| ext.eq_ignore_ascii_case("zip"))
247        {
248            Some(Self::Zip)
249        } else {
250            None
251        }
252    }
253}
254
255// ---------------------------------------------------------------------------
256// Archive statistics
257// ---------------------------------------------------------------------------
258
259/// Statistics collected while processing an archive.
260#[derive(Debug, Clone, Default)]
261pub struct ArchiveStats {
262    /// Number of file entries processed (excludes dirs/symlinks).
263    pub files_processed: u64,
264    /// Number of entries passed through unchanged (dirs, symlinks, etc.).
265    pub entries_skipped: u64,
266    /// Number of files handled by a structured processor.
267    pub structured_hits: u64,
268    /// Number of files handled by the streaming scanner fallback.
269    pub scanner_fallback: u64,
270    /// Number of entries that were themselves archives and processed
271    /// recursively.
272    pub nested_archives: u64,
273    /// Total input bytes across all file entries.
274    pub total_input_bytes: u64,
275    /// Total output bytes across all file entries.
276    pub total_output_bytes: u64,
277    /// Per-file processing method: filename → `"structured:<proc>"`, `"scanner"`,
278    /// or `"nested:<format>"`.
279    pub file_methods: HashMap<String, String>,
280    /// Per-file scan statistics (matches, replacements, bytes, pattern counts).
281    pub file_scan_stats: HashMap<String, ScanStats>,
282    /// Number of file entries removed by the [`ArchiveFilter`].
283    pub entries_filtered: u64,
284}
285
286/// Progress snapshot emitted while processing archive entries.
287#[derive(Debug, Clone, Eq, PartialEq)]
288pub struct ArchiveProgress {
289    /// Entries seen so far, including skipped entries.
290    pub entries_seen: u64,
291    /// Regular file entries processed so far.
292    pub files_processed: u64,
293    /// Non-file entries skipped so far.
294    pub entries_skipped: u64,
295    /// Total entries when cheaply known.
296    pub total_entries: Option<u64>,
297    /// Path of the current entry.
298    pub current_entry: String,
299}
300
301type ArchiveProgressCallback = Arc<dyn Fn(&ArchiveProgress) + Send + Sync>;
302
303impl ArchiveStats {
304    /// Merge statistics from a nested archive into this parent.
305    fn merge(&mut self, child: &ArchiveStats) {
306        self.files_processed += child.files_processed;
307        self.entries_skipped += child.entries_skipped;
308        self.structured_hits += child.structured_hits;
309        self.scanner_fallback += child.scanner_fallback;
310        self.nested_archives += child.nested_archives;
311        self.total_input_bytes += child.total_input_bytes;
312        self.total_output_bytes += child.total_output_bytes;
313        self.entries_filtered += child.entries_filtered;
314        self.file_methods
315            .extend(child.file_methods.iter().map(|(k, v)| (k.clone(), v.clone())));
316        self.file_scan_stats
317            .extend(child.file_scan_stats.iter().map(|(k, v)| (k.clone(), v.clone())));
318    }
319}
320
321// ---------------------------------------------------------------------------
322// ArchiveProcessor
323// ---------------------------------------------------------------------------
324
325/// Processes archives by sanitizing each contained file and rebuilding
326/// the archive with the same format and preserved metadata.
327///
328/// # Usage
329///
330/// ```rust,no_run
331/// use sanitize_engine::processor::archive::{ArchiveProcessor, ArchiveFormat};
332/// use sanitize_engine::processor::registry::ProcessorRegistry;
333/// use sanitize_engine::scanner::{StreamScanner, ScanPattern, ScanConfig};
334/// use sanitize_engine::generator::HmacGenerator;
335/// use sanitize_engine::store::MappingStore;
336/// use sanitize_engine::category::Category;
337/// use std::sync::Arc;
338///
339/// let gen = Arc::new(HmacGenerator::new([42u8; 32]));
340/// let store = Arc::new(MappingStore::new(gen, None));
341/// let patterns = vec![
342///     ScanPattern::from_regex(r"secret\w+", Category::Custom("secret".into()), "secrets").unwrap(),
343/// ];
344/// let scanner = Arc::new(
345///     StreamScanner::new(patterns, Arc::clone(&store), ScanConfig::default()).unwrap(),
346/// );
347/// let registry = Arc::new(ProcessorRegistry::with_builtins());
348///
349/// let archive_proc = ArchiveProcessor::new(registry, scanner, store, vec![]);
350/// ```
351pub struct ArchiveProcessor {
352    /// Registry of structured processors.
353    registry: Arc<ProcessorRegistry>,
354    /// Streaming scanner for fallback processing.
355    scanner: Arc<StreamScanner>,
356    /// Shared mapping store (one-way replacements).
357    store: Arc<MappingStore>,
358    /// File-type profiles for structured processor matching.
359    profiles: Vec<FileTypeProfile>,
360    /// Maximum nesting depth for recursive archive processing.
361    max_depth: u32,
362    /// Optional callback for per-entry progress updates.
363    progress_callback: Option<ArchiveProgressCallback>,
364    /// Minimum number of file entries required to enable parallel entry
365    /// sanitization. Default: [`PARALLEL_ENTRY_THRESHOLD`].
366    parallel_threshold: usize,
367    /// Entry-level filter controlling which paths are included in the
368    /// output archive. Default: empty (pass all entries).
369    filter: ArchiveFilter,
370    /// When true, bypass all structured processors and use only the
371    /// streaming scanner for every entry. Trades format preservation
372    /// for maximum sanitization coverage.
373    force_text: bool,
374}
375
376impl ArchiveProcessor {
377    /// Create a new archive processor.
378    ///
379    /// # Arguments
380    ///
381    /// - `registry` — structured processor registry.
382    /// - `scanner` — streaming scanner for fallback.
383    /// - `store` — shared mapping store for one-way dedup replacements.
384    /// - `profiles` — file-type profiles for structured matching.
385    pub fn new(
386        registry: Arc<ProcessorRegistry>,
387        scanner: Arc<StreamScanner>,
388        store: Arc<MappingStore>,
389        profiles: Vec<FileTypeProfile>,
390    ) -> Self {
391        Self {
392            registry,
393            scanner,
394            store,
395            profiles,
396            max_depth: DEFAULT_MAX_ARCHIVE_DEPTH,
397            progress_callback: None,
398            parallel_threshold: PARALLEL_ENTRY_THRESHOLD,
399            filter: ArchiveFilter::default(),
400            force_text: false,
401        }
402    }
403
404    /// Override the maximum nesting depth for recursive archive
405    /// processing.
406    ///
407    /// The default is [`DEFAULT_MAX_ARCHIVE_DEPTH`] (3). Values above
408    /// 10 are clamped.
409    #[must_use]
410    pub fn with_max_depth(mut self, depth: u32) -> Self {
411        self.max_depth = depth.min(MAX_ALLOWED_ARCHIVE_DEPTH);
412        self
413    }
414
415    /// Override the minimum entry count required to enable parallel
416    /// entry sanitization. Set to `usize::MAX` to disable parallelism
417    /// entirely for this processor instance (e.g. when outer file-level
418    /// parallelism is already saturating the thread budget).
419    #[must_use]
420    pub fn with_parallel_threshold(mut self, threshold: usize) -> Self {
421        self.parallel_threshold = threshold;
422        self
423    }
424
425    /// Register a per-entry archive progress callback.
426    #[must_use]
427    pub fn with_progress_callback(mut self, callback: ArchiveProgressCallback) -> Self {
428        self.progress_callback = Some(callback);
429        self
430    }
431
432    /// Apply an [`ArchiveFilter`] that controls which file entries are
433    /// included in the output archive.
434    ///
435    /// Entries that do not pass the filter are **removed** from the
436    /// output entirely. Directory / symlink entries are never filtered.
437    #[must_use]
438    pub fn with_filter(mut self, filter: ArchiveFilter) -> Self {
439        self.filter = filter;
440        self
441    }
442
443    /// When set, bypass all structured processors and use only the
444    /// streaming scanner for every archive entry.
445    ///
446    /// Trades format preservation for maximum sanitization coverage.
447    /// Useful when the user is uncertain about field rules or wants a
448    /// belt-and-suspenders guarantee that every byte is scanned.
449    #[must_use]
450    pub fn with_force_text(mut self, force_text: bool) -> Self {
451        self.force_text = force_text;
452        self
453    }
454
455    /// Find the first profile matching a filename.
456    fn find_profile(&self, filename: &str) -> Option<&FileTypeProfile> {
457        self.profiles.iter().find(|p| p.matches_filename(filename))
458    }
459
460    fn emit_progress(&self, stats: &ArchiveStats, total_entries: Option<u64>, current_entry: &str) {
461        if let Some(callback) = &self.progress_callback {
462            callback(&ArchiveProgress {
463                entries_seen: stats.files_processed + stats.entries_skipped,
464                files_processed: stats.files_processed,
465                entries_skipped: stats.entries_skipped,
466                total_entries,
467                current_entry: current_entry.to_string(),
468            });
469        }
470    }
471
472    /// Sanitize a file entry given its raw bytes.
473    ///
474    /// Returns the sanitized bytes together with a fresh [`ArchiveStats`]
475    /// covering only this entry. This is the core work unit for parallel
476    /// entry processing in [`process_tar_at_depth`] and
477    /// [`process_zip_at_depth`].
478    fn sanitize_entry_bytes(
479        &self,
480        filename: &str,
481        data: &[u8],
482        entry_size_hint: Option<u64>,
483        depth: u32,
484    ) -> Result<(Vec<u8>, ArchiveStats)> {
485        let mut out: Vec<u8> = Vec::with_capacity(data.len());
486        let mut entry_stats = ArchiveStats::default();
487        let mut reader = io::Cursor::new(data);
488        self.sanitize_entry(
489            filename,
490            &mut reader,
491            &mut out,
492            &mut entry_stats,
493            entry_size_hint,
494            depth,
495        )?;
496        Ok((out, entry_stats))
497    }
498
499    /// Sanitize the content of a single file entry.
500    ///
501    /// If the entry is itself an archive (detected via extension), it is
502    /// recursively processed up to `self.max_depth`. Otherwise, tries a
503    /// structured processor first; falls back to the streaming scanner
504    /// if no processor matches.
505    ///
506    /// For the streaming scanner path, the content is piped through
507    /// `scan_reader` directly to the writer for memory-efficient
508    /// chunk-based processing (F-02 fix: no full output buffering).
509    #[allow(clippy::missing_errors_doc)] // private method
510    fn sanitize_entry(
511        &self,
512        filename: &str,
513        reader: &mut dyn Read,
514        writer: &mut dyn Write,
515        stats: &mut ArchiveStats,
516        entry_size_hint: Option<u64>,
517        depth: u32,
518    ) -> Result<()> {
519        // --- Nested archive detection ---
520        if let Some(nested_fmt) = ArchiveFormat::from_path(filename) {
521            return self.sanitize_nested_archive(
522                filename,
523                reader,
524                writer,
525                stats,
526                entry_size_hint,
527                nested_fmt,
528                depth,
529            );
530        }
531
532        // --- Structured / scanner processing ---
533
534        // Try structured processing first, but only if the entry is
535        // within the size cap and --force-text is not set.
536        // Oversized entries fall through to the streaming scanner (M-3 fix).
537        let within_size_cap = entry_size_hint.map_or(true, |sz| sz <= MAX_STRUCTURED_ENTRY_SIZE); // unknown size → allow (conservative)
538
539        if !self.force_text && within_size_cap {
540            if let Some(profile) = self.find_profile(filename) {
541                // Structured processors need the full content in memory.
542                let mut content = Vec::new();
543                reader.read_to_end(&mut content).map_err(|e| {
544                    SanitizeError::ArchiveError(format!("read entry '{filename}': {e}"))
545                })?;
546
547                stats.total_input_bytes += content.len() as u64;
548
549                // A parse error (e.g. binary content with a .yaml extension, like
550                // macOS resource-fork ._* files) falls through to the scanner
551                // rather than failing the whole archive.
552                match self.registry.process(&content, profile, &self.store) {
553                    Ok(Some(structured_out)) => {
554                        // Double-pass: run the streaming scanner on the structured
555                        // output to catch anything the field rules missed.
556                        let (output, scan_stats) = self.scanner.scan_bytes(&structured_out)?;
557                        stats.structured_hits += 1;
558                        stats.total_output_bytes += output.len() as u64;
559                        stats.file_methods.insert(
560                            filename.to_string(),
561                            format!("structured+scan:{}", profile.processor),
562                        );
563                        stats
564                            .file_scan_stats
565                            .insert(filename.to_string(), scan_stats);
566                        writer.write_all(&output).map_err(|e| {
567                            SanitizeError::ArchiveError(format!("write entry '{filename}': {e}"))
568                        })?;
569                        return Ok(());
570                    }
571                    Ok(None) => {} // heuristic rejected — fall through to scanner below
572                    Err(_) => {}  // parse failed — fall through to scanner below
573                }
574
575                // Processor didn't match or failed — fall back to
576                // scanner with the already-buffered content.
577                let (output, scan_stats) = self.scanner.scan_bytes(&content)?;
578                stats.scanner_fallback += 1;
579                stats.total_output_bytes += output.len() as u64;
580                stats
581                    .file_methods
582                    .insert(filename.to_string(), "scanner".to_string());
583                stats
584                    .file_scan_stats
585                    .insert(filename.to_string(), scan_stats);
586                writer.write_all(&output).map_err(|e| {
587                    SanitizeError::ArchiveError(format!("write entry '{filename}': {e}"))
588                })?;
589                return Ok(());
590            }
591        }
592
593        // No profile (or entry too large) → streaming scanner.
594        // F-02 fix: stream directly from reader → scanner → writer
595        // without buffering the full output. We use a CountingWriter
596        // to track output bytes alongside the CountingReader for input.
597        let mut counting_r = CountingReader::new(reader);
598        let mut counting_w = CountingWriter::new(writer);
599        let scan_stats = self.scanner.scan_reader(&mut counting_r, &mut counting_w)?;
600
601        stats.scanner_fallback += 1;
602        stats.total_input_bytes += counting_r.bytes_read();
603        stats.total_output_bytes += counting_w.bytes_written();
604        stats
605            .file_methods
606            .insert(filename.to_string(), "scanner".to_string());
607        stats
608            .file_scan_stats
609            .insert(filename.to_string(), scan_stats);
610
611        Ok(())
612    }
613
614    /// Handle a nested archive entry: validate depth/size, buffer, recurse,
615    /// and write the sanitized output.
616    #[allow(clippy::too_many_arguments)]
617    fn sanitize_nested_archive(
618        &self,
619        filename: &str,
620        reader: &mut dyn Read,
621        writer: &mut dyn Write,
622        stats: &mut ArchiveStats,
623        entry_size_hint: Option<u64>,
624        nested_fmt: ArchiveFormat,
625        depth: u32,
626    ) -> Result<()> {
627        if depth >= self.max_depth {
628            return Err(SanitizeError::RecursionDepthExceeded(format!(
629                "nested archive '{}' at depth {} exceeds maximum nesting depth of {}",
630                filename, depth, self.max_depth,
631            )));
632        }
633
634        // Buffer the nested archive (bounded by MAX_STRUCTURED_ENTRY_SIZE).
635        if let Some(sz) = entry_size_hint {
636            if sz > MAX_STRUCTURED_ENTRY_SIZE {
637                return Err(SanitizeError::ArchiveError(format!(
638                    "nested archive '{}' is too large ({} bytes, limit {} bytes)",
639                    filename, sz, MAX_STRUCTURED_ENTRY_SIZE,
640                )));
641            }
642        }
643
644        let mut content = Vec::new();
645        reader.read_to_end(&mut content).map_err(|e| {
646            SanitizeError::ArchiveError(format!("read nested archive '{filename}': {e}"))
647        })?;
648        stats.total_input_bytes += content.len() as u64;
649
650        // Recurse into the nested archive.
651        let mut output_buf: Vec<u8> = Vec::new();
652        let child_stats = match nested_fmt {
653            ArchiveFormat::Tar => {
654                self.process_tar_at_depth(&content[..], &mut output_buf, depth + 1)?
655            }
656            ArchiveFormat::TarGz => {
657                self.process_tar_gz_at_depth(&content[..], &mut output_buf, depth + 1)?
658            }
659            ArchiveFormat::Zip => {
660                let reader = io::Cursor::new(&content);
661                let mut writer = io::Cursor::new(Vec::new());
662                let s = self.process_zip_at_depth(reader, &mut writer, depth + 1)?;
663                output_buf = writer.into_inner();
664                s
665            }
666        };
667
668        stats.nested_archives += 1;
669        stats.merge(&child_stats);
670        stats.total_output_bytes += output_buf.len() as u64;
671        let fmt_name = match nested_fmt {
672            ArchiveFormat::Tar => "tar",
673            ArchiveFormat::TarGz => "tar.gz",
674            ArchiveFormat::Zip => "zip",
675        };
676        stats
677            .file_methods
678            .insert(filename.to_string(), format!("nested:{fmt_name}"));
679        writer.write_all(&output_buf).map_err(|e| {
680            SanitizeError::ArchiveError(format!("write nested archive '{filename}': {e}"))
681        })?;
682        Ok(())
683    }
684
685    // -----------------------------------------------------------------------
686    // Profile discovery passes (two-phase support)
687    // -----------------------------------------------------------------------
688    //
689    // These methods perform a read-only pre-pass over an archive, running the
690    // structured processor on every profile-matched entry and discarding the
691    // output.  The side-effect is that `self.store` is populated with the
692    // original→replacement mappings for those fields, so a subsequent call to
693    // `build_augmented_scanner` can inject those values as literals into the
694    // scanner used for the real processing pass.
695
696    /// Run the structured processor on every profile-matched entry in a
697    /// `.tar` archive, recording replacements into the store.  Output is
698    /// discarded; the archive is not modified.
699    pub fn discover_profiles_tar<R: Read>(&self, reader: R) -> Result<()> {
700        if self.profiles.is_empty() {
701            return Ok(());
702        }
703        let mut archive = tar::Archive::new(reader);
704        let entries = archive
705            .entries()
706            .map_err(|e| SanitizeError::ArchiveError(format!("discover tar entries: {e}")))?;
707        for entry_result in entries {
708            let mut entry = entry_result
709                .map_err(|e| SanitizeError::ArchiveError(format!("discover tar entry: {e}")))?;
710            if !entry.header().entry_type().is_file() {
711                continue;
712            }
713            let path = entry
714                .path()
715                .map_err(|e| SanitizeError::ArchiveError(format!("entry path: {e}")))?
716                .to_string_lossy()
717                .to_string();
718            let Some(profile) = self.find_profile(&path) else {
719                continue;
720            };
721            let mut content = Vec::new();
722            entry
723                .read_to_end(&mut content)
724                .map_err(|e| SanitizeError::ArchiveError(format!("read '{path}': {e}")))?;
725            let _ = self.registry.process(&content, profile, &self.store);
726        }
727        Ok(())
728    }
729
730    /// Run the structured processor on every profile-matched entry in a
731    /// `.tar.gz` archive, recording replacements into the store.  Output is
732    /// discarded; the archive is not modified.
733    pub fn discover_profiles_tar_gz<R: Read>(&self, reader: R) -> Result<()> {
734        let gz = flate2::read::GzDecoder::new(reader);
735        self.discover_profiles_tar(gz)
736    }
737
738    /// Run the structured processor on every profile-matched entry in a
739    /// `.zip` archive, recording replacements into the store.  Output is
740    /// discarded; the archive is not modified.
741    pub fn discover_profiles_zip<R: Read + Seek>(&self, reader: R) -> Result<()> {
742        if self.profiles.is_empty() {
743            return Ok(());
744        }
745        let mut zip = zip::ZipArchive::new(reader)
746            .map_err(|e| SanitizeError::ArchiveError(format!("open zip for discovery: {e}")))?;
747        for i in 0..zip.len() {
748            let mut entry = zip
749                .by_index(i)
750                .map_err(|e| SanitizeError::ArchiveError(format!("zip entry {i}: {e}")))?;
751            if entry.is_dir() {
752                continue;
753            }
754            let name = entry.name().to_string();
755            let Some(profile) = self.find_profile(&name) else {
756                continue;
757            };
758            let mut content = Vec::new();
759            entry
760                .read_to_end(&mut content)
761                .map_err(|e| SanitizeError::ArchiveError(format!("read '{name}': {e}")))?;
762            let _ = self.registry.process(&content, profile, &self.store);
763        }
764        Ok(())
765    }
766
767    // Tar processing
768    // -----------------------------------------------------------------------
769
770    /// Process a `.tar` archive, sanitizing each file entry and
771    /// rebuilding the archive with preserved metadata.
772    ///
773    /// Entries that are not regular files (directories, symlinks, etc.)
774    /// are copied through unchanged.
775    ///
776    /// # Errors
777    ///
778    /// Returns [`SanitizeError::ArchiveError`] on I/O failures or
779    /// [`SanitizeError::RecursionDepthExceeded`] for nested archives.
780    pub fn process_tar<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ArchiveStats> {
781        self.process_tar_at_depth(reader, writer, 0)
782    }
783
784    /// Internal: process a tar archive at a given nesting depth.
785    ///
786    /// Processes entries one at a time in a single streaming pass so only
787    /// one entry's bytes are in memory at a time (bounded by
788    /// `MAX_STRUCTURED_ENTRY_SIZE`). File-level parallelism (multiple
789    /// archive files processed concurrently by the outer loop) replaces
790    /// intra-archive parallelism, which required buffering the whole archive.
791    fn process_tar_at_depth<R: Read, W: Write>(
792        &self,
793        reader: R,
794        writer: W,
795        depth: u32,
796    ) -> Result<ArchiveStats> {
797        let mut stats = ArchiveStats::default();
798        let mut archive = tar::Archive::new(reader);
799        let mut builder = tar::Builder::new(writer);
800
801        let entries_iter = archive
802            .entries()
803            .map_err(|e| SanitizeError::ArchiveError(format!("read tar entries: {}", e)))?;
804
805        for entry_result in entries_iter {
806            let mut entry = entry_result
807                .map_err(|e| SanitizeError::ArchiveError(format!("read tar entry: {}", e)))?;
808
809            let header = entry.header().clone();
810            let path = entry
811                .path()
812                .map_err(|e| SanitizeError::ArchiveError(format!("entry path: {}", e)))?
813                .to_string_lossy()
814                .to_string();
815            let is_file = header.entry_type().is_file();
816
817            if !is_file {
818                // Non-file entries (dirs, symlinks): pass through unchanged.
819                let mut data = Vec::new();
820                entry.read_to_end(&mut data).map_err(|e| {
821                    SanitizeError::ArchiveError(format!("read tar entry '{}': {}", path, e))
822                })?;
823                drop(entry);
824                builder.append(&header, &*data).map_err(|e| {
825                    SanitizeError::ArchiveError(format!(
826                        "append non-file entry '{}': {}",
827                        path, e
828                    ))
829                })?;
830                stats.entries_skipped += 1;
831                self.emit_progress(&stats, None, &path);
832                continue;
833            }
834
835            // File entry: pipe the entry reader directly into the sanitizer,
836            // eliminating the intermediate input buffer. Structured processors
837            // and nested-archive paths buffer internally as required; the
838            // scanner path works chunk-by-chunk so peak memory is the output
839            // buffer only (half the footprint of the previous approach).
840
841            // Filter: drop entries that do not match the --only/--exclude rules.
842            if !self.filter.passes(&path) {
843                stats.entries_filtered += 1;
844                continue;
845            }
846
847            let size_hint = header.size().ok();
848            let mut sanitized_buf: Vec<u8> = Vec::new();
849            let mut entry_stats = ArchiveStats::default();
850            self.sanitize_entry(
851                &path,
852                &mut entry,
853                &mut sanitized_buf,
854                &mut entry_stats,
855                size_hint,
856                depth,
857            )?;
858            drop(entry);
859
860            let mut new_header = header.clone();
861            new_header.set_size(sanitized_buf.len() as u64);
862            new_header.set_cksum();
863
864            builder.append(&new_header, &*sanitized_buf).map_err(|e| {
865                SanitizeError::ArchiveError(format!("append entry '{}': {}", path, e))
866            })?;
867            drop(sanitized_buf);
868
869            stats.merge(&entry_stats);
870            stats.files_processed += 1;
871            self.emit_progress(&stats, None, &path);
872        }
873
874        builder
875            .finish()
876            .map_err(|e| SanitizeError::ArchiveError(format!("finalize tar: {}", e)))?;
877
878        Ok(stats)
879    }
880
881    /// Process a `.tar.gz` archive (gzip-compressed tar).
882    ///
883    /// Decompresses on the fly, processes each entry, and recompresses
884    /// the output.
885    ///
886    /// # Errors
887    ///
888    /// Returns [`SanitizeError::ArchiveError`] on I/O failures or
889    /// [`SanitizeError::RecursionDepthExceeded`] for nested archives.
890    pub fn process_tar_gz<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ArchiveStats> {
891        self.process_tar_gz_at_depth(reader, writer, 0)
892    }
893
894    /// Internal: process a tar.gz archive at a given nesting depth.
895    fn process_tar_gz_at_depth<R: Read, W: Write>(
896        &self,
897        reader: R,
898        writer: W,
899        depth: u32,
900    ) -> Result<ArchiveStats> {
901        let gz_reader = flate2::read::GzDecoder::new(reader);
902        let gz_writer = flate2::write::GzEncoder::new(writer, flate2::Compression::fast());
903
904        let stats = self.process_tar_at_depth(gz_reader, gz_writer, depth)?;
905        // GzEncoder is flushed when the tar builder finishes and the
906        // encoder is dropped. The `finish()` call in `process_tar`
907        // flushes the tar builder, which flushes writes to the
908        // GzEncoder. When the GzEncoder is dropped it finalises the
909        // gzip stream.
910        Ok(stats)
911    }
912
913    // -----------------------------------------------------------------------
914    // Zip processing
915    // -----------------------------------------------------------------------
916
917    /// Process a `.zip` archive, sanitizing each file entry and
918    /// rebuilding the archive with preserved metadata.
919    ///
920    /// # Type Bounds
921    ///
922    /// Zip requires seekable I/O for both reading and writing.
923    ///
924    /// # Errors
925    ///
926    /// Returns [`SanitizeError::ArchiveError`] on I/O failures or
927    /// [`SanitizeError::RecursionDepthExceeded`] for nested archives.
928    pub fn process_zip<R: Read + Seek, W: Write + Seek>(
929        &self,
930        reader: R,
931        writer: W,
932    ) -> Result<ArchiveStats> {
933        self.process_zip_at_depth(reader, writer, 0)
934    }
935
936    /// Internal: process a zip archive at a given nesting depth.
937    ///
938    /// Uses a lightweight metadata pre-pass (local-header reads, no data
939    /// decompression) to decide between parallel and sequential strategies:
940    ///
941    /// - **Parallel** (total uncompressed ≤ `MAX_PARALLEL_ZIP_DATA_SIZE` AND
942    ///   file count ≥ threshold AND depth == 0): load all entry data into
943    ///   memory, sanitize with rayon, write in order.
944    /// - **Sequential** (everything else): read → sanitize → write one entry
945    ///   at a time.  Peak memory is bounded to 2 × largest single entry.
946    #[allow(clippy::too_many_lines)]
947    fn process_zip_at_depth<R: Read + Seek, W: Write + Seek>(
948        &self,
949        reader: R,
950        writer: W,
951        depth: u32,
952    ) -> Result<ArchiveStats> {
953        // --- Stage 0: metadata pre-pass (no data reads) ---------------------
954        // Read local file headers to collect names, sizes, and options.
955        // This does N seeks but decompresses nothing, keeping memory flat.
956        struct ZipMeta {
957            name: String,
958            is_dir: bool,
959            compression: zip::CompressionMethod,
960            last_modified: zip::DateTime,
961            unix_mode: Option<u32>,
962            size: u64,
963        }
964
965        let mut zip_in = zip::ZipArchive::new(reader)
966            .map_err(|e| SanitizeError::ArchiveError(format!("open zip: {}", e)))?;
967        let total_entries = zip_in.len();
968        let total_entries_hint = Some(total_entries as u64);
969
970        let mut metas: Vec<ZipMeta> = Vec::with_capacity(total_entries);
971        let mut file_count = 0usize;
972        let mut total_uncompressed_size: u64 = 0;
973
974        for i in 0..total_entries {
975            let entry = zip_in
976                .by_index(i)
977                .map_err(|e| SanitizeError::ArchiveError(format!("zip entry {}: {}", i, e)))?;
978            let is_dir = entry.is_dir();
979            let size = entry.size();
980            if !is_dir {
981                file_count += 1;
982                total_uncompressed_size = total_uncompressed_size.saturating_add(size);
983            }
984            metas.push(ZipMeta {
985                name: entry.name().to_string(),
986                is_dir,
987                compression: entry.compression(),
988                last_modified: entry.last_modified(),
989                unix_mode: entry.unix_mode(),
990                size,
991            });
992            // entry dropped here — no data decompressed
993        }
994
995        // Parallel only when the total data fits comfortably in memory.
996        // Parallel when: enough entries, data fits in memory, and we are not
997        // already running inside a rayon worker thread (nested parallelism
998        // would over-subscribe the pool without proportional gains).
999        let use_parallel = file_count >= self.parallel_threshold
1000            && rayon::current_thread_index().is_none()
1001            && total_uncompressed_size <= MAX_PARALLEL_ZIP_DATA_SIZE;
1002
1003        let mut stats = ArchiveStats::default();
1004
1005        // Helper: build FileOptions for a metadata entry.
1006        let make_options = |m: &ZipMeta| {
1007            let opts = zip::write::FileOptions::default()
1008                .compression_method(m.compression)
1009                .last_modified_time(m.last_modified);
1010            if let Some(mode) = m.unix_mode {
1011                opts.unix_permissions(mode)
1012            } else {
1013                opts
1014            }
1015        };
1016
1017        if use_parallel {
1018            // --- Parallel path: load all data then sanitize concurrently ----
1019            struct ZipEntry {
1020                meta_idx: usize,
1021                data: Vec<u8>,
1022            }
1023
1024            let mut file_entries: Vec<ZipEntry> = Vec::with_capacity(file_count);
1025
1026            for (i, meta) in metas.iter().enumerate() {
1027                if meta.is_dir {
1028                    continue;
1029                }
1030                // Skip loading data for entries that will be filtered out.
1031                if !self.filter.passes(&meta.name) {
1032                    continue;
1033                }
1034                let mut entry = zip_in.by_index(i).map_err(|e| {
1035                    SanitizeError::ArchiveError(format!("zip entry {}: {}", i, e))
1036                })?;
1037                let mut data = Vec::new();
1038                entry.read_to_end(&mut data).map_err(|e| {
1039                    SanitizeError::ArchiveError(format!("read zip entry '{}': {}", meta.name, e))
1040                })?;
1041                file_entries.push(ZipEntry { meta_idx: i, data });
1042            }
1043
1044            let results: Vec<ZipEntryResult> = file_entries
1045                .into_par_iter()
1046                .map(|e| {
1047                    let meta = &metas[e.meta_idx];
1048                    let result =
1049                        self.sanitize_entry_bytes(&meta.name, &e.data, Some(meta.size), depth);
1050                    (e.meta_idx, result)
1051                })
1052                .collect();
1053
1054            // Collect into a positional Vec (indexed by metas position) for
1055            // O(1) ordered writes, avoiding HashMap hashing overhead.
1056            let mut sanitized: Vec<Option<(Vec<u8>, ArchiveStats)>> = vec![None; metas.len()];
1057            for (meta_idx, r) in results {
1058                sanitized[meta_idx] = Some(r?);
1059            }
1060
1061            let mut zip_out = zip::ZipWriter::new(writer);
1062            for (i, meta) in metas.iter().enumerate() {
1063                let options = make_options(meta);
1064                if meta.is_dir {
1065                    zip_out.add_directory(&meta.name, options).map_err(|e| {
1066                        SanitizeError::ArchiveError(format!("add dir '{}': {}", meta.name, e))
1067                    })?;
1068                    stats.entries_skipped += 1;
1069                    self.emit_progress(&stats, total_entries_hint, &meta.name);
1070                    continue;
1071                }
1072                // Filter: drop entries not matching --only/--exclude rules.
1073                if !self.filter.passes(&meta.name) {
1074                    stats.entries_filtered += 1;
1075                    self.emit_progress(&stats, total_entries_hint, &meta.name);
1076                    continue;
1077                }
1078                let (sanitized_buf, entry_stats) = sanitized[i]
1079                    .take()
1080                    .expect("file entry sanitization result missing");
1081                stats.merge(&entry_stats);
1082                zip_out.start_file(&meta.name, options).map_err(|e| {
1083                    SanitizeError::ArchiveError(format!("start file '{}': {}", meta.name, e))
1084                })?;
1085                zip_out.write_all(&sanitized_buf).map_err(|e| {
1086                    SanitizeError::ArchiveError(format!("write file '{}': {}", meta.name, e))
1087                })?;
1088                stats.files_processed += 1;
1089                self.emit_progress(&stats, total_entries_hint, &meta.name);
1090            }
1091            zip_out
1092                .finish()
1093                .map_err(|e| SanitizeError::ArchiveError(format!("finalize zip: {}", e)))?;
1094        } else {
1095            // --- Sequential path: one entry at a time -----------------------
1096            // Only one entry's data (input + sanitized output) is live at once.
1097            let mut zip_out = zip::ZipWriter::new(writer);
1098            for (i, meta) in metas.iter().enumerate() {
1099                let options = make_options(meta);
1100                if meta.is_dir {
1101                    zip_out.add_directory(&meta.name, options).map_err(|e| {
1102                        SanitizeError::ArchiveError(format!("add dir '{}': {}", meta.name, e))
1103                    })?;
1104                    stats.entries_skipped += 1;
1105                    self.emit_progress(&stats, total_entries_hint, &meta.name);
1106                    continue;
1107                }
1108
1109                // Filter: drop entries not matching --only/--exclude rules.
1110                if !self.filter.passes(&meta.name) {
1111                    stats.entries_filtered += 1;
1112                    self.emit_progress(&stats, total_entries_hint, &meta.name);
1113                    continue;
1114                }
1115
1116                let data = {
1117                    let mut entry = zip_in.by_index(i).map_err(|e| {
1118                        SanitizeError::ArchiveError(format!("zip entry {}: {}", i, e))
1119                    })?;
1120                    let mut buf = Vec::new();
1121                    entry.read_to_end(&mut buf).map_err(|e| {
1122                        SanitizeError::ArchiveError(format!(
1123                            "read zip entry '{}': {}",
1124                            meta.name, e
1125                        ))
1126                    })?;
1127                    buf
1128                    // entry dropped here
1129                };
1130
1131                let (sanitized_buf, entry_stats) =
1132                    self.sanitize_entry_bytes(&meta.name, &data, Some(meta.size), depth)?;
1133                drop(data);
1134
1135                zip_out.start_file(&meta.name, options).map_err(|e| {
1136                    SanitizeError::ArchiveError(format!("start file '{}': {}", meta.name, e))
1137                })?;
1138                zip_out.write_all(&sanitized_buf).map_err(|e| {
1139                    SanitizeError::ArchiveError(format!("write file '{}': {}", meta.name, e))
1140                })?;
1141                drop(sanitized_buf);
1142
1143                stats.merge(&entry_stats);
1144                stats.files_processed += 1;
1145                self.emit_progress(&stats, total_entries_hint, &meta.name);
1146            }
1147            zip_out
1148                .finish()
1149                .map_err(|e| SanitizeError::ArchiveError(format!("finalize zip: {}", e)))?;
1150        }
1151
1152        Ok(stats)
1153    }
1154
1155    // -----------------------------------------------------------------------
1156    // Format-aware dispatch
1157    // -----------------------------------------------------------------------
1158
1159    /// Auto-detect the archive format and process accordingly.
1160    ///
1161    /// For zip archives the reader must additionally implement `Seek`.
1162    /// This method accepts `Read + Seek` to cover all formats uniformly.
1163    /// Tar and tar.gz do not require seeking, but the bound is imposed
1164    /// for a single entry point.
1165    ///
1166    /// # Errors
1167    ///
1168    /// Returns [`SanitizeError::ArchiveError`] on I/O failures or
1169    /// [`SanitizeError::RecursionDepthExceeded`] for nested archives.
1170    pub fn process<R: Read + Seek, W: Write + Seek>(
1171        &self,
1172        reader: R,
1173        writer: W,
1174        format: ArchiveFormat,
1175    ) -> Result<ArchiveStats> {
1176        match format {
1177            ArchiveFormat::Zip => self.process_zip(reader, writer),
1178            ArchiveFormat::Tar => self.process_tar(reader, writer),
1179            ArchiveFormat::TarGz => self.process_tar_gz(reader, writer),
1180        }
1181    }
1182}
1183
1184// ---------------------------------------------------------------------------
1185// Counting reader wrapper (for input byte tracking)
1186// ---------------------------------------------------------------------------
1187
1188/// A thin wrapper around a reader that counts bytes read.
1189struct CountingReader<'a> {
1190    inner: &'a mut dyn Read,
1191    count: u64,
1192}
1193
1194impl<'a> CountingReader<'a> {
1195    fn new(inner: &'a mut dyn Read) -> Self {
1196        Self { inner, count: 0 }
1197    }
1198
1199    fn bytes_read(&self) -> u64 {
1200        self.count
1201    }
1202}
1203
1204impl Read for CountingReader<'_> {
1205    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1206        let n = self.inner.read(buf)?;
1207        self.count += n as u64;
1208        Ok(n)
1209    }
1210}
1211
1212/// A thin wrapper around a writer that counts bytes written (F-02 fix).
1213struct CountingWriter<'a> {
1214    inner: &'a mut dyn Write,
1215    count: u64,
1216}
1217
1218impl<'a> CountingWriter<'a> {
1219    fn new(inner: &'a mut dyn Write) -> Self {
1220        Self { inner, count: 0 }
1221    }
1222
1223    fn bytes_written(&self) -> u64 {
1224        self.count
1225    }
1226}
1227
1228impl Write for CountingWriter<'_> {
1229    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1230        let n = self.inner.write(buf)?;
1231        self.count += n as u64;
1232        Ok(n)
1233    }
1234
1235    fn flush(&mut self) -> io::Result<()> {
1236        self.inner.flush()
1237    }
1238}
1239
1240// ---------------------------------------------------------------------------
1241// Tests
1242// ---------------------------------------------------------------------------
1243
1244#[cfg(test)]
1245mod tests {
1246    use super::*;
1247    use crate::category::Category;
1248    use crate::generator::HmacGenerator;
1249    use crate::processor::profile::{FieldRule, FileTypeProfile};
1250    use crate::processor::registry::ProcessorRegistry;
1251    use crate::scanner::{ScanConfig, ScanPattern};
1252    use std::io::Cursor;
1253    use std::sync::Mutex;
1254
1255    /// Build a test archive processor with an email pattern and a JSON profile.
1256    fn make_archive_processor() -> ArchiveProcessor {
1257        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
1258        let store = Arc::new(MappingStore::new(gen, None));
1259
1260        let patterns = vec![
1261            ScanPattern::from_regex(
1262                r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
1263                Category::Email,
1264                "email",
1265            )
1266            .unwrap(),
1267            ScanPattern::from_literal("SUPERSECRET", Category::Custom("api_key".into()), "api_key")
1268                .unwrap(),
1269        ];
1270
1271        let scanner = Arc::new(
1272            StreamScanner::new(patterns, Arc::clone(&store), ScanConfig::default()).unwrap(),
1273        );
1274
1275        let registry = Arc::new(ProcessorRegistry::with_builtins());
1276
1277        let profiles = vec![FileTypeProfile::new(
1278            "json",
1279            vec![FieldRule::new("*").with_category(Category::Custom("field".into()))],
1280        )
1281        .with_extension(".json")];
1282
1283        ArchiveProcessor::new(registry, scanner, store, profiles)
1284    }
1285
1286    // -- Tar tests ----------------------------------------------------------
1287
1288    fn build_test_tar(entries: &[(&str, &[u8])]) -> Vec<u8> {
1289        let mut buf = Vec::new();
1290        {
1291            let mut builder = tar::Builder::new(&mut buf);
1292            for (name, data) in entries {
1293                let mut header = tar::Header::new_gnu();
1294                header.set_size(data.len() as u64);
1295                header.set_mode(0o644);
1296                header.set_mtime(1_700_000_000);
1297                header.set_cksum();
1298                builder.append_data(&mut header, *name, *data).unwrap();
1299            }
1300            builder.finish().unwrap();
1301        }
1302        buf
1303    }
1304
1305    #[test]
1306    fn tar_sanitizes_plaintext_with_scanner() {
1307        let proc = make_archive_processor();
1308        let input = build_test_tar(&[("readme.txt", b"Contact alice@corp.com for help.")]);
1309
1310        let mut output = Vec::new();
1311        let stats = proc.process_tar(&input[..], &mut output).unwrap();
1312
1313        assert_eq!(stats.files_processed, 1);
1314        assert_eq!(stats.scanner_fallback, 1);
1315        assert_eq!(stats.structured_hits, 0);
1316
1317        // Verify the output is a valid tar and the secret is gone.
1318        let mut archive = tar::Archive::new(&output[..]);
1319        for entry in archive.entries().unwrap() {
1320            let mut e = entry.unwrap();
1321            let mut content = String::new();
1322            e.read_to_string(&mut content).unwrap();
1323            assert!(
1324                !content.contains("alice@corp.com"),
1325                "email should be sanitized: {content}"
1326            );
1327        }
1328    }
1329
1330    #[test]
1331    fn tar_sanitizes_json_with_structured_processor() {
1332        let proc = make_archive_processor();
1333        let json_content = br#"{"email": "bob@example.org", "name": "Bob"}"#;
1334        let input = build_test_tar(&[("config.json", json_content)]);
1335
1336        let mut output = Vec::new();
1337        let stats = proc.process_tar(&input[..], &mut output).unwrap();
1338
1339        assert_eq!(stats.files_processed, 1);
1340        assert_eq!(stats.structured_hits, 1);
1341        assert_eq!(stats.scanner_fallback, 0);
1342        assert_eq!(
1343            stats.file_methods.get("config.json").unwrap(),
1344            "structured+scan:json"
1345        );
1346
1347        // Verify sanitized output.
1348        let mut archive = tar::Archive::new(&output[..]);
1349        for entry in archive.entries().unwrap() {
1350            let mut e = entry.unwrap();
1351            let mut content = String::new();
1352            e.read_to_string(&mut content).unwrap();
1353            assert!(
1354                !content.contains("bob@example.org"),
1355                "email should be sanitized"
1356            );
1357            assert!(!content.contains("Bob"), "name should be sanitized");
1358        }
1359    }
1360
1361    #[test]
1362    fn tar_preserves_metadata() {
1363        let proc = make_archive_processor();
1364        let input = build_test_tar(&[("data.txt", b"SUPERSECRET token here")]);
1365
1366        let mut output = Vec::new();
1367        proc.process_tar(&input[..], &mut output).unwrap();
1368
1369        let mut archive = tar::Archive::new(&output[..]);
1370        for entry in archive.entries().unwrap() {
1371            let e = entry.unwrap();
1372            let hdr = e.header();
1373            assert_eq!(hdr.mode().unwrap(), 0o644);
1374            assert_eq!(hdr.mtime().unwrap(), 1_700_000_000);
1375        }
1376    }
1377
1378    #[test]
1379    fn tar_handles_multiple_files() {
1380        let proc = make_archive_processor();
1381        let input = build_test_tar(&[
1382            ("a.txt", b"alice@corp.com"),
1383            ("b.json", br#"{"key":"value"}"#),
1384            ("c.log", b"no secrets here"),
1385        ]);
1386
1387        let mut output = Vec::new();
1388        let stats = proc.process_tar(&input[..], &mut output).unwrap();
1389
1390        assert_eq!(stats.files_processed, 3);
1391        // b.json matched the JSON profile
1392        assert_eq!(stats.structured_hits, 1);
1393        // a.txt and c.log fall back to scanner
1394        assert_eq!(stats.scanner_fallback, 2);
1395    }
1396
1397    #[test]
1398    fn tar_passes_through_directories() {
1399        let mut buf = Vec::new();
1400        {
1401            let mut builder = tar::Builder::new(&mut buf);
1402
1403            // Add a directory entry.
1404            let mut dir_header = tar::Header::new_gnu();
1405            dir_header.set_entry_type(tar::EntryType::Directory);
1406            dir_header.set_size(0);
1407            dir_header.set_mode(0o755);
1408            dir_header.set_cksum();
1409            builder
1410                .append_data(&mut dir_header, "mydir/", &b""[..])
1411                .unwrap();
1412
1413            // Add a file.
1414            let mut file_header = tar::Header::new_gnu();
1415            file_header.set_size(5);
1416            file_header.set_mode(0o644);
1417            file_header.set_cksum();
1418            builder
1419                .append_data(&mut file_header, "mydir/hello.txt", &b"hello"[..])
1420                .unwrap();
1421
1422            builder.finish().unwrap();
1423        }
1424
1425        let proc = make_archive_processor();
1426        let mut output = Vec::new();
1427        let stats = proc.process_tar(&buf[..], &mut output).unwrap();
1428
1429        assert_eq!(stats.entries_skipped, 1);
1430        assert_eq!(stats.files_processed, 1);
1431    }
1432
1433    // -- Tar.gz tests -------------------------------------------------------
1434
1435    #[test]
1436    fn tar_gz_round_trip() {
1437        let proc = make_archive_processor();
1438
1439        // Build a tar and gzip it.
1440        let tar_data = build_test_tar(&[("secret.txt", b"Key is SUPERSECRET okay")]);
1441        let mut gz_input = Vec::new();
1442        {
1443            let mut encoder =
1444                flate2::write::GzEncoder::new(&mut gz_input, flate2::Compression::fast());
1445            encoder.write_all(&tar_data).unwrap();
1446            encoder.finish().unwrap();
1447        }
1448
1449        let mut gz_output = Vec::new();
1450        let stats = proc.process_tar_gz(&gz_input[..], &mut gz_output).unwrap();
1451
1452        assert_eq!(stats.files_processed, 1);
1453        assert_eq!(stats.scanner_fallback, 1);
1454
1455        // Decompress and verify.
1456        let decoder = flate2::read::GzDecoder::new(&gz_output[..]);
1457        let mut archive = tar::Archive::new(decoder);
1458        for entry in archive.entries().unwrap() {
1459            let mut e = entry.unwrap();
1460            let mut content = String::new();
1461            e.read_to_string(&mut content).unwrap();
1462            assert!(
1463                !content.contains("SUPERSECRET"),
1464                "secret should be sanitized: {content}"
1465            );
1466        }
1467    }
1468
1469    // -- Zip tests ----------------------------------------------------------
1470
1471    fn build_test_zip(entries: &[(&str, &[u8])]) -> Vec<u8> {
1472        let mut buf = Cursor::new(Vec::new());
1473        {
1474            let mut zip = zip::ZipWriter::new(&mut buf);
1475            for (name, data) in entries {
1476                let options = zip::write::FileOptions::default()
1477                    .compression_method(zip::CompressionMethod::Deflated);
1478                zip.start_file(*name, options).unwrap();
1479                zip.write_all(data).unwrap();
1480            }
1481            zip.finish().unwrap();
1482        }
1483        buf.into_inner()
1484    }
1485
1486    #[test]
1487    fn zip_sanitizes_plaintext_with_scanner() {
1488        let proc = make_archive_processor();
1489        let zip_data = build_test_zip(&[("notes.txt", b"Reach alice@corp.com for info.")]);
1490
1491        let reader = Cursor::new(&zip_data);
1492        let mut writer = Cursor::new(Vec::new());
1493        let stats = proc.process_zip(reader, &mut writer).unwrap();
1494
1495        assert_eq!(stats.files_processed, 1);
1496        assert_eq!(stats.scanner_fallback, 1);
1497
1498        // Verify the output zip.
1499        let out_data = writer.into_inner();
1500        let mut zip_out = zip::ZipArchive::new(Cursor::new(out_data)).unwrap();
1501        let mut entry = zip_out.by_index(0).unwrap();
1502        let mut content = String::new();
1503        entry.read_to_string(&mut content).unwrap();
1504        assert!(
1505            !content.contains("alice@corp.com"),
1506            "email should be sanitized: {content}"
1507        );
1508    }
1509
1510    #[test]
1511    fn zip_sanitizes_json_with_structured_processor() {
1512        let proc = make_archive_processor();
1513        let json_content = br#"{"password": "hunter2", "host": "db.internal"}"#;
1514        let zip_data = build_test_zip(&[("settings.json", json_content)]);
1515
1516        let reader = Cursor::new(&zip_data);
1517        let mut writer = Cursor::new(Vec::new());
1518        let stats = proc.process_zip(reader, &mut writer).unwrap();
1519
1520        assert_eq!(stats.files_processed, 1);
1521        assert_eq!(stats.structured_hits, 1);
1522
1523        let out_data = writer.into_inner();
1524        let mut zip_out = zip::ZipArchive::new(Cursor::new(out_data)).unwrap();
1525        let mut entry = zip_out.by_index(0).unwrap();
1526        let mut content = String::new();
1527        entry.read_to_string(&mut content).unwrap();
1528        assert!(!content.contains("hunter2"), "password should be sanitized");
1529        assert!(!content.contains("db.internal"), "host should be sanitized");
1530    }
1531
1532    #[test]
1533    fn zip_preserves_directory_entries() {
1534        let mut buf = Cursor::new(Vec::new());
1535        {
1536            let mut zip = zip::ZipWriter::new(&mut buf);
1537
1538            let dir_options = zip::write::FileOptions::default();
1539            zip.add_directory("subdir/", dir_options).unwrap();
1540
1541            let file_options = zip::write::FileOptions::default()
1542                .compression_method(zip::CompressionMethod::Stored);
1543            zip.start_file("subdir/data.txt", file_options).unwrap();
1544            zip.write_all(b"SUPERSECRET value").unwrap();
1545
1546            zip.finish().unwrap();
1547        }
1548
1549        let zip_data = buf.into_inner();
1550        let proc = make_archive_processor();
1551        let reader = Cursor::new(&zip_data);
1552        let mut writer = Cursor::new(Vec::new());
1553        let stats = proc.process_zip(reader, &mut writer).unwrap();
1554
1555        assert_eq!(stats.entries_skipped, 1); // directory
1556        assert_eq!(stats.files_processed, 1);
1557    }
1558
1559    #[test]
1560    fn zip_handles_multiple_files() {
1561        let proc = make_archive_processor();
1562        let zip_data = build_test_zip(&[
1563            ("file1.txt", b"alice@corp.com"),
1564            ("file2.json", br#"{"secret":"SUPERSECRET"}"#),
1565            ("file3.log", b"nothing to see"),
1566        ]);
1567
1568        let reader = Cursor::new(&zip_data);
1569        let mut writer = Cursor::new(Vec::new());
1570        let stats = proc.process_zip(reader, &mut writer).unwrap();
1571
1572        assert_eq!(stats.files_processed, 3);
1573        assert_eq!(stats.structured_hits, 1); // JSON
1574        assert_eq!(stats.scanner_fallback, 2); // .txt + .log
1575    }
1576
1577    #[test]
1578    fn tar_progress_callback_receives_updates() {
1579        let updates = Arc::new(Mutex::new(Vec::new()));
1580        let proc = make_archive_processor().with_progress_callback({
1581            let updates = Arc::clone(&updates);
1582            Arc::new(move |progress| {
1583                updates.lock().expect("archive progress lock").push(progress.clone());
1584            })
1585        });
1586        let input = build_test_tar(&[("a.txt", b"alice@corp.com"), ("b.txt", b"SUPERSECRET")]);
1587
1588        let mut output = Vec::new();
1589        let stats = proc.process_tar(&input[..], &mut output).unwrap();
1590        let updates = updates.lock().unwrap();
1591
1592        assert_eq!(updates.len(), 2);
1593        assert_eq!(updates.last().unwrap().entries_seen, 2);
1594        assert_eq!(
1595            updates.last().unwrap().files_processed,
1596            stats.files_processed
1597        );
1598        assert_eq!(updates.last().unwrap().total_entries, None);
1599    }
1600
1601    #[test]
1602    fn zip_progress_callback_reports_total_entries() {
1603        let updates = Arc::new(Mutex::new(Vec::new()));
1604        let proc = make_archive_processor().with_progress_callback({
1605            let updates = Arc::clone(&updates);
1606            Arc::new(move |progress| {
1607                updates.lock().expect("archive progress lock").push(progress.clone());
1608            })
1609        });
1610        let zip_data = build_test_zip(&[
1611            ("file1.txt", b"alice@corp.com"),
1612            ("file2.log", b"nothing to see"),
1613        ]);
1614
1615        let reader = Cursor::new(&zip_data);
1616        let mut writer = Cursor::new(Vec::new());
1617        let stats = proc.process_zip(reader, &mut writer).unwrap();
1618        let updates = updates.lock().unwrap();
1619
1620        assert_eq!(updates.len(), 2);
1621        assert_eq!(
1622            updates.last().unwrap().files_processed,
1623            stats.files_processed
1624        );
1625        assert_eq!(updates.last().unwrap().total_entries, Some(2));
1626        assert_eq!(updates.last().unwrap().current_entry, "file2.log");
1627    }
1628
1629    // -- Format detection tests ---------------------------------------------
1630
1631    #[test]
1632    fn format_detection_from_path() {
1633        assert_eq!(
1634            ArchiveFormat::from_path("data.tar"),
1635            Some(ArchiveFormat::Tar)
1636        );
1637        assert_eq!(
1638            ArchiveFormat::from_path("data.tar.gz"),
1639            Some(ArchiveFormat::TarGz)
1640        );
1641        assert_eq!(
1642            ArchiveFormat::from_path("data.tgz"),
1643            Some(ArchiveFormat::TarGz)
1644        );
1645        assert_eq!(
1646            ArchiveFormat::from_path("data.zip"),
1647            Some(ArchiveFormat::Zip)
1648        );
1649        assert_eq!(
1650            ArchiveFormat::from_path("DATA.ZIP"),
1651            Some(ArchiveFormat::Zip)
1652        );
1653        assert_eq!(ArchiveFormat::from_path("photo.png"), None);
1654    }
1655
1656    // -- Determinism / dedup tests ------------------------------------------
1657
1658    #[test]
1659    fn same_secret_gets_same_replacement_across_entries() {
1660        let proc = make_archive_processor();
1661        let input = build_test_tar(&[
1662            ("a.txt", b"contact alice@corp.com"),
1663            ("b.txt", b"reach alice@corp.com"),
1664        ]);
1665
1666        let mut output = Vec::new();
1667        proc.process_tar(&input[..], &mut output).unwrap();
1668
1669        let mut archive = tar::Archive::new(&output[..]);
1670        let mut contents: Vec<String> = Vec::new();
1671        for entry in archive.entries().unwrap() {
1672            let mut e = entry.unwrap();
1673            let mut s = String::new();
1674            e.read_to_string(&mut s).unwrap();
1675            contents.push(s);
1676        }
1677
1678        // Both files should have the *same* replacement for alice@corp.com.
1679        // Extract the replacement by removing the prefix.
1680        let replacement_a = contents[0].strip_prefix("contact ").unwrap();
1681        let replacement_b = contents[1].strip_prefix("reach ").unwrap();
1682        assert_eq!(
1683            replacement_a, replacement_b,
1684            "dedup should produce identical replacements"
1685        );
1686        assert!(!replacement_a.contains("alice@corp.com"));
1687    }
1688
1689    // -- Auto-dispatch test -------------------------------------------------
1690
1691    #[test]
1692    fn process_auto_dispatch_tar() {
1693        let proc = make_archive_processor();
1694        let tar_data = build_test_tar(&[("f.txt", b"SUPERSECRET")]);
1695
1696        let reader = Cursor::new(tar_data);
1697        let writer = Cursor::new(Vec::new());
1698        let stats = proc.process(reader, writer, ArchiveFormat::Tar).unwrap();
1699
1700        assert_eq!(stats.files_processed, 1);
1701    }
1702
1703    #[test]
1704    fn process_auto_dispatch_zip() {
1705        let proc = make_archive_processor();
1706        let zip_data = build_test_zip(&[("f.txt", b"SUPERSECRET")]);
1707
1708        let reader = Cursor::new(zip_data);
1709        let mut writer = Cursor::new(Vec::new());
1710        let stats = proc
1711            .process(reader, &mut writer, ArchiveFormat::Zip)
1712            .unwrap();
1713
1714        assert_eq!(stats.files_processed, 1);
1715    }
1716
1717    // -- Empty archive tests ------------------------------------------------
1718
1719    #[test]
1720    fn tar_empty_archive() {
1721        let proc = make_archive_processor();
1722        let tar_data = build_test_tar(&[]);
1723
1724        let mut output = Vec::new();
1725        let stats = proc.process_tar(&tar_data[..], &mut output).unwrap();
1726
1727        assert_eq!(stats.files_processed, 0);
1728        assert_eq!(stats.entries_skipped, 0);
1729    }
1730
1731    #[test]
1732    fn zip_empty_archive() {
1733        let proc = make_archive_processor();
1734        let zip_data = build_test_zip(&[]);
1735
1736        let reader = Cursor::new(zip_data);
1737        let mut writer = Cursor::new(Vec::new());
1738        let stats = proc.process_zip(reader, &mut writer).unwrap();
1739
1740        assert_eq!(stats.files_processed, 0);
1741    }
1742}