use anyhow::Result;
use anyhow::anyhow;
use arrow::array::Array;
use arrow::ipc::MetadataVersion;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use blake3::Hasher; use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::net::Shutdown::Write as OtherWrite;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use walkdir::WalkDir;
use znippy_common::chunkrevolver::{Chunk, ChunkRevolver, SendPtr, get_chunk_slice};
use znippy_common::common_config::CONFIG;
use znippy_common::index::{
build_arrow_metadata_for_checksums_and_config, should_skip_compression,
};
use znippy_common::meta::{ChunkMeta, WriterStats};
use znippy_common::{
CompressionReport, FileMeta, attach_metadata, build_arrow_batch_from_files,
split_into_microchunks,
};
/// Returns `full` expressed relative to `base`.
///
/// When `full` does not start with `base`, `full` itself is returned unchanged
/// (as an owned `PathBuf`).
fn strip_prefix<'a>(base: &'a Path, full: &'a Path) -> PathBuf {
    match full.strip_prefix(base) {
        Ok(relative) => relative.to_path_buf(),
        Err(_) => full.to_path_buf(),
    }
}
/// Packs every regular file found under `input_dir` into `<output>.zdata` (chunk payloads)
/// and `<output>.znippy` (Arrow index describing the chunks).
///
/// The work is split over three kinds of threads that talk through channels:
///
/// * one **reader** thread reads each file into chunks obtained from a [`ChunkRevolver`]
///   and sends a descriptor `(file_index, fdata_offset, ring_nr, chunk_index, length, skip)`
///   to the channel belonging to the chunk's ring;
/// * one **compressor** thread per ring feeds every chunk into a BLAKE3 hasher and then
///   either forwards the bytes untouched (`skip`) or compresses them micro-chunk by
///   micro-chunk, handing the chunk back to the reader afterwards;
/// * one **writer** thread appends the payloads to the zdata file in arrival order and
///   records a [`ChunkMeta`] (including the zdata offset) per file.
///
/// # Parameters
/// * `input_dir` – directory that is walked recursively.
/// * `output` – base path; its extension is replaced by `zdata` / `znippy`.
/// * `no_skip` – when `true`, `should_skip_compression` is never consulted and every
///   file is compressed.
///
/// # Errors
/// Returns an error when the output files cannot be created, when a file's metadata cannot
/// be read or the file cannot be opened, when a compressor thread fails, when writing or
/// flushing the zdata file fails, or when building/writing the Arrow index fails.
///
/// # Panics
/// Panics if a worker thread panicked, or if the writer sees two chunks with
/// `fdata_offset == 0` for the same file (internal invariant check).
pub fn compress_dir(
    input_dir: &PathBuf,
    output: &PathBuf,
    no_skip: bool,
) -> anyhow::Result<CompressionReport> {
    log::debug!("Reading directory: {:?}", input_dir);

    // ---- Discover files ---------------------------------------------------------------
    let mut total_dirs = 0;
    let mut files_to_skip = 0;
    let mut files_to_compress = 0;
    let all_files: Arc<Vec<PathBuf>> = Arc::new(
        WalkDir::new(input_dir)
            .into_iter()
            // Entries that cannot be read are silently left out of the archive.
            .filter_map(|entry| entry.ok())
            .filter_map(|e| {
                if e.file_type().is_dir() {
                    total_dirs += 1;
                    None
                } else if e.file_type().is_file() {
                    if !no_skip && should_skip_compression(e.path()) {
                        files_to_skip += 1;
                    } else {
                        files_to_compress += 1;
                    }
                    Some(e.into_path())
                } else {
                    None
                }
            })
            .collect(),
    );
    log::debug!(
        "Found {} files include in {} directories. to compressed {} will skip {}",
        all_files.len(),
        total_dirs,
        files_to_compress,
        files_to_skip
    );
    let total_files: u64 = all_files.len() as u64;

    // ---- Channels ---------------------------------------------------------------------
    // reader -> compressor: one bounded channel per ring.
    let (tx_chunk_array, rx_chunk_array): (
        Vec<Sender<(u64, u64, u8, u64, u64, bool)>>,
        Vec<Receiver<(u64, u64, u8, u64, u64, bool)>>,
    ) = (0..CONFIG.max_core_in_flight)
        .map(|_| bounded(CONFIG.max_chunks as usize))
        .unzip();
    // compressor -> writer: payload plus its metadata.
    let (tx_compressed, rx_compressed): (
        Sender<(Arc<[u8]>, ChunkMeta)>,
        Receiver<(Arc<[u8]>, ChunkMeta)>,
    ) = unbounded();
    // compressor -> reader: `(ring_nr, chunk_index)` of chunks that may be reused.
    let (tx_return, rx_return): (Sender<(u8, u64)>, Receiver<(u8, u64)>) = unbounded();

    // ---- Output file and chunk pool ---------------------------------------------------
    let output_zdata_path = output.with_extension("zdata");
    log::debug!("Creating zdata file at: {:?}", output_zdata_path);
    let zdata_file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&output_zdata_path)?;
    let mut zdata_writer =
        BufWriter::with_capacity((CONFIG.file_split_block_size / 2) as usize, zdata_file);
    let mut revolver = ChunkRevolver::new(&CONFIG);
    let base_ptrs = revolver.base_ptrs();
    let chunk_size = revolver.chunk_size();
    let input_dir_cloned = input_dir.clone();
    let all_files_for_reader = Arc::clone(&all_files);
    let all_files_for_writer = Arc::clone(&all_files);

    // ---- Reader thread ----------------------------------------------------------------
    let reader_thread = {
        let tx_chunk_array = tx_chunk_array.clone();
        let rx_done = rx_return.clone();
        thread::spawn(move || -> anyhow::Result<(u64, u64, u64, u64)> {
            let mut inflight_chunks = 0usize;
            let mut uncompressed_files: u64 = 0;
            let mut uncompressed_bytes: u64 = 0;
            let mut compressed_files: u64 = 0;
            let mut compressed_bytes: u64 = 0;
            // First fatal I/O error. Reading stops, but the thread still drains the chunks
            // that are in flight before it gives up the revolver.
            let mut reader_error: Option<anyhow::Error> = None;

            for (file_index, path) in all_files_for_reader.iter().enumerate() {
                let skip = !no_skip && should_skip_compression(path);
                let file_len = match path.metadata() {
                    Ok(meta) => meta.len(),
                    Err(e) => {
                        reader_error = Some(anyhow!(
                            "Problem reading metadata of the file: {:?}: {}",
                            path,
                            e
                        ));
                        break;
                    }
                };
                if skip {
                    uncompressed_files += 1;
                    uncompressed_bytes += file_len;
                } else {
                    compressed_files += 1;
                    compressed_bytes += file_len;
                }
                let file = match File::open(path) {
                    Ok(f) => f,
                    Err(e) => {
                        reader_error =
                            Some(anyhow!("Problem opening the file: {:?}: {}", path, e));
                        break;
                    }
                };
                let mut reader = BufReader::new(file);
                let mut has_read_any_data = false;
                let mut fdata_offset: u64 = 0;
                loop {
                    let maybe_chunk = revolver.try_get_chunk();
                    match maybe_chunk {
                        Some(mut chunk) => {
                            let ring_nr = chunk.ring_nr as usize;
                            match reader.read(&mut *chunk) {
                                Ok(0) => {
                                    if !has_read_any_data {
                                        // Empty file: still announce it with a zero-length
                                        // chunk.
                                        tx_chunk_array[ring_nr]
                                            .send((
                                                file_index as u64,
                                                fdata_offset,
                                                chunk.ring_nr,
                                                chunk.index,
                                                0,
                                                skip,
                                            ))
                                            .unwrap();
                                        inflight_chunks += 1;
                                    } else {
                                        // End of file: the chunk we just took is unused.
                                        let ring_nr = chunk.ring_nr;
                                        let index = chunk.index;
                                        drop(chunk);
                                        revolver.return_chunk(ring_nr, index);
                                    }
                                    break;
                                }
                                Ok(bytes_read) => {
                                    has_read_any_data = true;
                                    tx_chunk_array[ring_nr]
                                        .send((
                                            file_index as u64,
                                            fdata_offset,
                                            chunk.ring_nr,
                                            chunk.index,
                                            bytes_read as u64,
                                            skip,
                                        ))
                                        .unwrap();
                                    inflight_chunks += 1;
                                    fdata_offset += bytes_read as u64;
                                }
                                Err(e) => {
                                    log::warn!(
                                        "[reader] Error reading file {}: {}",
                                        path.display(),
                                        e
                                    );
                                    let ring_nr = chunk.ring_nr;
                                    let index = chunk.index;
                                    drop(chunk);
                                    revolver.return_chunk(ring_nr, index);
                                    break;
                                }
                            }
                        }
                        None => {
                            // No free chunk: wait until a compressor hands one back.
                            let (ring_nr, returned) =
                                rx_done.recv().expect("rx_done channel closed unexpectedly");
                            revolver.return_chunk(ring_nr, returned);
                            inflight_chunks = inflight_chunks
                                .checked_sub(1)
                                .expect("inflight_chunks underflow");
                            continue;
                        }
                    }
                }
            }

            log::debug!("[reader] Thread done about to drain compressor returning chunks ");
            while inflight_chunks > 0 {
                match rx_done.recv() {
                    Ok((ring_nr, returned)) => {
                        log::debug!(
                            "[reader] Returned chunk {} to pool during draining",
                            returned
                        );
                        revolver.return_chunk(ring_nr, returned);
                        inflight_chunks -= 1;
                    }
                    Err(_) => {
                        log::debug!("[reader] rx_done channel closed, exiting draining loop");
                        break;
                    }
                }
            }
            log::debug!("[reader] Drain done ");
            drop(tx_chunk_array);
            log::debug!("[reader] tx_chunk dropped after finishing all chunk sends");
            drop(rx_done);
            log::debug!("[reader] rx_done dropped after processing all chunks");
            // The revolver is released only after the drain above.
            drop(revolver);

            match reader_error {
                Some(err) => Err(err),
                None => Ok((
                    uncompressed_files,
                    uncompressed_bytes,
                    compressed_files,
                    compressed_bytes,
                )),
            }
        })
    };

    // ---- Compressor threads (one per ring) --------------------------------------------
    let mut compressor_threads = Vec::with_capacity(CONFIG.max_core_in_flight as u8 as usize);
    for compressor_group in 0..CONFIG.max_core_in_flight as u8 {
        let rx_chunk = rx_chunk_array[compressor_group as usize].clone();
        let tx_compressed = tx_compressed.clone();
        let tx_ret = tx_return.clone();
        let base_ptr: SendPtr = base_ptrs[compressor_group as usize];
        let handle = thread::spawn(move || -> anyhow::Result<(u8, [u8; 32])> {
            let raw_ptr = base_ptr.as_ptr();
            let mut hasher = Hasher::new();
            // Sequence number is per compressor thread, not per file.
            let mut chunk_seq: u32 = 0;
            let mut cctx = znippy_common::codec::CompressCtx::new(CONFIG.compression_level)
                .expect("Failed to create compression context");
            log::info!(
                "[compressor] Compressor thread started with level {}",
                CONFIG.compression_level,
            );
            // NOTE(review): the `unsafe` region is kept as wide as it originally was because
            // the helper signatures are not visible here; presumably only `get_chunk_slice`
            // needs it.
            // SAFETY (assumed — TODO confirm against `chunkrevolver`): the slice stays valid
            // because the reader does not reuse a chunk until it is sent back through
            // `tx_ret`, and it releases the revolver only after draining in-flight chunks.
            unsafe {
                loop {
                    match rx_chunk.recv() {
                        Ok((file_index, mut fdata_offset, ring_nr, chunk_nr, length, skip)) => {
                            log::debug!(
                                "[compressor] Processing chunk {} from file {}: {} bytes",
                                chunk_nr,
                                file_index,
                                length
                            );
                            let input = get_chunk_slice(
                                raw_ptr,
                                chunk_size,
                                chunk_nr as u32,
                                length as usize,
                            );
                            // The checksum covers the raw (uncompressed) bytes.
                            hasher.update(&input);
                            if skip {
                                log::debug!(
                                    "[compressor] Skipping compression for chunk {} of file {}",
                                    chunk_nr,
                                    file_index
                                );
                                let payload: Arc<[u8]> = Arc::from(input);
                                let chunk_meta = ChunkMeta {
                                    // Filled in by the writer.
                                    zdata_offset: 0,
                                    fdata_offset,
                                    file_index,
                                    chunk_seq,
                                    checksum_group: compressor_group,
                                    compressed_size: input.len() as u64,
                                    compressed: false,
                                    uncompressed_size: input.len() as u64,
                                };
                                tx_compressed.send((payload, chunk_meta))?;
                                chunk_seq += 1;
                            } else {
                                let micro_chunks =
                                    split_into_microchunks(input, CONFIG.zstd_output_buffer_size);
                                for (micro_nr, micro) in micro_chunks.iter().enumerate() {
                                    let compressed_vec = cctx.compress(micro)?;
                                    let compressed_chunk: Arc<[u8]> =
                                        Arc::from(compressed_vec.into_boxed_slice());
                                    let chunk_meta = ChunkMeta {
                                        // Filled in by the writer.
                                        zdata_offset: 0,
                                        fdata_offset,
                                        file_index,
                                        chunk_seq,
                                        checksum_group: compressor_group,
                                        compressed_size: compressed_chunk.len() as u64,
                                        compressed: true,
                                        uncompressed_size: micro.len() as u64,
                                    };
                                    fdata_offset += micro.len() as u64;
                                    log::debug!(
                                        "[compressor {}] did File_index {} chunk nr {} size {} micro nr {} chunk size {} out {} zdata_offset = {}",
                                        compressor_group,
                                        file_index,
                                        chunk_nr,
                                        length,
                                        micro_nr,
                                        micro.len(),
                                        chunk_meta.compressed_size,
                                        chunk_meta.zdata_offset
                                    );
                                    tx_compressed.send((compressed_chunk, chunk_meta))?;
                                    chunk_seq += 1;
                                }
                            }
                            // Hand the chunk back to the reader; a closed return channel is
                            // reported instead of being ignored.
                            tx_ret.send((ring_nr, chunk_nr))?;
                        }
                        Err(_) => {
                            log::debug!("[compressor] rx_chunk channel closed, compressor exiting");
                            break;
                        }
                    }
                }
                drop(tx_compressed);
                drop(tx_ret);
                drop(rx_chunk);
                log::debug!("[compressor] Compressor thread finished processing.");
            }
            log::info!("📦 Compressor thread/group {} returning ", compressor_group);
            Ok((compressor_group, *hasher.finalize().as_bytes()))
        });
        compressor_threads.push(handle);
    }

    // ---- Writer thread ----------------------------------------------------------------
    let writer_thread = thread::spawn(move || {
        let mut file_metadata: Vec<FileMeta> = all_files_for_writer
            .iter()
            .map(|path| FileMeta {
                relative_path: path.to_string_lossy().to_string(),
                compressed: false,
                uncompressed_size: 0,
                chunks: Vec::new(),
            })
            .collect();
        let mut writerstats = WriterStats {
            total_chunks: 0,
            total_written_bytes: 0,
            verified_files: 0,
            corrupt_files: 0,
            verified_bytes: 0,
            corrupt_bytes: 0,
        };
        let mut zdata_offset: u64 = 0;
        let mut zero_fdata_per_file: std::collections::HashMap<u64, usize> =
            std::collections::HashMap::new();
        // First write/flush failure; reported to the caller instead of being swallowed.
        let mut zdata_status: std::io::Result<()> = Ok(());

        while let Ok((compressed_data, mut chunk_meta)) = rx_compressed.recv() {
            if zdata_status.is_err() {
                // The archive is already broken: keep draining so the compressors never
                // see a closed channel, but write nothing more.
                continue;
            }
            if chunk_meta.fdata_offset == 0 {
                let count = zero_fdata_per_file
                    .entry(chunk_meta.file_index)
                    .or_default();
                *count += 1;
                if *count > 1 {
                    panic!(
                        "❌ More than one chunk with fdata_offset == 0 for file_index {} (count = {})",
                        chunk_meta.file_index, count
                    );
                }
            }
            let idx = chunk_meta.file_index as usize;
            debug_assert_eq!(
                compressed_data.len() as u64,
                chunk_meta.compressed_size,
                "Mismatch between actual compressed data length ({}) and chunk_meta.compressed_size ({}) for file_index {}, chunk_seq {}",
                compressed_data.len(),
                chunk_meta.compressed_size,
                chunk_meta.file_index,
                chunk_meta.chunk_seq
            );
            if idx >= file_metadata.len() {
                log::error!(
                    "[writer] Invalid file_index {}: file_metadata.len() = {}",
                    idx,
                    file_metadata.len()
                );
                continue;
            }
            if let Err(e) = zdata_writer.write_all(&compressed_data) {
                log::error!("[writer] Write error: {}", e);
                zdata_status = Err(e);
                continue;
            }
            let file = &mut file_metadata[idx];
            file.compressed = chunk_meta.compressed;
            file.uncompressed_size += chunk_meta.uncompressed_size;
            chunk_meta.zdata_offset = zdata_offset;
            file.chunks.push(chunk_meta);
            zdata_offset += compressed_data.len() as u64;
            writerstats.total_chunks += 1;
            writerstats.total_written_bytes += compressed_data.len() as u64;
        }

        // Dropping a `BufWriter` discards flush errors, so flush explicitly.
        if zdata_status.is_ok() {
            zdata_status = zdata_writer.flush();
        }
        log::info!(
            "[writer] Done {} chunks , total written {} bytes)",
            writerstats.total_chunks,
            writerstats.total_written_bytes
        );
        let batch = build_arrow_batch_from_files(&file_metadata, &input_dir_cloned);
        (writerstats, batch, zdata_status)
    });

    // ---- Join the pipeline in producer -> consumer order ------------------------------
    let reader_result = reader_thread.join().expect("reader thread panicked");
    log::debug!("[reader] reader_thread joined");
    // Last senders of the chunk channels: dropping them lets the compressors terminate.
    drop(tx_chunk_array);
    log::debug!("[reader] tx_chunk dropped after reader thread finished");

    let mut checksums: Vec<[u8; 32]> = Vec::with_capacity(CONFIG.max_core_in_compress as usize);
    for handle in compressor_threads {
        let (compressor_group, checksum) = handle
            .join()
            .expect("compressor thread panicked")
            // Keep the original top-level message but retain the underlying cause.
            .map_err(|e| e.context("Compressor thread returned error"))?;
        // Handles are joined in group order, so the index always equals the current length.
        checksums.insert(compressor_group as usize, checksum);
    }
    log::info!(
        "📦 Compressor threads returning blake3 checksums from {} compressor threads",
        checksums.len()
    );
    // Last sender of the payload channel: dropping it lets the writer terminate.
    drop(tx_compressed);
    log::debug!("[compressor] tx_compressed dropped after compressors finished");
    log::debug!("[writer] Waiting for writer thread to finish");
    let (writerstats, batch, zdata_status) =
        writer_thread.join().expect("writer thread panicked");
    zdata_status.map_err(|e| {
        anyhow!(
            "Failed to write zdata file {:?}: {}",
            output_zdata_path,
            e
        )
    })?;
    let (uncompressed_files, uncompressed_bytes, compressed_files, compressed_bytes) =
        reader_result?;

    // ---- Arrow index ------------------------------------------------------------------
    let metadata = build_arrow_metadata_for_checksums_and_config(&checksums, &CONFIG);
    let final_batch = attach_metadata(batch?, metadata)?;
    log::debug!("[writer] writer_thread finished");
    let index_path = output.with_extension("znippy");
    let index_file = File::create(&index_path)?;
    let mut index_writer = FileWriter::try_new(index_file, &final_batch.schema())?;
    index_writer.write(&final_batch)?;
    index_writer.finish()?;
    log::info!("[main] Arrow index written and finished.");

    // ---- Report -----------------------------------------------------------------------
    // Bytes written for the files that were actually compressed. Saturating because the
    // input sizes come from file metadata while the output size comes from the bytes that
    // were really read and written.
    let compressed_bytes_out = writerstats
        .total_written_bytes
        .saturating_sub(uncompressed_bytes);
    let compression_ratio = if compressed_bytes_out > 0 {
        (compressed_bytes as f32 / compressed_bytes_out as f32) * 100.0
    } else {
        0.0
    };
    let report = CompressionReport {
        total_files,
        compressed_files,
        uncompressed_files,
        chunks: writerstats.total_chunks,
        total_dirs,
        total_bytes_in: compressed_bytes + uncompressed_bytes,
        total_bytes_out: writerstats.total_written_bytes,
        compressed_bytes,
        uncompressed_bytes,
        compression_ratio,
    };
    Ok(report)
}