use std::{
fs::{File, OpenOptions},
io::{BufWriter, Read, Seek, SeekFrom, Write},
path::PathBuf,
};
use super::{ObjectType, PackObjectId, PackStats, pack_container_spec, write_container_header};
/// Width of the compressed-size field reserved in front of a zstd body: the
/// longest 7-bits-per-byte varint a `u64` can need (⌈64 / 7⌉ = 10 bytes), so
/// whatever size the finished frame has can be back-patched in place.
#[cfg(feature = "zstd")]
const CSIZE_PLACEHOLDER_LEN: usize = u64::BITS.div_ceil(7) as usize;
use crate::{
object::ContentHash,
store::{Result, StoreError, compression::CompressionConfig},
};
/// Bucket files per id variant: one for every possible value of the id's first byte.
const BUCKETS_PER_VARIANT: usize = u8::MAX as usize + 1;
/// Total bucket files: the hash-id buckets followed by the change-id buckets.
const TOTAL_BUCKETS: usize = 2 * BUCKETS_PER_VARIANT;
/// Cap on simultaneously open bucket files; the least recently used one is
/// closed when a new bucket must be opened past this limit.
const MAX_OPEN_BUCKET_WRITERS: usize = 32;
/// Bucket-range selector for `PackObjectId::Hash` ids.
const HASH_VARIANT: usize = 0;
/// Bucket-range selector for `PackObjectId::ChangeId` ids.
const CHANGEID_VARIANT: usize = 1;
/// Writes a pack container to `W` one object at a time, spilling the index
/// entries into per-prefix bucket files under `bucket_dir` instead of keeping
/// them in memory; `finalize` merges the buckets into the index file.
pub struct StreamingPackBuilder<W>
where
    W: Write + Read + Seek,
{
    /// Buffered pack output; taken (left `None`) by `finalize`.
    pack_writer: Option<BufWriter<W>>,
    /// Position of the container header in the underlying writer. Entry
    /// offsets are stored relative to it and the checksum starts at it.
    header_offset: u64,
    /// Number of objects fully written (body and index entry).
    object_count: u64,
    /// Sum of the objects' original sizes.
    total_uncompressed: u64,
    /// Sum of the body bytes actually stored in the pack.
    total_compressed: u64,
    // Only consulted by the zstd code path.
    #[cfg_attr(not(feature = "zstd"), allow(dead_code))]
    compression: CompressionConfig,
    /// Directory holding the temporary bucket files.
    bucket_dir: PathBuf,
    /// One slot per bucket; `Some` while that bucket file is open.
    bucket_writers: Vec<Option<BucketWriter>>,
    /// Number of `Some` slots in `bucket_writers`.
    open_bucket_writers: usize,
    /// Monotonic counter used to stamp bucket accesses for LRU eviction.
    bucket_access_tick: u64,
    /// Path of every bucket file, indexed like `bucket_writers`.
    bucket_paths: Vec<PathBuf>,
    /// Where `finalize` writes the index file.
    index_path: PathBuf,
    /// Set by a successful `finalize`; tells `Drop` to skip cleanup.
    finalized: bool,
}
/// An open bucket file together with its last-access stamp for LRU eviction.
struct BucketWriter {
    /// Builder access tick recorded the last time this bucket was used.
    last_used: u64,
    /// Buffered, append-mode handle to the bucket file.
    writer: BufWriter<File>,
}
impl<W: Write + Read + Seek> StreamingPackBuilder<W> {
    /// Creates a builder that streams pack entries into `pack_writer`.
    ///
    /// A placeholder container header (object count 0) is written at the
    /// writer's current position; `finalize` later rewrites it with the real
    /// count. `bucket_dir` is created if needed, and bucket files left behind
    /// at the same location by an earlier run are removed.
    ///
    /// # Errors
    /// Fails if the bucket directory cannot be created, the header cannot be
    /// written, or a stale bucket file exists but cannot be removed.
    pub fn new(
        mut pack_writer: W,
        index_path: PathBuf,
        compression: CompressionConfig,
        bucket_dir: PathBuf,
    ) -> Result<Self> {
        std::fs::create_dir_all(&bucket_dir).map_err(StoreError::from)?;
        let header_offset = pack_writer.stream_position().map_err(StoreError::from)?;
        let mut header_bytes = Vec::with_capacity(16);
        write_container_header(&mut header_bytes, pack_container_spec(), 0);
        pack_writer
            .write_all(&header_bytes)
            .map_err(StoreError::from)?;
        let bucket_paths: Vec<PathBuf> = (0..TOTAL_BUCKETS)
            .map(|i| {
                let variant = if i < BUCKETS_PER_VARIANT { 'h' } else { 'c' };
                let prefix = i % BUCKETS_PER_VARIANT;
                bucket_dir.join(format!("bucket-{variant}-{prefix:02x}"))
            })
            .collect();
        // Bucket files are opened in append mode and all of them are merged into
        // the index, so a stale file that survives here would leak foreign
        // entries into this pack's index. A missing file is the normal case.
        for path in &bucket_paths {
            match std::fs::remove_file(path) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                Err(e) => return Err(StoreError::from(e)),
            }
        }
        Ok(Self {
            pack_writer: Some(BufWriter::new(pack_writer)),
            header_offset,
            object_count: 0,
            total_uncompressed: 0,
            total_compressed: 0,
            compression,
            bucket_dir,
            bucket_writers: (0..TOTAL_BUCKETS).map(|_| None).collect(),
            open_bucket_writers: 0,
            bucket_access_tick: 0,
            bucket_paths,
            index_path,
            finalized: false,
        })
    }

    /// Appends an object keyed by its content hash; see [`Self::add_id`].
    pub fn add(&mut self, hash: ContentHash, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
        self.add_id(PackObjectId::Hash(hash), obj_type, data)
    }

    /// Appends one object to the pack and records its index entry in a bucket.
    ///
    /// The entry is the tagged id, the packed type and uncompressed size, a
    /// varint body size, and the body. With the `zstd` feature, bodies of at
    /// least `compression.min_size` bytes are zstd-compressed when compression
    /// is enabled. Object count and size stats are only updated once both the
    /// body and the index entry have been written.
    ///
    /// # Errors
    /// Propagates any I/O or compression error.
    pub fn add_id(&mut self, id: PackObjectId, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
        let pw = self
            .pack_writer
            .as_mut()
            .expect("add_id called after finalize");
        // The entry offset is read from the underlying writer, so anything still
        // sitting in the BufWriter has to reach it first.
        pw.flush().map_err(StoreError::from)?;
        let entry_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
        let offset = entry_start
            .checked_sub(self.header_offset)
            .expect("header_offset should never be past current position");

        let mut header_buf = Vec::with_capacity(40);
        id.encode_tagged(&mut header_buf);
        super::varint::encode_type_and_size(obj_type, data.len() as u64, &mut header_buf);
        pw.write_all(&header_buf).map_err(StoreError::from)?;
        #[cfg(feature = "zstd")]
        let csize_pos = entry_start + header_buf.len() as u64;

        let want_compress: bool;
        #[cfg(feature = "zstd")]
        {
            want_compress = self.compression.enabled && data.len() >= self.compression.min_size;
        }
        #[cfg(not(feature = "zstd"))]
        {
            want_compress = false;
        }

        // Number of body bytes actually stored in the pack for this entry.
        let stored_len: u64;
        if !want_compress {
            // Stored verbatim: minimal varint size prefix, then the raw bytes.
            let mut csize_buf = Vec::with_capacity(10);
            super::varint::encode_varint(data.len() as u64, &mut csize_buf);
            pw.write_all(&csize_buf).map_err(StoreError::from)?;
            pw.write_all(&data).map_err(StoreError::from)?;
            stored_len = data.len() as u64;
        } else {
            #[cfg(feature = "zstd")]
            {
                // The compressed size is unknown until the frame is finished, so
                // reserve a fixed-width size field, stream the frame straight
                // into the pack, then seek back and patch the real size in.
                pw.write_all(&[0u8; CSIZE_PLACEHOLDER_LEN])
                    .map_err(StoreError::from)?;
                pw.flush().map_err(StoreError::from)?;
                let body_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
                {
                    let mut enc =
                        zstd::stream::write::Encoder::new(&mut *pw, self.compression.level)
                            .map_err(StoreError::from)?;
                    enc.set_pledged_src_size(Some(data.len() as u64))
                        .map_err(StoreError::from)?;
                    enc.write_all(&data).map_err(StoreError::from)?;
                    enc.finish().map_err(StoreError::from)?;
                }
                pw.flush().map_err(StoreError::from)?;
                let body_end = pw.get_mut().stream_position().map_err(StoreError::from)?;
                let compressed_size = body_end - body_start;
                let mut csize_bytes = [0u8; CSIZE_PLACEHOLDER_LEN];
                encode_varint_padded_to_10(compressed_size, &mut csize_bytes);
                // The BufWriter was just flushed, so seeking its inner writer
                // cannot reorder any buffered bytes.
                let inner = pw.get_mut();
                inner
                    .seek(SeekFrom::Start(csize_pos))
                    .map_err(StoreError::from)?;
                inner.write_all(&csize_bytes).map_err(StoreError::from)?;
                inner
                    .seek(SeekFrom::Start(body_end))
                    .map_err(StoreError::from)?;
                stored_len = compressed_size;
            }
            #[cfg(not(feature = "zstd"))]
            {
                unreachable!("compression branch reached without `zstd` feature");
            }
        }

        let bucket = self.get_or_open_bucket(bucket_index_for(&id))?;
        write_index_entry(bucket, id, offset)?;

        // Count the object only once both its body and its index entry exist.
        self.object_count += 1;
        self.total_uncompressed += data.len() as u64;
        self.total_compressed += stored_len;
        Ok(())
    }

    /// Returns the writer for bucket `idx` and stamps it with a fresh access tick.
    ///
    /// If the bucket is not open, its file is opened in append mode. When
    /// `MAX_OPEN_BUCKET_WRITERS` buckets are already open, the least recently
    /// used one is evicted first.
    fn get_or_open_bucket(&mut self, idx: usize) -> Result<&mut BufWriter<File>> {
        self.bucket_access_tick = self.bucket_access_tick.wrapping_add(1);
        let last_used = self.bucket_access_tick;
        if self.bucket_writers[idx].is_none() {
            if self.open_bucket_writers >= MAX_OPEN_BUCKET_WRITERS {
                self.evict_lru_bucket()?;
            }
            let path = &self.bucket_paths[idx];
            let f = OpenOptions::new()
                .create(true)
                .append(true)
                .open(path)
                .map_err(StoreError::from)?;
            self.bucket_writers[idx] = Some(BucketWriter {
                writer: BufWriter::new(f),
                last_used,
            });
            self.open_bucket_writers += 1;
        } else if let Some(bucket) = self.bucket_writers[idx].as_mut() {
            bucket.last_used = last_used;
        }
        Ok(&mut self.bucket_writers[idx]
            .as_mut()
            .expect("just inserted above")
            .writer)
    }

    /// Flushes and closes the open bucket with the oldest access tick.
    ///
    /// Does nothing if no bucket is open. The writer is flushed while still
    /// registered: if the flush fails, the bucket stays open and
    /// `open_bucket_writers` stays in sync with the slots.
    fn evict_lru_bucket(&mut self) -> Result<()> {
        let lru = self
            .bucket_writers
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| slot.as_ref().map(|bucket| (idx, bucket.last_used)))
            .min_by_key(|&(_, last_used)| last_used)
            .map(|(idx, _)| idx);
        let Some(idx) = lru else {
            return Ok(());
        };
        if let Some(bucket) = self.bucket_writers[idx].as_mut() {
            bucket.writer.flush().map_err(StoreError::from)?;
        }
        if self.bucket_writers[idx].take().is_some() {
            self.open_bucket_writers -= 1;
        }
        Ok(())
    }

    /// Completes the pack and writes its index.
    ///
    /// Rewrites the container header with the final object count, appends a
    /// BLAKE3 checksum of everything from the header to the end of the pack,
    /// writes the index file from the bucket files, and removes the bucket
    /// files and directory on a best-effort basis. Returns the underlying
    /// writer and the pack stats.
    ///
    /// # Errors
    /// Fails on any I/O error. Also fails with `StoreError::InvalidObject` if
    /// the bucket files hold a different number of entries than objects were
    /// added, because the index header would then be wrong.
    pub fn finalize(mut self) -> Result<(W, PackStats)> {
        // Push buffered index entries to disk and close every bucket file
        // before reading the buckets back and later deleting them.
        for bucket in self.bucket_writers.iter_mut().flatten() {
            bucket.writer.flush().map_err(StoreError::from)?;
        }
        self.bucket_writers.iter_mut().for_each(|slot| *slot = None);
        self.open_bucket_writers = 0;

        let buffered = self
            .pack_writer
            .take()
            .expect("finalize called twice — pack_writer already consumed");
        // `into_error` keeps the original io::Error (kind and source) instead of
        // flattening it into a string.
        let mut writer = buffered
            .into_inner()
            .map_err(|e| StoreError::from(e.into_error()))?;

        writer
            .seek(SeekFrom::Start(self.header_offset))
            .map_err(StoreError::from)?;
        let mut header_bytes = Vec::with_capacity(16);
        write_container_header(&mut header_bytes, pack_container_spec(), self.object_count);
        writer.write_all(&header_bytes).map_err(StoreError::from)?;
        Self::append_checksum(&mut writer, self.header_offset)?;

        let entries_written = self.write_sorted_index()?;
        if entries_written != self.object_count {
            return Err(StoreError::InvalidObject(format!(
                "streaming index entry count drifted from add() count: \
                 {entries_written} index entries vs {} objects",
                self.object_count
            )));
        }

        // Best-effort cleanup: the pack and index are already complete.
        for path in &self.bucket_paths {
            let _ = std::fs::remove_file(path);
        }
        let _ = std::fs::remove_dir(&self.bucket_dir);
        self.finalized = true;
        let stats = PackStats {
            object_count: self.object_count,
            total_uncompressed: self.total_uncompressed,
            total_compressed: self.total_compressed,
            delta_count: 0,
            compression_ratio: if self.total_uncompressed == 0 {
                0.0
            } else {
                self.total_compressed as f64 / self.total_uncompressed as f64
            },
        };
        Ok((writer, stats))
    }

    /// Hashes `writer` from `start` to its end with BLAKE3, appends the digest
    /// at the end, and flushes.
    fn append_checksum(writer: &mut W, start: u64) -> Result<()> {
        writer
            .seek(SeekFrom::Start(start))
            .map_err(StoreError::from)?;
        let mut hasher = blake3::Hasher::new();
        let mut buf = vec![0u8; 64 * 1024];
        loop {
            match writer.read(&mut buf) {
                Ok(0) => break,
                Ok(n) => {
                    hasher.update(&buf[..n]);
                }
                // `Read::read` may be interrupted by a signal; retrying is the
                // documented way to handle it.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(StoreError::from(e)),
            }
        }
        writer.seek(SeekFrom::End(0)).map_err(StoreError::from)?;
        writer
            .write_all(hasher.finalize().as_bytes())
            .map_err(StoreError::from)?;
        writer.flush().map_err(StoreError::from)
    }

    /// Writes the index file (header, then every bucket's entries sorted by id)
    /// and returns the number of entries written.
    fn write_sorted_index(&self) -> Result<u64> {
        let idx_file = File::create(&self.index_path).map_err(StoreError::from)?;
        let mut idx_writer = BufWriter::new(idx_file);
        write_index_header(&mut idx_writer, self.object_count)?;
        let mut entries_written: u64 = 0;
        for path in &self.bucket_paths {
            // A bucket that never received an entry has no file on disk.
            let bucket_bytes = match std::fs::read(path) {
                Ok(bytes) => bytes,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
                Err(e) => return Err(StoreError::from(e)),
            };
            let mut entries = decode_bucket_file(&bucket_bytes)?;
            // Buckets are visited hash buckets 00..ff, then change-id buckets
            // 00..ff. Sorting within each bucket therefore yields a globally
            // sorted index, assuming `PackObjectId` orders variants that way
            // (which `list_ids_returns_all_added_ids_sorted` checks).
            entries.sort_by_key(|(id, _)| *id);
            for (id, offset) in entries {
                write_index_entry(&mut idx_writer, id, offset)?;
                entries_written += 1;
            }
        }
        idx_writer.flush().map_err(StoreError::from)?;
        Ok(entries_written)
    }
}
/// Writes the index file header for an index declaring `count` entries
/// (the caller passes the pack's object count).
fn write_index_header<W: Write>(out: &mut W, count: u64) -> Result<()> {
    let header = super::pack_index::index_header();
    header.write_to(out, count)
}
/// Writes one index record: the tagged id followed by the entry's offset as a
/// big-endian `u64`.
fn write_index_entry<W: Write>(out: &mut W, id: PackObjectId, offset: u64) -> Result<()> {
    let mut record = Vec::with_capacity(33 + 8);
    id.encode_tagged(&mut record);
    record.extend_from_slice(&offset.to_be_bytes());
    out.write_all(&record)?;
    Ok(())
}
/// Encodes `value` as a 7-bits-per-byte varint (low groups first) that always
/// occupies exactly ten bytes. The first nine bytes carry the 0x80
/// continuation bit even when their groups are zero, and the tenth holds the
/// remaining top bit. The fixed width lets a size field be reserved before
/// compressing and back-patched afterwards.
#[cfg(feature = "zstd")]
fn encode_varint_padded_to_10(value: u64, out: &mut [u8; 10]) {
    let (groups, last) = out.split_at_mut(9);
    for (i, byte) in groups.iter_mut().enumerate() {
        let group = (value >> (7 * i)) & 0x7F;
        *byte = 0x80 | group as u8;
    }
    last[0] = (value >> 63) as u8;
}
impl<W: Write + Read + Seek> Drop for StreamingPackBuilder<W> {
    /// Best-effort cleanup when the builder is abandoned without a successful
    /// `finalize`: closes any open bucket files, then deletes the bucket files
    /// and the (then empty) bucket directory. Errors are ignored because
    /// `drop` cannot report them.
    fn drop(&mut self) {
        if self.finalized {
            return;
        }
        // Fields are only dropped after this method returns, so the bucket
        // files would otherwise still be open while being deleted. On Windows
        // an open file may not be removable (or only marked for deletion),
        // which leaves the directory non-empty. `into_parts` releases each
        // handle without flushing entries nobody will read.
        for slot in &mut self.bucket_writers {
            if let Some(bucket) = slot.take() {
                let _ = bucket.writer.into_parts();
            }
        }
        self.open_bucket_writers = 0;
        for path in &self.bucket_paths {
            let _ = std::fs::remove_file(path);
        }
        let _ = std::fs::remove_dir(&self.bucket_dir);
    }
}
/// Maps an id to its bucket slot. The variant selects the hash or change-id
/// half of the bucket table, and the id's first byte selects the bucket
/// within that half.
fn bucket_index_for(id: &PackObjectId) -> usize {
    let (variant, first_byte) = match id {
        PackObjectId::Hash(h) => (HASH_VARIANT, h.as_bytes()[0]),
        PackObjectId::ChangeId(c) => (CHANGEID_VARIANT, c.as_bytes()[0]),
    };
    variant * BUCKETS_PER_VARIANT + usize::from(first_byte)
}
/// Parses a bucket file: a back-to-back sequence of tagged ids, each followed
/// by a big-endian `u64` pack offset.
///
/// # Errors
/// Returns `StoreError::InvalidObject` if an id is malformed or a record is cut
/// off before its 8 offset bytes.
fn decode_bucket_file(bytes: &[u8]) -> Result<Vec<(PackObjectId, u64)>> {
    let mut entries = Vec::new();
    let mut rest = bytes;
    while !rest.is_empty() {
        let (id, id_len) = PackObjectId::decode_tagged(rest)?;
        let Some(after_id) = rest.get(id_len..).filter(|tail| tail.len() >= 8) else {
            return Err(StoreError::InvalidObject(
                "streaming bucket entry truncated at offset".to_string(),
            ));
        };
        let (offset_bytes, tail) = after_id.split_at(8);
        let raw: [u8; 8] = offset_bytes.try_into().map_err(|_| {
            StoreError::InvalidObject("streaming bucket bad offset slice".to_string())
        })?;
        entries.push((id, u64::from_be_bytes(raw)));
        rest = tail;
    }
    Ok(entries)
}
#[cfg(test)]
mod tests {
    use std::io::Cursor;
    use super::*;
    use crate::{
        object::ChangeId,
        store::pack::{PackReader, PackStats},
    };
    fn deterministic_hash(seed: u8) -> ContentHash {
        let mut bytes = [0u8; 32];
        bytes[0] = seed;
        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
            *b = seed.wrapping_mul(31).wrapping_add(i as u8);
        }
        ContentHash::from_bytes(bytes)
    }
    fn deterministic_change_id(seed: u8) -> ChangeId {
        let mut bytes = [0u8; 16];
        bytes[0] = seed;
        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
            *b = seed.wrapping_add(i as u8 * 7);
        }
        ChangeId::from_bytes(bytes)
    }
    fn fresh_builder(
        tmp: &tempfile::TempDir,
    ) -> (StreamingPackBuilder<Cursor<Vec<u8>>>, PathBuf, PathBuf) {
        let bucket_dir = tmp.path().join("buckets");
        let index_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let b = StreamingPackBuilder::new(
            cursor,
            index_path.clone(),
            CompressionConfig::default(),
            bucket_dir.clone(),
        )
        .unwrap();
        (b, bucket_dir, index_path)
    }
    fn finalize_cursor(
        b: StreamingPackBuilder<Cursor<Vec<u8>>>,
        index_path: &std::path::Path,
    ) -> (Vec<u8>, Vec<u8>, PackStats) {
        let (cursor, stats) = b.finalize().unwrap();
        let index_bytes = std::fs::read(index_path).unwrap();
        (cursor.into_inner(), index_bytes, stats)
    }
    #[test]
    fn empty_pack_finalizes_to_valid_zero_count_pack() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (b, bucket_dir, idx_path) = fresh_builder(&tmp);
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 0);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert!(reader.list_ids().is_empty());
        assert!(
            !bucket_dir.exists(),
            "bucket dir should be cleaned on successful finalize"
        );
    }
    #[test]
    fn single_blob_with_hash_id_round_trips() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let hash = deterministic_hash(0x42);
        let payload = b"hello, streaming pack".to_vec();
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 1);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let id = PackObjectId::Hash(hash);
        assert!(reader.has_object(&id));
        let (got_type, got_data) = reader.get_object(&id).unwrap().unwrap();
        assert_eq!(got_type, ObjectType::Blob);
        assert_eq!(got_data, payload);
    }
    #[test]
    fn single_state_with_change_id_round_trips() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let cid = deterministic_change_id(0xa5);
        let payload = b"serialized-state-bytes".to_vec();
        b.add_id(
            PackObjectId::ChangeId(cid),
            ObjectType::State,
            payload.clone(),
        )
        .unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 1);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let id = PackObjectId::ChangeId(cid);
        let (ty, data) = reader.get_object(&id).unwrap().unwrap();
        assert_eq!(ty, ObjectType::State);
        assert_eq!(data, payload);
    }
    #[test]
    fn mixed_hash_and_changeid_ids_all_retrievable() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let blob_hash = deterministic_hash(0x10);
        let tree_hash = deterministic_hash(0x20);
        let state_cid = deterministic_change_id(0x80);
        b.add(blob_hash, ObjectType::Blob, b"blob-bytes".to_vec())
            .unwrap();
        b.add(tree_hash, ObjectType::Tree, b"serialized-tree".to_vec())
            .unwrap();
        b.add_id(
            PackObjectId::ChangeId(state_cid),
            ObjectType::State,
            b"serialized-state".to_vec(),
        )
        .unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 3);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(
            reader
                .get_object(&PackObjectId::Hash(blob_hash))
                .unwrap()
                .unwrap()
                .1,
            b"blob-bytes".to_vec()
        );
        assert_eq!(
            reader
                .get_object(&PackObjectId::Hash(tree_hash))
                .unwrap()
                .unwrap()
                .1,
            b"serialized-tree".to_vec()
        );
        assert_eq!(
            reader
                .get_object(&PackObjectId::ChangeId(state_cid))
                .unwrap()
                .unwrap()
                .1,
            b"serialized-state".to_vec()
        );
    }
    #[test]
    fn ten_thousand_objects_round_trip_correctly() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let mut hashes = Vec::with_capacity(10_000);
        for i in 0..10_000u32 {
            let h = blake3::hash(&i.to_le_bytes());
            let hash = ContentHash::from_bytes(*h.as_bytes());
            hashes.push(hash);
            b.add(hash, ObjectType::Blob, format!("payload-{i}").into_bytes())
                .unwrap();
        }
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 10_000);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(reader.list_ids().len(), 10_000);
        for i in [0, 1, 99, 1234, 5_000, 9_999] {
            let id = PackObjectId::Hash(hashes[i]);
            let (_ty, data) = reader.get_object(&id).unwrap().unwrap();
            assert_eq!(data, format!("payload-{i}").into_bytes());
        }
    }
    #[test]
    fn bucket_writers_are_lru_capped_below_fd_limit() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _bucket_dir, idx_path) = fresh_builder(&tmp);
        let mut ids = Vec::new();
        for i in 0..BUCKETS_PER_VARIANT {
            let hash = deterministic_hash(i as u8);
            ids.push(PackObjectId::Hash(hash));
            b.add(hash, ObjectType::Blob, format!("hash-{i}").into_bytes())
                .unwrap();
            assert!(
                b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
                "open bucket writers should stay capped"
            );
        }
        for i in 0..BUCKETS_PER_VARIANT {
            let cid = deterministic_change_id(i as u8);
            ids.push(PackObjectId::ChangeId(cid));
            b.add_id(
                PackObjectId::ChangeId(cid),
                ObjectType::State,
                format!("state-{i}").into_bytes(),
            )
            .unwrap();
            assert!(
                b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
                "open bucket writers should stay capped"
            );
        }
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, TOTAL_BUCKETS as u64);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        for id in ids {
            assert!(reader.has_object(&id), "missing id {id:?}");
        }
    }
    #[test]
    fn index_id_sort_order_matches_packbuilder_output() {
        use crate::store::pack::PackBuilder;
        let payloads: Vec<(PackObjectId, ObjectType, Vec<u8>)> = (0..200u32)
            .map(|i| {
                let h = blake3::hash(&i.to_le_bytes());
                (
                    PackObjectId::Hash(ContentHash::from_bytes(*h.as_bytes())),
                    if i % 3 == 0 {
                        ObjectType::Tree
                    } else {
                        ObjectType::Blob
                    },
                    format!("body-{i}").into_bytes(),
                )
            })
            .collect();
        let compression = CompressionConfig {
            max_delta_size: 0,
            ..CompressionConfig::default()
        };
        let mut classic = PackBuilder::new(compression);
        for (id, ty, data) in payloads.iter() {
            classic.add_id(*id, *ty, data.clone());
        }
        let (classic_pack, classic_index, _) = classic.build().unwrap();
        let classic_reader = PackReader::from_bytes(classic_pack, classic_index).unwrap();
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut streaming =
            StreamingPackBuilder::new(cursor, idx_path.clone(), compression, bucket_dir).unwrap();
        for (id, ty, data) in payloads.iter() {
            streaming.add_id(*id, *ty, data.clone()).unwrap();
        }
        let (streaming_pack, streaming_index, _) = finalize_cursor(streaming, &idx_path);
        let streaming_reader = PackReader::from_bytes(streaming_pack, streaming_index).unwrap();
        assert_eq!(
            streaming_reader.list_ids(),
            classic_reader.list_ids(),
            "streaming and classic indices should report the same id sequence"
        );
        for (id, _ty, want) in payloads.iter().take(10).chain(payloads.iter().skip(190)) {
            let (_, got) = streaming_reader.get_object(id).unwrap().unwrap();
            assert_eq!(&got, want);
            let (_, classic_got) = classic_reader.get_object(id).unwrap().unwrap();
            assert_eq!(got, classic_got);
        }
    }
    #[test]
    fn corrupted_pack_fails_checksum_verification() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        b.add(
            deterministic_hash(0x01),
            ObjectType::Blob,
            b"some bytes".to_vec(),
        )
        .unwrap();
        let (mut pack_data, index_data, _) = finalize_cursor(b, &idx_path);
        // Just past the 16-byte container header (its count lives at 8..16, see
        // `pack_count_in_header_matches_index_entry_count`), i.e. inside the
        // first entry.
        let body_byte = 18;
        pack_data[body_byte] ^= 0xff;
        let result = PackReader::from_bytes(pack_data, index_data);
        assert!(
            result.is_err(),
            "PackReader should reject pack with mutated body"
        );
    }
    #[test]
    fn pack_count_in_header_matches_index_entry_count() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        for i in 0..7u8 {
            b.add(
                deterministic_hash(i),
                ObjectType::Blob,
                format!("p{i}").into_bytes(),
            )
            .unwrap();
        }
        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
        let count = u64::from_be_bytes(pack_data[8..16].try_into().unwrap());
        assert_eq!(count, 7);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(reader.list_ids().len(), 7);
    }
    #[test]
    fn bucket_files_are_cleaned_on_successful_finalize() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut b = StreamingPackBuilder::new(
            cursor,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir.clone(),
        )
        .unwrap();
        for i in 0..50u8 {
            b.add(deterministic_hash(i), ObjectType::Blob, vec![i; 32])
                .unwrap();
        }
        assert!(bucket_dir.exists());
        let bucket_count = std::fs::read_dir(&bucket_dir).unwrap().count();
        assert!(bucket_count > 0, "bucket dir should hold some files");
        let _ = finalize_cursor(b, &idx_path);
        assert!(
            !bucket_dir.exists(),
            "bucket dir should be removed on finalize"
        );
    }
    #[test]
    fn bucket_files_are_cleaned_on_drop_without_finalize() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        {
            let cursor = Cursor::new(Vec::<u8>::new());
            let mut b = StreamingPackBuilder::new(
                cursor,
                idx_path.clone(),
                CompressionConfig::default(),
                bucket_dir.clone(),
            )
            .unwrap();
            for i in 0..10u8 {
                b.add(deterministic_hash(i), ObjectType::Blob, vec![0; 32])
                    .unwrap();
            }
            assert!(bucket_dir.exists());
        }
        assert!(
            !idx_path.exists(),
            "no index file should have been created without finalize"
        );
        assert!(
            !bucket_dir.exists(),
            "bucket dir should be removed on Drop when finalize never ran"
        );
    }
    #[test]
    fn stale_bucket_files_from_previous_run_are_discarded() {
        let tmp = tempfile::TempDir::new().unwrap();
        let stale_dir = tmp.path().join("buckets");
        std::fs::create_dir_all(&stale_dir).unwrap();
        std::fs::write(
            stale_dir.join("bucket-h-00"),
            b"leftover bytes that are not valid bucket entries",
        )
        .unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        b.add(deterministic_hash(0x00), ObjectType::Blob, b"fresh".to_vec())
            .unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 1);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(reader.list_ids().len(), 1);
    }
    #[test]
    fn large_blob_streams_to_disk_without_double_buffering() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let pack_path = tmp.path().join("pack.dat");
        let idx_path = tmp.path().join("pack.idx");
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&pack_path)
            .unwrap();
        let mut b = StreamingPackBuilder::new(
            file,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir,
        )
        .unwrap();
        let payload: Vec<u8> = (0..4 * 1024 * 1024u32).map(|i| (i & 0xff) as u8).collect();
        let hash = deterministic_hash(0xff);
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let (_, stats) = b.finalize().unwrap();
        let index_data = std::fs::read(&idx_path).unwrap();
        assert_eq!(stats.object_count, 1);
        let pack_bytes = std::fs::read(&pack_path).unwrap();
        let reader = PackReader::from_bytes(pack_bytes, index_data).unwrap();
        let (_ty, got) = reader
            .get_object(&PackObjectId::Hash(hash))
            .unwrap()
            .unwrap();
        assert_eq!(got, payload);
    }
    #[test]
    fn bucket_distribution_for_random_hashes_is_roughly_uniform() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut b = StreamingPackBuilder::new(
            cursor,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir.clone(),
        )
        .unwrap();
        for i in 0..1024u32 {
            let h = blake3::hash(&i.to_le_bytes());
            let hash = ContentHash::from_bytes(*h.as_bytes());
            b.add(hash, ObjectType::Blob, b"x".to_vec()).unwrap();
        }
        // Bucket writers are buffered; flush the ones still open so on-disk
        // sizes reflect every entry added so far.
        for bucket in b.bucket_writers.iter_mut().flatten() {
            bucket.writer.flush().unwrap();
        }
        let mut max_entries = 0usize;
        let entry_size = 33 + 8;
        for path in b.bucket_paths.iter() {
            if path.exists() {
                let size = std::fs::metadata(path).unwrap().len() as usize;
                let entries = size / entry_size;
                if entries > max_entries {
                    max_entries = entries;
                }
            }
        }
        assert!(
            max_entries <= 16,
            "max bucket has {max_entries} entries; uniform expected ~4"
        );
        let _ = finalize_cursor(b, &idx_path);
    }
    #[test]
    fn finalize_returns_correct_stats() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let payload = vec![0xabu8; 1024];
        for i in 0..5u8 {
            b.add(deterministic_hash(i), ObjectType::Blob, payload.clone())
                .unwrap();
        }
        let (_, _, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 5);
        assert_eq!(stats.total_uncompressed, 5 * 1024);
        assert!(stats.total_compressed > 0);
        assert!(stats.compression_ratio > 0.0);
        assert_eq!(stats.delta_count, 0, "streaming builder never deltas");
    }
    #[cfg(feature = "zstd")]
    #[test]
    fn streaming_compression_roundtrips_through_zstd_frame() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let payload = vec![0u8; 64 * 1024];
        let hash = deterministic_hash(0x77);
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert!(
            stats.total_compressed < stats.total_uncompressed,
            "expected compression ratio < 1.0, got {}/{}",
            stats.total_compressed,
            stats.total_uncompressed
        );
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let (_ty, got) = reader
            .get_object(&PackObjectId::Hash(hash))
            .unwrap()
            .unwrap();
        assert_eq!(got, payload);
    }
    #[cfg(feature = "zstd")]
    #[test]
    fn padded_varint_decodes_to_original_value_for_canonical_decoder() {
        let cases: &[u64] = &[0, 1, 127, 128, 4096, 1_000_000, 1_000_000_000_000, u64::MAX];
        for &value in cases {
            let mut buf = [0u8; 10];
            super::encode_varint_padded_to_10(value, &mut buf);
            let (decoded, consumed) = super::super::varint::decode_varint(&buf)
                .expect("padded varint should always decode");
            assert_eq!(decoded, value, "varint roundtrip failed for {value}");
            assert_eq!(
                consumed, 10,
                "padded encoding should consume all 10 bytes for {value}"
            );
        }
    }
    #[cfg(feature = "zstd")]
    #[test]
    fn streaming_path_does_not_buffer_compressed_payload_in_memory() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let pack_path = tmp.path().join("pack.dat");
        let idx_path = tmp.path().join("pack.idx");
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&pack_path)
            .unwrap();
        let mut b = StreamingPackBuilder::new(
            file,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir,
        )
        .unwrap();
        let payload = vec![0xa5u8; 8 * 1024 * 1024];
        let hash = deterministic_hash(0x66);
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let mid_size = std::fs::metadata(&pack_path).unwrap().len();
        assert!(
            mid_size > 16 + 40,
            "pack file should hold real entry data after add; size={mid_size}"
        );
        let (_, _) = b.finalize().unwrap();
        let pack_bytes = std::fs::read(&pack_path).unwrap();
        let index_bytes = std::fs::read(&idx_path).unwrap();
        let reader = PackReader::from_bytes(pack_bytes, index_bytes).unwrap();
        let (_ty, got) = reader
            .get_object(&PackObjectId::Hash(hash))
            .unwrap()
            .unwrap();
        assert_eq!(got, payload);
    }
    #[test]
    fn list_ids_returns_all_added_ids_sorted() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let mut added: Vec<PackObjectId> = Vec::new();
        for seed in [0x05u8, 0xa0, 0x12, 0x9f, 0x33] {
            let id = PackObjectId::Hash(deterministic_hash(seed));
            b.add_id(id, ObjectType::Blob, vec![seed; 4]).unwrap();
            added.push(id);
        }
        for seed in [0x80u8, 0x10, 0xff] {
            let id = PackObjectId::ChangeId(deterministic_change_id(seed));
            b.add_id(id, ObjectType::State, vec![seed; 4]).unwrap();
            added.push(id);
        }
        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let mut got = reader.list_ids();
        let mut sorted = got.clone();
        sorted.sort();
        assert_eq!(got, sorted, "list_ids must come back sorted");
        added.sort();
        got.sort();
        assert_eq!(got, added);
    }
}