use anyhow::Result;
use arrow_array::{BooleanArray, FixedSizeBinaryArray, StringArray, UInt64Array};
use std::{
collections::{HashMap, HashSet},
fs::{File, OpenOptions},
os::unix::fs::FileExt,
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
};
use crate::{
common_config::CONFIG,
index::{VerifyReport, read_znippy_index},
};
#[derive(Default)]
struct WorkerStats {
total_chunks: u64,
total_written_bytes: u64,
verified_bytes: u64,
corrupt_bytes: u64,
corrupt_rows: Vec<u64>,
}
pub fn decompress_archive(
index_path: &Path,
save_data: bool,
out_dir: &Path,
) -> Result<VerifyReport> {
let (schema, batches) = read_znippy_index(index_path)?;
let batch = Arc::new(match batches.len() {
0 => arrow::record_batch::RecordBatch::new_empty(Arc::new(
crate::index::ZNIPPY_INDEX_SCHEMA.as_ref().clone(),
)),
1 => batches.into_iter().next().unwrap(),
_ => arrow_select::concat::concat_batches(&schema, batches.iter())
.map_err(|e| anyhow::anyhow!("failed to merge index batches: {}", e))?,
});
let total_rows = batch.num_rows();
let paths_col = batch
.column_by_name("relative_path")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut unique_files = HashSet::new();
for i in 0..total_rows {
unique_files.insert(paths_col.value(i));
}
let total_files = unique_files.len();
drop(unique_files);
let output_files: Arc<Vec<Option<Arc<File>>>> = if save_data {
let mut path_to_file: HashMap<&str, Arc<File>> = HashMap::new();
let mut created_dirs: HashSet<PathBuf> = HashSet::new();
let mut files: Vec<Option<Arc<File>>> = Vec::with_capacity(total_rows);
for i in 0..total_rows {
let rel_path = paths_col.value(i);
let f = path_to_file.entry(rel_path).or_insert_with(|| {
let full = out_dir.join(rel_path);
if let Some(parent) = full.parent() {
if created_dirs.insert(parent.to_path_buf()) {
let _ = std::fs::create_dir_all(parent);
}
}
Arc::new(
OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&full)
.expect("failed to open output file"),
)
});
files.push(Some(Arc::clone(f)));
}
Arc::new(files)
} else {
Arc::new(vec![])
};
let archive = Arc::new(File::open(index_path)?);
let cursor = Arc::new(AtomicUsize::new(0));
let num_workers = (CONFIG.max_core_in_flight as usize).max(1);
let mut handles = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
let batch = Arc::clone(&batch);
let archive = Arc::clone(&archive);
let cursor = Arc::clone(&cursor);
let output_files = Arc::clone(&output_files);
handles.push(thread::spawn(move || -> WorkerStats {
let blob_offset_col = batch
.column_by_name("blob_offset").unwrap()
.as_any().downcast_ref::<UInt64Array>().unwrap();
let blob_size_col = batch
.column_by_name("blob_size").unwrap()
.as_any().downcast_ref::<UInt64Array>().unwrap();
let fdata_offset_col = batch
.column_by_name("fdata_offset").unwrap()
.as_any().downcast_ref::<UInt64Array>().unwrap();
let compressed_col = batch
.column_by_name("compressed").unwrap()
.as_any().downcast_ref::<BooleanArray>().unwrap();
let checksum_col = batch
.column_by_name("checksum").unwrap()
.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap();
let mut st = WorkerStats::default();
let mut read_buf: Vec<u8> = Vec::new(); let mut out_buf: Vec<u8> = Vec::new();
loop {
let row = cursor.fetch_add(1, Ordering::Relaxed);
if row >= total_rows {
break;
}
st.total_chunks += 1;
let blob_offset = blob_offset_col.value(row);
let blob_size = blob_size_col.value(row) as usize;
let fdata_offset = fdata_offset_col.value(row);
let compressed = compressed_col.value(row);
read_buf.resize(blob_size, 0);
if blob_size > 0 {
archive
.read_exact_at(&mut read_buf, blob_offset)
.expect("failed to read blob from archive");
}
let out: &[u8] = if compressed {
match crate::codec::decompress_into(&read_buf, &mut out_buf) {
Ok(_) => &out_buf,
Err(e) => {
log::error!("[decomp] row {} error={}", row, e);
continue;
}
}
} else {
&read_buf
};
let len = out.len() as u64;
st.total_written_bytes += len;
let computed = blake3::hash(out);
if &computed.as_bytes()[..] == checksum_col.value(row) {
st.verified_bytes += len;
} else {
st.corrupt_bytes += len;
st.corrupt_rows.push(row as u64);
log::error!(
"[verify] MISMATCH row={} expected={} got={}",
row,
hex::encode(checksum_col.value(row)),
hex::encode(computed.as_bytes()),
);
}
if let Some(Some(file)) = output_files.get(row) {
file.write_all_at(out, fdata_offset)
.expect("pwrite to output file failed");
}
}
st
}));
}
let mut total_chunks = 0u64;
let mut total_written_bytes = 0u64;
let mut verified_bytes = 0u64;
let mut corrupt_bytes = 0u64;
let mut corrupt_rows: HashSet<u64> = HashSet::new();
for h in handles {
let st = h.join().expect("worker panicked");
total_chunks += st.total_chunks;
total_written_bytes += st.total_written_bytes;
verified_bytes += st.verified_bytes;
corrupt_bytes += st.corrupt_bytes;
for r in st.corrupt_rows {
corrupt_rows.insert(r);
}
}
let corrupt_files = corrupt_rows.len();
let verified_files = total_files.saturating_sub(corrupt_files);
Ok(VerifyReport {
total_files,
verified_files,
corrupt_files,
total_bytes: total_written_bytes,
verified_bytes,
corrupt_bytes,
chunks: total_chunks,
})
}
pub fn decompress_microchunk(input: &[u8]) -> Result<Vec<u8>> {
crate::codec::decompress_frame(input)
}