use crate::error::{Result, SanitizeError};
use crate::processor::profile::FileTypeProfile;
use crate::processor::registry::ProcessorRegistry;
use crate::scanner::{ScanStats, StreamScanner};
use crate::store::MappingStore;
use glob::MatchOptions;
use rayon::prelude::*;
use std::collections::HashMap;
use std::io::{self, Read, Seek, Write};
use std::sync::Arc;
// Entries larger than this are never buffered in memory for structured
// (profile-based) processing; they fall back to the streaming scanner.
const MAX_STRUCTURED_ENTRY_SIZE: u64 = 256 * 1024 * 1024;
// Total uncompressed size above which a zip is processed sequentially
// rather than in parallel, to bound peak memory (parallel mode buffers
// all entries up front).
const MAX_PARALLEL_ZIP_DATA_SIZE: u64 = 256 * 1024 * 1024;
// Default limit on how deep nested archives (zip-in-tar, etc.) are recursed.
pub const DEFAULT_MAX_ARCHIVE_DEPTH: u32 = 3;
// Hard ceiling applied on top of any user-configured depth.
const MAX_ALLOWED_ARCHIVE_DEPTH: u32 = 10;
// Minimum number of file entries before parallel zip processing kicks in.
const PARALLEL_ENTRY_THRESHOLD: usize = 4;
// (entry index, sanitized bytes + per-entry stats) produced by parallel zip workers.
type ZipEntryResult = (usize, Result<(Vec<u8>, ArchiveStats)>);
/// Include/exclude filter applied to archive entry paths.
///
/// An empty filter passes everything. When `only` is non-empty, a path must
/// match at least one `only` pattern; in all cases a path matching any
/// `exclude` pattern is rejected (see [`ArchiveFilter::passes`]).
#[derive(Default, Clone)]
pub struct ArchiveFilter {
    // Allow-list patterns; empty means "allow all".
    only: Vec<CompiledPattern>,
    // Deny-list patterns; checked after `only`.
    exclude: Vec<CompiledPattern>,
}
/// A pre-compiled filter pattern.
#[derive(Clone)]
enum CompiledPattern {
    // Pattern that ended with '/': matches the directory itself and
    // everything underneath it (prefix stored without trailing slashes).
    DirPrefix(String),
    // Any other pattern: compiled as a glob.
    Glob(glob::Pattern),
}
// Glob matching options for entry paths: case-sensitive, and `*` does not
// cross `/` separators (so `*.txt` matches only at one directory level).
const GLOB_OPTS: MatchOptions = MatchOptions {
    case_sensitive: true,
    require_literal_separator: true,
    require_literal_leading_dot: false,
};
impl CompiledPattern {
    /// Parse a raw pattern string.
    ///
    /// A trailing `/` marks a directory-prefix pattern; anything else is
    /// compiled as a glob. Returns a human-readable message on a bad glob.
    fn compile(raw: &str) -> std::result::Result<Self, String> {
        if raw.ends_with('/') {
            let prefix = raw.trim_end_matches('/').to_owned();
            return Ok(Self::DirPrefix(prefix));
        }
        match glob::Pattern::new(raw) {
            Ok(pattern) => Ok(Self::Glob(pattern)),
            Err(e) => Err(format!("invalid glob pattern '{raw}': {e}")),
        }
    }
    /// Test `path` against this pattern.
    ///
    /// A `DirPrefix` matches the directory entry itself or anything below it;
    /// a `Glob` is matched with the module-wide [`GLOB_OPTS`].
    fn matches(&self, path: &str) -> bool {
        match self {
            Self::DirPrefix(prefix) => path
                .strip_prefix(prefix.as_str())
                .is_some_and(|rest| rest.is_empty() || rest.starts_with('/')),
            Self::Glob(pattern) => pattern.matches_with(path, GLOB_OPTS),
        }
    }
}
impl ArchiveFilter {
pub fn new(
only: Vec<String>,
exclude: Vec<String>,
) -> std::result::Result<Self, String> {
let only = only
.iter()
.map(|p| CompiledPattern::compile(p))
.collect::<std::result::Result<Vec<_>, _>>()?;
let exclude = exclude
.iter()
.map(|p| CompiledPattern::compile(p))
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(Self { only, exclude })
}
pub fn is_empty(&self) -> bool {
self.only.is_empty() && self.exclude.is_empty()
}
pub fn passes(&self, path: &str) -> bool {
if !self.only.is_empty() && !self.only.iter().any(|p| p.matches(path)) {
return false;
}
if self.exclude.iter().any(|p| p.matches(path)) {
return false;
}
true
}
}
/// Supported archive container formats.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArchiveFormat {
    Zip,
    Tar,
    TarGz,
}
impl ArchiveFormat {
    /// Detect the archive format from a file path, case-insensitively.
    ///
    /// `.tar.gz` and `.tgz` map to [`TarGz`](Self::TarGz); `.tar` and `.zip`
    /// map to their respective formats; anything else yields `None`.
    pub fn from_path(path: &str) -> Option<Self> {
        let lower = path.to_ascii_lowercase();
        // The two-part suffix can't be expressed as a single extension,
        // so it is checked up front.
        if lower.ends_with(".tar.gz") {
            return Some(Self::TarGz);
        }
        // `lower` is already lowercase, so plain equality suffices here.
        match std::path::Path::new(&lower)
            .extension()
            .and_then(|ext| ext.to_str())
        {
            Some("tgz") => Some(Self::TarGz),
            Some("tar") => Some(Self::Tar),
            Some("zip") => Some(Self::Zip),
            _ => None,
        }
    }
}
/// Aggregate counters and per-file records for one archive processing run.
///
/// Nested archives contribute via [`ArchiveStats::merge`] (files inside a
/// nested archive keep their entry-relative names in the maps).
#[derive(Debug, Clone, Default)]
pub struct ArchiveStats {
    /// File entries that were sanitized and written to the output archive.
    pub files_processed: u64,
    /// Non-file entries (directories, links, …) passed through unmodified.
    pub entries_skipped: u64,
    /// Entries handled by a structured (profile-matched) processor.
    pub structured_hits: u64,
    /// Entries handled by the fallback stream scanner.
    pub scanner_fallback: u64,
    /// Nested archives that were recursively processed.
    pub nested_archives: u64,
    /// Total bytes read from entry contents.
    pub total_input_bytes: u64,
    /// Total bytes written as sanitized entry contents.
    pub total_output_bytes: u64,
    /// Entry name -> sanitization method label (e.g. "scanner", "nested:zip").
    pub file_methods: HashMap<String, String>,
    /// Entry name -> scan statistics for that entry.
    pub file_scan_stats: HashMap<String, ScanStats>,
    /// Entries dropped from the output because of the archive filter.
    pub entries_filtered: u64,
}
/// Snapshot passed to the progress callback after each entry is handled.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveProgress {
    /// Entries handled so far (processed files + skipped non-file entries).
    pub entries_seen: u64,
    /// File entries sanitized so far.
    pub files_processed: u64,
    /// Non-file entries passed through so far.
    pub entries_skipped: u64,
    /// Total entry count when known up front (zip); `None` for tar streams.
    pub total_entries: Option<u64>,
    /// Name of the entry that was just handled.
    pub current_entry: String,
}
/// Shared, thread-safe progress callback invoked per archive entry.
type ArchiveProgressCallback = Arc<dyn Fn(&ArchiveProgress) + Send + Sync>;
impl ArchiveStats {
    /// Fold a child run's counters and per-file records into `self`.
    ///
    /// Numeric counters are summed; per-file maps are overlaid, with the
    /// child's entries overwriting any same-named keys in `self`.
    fn merge(&mut self, child: &ArchiveStats) {
        self.files_processed += child.files_processed;
        self.entries_skipped += child.entries_skipped;
        self.structured_hits += child.structured_hits;
        self.scanner_fallback += child.scanner_fallback;
        self.nested_archives += child.nested_archives;
        self.total_input_bytes += child.total_input_bytes;
        self.total_output_bytes += child.total_output_bytes;
        self.entries_filtered += child.entries_filtered;
        for (name, method) in &child.file_methods {
            self.file_methods.insert(name.clone(), method.clone());
        }
        for (name, scan) in &child.file_scan_stats {
            self.file_scan_stats.insert(name.clone(), scan.clone());
        }
    }
}
/// Sanitizes archives entry-by-entry, preserving structure and metadata.
///
/// Built with [`ArchiveProcessor::new`] and configured via the `with_*`
/// builder methods.
pub struct ArchiveProcessor {
    // Structured (profile-based) processors for known file types.
    registry: Arc<ProcessorRegistry>,
    // Fallback pattern scanner for arbitrary content.
    scanner: Arc<StreamScanner>,
    // Shared mapping store so identical secrets get identical replacements.
    store: Arc<MappingStore>,
    // File-type profiles matched against entry names.
    profiles: Vec<FileTypeProfile>,
    // Maximum nested-archive recursion depth.
    max_depth: u32,
    // Optional per-entry progress callback.
    progress_callback: Option<ArchiveProgressCallback>,
    // Minimum file count before zip entries are processed in parallel.
    parallel_threshold: usize,
    // Include/exclude filter applied to entry paths.
    filter: ArchiveFilter,
    // When true, skip structured processing and always use the scanner.
    force_text: bool,
}
impl ArchiveProcessor {
    /// Build a processor with default depth/parallelism limits, no filter,
    /// and structured processing enabled.
    pub fn new(
        registry: Arc<ProcessorRegistry>,
        scanner: Arc<StreamScanner>,
        store: Arc<MappingStore>,
        profiles: Vec<FileTypeProfile>,
    ) -> Self {
        Self {
            registry,
            scanner,
            store,
            profiles,
            max_depth: DEFAULT_MAX_ARCHIVE_DEPTH,
            progress_callback: None,
            parallel_threshold: PARALLEL_ENTRY_THRESHOLD,
            filter: ArchiveFilter::default(),
            force_text: false,
        }
    }
    /// Set the maximum nested-archive recursion depth (silently capped at
    /// `MAX_ALLOWED_ARCHIVE_DEPTH`).
    #[must_use]
    pub fn with_max_depth(mut self, depth: u32) -> Self {
        self.max_depth = depth.min(MAX_ALLOWED_ARCHIVE_DEPTH);
        self
    }
    /// Set the minimum number of zip file entries before parallel
    /// processing is used.
    #[must_use]
    pub fn with_parallel_threshold(mut self, threshold: usize) -> Self {
        self.parallel_threshold = threshold;
        self
    }
    /// Install a callback invoked after each entry is handled.
    #[must_use]
    pub fn with_progress_callback(mut self, callback: ArchiveProgressCallback) -> Self {
        self.progress_callback = Some(callback);
        self
    }
    /// Install an include/exclude filter for entry paths.
    #[must_use]
    pub fn with_filter(mut self, filter: ArchiveFilter) -> Self {
        self.filter = filter;
        self
    }
    /// When `true`, bypass structured processors and scan every entry as text.
    #[must_use]
    pub fn with_force_text(mut self, force_text: bool) -> Self {
        self.force_text = force_text;
        self
    }
    /// First profile whose filename pattern matches `filename`, if any.
    fn find_profile(&self, filename: &str) -> Option<&FileTypeProfile> {
        self.profiles.iter().find(|p| p.matches_filename(filename))
    }
    /// Invoke the progress callback (if installed) with a fresh snapshot.
    fn emit_progress(&self, stats: &ArchiveStats, total_entries: Option<u64>, current_entry: &str) {
        if let Some(callback) = &self.progress_callback {
            callback(&ArchiveProgress {
                entries_seen: stats.files_processed + stats.entries_skipped,
                files_processed: stats.files_processed,
                entries_skipped: stats.entries_skipped,
                total_entries,
                current_entry: current_entry.to_string(),
            });
        }
    }
    /// In-memory convenience wrapper around [`sanitize_entry`](Self::sanitize_entry):
    /// sanitize `data` and return the output bytes plus per-entry stats.
    fn sanitize_entry_bytes(
        &self,
        filename: &str,
        data: &[u8],
        entry_size_hint: Option<u64>,
        depth: u32,
    ) -> Result<(Vec<u8>, ArchiveStats)> {
        let mut out: Vec<u8> = Vec::with_capacity(data.len());
        let mut entry_stats = ArchiveStats::default();
        let mut reader = io::Cursor::new(data);
        self.sanitize_entry(
            filename,
            &mut reader,
            &mut out,
            &mut entry_stats,
            entry_size_hint,
            depth,
        )?;
        Ok((out, entry_stats))
    }
    /// Sanitize a single archive entry from `reader` into `writer`.
    ///
    /// Dispatch order: nested archives recurse; otherwise, when structured
    /// processing is enabled and the entry is small enough, a matching
    /// profile is tried first (falling back to the scanner); otherwise the
    /// entry is stream-scanned without buffering.
    #[allow(clippy::missing_errors_doc)]
    fn sanitize_entry(
        &self,
        filename: &str,
        reader: &mut dyn Read,
        writer: &mut dyn Write,
        stats: &mut ArchiveStats,
        entry_size_hint: Option<u64>,
        depth: u32,
    ) -> Result<()> {
        if let Some(nested_fmt) = ArchiveFormat::from_path(filename) {
            return self.sanitize_nested_archive(
                filename,
                reader,
                writer,
                stats,
                entry_size_hint,
                nested_fmt,
                depth,
            );
        }
        let within_size_cap = entry_size_hint.map_or(true, |sz| sz <= MAX_STRUCTURED_ENTRY_SIZE);
        if !self.force_text && within_size_cap {
            if let Some(profile) = self.find_profile(filename) {
                let mut content = Vec::new();
                reader.read_to_end(&mut content).map_err(|e| {
                    SanitizeError::ArchiveError(format!("read entry '{filename}': {e}"))
                })?;
                stats.total_input_bytes += content.len() as u64;
                match self.registry.process(&content, profile, &self.store) {
                    Ok(Some(structured_out)) => {
                        // Structured output is scanned again so free-text
                        // values inside structured fields are also caught.
                        let (output, scan_stats) = self.scanner.scan_bytes(&structured_out)?;
                        stats.structured_hits += 1;
                        stats.total_output_bytes += output.len() as u64;
                        stats.file_methods.insert(
                            filename.to_string(),
                            format!("structured+scan:{}", profile.processor),
                        );
                        stats
                            .file_scan_stats
                            .insert(filename.to_string(), scan_stats);
                        writer.write_all(&output).map_err(|e| {
                            SanitizeError::ArchiveError(format!("write entry '{filename}': {e}"))
                        })?;
                        return Ok(());
                    }
                    // A declined (None) or failed structured pass falls
                    // through to the plain scanner below; errors are
                    // deliberately non-fatal here.
                    Ok(None) => {}
                    Err(_) => {}
                }
                let (output, scan_stats) = self.scanner.scan_bytes(&content)?;
                stats.scanner_fallback += 1;
                stats.total_output_bytes += output.len() as u64;
                stats
                    .file_methods
                    .insert(filename.to_string(), "scanner".to_string());
                stats
                    .file_scan_stats
                    .insert(filename.to_string(), scan_stats);
                writer.write_all(&output).map_err(|e| {
                    SanitizeError::ArchiveError(format!("write entry '{filename}': {e}"))
                })?;
                return Ok(());
            }
        }
        // Streaming path: no profile matched (or structured processing is
        // disabled / entry too large), so scan without buffering the entry.
        let mut counting_r = CountingReader::new(reader);
        let mut counting_w = CountingWriter::new(writer);
        let scan_stats = self.scanner.scan_reader(&mut counting_r, &mut counting_w)?;
        stats.scanner_fallback += 1;
        stats.total_input_bytes += counting_r.bytes_read();
        stats.total_output_bytes += counting_w.bytes_written();
        stats
            .file_methods
            .insert(filename.to_string(), "scanner".to_string());
        stats
            .file_scan_stats
            .insert(filename.to_string(), scan_stats);
        Ok(())
    }
    /// Recursively sanitize a nested archive entry, enforcing depth and
    /// size limits, and merge the child run's stats into `stats`.
    #[allow(clippy::too_many_arguments)]
    fn sanitize_nested_archive(
        &self,
        filename: &str,
        reader: &mut dyn Read,
        writer: &mut dyn Write,
        stats: &mut ArchiveStats,
        entry_size_hint: Option<u64>,
        nested_fmt: ArchiveFormat,
        depth: u32,
    ) -> Result<()> {
        if depth >= self.max_depth {
            return Err(SanitizeError::RecursionDepthExceeded(format!(
                "nested archive '{}' at depth {} exceeds maximum nesting depth of {}",
                filename, depth, self.max_depth,
            )));
        }
        if let Some(sz) = entry_size_hint {
            if sz > MAX_STRUCTURED_ENTRY_SIZE {
                return Err(SanitizeError::ArchiveError(format!(
                    "nested archive '{}' is too large ({} bytes, limit {} bytes)",
                    filename, sz, MAX_STRUCTURED_ENTRY_SIZE,
                )));
            }
        }
        let mut content = Vec::new();
        reader.read_to_end(&mut content).map_err(|e| {
            SanitizeError::ArchiveError(format!("read nested archive '{filename}': {e}"))
        })?;
        stats.total_input_bytes += content.len() as u64;
        let mut output_buf: Vec<u8> = Vec::new();
        let child_stats = match nested_fmt {
            ArchiveFormat::Tar => {
                self.process_tar_at_depth(&content[..], &mut output_buf, depth + 1)?
            }
            ArchiveFormat::TarGz => {
                self.process_tar_gz_at_depth(&content[..], &mut output_buf, depth + 1)?
            }
            ArchiveFormat::Zip => {
                // Zip needs Seek on both ends, hence the cursors.
                let reader = io::Cursor::new(&content);
                let mut writer = io::Cursor::new(Vec::new());
                let s = self.process_zip_at_depth(reader, &mut writer, depth + 1)?;
                output_buf = writer.into_inner();
                s
            }
        };
        stats.nested_archives += 1;
        stats.merge(&child_stats);
        stats.total_output_bytes += output_buf.len() as u64;
        let fmt_name = match nested_fmt {
            ArchiveFormat::Tar => "tar",
            ArchiveFormat::TarGz => "tar.gz",
            ArchiveFormat::Zip => "zip",
        };
        stats
            .file_methods
            .insert(filename.to_string(), format!("nested:{fmt_name}"));
        writer.write_all(&output_buf).map_err(|e| {
            SanitizeError::ArchiveError(format!("write nested archive '{filename}': {e}"))
        })?;
        Ok(())
    }
    /// Pre-pass over a tar stream: feed profile-matched entries to the
    /// registry (output discarded) so the mapping store learns values
    /// before the real sanitization pass.
    pub fn discover_profiles_tar<R: Read>(&self, reader: R) -> Result<()> {
        if self.profiles.is_empty() {
            return Ok(());
        }
        let mut archive = tar::Archive::new(reader);
        let entries = archive
            .entries()
            .map_err(|e| SanitizeError::ArchiveError(format!("discover tar entries: {e}")))?;
        for entry_result in entries {
            let mut entry = entry_result
                .map_err(|e| SanitizeError::ArchiveError(format!("discover tar entry: {e}")))?;
            if !entry.header().entry_type().is_file() {
                continue;
            }
            let path = entry
                .path()
                .map_err(|e| SanitizeError::ArchiveError(format!("entry path: {e}")))?
                .to_string_lossy()
                .to_string();
            let Some(profile) = self.find_profile(&path) else {
                continue;
            };
            let mut content = Vec::new();
            entry
                .read_to_end(&mut content)
                .map_err(|e| SanitizeError::ArchiveError(format!("read '{path}': {e}")))?;
            // Result intentionally ignored: discovery is best-effort.
            let _ = self.registry.process(&content, profile, &self.store);
        }
        Ok(())
    }
    /// Gzip-decompressing wrapper around [`discover_profiles_tar`](Self::discover_profiles_tar).
    pub fn discover_profiles_tar_gz<R: Read>(&self, reader: R) -> Result<()> {
        let gz = flate2::read::GzDecoder::new(reader);
        self.discover_profiles_tar(gz)
    }
    /// Zip counterpart of [`discover_profiles_tar`](Self::discover_profiles_tar).
    pub fn discover_profiles_zip<R: Read + Seek>(&self, reader: R) -> Result<()> {
        if self.profiles.is_empty() {
            return Ok(());
        }
        let mut zip = zip::ZipArchive::new(reader)
            .map_err(|e| SanitizeError::ArchiveError(format!("open zip for discovery: {e}")))?;
        for i in 0..zip.len() {
            let mut entry = zip
                .by_index(i)
                .map_err(|e| SanitizeError::ArchiveError(format!("zip entry {i}: {e}")))?;
            if entry.is_dir() {
                continue;
            }
            let name = entry.name().to_string();
            let Some(profile) = self.find_profile(&name) else {
                continue;
            };
            let mut content = Vec::new();
            entry
                .read_to_end(&mut content)
                .map_err(|e| SanitizeError::ArchiveError(format!("read '{name}': {e}")))?;
            // Result intentionally ignored: discovery is best-effort.
            let _ = self.registry.process(&content, profile, &self.store);
        }
        Ok(())
    }
    /// Sanitize a tar stream from `reader` into `writer` at depth 0.
    pub fn process_tar<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ArchiveStats> {
        self.process_tar_at_depth(reader, writer, 0)
    }
    /// Core tar pass: file entries are sanitized and re-appended with a
    /// recomputed size/checksum; non-file entries pass through verbatim;
    /// filtered entries are dropped from the output.
    fn process_tar_at_depth<R: Read, W: Write>(
        &self,
        reader: R,
        writer: W,
        depth: u32,
    ) -> Result<ArchiveStats> {
        let mut stats = ArchiveStats::default();
        let mut archive = tar::Archive::new(reader);
        let mut builder = tar::Builder::new(writer);
        let entries_iter = archive
            .entries()
            .map_err(|e| SanitizeError::ArchiveError(format!("read tar entries: {}", e)))?;
        for entry_result in entries_iter {
            let mut entry = entry_result
                .map_err(|e| SanitizeError::ArchiveError(format!("read tar entry: {}", e)))?;
            let header = entry.header().clone();
            let path = entry
                .path()
                .map_err(|e| SanitizeError::ArchiveError(format!("entry path: {}", e)))?
                .to_string_lossy()
                .to_string();
            let is_file = header.entry_type().is_file();
            if !is_file {
                // Directories, links, etc. are copied through untouched.
                let mut data = Vec::new();
                entry.read_to_end(&mut data).map_err(|e| {
                    SanitizeError::ArchiveError(format!("read tar entry '{}': {}", path, e))
                })?;
                drop(entry);
                builder.append(&header, &*data).map_err(|e| {
                    SanitizeError::ArchiveError(format!(
                        "append non-file entry '{}': {}",
                        path, e
                    ))
                })?;
                stats.entries_skipped += 1;
                self.emit_progress(&stats, None, &path);
                continue;
            }
            if !self.filter.passes(&path) {
                // NOTE(review): filtered tar entries do not emit a progress
                // update, unlike the zip path — confirm whether intended.
                stats.entries_filtered += 1;
                continue;
            }
            let size_hint = header.size().ok();
            let mut sanitized_buf: Vec<u8> = Vec::new();
            let mut entry_stats = ArchiveStats::default();
            self.sanitize_entry(
                &path,
                &mut entry,
                &mut sanitized_buf,
                &mut entry_stats,
                size_hint,
                depth,
            )?;
            drop(entry);
            // Sanitization may change the length, so the header size and
            // checksum must be recomputed before appending.
            let mut new_header = header.clone();
            new_header.set_size(sanitized_buf.len() as u64);
            new_header.set_cksum();
            builder.append(&new_header, &*sanitized_buf).map_err(|e| {
                SanitizeError::ArchiveError(format!("append entry '{}': {}", path, e))
            })?;
            drop(sanitized_buf);
            stats.merge(&entry_stats);
            stats.files_processed += 1;
            self.emit_progress(&stats, None, &path);
        }
        builder
            .finish()
            .map_err(|e| SanitizeError::ArchiveError(format!("finalize tar: {}", e)))?;
        Ok(stats)
    }
    /// Sanitize a gzip-compressed tar stream at depth 0.
    pub fn process_tar_gz<R: Read, W: Write>(&self, reader: R, writer: W) -> Result<ArchiveStats> {
        self.process_tar_gz_at_depth(reader, writer, 0)
    }
    /// Gzip-wrapping core: decompress on the way in, recompress (fast
    /// level) on the way out, delegating to the tar pass.
    fn process_tar_gz_at_depth<R: Read, W: Write>(
        &self,
        reader: R,
        writer: W,
        depth: u32,
    ) -> Result<ArchiveStats> {
        let gz_reader = flate2::read::GzDecoder::new(reader);
        let gz_writer = flate2::write::GzEncoder::new(writer, flate2::Compression::fast());
        let stats = self.process_tar_at_depth(gz_reader, gz_writer, depth)?;
        Ok(stats)
    }
    /// Sanitize a zip archive from `reader` into `writer` at depth 0.
    pub fn process_zip<R: Read + Seek, W: Write + Seek>(
        &self,
        reader: R,
        writer: W,
    ) -> Result<ArchiveStats> {
        self.process_zip_at_depth(reader, writer, 0)
    }
    /// Core zip pass.
    ///
    /// A metadata pre-scan decides between a parallel path (entries read
    /// into memory, sanitized via rayon, then written in order) and a
    /// sequential path. Parallelism is skipped when already inside a rayon
    /// worker (avoids nested-pool deadlock risk) or when the total
    /// uncompressed size exceeds `MAX_PARALLEL_ZIP_DATA_SIZE`.
    #[allow(clippy::too_many_lines)]
    fn process_zip_at_depth<R: Read + Seek, W: Write + Seek>(
        &self,
        reader: R,
        writer: W,
        depth: u32,
    ) -> Result<ArchiveStats> {
        // Per-entry metadata captured up front so entry options can be
        // reproduced when writing the output archive.
        struct ZipMeta {
            name: String,
            is_dir: bool,
            compression: zip::CompressionMethod,
            last_modified: zip::DateTime,
            unix_mode: Option<u32>,
            size: u64,
        }
        let mut zip_in = zip::ZipArchive::new(reader)
            .map_err(|e| SanitizeError::ArchiveError(format!("open zip: {}", e)))?;
        let total_entries = zip_in.len();
        let total_entries_hint = Some(total_entries as u64);
        let mut metas: Vec<ZipMeta> = Vec::with_capacity(total_entries);
        let mut file_count = 0usize;
        let mut total_uncompressed_size: u64 = 0;
        for i in 0..total_entries {
            let entry = zip_in
                .by_index(i)
                .map_err(|e| SanitizeError::ArchiveError(format!("zip entry {}: {}", i, e)))?;
            let is_dir = entry.is_dir();
            let size = entry.size();
            if !is_dir {
                file_count += 1;
                total_uncompressed_size = total_uncompressed_size.saturating_add(size);
            }
            metas.push(ZipMeta {
                name: entry.name().to_string(),
                is_dir,
                compression: entry.compression(),
                last_modified: entry.last_modified(),
                unix_mode: entry.unix_mode(),
                size,
            });
        }
        let use_parallel = file_count >= self.parallel_threshold
            && rayon::current_thread_index().is_none()
            && total_uncompressed_size <= MAX_PARALLEL_ZIP_DATA_SIZE;
        let mut stats = ArchiveStats::default();
        // Rebuild the original entry's write options (compression, mtime,
        // unix permissions) from its captured metadata.
        let make_options = |m: &ZipMeta| {
            let opts = zip::write::FileOptions::default()
                .compression_method(m.compression)
                .last_modified_time(m.last_modified);
            if let Some(mode) = m.unix_mode {
                opts.unix_permissions(mode)
            } else {
                opts
            }
        };
        if use_parallel {
            struct ZipEntry {
                meta_idx: usize,
                data: Vec<u8>,
            }
            // Phase 1 (serial): decompress all surviving file entries.
            let mut file_entries: Vec<ZipEntry> = Vec::with_capacity(file_count);
            for (i, meta) in metas.iter().enumerate() {
                if meta.is_dir {
                    continue;
                }
                if !self.filter.passes(&meta.name) {
                    continue;
                }
                let mut entry = zip_in.by_index(i).map_err(|e| {
                    SanitizeError::ArchiveError(format!("zip entry {}: {}", i, e))
                })?;
                let mut data = Vec::new();
                entry.read_to_end(&mut data).map_err(|e| {
                    SanitizeError::ArchiveError(format!("read zip entry '{}': {}", meta.name, e))
                })?;
                file_entries.push(ZipEntry { meta_idx: i, data });
            }
            // Phase 2 (parallel): sanitize each entry independently.
            let results: Vec<ZipEntryResult> = file_entries
                .into_par_iter()
                .map(|e| {
                    let meta = &metas[e.meta_idx];
                    let result =
                        self.sanitize_entry_bytes(&meta.name, &e.data, Some(meta.size), depth);
                    (e.meta_idx, result)
                })
                .collect();
            let mut sanitized: Vec<Option<(Vec<u8>, ArchiveStats)>> = vec![None; metas.len()];
            for (meta_idx, r) in results {
                sanitized[meta_idx] = Some(r?);
            }
            // Phase 3 (serial): write entries back in original order.
            let mut zip_out = zip::ZipWriter::new(writer);
            for (i, meta) in metas.iter().enumerate() {
                let options = make_options(meta);
                if meta.is_dir {
                    zip_out.add_directory(&meta.name, options).map_err(|e| {
                        SanitizeError::ArchiveError(format!("add dir '{}': {}", meta.name, e))
                    })?;
                    stats.entries_skipped += 1;
                    self.emit_progress(&stats, total_entries_hint, &meta.name);
                    continue;
                }
                if !self.filter.passes(&meta.name) {
                    stats.entries_filtered += 1;
                    self.emit_progress(&stats, total_entries_hint, &meta.name);
                    continue;
                }
                let (sanitized_buf, entry_stats) = sanitized[i]
                    .take()
                    .expect("file entry sanitization result missing");
                stats.merge(&entry_stats);
                zip_out.start_file(&meta.name, options).map_err(|e| {
                    SanitizeError::ArchiveError(format!("start file '{}': {}", meta.name, e))
                })?;
                zip_out.write_all(&sanitized_buf).map_err(|e| {
                    SanitizeError::ArchiveError(format!("write file '{}': {}", meta.name, e))
                })?;
                stats.files_processed += 1;
                self.emit_progress(&stats, total_entries_hint, &meta.name);
            }
            zip_out
                .finish()
                .map_err(|e| SanitizeError::ArchiveError(format!("finalize zip: {}", e)))?;
        } else {
            // Sequential path: read, sanitize, and write one entry at a time.
            let mut zip_out = zip::ZipWriter::new(writer);
            for (i, meta) in metas.iter().enumerate() {
                let options = make_options(meta);
                if meta.is_dir {
                    zip_out.add_directory(&meta.name, options).map_err(|e| {
                        SanitizeError::ArchiveError(format!("add dir '{}': {}", meta.name, e))
                    })?;
                    stats.entries_skipped += 1;
                    self.emit_progress(&stats, total_entries_hint, &meta.name);
                    continue;
                }
                if !self.filter.passes(&meta.name) {
                    stats.entries_filtered += 1;
                    self.emit_progress(&stats, total_entries_hint, &meta.name);
                    continue;
                }
                let data = {
                    let mut entry = zip_in.by_index(i).map_err(|e| {
                        SanitizeError::ArchiveError(format!("zip entry {}: {}", i, e))
                    })?;
                    let mut buf = Vec::new();
                    entry.read_to_end(&mut buf).map_err(|e| {
                        SanitizeError::ArchiveError(format!(
                            "read zip entry '{}': {}",
                            meta.name, e
                        ))
                    })?;
                    buf
                };
                let (sanitized_buf, entry_stats) =
                    self.sanitize_entry_bytes(&meta.name, &data, Some(meta.size), depth)?;
                drop(data);
                zip_out.start_file(&meta.name, options).map_err(|e| {
                    SanitizeError::ArchiveError(format!("start file '{}': {}", meta.name, e))
                })?;
                zip_out.write_all(&sanitized_buf).map_err(|e| {
                    SanitizeError::ArchiveError(format!("write file '{}': {}", meta.name, e))
                })?;
                drop(sanitized_buf);
                stats.merge(&entry_stats);
                stats.files_processed += 1;
                self.emit_progress(&stats, total_entries_hint, &meta.name);
            }
            zip_out
                .finish()
                .map_err(|e| SanitizeError::ArchiveError(format!("finalize zip: {}", e)))?;
        }
        Ok(stats)
    }
    /// Dispatch to the right `process_*` method for `format`.
    pub fn process<R: Read + Seek, W: Write + Seek>(
        &self,
        reader: R,
        writer: W,
        format: ArchiveFormat,
    ) -> Result<ArchiveStats> {
        match format {
            ArchiveFormat::Zip => self.process_zip(reader, writer),
            ArchiveFormat::Tar => self.process_tar(reader, writer),
            ArchiveFormat::TarGz => self.process_tar_gz(reader, writer),
        }
    }
}
/// Reader adapter that counts how many bytes pass through it.
struct CountingReader<'a> {
    inner: &'a mut dyn Read,
    count: u64,
}
impl<'a> CountingReader<'a> {
    /// Wrap `inner`, starting the byte count at zero.
    fn new(inner: &'a mut dyn Read) -> Self {
        CountingReader { inner, count: 0 }
    }
    /// Total bytes successfully read so far.
    fn bytes_read(&self) -> u64 {
        self.count
    }
}
impl Read for CountingReader<'_> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let read_now = self.inner.read(buf)?;
        self.count += read_now as u64;
        Ok(read_now)
    }
}
/// Writer adapter that counts how many bytes pass through it.
struct CountingWriter<'a> {
    inner: &'a mut dyn Write,
    count: u64,
}
impl<'a> CountingWriter<'a> {
    /// Wrap `inner`, starting the byte count at zero.
    fn new(inner: &'a mut dyn Write) -> Self {
        CountingWriter { inner, count: 0 }
    }
    /// Total bytes successfully written so far.
    fn bytes_written(&self) -> u64 {
        self.count
    }
}
impl Write for CountingWriter<'_> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let written_now = self.inner.write(buf)?;
        self.count += written_now as u64;
        Ok(written_now)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
// Unit tests: end-to-end tar/zip sanitization, metadata preservation,
// progress callbacks, format detection, and deterministic replacements.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::category::Category;
    use crate::generator::HmacGenerator;
    use crate::processor::profile::{FieldRule, FileTypeProfile};
    use crate::processor::registry::ProcessorRegistry;
    use crate::scanner::{ScanConfig, ScanPattern};
    use std::io::Cursor;
    use std::sync::Mutex;
    // Builds a processor with an email regex pattern, a "SUPERSECRET"
    // literal pattern, and a JSON profile matching "*.json" entries.
    fn make_archive_processor() -> ArchiveProcessor {
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));
        let patterns = vec![
            ScanPattern::from_regex(
                r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
                Category::Email,
                "email",
            )
            .unwrap(),
            ScanPattern::from_literal("SUPERSECRET", Category::Custom("api_key".into()), "api_key")
                .unwrap(),
        ];
        let scanner = Arc::new(
            StreamScanner::new(patterns, Arc::clone(&store), ScanConfig::default()).unwrap(),
        );
        let registry = Arc::new(ProcessorRegistry::with_builtins());
        let profiles = vec![FileTypeProfile::new(
            "json",
            vec![FieldRule::new("*").with_category(Category::Custom("field".into()))],
        )
        .with_extension(".json")];
        ArchiveProcessor::new(registry, scanner, store, profiles)
    }
    // Builds an in-memory tar archive of (name, content) file entries with
    // fixed mode/mtime so metadata assertions are deterministic.
    fn build_test_tar(entries: &[(&str, &[u8])]) -> Vec<u8> {
        let mut buf = Vec::new();
        {
            let mut builder = tar::Builder::new(&mut buf);
            for (name, data) in entries {
                let mut header = tar::Header::new_gnu();
                header.set_size(data.len() as u64);
                header.set_mode(0o644);
                header.set_mtime(1_700_000_000);
                header.set_cksum();
                builder.append_data(&mut header, *name, *data).unwrap();
            }
            builder.finish().unwrap();
        }
        buf
    }
    // Non-profile entries should go through the fallback scanner.
    #[test]
    fn tar_sanitizes_plaintext_with_scanner() {
        let proc = make_archive_processor();
        let input = build_test_tar(&[("readme.txt", b"Contact alice@corp.com for help.")]);
        let mut output = Vec::new();
        let stats = proc.process_tar(&input[..], &mut output).unwrap();
        assert_eq!(stats.files_processed, 1);
        assert_eq!(stats.scanner_fallback, 1);
        assert_eq!(stats.structured_hits, 0);
        let mut archive = tar::Archive::new(&output[..]);
        for entry in archive.entries().unwrap() {
            let mut e = entry.unwrap();
            let mut content = String::new();
            e.read_to_string(&mut content).unwrap();
            assert!(
                !content.contains("alice@corp.com"),
                "email should be sanitized: {content}"
            );
        }
    }
    // *.json entries should be handled by the structured JSON processor.
    #[test]
    fn tar_sanitizes_json_with_structured_processor() {
        let proc = make_archive_processor();
        let json_content = br#"{"email": "bob@example.org", "name": "Bob"}"#;
        let input = build_test_tar(&[("config.json", json_content)]);
        let mut output = Vec::new();
        let stats = proc.process_tar(&input[..], &mut output).unwrap();
        assert_eq!(stats.files_processed, 1);
        assert_eq!(stats.structured_hits, 1);
        assert_eq!(stats.scanner_fallback, 0);
        assert_eq!(
            stats.file_methods.get("config.json").unwrap(),
            "structured+scan:json"
        );
        let mut archive = tar::Archive::new(&output[..]);
        for entry in archive.entries().unwrap() {
            let mut e = entry.unwrap();
            let mut content = String::new();
            e.read_to_string(&mut content).unwrap();
            assert!(
                !content.contains("bob@example.org"),
                "email should be sanitized"
            );
            assert!(!content.contains("Bob"), "name should be sanitized");
        }
    }
    // Mode and mtime from the input header must survive re-writing.
    #[test]
    fn tar_preserves_metadata() {
        let proc = make_archive_processor();
        let input = build_test_tar(&[("data.txt", b"SUPERSECRET token here")]);
        let mut output = Vec::new();
        proc.process_tar(&input[..], &mut output).unwrap();
        let mut archive = tar::Archive::new(&output[..]);
        for entry in archive.entries().unwrap() {
            let e = entry.unwrap();
            let hdr = e.header();
            assert_eq!(hdr.mode().unwrap(), 0o644);
            assert_eq!(hdr.mtime().unwrap(), 1_700_000_000);
        }
    }
    // Mixed entry types should hit the expected processing paths.
    #[test]
    fn tar_handles_multiple_files() {
        let proc = make_archive_processor();
        let input = build_test_tar(&[
            ("a.txt", b"alice@corp.com"),
            ("b.json", br#"{"key":"value"}"#),
            ("c.log", b"no secrets here"),
        ]);
        let mut output = Vec::new();
        let stats = proc.process_tar(&input[..], &mut output).unwrap();
        assert_eq!(stats.files_processed, 3);
        assert_eq!(stats.structured_hits, 1);
        assert_eq!(stats.scanner_fallback, 2);
    }
    // Directory entries count as skipped and are passed through.
    #[test]
    fn tar_passes_through_directories() {
        let mut buf = Vec::new();
        {
            let mut builder = tar::Builder::new(&mut buf);
            let mut dir_header = tar::Header::new_gnu();
            dir_header.set_entry_type(tar::EntryType::Directory);
            dir_header.set_size(0);
            dir_header.set_mode(0o755);
            dir_header.set_cksum();
            builder
                .append_data(&mut dir_header, "mydir/", &b""[..])
                .unwrap();
            let mut file_header = tar::Header::new_gnu();
            file_header.set_size(5);
            file_header.set_mode(0o644);
            file_header.set_cksum();
            builder
                .append_data(&mut file_header, "mydir/hello.txt", &b"hello"[..])
                .unwrap();
            builder.finish().unwrap();
        }
        let proc = make_archive_processor();
        let mut output = Vec::new();
        let stats = proc.process_tar(&buf[..], &mut output).unwrap();
        assert_eq!(stats.entries_skipped, 1);
        assert_eq!(stats.files_processed, 1);
    }
    // Gzip in, gzip out: the result must decompress to a sanitized tar.
    #[test]
    fn tar_gz_round_trip() {
        let proc = make_archive_processor();
        let tar_data = build_test_tar(&[("secret.txt", b"Key is SUPERSECRET okay")]);
        let mut gz_input = Vec::new();
        {
            let mut encoder =
                flate2::write::GzEncoder::new(&mut gz_input, flate2::Compression::fast());
            encoder.write_all(&tar_data).unwrap();
            encoder.finish().unwrap();
        }
        let mut gz_output = Vec::new();
        let stats = proc.process_tar_gz(&gz_input[..], &mut gz_output).unwrap();
        assert_eq!(stats.files_processed, 1);
        assert_eq!(stats.scanner_fallback, 1);
        let decoder = flate2::read::GzDecoder::new(&gz_output[..]);
        let mut archive = tar::Archive::new(decoder);
        for entry in archive.entries().unwrap() {
            let mut e = entry.unwrap();
            let mut content = String::new();
            e.read_to_string(&mut content).unwrap();
            assert!(
                !content.contains("SUPERSECRET"),
                "secret should be sanitized: {content}"
            );
        }
    }
    // Builds an in-memory zip archive of deflated (name, content) entries.
    fn build_test_zip(entries: &[(&str, &[u8])]) -> Vec<u8> {
        let mut buf = Cursor::new(Vec::new());
        {
            let mut zip = zip::ZipWriter::new(&mut buf);
            for (name, data) in entries {
                let options = zip::write::FileOptions::default()
                    .compression_method(zip::CompressionMethod::Deflated);
                zip.start_file(*name, options).unwrap();
                zip.write_all(data).unwrap();
            }
            zip.finish().unwrap();
        }
        buf.into_inner()
    }
    // Zip counterpart of the plaintext scanner test.
    #[test]
    fn zip_sanitizes_plaintext_with_scanner() {
        let proc = make_archive_processor();
        let zip_data = build_test_zip(&[("notes.txt", b"Reach alice@corp.com for info.")]);
        let reader = Cursor::new(&zip_data);
        let mut writer = Cursor::new(Vec::new());
        let stats = proc.process_zip(reader, &mut writer).unwrap();
        assert_eq!(stats.files_processed, 1);
        assert_eq!(stats.scanner_fallback, 1);
        let out_data = writer.into_inner();
        let mut zip_out = zip::ZipArchive::new(Cursor::new(out_data)).unwrap();
        let mut entry = zip_out.by_index(0).unwrap();
        let mut content = String::new();
        entry.read_to_string(&mut content).unwrap();
        assert!(
            !content.contains("alice@corp.com"),
            "email should be sanitized: {content}"
        );
    }
    // Zip counterpart of the structured JSON test.
    #[test]
    fn zip_sanitizes_json_with_structured_processor() {
        let proc = make_archive_processor();
        let json_content = br#"{"password": "hunter2", "host": "db.internal"}"#;
        let zip_data = build_test_zip(&[("settings.json", json_content)]);
        let reader = Cursor::new(&zip_data);
        let mut writer = Cursor::new(Vec::new());
        let stats = proc.process_zip(reader, &mut writer).unwrap();
        assert_eq!(stats.files_processed, 1);
        assert_eq!(stats.structured_hits, 1);
        let out_data = writer.into_inner();
        let mut zip_out = zip::ZipArchive::new(Cursor::new(out_data)).unwrap();
        let mut entry = zip_out.by_index(0).unwrap();
        let mut content = String::new();
        entry.read_to_string(&mut content).unwrap();
        assert!(!content.contains("hunter2"), "password should be sanitized");
        assert!(!content.contains("db.internal"), "host should be sanitized");
    }
    // Explicit directory entries in a zip should be re-added, not dropped.
    #[test]
    fn zip_preserves_directory_entries() {
        let mut buf = Cursor::new(Vec::new());
        {
            let mut zip = zip::ZipWriter::new(&mut buf);
            let dir_options = zip::write::FileOptions::default();
            zip.add_directory("subdir/", dir_options).unwrap();
            let file_options = zip::write::FileOptions::default()
                .compression_method(zip::CompressionMethod::Stored);
            zip.start_file("subdir/data.txt", file_options).unwrap();
            zip.write_all(b"SUPERSECRET value").unwrap();
            zip.finish().unwrap();
        }
        let zip_data = buf.into_inner();
        let proc = make_archive_processor();
        let reader = Cursor::new(&zip_data);
        let mut writer = Cursor::new(Vec::new());
        let stats = proc.process_zip(reader, &mut writer).unwrap();
        assert_eq!(stats.entries_skipped, 1); assert_eq!(stats.files_processed, 1);
    }
    // Mixed entry types through the zip path.
    #[test]
    fn zip_handles_multiple_files() {
        let proc = make_archive_processor();
        let zip_data = build_test_zip(&[
            ("file1.txt", b"alice@corp.com"),
            ("file2.json", br#"{"secret":"SUPERSECRET"}"#),
            ("file3.log", b"nothing to see"),
        ]);
        let reader = Cursor::new(&zip_data);
        let mut writer = Cursor::new(Vec::new());
        let stats = proc.process_zip(reader, &mut writer).unwrap();
        assert_eq!(stats.files_processed, 3);
        assert_eq!(stats.structured_hits, 1); assert_eq!(stats.scanner_fallback, 2); }
    // Tar progress: one update per entry, no total (streaming format).
    #[test]
    fn tar_progress_callback_receives_updates() {
        let updates = Arc::new(Mutex::new(Vec::new()));
        let proc = make_archive_processor().with_progress_callback({
            let updates = Arc::clone(&updates);
            Arc::new(move |progress| {
                updates.lock().expect("archive progress lock").push(progress.clone());
            })
        });
        let input = build_test_tar(&[("a.txt", b"alice@corp.com"), ("b.txt", b"SUPERSECRET")]);
        let mut output = Vec::new();
        let stats = proc.process_tar(&input[..], &mut output).unwrap();
        let updates = updates.lock().unwrap();
        assert_eq!(updates.len(), 2);
        assert_eq!(updates.last().unwrap().entries_seen, 2);
        assert_eq!(
            updates.last().unwrap().files_processed,
            stats.files_processed
        );
        assert_eq!(updates.last().unwrap().total_entries, None);
    }
    // Zip progress: entry count is known up front, so total is reported.
    #[test]
    fn zip_progress_callback_reports_total_entries() {
        let updates = Arc::new(Mutex::new(Vec::new()));
        let proc = make_archive_processor().with_progress_callback({
            let updates = Arc::clone(&updates);
            Arc::new(move |progress| {
                updates.lock().expect("archive progress lock").push(progress.clone());
            })
        });
        let zip_data = build_test_zip(&[
            ("file1.txt", b"alice@corp.com"),
            ("file2.log", b"nothing to see"),
        ]);
        let reader = Cursor::new(&zip_data);
        let mut writer = Cursor::new(Vec::new());
        let stats = proc.process_zip(reader, &mut writer).unwrap();
        let updates = updates.lock().unwrap();
        assert_eq!(updates.len(), 2);
        assert_eq!(
            updates.last().unwrap().files_processed,
            stats.files_processed
        );
        assert_eq!(updates.last().unwrap().total_entries, Some(2));
        assert_eq!(updates.last().unwrap().current_entry, "file2.log");
    }
    // Extension-based format detection, including case-insensitivity.
    #[test]
    fn format_detection_from_path() {
        assert_eq!(
            ArchiveFormat::from_path("data.tar"),
            Some(ArchiveFormat::Tar)
        );
        assert_eq!(
            ArchiveFormat::from_path("data.tar.gz"),
            Some(ArchiveFormat::TarGz)
        );
        assert_eq!(
            ArchiveFormat::from_path("data.tgz"),
            Some(ArchiveFormat::TarGz)
        );
        assert_eq!(
            ArchiveFormat::from_path("data.zip"),
            Some(ArchiveFormat::Zip)
        );
        assert_eq!(
            ArchiveFormat::from_path("DATA.ZIP"),
            Some(ArchiveFormat::Zip)
        );
        assert_eq!(ArchiveFormat::from_path("photo.png"), None);
    }
    // The shared MappingStore must dedupe: same secret, same replacement.
    #[test]
    fn same_secret_gets_same_replacement_across_entries() {
        let proc = make_archive_processor();
        let input = build_test_tar(&[
            ("a.txt", b"contact alice@corp.com"),
            ("b.txt", b"reach alice@corp.com"),
        ]);
        let mut output = Vec::new();
        proc.process_tar(&input[..], &mut output).unwrap();
        let mut archive = tar::Archive::new(&output[..]);
        let mut contents: Vec<String> = Vec::new();
        for entry in archive.entries().unwrap() {
            let mut e = entry.unwrap();
            let mut s = String::new();
            e.read_to_string(&mut s).unwrap();
            contents.push(s);
        }
        let replacement_a = contents[0].strip_prefix("contact ").unwrap();
        let replacement_b = contents[1].strip_prefix("reach ").unwrap();
        assert_eq!(
            replacement_a, replacement_b,
            "dedup should produce identical replacements"
        );
        assert!(!replacement_a.contains("alice@corp.com"));
    }
    // process() dispatches to the tar path.
    #[test]
    fn process_auto_dispatch_tar() {
        let proc = make_archive_processor();
        let tar_data = build_test_tar(&[("f.txt", b"SUPERSECRET")]);
        let reader = Cursor::new(tar_data);
        let writer = Cursor::new(Vec::new());
        let stats = proc.process(reader, writer, ArchiveFormat::Tar).unwrap();
        assert_eq!(stats.files_processed, 1);
    }
    // process() dispatches to the zip path.
    #[test]
    fn process_auto_dispatch_zip() {
        let proc = make_archive_processor();
        let zip_data = build_test_zip(&[("f.txt", b"SUPERSECRET")]);
        let reader = Cursor::new(zip_data);
        let mut writer = Cursor::new(Vec::new());
        let stats = proc
            .process(reader, &mut writer, ArchiveFormat::Zip)
            .unwrap();
        assert_eq!(stats.files_processed, 1);
    }
    // Empty archives must round-trip with zeroed stats.
    #[test]
    fn tar_empty_archive() {
        let proc = make_archive_processor();
        let tar_data = build_test_tar(&[]);
        let mut output = Vec::new();
        let stats = proc.process_tar(&tar_data[..], &mut output).unwrap();
        assert_eq!(stats.files_processed, 0);
        assert_eq!(stats.entries_skipped, 0);
    }
    // Same as above for zip.
    #[test]
    fn zip_empty_archive() {
        let proc = make_archive_processor();
        let zip_data = build_test_zip(&[]);
        let reader = Cursor::new(zip_data);
        let mut writer = Cursor::new(Vec::new());
        let stats = proc.process_zip(reader, &mut writer).unwrap();
        assert_eq!(stats.files_processed, 0);
    }
}