use SeekFrom::Start;
use anyhow::anyhow;
use anyhow::{Context, Result};
use arrow::datatypes::SchemaRef;
use arrow::ipc::RecordBatch;
use arrow_array::{
Array, BinaryArray, BooleanArray, Datum, GenericListArray, ListArray, StringArray, StructArray,
UInt8Array, UInt32Array, UInt64Array,
};
use std::any::{Any, type_name};
use std::{
collections::HashMap,
fs::{File, OpenOptions},
io::{Read, Seek, SeekFrom, Write, sink},
path::{Path, PathBuf},
sync::{Arc, Mutex},
thread,
};
use crate::{
ChunkMeta, ChunkRevolver, common_config::CONFIG, extract_config_from_arrow_metadata,
index::VerifyReport, index::read_znippy_index,
};
use blake3::Hasher;
use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
use hex::FromHex;
use crate::chunkrevolver::{Chunk, SendPtr, get_chunk_slice};
use arrow_array::ArrayRef;
pub fn decompress_archive(
index_path: &Path,
save_data: bool,
out_dir: &Path,
) -> Result<VerifyReport> {
let zdata_path = index_path.with_extension("zdata");
let (schema, batches) = read_znippy_index(index_path)?;
let file_checksums = extract_file_checksums_from_metadata(&schema);
let config = &CONFIG;
log::debug!(
"read config from meta {:?}\n and checksums {:?}",
config,
file_checksums
);
let batch = Arc::new(batches[0].clone()); let batch_cloned_for_writer = Arc::clone(&batch);
let mut revolver = ChunkRevolver::new(&config);
let base_ptrs = revolver.base_ptrs();
let chunk_size = revolver.chunk_size();
let (work_tx_array, work_rx_array): (
Vec<Sender<(ChunkMeta, u8, u32)>>,
Vec<Receiver<(ChunkMeta, u8, u32)>>,
) = (0..CONFIG.max_core_in_flight)
.map(|_| bounded(CONFIG.max_chunks as usize))
.unzip();
let (tx_return, rx_return): (Sender<(u8, u64)>, Receiver<(u8, u64)>) = unbounded();
let (chunk_tx, chunk_rx): (Sender<(ChunkMeta, Vec<u8>)>, Receiver<_>) =
bounded(config.max_core_in_flight);
let out_dir = Arc::new(out_dir.to_path_buf());
let chunk_rx_cloned = chunk_rx.clone();
let out_dir_cloned = Arc::clone(&out_dir);
let reader_thread = {
let done_rx = rx_return.clone();
let work_tx_array = work_tx_array.clone();
thread::spawn(move || -> ReaderStats {
let mut inflight_chunks = 0usize;
let mut zdata_file = File::open(&zdata_path).expect("Failed to open .zdata file");
let Some(batch) = batches.get(0) else {
eprintln!("❌ No batch found in index");
return ReaderStats {
total_files: 0,
skipped_files: 0,
};
};
let total_files = batch.num_rows();
let paths = batch
.column_by_name("relative_path")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let chunks_array = batch
.column_by_name("chunks")
.unwrap()
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let struct_array = chunks_array
.values()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
let uncompressed_size_arr = batch
.column_by_name("uncompressed_size")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let chunk_offsets = chunks_array.value_offsets();
for file_index in 0..total_files as u64 {
let chunks_array_for_file = chunks_array.value(file_index as usize);
if let Some(struct_array) =
chunks_array_for_file.as_any().downcast_ref::<StructArray>()
{
let n_chunks = struct_array.len() as u16;
for local_idx in 0..n_chunks {
let zdata_offset_arr = struct_array
.column(0)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let fdata_offset_arr = struct_array
.column(1)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let length_arr = struct_array
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let chunk_seq_arr = struct_array
.column(3)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap();
let checksum_group_arr = struct_array
.column(4)
.as_any()
.downcast_ref::<UInt8Array>()
.unwrap();
let zdata_offset = zdata_offset_arr.value(local_idx as usize);
let fdata_offset = fdata_offset_arr.value(local_idx as usize);
let compressed_size = length_arr.value(local_idx as usize);
let chunk_seq = chunk_seq_arr.value(local_idx as usize);
let checksum_group = checksum_group_arr.value(local_idx as usize);
log::debug!(
"[reader] reading file {} chunk {:?}",
paths.value(file_index as usize),
(
zdata_offset,
fdata_offset,
compressed_size,
chunk_seq,
checksum_group
)
);
let mut chunk_data: Chunk = loop {
match revolver.try_get_chunk() {
Some(c) => break c,
None => {
let (thread_nr, returned) = done_rx
.recv()
.expect("rx_done channel closed unexpectedly");
log::debug!(
"[reader] Blocking wait — returned chunk {} from thread nr {} to pool",
returned,
thread_nr
);
revolver.return_chunk(thread_nr, returned);
inflight_chunks = inflight_chunks
.checked_sub(1)
.expect("inflight_chunks underflow");
}
}
};
let uncompressed_size = uncompressed_size_arr.value(file_index as usize);
let compressed = batch
.column_by_name("compressed")
.unwrap()
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap()
.value(file_index as usize);
zdata_file.seek(Start(zdata_offset)).unwrap();
zdata_file
.read_exact(&mut chunk_data[..compressed_size as usize])
.unwrap();
let meta = ChunkMeta {
zdata_offset,
fdata_offset,
compressed_size,
chunk_seq,
checksum_group,
compressed, file_index,
uncompressed_size,
};
work_tx_array[chunk_data.ring_nr as usize]
.send((meta, chunk_data.ring_nr, chunk_data.index as u32))
.unwrap();
inflight_chunks += 1;
}
} else {
log::debug!("❌ The chunks array is not a StructArray.");
}
}
log::debug!("[reader] Thread done about to drain compressor returning chunks ");
while inflight_chunks > 0 {
log::debug!(
"[reader] draining inflight_chunks amount = {}",
inflight_chunks
);
match done_rx.recv() {
Ok((thread_nr, returned)) => {
log::debug!(
"[reader] Returned chunk {} to pool during draining",
returned
);
revolver.return_chunk(thread_nr, returned);
inflight_chunks -= 1;
}
Err(_) => {
log::debug!("[reader] rx_done channel closed, exiting draining loop");
break;
}
}
}
log::debug!("[reader] Drain done ");
work_tx_array.into_iter().for_each(drop);
log::debug!("[reader] tx_work dropped after finishing all chunk sends");
drop(done_rx);
drop(revolver);
ReaderStats {
total_files,
skipped_files: 0 as usize,
}
})
};
let mut decompressor_threads: Vec<JoinHandle<Result<()>>> =
Vec::with_capacity(config.max_core_in_flight as usize);
let rx_array = work_rx_array.clone();
for decompressor_nr in 0..config.max_core_in_flight as u8 {
let base_ptr: SendPtr = base_ptrs[decompressor_nr as usize];
let rx = rx_array[decompressor_nr as usize].clone();
let tx = chunk_tx.clone();
let done_tx = tx_return.clone(); let handle = thread::spawn(move || unsafe {
let raw_ptr = base_ptr.as_ptr();
loop {
match rx.recv() {
Ok((chunk_meta, ring_nr, chunk_nr)) => {
log::debug!(
"[Decompressor {}] got chunk_nr {}",
decompressor_nr,
chunk_nr
);
let data = get_chunk_slice(
raw_ptr,
chunk_size,
chunk_nr,
chunk_meta.compressed_size as usize,
);
let chunk_org_size = chunk_meta.uncompressed_size;
let chunk_seq = chunk_meta.chunk_seq;
let file_index = chunk_meta.file_index;
let decompress_result = std::panic::catch_unwind(|| {
if chunk_meta.compressed {
decompress2_microchunk(&data)
} else {
Ok(data.to_vec())
}
});
match decompress_result {
Ok(Ok(decompressed)) => {
log::debug!(
"Decompression successful chunk_nr {} ({} bytes)",
chunk_nr,
decompressed.len()
);
if let Err(e) = tx.send((chunk_meta, decompressed)) {
log::error!(
"[Decompressor {}] tx.send failed: {}",
decompressor_nr,
e
);
}
}
Ok(Err(e)) => {
log::error!(
"Decompression failed: file_index {} chunk_nr {} uncompressed={} compressed={} chunk_seq={} error={}",
file_index,
chunk_nr,
chunk_org_size,
data.len(),
chunk_seq,
e
);
}
Err(_) => {
log::error!(
"PANIC: decompress2_microchunk panicked! file_index {} chunk_nr {} chunk_seq={}",
file_index,
chunk_nr,
chunk_seq
);
}
}
if let Err(e) = done_tx.send((decompressor_nr, chunk_nr as u64)) {
log::warn!(
"[Decompressor {}] done_tx failed (chunk_nr {}): {}",
decompressor_nr,
chunk_nr,
e
);
}
}
Err(_) => {
log::debug!(
"[Decompressor {}] rx channel closed, exiting thread",
decompressor_nr
);
break;
}
}
}
drop(tx);
drop(done_tx);
drop(rx);
log::debug!("[compressor] Decompressor thread finished processing.");
log::info!(
"📦 Decompressor thread/group {} returning ",
decompressor_nr
);
Ok(())
});
decompressor_threads.push(handle);
}
let expected_checksums = file_checksums.unwrap_or_default();
let num_groups = expected_checksums.len();
let (verify_txs, verify_threads): (Vec<_>, Vec<_>) = (0..num_groups)
.map(|grp_idx| {
let (vtx, vrx): (Sender<(u32, Vec<u8>)>, Receiver<(u32, Vec<u8>)>) = bounded(64);
let expected = expected_checksums[grp_idx];
let handle = thread::spawn(move || -> (bool, u64) {
let mut hasher = Hasher::new();
let mut next_seq: u32 = 0;
let mut pending: std::collections::BTreeMap<u32, Vec<u8>> =
std::collections::BTreeMap::new();
let mut total_bytes: u64 = 0;
while let Ok((seq, data)) = vrx.recv() {
if seq == next_seq {
hasher.update(&data);
total_bytes += data.len() as u64;
next_seq += 1;
while let Some(buffered) = pending.remove(&next_seq) {
hasher.update(&buffered);
total_bytes += buffered.len() as u64;
next_seq += 1;
}
} else {
pending.insert(seq, data);
}
}
while let Some((&seq, _)) = pending.iter().next() {
if seq == next_seq {
let buffered = pending.remove(&seq).unwrap();
hasher.update(&buffered);
total_bytes += buffered.len() as u64;
next_seq += 1;
} else {
break;
}
}
let computed = *hasher.finalize().as_bytes();
let ok = computed == expected;
if ok {
log::debug!("[verify] checksum_group {} OK", grp_idx);
} else {
log::error!(
"[verify] checksum_group {} MISMATCH: expected {}, got {}",
grp_idx,
hex::encode(expected),
hex::encode(computed)
);
}
(ok, total_bytes)
});
(vtx, handle)
})
.unzip();
let writer_thread = thread::spawn(move || -> WriterStats {
let mut total_chunks = 0u64;
let chunks_array = batch_cloned_for_writer
.column_by_name("chunks")
.unwrap()
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let chunk_offsets = chunks_array.value_offsets();
let mut written_chunks: HashMap<u64, u16> = HashMap::new();
let mut total_written_bytes = 0u64;
let mut open_files: HashMap<usize, File> = HashMap::new();
let mut chunks_written: HashMap<usize, usize> = HashMap::new();
let mut expected_chunks: HashMap<usize, usize> = HashMap::new();
let mut current_open = 0usize;
let mut peak_open = 0usize;
let mut created_dirs: HashSet<PathBuf> = HashSet::new();
while let Ok((chunk_meta, data)) = chunk_rx_cloned.recv() {
total_chunks += 1;
total_written_bytes += data.len() as u64;
if save_data {
let col = batch_cloned_for_writer
.column_by_name("relative_path")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let rel_path = col.value(chunk_meta.file_index.try_into().unwrap());
let full_path = out_dir_cloned.join(rel_path);
log::debug!(
"[Writer] got file_index {} index file name {} transposed to fullpath={:?}",
chunk_meta.file_index,
rel_path,
full_path
);
if let Some(parent) = full_path.parent() {
std::fs::create_dir_all(parent).unwrap();
}
let mut writer = get_output_writer(
&mut open_files,
chunk_meta.file_index as usize,
&full_path,
&mut current_open,
&mut peak_open,
&mut expected_chunks,
&mut chunks_written,
&mut created_dirs,
&chunks_array,
save_data,
);
writer.seek(Start(chunk_meta.fdata_offset)).unwrap();
writer.write_all(&data).unwrap();
let written = chunks_written.entry(chunk_meta.file_index as usize).or_default();
*written += 1;
if let Some(&chunk_goal) = expected_chunks.get(&(chunk_meta.file_index as usize)) {
if *written == chunk_goal {
if let Some(file) = open_files.remove(&(chunk_meta.file_index as usize)) {
drop(file);
current_open -= 1;
}
}
}
}
let grp = chunk_meta.checksum_group as usize;
if grp < num_groups {
let _ = verify_txs[grp].send((chunk_meta.chunk_seq, data));
}
}
for (_, file) in open_files {
drop(file);
current_open -= 1;
}
drop(verify_txs);
let mut verified_files = 0usize;
let mut corrupt_files = 0usize;
let mut verified_bytes = 0u64;
let mut corrupt_bytes = 0u64;
for handle in verify_threads {
let (ok, bytes) = handle.join().expect("verify thread panicked");
if ok {
verified_bytes += bytes;
} else {
corrupt_bytes += bytes;
corrupt_files += 1;
}
}
if corrupt_files == 0 && num_groups > 0 {
verified_files = batch_cloned_for_writer.num_rows();
}
WriterStats {
total_chunks,
total_written_bytes,
verified_files,
corrupt_files,
verified_bytes,
corrupt_bytes,
}
});
let reader_stats = reader_thread.join().expect("reader_thread panicked");
log::debug!("[reader] reader_thread joined");
work_tx_array.into_iter().for_each(drop);
log::debug!("[reader] tx_chunk dropped after reader thread finished");
drop(tx_return);
for handle in decompressor_threads {
handle.join();
}
drop(chunk_tx);
let writer_stats = writer_thread.join().expect("writher_thread panicked");
let report = VerifyReport {
total_files: reader_stats.total_files,
verified_files: writer_stats.verified_files,
corrupt_files: writer_stats.corrupt_files,
total_bytes: writer_stats.total_written_bytes,
verified_bytes: writer_stats.verified_bytes,
corrupt_bytes: writer_stats.corrupt_bytes,
chunks: writer_stats.total_chunks,
};
Ok(report)
}
pub fn extract_file_checksums_from_metadata(schema: &SchemaRef) -> Result<Vec<[u8; 32]>> {
let metadata: &HashMap<String, String> = schema.metadata();
let mut checksums: Vec<[u8; 32]> = Vec::new();
let mut sorted_keys: Vec<_> = metadata
.keys()
.filter(|k| k.starts_with("checksum_group_"))
.collect();
sorted_keys.sort_by_key(|k| {
k.trim_start_matches("checksum_group_")
.parse::<usize>()
.unwrap_or(usize::MAX)
});
for key in sorted_keys {
let hexstr = metadata.get(key).context("Missing metadata entry")?;
let bytes = <[u8; 32]>::from_hex(hexstr)
.with_context(|| format!("Invalid hex in key {}: {}", key, hexstr))?;
checksums.push(bytes);
}
Ok(checksums)
}
use crate::common_config::StrategicConfig;
use crate::meta::{ReaderStats, WriterStats};
use arrow_array::types::Int32Type;
use log::debug;
use std::slice;
use std::thread::JoinHandle;
pub fn decompress2_microchunk(input: &[u8]) -> Result<Vec<u8>> {
crate::codec::decompress_frame(input)
}
use std::collections::HashSet;
use std::io::Result as IoResult;
pub trait WriteSeek: Write + Seek + Send {}
impl<T: Write + Seek + Send> WriteSeek for T {}
struct DevNullSeek;
impl Write for DevNullSeek {
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
Ok(buf.len()) }
fn flush(&mut self) -> IoResult<()> {
Ok(())
}
}
impl Seek for DevNullSeek {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
match pos {
SeekFrom::Start(offset) => Ok(offset),
SeekFrom::End(_) => Ok(0),
SeekFrom::Current(_) => Ok(0),
}
}
}
unsafe impl Send for DevNullSeek {}
#[allow(clippy::too_many_arguments)]
fn get_output_writer(
open_files: &mut HashMap<usize, File>,
file_index: usize,
full_path: &Path,
current_open: &mut usize,
peak_open: &mut usize,
expected_chunks: &mut HashMap<usize, usize>,
chunks_written: &mut HashMap<usize, usize>,
created_dirs: &mut HashSet<std::path::PathBuf>,
chunks_array: &ListArray,
save_data: bool,
) -> Box<dyn WriteSeek> {
let chunk_goal = chunks_array.value_length(file_index);
expected_chunks
.entry(file_index)
.or_insert(chunk_goal as usize);
if let Some(parent) = full_path.parent() {
if created_dirs.insert(parent.to_path_buf()) {
let _ = std::fs::create_dir_all(parent);
}
}
if !open_files.contains_key(&file_index) {
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(full_path)
.expect("Failed to open file for writing");
open_files.insert(file_index, file);
}
*current_open += 1;
*peak_open = (*peak_open).max(*current_open);
let file = open_files.get_mut(&file_index).unwrap();
Box::new(file.try_clone().expect("Failed to clone file"))
}
fn maybe_close_file(
open_files: &mut HashMap<usize, File>,
file_index: usize,
chunks_written: &mut HashMap<usize, usize>,
chunks_array: &ListArray,
current_open: &mut usize,
) {
let chunk_goal = chunks_array.value_length(file_index) as usize;
let written = chunks_written.entry(file_index).or_default();
*written += 1;
if *written == chunk_goal {
if let Some(file) = open_files.remove(&file_index) {
drop(file);
*current_open -= 1;
}
}
}