1use std::{
62 fs::{File, OpenOptions},
63 io::{BufWriter, Read, Seek, SeekFrom, Write},
64 path::PathBuf,
65};
66
67use heddle_format::compression::CompressionConfig;
68
69use super::{ObjectType, PackObjectId, PackStats, pack_container_spec, write_container_header};
70
/// Width in bytes of the stored-size slot reserved ahead of a compressed body.
///
/// `add_id` first writes this many zero bytes, streams the compressed body, and then seeks
/// back to overwrite the slot with the real size encoded by `encode_varint_padded_to_10`.
#[cfg(feature = "zstd")]
const CSIZE_PLACEHOLDER_LEN: usize = 10;
78use crate::{
79 object::ContentHash,
80 store::{Result, StoreError},
81};
82
/// Buckets per id variant: one for each possible value of an id's first byte.
const BUCKETS_PER_VARIANT: usize = 256;
/// Total number of bucket files across both id variants.
const TOTAL_BUCKETS: usize = BUCKETS_PER_VARIANT * 2;
/// Maximum number of bucket files kept open at once; opening another one beyond this
/// first closes the least recently used writer.
const MAX_OPEN_BUCKET_WRITERS: usize = 32;

/// Variant slot used by `bucket_index_for` for `PackObjectId::Hash` ids.
const HASH_VARIANT: usize = 0;
/// Variant slot used by `bucket_index_for` for `PackObjectId::ChangeId` ids.
const CHANGEID_VARIANT: usize = 1;
101
/// Builds a pack by writing each object to the output stream as it is added, while
/// spilling index entries into temporary bucket files that `finalize` merges into the index.
pub struct StreamingPackBuilder<W: Write + Read + Seek> {
    /// Buffered pack output; taken (left as `None`) by `finalize`.
    pack_writer: Option<BufWriter<W>>,
    /// Stream position at construction time, where the container header starts.
    header_offset: u64,
    /// Absolute stream position of the next byte to be written, tracked by hand.
    pack_position: u64,
    /// Number of objects successfully added so far.
    object_count: u64,
    /// Object count supplied up front, if any; checked against `object_count` in `finalize`.
    declared_object_count: Option<u64>,
    /// Sum of the raw payload sizes passed to `add_id`.
    total_uncompressed: u64,
    /// Sum of the body sizes actually stored in the pack.
    total_compressed: u64,
    /// Compression settings; only read when the `zstd` feature is enabled.
    #[cfg_attr(not(feature = "zstd"), allow(dead_code))]
    compression: CompressionConfig,
    /// Directory holding the temporary bucket files.
    bucket_dir: PathBuf,
    /// One slot per bucket; `Some` while that bucket's file is open.
    bucket_writers: Vec<Option<BucketWriter>>,
    /// Number of currently open bucket writers.
    open_bucket_writers: usize,
    /// Counter bumped on every bucket access; used to find the least recently used writer.
    bucket_access_tick: u64,
    /// Path of every bucket file, indexed by bucket number.
    bucket_paths: Vec<PathBuf>,
    /// Destination of the index file written by `finalize`.
    index_path: PathBuf,
    /// Set once `finalize` has completed; tells `Drop` that cleanup already happened.
    finalized: bool,
}
146
/// An open bucket file together with the access tick of its most recent use.
struct BucketWriter {
    /// Buffered handle to the bucket file (opened in append mode by `get_or_open_bucket`).
    writer: BufWriter<File>,
    /// Value of `bucket_access_tick` when this bucket was last requested.
    last_used: u64,
}
151
/// Write adapter that forwards to a borrowed writer and tallies the bytes it accepted.
#[cfg(feature = "zstd")]
struct CountingWriter<'a, W: Write> {
    /// Destination that actually receives the bytes.
    inner: &'a mut W,
    /// Running total of bytes the destination reported as written (saturating).
    written: u64,
}

#[cfg(feature = "zstd")]
impl<'a, W: Write> CountingWriter<'a, W> {
    /// Wraps `inner` with the tally starting at zero.
    fn new(inner: &'a mut W) -> Self {
        CountingWriter { inner, written: 0 }
    }
}

#[cfg(feature = "zstd")]
impl<W: Write> Write for CountingWriter<'_, W> {
    /// Forwards `buf` and adds however many bytes the destination accepted to the tally.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = self.inner.write(buf)?;
        let tally = self.written.saturating_add(accepted as u64);
        self.written = tally;
        Ok(accepted)
    }

    /// Flushes the destination; the tally is unaffected.
    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
177
178impl<W: Write + Read + Seek> StreamingPackBuilder<W> {
    /// Creates a builder for a pack whose object count is not known in advance.
    ///
    /// Delegates to `new_inner` with no declared count, so the container header is first
    /// written with a count of `0` and rewritten with the real count by `finalize`.
    ///
    /// # Errors
    /// Returns whatever `new_inner` returns (bucket directory creation or pack I/O failures).
    pub fn new(
        pack_writer: W,
        index_path: PathBuf,
        compression: CompressionConfig,
        bucket_dir: PathBuf,
    ) -> Result<Self> {
        Self::new_inner(pack_writer, index_path, compression, bucket_dir, None)
    }
202
    /// Creates a builder for a pack that will contain exactly `object_count` objects.
    ///
    /// The count is written into the container header immediately; `finalize` does not
    /// rewrite the header and instead fails if a different number of objects was added.
    ///
    /// # Errors
    /// Returns whatever `new_inner` returns (bucket directory creation or pack I/O failures).
    pub fn new_with_object_count(
        pack_writer: W,
        index_path: PathBuf,
        compression: CompressionConfig,
        bucket_dir: PathBuf,
        object_count: u64,
    ) -> Result<Self> {
        Self::new_inner(
            pack_writer,
            index_path,
            compression,
            bucket_dir,
            Some(object_count),
        )
    }
224
225 fn new_inner(
226 mut pack_writer: W,
227 index_path: PathBuf,
228 compression: CompressionConfig,
229 bucket_dir: PathBuf,
230 declared_object_count: Option<u64>,
231 ) -> Result<Self> {
232 std::fs::create_dir_all(&bucket_dir).map_err(StoreError::from)?;
233 let header_offset = pack_writer.stream_position().map_err(StoreError::from)?;
234
235 let mut header_bytes = Vec::with_capacity(16);
240 write_container_header(
241 &mut header_bytes,
242 pack_container_spec(),
243 declared_object_count.unwrap_or(0),
244 );
245 pack_writer
246 .write_all(&header_bytes)
247 .map_err(StoreError::from)?;
248
249 let bucket_paths: Vec<PathBuf> = (0..TOTAL_BUCKETS)
250 .map(|i| {
251 let variant = if i < BUCKETS_PER_VARIANT { 'h' } else { 'c' };
252 let prefix = i % BUCKETS_PER_VARIANT;
253 bucket_dir.join(format!("bucket-{variant}-{prefix:02x}"))
254 })
255 .collect();
256 for path in &bucket_paths {
257 let _ = std::fs::remove_file(path);
258 }
259
260 Ok(Self {
261 pack_writer: Some(BufWriter::new(pack_writer)),
262 header_offset,
263 pack_position: header_offset + header_bytes.len() as u64,
264 object_count: 0,
265 declared_object_count,
266 total_uncompressed: 0,
267 total_compressed: 0,
268 compression,
269 bucket_dir,
270 bucket_writers: (0..TOTAL_BUCKETS).map(|_| None).collect(),
271 open_bucket_writers: 0,
272 bucket_access_tick: 0,
273 bucket_paths,
274 index_path,
275 finalized: false,
276 })
277 }
278
279 pub fn flush_pack(&mut self) -> Result<()> {
284 if let Some(writer) = self.pack_writer.as_mut() {
285 writer.flush().map_err(StoreError::from)?;
286 }
287 Ok(())
288 }
289
    /// Adds an object identified by its content hash.
    ///
    /// Convenience wrapper around `add_id` that wraps `hash` in `PackObjectId::Hash`.
    pub fn add(&mut self, hash: ContentHash, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
        self.add_id(PackObjectId::Hash(hash), obj_type, data)
    }
294
    /// Appends one object to the pack stream and records its index entry in a bucket file.
    ///
    /// The entry is written as: tagged id, type-and-size varint, stored-size field, body.
    /// When compression applies (the `zstd` feature is on, compression is enabled, and the
    /// payload is at least `min_size` bytes) the body is a zstd stream and the stored-size
    /// field is a fixed 10-byte slot that is patched after the body has been written.
    /// Otherwise the body is the raw payload and the stored-size field is a plain varint.
    ///
    /// # Errors
    /// Returns a `StoreError` for any I/O failure on the pack stream or the bucket file, and
    /// `StoreError::InvalidObject` if the tracked pack position would overflow `u64`.
    ///
    /// # Panics
    /// Panics if the pack writer has already been taken, or if `header_offset` is somehow
    /// past the current pack position.
    pub fn add_id(&mut self, id: PackObjectId, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
        let pw = self
            .pack_writer
            .as_mut()
            .expect("add_id called after finalize");
        let entry_start = self.pack_position;
        // The index stores offsets relative to the start of the container header.
        let offset = entry_start
            .checked_sub(self.header_offset)
            .expect("header_offset should never be past current position");

        self.total_uncompressed += data.len() as u64;

        // Entry header: tagged id followed by the object type and uncompressed size.
        let mut header_buf = Vec::with_capacity(40);
        id.encode_tagged(&mut header_buf);
        super::varint::encode_type_and_size(obj_type, data.len() as u64, &mut header_buf);
        pw.write_all(&header_buf).map_err(StoreError::from)?;
        self.pack_position = self
            .pack_position
            .checked_add(header_buf.len() as u64)
            .ok_or_else(|| {
                StoreError::InvalidObject("streaming pack position overflow".to_string())
            })?;
        // Absolute position of the stored-size slot, needed for the seek-back patch below.
        #[cfg(feature = "zstd")]
        let csize_pos = entry_start + header_buf.len() as u64;

        // Assigned exactly once by whichever cfg branch is compiled in.
        let want_compress: bool;
        #[cfg(feature = "zstd")]
        {
            want_compress = self.compression.enabled && data.len() >= self.compression.min_size;
        }
        #[cfg(not(feature = "zstd"))]
        {
            want_compress = false;
        }
        if !want_compress {
            // Raw path: stored size equals the payload size, so it can be written up front.
            let mut csize_buf = Vec::with_capacity(10);
            super::varint::encode_varint(data.len() as u64, &mut csize_buf);
            pw.write_all(&csize_buf).map_err(StoreError::from)?;
            self.pack_position = self
                .pack_position
                .checked_add(csize_buf.len() as u64)
                .ok_or_else(|| {
                    StoreError::InvalidObject("streaming pack position overflow".to_string())
                })?;
            pw.write_all(&data).map_err(StoreError::from)?;
            self.pack_position = self
                .pack_position
                .checked_add(data.len() as u64)
                .ok_or_else(|| {
                    StoreError::InvalidObject("streaming pack position overflow".to_string())
                })?;
            self.total_compressed += data.len() as u64;
        } else {
            #[cfg(feature = "zstd")]
            {
                // The compressed size is unknown until the encoder finishes, so reserve a
                // fixed-width slot now and fill it in afterwards.
                pw.write_all(&[0u8; CSIZE_PLACEHOLDER_LEN])
                    .map_err(StoreError::from)?;
                self.pack_position = self
                    .pack_position
                    .checked_add(CSIZE_PLACEHOLDER_LEN as u64)
                    .ok_or_else(|| {
                        StoreError::InvalidObject("streaming pack position overflow".to_string())
                    })?;
                let body_start = self.pack_position;
                let compressed_size;
                {
                    // Stream the encoder output straight into the pack writer, counting the
                    // bytes as they pass through.
                    let mut counting = CountingWriter::new(&mut *pw);
                    let mut enc =
                        zstd::stream::write::Encoder::new(&mut counting, self.compression.level)
                            .map_err(StoreError::from)?;
                    enc.set_pledged_src_size(Some(data.len() as u64))
                        .map_err(StoreError::from)?;
                    enc.write_all(&data).map_err(StoreError::from)?;
                    enc.finish().map_err(StoreError::from)?;
                    compressed_size = counting.written;
                }
                self.pack_position =
                    self.pack_position
                        .checked_add(compressed_size)
                        .ok_or_else(|| {
                            StoreError::InvalidObject(
                                "streaming pack position overflow".to_string(),
                            )
                        })?;
                let body_end = body_start.checked_add(compressed_size).ok_or_else(|| {
                    StoreError::InvalidObject("streaming pack position overflow".to_string())
                })?;
                self.total_compressed += compressed_size;

                let mut csize_bytes = [0u8; CSIZE_PLACEHOLDER_LEN];
                encode_varint_padded_to_10(compressed_size, &mut csize_bytes);
                // Flush first so the buffer is empty before writing through the inner
                // stream directly; then patch the slot and return to the end of the body.
                pw.flush().map_err(StoreError::from)?;
                let inner = pw.get_mut();
                inner
                    .seek(SeekFrom::Start(csize_pos))
                    .map_err(StoreError::from)?;
                inner.write_all(&csize_bytes).map_err(StoreError::from)?;
                inner
                    .seek(SeekFrom::Start(body_end))
                    .map_err(StoreError::from)?;
            }
            #[cfg(not(feature = "zstd"))]
            {
                unreachable!("compression branch reached without `zstd` feature");
            }
        }

        // Record (tagged id, big-endian offset) in the bucket chosen by the id's first byte.
        let bucket_idx = bucket_index_for(&id);
        let bucket = self.get_or_open_bucket(bucket_idx)?;
        let mut idx_entry = Vec::with_capacity(33 + 8);
        id.encode_tagged(&mut idx_entry);
        idx_entry.extend_from_slice(&offset.to_be_bytes());
        bucket.write_all(&idx_entry).map_err(StoreError::from)?;

        self.object_count += 1;
        Ok(())
    }
472
473 fn get_or_open_bucket(&mut self, idx: usize) -> Result<&mut BufWriter<File>> {
474 self.bucket_access_tick = self.bucket_access_tick.wrapping_add(1);
475 let last_used = self.bucket_access_tick;
476 if self.bucket_writers[idx].is_none() {
477 if self.open_bucket_writers >= MAX_OPEN_BUCKET_WRITERS {
478 self.evict_lru_bucket()?;
479 }
480 let path = &self.bucket_paths[idx];
481 let f = OpenOptions::new()
482 .create(true)
483 .append(true)
484 .open(path)
485 .map_err(StoreError::from)?;
486 self.bucket_writers[idx] = Some(BucketWriter {
487 writer: BufWriter::new(f),
488 last_used,
489 });
490 self.open_bucket_writers += 1;
491 } else if let Some(bucket) = self.bucket_writers[idx].as_mut() {
492 bucket.last_used = last_used;
493 }
494 Ok(&mut self.bucket_writers[idx]
495 .as_mut()
496 .expect("just inserted above")
497 .writer)
498 }
499
500 fn evict_lru_bucket(&mut self) -> Result<()> {
501 let Some((idx, _)) = self
502 .bucket_writers
503 .iter()
504 .enumerate()
505 .filter_map(|(idx, bucket)| bucket.as_ref().map(|bucket| (idx, bucket.last_used)))
506 .min_by_key(|(_, last_used)| *last_used)
507 else {
508 return Ok(());
509 };
510
511 if let Some(mut bucket) = self.bucket_writers[idx].take() {
512 bucket.writer.flush().map_err(StoreError::from)?;
513 self.open_bucket_writers -= 1;
514 }
515 Ok(())
516 }
517
518 pub fn finalize(mut self) -> Result<(W, PackStats)> {
529 for bucket in self.bucket_writers.iter_mut().flatten() {
532 bucket.writer.flush().map_err(StoreError::from)?;
533 }
534 for slot in self.bucket_writers.iter_mut() {
537 *slot = None;
538 }
539 self.open_bucket_writers = 0;
540
541 let bw = self
545 .pack_writer
546 .take()
547 .expect("finalize called twice — pack_writer already consumed");
548 let mut writer = bw
549 .into_inner()
550 .map_err(|e| StoreError::from(std::io::Error::other(e.to_string())))?;
551 if let Some(expected) = self.declared_object_count {
552 if expected != self.object_count {
553 return Err(StoreError::InvalidObject(format!(
554 "streaming pack declared {expected} object(s) but added {}",
555 self.object_count
556 )));
557 }
558 } else {
559 writer
560 .seek(SeekFrom::Start(self.header_offset))
561 .map_err(StoreError::from)?;
562 let mut header_bytes = Vec::with_capacity(16);
563 write_container_header(&mut header_bytes, pack_container_spec(), self.object_count);
564 writer.write_all(&header_bytes).map_err(StoreError::from)?;
565 }
566
567 writer
572 .seek(SeekFrom::Start(self.header_offset))
573 .map_err(StoreError::from)?;
574 let mut hasher = blake3::Hasher::new();
575 let mut buf = vec![0u8; 64 * 1024];
576 loop {
577 let n = writer.read(&mut buf).map_err(StoreError::from)?;
578 if n == 0 {
579 break;
580 }
581 hasher.update(&buf[..n]);
582 }
583 let checksum = hasher.finalize();
584
585 writer.seek(SeekFrom::End(0)).map_err(StoreError::from)?;
587 writer
588 .write_all(checksum.as_bytes())
589 .map_err(StoreError::from)?;
590 writer.flush().map_err(StoreError::from)?;
591
592 let idx_file = File::create(&self.index_path).map_err(StoreError::from)?;
605 let mut idx_writer = BufWriter::new(idx_file);
606 write_index_header(&mut idx_writer, self.object_count)?;
607 let mut entries_written: u64 = 0;
608 for path in self.bucket_paths.iter() {
609 if !path.exists() {
610 continue;
611 }
612 let bucket_bytes = std::fs::read(path).map_err(StoreError::from)?;
613 let mut entries = decode_bucket_file(&bucket_bytes)?;
614 entries.sort_by_key(|(id, _)| *id);
619 for (id, offset) in entries {
620 write_index_entry(&mut idx_writer, id, offset)?;
621 entries_written += 1;
622 }
623 }
624 idx_writer.flush().map_err(StoreError::from)?;
625 debug_assert_eq!(
626 entries_written, self.object_count,
627 "streaming index entry count drifted from add() count"
628 );
629
630 for path in self.bucket_paths.iter() {
635 let _ = std::fs::remove_file(path);
636 }
637 let _ = std::fs::remove_dir(&self.bucket_dir);
638 self.finalized = true;
639
640 let stats = PackStats {
641 object_count: self.object_count,
642 total_uncompressed: self.total_uncompressed,
643 total_compressed: self.total_compressed,
644 delta_count: 0,
645 compression_ratio: if self.total_uncompressed == 0 {
646 0.0
647 } else {
648 self.total_compressed as f64 / self.total_uncompressed as f64
649 },
650 };
651
652 Ok((writer, stats))
653 }
654}
655
/// Writes the index header for `count` entries to `out`.
///
/// Delegates to the header description returned by `pack_index::index_header()`.
fn write_index_header<W: Write>(out: &mut W, count: u64) -> Result<()> {
    super::pack_index::index_header().write_to(out, count)
}
663
664fn write_index_entry<W: Write>(out: &mut W, id: PackObjectId, offset: u64) -> Result<()> {
668 let mut buf = Vec::with_capacity(33 + 8);
669 id.encode_tagged(&mut buf);
670 buf.extend_from_slice(&offset.to_be_bytes());
671 out.write_all(&buf).map_err(StoreError::from)
672}
673
/// Encodes `value` as a varint that always occupies all ten bytes of `out`.
///
/// Each byte carries seven bits of the value, least significant group first. The first
/// nine bytes have the continuation bit (`0x80`) set regardless of the value; the tenth
/// byte carries the remaining top bit with the continuation bit clear.
#[cfg(feature = "zstd")]
fn encode_varint_padded_to_10(value: u64, out: &mut [u8; 10]) {
    for (position, byte) in out.iter_mut().enumerate() {
        // Largest shift is 7 * 9 = 63, which is still in range for a u64.
        let group = ((value >> (7 * position)) & 0x7F) as u8;
        *byte = if position < 9 { group | 0x80 } else { group };
    }
}
695
696impl<W: Write + Read + Seek> Drop for StreamingPackBuilder<W> {
697 fn drop(&mut self) {
698 if self.finalized {
699 return;
700 }
701 for path in self.bucket_paths.iter() {
704 let _ = std::fs::remove_file(path);
705 }
706 let _ = std::fs::remove_dir(&self.bucket_dir);
707 }
708}
709
710fn bucket_index_for(id: &PackObjectId) -> usize {
714 match id {
715 PackObjectId::Hash(h) => HASH_VARIANT * BUCKETS_PER_VARIANT + h.as_bytes()[0] as usize,
716 PackObjectId::ChangeId(c) => {
717 CHANGEID_VARIANT * BUCKETS_PER_VARIANT + c.as_bytes()[0] as usize
718 }
719 }
720}
721
722fn decode_bucket_file(bytes: &[u8]) -> Result<Vec<(PackObjectId, u64)>> {
727 let mut out = Vec::new();
728 let mut pos = 0;
729 while pos < bytes.len() {
730 let (id, id_len) = PackObjectId::decode_tagged(&bytes[pos..])?;
731 pos += id_len;
732 if pos + 8 > bytes.len() {
733 return Err(StoreError::InvalidObject(
734 "streaming bucket entry truncated at offset".to_string(),
735 ));
736 }
737 let offset = u64::from_be_bytes(bytes[pos..pos + 8].try_into().map_err(|_| {
738 StoreError::InvalidObject("streaming bucket bad offset slice".to_string())
739 })?);
740 pos += 8;
741 out.push((id, offset));
742 }
743 Ok(out)
744}
745
746#[cfg(test)]
749mod tests {
750 use std::io::Cursor;
751
752 use super::*;
753 use crate::{
754 object::ChangeId,
755 store::pack::{PackReader, PackStats},
756 };
757
758 fn deterministic_hash(seed: u8) -> ContentHash {
759 let mut bytes = [0u8; 32];
763 bytes[0] = seed;
764 for (i, b) in bytes.iter_mut().enumerate().skip(1) {
765 *b = seed.wrapping_mul(31).wrapping_add(i as u8);
766 }
767 ContentHash::from_bytes(bytes)
768 }
769
770 fn deterministic_change_id(seed: u8) -> ChangeId {
771 let mut bytes = [0u8; 16];
772 bytes[0] = seed;
773 for (i, b) in bytes.iter_mut().enumerate().skip(1) {
774 *b = seed.wrapping_add(i as u8 * 7);
775 }
776 ChangeId::from_bytes(bytes)
777 }
778
779 fn fresh_builder(
784 tmp: &tempfile::TempDir,
785 ) -> (StreamingPackBuilder<Cursor<Vec<u8>>>, PathBuf, PathBuf) {
786 let bucket_dir = tmp.path().join("buckets");
787 let index_path = tmp.path().join("test.idx");
788 let cursor = Cursor::new(Vec::<u8>::new());
789 let b = StreamingPackBuilder::new(
790 cursor,
791 index_path.clone(),
792 CompressionConfig::default(),
793 bucket_dir.clone(),
794 )
795 .unwrap();
796 (b, bucket_dir, index_path)
797 }
798
799 fn finalize_cursor(
804 b: StreamingPackBuilder<Cursor<Vec<u8>>>,
805 index_path: &std::path::Path,
806 ) -> (Vec<u8>, Vec<u8>, PackStats) {
807 let (cursor, stats) = b.finalize().unwrap();
808 let index_bytes = std::fs::read(index_path).unwrap();
809 (cursor.into_inner(), index_bytes, stats)
810 }
811
812 #[test]
813 fn empty_pack_finalizes_to_valid_zero_count_pack() {
814 let tmp = tempfile::TempDir::new().unwrap();
815 let (b, bucket_dir, idx_path) = fresh_builder(&tmp);
816 let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
817
818 assert_eq!(stats.object_count, 0);
819 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
821 assert!(reader.list_ids().is_empty());
822 assert!(
824 !bucket_dir.exists(),
825 "bucket dir should be cleaned on successful finalize"
826 );
827 }
828
829 #[test]
830 fn single_blob_with_hash_id_round_trips() {
831 let tmp = tempfile::TempDir::new().unwrap();
832 let (mut b, _, idx_path) = fresh_builder(&tmp);
833 let hash = deterministic_hash(0x42);
834 let payload = b"hello, streaming pack".to_vec();
835 b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
836 let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
837
838 assert_eq!(stats.object_count, 1);
839 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
840 let id = PackObjectId::Hash(hash);
841 assert!(reader.has_object(&id));
842 let (got_type, got_data) = reader.get_object(&id).unwrap().unwrap();
843 assert_eq!(got_type, ObjectType::Blob);
844 assert_eq!(got_data, payload);
845 }
846
847 #[test]
848 fn single_state_with_change_id_round_trips() {
849 let tmp = tempfile::TempDir::new().unwrap();
850 let (mut b, _, idx_path) = fresh_builder(&tmp);
851 let cid = deterministic_change_id(0xa5);
852 let payload = b"serialized-state-bytes".to_vec();
853 b.add_id(
854 PackObjectId::ChangeId(cid),
855 ObjectType::State,
856 payload.clone(),
857 )
858 .unwrap();
859 let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
860
861 assert_eq!(stats.object_count, 1);
862 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
863 let id = PackObjectId::ChangeId(cid);
864 let (ty, data) = reader.get_object(&id).unwrap().unwrap();
865 assert_eq!(ty, ObjectType::State);
866 assert_eq!(data, payload);
867 }
868
869 #[test]
870 fn mixed_hash_and_changeid_ids_all_retrievable() {
871 let tmp = tempfile::TempDir::new().unwrap();
872 let (mut b, _, idx_path) = fresh_builder(&tmp);
873 let blob_hash = deterministic_hash(0x10);
874 let tree_hash = deterministic_hash(0x20);
875 let state_cid = deterministic_change_id(0x80);
876
877 b.add(blob_hash, ObjectType::Blob, b"blob-bytes".to_vec())
878 .unwrap();
879 b.add(tree_hash, ObjectType::Tree, b"serialized-tree".to_vec())
880 .unwrap();
881 b.add_id(
882 PackObjectId::ChangeId(state_cid),
883 ObjectType::State,
884 b"serialized-state".to_vec(),
885 )
886 .unwrap();
887
888 let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
889 assert_eq!(stats.object_count, 3);
890 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
891 assert_eq!(
892 reader
893 .get_object(&PackObjectId::Hash(blob_hash))
894 .unwrap()
895 .unwrap()
896 .1,
897 b"blob-bytes".to_vec()
898 );
899 assert_eq!(
900 reader
901 .get_object(&PackObjectId::Hash(tree_hash))
902 .unwrap()
903 .unwrap()
904 .1,
905 b"serialized-tree".to_vec()
906 );
907 assert_eq!(
908 reader
909 .get_object(&PackObjectId::ChangeId(state_cid))
910 .unwrap()
911 .unwrap()
912 .1,
913 b"serialized-state".to_vec()
914 );
915 }
916
917 #[test]
918 fn ten_thousand_objects_round_trip_correctly() {
919 let tmp = tempfile::TempDir::new().unwrap();
923 let (mut b, _, idx_path) = fresh_builder(&tmp);
924 let mut hashes = Vec::with_capacity(10_000);
925 for i in 0..10_000u32 {
926 let h = blake3::hash(&i.to_le_bytes());
929 let hash = ContentHash::from_bytes(*h.as_bytes());
930 hashes.push(hash);
931 b.add(hash, ObjectType::Blob, format!("payload-{i}").into_bytes())
932 .unwrap();
933 }
934 let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
935 assert_eq!(stats.object_count, 10_000);
936
937 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
938 assert_eq!(reader.list_ids().len(), 10_000);
939 for i in [0, 1, 99, 1234, 5_000, 9_999] {
941 let id = PackObjectId::Hash(hashes[i]);
942 let (_ty, data) = reader.get_object(&id).unwrap().unwrap();
943 assert_eq!(data, format!("payload-{i}").into_bytes());
944 }
945 }
946
947 #[test]
948 fn bucket_writers_are_lru_capped_below_fd_limit() {
949 let tmp = tempfile::TempDir::new().unwrap();
950 let (mut b, _bucket_dir, idx_path) = fresh_builder(&tmp);
951 let mut ids = Vec::new();
952
953 for i in 0..BUCKETS_PER_VARIANT {
954 let hash = deterministic_hash(i as u8);
955 ids.push(PackObjectId::Hash(hash));
956 b.add(hash, ObjectType::Blob, format!("hash-{i}").into_bytes())
957 .unwrap();
958 assert!(
959 b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
960 "open bucket writers should stay capped"
961 );
962 }
963
964 for i in 0..BUCKETS_PER_VARIANT {
965 let cid = deterministic_change_id(i as u8);
966 ids.push(PackObjectId::ChangeId(cid));
967 b.add_id(
968 PackObjectId::ChangeId(cid),
969 ObjectType::State,
970 format!("state-{i}").into_bytes(),
971 )
972 .unwrap();
973 assert!(
974 b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
975 "open bucket writers should stay capped"
976 );
977 }
978
979 let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
980 assert_eq!(stats.object_count, TOTAL_BUCKETS as u64);
981 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
982 for id in ids {
983 assert!(reader.has_object(&id), "missing id {id:?}");
984 }
985 }
986
987 #[test]
988 fn index_id_sort_order_matches_packbuilder_output() {
989 use crate::store::pack::PackBuilder;
997 let payloads: Vec<(PackObjectId, ObjectType, Vec<u8>)> = (0..200u32)
998 .map(|i| {
999 let h = blake3::hash(&i.to_le_bytes());
1000 (
1001 PackObjectId::Hash(ContentHash::from_bytes(*h.as_bytes())),
1002 if i % 3 == 0 {
1003 ObjectType::Tree
1004 } else {
1005 ObjectType::Blob
1006 },
1007 format!("body-{i}").into_bytes(),
1008 )
1009 })
1010 .collect();
1011
1012 let compression = CompressionConfig {
1015 max_delta_size: 0,
1016 ..CompressionConfig::default()
1017 };
1018 let mut classic = PackBuilder::new(compression);
1019 for (id, ty, data) in payloads.iter() {
1020 classic.add_id(*id, *ty, data.clone());
1021 }
1022 let (classic_pack, classic_index, _) = classic.build().unwrap();
1023 let classic_reader = PackReader::from_bytes(classic_pack, classic_index).unwrap();
1024
1025 let tmp = tempfile::TempDir::new().unwrap();
1026 let bucket_dir = tmp.path().join("buckets");
1027 let idx_path = tmp.path().join("test.idx");
1028 let cursor = Cursor::new(Vec::<u8>::new());
1029 let mut streaming =
1030 StreamingPackBuilder::new(cursor, idx_path.clone(), compression, bucket_dir).unwrap();
1031 for (id, ty, data) in payloads.iter() {
1032 streaming.add_id(*id, *ty, data.clone()).unwrap();
1033 }
1034 let (streaming_pack, streaming_index, _) = finalize_cursor(streaming, &idx_path);
1035 let streaming_reader = PackReader::from_bytes(streaming_pack, streaming_index).unwrap();
1036
1037 assert_eq!(
1040 streaming_reader.list_ids(),
1041 classic_reader.list_ids(),
1042 "streaming and classic indices should report the same id sequence"
1043 );
1044 for (id, _ty, want) in payloads.iter().take(10).chain(payloads.iter().skip(190)) {
1047 let (_, got) = streaming_reader.get_object(id).unwrap().unwrap();
1048 assert_eq!(&got, want);
1049 let (_, classic_got) = classic_reader.get_object(id).unwrap().unwrap();
1050 assert_eq!(got, classic_got);
1051 }
1052 }
1053
1054 #[test]
1055 fn corrupted_pack_fails_checksum_verification() {
1056 let tmp = tempfile::TempDir::new().unwrap();
1057 let (mut b, _, idx_path) = fresh_builder(&tmp);
1058 b.add(
1059 deterministic_hash(0x01),
1060 ObjectType::Blob,
1061 b"some bytes".to_vec(),
1062 )
1063 .unwrap();
1064 let (mut pack_data, index_data, _) = finalize_cursor(b, &idx_path);
1065 let body_byte = 18; pack_data[body_byte] ^= 0xff;
1068 let result = PackReader::from_bytes(pack_data, index_data);
1069 assert!(
1070 result.is_err(),
1071 "PackReader should reject pack with mutated body"
1072 );
1073 }
1074
1075 #[test]
1076 fn pack_count_in_header_matches_index_entry_count() {
1077 let tmp = tempfile::TempDir::new().unwrap();
1078 let (mut b, _, idx_path) = fresh_builder(&tmp);
1079 for i in 0..7u8 {
1080 b.add(
1081 deterministic_hash(i),
1082 ObjectType::Blob,
1083 format!("p{i}").into_bytes(),
1084 )
1085 .unwrap();
1086 }
1087 let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
1088 let count = u64::from_be_bytes(pack_data[8..16].try_into().unwrap());
1090 assert_eq!(count, 7);
1091 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1092 assert_eq!(reader.list_ids().len(), 7);
1093 }
1094
1095 #[test]
1096 fn declared_pack_count_is_written_before_finalize() {
1097 let tmp = tempfile::TempDir::new().unwrap();
1098 let bucket_dir = tmp.path().join("buckets");
1099 let idx_path = tmp.path().join("test.idx");
1100 let cursor = Cursor::new(Vec::<u8>::new());
1101 let mut b = StreamingPackBuilder::new_with_object_count(
1102 cursor,
1103 idx_path.clone(),
1104 CompressionConfig::default(),
1105 bucket_dir,
1106 2,
1107 )
1108 .unwrap();
1109
1110 b.flush_pack().unwrap();
1111 let initial = b.pack_writer.as_ref().unwrap().get_ref().get_ref().clone();
1112 assert_eq!(u64::from_be_bytes(initial[8..16].try_into().unwrap()), 2);
1113
1114 let hash = deterministic_hash(0x40);
1115 b.add(hash, ObjectType::Blob, b"known-count-entry".to_vec())
1116 .unwrap();
1117 b.flush_pack().unwrap();
1118 let after_add = b.pack_writer.as_ref().unwrap().get_ref().get_ref().clone();
1119 assert_eq!(u64::from_be_bytes(after_add[8..16].try_into().unwrap()), 2);
1120
1121 let second_hash = deterministic_hash(0x41);
1122 b.add(second_hash, ObjectType::Blob, b"second-entry".to_vec())
1123 .unwrap();
1124 let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
1125
1126 assert_eq!(stats.object_count, 2);
1127 assert_eq!(u64::from_be_bytes(pack_data[8..16].try_into().unwrap()), 2);
1128 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1129 assert!(reader.has_object(&PackObjectId::Hash(hash)));
1130 assert!(reader.has_object(&PackObjectId::Hash(second_hash)));
1131 }
1132
1133 #[test]
1134 fn declared_pack_count_mismatch_fails_finalize() {
1135 let tmp = tempfile::TempDir::new().unwrap();
1136 let bucket_dir = tmp.path().join("buckets");
1137 let idx_path = tmp.path().join("test.idx");
1138 let cursor = Cursor::new(Vec::<u8>::new());
1139 let mut b = StreamingPackBuilder::new_with_object_count(
1140 cursor,
1141 idx_path,
1142 CompressionConfig::default(),
1143 bucket_dir,
1144 2,
1145 )
1146 .unwrap();
1147
1148 b.add(
1149 deterministic_hash(0x50),
1150 ObjectType::Blob,
1151 b"only-entry".to_vec(),
1152 )
1153 .unwrap();
1154 let error = b.finalize().unwrap_err();
1155
1156 assert!(
1157 error
1158 .to_string()
1159 .contains("streaming pack declared 2 object(s) but added 1")
1160 );
1161 }
1162
1163 #[test]
1164 fn bucket_files_are_cleaned_on_successful_finalize() {
1165 let tmp = tempfile::TempDir::new().unwrap();
1166 let bucket_dir = tmp.path().join("buckets");
1167 let idx_path = tmp.path().join("test.idx");
1168 let cursor = Cursor::new(Vec::<u8>::new());
1169 let mut b = StreamingPackBuilder::new(
1170 cursor,
1171 idx_path.clone(),
1172 CompressionConfig::default(),
1173 bucket_dir.clone(),
1174 )
1175 .unwrap();
1176 for i in 0..50u8 {
1177 b.add(deterministic_hash(i), ObjectType::Blob, vec![i; 32])
1178 .unwrap();
1179 }
1180 assert!(bucket_dir.exists());
1182 let bucket_count = std::fs::read_dir(&bucket_dir).unwrap().count();
1183 assert!(bucket_count > 0, "bucket dir should hold some files");
1184 let _ = finalize_cursor(b, &idx_path);
1185 assert!(
1186 !bucket_dir.exists(),
1187 "bucket dir should be removed on finalize"
1188 );
1189 }
1190
1191 #[test]
1192 fn bucket_files_are_cleaned_on_drop_without_finalize() {
1193 let tmp = tempfile::TempDir::new().unwrap();
1194 let bucket_dir = tmp.path().join("buckets");
1195 let idx_path = tmp.path().join("test.idx");
1196 {
1197 let cursor = Cursor::new(Vec::<u8>::new());
1198 let mut b = StreamingPackBuilder::new(
1199 cursor,
1200 idx_path.clone(),
1201 CompressionConfig::default(),
1202 bucket_dir.clone(),
1203 )
1204 .unwrap();
1205 for i in 0..10u8 {
1206 b.add(deterministic_hash(i), ObjectType::Blob, vec![0; 32])
1207 .unwrap();
1208 }
1209 assert!(bucket_dir.exists());
1210 }
1212 assert!(
1213 !idx_path.exists(),
1214 "no index file should have been created without finalize"
1215 );
1216 assert!(
1217 !bucket_dir.exists(),
1218 "bucket dir should be removed on Drop when finalize never ran"
1219 );
1220 }
1221
1222 #[test]
1223 fn large_blob_streams_to_disk_without_double_buffering() {
1224 let tmp = tempfile::TempDir::new().unwrap();
1229 let bucket_dir = tmp.path().join("buckets");
1230 let pack_path = tmp.path().join("pack.dat");
1231 let idx_path = tmp.path().join("pack.idx");
1232 let file = std::fs::OpenOptions::new()
1233 .read(true)
1234 .write(true)
1235 .create(true)
1236 .truncate(true)
1237 .open(&pack_path)
1238 .unwrap();
1239 let mut b = StreamingPackBuilder::new(
1240 file,
1241 idx_path.clone(),
1242 CompressionConfig::default(),
1243 bucket_dir,
1244 )
1245 .unwrap();
1246 let payload: Vec<u8> = (0..4 * 1024 * 1024u32).map(|i| (i & 0xff) as u8).collect();
1247 let hash = deterministic_hash(0xff);
1248 b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1249 let (_, stats) = b.finalize().unwrap();
1250 let index_data = std::fs::read(&idx_path).unwrap();
1251 assert_eq!(stats.object_count, 1);
1252 let pack_bytes = std::fs::read(&pack_path).unwrap();
1253 let reader = PackReader::from_bytes(pack_bytes, index_data).unwrap();
1256 let (_ty, got) = reader
1257 .get_object(&PackObjectId::Hash(hash))
1258 .unwrap()
1259 .unwrap();
1260 assert_eq!(got, payload);
1261 }
1262
1263 #[test]
1264 fn bucket_distribution_for_random_hashes_is_roughly_uniform() {
1265 let tmp = tempfile::TempDir::new().unwrap();
1272 let bucket_dir = tmp.path().join("buckets");
1273 let idx_path = tmp.path().join("test.idx");
1274 let cursor = Cursor::new(Vec::<u8>::new());
1275 let mut b = StreamingPackBuilder::new(
1276 cursor,
1277 idx_path.clone(),
1278 CompressionConfig::default(),
1279 bucket_dir.clone(),
1280 )
1281 .unwrap();
1282 for i in 0..1024u32 {
1283 let h = blake3::hash(&i.to_le_bytes());
1284 let hash = ContentHash::from_bytes(*h.as_bytes());
1285 b.add(hash, ObjectType::Blob, b"x".to_vec()).unwrap();
1286 }
1287 b.pack_writer.as_mut().unwrap().flush().unwrap();
1289 let mut max_entries = 0usize;
1290 let entry_size = 33 + 8; for path in b.bucket_paths.iter() {
1292 if path.exists() {
1293 let size = std::fs::metadata(path).unwrap().len() as usize;
1294 let entries = size / entry_size;
1295 if entries > max_entries {
1296 max_entries = entries;
1297 }
1298 }
1299 }
1300 assert!(
1303 max_entries <= 16,
1304 "max bucket has {max_entries} entries; uniform expected ~4"
1305 );
1306 let _ = finalize_cursor(b, &idx_path);
1307 }
1308
1309 #[test]
1310 fn finalize_returns_correct_stats() {
1311 let tmp = tempfile::TempDir::new().unwrap();
1312 let (mut b, _, idx_path) = fresh_builder(&tmp);
1313 let payload = vec![0xabu8; 1024];
1314 for i in 0..5u8 {
1315 b.add(deterministic_hash(i), ObjectType::Blob, payload.clone())
1316 .unwrap();
1317 }
1318 let (_, _, stats) = finalize_cursor(b, &idx_path);
1319 assert_eq!(stats.object_count, 5);
1320 assert_eq!(stats.total_uncompressed, 5 * 1024);
1321 assert!(stats.total_compressed > 0);
1322 assert!(stats.compression_ratio > 0.0);
1323 assert_eq!(stats.delta_count, 0, "streaming builder never deltas");
1324 }
1325
1326 #[cfg(feature = "zstd")]
1327 #[test]
1328 fn streaming_compression_roundtrips_through_zstd_frame() {
1329 let tmp = tempfile::TempDir::new().unwrap();
1337 let (mut b, _, idx_path) = fresh_builder(&tmp);
1338 let payload = vec![0u8; 64 * 1024];
1341 let hash = deterministic_hash(0x77);
1342 b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1343 let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
1344 assert!(
1345 stats.total_compressed < stats.total_uncompressed,
1346 "expected compression ratio < 1.0, got {}/{}",
1347 stats.total_compressed,
1348 stats.total_uncompressed
1349 );
1350 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1351 let (_ty, got) = reader
1352 .get_object(&PackObjectId::Hash(hash))
1353 .unwrap()
1354 .unwrap();
1355 assert_eq!(got, payload);
1356 }
1357
1358 #[cfg(feature = "zstd")]
1359 #[test]
1360 fn padded_varint_decodes_to_original_value_for_canonical_decoder() {
1361 let cases: &[u64] = &[0, 1, 127, 128, 4096, 1_000_000, 1_000_000_000_000, u64::MAX];
1367 for &value in cases {
1368 let mut buf = [0u8; 10];
1369 super::encode_varint_padded_to_10(value, &mut buf);
1370 let (decoded, consumed) = super::super::varint::decode_varint(&buf)
1371 .expect("padded varint should always decode");
1372 assert_eq!(decoded, value, "varint roundtrip failed for {value}");
1373 assert_eq!(
1374 consumed, 10,
1375 "padded encoding should consume all 10 bytes for {value}"
1376 );
1377 }
1378 }
1379
1380 #[cfg(feature = "zstd")]
1381 #[test]
1382 fn streaming_path_does_not_buffer_compressed_payload_in_memory() {
1383 let tmp = tempfile::TempDir::new().unwrap();
1396 let bucket_dir = tmp.path().join("buckets");
1397 let pack_path = tmp.path().join("pack.dat");
1398 let idx_path = tmp.path().join("pack.idx");
1399 let file = std::fs::OpenOptions::new()
1400 .read(true)
1401 .write(true)
1402 .create(true)
1403 .truncate(true)
1404 .open(&pack_path)
1405 .unwrap();
1406 let mut b = StreamingPackBuilder::new(
1407 file,
1408 idx_path.clone(),
1409 CompressionConfig::default(),
1410 bucket_dir,
1411 )
1412 .unwrap();
1413 let payload = vec![0xa5u8; 8 * 1024 * 1024];
1414 let hash = deterministic_hash(0x66);
1415 b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1416 let mid_size = std::fs::metadata(&pack_path).unwrap().len();
1420 assert!(
1421 mid_size > 16 + 40,
1422 "pack file should hold real entry data after add; size={mid_size}"
1423 );
1424 let (_, _) = b.finalize().unwrap();
1425 let pack_bytes = std::fs::read(&pack_path).unwrap();
1426 let index_bytes = std::fs::read(&idx_path).unwrap();
1427 let reader = PackReader::from_bytes(pack_bytes, index_bytes).unwrap();
1428 let (_ty, got) = reader
1429 .get_object(&PackObjectId::Hash(hash))
1430 .unwrap()
1431 .unwrap();
1432 assert_eq!(got, payload);
1433 }
1434
1435 #[test]
1436 fn list_ids_returns_all_added_ids_sorted() {
1437 let tmp = tempfile::TempDir::new().unwrap();
1438 let (mut b, _, idx_path) = fresh_builder(&tmp);
1439 let mut added: Vec<PackObjectId> = Vec::new();
1440 for seed in [0x05u8, 0xa0, 0x12, 0x9f, 0x33] {
1442 let id = PackObjectId::Hash(deterministic_hash(seed));
1443 b.add_id(id, ObjectType::Blob, vec![seed; 4]).unwrap();
1444 added.push(id);
1445 }
1446 for seed in [0x80u8, 0x10, 0xff] {
1447 let id = PackObjectId::ChangeId(deterministic_change_id(seed));
1448 b.add_id(id, ObjectType::State, vec![seed; 4]).unwrap();
1449 added.push(id);
1450 }
1451 let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
1452 let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1453 let mut got = reader.list_ids();
1454 let mut sorted = got.clone();
1457 sorted.sort();
1458 assert_eq!(got, sorted, "list_ids must come back sorted");
1459 added.sort();
1461 got.sort();
1462 assert_eq!(got, added);
1463 }
1464}