1use std::{
61 fs::File,
62 io::{BufWriter, Read, Seek, SeekFrom, Write},
63 path::PathBuf,
64};
65
66use super::{ObjectType, PackObjectId, PackStats, pack_container_spec, write_container_header};
67
/// Width, in bytes, of the stored-size placeholder written before a zstd-compressed body.
///
/// `add_id` writes this many zero bytes, streams the compressed payload, then seeks back and
/// overwrites them with `encode_varint_padded_to_10(compressed_size)`. The patched value must
/// fill exactly the reserved space, so this has to match that encoder's fixed output width.
#[cfg(feature = "zstd")]
const CSIZE_PLACEHOLDER_LEN: usize = 10;
75use crate::{
76 object::ContentHash,
77 store::{Result, StoreError, compression::CompressionConfig},
78};
79
/// Bucket-group index for content-hash ids; they occupy buckets `0..BUCKETS_PER_VARIANT`.
const HASH_VARIANT: usize = 0;
/// Bucket-group index for change ids; their buckets follow every hash bucket.
const CHANGEID_VARIANT: usize = HASH_VARIANT + 1;

/// Buckets per id variant: one for each possible value of the id's first byte.
const BUCKETS_PER_VARIANT: usize = 256;
/// Total number of bucket spill files across both id variants.
const TOTAL_BUCKETS: usize = 2 * BUCKETS_PER_VARIANT;
94
/// Builds a pack by writing each object straight to the output writer as it is added.
///
/// Index entries (tagged id + header-relative offset) are spilled to per-first-byte bucket
/// files under `bucket_dir` while objects are added. `finalize` sorts each bucket and
/// concatenates them into the index file at `index_path`. The spill files are removed by
/// `finalize`, or by `Drop` when `finalize` never completed.
pub struct StreamingPackBuilder<W: Write + Read + Seek> {
    /// Buffered pack output. `Some` for the builder's whole life; taken by `finalize`.
    pack_writer: Option<BufWriter<W>>,
    /// Writer position where the container header starts; entry offsets are relative to it.
    header_offset: u64,
    /// Objects added so far; patched into the container header by `finalize`.
    object_count: u64,
    /// Sum of uncompressed payload sizes.
    total_uncompressed: u64,
    /// Sum of stored payload sizes (compressed size where zstd was used, raw size otherwise).
    total_compressed: u64,
    // Only read on the zstd path; silence the dead-code lint when that feature is off.
    #[cfg_attr(not(feature = "zstd"), allow(dead_code))]
    compression: CompressionConfig,
    /// Directory holding the bucket spill files.
    bucket_dir: PathBuf,
    /// One lazily opened writer per bucket (`TOTAL_BUCKETS` slots).
    bucket_writers: Vec<Option<BufWriter<File>>>,
    /// Path of each bucket file, indexed like `bucket_writers`.
    bucket_paths: Vec<PathBuf>,
    /// Destination of the finished index.
    index_path: PathBuf,
    /// Set once `finalize` has cleaned up; tells `Drop` there is nothing left to remove.
    finalized: bool,
}
133
134impl<W: Write + Read + Seek> StreamingPackBuilder<W> {
135 pub fn new(
151 mut pack_writer: W,
152 index_path: PathBuf,
153 compression: CompressionConfig,
154 bucket_dir: PathBuf,
155 ) -> Result<Self> {
156 std::fs::create_dir_all(&bucket_dir).map_err(StoreError::from)?;
157 let header_offset = pack_writer.stream_position().map_err(StoreError::from)?;
158
159 let mut header_bytes = Vec::with_capacity(16);
162 write_container_header(&mut header_bytes, pack_container_spec(), 0);
163 pack_writer
164 .write_all(&header_bytes)
165 .map_err(StoreError::from)?;
166
167 let bucket_paths: Vec<PathBuf> = (0..TOTAL_BUCKETS)
168 .map(|i| {
169 let variant = if i < BUCKETS_PER_VARIANT { 'h' } else { 'c' };
170 let prefix = i % BUCKETS_PER_VARIANT;
171 bucket_dir.join(format!("bucket-{variant}-{prefix:02x}"))
172 })
173 .collect();
174
175 Ok(Self {
176 pack_writer: Some(BufWriter::new(pack_writer)),
177 header_offset,
178 object_count: 0,
179 total_uncompressed: 0,
180 total_compressed: 0,
181 compression,
182 bucket_dir,
183 bucket_writers: (0..TOTAL_BUCKETS).map(|_| None).collect(),
184 bucket_paths,
185 index_path,
186 finalized: false,
187 })
188 }
189
190 pub fn add(&mut self, hash: ContentHash, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
192 self.add_id(PackObjectId::Hash(hash), obj_type, data)
193 }
194
195 pub fn add_id(&mut self, id: PackObjectId, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
218 let pw = self
222 .pack_writer
223 .as_mut()
224 .expect("add_id called after finalize");
225 pw.flush().map_err(StoreError::from)?;
226 let entry_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
227 let offset = entry_start
228 .checked_sub(self.header_offset)
229 .expect("header_offset should never be past current position");
230
231 self.total_uncompressed += data.len() as u64;
232
233 let mut header_buf = Vec::with_capacity(40);
236 id.encode_tagged(&mut header_buf);
237 super::varint::encode_type_and_size(obj_type, data.len() as u64, &mut header_buf);
238 pw.write_all(&header_buf).map_err(StoreError::from)?;
239 #[cfg(feature = "zstd")]
243 let csize_pos = entry_start + header_buf.len() as u64;
244
245 let want_compress: bool;
258 #[cfg(feature = "zstd")]
259 {
260 want_compress = self.compression.enabled && data.len() >= self.compression.min_size;
261 }
262 #[cfg(not(feature = "zstd"))]
263 {
264 want_compress = false;
265 }
266 if !want_compress {
267 let mut csize_buf = Vec::with_capacity(10);
270 super::varint::encode_varint(data.len() as u64, &mut csize_buf);
271 pw.write_all(&csize_buf).map_err(StoreError::from)?;
272 pw.write_all(&data).map_err(StoreError::from)?;
273 self.total_compressed += data.len() as u64;
274 } else {
275 #[cfg(feature = "zstd")]
276 {
277 pw.write_all(&[0u8; CSIZE_PLACEHOLDER_LEN])
280 .map_err(StoreError::from)?;
281 pw.flush().map_err(StoreError::from)?;
282 let body_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
283 {
284 let mut enc =
285 zstd::stream::write::Encoder::new(&mut *pw, self.compression.level)
286 .map_err(StoreError::from)?;
287 enc.set_pledged_src_size(Some(data.len() as u64))
292 .map_err(StoreError::from)?;
293 enc.write_all(&data).map_err(StoreError::from)?;
294 enc.finish().map_err(StoreError::from)?;
295 }
296 pw.flush().map_err(StoreError::from)?;
297 let body_end = pw.get_mut().stream_position().map_err(StoreError::from)?;
298 let compressed_size = body_end - body_start;
299 self.total_compressed += compressed_size;
300
301 let mut csize_bytes = [0u8; CSIZE_PLACEHOLDER_LEN];
306 encode_varint_padded_to_10(compressed_size, &mut csize_bytes);
307 let inner = pw.get_mut();
308 inner
309 .seek(SeekFrom::Start(csize_pos))
310 .map_err(StoreError::from)?;
311 inner.write_all(&csize_bytes).map_err(StoreError::from)?;
312 inner
313 .seek(SeekFrom::Start(body_end))
314 .map_err(StoreError::from)?;
315 }
316 #[cfg(not(feature = "zstd"))]
317 {
318 unreachable!("compression branch reached without `zstd` feature");
321 }
322 }
323
324 let bucket_idx = bucket_index_for(&id);
328 let bucket = self.get_or_open_bucket(bucket_idx)?;
329 let mut idx_entry = Vec::with_capacity(33 + 8);
330 id.encode_tagged(&mut idx_entry);
331 idx_entry.extend_from_slice(&offset.to_be_bytes());
332 bucket.write_all(&idx_entry).map_err(StoreError::from)?;
333
334 self.object_count += 1;
335 Ok(())
336 }
337
338 fn get_or_open_bucket(&mut self, idx: usize) -> Result<&mut BufWriter<File>> {
339 if self.bucket_writers[idx].is_none() {
340 let path = &self.bucket_paths[idx];
341 let f = File::create(path).map_err(StoreError::from)?;
342 self.bucket_writers[idx] = Some(BufWriter::new(f));
343 }
344 Ok(self.bucket_writers[idx]
345 .as_mut()
346 .expect("just inserted above"))
347 }
348
349 pub fn finalize(mut self) -> Result<(W, PackStats)> {
360 for w in self.bucket_writers.iter_mut().flatten() {
363 w.flush().map_err(StoreError::from)?;
364 }
365 for slot in self.bucket_writers.iter_mut() {
368 *slot = None;
369 }
370
371 let bw = self
375 .pack_writer
376 .take()
377 .expect("finalize called twice — pack_writer already consumed");
378 let mut writer = bw
379 .into_inner()
380 .map_err(|e| StoreError::from(std::io::Error::other(e.to_string())))?;
381 writer
382 .seek(SeekFrom::Start(self.header_offset))
383 .map_err(StoreError::from)?;
384 let mut header_bytes = Vec::with_capacity(16);
385 write_container_header(&mut header_bytes, pack_container_spec(), self.object_count);
386 writer.write_all(&header_bytes).map_err(StoreError::from)?;
387
388 writer
393 .seek(SeekFrom::Start(self.header_offset))
394 .map_err(StoreError::from)?;
395 let mut hasher = blake3::Hasher::new();
396 let mut buf = vec![0u8; 64 * 1024];
397 loop {
398 let n = writer.read(&mut buf).map_err(StoreError::from)?;
399 if n == 0 {
400 break;
401 }
402 hasher.update(&buf[..n]);
403 }
404 let checksum = hasher.finalize();
405
406 writer.seek(SeekFrom::End(0)).map_err(StoreError::from)?;
408 writer
409 .write_all(checksum.as_bytes())
410 .map_err(StoreError::from)?;
411 writer.flush().map_err(StoreError::from)?;
412
413 let idx_file = File::create(&self.index_path).map_err(StoreError::from)?;
426 let mut idx_writer = BufWriter::new(idx_file);
427 write_index_header(&mut idx_writer, self.object_count)?;
428 let mut entries_written: u64 = 0;
429 for path in self.bucket_paths.iter() {
430 if !path.exists() {
431 continue;
432 }
433 let bucket_bytes = std::fs::read(path).map_err(StoreError::from)?;
434 let mut entries = decode_bucket_file(&bucket_bytes)?;
435 entries.sort_by_key(|(id, _)| *id);
440 for (id, offset) in entries {
441 write_index_entry(&mut idx_writer, id, offset)?;
442 entries_written += 1;
443 }
444 }
445 idx_writer.flush().map_err(StoreError::from)?;
446 debug_assert_eq!(
447 entries_written, self.object_count,
448 "streaming index entry count drifted from add() count"
449 );
450
451 for path in self.bucket_paths.iter() {
456 let _ = std::fs::remove_file(path);
457 }
458 let _ = std::fs::remove_dir(&self.bucket_dir);
459 self.finalized = true;
460
461 let stats = PackStats {
462 object_count: self.object_count,
463 total_uncompressed: self.total_uncompressed,
464 total_compressed: self.total_compressed,
465 delta_count: 0,
466 compression_ratio: if self.total_uncompressed == 0 {
467 0.0
468 } else {
469 self.total_compressed as f64 / self.total_uncompressed as f64
470 },
471 };
472
473 Ok((writer, stats))
474 }
475}
476
477fn write_index_header<W: Write>(out: &mut W, count: u64) -> Result<()> {
482 out.write_all(super::pack_index::INDEX_MAGIC)
483 .map_err(StoreError::from)?;
484 out.write_all(&super::pack_index::INDEX_VERSION.to_be_bytes())
485 .map_err(StoreError::from)?;
486 out.write_all(&count.to_be_bytes())
487 .map_err(StoreError::from)?;
488 Ok(())
489}
490
491fn write_index_entry<W: Write>(out: &mut W, id: PackObjectId, offset: u64) -> Result<()> {
495 let mut buf = Vec::with_capacity(33 + 8);
496 id.encode_tagged(&mut buf);
497 buf.extend_from_slice(&offset.to_be_bytes());
498 out.write_all(&buf).map_err(StoreError::from)
499}
500
501#[cfg(feature = "zstd")]
/// Encodes `value` as a 7-bits-per-byte varint stretched to exactly ten bytes.
///
/// Bytes 0..9 each carry seven bits of `value` (least-significant group first) with the
/// continuation bit `0x80` forced on, so even small values occupy all ten bytes. Byte 9
/// carries the remaining top bit with the continuation bit clear. The fixed width is what
/// lets `add_id` reserve a placeholder and patch the compressed size in place afterwards.
fn encode_varint_padded_to_10(value: u64, out: &mut [u8; 10]) {
    let (groups, tail) = out.split_at_mut(9);
    for (i, byte) in groups.iter_mut().enumerate() {
        let seven_bits = (value >> (7 * i)) & 0x7F;
        *byte = 0x80 | seven_bits as u8;
    }
    tail[0] = ((value >> 63) & 0x7F) as u8;
}
522
523impl<W: Write + Read + Seek> Drop for StreamingPackBuilder<W> {
524 fn drop(&mut self) {
525 if self.finalized {
526 return;
527 }
528 for path in self.bucket_paths.iter() {
531 let _ = std::fs::remove_file(path);
532 }
533 let _ = std::fs::remove_dir(&self.bucket_dir);
534 }
535}
536
537fn bucket_index_for(id: &PackObjectId) -> usize {
541 match id {
542 PackObjectId::Hash(h) => HASH_VARIANT * BUCKETS_PER_VARIANT + h.as_bytes()[0] as usize,
543 PackObjectId::ChangeId(c) => {
544 CHANGEID_VARIANT * BUCKETS_PER_VARIANT + c.as_bytes()[0] as usize
545 }
546 }
547}
548
549fn decode_bucket_file(bytes: &[u8]) -> Result<Vec<(PackObjectId, u64)>> {
554 let mut out = Vec::new();
555 let mut pos = 0;
556 while pos < bytes.len() {
557 let (id, id_len) = PackObjectId::decode_tagged(&bytes[pos..])?;
558 pos += id_len;
559 if pos + 8 > bytes.len() {
560 return Err(StoreError::InvalidObject(
561 "streaming bucket entry truncated at offset".to_string(),
562 ));
563 }
564 let offset = u64::from_be_bytes(bytes[pos..pos + 8].try_into().map_err(|_| {
565 StoreError::InvalidObject("streaming bucket bad offset slice".to_string())
566 })?);
567 pos += 8;
568 out.push((id, offset));
569 }
570 Ok(out)
571}
572
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;
    use crate::{
        object::ChangeId,
        store::pack::{PackReader, PackStats},
    };

    /// Reproducible 32-byte content hash whose first byte (and therefore bucket) is `seed`.
    fn deterministic_hash(seed: u8) -> ContentHash {
        let mut bytes = [0u8; 32];
        bytes[0] = seed;
        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
            *b = seed.wrapping_mul(31).wrapping_add(i as u8);
        }
        ContentHash::from_bytes(bytes)
    }

    /// Reproducible 16-byte change id whose first byte is `seed`.
    fn deterministic_change_id(seed: u8) -> ChangeId {
        let mut bytes = [0u8; 16];
        bytes[0] = seed;
        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
            *b = seed.wrapping_add(i as u8 * 7);
        }
        ChangeId::from_bytes(bytes)
    }

    /// In-memory builder with default compression, plus its bucket dir and index path in `tmp`.
    fn fresh_builder(
        tmp: &tempfile::TempDir,
    ) -> (StreamingPackBuilder<Cursor<Vec<u8>>>, PathBuf, PathBuf) {
        let bucket_dir = tmp.path().join("buckets");
        let index_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let b = StreamingPackBuilder::new(
            cursor,
            index_path.clone(),
            CompressionConfig::default(),
            bucket_dir.clone(),
        )
        .unwrap();
        (b, bucket_dir, index_path)
    }

    /// Finalizes `b` and returns `(pack bytes, index bytes, stats)`.
    fn finalize_cursor(
        b: StreamingPackBuilder<Cursor<Vec<u8>>>,
        index_path: &std::path::Path,
    ) -> (Vec<u8>, Vec<u8>, PackStats) {
        let (cursor, stats) = b.finalize().unwrap();
        let index_bytes = std::fs::read(index_path).unwrap();
        (cursor.into_inner(), index_bytes, stats)
    }

    #[test]
    fn empty_pack_finalizes_to_valid_zero_count_pack() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (b, bucket_dir, idx_path) = fresh_builder(&tmp);
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);

        assert_eq!(stats.object_count, 0);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert!(reader.list_ids().is_empty());
        assert!(
            !bucket_dir.exists(),
            "bucket dir should be cleaned on successful finalize"
        );
    }

    #[test]
    fn single_blob_with_hash_id_round_trips() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let hash = deterministic_hash(0x42);
        let payload = b"hello, streaming pack".to_vec();
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);

        assert_eq!(stats.object_count, 1);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let id = PackObjectId::Hash(hash);
        assert!(reader.has_object(&id));
        let (got_type, got_data) = reader.get_object(&id).unwrap().unwrap();
        assert_eq!(got_type, ObjectType::Blob);
        assert_eq!(got_data, payload);
    }

    #[test]
    fn single_state_with_change_id_round_trips() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let cid = deterministic_change_id(0xa5);
        let payload = b"serialized-state-bytes".to_vec();
        b.add_id(
            PackObjectId::ChangeId(cid),
            ObjectType::State,
            payload.clone(),
        )
        .unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);

        assert_eq!(stats.object_count, 1);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let id = PackObjectId::ChangeId(cid);
        let (ty, data) = reader.get_object(&id).unwrap().unwrap();
        assert_eq!(ty, ObjectType::State);
        assert_eq!(data, payload);
    }

    #[test]
    fn mixed_hash_and_changeid_ids_all_retrievable() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let blob_hash = deterministic_hash(0x10);
        let tree_hash = deterministic_hash(0x20);
        let state_cid = deterministic_change_id(0x80);

        b.add(blob_hash, ObjectType::Blob, b"blob-bytes".to_vec())
            .unwrap();
        b.add(tree_hash, ObjectType::Tree, b"serialized-tree".to_vec())
            .unwrap();
        b.add_id(
            PackObjectId::ChangeId(state_cid),
            ObjectType::State,
            b"serialized-state".to_vec(),
        )
        .unwrap();

        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 3);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(
            reader
                .get_object(&PackObjectId::Hash(blob_hash))
                .unwrap()
                .unwrap()
                .1,
            b"blob-bytes".to_vec()
        );
        assert_eq!(
            reader
                .get_object(&PackObjectId::Hash(tree_hash))
                .unwrap()
                .unwrap()
                .1,
            b"serialized-tree".to_vec()
        );
        assert_eq!(
            reader
                .get_object(&PackObjectId::ChangeId(state_cid))
                .unwrap()
                .unwrap()
                .1,
            b"serialized-state".to_vec()
        );
    }

    #[test]
    fn ten_thousand_objects_round_trip_correctly() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let mut hashes = Vec::with_capacity(10_000);
        for i in 0..10_000u32 {
            let h = blake3::hash(&i.to_le_bytes());
            let hash = ContentHash::from_bytes(*h.as_bytes());
            hashes.push(hash);
            b.add(hash, ObjectType::Blob, format!("payload-{i}").into_bytes())
                .unwrap();
        }
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 10_000);

        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(reader.list_ids().len(), 10_000);
        for i in [0, 1, 99, 1234, 5_000, 9_999] {
            let id = PackObjectId::Hash(hashes[i]);
            let (_ty, data) = reader.get_object(&id).unwrap().unwrap();
            assert_eq!(data, format!("payload-{i}").into_bytes());
        }
    }

    #[test]
    fn index_id_sort_order_matches_packbuilder_output() {
        use crate::store::pack::PackBuilder;
        let payloads: Vec<(PackObjectId, ObjectType, Vec<u8>)> = (0..200u32)
            .map(|i| {
                let h = blake3::hash(&i.to_le_bytes());
                (
                    PackObjectId::Hash(ContentHash::from_bytes(*h.as_bytes())),
                    if i % 3 == 0 {
                        ObjectType::Tree
                    } else {
                        ObjectType::Blob
                    },
                    format!("body-{i}").into_bytes(),
                )
            })
            .collect();

        // Deltas disabled so both builders store every object whole.
        let compression = CompressionConfig {
            max_delta_size: 0,
            ..CompressionConfig::default()
        };
        let mut classic = PackBuilder::new(compression);
        for (id, ty, data) in payloads.iter() {
            classic.add_id(*id, *ty, data.clone());
        }
        let (classic_pack, classic_index, _) = classic.build().unwrap();
        let classic_reader = PackReader::from_bytes(classic_pack, classic_index).unwrap();

        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut streaming =
            StreamingPackBuilder::new(cursor, idx_path.clone(), compression, bucket_dir).unwrap();
        for (id, ty, data) in payloads.iter() {
            streaming.add_id(*id, *ty, data.clone()).unwrap();
        }
        let (streaming_pack, streaming_index, _) = finalize_cursor(streaming, &idx_path);
        let streaming_reader = PackReader::from_bytes(streaming_pack, streaming_index).unwrap();

        assert_eq!(
            streaming_reader.list_ids(),
            classic_reader.list_ids(),
            "streaming and classic indices should report the same id sequence"
        );
        for (id, _ty, want) in payloads.iter().take(10).chain(payloads.iter().skip(190)) {
            let (_, got) = streaming_reader.get_object(id).unwrap().unwrap();
            assert_eq!(&got, want);
            let (_, classic_got) = classic_reader.get_object(id).unwrap().unwrap();
            assert_eq!(got, classic_got);
        }
    }

    #[test]
    fn corrupted_pack_fails_checksum_verification() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        b.add(
            deterministic_hash(0x01),
            ObjectType::Blob,
            b"some bytes".to_vec(),
        )
        .unwrap();
        let (mut pack_data, index_data, _) = finalize_cursor(b, &idx_path);
        // Byte 18 lies past the container header (presumably 16 bytes, per the count read
        // from bytes 8..16 below), inside the first entry.
        let body_byte = 18;
        pack_data[body_byte] ^= 0xff;
        let result = PackReader::from_bytes(pack_data, index_data);
        assert!(
            result.is_err(),
            "PackReader should reject pack with mutated body"
        );
    }

    #[test]
    fn pack_count_in_header_matches_index_entry_count() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        for i in 0..7u8 {
            b.add(
                deterministic_hash(i),
                ObjectType::Blob,
                format!("p{i}").into_bytes(),
            )
            .unwrap();
        }
        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
        let count = u64::from_be_bytes(pack_data[8..16].try_into().unwrap());
        assert_eq!(count, 7);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(reader.list_ids().len(), 7);
    }

    #[test]
    fn bucket_files_are_cleaned_on_successful_finalize() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut b = StreamingPackBuilder::new(
            cursor,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir.clone(),
        )
        .unwrap();
        for i in 0..50u8 {
            b.add(deterministic_hash(i), ObjectType::Blob, vec![i; 32])
                .unwrap();
        }
        assert!(bucket_dir.exists());
        let bucket_count = std::fs::read_dir(&bucket_dir).unwrap().count();
        assert!(bucket_count > 0, "bucket dir should hold some files");
        let _ = finalize_cursor(b, &idx_path);
        assert!(
            !bucket_dir.exists(),
            "bucket dir should be removed on finalize"
        );
    }

    #[test]
    fn bucket_files_are_cleaned_on_drop_without_finalize() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        {
            let cursor = Cursor::new(Vec::<u8>::new());
            let mut b = StreamingPackBuilder::new(
                cursor,
                idx_path.clone(),
                CompressionConfig::default(),
                bucket_dir.clone(),
            )
            .unwrap();
            for i in 0..10u8 {
                b.add(deterministic_hash(i), ObjectType::Blob, vec![0; 32])
                    .unwrap();
            }
            assert!(bucket_dir.exists());
        }
        assert!(
            !idx_path.exists(),
            "no index file should have been created without finalize"
        );
        assert!(
            !bucket_dir.exists(),
            "bucket dir should be removed on Drop when finalize never ran"
        );
    }

    #[test]
    fn large_blob_streams_to_disk_without_double_buffering() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let pack_path = tmp.path().join("pack.dat");
        let idx_path = tmp.path().join("pack.idx");
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&pack_path)
            .unwrap();
        let mut b = StreamingPackBuilder::new(
            file,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir,
        )
        .unwrap();
        let payload: Vec<u8> = (0..4 * 1024 * 1024u32).map(|i| (i & 0xff) as u8).collect();
        let hash = deterministic_hash(0xff);
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let (_, stats) = b.finalize().unwrap();
        let index_data = std::fs::read(&idx_path).unwrap();
        assert_eq!(stats.object_count, 1);
        let pack_bytes = std::fs::read(&pack_path).unwrap();
        let reader = PackReader::from_bytes(pack_bytes, index_data).unwrap();
        let (_ty, got) = reader
            .get_object(&PackObjectId::Hash(hash))
            .unwrap()
            .unwrap();
        assert_eq!(got, payload);
    }

    #[test]
    fn bucket_distribution_for_random_hashes_is_roughly_uniform() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut b = StreamingPackBuilder::new(
            cursor,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir.clone(),
        )
        .unwrap();
        for i in 0..1024u32 {
            let h = blake3::hash(&i.to_le_bytes());
            let hash = ContentHash::from_bytes(*h.as_bytes());
            b.add(hash, ObjectType::Blob, b"x".to_vec()).unwrap();
        }
        // Bucket records sit in per-bucket BufWriters until flushed. Without this, every
        // bucket file is still empty on disk and the assertions below would pass vacuously.
        for w in b.bucket_writers.iter_mut().flatten() {
            w.flush().unwrap();
        }

        // Each hash record is a 33-byte tagged id plus an 8-byte offset.
        let entry_size = 33 + 8;
        let mut max_entries = 0usize;
        let mut total_entries = 0usize;
        for path in b.bucket_paths.iter() {
            if path.exists() {
                let size = std::fs::metadata(path).unwrap().len() as usize;
                let entries = size / entry_size;
                total_entries += entries;
                max_entries = max_entries.max(entries);
            }
        }
        assert_eq!(
            total_entries, 1024,
            "every added hash should be spilled to exactly one bucket"
        );
        assert!(
            max_entries <= 16,
            "max bucket has {max_entries} entries; uniform expected ~4"
        );
        let _ = finalize_cursor(b, &idx_path);
    }

    #[test]
    fn finalize_returns_correct_stats() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let payload = vec![0xabu8; 1024];
        for i in 0..5u8 {
            b.add(deterministic_hash(i), ObjectType::Blob, payload.clone())
                .unwrap();
        }
        let (_, _, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 5);
        assert_eq!(stats.total_uncompressed, 5 * 1024);
        assert!(stats.total_compressed > 0);
        assert!(stats.compression_ratio > 0.0);
        assert_eq!(stats.delta_count, 0, "streaming builder never deltas");
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn streaming_compression_roundtrips_through_zstd_frame() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let payload = vec![0u8; 64 * 1024];
        let hash = deterministic_hash(0x77);
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert!(
            stats.total_compressed < stats.total_uncompressed,
            "expected compression ratio < 1.0, got {}/{}",
            stats.total_compressed,
            stats.total_uncompressed
        );
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let (_ty, got) = reader
            .get_object(&PackObjectId::Hash(hash))
            .unwrap()
            .unwrap();
        assert_eq!(got, payload);
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn padded_varint_decodes_to_original_value_for_canonical_decoder() {
        let cases: &[u64] = &[0, 1, 127, 128, 4096, 1_000_000, 1_000_000_000_000, u64::MAX];
        for &value in cases {
            let mut buf = [0u8; 10];
            super::encode_varint_padded_to_10(value, &mut buf);
            let (decoded, consumed) = super::super::varint::decode_varint(&buf)
                .expect("padded varint should always decode");
            assert_eq!(decoded, value, "varint roundtrip failed for {value}");
            assert_eq!(
                consumed, 10,
                "padded encoding should consume all 10 bytes for {value}"
            );
        }
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn streaming_path_does_not_buffer_compressed_payload_in_memory() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let pack_path = tmp.path().join("pack.dat");
        let idx_path = tmp.path().join("pack.idx");
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&pack_path)
            .unwrap();
        let mut b = StreamingPackBuilder::new(
            file,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir,
        )
        .unwrap();
        let payload = vec![0xa5u8; 8 * 1024 * 1024];
        let hash = deterministic_hash(0x66);
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        // The entry must already be on disk before finalize runs.
        let mid_size = std::fs::metadata(&pack_path).unwrap().len();
        assert!(
            mid_size > 16 + 40,
            "pack file should hold real entry data after add; size={mid_size}"
        );
        let (_, _) = b.finalize().unwrap();
        let pack_bytes = std::fs::read(&pack_path).unwrap();
        let index_bytes = std::fs::read(&idx_path).unwrap();
        let reader = PackReader::from_bytes(pack_bytes, index_bytes).unwrap();
        let (_ty, got) = reader
            .get_object(&PackObjectId::Hash(hash))
            .unwrap()
            .unwrap();
        assert_eq!(got, payload);
    }

    #[test]
    fn list_ids_returns_all_added_ids_sorted() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let mut added: Vec<PackObjectId> = Vec::new();
        for seed in [0x05u8, 0xa0, 0x12, 0x9f, 0x33] {
            let id = PackObjectId::Hash(deterministic_hash(seed));
            b.add_id(id, ObjectType::Blob, vec![seed; 4]).unwrap();
            added.push(id);
        }
        for seed in [0x80u8, 0x10, 0xff] {
            let id = PackObjectId::ChangeId(deterministic_change_id(seed));
            b.add_id(id, ObjectType::State, vec![seed; 4]).unwrap();
            added.push(id);
        }
        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let mut got = reader.list_ids();
        let mut sorted = got.clone();
        sorted.sort();
        assert_eq!(got, sorted, "list_ids must come back sorted");
        added.sort();
        got.sort();
        assert_eq!(got, added);
    }
}