use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{Array, AsArray, ListArray, StructArray, UInt64Array};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use crate::codec;
use crate::index::read_znippy_index;
/// Read-only access to the files stored in a Znippy archive.
///
/// Implementors must be `Send + Sync` so a reader can be shared between threads.
pub trait ZnippyReader: Send + Sync {
/// Returns the relative paths of all files known to the archive.
fn list_files(&self) -> Result<Vec<String>>;
/// Returns the full contents of the file stored under `relative_path`.
///
/// The implementation in this file returns an error when the path is not
/// present in the archive or when reading/decoding the data fails.
fn extract_file(&self, relative_path: &str) -> Result<Vec<u8>>;
/// Returns `true` when the archive has an entry for `relative_path`.
fn contains(&self, relative_path: &str) -> bool;
/// Returns the size recorded for `relative_path`, or `None` when the path is unknown.
///
/// The implementation in this file reports the uncompressed size stored in the index.
fn file_size(&self, relative_path: &str) -> Option<u64>;
}
/// An opened Znippy archive: an index file plus its companion `.zdata` file.
pub struct ZnippyArchive {
// Path of the index file that was passed to `open`.
index_path: PathBuf,
// Path of the data file: `index_path` with its extension replaced by `zdata`.
zdata_path: PathBuf,
// Arrow schema returned by `read_znippy_index` for the index file.
schema: Arc<Schema>,
// Lookup table from a file's relative path to its entry (size, compression flag, chunks).
file_index: HashMap<String, FileEntry>,
}
/// Per-file metadata taken from one row of the archive index.
struct FileEntry {
// When true, every chunk is passed through `codec::decompress_frame` on extraction;
// otherwise chunk bytes are copied out verbatim.
compressed: bool,
// Size reported by `file_size`; also used to pre-size the output buffer on extraction.
uncompressed_size: u64,
// Chunks making up the file; extraction reads them in this order and concatenates them.
chunks: Vec<ChunkRef>,
}
/// Location of one stored chunk inside the `.zdata` file.
struct ChunkRef {
// Byte offset of the chunk, measured from the start of the `.zdata` file.
zdata_offset: u64,
// Number of bytes to read from the `.zdata` file for this chunk.
length: u64,
}
impl ZnippyArchive {
    /// Opens the archive described by the index file at `index_path`.
    ///
    /// The chunk data is expected in a sibling file whose path is `index_path`
    /// with its extension replaced by `zdata`.
    ///
    /// # Errors
    ///
    /// Returns an error when the `.zdata` file does not exist, when
    /// `read_znippy_index` fails, or when the index batches do not have the
    /// layout `build_file_index` expects.
    pub fn open(index_path: &Path) -> Result<Self> {
        let zdata_path = index_path.with_extension("zdata");
        if !zdata_path.exists() {
            return Err(anyhow!("Missing .zdata file: {:?}", zdata_path));
        }
        let (schema, batches) = read_znippy_index(index_path)?;
        let file_index = Self::build_file_index(&batches)?;
        Ok(Self {
            index_path: index_path.to_path_buf(),
            zdata_path,
            schema,
            file_index,
        })
    }

    /// Returns the number of distinct file paths in the archive index.
    pub fn file_count(&self) -> usize {
        self.file_index.len()
    }

    /// Builds the path -> entry lookup table from the index record batches.
    ///
    /// Columns are addressed by position: 0 = path (Utf8), 1 = compressed flag
    /// (Boolean), 2 = uncompressed size (UInt64), 4 = chunk list (List of
    /// Struct). Inside each chunk struct, field 0 is the `.zdata` offset and
    /// field 2 is the chunk length (both UInt64). Column 3 and struct field 1
    /// are not read here.
    /// NOTE(review): positions are taken as-is from the existing code; confirm
    /// they match the schema produced by the index writer.
    ///
    /// If the same path appears in more than one row, the last row wins.
    ///
    /// # Errors
    ///
    /// Returns an error (instead of panicking) when a batch or chunk struct has
    /// too few columns, or when a column does not have the expected Arrow type.
    fn build_file_index(batches: &[RecordBatch]) -> Result<HashMap<String, FileEntry>> {
        // Highest batch column index read below is 4; highest struct field index is 2.
        const MIN_BATCH_COLUMNS: usize = 5;
        const MIN_CHUNK_FIELDS: usize = 3;

        let mut index = HashMap::new();
        for batch in batches {
            // `RecordBatch::column` panics on an out-of-range index, so check first.
            if batch.num_columns() < MIN_BATCH_COLUMNS {
                return Err(anyhow!(
                    "index batch has {} columns, expected at least {}",
                    batch.num_columns(),
                    MIN_BATCH_COLUMNS
                ));
            }

            // The `_opt` casts return `None` on a type mismatch where the plain
            // `as_*` helpers would panic.
            let paths = batch
                .column(0)
                .as_string_opt::<i32>()
                .ok_or_else(|| anyhow!("path column is not a Utf8 array"))?;
            let compressed_col = batch
                .column(1)
                .as_boolean_opt()
                .ok_or_else(|| anyhow!("compressed column is not a Boolean array"))?;
            let sizes = batch
                .column(2)
                .as_primitive_opt::<arrow::datatypes::UInt64Type>()
                .ok_or_else(|| anyhow!("uncompressed_size column is not UInt64"))?;
            let chunks_col = batch
                .column(4)
                .as_any()
                .downcast_ref::<ListArray>()
                .ok_or_else(|| anyhow!("chunks column is not a ListArray"))?;

            index.reserve(batch.num_rows());
            for row in 0..batch.num_rows() {
                let path = paths.value(row).to_string();
                let compressed = compressed_col.value(row);
                let uncompressed_size = sizes.value(row);

                // A null chunk list yields an entry with no chunks.
                let mut chunks = Vec::new();
                if !chunks_col.is_null(row) {
                    let chunk_list = chunks_col.value(row);
                    let struct_arr = chunk_list
                        .as_any()
                        .downcast_ref::<StructArray>()
                        .ok_or_else(|| anyhow!("chunk item is not a StructArray"))?;
                    // `StructArray::column` also panics on an out-of-range index.
                    if struct_arr.num_columns() < MIN_CHUNK_FIELDS {
                        return Err(anyhow!(
                            "chunk struct has {} fields, expected at least {}",
                            struct_arr.num_columns(),
                            MIN_CHUNK_FIELDS
                        ));
                    }
                    let offsets = struct_arr
                        .column(0)
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| anyhow!("zdata_offset not UInt64"))?;
                    let lengths = struct_arr
                        .column(2)
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| anyhow!("length not UInt64"))?;

                    chunks.reserve_exact(struct_arr.len());
                    chunks.extend((0..struct_arr.len()).map(|i| ChunkRef {
                        zdata_offset: offsets.value(i),
                        length: lengths.value(i),
                    }));
                }

                index.insert(
                    path,
                    FileEntry {
                        compressed,
                        uncompressed_size,
                        chunks,
                    },
                );
            }
        }
        Ok(index)
    }
}
impl ZnippyReader for ZnippyArchive {
    /// Returns every path stored in the index.
    ///
    /// The order follows `HashMap` iteration and is therefore unspecified.
    fn list_files(&self) -> Result<Vec<String>> {
        Ok(self.file_index.keys().cloned().collect())
    }

    /// Reads all chunks of `relative_path` from the `.zdata` file and returns
    /// them concatenated in index order, decompressing each chunk with
    /// `codec::decompress_frame` when the entry is flagged as compressed.
    ///
    /// # Errors
    ///
    /// Returns an error when the path is not in the index, when a size from the
    /// index does not fit in `usize` or cannot be allocated, when opening,
    /// seeking or reading the `.zdata` file fails, or when decompression fails.
    fn extract_file(&self, relative_path: &str) -> Result<Vec<u8>> {
        let entry = self
            .file_index
            .get(relative_path)
            .ok_or_else(|| anyhow!("file not found in archive: {}", relative_path))?;

        // Sizes come from the index file; convert and reserve fallibly so a
        // corrupt or oversized value becomes an error rather than a silent
        // truncation (`as usize`) or an allocation panic/abort.
        let expected_len = usize::try_from(entry.uncompressed_size).map_err(|_| {
            anyhow!(
                "uncompressed size {} of {} does not fit in usize",
                entry.uncompressed_size,
                relative_path
            )
        })?;
        let mut result: Vec<u8> = Vec::new();
        result.try_reserve_exact(expected_len).map_err(|e| {
            anyhow!(
                "cannot allocate {} bytes for {}: {}",
                expected_len,
                relative_path,
                e
            )
        })?;

        let mut file = BufReader::new(File::open(&self.zdata_path)?);
        // One scratch buffer reused for every chunk.
        let mut chunk_buf: Vec<u8> = Vec::new();
        for chunk in &entry.chunks {
            let chunk_len = usize::try_from(chunk.length).map_err(|_| {
                anyhow!(
                    "chunk length {} of {} does not fit in usize",
                    chunk.length,
                    relative_path
                )
            })?;
            chunk_buf.clear();
            chunk_buf.try_reserve_exact(chunk_len).map_err(|e| {
                anyhow!(
                    "cannot allocate {} bytes for a chunk of {}: {}",
                    chunk_len,
                    relative_path,
                    e
                )
            })?;
            chunk_buf.resize(chunk_len, 0);

            file.seek(SeekFrom::Start(chunk.zdata_offset))?;
            file.read_exact(&mut chunk_buf)?;

            if entry.compressed {
                let decompressed = codec::decompress_frame(&chunk_buf)?;
                result.extend_from_slice(&decompressed);
            } else {
                result.extend_from_slice(&chunk_buf);
            }
        }
        Ok(result)
    }

    /// Returns `true` when the index has an entry for `relative_path`.
    fn contains(&self, relative_path: &str) -> bool {
        self.file_index.contains_key(relative_path)
    }

    /// Returns the uncompressed size recorded in the index for `relative_path`,
    /// or `None` when the path is not in the archive.
    fn file_size(&self, relative_path: &str) -> Option<u64> {
        self.file_index
            .get(relative_path)
            .map(|entry| entry.uncompressed_size)
    }
}