use anyhow::Result;
use arrow_array::{BooleanArray, FixedSizeBinaryArray, StringArray, UInt64Array};
use std::{
collections::{HashMap, HashSet},
fs::{File, OpenOptions},
os::unix::fs::FileExt,
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
};
use crate::{
common_config::CONFIG,
index::VerifyReport,
};
/// Resolves `rel_path` (a path read from the archive index) to a location
/// inside `out_dir`.
///
/// `.` components are dropped. Any `..`, root, or drive-prefix component is
/// rejected outright, so the result can never escape `out_dir`. A path that
/// normalizes to nothing (e.g. `""` or `"."`) is also rejected.
fn safe_output_path(out_dir: &Path, rel_path: &str) -> Result<PathBuf> {
    use std::path::Component;

    let cleaned = Path::new(rel_path).components().try_fold(
        PathBuf::new(),
        |mut acc, part| -> Result<PathBuf> {
            match part {
                Component::Normal(segment) => acc.push(segment),
                Component::CurDir => {}
                Component::ParentDir => {
                    anyhow::bail!("unsafe archive path escapes output dir: {:?}", rel_path)
                }
                Component::RootDir | Component::Prefix(_) => {
                    anyhow::bail!("unsafe absolute archive path: {:?}", rel_path)
                }
            }
            Ok(acc)
        },
    )?;

    if cleaned.as_os_str().is_empty() {
        anyhow::bail!("empty archive path");
    }
    Ok(out_dir.join(cleaned))
}
/// Tallies accumulated by a single decompression worker thread; the caller
/// sums these across all workers once they have finished.
#[derive(Default)]
struct WorkerStats {
    /// Number of index rows (chunks) this worker claimed.
    total_chunks: u64,
    /// Indices of rows that failed decompression or checksum verification.
    corrupt_rows: Vec<u64>,
    /// Payload bytes (after any decompression) that were checksummed,
    /// whether or not they verified.
    total_written_bytes: u64,
    /// Payload bytes whose checksum matched.
    verified_bytes: u64,
    /// Payload bytes whose checksum did not match.
    corrupt_bytes: u64,
}
/// Decompresses and verifies the archive at `index_path` using the default
/// [`crate::index::IndexFilter`].
///
/// This is shorthand for [`decompress_archive_filtered`] with
/// `IndexFilter::default()` (presumably "select every row"; confirm against
/// `crate::index`). See that function for the meaning of `save_data` and
/// `out_dir` and for the errors it returns.
pub fn decompress_archive(
    index_path: &Path,
    save_data: bool,
    out_dir: &Path,
) -> Result<VerifyReport> {
    let default_filter = crate::index::IndexFilter::default();
    decompress_archive_filtered(index_path, save_data, out_dir, &default_filter)
}
/// Decompresses and verifies every chunk selected by `filter` from the
/// archive at `index_path`.
///
/// All required index columns are validated first. Rows are then handed out
/// to up to `CONFIG.max_core_in_flight` scoped worker threads through a
/// shared atomic cursor (never more workers than rows). Each worker does the
/// following for every row it claims:
/// - reads the chunk's blob from the archive;
/// - decompresses it when the row's `compressed` flag is set;
/// - compares its BLAKE3 hash with the stored `checksum`.
///
/// When `save_data` is true, each distinct `relative_path` is created (and
/// truncated) under `out_dir`, and every verified chunk is written at its
/// `fdata_offset`. Chunks that fail decompression or verification are logged
/// and not written.
///
/// In the returned report, `corrupt_files` counts distinct files with at
/// least one bad chunk, and `verified_files` is `total_files` minus that.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the index cannot be read or lacks an expected column;
/// - an archive path is unsafe;
/// - an output directory or file cannot be created;
/// - a blob lies outside the archive;
/// - any read or write fails;
/// - a worker thread panics.
///
/// All workers are joined before the function returns. The first fatal
/// error stops the other workers from claiming further rows.
pub fn decompress_archive_filtered(
    index_path: &Path,
    save_data: bool,
    out_dir: &Path,
    filter: &crate::index::IndexFilter,
) -> Result<VerifyReport> {
    let (schema, batches) = crate::index::read_znippy_index_filtered(index_path, filter)?;
    let batch = match batches.len() {
        0 => arrow::record_batch::RecordBatch::new_empty(Arc::new(
            crate::index::ZNIPPY_INDEX_SCHEMA.as_ref().clone(),
        )),
        1 => batches.into_iter().next().unwrap(),
        _ => arrow_select::concat::concat_batches(&schema, batches.iter())
            .map_err(|e| anyhow::anyhow!("failed to merge index batches: {}", e))?,
    };
    let total_rows = batch.num_rows();

    // Resolve every column before touching the filesystem, so a malformed
    // index yields an error instead of a panic and no output is truncated.
    let paths_col = typed_index_column::<StringArray>(&batch, "relative_path")?;
    let blob_offset_col = typed_index_column::<UInt64Array>(&batch, "blob_offset")?;
    let blob_size_col = typed_index_column::<UInt64Array>(&batch, "blob_size")?;
    let fdata_offset_col = typed_index_column::<UInt64Array>(&batch, "fdata_offset")?;
    let compressed_col = typed_index_column::<BooleanArray>(&batch, "compressed")?;
    let checksum_col = typed_index_column::<FixedSizeBinaryArray>(&batch, "checksum")?;

    let total_files = (0..total_rows)
        .map(|row| paths_col.value(row))
        .collect::<HashSet<&str>>()
        .len();

    let output_files = if save_data {
        open_output_files(paths_col, total_rows, out_dir)?
    } else {
        Vec::new()
    };

    let archive = File::open(index_path)?;
    let archive_len = archive.metadata()?.len();
    let cursor = AtomicUsize::new(0);
    let job = ChunkJob {
        blob_offset: blob_offset_col,
        blob_size: blob_size_col,
        fdata_offset: fdata_offset_col,
        compressed: compressed_col,
        checksum: checksum_col,
        archive: &archive,
        archive_len,
        output_files: output_files.as_slice(),
        cursor: &cursor,
        total_rows,
    };

    // Extra threads beyond the row count would only spin up and exit.
    let num_workers = (CONFIG.max_core_in_flight as usize).clamp(1, total_rows.max(1));
    let job = &job;
    let outcomes: Vec<thread::Result<Result<WorkerStats>>> = thread::scope(|s| {
        let mut handles = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            handles.push(s.spawn(move || job.run()));
        }
        // Join every worker here so none outlives this call, and so a panic
        // surfaces as a join error rather than being re-raised by `scope`.
        handles.into_iter().map(|h| h.join()).collect()
    });

    let mut total_chunks = 0u64;
    let mut total_written_bytes = 0u64;
    let mut verified_bytes = 0u64;
    let mut corrupt_bytes = 0u64;
    let mut corrupt_paths: HashSet<&str> = HashSet::new();
    for outcome in outcomes {
        let st = match outcome {
            Ok(res) => res?,
            Err(_) => anyhow::bail!("decompression worker thread panicked"),
        };
        total_chunks += st.total_chunks;
        total_written_bytes += st.total_written_bytes;
        verified_bytes += st.verified_bytes;
        corrupt_bytes += st.corrupt_bytes;
        // A file with several bad chunks must still count as one corrupt file.
        corrupt_paths.extend(
            st.corrupt_rows
                .iter()
                .map(|&row| paths_col.value(row as usize)),
        );
    }

    let corrupt_files = corrupt_paths.len();
    let verified_files = total_files.saturating_sub(corrupt_files);
    Ok(VerifyReport {
        total_files,
        verified_files,
        corrupt_files,
        total_bytes: total_written_bytes,
        verified_bytes,
        corrupt_bytes,
        chunks: total_chunks,
    })
}

/// Looks up column `name` in `batch` and downcasts it to `T`.
///
/// Returns an error, rather than panicking, when the column is missing or
/// has an unexpected type.
fn typed_index_column<'a, T: 'static>(
    batch: &'a arrow::record_batch::RecordBatch,
    name: &str,
) -> Result<&'a T> {
    batch
        .column_by_name(name)
        .ok_or_else(|| anyhow::anyhow!("index is missing column {:?}", name))?
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow::anyhow!("index column {:?} has an unexpected type", name))
}

/// Creates (truncating) every distinct file named in `paths` under
/// `out_dir`, creating parent directories as needed.
///
/// Returns one shared handle per index row, so row `i` writes to `files[i]`.
fn open_output_files(paths: &StringArray, rows: usize, out_dir: &Path) -> Result<Vec<Arc<File>>> {
    let mut by_path: HashMap<&str, Arc<File>> = HashMap::new();
    let mut created_dirs: HashSet<PathBuf> = HashSet::new();
    let mut files = Vec::with_capacity(rows);
    for row in 0..rows {
        let rel_path = paths.value(row);
        if let Some(existing) = by_path.get(rel_path) {
            files.push(Arc::clone(existing));
            continue;
        }
        let full = safe_output_path(out_dir, rel_path)?;
        if let Some(parent) = full.parent() {
            if created_dirs.insert(parent.to_path_buf()) {
                std::fs::create_dir_all(parent).map_err(|e| {
                    anyhow::anyhow!("failed to create output dir {}: {}", parent.display(), e)
                })?;
            }
        }
        let file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&full)
            .map_err(|e| {
                anyhow::anyhow!("failed to open output file {}: {}", full.display(), e)
            })?;
        let file = Arc::new(file);
        by_path.insert(rel_path, Arc::clone(&file));
        files.push(file);
    }
    Ok(files)
}

/// Read-only state shared by reference across the scoped decompression
/// workers. Workers claim row indices from `cursor` until it reaches
/// `total_rows`.
struct ChunkJob<'a> {
    /// Byte offset of each chunk's stored blob within the archive.
    blob_offset: &'a UInt64Array,
    /// Length in bytes of each chunk's stored blob.
    blob_size: &'a UInt64Array,
    /// Byte offset within the output file where the chunk is written.
    fdata_offset: &'a UInt64Array,
    /// Whether the blob must be passed through the decompressor.
    compressed: &'a BooleanArray,
    /// Expected BLAKE3 digest of the (decompressed) chunk payload.
    checksum: &'a FixedSizeBinaryArray,
    archive: &'a File,
    archive_len: u64,
    /// One handle per row when saving output; empty otherwise.
    output_files: &'a [Arc<File>],
    cursor: &'a AtomicUsize,
    total_rows: usize,
}

impl ChunkJob<'_> {
    /// Worker loop: processes claimed rows until none remain.
    ///
    /// On the first fatal error, the shared cursor is pushed to `total_rows`
    /// so the other workers stop after their current row. The error is then
    /// returned.
    fn run(&self) -> Result<WorkerStats> {
        let mut st = WorkerStats::default();
        let mut read_buf = Vec::new();
        let mut out_buf = Vec::new();
        loop {
            let row = self.cursor.fetch_add(1, Ordering::Relaxed);
            if row >= self.total_rows {
                return Ok(st);
            }
            if let Err(e) = self.process_row(row, &mut read_buf, &mut out_buf, &mut st) {
                self.cursor.store(self.total_rows, Ordering::Relaxed);
                return Err(e);
            }
        }
    }

    /// Reads, decompresses, and verifies chunk `row`. When the checksum
    /// matches and an output file exists for the row, writes the chunk there.
    ///
    /// Decompression failures and checksum mismatches are logged and
    /// recorded in `st`; they do not produce an error. Out-of-bounds blobs
    /// and I/O failures do.
    fn process_row(
        &self,
        row: usize,
        read_buf: &mut Vec<u8>,
        out_buf: &mut Vec<u8>,
        st: &mut WorkerStats,
    ) -> Result<()> {
        st.total_chunks += 1;
        let blob_offset = self.blob_offset.value(row);
        let blob_size = self.blob_size.value(row);
        let fdata_offset = self.fdata_offset.value(row);

        if blob_size > 0 {
            let in_bounds = blob_offset
                .checked_add(blob_size)
                .is_some_and(|end| end <= self.archive_len);
            anyhow::ensure!(
                in_bounds,
                "blob for row {} out of bounds (offset={}, size={}, archive_len={})",
                row,
                blob_offset,
                blob_size,
                self.archive_len
            );
        }
        // Convert only after the u64 bounds check, so 32-bit targets cannot
        // silently truncate the size.
        let blob_len = usize::try_from(blob_size).map_err(|_| {
            anyhow::anyhow!("blob for row {} too large for this platform (size={})", row, blob_size)
        })?;
        read_buf.resize(blob_len, 0);
        if blob_len > 0 {
            self.archive
                .read_exact_at(read_buf.as_mut_slice(), blob_offset)
                .map_err(|e| anyhow::anyhow!("failed to read blob for row {}: {}", row, e))?;
        }

        let out: &[u8] = if self.compressed.value(row) {
            if let Err(e) = crate::codec::decompress_into(&*read_buf, &mut *out_buf) {
                log::error!("[decomp] row {} error={}", row, e);
                st.corrupt_rows.push(row as u64);
                return Ok(());
            }
            out_buf.as_slice()
        } else {
            read_buf.as_slice()
        };

        let len = out.len() as u64;
        st.total_written_bytes += len;
        let computed = blake3::hash(out);
        let expected = self.checksum.value(row);
        if computed.as_bytes().as_slice() != expected {
            st.corrupt_bytes += len;
            st.corrupt_rows.push(row as u64);
            log::error!(
                "[verify] MISMATCH row={} expected={} got={}",
                row,
                hex::encode(expected),
                hex::encode(computed.as_bytes()),
            );
            return Ok(());
        }
        st.verified_bytes += len;

        if let Some(file) = self.output_files.get(row) {
            file.write_all_at(out, fdata_offset).map_err(|e| {
                anyhow::anyhow!("failed to write output for row {}: {}", row, e)
            })?;
        }
        Ok(())
    }
}
/// Extracts the full contents of `target` from the archive at
/// `archive_path`, verifying each chunk's BLAKE3 checksum.
///
/// The chunk list comes from `crate::index::locate_file`. For each chunk:
/// - its blob is read from the archive;
/// - it is decompressed when `compressed` is set;
/// - its checksum is compared with the stored one;
/// - it is copied to `fdata_offset` in an output buffer.
///
/// The buffer is sized to the largest `fdata_offset + uncompressed_size`
/// among the chunks. Any bytes not covered by a chunk stay zero.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - `target` has no chunks in the archive;
/// - a blob lies outside the archive;
/// - a chunk's offsets overflow or run past the file length;
/// - a read or decompression fails;
/// - a checksum does not match.
pub fn get_file(archive_path: &Path, target: &str) -> Result<Vec<u8>> {
    let chunks = crate::index::locate_file(archive_path, target)?;
    anyhow::ensure!(!chunks.is_empty(), "file not found in archive: {target}");
    let archive = File::open(archive_path)?;
    let archive_len = archive.metadata()?.len();

    // Offsets come from the on-disk index, so a corrupt entry must produce
    // an error rather than an arithmetic overflow.
    let total = chunks
        .iter()
        .try_fold(0u64, |acc, c| {
            c.fdata_offset
                .checked_add(c.uncompressed_size)
                .map(|end| acc.max(end))
        })
        .ok_or_else(|| anyhow::anyhow!("chunk extent overflows for {target}"))?;
    let total = usize::try_from(total)
        .map_err(|_| anyhow::anyhow!("file too large for this platform: {target}"))?;
    let mut out = vec![0u8; total];

    let mut read_buf: Vec<u8> = Vec::new();
    let mut dec_buf: Vec<u8> = Vec::new();
    for c in &chunks {
        if c.blob_size > 0 {
            let in_bounds = c
                .blob_offset
                .checked_add(c.blob_size as u64)
                .is_some_and(|end| end <= archive_len);
            anyhow::ensure!(
                in_bounds,
                "blob for {target} out of bounds (offset={}, size={}, archive_len={})",
                c.blob_offset,
                c.blob_size,
                archive_len
            );
        }
        read_buf.resize(c.blob_size as usize, 0);
        if c.blob_size > 0 {
            archive
                .read_exact_at(&mut read_buf, c.blob_offset)
                .map_err(|e| {
                    anyhow::anyhow!("failed to read blob for {target} chunk {}: {}", c.chunk_seq, e)
                })?;
        }
        let bytes: &[u8] = if c.compressed {
            crate::codec::decompress_into(&read_buf, &mut dec_buf)?;
            &dec_buf
        } else {
            &read_buf
        };
        let computed = blake3::hash(bytes);
        anyhow::ensure!(
            computed.as_bytes()[..] == c.checksum[..],
            "checksum mismatch for {target} chunk {}",
            c.chunk_seq
        );
        // Unchecked `start + len` could wrap below `start` and panic on
        // slicing. Checked math turns that into an error instead.
        let range = usize::try_from(c.fdata_offset)
            .ok()
            .and_then(|start| start.checked_add(bytes.len()).map(|end| start..end))
            .filter(|r| r.end <= out.len())
            .ok_or_else(|| anyhow::anyhow!("chunk overruns file length for {target}"))?;
        out[range].copy_from_slice(bytes);
    }
    Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Output root for the path-sanitisation tests; nothing is written there.
    const OUT_ROOT: &str = "/tmp/out";

    #[test]
    fn safe_output_path_accepts_normal_relative_paths() {
        let root = Path::new(OUT_ROOT);
        let cases = [("a/b/c.txt", "a/b/c.txt"), ("./a/./b.txt", "a/b.txt")];
        for (input, expected) in cases {
            let resolved = safe_output_path(root, input).unwrap();
            assert_eq!(resolved, root.join(expected));
        }
    }

    #[test]
    fn safe_output_path_rejects_zip_slip_and_absolute_paths() {
        let root = Path::new(OUT_ROOT);
        let hostile = [
            "../evil",
            "a/../../evil",
            "a/b/../../../evil",
            "/etc/passwd",
            "",
        ];
        for input in hostile {
            assert!(safe_output_path(root, input).is_err());
        }
    }

    #[test]
    fn decompress_corrupt_archive_errors_not_panics() {
        let tmp = tempfile::tempdir().unwrap();
        let junk_archive = tmp.path().join("bad.znippy");
        std::fs::write(&junk_archive, b"not a real znippy archive, just junk bytes").unwrap();
        let extract_to = tmp.path().join("out");
        let outcome = decompress_archive(&junk_archive, true, &extract_to);
        assert!(outcome.is_err(), "corrupt archive should return Err");
    }
}