use std::fs::File;
use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::Arc;
use anyhow::{Result, anyhow};
use arrow::array::{
BooleanArray, BooleanBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder, StringArray,
StringBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder,
};
use arrow::datatypes::Schema;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;
use crate::index::{
ChunkLoc, LOOKUP_MODULE, MULTI_INDEX_MAGIC, ManifestEntry, RESERVED_PKG_TYPE, TRIE_MODULE,
is_reserved_module, lookup_schema, read_znippy_full_manifest, write_manifest_bytes,
};
use crate::meta_sink::{ArchiveMetaSink, GroupKey};
/// Metadata sink that writes index sections into an archive file at a moving
/// cursor and seals the file with a manifest when finished.
///
/// Besides writing each pushed sub-index, it collects the lookup columns of
/// every pushed batch so a combined lookup section and a path trie can be
/// written at seal time.
pub struct ArrowIpcSinkAppend {
/// Shared handle to the archive; all writes are positioned (`write_all_at`).
file: Arc<File>,
/// Absolute file offset at which the next section will be written.
cursor: u64,
/// Manifest entries for the sections written through this sink so far.
entries: Vec<ManifestEntry>,
/// Paths accumulated from pushed batches; parallel to `lookup_locs`.
lookup_paths: Vec<String>,
/// Chunk locations accumulated from pushed batches; parallel to `lookup_paths`.
lookup_locs: Vec<ChunkLoc>,
/// Rows recovered from an existing archive, re-emitted as a sub-index at finish.
carried: Vec<(String, ChunkLoc)>,
}
impl ArrowIpcSinkAppend {
/// Builds an empty sink over `file` whose first section will be written at
/// `blob_end_offset`. No rows are carried over from a previous archive.
pub fn new(file: Arc<File>, blob_end_offset: u64) -> Self {
    let cursor = blob_end_offset;
    Self {
        file,
        cursor,
        entries: vec![],
        lookup_paths: vec![],
        lookup_locs: vec![],
        carried: vec![],
    }
}
/// Reopens a sealed archive at `path` for appending.
///
/// The file is opened read+write, its manifest is read, and the write cursor
/// is placed at the smallest `index_offset` found in the manifest. All rows
/// recovered from the existing index are kept in `carried` so they can be
/// re-emitted when the sink is finished.
///
/// # Errors
/// Fails if the file cannot be opened, the manifest cannot be read or is
/// empty, or the existing rows cannot be decoded.
pub fn open_existing(path: &Path) -> Result<Self> {
    let mut options = std::fs::OpenOptions::new();
    options.read(true).write(true);
    let handle = match options.open(path) {
        Ok(f) => f,
        Err(e) => {
            return Err(anyhow!("append: open {} for resume: {e}", path.display()));
        }
    };
    let file = Arc::new(handle);

    let (entries, _manifest_offset) = read_znippy_full_manifest(path)?;
    if entries.is_empty() {
        return Err(anyhow!("append: archive {} has an empty manifest", path.display()));
    }

    // The earliest index section marks where the old index region begins.
    let Some(blob_end) = entries.iter().map(|entry| entry.index_offset).min() else {
        return Err(anyhow!("append: no sections in manifest"));
    };

    let (paths, locs) = recover_rows(path, &entries)?;
    let carried: Vec<(String, ChunkLoc)> = paths.into_iter().zip(locs).collect();

    Ok(Self {
        file,
        cursor: blob_end,
        entries: Vec::new(),
        lookup_paths: Vec::new(),
        lookup_locs: Vec::new(),
        carried,
    })
}
/// Returns the sink's current write cursor, i.e. the absolute file offset at
/// which the next write through this sink will land.
pub fn blob_end(&self) -> u64 {
self.cursor
}
/// Returns how many rows are currently held in `carried` (rows recovered from
/// an existing archive that have not yet been re-emitted).
pub fn recovered_rows(&self) -> usize {
self.carried.len()
}
/// Writes the rows carried over from a previous archive as a single
/// sub-index under the key `(pkg_type 0, empty repo, empty module)`.
///
/// Does nothing when there are no carried rows. The carried list is emptied
/// once the record batch has been built, before the sub-index is pushed.
fn emit_carried(&mut self) -> Result<()> {
    let rows = self.carried.len();
    if rows == 0 {
        return Ok(());
    }

    let mut paths = StringBuilder::with_capacity(rows, rows * 16);
    let mut chunk_seqs = UInt32Builder::with_capacity(rows);
    let mut fdata_offsets = UInt64Builder::with_capacity(rows);
    let mut compressed_flags = BooleanBuilder::with_capacity(rows);
    let mut uncompressed_sizes = UInt64Builder::with_capacity(rows);
    let mut blob_offsets = UInt64Builder::with_capacity(rows);
    let mut blob_sizes = UInt64Builder::with_capacity(rows);
    let mut checksums = FixedSizeBinaryBuilder::with_capacity(rows, 32);

    for (path, loc) in self.carried.iter() {
        paths.append_value(path);
        chunk_seqs.append_value(loc.chunk_seq);
        fdata_offsets.append_value(loc.fdata_offset);
        compressed_flags.append_value(loc.compressed);
        uncompressed_sizes.append_value(loc.uncompressed_size);
        blob_offsets.append_value(loc.blob_offset);
        blob_sizes.append_value(loc.blob_size);
        checksums.append_value(loc.checksum).expect("checksum is 32 bytes");
    }

    let schema = lookup_schema();
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(paths.finish()),
            Arc::new(chunk_seqs.finish()),
            Arc::new(fdata_offsets.finish()),
            Arc::new(compressed_flags.finish()),
            Arc::new(uncompressed_sizes.finish()),
            Arc::new(blob_offsets.finish()),
            Arc::new(blob_sizes.finish()),
            Arc::new(checksums.finish()),
        ],
    )?;

    self.carried.clear();
    let key = GroupKey {
        pkg_type: 0,
        repo: String::new(),
        module_name: String::new(),
    };
    self.push_subindex(&schema, &[batch], key)
}
/// Copies the lookup columns of `batch` into `lookup_paths` / `lookup_locs`.
///
/// This is best-effort: a batch that does not carry every lookup column with
/// the expected Arrow type is skipped without error. A `checksum` column whose
/// fixed width is not 32 bytes is treated as the same kind of mismatch, so it
/// is skipped rather than panicking while copying into the 32-byte array.
fn accumulate_lookup(&mut self, batch: &RecordBatch) {
    let cols = (|| {
        Some((
            batch.column_by_name("relative_path")?.as_any().downcast_ref::<StringArray>()?,
            batch.column_by_name("chunk_seq")?.as_any().downcast_ref::<UInt32Array>()?,
            batch.column_by_name("fdata_offset")?.as_any().downcast_ref::<UInt64Array>()?,
            batch.column_by_name("compressed")?.as_any().downcast_ref::<BooleanArray>()?,
            batch.column_by_name("uncompressed_size")?.as_any().downcast_ref::<UInt64Array>()?,
            batch.column_by_name("blob_offset")?.as_any().downcast_ref::<UInt64Array>()?,
            batch.column_by_name("blob_size")?.as_any().downcast_ref::<UInt64Array>()?,
            batch.column_by_name("checksum")?.as_any().downcast_ref::<FixedSizeBinaryArray>()?,
        ))
    })();
    let Some((paths, chunk_seq, fdata, compressed, usz, blob_off, blob_sz, checksum)) = cols
    else {
        return;
    };

    // `copy_from_slice` below requires exactly 32 bytes per value; bail out
    // before touching any state if the column has a different width.
    if checksum.value_length() != 32 {
        return;
    }

    let rows = batch.num_rows();
    self.lookup_paths.reserve(rows);
    self.lookup_locs.reserve(rows);
    for i in 0..rows {
        let mut ck = [0u8; 32];
        ck.copy_from_slice(checksum.value(i));
        self.lookup_paths.push(paths.value(i).to_string());
        self.lookup_locs.push(ChunkLoc {
            chunk_seq: chunk_seq.value(i),
            fdata_offset: fdata.value(i),
            blob_offset: blob_off.value(i),
            blob_size: blob_sz.value(i),
            uncompressed_size: usz.value(i),
            compressed: compressed.value(i),
            checksum: ck,
        });
    }
}
/// Writes the combined lookup section followed by the path trie.
///
/// All accumulated rows are ordered by `(path, chunk_seq)` with a stable sort
/// and written as one sub-index under the reserved lookup module. The trie
/// then maps each distinct path to the index of its first row in that sorted
/// order, and is written as a raw section under the reserved trie module.
fn write_lookup_and_trie(&mut self) -> Result<()> {
    let total = self.lookup_paths.len();
    let mut order: Vec<usize> = Vec::with_capacity(total);
    order.extend(0..total);
    // Stable sort: rows with equal (path, chunk_seq) keep their accumulation order.
    order.sort_by(|&lhs, &rhs| {
        let left = (&self.lookup_paths[lhs], self.lookup_locs[lhs].chunk_seq);
        let right = (&self.lookup_paths[rhs], self.lookup_locs[rhs].chunk_seq);
        left.cmp(&right)
    });

    let mut paths = StringBuilder::with_capacity(total, total * 16);
    let mut chunk_seqs = UInt32Builder::with_capacity(total);
    let mut fdata_offsets = UInt64Builder::with_capacity(total);
    let mut compressed_flags = BooleanBuilder::with_capacity(total);
    let mut uncompressed_sizes = UInt64Builder::with_capacity(total);
    let mut blob_offsets = UInt64Builder::with_capacity(total);
    let mut blob_sizes = UInt64Builder::with_capacity(total);
    let mut checksums = FixedSizeBinaryBuilder::with_capacity(total, 32);

    for (path, loc) in order.iter().map(|&i| (&self.lookup_paths[i], &self.lookup_locs[i])) {
        paths.append_value(path);
        chunk_seqs.append_value(loc.chunk_seq);
        fdata_offsets.append_value(loc.fdata_offset);
        compressed_flags.append_value(loc.compressed);
        uncompressed_sizes.append_value(loc.uncompressed_size);
        blob_offsets.append_value(loc.blob_offset);
        blob_sizes.append_value(loc.blob_size);
        checksums.append_value(loc.checksum).expect("checksum is 32 bytes");
    }

    let schema = lookup_schema();
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(paths.finish()),
            Arc::new(chunk_seqs.finish()),
            Arc::new(fdata_offsets.finish()),
            Arc::new(compressed_flags.finish()),
            Arc::new(uncompressed_sizes.finish()),
            Arc::new(blob_offsets.finish()),
            Arc::new(blob_sizes.finish()),
            Arc::new(checksums.finish()),
        ],
    )?;
    let lookup_key = GroupKey {
        pkg_type: RESERVED_PKG_TYPE,
        repo: String::new(),
        module_name: LOOKUP_MODULE.to_string(),
    };
    self.push_subindex(&schema, &[batch], lookup_key)?;

    // Keys arrive in sorted order; only the first row of each path is inserted.
    let mut trie = fst::MapBuilder::memory();
    let mut last: Option<&str> = None;
    for (row, &src) in order.iter().enumerate() {
        let path = self.lookup_paths[src].as_str();
        if last == Some(path) {
            continue;
        }
        trie.insert(path.as_bytes(), row as u64)
            .map_err(|e| anyhow!("trie insert: {e}"))?;
        last = Some(path);
    }
    let trie_bytes = trie.into_inner().map_err(|e| anyhow!("trie finish: {e}"))?;

    let trie_key = GroupKey {
        pkg_type: RESERVED_PKG_TYPE,
        repo: String::new(),
        module_name: TRIE_MODULE.to_string(),
    };
    self.write_raw_section(&trie_bytes, trie_key)
}
fn write_raw_section(&mut self, bytes: &[u8], key: GroupKey) -> Result<()> {
let start = self.cursor;
self.file.write_all_at(bytes, start)?;
self.cursor += bytes.len() as u64;
self.entries.push(ManifestEntry {
pkg_type: key.pkg_type,
repo: key.repo,
module_name: key.module_name,
index_offset: start,
index_len: bytes.len() as u64,
row_count: 0,
});
Ok(())
}
}
impl ArchiveMetaSink for ArrowIpcSinkAppend {
fn push_subindex(
&mut self,
schema: &Schema,
batches: &[RecordBatch],
key: GroupKey,
) -> Result<()> {
let sub_start = self.cursor;
let mut sub_bytes: Vec<u8> = Vec::new();
let mut sw = StreamWriter::try_new(&mut sub_bytes, schema)
.map_err(|e| anyhow!("sub-index writer: {e}"))?;
let mut row_count = 0u64;
for batch in batches {
row_count += batch.num_rows() as u64;
sw.write(batch).map_err(|e| anyhow!("sub-index write: {e}"))?;
}
sw.finish().map_err(|e| anyhow!("sub-index finish: {e}"))?;
if key.module_name != LOOKUP_MODULE && key.module_name != TRIE_MODULE {
for batch in batches {
self.accumulate_lookup(batch);
}
}
let sub_len = sub_bytes.len() as u64;
self.file.write_all_at(&sub_bytes, sub_start)?;
self.cursor += sub_len;
self.entries.push(ManifestEntry {
pkg_type: key.pkg_type,
repo: key.repo,
module_name: key.module_name,
index_offset: sub_start,
index_len: sub_len,
row_count,
});
Ok(())
}
/// Seals the archive and returns its final length in bytes.
///
/// Emits any carried rows, writes the lookup and trie sections, then appends
/// the manifest, the `MULTI_INDEX_MAGIC` marker and the manifest offset as
/// eight little-endian bytes. The file length is set to the end of that
/// trailer and the file is synced.
fn finish(mut self: Box<Self>) -> Result<u64> {
    self.emit_carried()?;
    self.write_lookup_and_trie()?;

    let manifest_offset = self.cursor;
    let manifest_bytes = match write_manifest_bytes(&self.entries) {
        Ok(bytes) => bytes,
        Err(e) => return Err(anyhow!("manifest: {e}")),
    };

    // Trailer layout: [manifest][magic][manifest_offset as u64 LE].
    let magic_at = manifest_offset + manifest_bytes.len() as u64;
    let pointer_at = magic_at + MULTI_INDEX_MAGIC.len() as u64;
    let final_len = pointer_at + 8;

    self.file.write_all_at(&manifest_bytes, manifest_offset)?;
    self.file.write_all_at(&MULTI_INDEX_MAGIC, magic_at)?;
    self.file.write_all_at(&manifest_offset.to_le_bytes(), pointer_at)?;

    self.file.set_len(final_len)?;
    self.file.sync_all()?;
    Ok(final_len)
}
}
/// Summary returned by `append_files` and `create_archive`.
#[derive(Debug, Clone)]
pub struct AppendReport {
/// Rows recovered from the archive before the new files were written
/// (zero when the archive was created from scratch).
pub rows_before: u64,
/// Number of rows in the metadata batch built for the new files.
pub rows_added: u64,
/// File offset at which the first new blob was written.
pub blob_append_offset: u64,
/// Total on-disk bytes written for the new blobs.
pub blob_bytes_added: u64,
/// Final file length reported by the sink when it sealed the archive.
pub sealed_total_bytes: u64,
}
/// Appends `new_files` (pairs of relative path and contents) to the existing
/// archive at `archive` and re-seals it.
///
/// The report's `rows_before` is the number of rows recovered from the
/// archive and `blob_append_offset` is where the new blobs start.
///
/// # Errors
/// Fails if the archive cannot be reopened or any write/seal step fails.
pub fn append_files(
    archive: &Path,
    new_files: &[(String, Vec<u8>)],
    compression_level: i32,
) -> Result<AppendReport> {
    let sink = ArrowIpcSinkAppend::open_existing(archive)?;
    let (rows_before, blob_append_offset) = (sink.recovered_rows() as u64, sink.blob_end());
    write_files_into_sink(
        sink,
        new_files,
        compression_level,
        rows_before,
        blob_append_offset,
    )
}
/// Creates (or truncates) the file at `archive` and writes `files` into it as
/// a freshly sealed archive, starting at offset zero with no prior rows.
///
/// # Errors
/// Fails if the file cannot be created or any write/seal step fails.
pub fn create_archive(
    archive: &Path,
    files: &[(String, Vec<u8>)],
    compression_level: i32,
) -> Result<AppendReport> {
    let handle = match File::create(archive) {
        Ok(f) => f,
        Err(e) => return Err(anyhow!("create archive {}: {e}", archive.display())),
    };
    let sink = ArrowIpcSinkAppend::new(Arc::new(handle), 0);
    write_files_into_sink(sink, files, compression_level, 0, 0)
}
/// Writes each file as one blob starting at `blob_append_offset`, pushes a
/// metadata sub-index describing the new blobs into `sink`, and seals it.
///
/// Each file is hashed with BLAKE3 over its uncompressed bytes and compressed
/// at `compression_level`; the compressed frame is stored only when it is
/// strictly smaller than the input, otherwise the raw bytes are stored. Every
/// file becomes a single chunk (`chunk_seq` 0, `fdata_offset` 0).
///
/// `rows_before` and `blob_append_offset` are echoed into the returned report.
///
/// # Errors
/// Propagates compression, I/O, metadata-batch and sealing failures.
fn write_files_into_sink(
    mut sink: ArrowIpcSinkAppend,
    new_files: &[(String, Vec<u8>)],
    compression_level: i32,
    rows_before: u64,
    blob_append_offset: u64,
) -> Result<AppendReport> {
    use crate::codec::CompressCtx;
    use crate::meta::{BlobMeta, ChunkMeta};

    let blob_file = sink.file.clone();
    let mut ctx = CompressCtx::new(compression_level)?;
    let mut blobs: Vec<BlobMeta> = Vec::with_capacity(new_files.len());
    let mut paths: Vec<String> = Vec::with_capacity(new_files.len());
    let mut cursor = blob_append_offset;
    let mut blob_bytes_added = 0u64;

    for (file_index, (rel, bytes)) in new_files.iter().enumerate() {
        let checksum = *blake3::hash(bytes).as_bytes();
        let frame = ctx.compress(bytes)?;
        // Keep the raw bytes whenever compression does not actually shrink them.
        let (on_disk, compressed): (&[u8], bool) = if frame.len() < bytes.len() {
            (&frame, true)
        } else {
            (bytes, false)
        };
        let blob_offset = cursor;
        blob_file.write_all_at(on_disk, blob_offset)?;
        cursor += on_disk.len() as u64;
        blob_bytes_added += on_disk.len() as u64;
        paths.push(rel.clone());
        blobs.push(BlobMeta {
            blob_offset,
            blob_size: on_disk.len() as u64,
            chunk_meta: ChunkMeta {
                fdata_offset: 0,
                file_index: file_index as u64,
                chunk_seq: 0,
                checksum,
                compressed,
                uncompressed_size: bytes.len() as u64,
                compressed_size: on_disk.len() as u64,
            },
        });
    }
    blob_file.sync_all()?;

    // The index sections must start right after the last blob written above.
    sink.cursor = cursor;

    // `paths` is not used again, so the resolver takes ownership of it
    // instead of cloning the whole vector.
    let resolver = move |fi: u64| paths[fi as usize].clone();
    let batch = crate::build_metadata_batch(&blobs, resolver, &[], &[])
        .map_err(|e| anyhow!("append: build_metadata_batch: {e}"))?;
    let schema = lookup_schema();
    let rows_added = batch.num_rows() as u64;
    sink.push_subindex(
        schema.as_ref(),
        &[batch],
        GroupKey { pkg_type: 0, repo: String::new(), module_name: String::new() },
    )?;
    let sealed_total_bytes = Box::new(sink).finish()?;

    Ok(AppendReport {
        rows_before,
        rows_added,
        blob_append_offset,
        blob_bytes_added,
        sealed_total_bytes,
    })
}
fn recover_rows(
path: &Path,
entries: &[ManifestEntry],
) -> Result<(Vec<String>, Vec<ChunkLoc>)> {
use std::io::{Read, Seek, SeekFrom};
let mut file = File::open(path)?;
if let Some(lk) = entries.iter().find(|e| e.module_name == LOOKUP_MODULE) {
file.seek(SeekFrom::Start(lk.index_offset))?;
let mut bytes = vec![0u8; lk.index_len as usize];
file.read_exact(&mut bytes)?;
return decode_base_rows(&bytes);
}
let mut paths = Vec::new();
let mut locs = Vec::new();
for e in entries {
if is_reserved_module(&e.module_name) {
continue;
}
file.seek(SeekFrom::Start(e.index_offset))?;
let mut bytes = vec![0u8; e.index_len as usize];
file.read_exact(&mut bytes)?;
let (mut p, mut l) = decode_base_rows(&bytes)?;
paths.append(&mut p);
locs.append(&mut l);
}
Ok((paths, locs))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::codec::CompressCtx;
use crate::meta::{BlobMeta, ChunkMeta};
use crate::{ArrowIpcSink, ZnippyArchive, ZnippyReader};
use std::time::{SystemTime, UNIX_EPOCH};
/// Creates and returns a fresh directory under the system temp dir whose name
/// combines `tag`, the current time in nanoseconds and the calling thread id.
fn unique_dir(tag: &str) -> std::path::PathBuf {
    let ns = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let thread_id = std::thread::current().id();
    let name = format!("znippy_append_{tag}_{ns}_{:?}", thread_id);
    let dir = std::env::temp_dir().join(name);
    std::fs::create_dir_all(&dir).unwrap();
    dir
}
/// Generates `n` deterministic `(path, contents)` pairs.
///
/// The path embeds a three-digit group derived from the index and `salt`,
/// the zero-padded index and the salt; the body embeds the index, the salt
/// and a run of `8 + (i % 40)` `z` characters followed by a newline.
fn synth(n: usize, salt: u64) -> Vec<(String, Vec<u8>)> {
    let mut files = Vec::with_capacity(n);
    for i in 0..n {
        let g = (i.wrapping_mul(2_654_435_761) ^ salt as usize) % 1000;
        let p = format!("repo/grp{g:03}/file{:08}_{salt}.bin", i);
        let body = format!("payload {i} salt {salt} {}\n", "z".repeat(8 + (i % 40)));
        files.push((p, body.into_bytes()));
    }
    files
}
/// Writes `files` into a brand-new archive at `path` and seals it through the
/// sink produced by `make_sink`, returning whatever the sink's `finish`
/// reports.
///
/// Blobs are written from offset zero; the sink is created only afterwards
/// and receives the file handle together with the end-of-blobs offset.
fn write_fresh<S: ArchiveMetaSink + 'static>(
    path: &Path,
    files: &[(String, Vec<u8>)],
    make_sink: impl FnOnce(Arc<File>, u64) -> S,
) -> u64 {
    let file = Arc::new(File::create(path).unwrap());
    let mut ctx = CompressCtx::new(3).unwrap();
    let mut blobs = Vec::new();
    let mut paths = Vec::new();
    let mut cursor = 0u64;

    for (fi, (rel, bytes)) in files.iter().enumerate() {
        let checksum = *blake3::hash(bytes).as_bytes();
        let frame = ctx.compress(bytes).unwrap();
        // Store the raw bytes when compression does not shrink them.
        let shrinks = frame.len() < bytes.len();
        let on_disk: &[u8] = if shrinks { &frame } else { bytes };
        let stored = on_disk.len() as u64;

        let blob_offset = cursor;
        file.write_all_at(on_disk, blob_offset).unwrap();
        cursor += stored;

        paths.push(rel.clone());
        blobs.push(BlobMeta {
            blob_offset,
            blob_size: stored,
            chunk_meta: ChunkMeta {
                fdata_offset: 0,
                file_index: fi as u64,
                chunk_seq: 0,
                checksum,
                compressed: shrinks,
                uncompressed_size: bytes.len() as u64,
                compressed_size: stored,
            },
        });
    }

    let owned_paths = paths.clone();
    let resolver = move |fi: u64| owned_paths[fi as usize].clone();
    let batch = crate::build_metadata_batch(&blobs, resolver, &[], &[]).unwrap();
    let schema = lookup_schema();

    let mut sink = make_sink(file.clone(), cursor);
    let key = GroupKey { pkg_type: 0, repo: String::new(), module_name: String::new() };
    sink.push_subindex(schema.as_ref(), &[batch], key).unwrap();
    Box::new(sink).finish().unwrap()
}
// Parity check: sealing the same files through `ArrowIpcSink` and through
// `ArrowIpcSinkAppend` must produce files of equal length and equal bytes.
#[test]
fn clone_fresh_path_is_byte_identical_to_original() {
let dir = unique_dir("parity");
let files = synth(2_000, 1);
let a = dir.join("a.znippy");
let b = dir.join("b.znippy");
// Same input, two different sink constructors.
let len_a = write_fresh(&a, &files, ArrowIpcSink::new);
let len_b = write_fresh(&b, &files, ArrowIpcSinkAppend::new);
assert_eq!(len_a, len_b, "clone seal produced a different total length");
let bytes_a = std::fs::read(&a).unwrap();
let bytes_b = std::fs::read(&b).unwrap();
assert_eq!(
bytes_a, bytes_b,
"clone's fresh write path is NOT byte-identical to ArrowIpcSink — parity broken"
);
// Best-effort cleanup; a failure to remove the directory is ignored.
let _ = std::fs::remove_dir_all(&dir);
}
// Round-trip check: after appending to an archive sealed by `ArrowIpcSink`,
// both the original and the appended files must be listed and extract to
// their exact original bytes.
#[test]
fn native_append_roundtrips_old_and_new_files() {
let dir = unique_dir("resume");
let archive = dir.join("store.znippy");
let orig = synth(1_500, 7);
write_fresh(&archive, &orig, ArrowIpcSink::new);
// A different salt gives paths that do not collide with the originals.
let added = synth(300, 99);
let report = append_files(&archive, &added, 3).unwrap();
assert_eq!(report.rows_before, orig.len() as u64, "must recover all original rows");
assert_eq!(report.rows_added, added.len() as u64);
assert!(report.blob_bytes_added > 0, "append must write new blob bytes");
assert!(
report.sealed_total_bytes > report.blob_append_offset,
"re-sealed file must be larger than the old blob region"
);
// Reopen the archive through the reader API and compare the file listing.
let ar = ZnippyArchive::open(&archive).unwrap();
let mut listed = ar.list_files().unwrap();
listed.sort();
let mut expected: Vec<String> =
orig.iter().chain(added.iter()).map(|(p, _)| p.clone()).collect();
expected.sort();
assert_eq!(listed, expected, "index must list exactly old+new files after append");
// Every file, old and new, must extract byte-for-byte.
for (p, bytes) in orig.iter().chain(added.iter()) {
let got = ar.extract_file(p).unwrap();
assert_eq!(&got, bytes, "byte mismatch after append for {p}");
}
// Spot-check one appended path through `locate_file`.
let probe = &added[123].0;
let chunks = crate::locate_file(&archive, probe).unwrap();
assert!(!chunks.is_empty(), "appended file must be locatable via the re-sealed lookup");
// Best-effort cleanup; a failure to remove the directory is ignored.
let _ = std::fs::remove_dir_all(&dir);
}
// Checks that `create_archive` matches the `write_fresh` output byte-for-byte
// and that the archive it creates can then be grown with `append_files`.
#[test]
fn create_archive_seeds_then_grows() {
let dir = unique_dir("create");
let seed = synth(40, 5);
let made = dir.join("made.znippy");
let report = create_archive(&made, &seed, 3).unwrap();
assert_eq!(report.rows_before, 0, "fresh archive has no prior rows");
assert_eq!(report.rows_added, seed.len() as u64);
// Reference archive written by the test helper with the same sink type.
let ref_path = dir.join("ref.znippy");
write_fresh(&ref_path, &seed, ArrowIpcSinkAppend::new);
assert_eq!(
std::fs::read(&made).unwrap(),
std::fs::read(&ref_path).unwrap(),
"create_archive must be byte-identical to the proven fresh write path"
);
let ar = ZnippyArchive::open(&made).unwrap();
for (p, bytes) in &seed {
assert_eq!(&ar.extract_file(p).unwrap(), bytes, "seed byte mismatch for {p}");
}
// Grow the created archive and verify both generations of files.
let added = synth(15, 88);
let rep2 = append_files(&made, &added, 3).unwrap();
assert_eq!(rep2.rows_before, seed.len() as u64, "append must recover seeded rows");
assert_eq!(rep2.rows_added, added.len() as u64);
let ar2 = ZnippyArchive::open(&made).unwrap();
for (p, bytes) in seed.iter().chain(added.iter()) {
assert_eq!(&ar2.extract_file(p).unwrap(), bytes, "byte mismatch after grow for {p}");
}
// Best-effort cleanup; a failure to remove the directory is ignored.
let _ = std::fs::remove_dir_all(&dir);
}
}
fn decode_base_rows(bytes: &[u8]) -> Result<(Vec<String>, Vec<ChunkLoc>)> {
use arrow::ipc::reader::StreamReader;
let reader = StreamReader::try_new(std::io::Cursor::new(bytes.to_vec()), None)
.map_err(|e| anyhow!("append: lookup ipc reader: {e}"))?;
let mut paths = Vec::new();
let mut locs = Vec::new();
for batch in reader {
let batch = batch.map_err(|e| anyhow!("append: lookup batch decode: {e}"))?;
let get = |n: &str| batch.column_by_name(n)
.ok_or_else(|| anyhow!("append: lookup missing column {n}"));
let p = get("relative_path")?.as_any().downcast_ref::<StringArray>()
.ok_or_else(|| anyhow!("relative_path type"))?;
let seq = get("chunk_seq")?.as_any().downcast_ref::<UInt32Array>()
.ok_or_else(|| anyhow!("chunk_seq type"))?;
let fdata = get("fdata_offset")?.as_any().downcast_ref::<UInt64Array>()
.ok_or_else(|| anyhow!("fdata_offset type"))?;
let comp = get("compressed")?.as_any().downcast_ref::<BooleanArray>()
.ok_or_else(|| anyhow!("compressed type"))?;
let usz = get("uncompressed_size")?.as_any().downcast_ref::<UInt64Array>()
.ok_or_else(|| anyhow!("uncompressed_size type"))?;
let boff = get("blob_offset")?.as_any().downcast_ref::<UInt64Array>()
.ok_or_else(|| anyhow!("blob_offset type"))?;
let bsz = get("blob_size")?.as_any().downcast_ref::<UInt64Array>()
.ok_or_else(|| anyhow!("blob_size type"))?;
let ck = get("checksum")?.as_any().downcast_ref::<FixedSizeBinaryArray>()
.ok_or_else(|| anyhow!("checksum type"))?;
for i in 0..batch.num_rows() {
let mut c = [0u8; 32];
c.copy_from_slice(ck.value(i));
paths.push(p.value(i).to_string());
locs.push(ChunkLoc {
chunk_seq: seq.value(i),
fdata_offset: fdata.value(i),
blob_offset: boff.value(i),
blob_size: bsz.value(i),
uncompressed_size: usz.value(i),
compressed: comp.value(i),
checksum: c,
});
}
}
Ok((paths, locs))
}