1use std::{
62 fs::{File, OpenOptions},
63 io::{BufWriter, Read, Seek, SeekFrom, Write},
64 path::PathBuf,
65};
66
67use super::{ObjectType, PackObjectId, PackStats, pack_container_spec, write_container_header};
68
69#[cfg(feature = "zstd")]
/// Width in bytes of the compressed-size slot reserved in front of a zstd body.
///
/// The slot is back-patched with a padded varint once the compressed length is
/// known; ten 7-bit groups are enough to hold any `u64`.
const CSIZE_PLACEHOLDER_LEN: usize = (u64::BITS as usize + 6) / 7;
76use crate::{
77 object::ContentHash,
78 store::{Result, StoreError, compression::CompressionConfig},
79};
80
/// Bucket files per id variant: one for every possible leading byte of an id.
const BUCKETS_PER_VARIANT: usize = u8::MAX as usize + 1;
/// Bucket files in total: the hash-id buckets followed by the change-id buckets.
const TOTAL_BUCKETS: usize = 2 * BUCKETS_PER_VARIANT;
/// Upper bound on bucket files held open at once; when it is reached the least
/// recently used bucket is flushed and closed.
const MAX_OPEN_BUCKET_WRITERS: usize = 32;

/// Variant slot for `PackObjectId::Hash` ids (buckets `0..BUCKETS_PER_VARIANT`).
const HASH_VARIANT: usize = 0;
/// Variant slot for `PackObjectId::ChangeId` ids (the second run of buckets).
const CHANGEID_VARIANT: usize = HASH_VARIANT + 1;
99
100pub struct StreamingPackBuilder<W: Write + Read + Seek> {
103 pack_writer: Option<BufWriter<W>>,
109 header_offset: u64,
113 object_count: u64,
114 total_uncompressed: u64,
115 total_compressed: u64,
116 #[cfg_attr(not(feature = "zstd"), allow(dead_code))]
121 compression: CompressionConfig,
122 bucket_dir: PathBuf,
125 bucket_writers: Vec<Option<BucketWriter>>,
129 open_bucket_writers: usize,
130 bucket_access_tick: u64,
131 bucket_paths: Vec<PathBuf>,
132 index_path: PathBuf,
136 finalized: bool,
139}
140
/// An open bucket file together with its LRU bookkeeping.
struct BucketWriter {
    /// Value of the builder's access tick the last time this bucket was used.
    last_used: u64,
    /// Buffered append handle to the bucket file.
    writer: BufWriter<File>,
}
145
146impl<W: Write + Read + Seek> StreamingPackBuilder<W> {
147 pub fn new(
163 mut pack_writer: W,
164 index_path: PathBuf,
165 compression: CompressionConfig,
166 bucket_dir: PathBuf,
167 ) -> Result<Self> {
168 std::fs::create_dir_all(&bucket_dir).map_err(StoreError::from)?;
169 let header_offset = pack_writer.stream_position().map_err(StoreError::from)?;
170
171 let mut header_bytes = Vec::with_capacity(16);
174 write_container_header(&mut header_bytes, pack_container_spec(), 0);
175 pack_writer
176 .write_all(&header_bytes)
177 .map_err(StoreError::from)?;
178
179 let bucket_paths: Vec<PathBuf> = (0..TOTAL_BUCKETS)
180 .map(|i| {
181 let variant = if i < BUCKETS_PER_VARIANT { 'h' } else { 'c' };
182 let prefix = i % BUCKETS_PER_VARIANT;
183 bucket_dir.join(format!("bucket-{variant}-{prefix:02x}"))
184 })
185 .collect();
186 for path in &bucket_paths {
187 let _ = std::fs::remove_file(path);
188 }
189
190 Ok(Self {
191 pack_writer: Some(BufWriter::new(pack_writer)),
192 header_offset,
193 object_count: 0,
194 total_uncompressed: 0,
195 total_compressed: 0,
196 compression,
197 bucket_dir,
198 bucket_writers: (0..TOTAL_BUCKETS).map(|_| None).collect(),
199 open_bucket_writers: 0,
200 bucket_access_tick: 0,
201 bucket_paths,
202 index_path,
203 finalized: false,
204 })
205 }
206
207 pub fn add(&mut self, hash: ContentHash, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
209 self.add_id(PackObjectId::Hash(hash), obj_type, data)
210 }
211
212 pub fn add_id(&mut self, id: PackObjectId, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
235 let pw = self
239 .pack_writer
240 .as_mut()
241 .expect("add_id called after finalize");
242 pw.flush().map_err(StoreError::from)?;
243 let entry_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
244 let offset = entry_start
245 .checked_sub(self.header_offset)
246 .expect("header_offset should never be past current position");
247
248 self.total_uncompressed += data.len() as u64;
249
250 let mut header_buf = Vec::with_capacity(40);
253 id.encode_tagged(&mut header_buf);
254 super::varint::encode_type_and_size(obj_type, data.len() as u64, &mut header_buf);
255 pw.write_all(&header_buf).map_err(StoreError::from)?;
256 #[cfg(feature = "zstd")]
260 let csize_pos = entry_start + header_buf.len() as u64;
261
262 let want_compress: bool;
275 #[cfg(feature = "zstd")]
276 {
277 want_compress = self.compression.enabled && data.len() >= self.compression.min_size;
278 }
279 #[cfg(not(feature = "zstd"))]
280 {
281 want_compress = false;
282 }
283 if !want_compress {
284 let mut csize_buf = Vec::with_capacity(10);
287 super::varint::encode_varint(data.len() as u64, &mut csize_buf);
288 pw.write_all(&csize_buf).map_err(StoreError::from)?;
289 pw.write_all(&data).map_err(StoreError::from)?;
290 self.total_compressed += data.len() as u64;
291 } else {
292 #[cfg(feature = "zstd")]
293 {
294 pw.write_all(&[0u8; CSIZE_PLACEHOLDER_LEN])
297 .map_err(StoreError::from)?;
298 pw.flush().map_err(StoreError::from)?;
299 let body_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
300 {
301 let mut enc =
302 zstd::stream::write::Encoder::new(&mut *pw, self.compression.level)
303 .map_err(StoreError::from)?;
304 enc.set_pledged_src_size(Some(data.len() as u64))
309 .map_err(StoreError::from)?;
310 enc.write_all(&data).map_err(StoreError::from)?;
311 enc.finish().map_err(StoreError::from)?;
312 }
313 pw.flush().map_err(StoreError::from)?;
314 let body_end = pw.get_mut().stream_position().map_err(StoreError::from)?;
315 let compressed_size = body_end - body_start;
316 self.total_compressed += compressed_size;
317
318 let mut csize_bytes = [0u8; CSIZE_PLACEHOLDER_LEN];
323 encode_varint_padded_to_10(compressed_size, &mut csize_bytes);
324 let inner = pw.get_mut();
325 inner
326 .seek(SeekFrom::Start(csize_pos))
327 .map_err(StoreError::from)?;
328 inner.write_all(&csize_bytes).map_err(StoreError::from)?;
329 inner
330 .seek(SeekFrom::Start(body_end))
331 .map_err(StoreError::from)?;
332 }
333 #[cfg(not(feature = "zstd"))]
334 {
335 unreachable!("compression branch reached without `zstd` feature");
338 }
339 }
340
341 let bucket_idx = bucket_index_for(&id);
345 let bucket = self.get_or_open_bucket(bucket_idx)?;
346 let mut idx_entry = Vec::with_capacity(33 + 8);
347 id.encode_tagged(&mut idx_entry);
348 idx_entry.extend_from_slice(&offset.to_be_bytes());
349 bucket.write_all(&idx_entry).map_err(StoreError::from)?;
350
351 self.object_count += 1;
352 Ok(())
353 }
354
355 fn get_or_open_bucket(&mut self, idx: usize) -> Result<&mut BufWriter<File>> {
356 self.bucket_access_tick = self.bucket_access_tick.wrapping_add(1);
357 let last_used = self.bucket_access_tick;
358 if self.bucket_writers[idx].is_none() {
359 if self.open_bucket_writers >= MAX_OPEN_BUCKET_WRITERS {
360 self.evict_lru_bucket()?;
361 }
362 let path = &self.bucket_paths[idx];
363 let f = OpenOptions::new()
364 .create(true)
365 .append(true)
366 .open(path)
367 .map_err(StoreError::from)?;
368 self.bucket_writers[idx] = Some(BucketWriter {
369 writer: BufWriter::new(f),
370 last_used,
371 });
372 self.open_bucket_writers += 1;
373 } else if let Some(bucket) = self.bucket_writers[idx].as_mut() {
374 bucket.last_used = last_used;
375 }
376 Ok(&mut self.bucket_writers[idx]
377 .as_mut()
378 .expect("just inserted above")
379 .writer)
380 }
381
382 fn evict_lru_bucket(&mut self) -> Result<()> {
383 let Some((idx, _)) = self
384 .bucket_writers
385 .iter()
386 .enumerate()
387 .filter_map(|(idx, bucket)| bucket.as_ref().map(|bucket| (idx, bucket.last_used)))
388 .min_by_key(|(_, last_used)| *last_used)
389 else {
390 return Ok(());
391 };
392
393 if let Some(mut bucket) = self.bucket_writers[idx].take() {
394 bucket.writer.flush().map_err(StoreError::from)?;
395 self.open_bucket_writers -= 1;
396 }
397 Ok(())
398 }
399
400 pub fn finalize(mut self) -> Result<(W, PackStats)> {
411 for bucket in self.bucket_writers.iter_mut().flatten() {
414 bucket.writer.flush().map_err(StoreError::from)?;
415 }
416 for slot in self.bucket_writers.iter_mut() {
419 *slot = None;
420 }
421 self.open_bucket_writers = 0;
422
423 let bw = self
427 .pack_writer
428 .take()
429 .expect("finalize called twice — pack_writer already consumed");
430 let mut writer = bw
431 .into_inner()
432 .map_err(|e| StoreError::from(std::io::Error::other(e.to_string())))?;
433 writer
434 .seek(SeekFrom::Start(self.header_offset))
435 .map_err(StoreError::from)?;
436 let mut header_bytes = Vec::with_capacity(16);
437 write_container_header(&mut header_bytes, pack_container_spec(), self.object_count);
438 writer.write_all(&header_bytes).map_err(StoreError::from)?;
439
440 writer
445 .seek(SeekFrom::Start(self.header_offset))
446 .map_err(StoreError::from)?;
447 let mut hasher = blake3::Hasher::new();
448 let mut buf = vec![0u8; 64 * 1024];
449 loop {
450 let n = writer.read(&mut buf).map_err(StoreError::from)?;
451 if n == 0 {
452 break;
453 }
454 hasher.update(&buf[..n]);
455 }
456 let checksum = hasher.finalize();
457
458 writer.seek(SeekFrom::End(0)).map_err(StoreError::from)?;
460 writer
461 .write_all(checksum.as_bytes())
462 .map_err(StoreError::from)?;
463 writer.flush().map_err(StoreError::from)?;
464
465 let idx_file = File::create(&self.index_path).map_err(StoreError::from)?;
478 let mut idx_writer = BufWriter::new(idx_file);
479 write_index_header(&mut idx_writer, self.object_count)?;
480 let mut entries_written: u64 = 0;
481 for path in self.bucket_paths.iter() {
482 if !path.exists() {
483 continue;
484 }
485 let bucket_bytes = std::fs::read(path).map_err(StoreError::from)?;
486 let mut entries = decode_bucket_file(&bucket_bytes)?;
487 entries.sort_by_key(|(id, _)| *id);
492 for (id, offset) in entries {
493 write_index_entry(&mut idx_writer, id, offset)?;
494 entries_written += 1;
495 }
496 }
497 idx_writer.flush().map_err(StoreError::from)?;
498 debug_assert_eq!(
499 entries_written, self.object_count,
500 "streaming index entry count drifted from add() count"
501 );
502
503 for path in self.bucket_paths.iter() {
508 let _ = std::fs::remove_file(path);
509 }
510 let _ = std::fs::remove_dir(&self.bucket_dir);
511 self.finalized = true;
512
513 let stats = PackStats {
514 object_count: self.object_count,
515 total_uncompressed: self.total_uncompressed,
516 total_compressed: self.total_compressed,
517 delta_count: 0,
518 compression_ratio: if self.total_uncompressed == 0 {
519 0.0
520 } else {
521 self.total_compressed as f64 / self.total_uncompressed as f64
522 },
523 };
524
525 Ok((writer, stats))
526 }
527}
528
529fn write_index_header<W: Write>(out: &mut W, count: u64) -> Result<()> {
534 super::pack_index::index_header().write_to(out, count)
535}
536
537fn write_index_entry<W: Write>(out: &mut W, id: PackObjectId, offset: u64) -> Result<()> {
541 let mut buf = Vec::with_capacity(33 + 8);
542 id.encode_tagged(&mut buf);
543 buf.extend_from_slice(&offset.to_be_bytes());
544 out.write_all(&buf).map_err(StoreError::from)
545}
546
547#[cfg(feature = "zstd")]
/// Encodes `value` as a little-endian base-128 varint padded to exactly 10 bytes.
///
/// Byte `i` holds bits `7*i..7*i+7` of `value`. The first nine bytes always carry
/// the continuation bit (`0x80`) and the last never does. The encoding therefore
/// fills a fixed-width slot regardless of magnitude, which lets it be back-patched
/// in place.
fn encode_varint_padded_to_10(value: u64, out: &mut [u8; 10]) {
    let last = out.len() - 1;
    for (i, byte) in out.iter_mut().enumerate() {
        let group = ((value >> (7 * i)) & 0x7F) as u8;
        *byte = if i == last { group } else { group | 0x80 };
    }
}
568
569impl<W: Write + Read + Seek> Drop for StreamingPackBuilder<W> {
570 fn drop(&mut self) {
571 if self.finalized {
572 return;
573 }
574 for path in self.bucket_paths.iter() {
577 let _ = std::fs::remove_file(path);
578 }
579 let _ = std::fs::remove_dir(&self.bucket_dir);
580 }
581}
582
583fn bucket_index_for(id: &PackObjectId) -> usize {
587 match id {
588 PackObjectId::Hash(h) => HASH_VARIANT * BUCKETS_PER_VARIANT + h.as_bytes()[0] as usize,
589 PackObjectId::ChangeId(c) => {
590 CHANGEID_VARIANT * BUCKETS_PER_VARIANT + c.as_bytes()[0] as usize
591 }
592 }
593}
594
595fn decode_bucket_file(bytes: &[u8]) -> Result<Vec<(PackObjectId, u64)>> {
600 let mut out = Vec::new();
601 let mut pos = 0;
602 while pos < bytes.len() {
603 let (id, id_len) = PackObjectId::decode_tagged(&bytes[pos..])?;
604 pos += id_len;
605 if pos + 8 > bytes.len() {
606 return Err(StoreError::InvalidObject(
607 "streaming bucket entry truncated at offset".to_string(),
608 ));
609 }
610 let offset = u64::from_be_bytes(bytes[pos..pos + 8].try_into().map_err(|_| {
611 StoreError::InvalidObject("streaming bucket bad offset slice".to_string())
612 })?);
613 pos += 8;
614 out.push((id, offset));
615 }
616 Ok(out)
617}
618
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;
    use crate::{
        object::ChangeId,
        store::pack::{PackReader, PackStats},
    };

    fn deterministic_hash(seed: u8) -> ContentHash {
        let mut bytes = [0u8; 32];
        bytes[0] = seed;
        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
            *b = seed.wrapping_mul(31).wrapping_add(i as u8);
        }
        ContentHash::from_bytes(bytes)
    }

    fn deterministic_change_id(seed: u8) -> ChangeId {
        let mut bytes = [0u8; 16];
        bytes[0] = seed;
        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
            *b = seed.wrapping_add(i as u8 * 7);
        }
        ChangeId::from_bytes(bytes)
    }

    fn fresh_builder(
        tmp: &tempfile::TempDir,
    ) -> (StreamingPackBuilder<Cursor<Vec<u8>>>, PathBuf, PathBuf) {
        let bucket_dir = tmp.path().join("buckets");
        let index_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let b = StreamingPackBuilder::new(
            cursor,
            index_path.clone(),
            CompressionConfig::default(),
            bucket_dir.clone(),
        )
        .unwrap();
        (b, bucket_dir, index_path)
    }

    fn finalize_cursor(
        b: StreamingPackBuilder<Cursor<Vec<u8>>>,
        index_path: &std::path::Path,
    ) -> (Vec<u8>, Vec<u8>, PackStats) {
        let (cursor, stats) = b.finalize().unwrap();
        let index_bytes = std::fs::read(index_path).unwrap();
        (cursor.into_inner(), index_bytes, stats)
    }

    #[test]
    fn empty_pack_finalizes_to_valid_zero_count_pack() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (b, bucket_dir, idx_path) = fresh_builder(&tmp);
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);

        assert_eq!(stats.object_count, 0);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert!(reader.list_ids().is_empty());
        assert!(
            !bucket_dir.exists(),
            "bucket dir should be cleaned on successful finalize"
        );
    }

    #[test]
    fn single_blob_with_hash_id_round_trips() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let hash = deterministic_hash(0x42);
        let payload = b"hello, streaming pack".to_vec();
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);

        assert_eq!(stats.object_count, 1);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let id = PackObjectId::Hash(hash);
        assert!(reader.has_object(&id));
        let (got_type, got_data) = reader.get_object(&id).unwrap().unwrap();
        assert_eq!(got_type, ObjectType::Blob);
        assert_eq!(got_data, payload);
    }

    #[test]
    fn single_state_with_change_id_round_trips() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let cid = deterministic_change_id(0xa5);
        let payload = b"serialized-state-bytes".to_vec();
        b.add_id(
            PackObjectId::ChangeId(cid),
            ObjectType::State,
            payload.clone(),
        )
        .unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);

        assert_eq!(stats.object_count, 1);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let id = PackObjectId::ChangeId(cid);
        let (ty, data) = reader.get_object(&id).unwrap().unwrap();
        assert_eq!(ty, ObjectType::State);
        assert_eq!(data, payload);
    }

    #[test]
    fn mixed_hash_and_changeid_ids_all_retrievable() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let blob_hash = deterministic_hash(0x10);
        let tree_hash = deterministic_hash(0x20);
        let state_cid = deterministic_change_id(0x80);

        b.add(blob_hash, ObjectType::Blob, b"blob-bytes".to_vec())
            .unwrap();
        b.add(tree_hash, ObjectType::Tree, b"serialized-tree".to_vec())
            .unwrap();
        b.add_id(
            PackObjectId::ChangeId(state_cid),
            ObjectType::State,
            b"serialized-state".to_vec(),
        )
        .unwrap();

        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 3);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(
            reader
                .get_object(&PackObjectId::Hash(blob_hash))
                .unwrap()
                .unwrap()
                .1,
            b"blob-bytes".to_vec()
        );
        assert_eq!(
            reader
                .get_object(&PackObjectId::Hash(tree_hash))
                .unwrap()
                .unwrap()
                .1,
            b"serialized-tree".to_vec()
        );
        assert_eq!(
            reader
                .get_object(&PackObjectId::ChangeId(state_cid))
                .unwrap()
                .unwrap()
                .1,
            b"serialized-state".to_vec()
        );
    }

    #[test]
    fn ten_thousand_objects_round_trip_correctly() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let mut hashes = Vec::with_capacity(10_000);
        for i in 0..10_000u32 {
            let h = blake3::hash(&i.to_le_bytes());
            let hash = ContentHash::from_bytes(*h.as_bytes());
            hashes.push(hash);
            b.add(hash, ObjectType::Blob, format!("payload-{i}").into_bytes())
                .unwrap();
        }
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 10_000);

        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(reader.list_ids().len(), 10_000);
        for i in [0, 1, 99, 1234, 5_000, 9_999] {
            let id = PackObjectId::Hash(hashes[i]);
            let (_ty, data) = reader.get_object(&id).unwrap().unwrap();
            assert_eq!(data, format!("payload-{i}").into_bytes());
        }
    }

    #[test]
    fn bucket_writers_are_lru_capped_below_fd_limit() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _bucket_dir, idx_path) = fresh_builder(&tmp);
        let mut ids = Vec::new();

        for i in 0..BUCKETS_PER_VARIANT {
            let hash = deterministic_hash(i as u8);
            ids.push(PackObjectId::Hash(hash));
            b.add(hash, ObjectType::Blob, format!("hash-{i}").into_bytes())
                .unwrap();
            assert!(
                b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
                "open bucket writers should stay capped"
            );
        }

        for i in 0..BUCKETS_PER_VARIANT {
            let cid = deterministic_change_id(i as u8);
            ids.push(PackObjectId::ChangeId(cid));
            b.add_id(
                PackObjectId::ChangeId(cid),
                ObjectType::State,
                format!("state-{i}").into_bytes(),
            )
            .unwrap();
            assert!(
                b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
                "open bucket writers should stay capped"
            );
        }

        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, TOTAL_BUCKETS as u64);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        for id in ids {
            assert!(reader.has_object(&id), "missing id {id:?}");
        }
    }

    #[test]
    fn index_id_sort_order_matches_packbuilder_output() {
        use crate::store::pack::PackBuilder;
        let payloads: Vec<(PackObjectId, ObjectType, Vec<u8>)> = (0..200u32)
            .map(|i| {
                let h = blake3::hash(&i.to_le_bytes());
                (
                    PackObjectId::Hash(ContentHash::from_bytes(*h.as_bytes())),
                    if i % 3 == 0 {
                        ObjectType::Tree
                    } else {
                        ObjectType::Blob
                    },
                    format!("body-{i}").into_bytes(),
                )
            })
            .collect();

        let compression = CompressionConfig {
            max_delta_size: 0,
            ..CompressionConfig::default()
        };
        let mut classic = PackBuilder::new(compression);
        for (id, ty, data) in payloads.iter() {
            classic.add_id(*id, *ty, data.clone());
        }
        let (classic_pack, classic_index, _) = classic.build().unwrap();
        let classic_reader = PackReader::from_bytes(classic_pack, classic_index).unwrap();

        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut streaming =
            StreamingPackBuilder::new(cursor, idx_path.clone(), compression, bucket_dir).unwrap();
        for (id, ty, data) in payloads.iter() {
            streaming.add_id(*id, *ty, data.clone()).unwrap();
        }
        let (streaming_pack, streaming_index, _) = finalize_cursor(streaming, &idx_path);
        let streaming_reader = PackReader::from_bytes(streaming_pack, streaming_index).unwrap();

        assert_eq!(
            streaming_reader.list_ids(),
            classic_reader.list_ids(),
            "streaming and classic indices should report the same id sequence"
        );
        for (id, _ty, want) in payloads.iter().take(10).chain(payloads.iter().skip(190)) {
            let (_, got) = streaming_reader.get_object(id).unwrap().unwrap();
            assert_eq!(&got, want);
            let (_, classic_got) = classic_reader.get_object(id).unwrap().unwrap();
            assert_eq!(got, classic_got);
        }
    }

    #[test]
    fn corrupted_pack_fails_checksum_verification() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        b.add(
            deterministic_hash(0x01),
            ObjectType::Blob,
            b"some bytes".to_vec(),
        )
        .unwrap();
        let (mut pack_data, index_data, _) = finalize_cursor(b, &idx_path);
        let body_byte = 18;
        pack_data[body_byte] ^= 0xff;
        let result = PackReader::from_bytes(pack_data, index_data);
        assert!(
            result.is_err(),
            "PackReader should reject pack with mutated body"
        );
    }

    #[test]
    fn pack_count_in_header_matches_index_entry_count() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        for i in 0..7u8 {
            b.add(
                deterministic_hash(i),
                ObjectType::Blob,
                format!("p{i}").into_bytes(),
            )
            .unwrap();
        }
        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
        let count = u64::from_be_bytes(pack_data[8..16].try_into().unwrap());
        assert_eq!(count, 7);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        assert_eq!(reader.list_ids().len(), 7);
    }

    #[test]
    fn bucket_files_are_cleaned_on_successful_finalize() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut b = StreamingPackBuilder::new(
            cursor,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir.clone(),
        )
        .unwrap();
        for i in 0..50u8 {
            b.add(deterministic_hash(i), ObjectType::Blob, vec![i; 32])
                .unwrap();
        }
        assert!(bucket_dir.exists());
        let bucket_count = std::fs::read_dir(&bucket_dir).unwrap().count();
        assert!(bucket_count > 0, "bucket dir should hold some files");
        let _ = finalize_cursor(b, &idx_path);
        assert!(
            !bucket_dir.exists(),
            "bucket dir should be removed on finalize"
        );
    }

    #[test]
    fn bucket_files_are_cleaned_on_drop_without_finalize() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        {
            let cursor = Cursor::new(Vec::<u8>::new());
            let mut b = StreamingPackBuilder::new(
                cursor,
                idx_path.clone(),
                CompressionConfig::default(),
                bucket_dir.clone(),
            )
            .unwrap();
            for i in 0..10u8 {
                b.add(deterministic_hash(i), ObjectType::Blob, vec![0; 32])
                    .unwrap();
            }
            assert!(bucket_dir.exists());
        }
        assert!(
            !idx_path.exists(),
            "no index file should have been created without finalize"
        );
        assert!(
            !bucket_dir.exists(),
            "bucket dir should be removed on Drop when finalize never ran"
        );
    }

    #[test]
    fn large_blob_streams_to_disk_without_double_buffering() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let pack_path = tmp.path().join("pack.dat");
        let idx_path = tmp.path().join("pack.idx");
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&pack_path)
            .unwrap();
        let mut b = StreamingPackBuilder::new(
            file,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir,
        )
        .unwrap();
        let payload: Vec<u8> = (0..4 * 1024 * 1024u32).map(|i| (i & 0xff) as u8).collect();
        let hash = deterministic_hash(0xff);
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let (_, stats) = b.finalize().unwrap();
        let index_data = std::fs::read(&idx_path).unwrap();
        assert_eq!(stats.object_count, 1);
        let pack_bytes = std::fs::read(&pack_path).unwrap();
        let reader = PackReader::from_bytes(pack_bytes, index_data).unwrap();
        let (_ty, got) = reader
            .get_object(&PackObjectId::Hash(hash))
            .unwrap()
            .unwrap();
        assert_eq!(got, payload);
    }

    #[test]
    fn bucket_distribution_for_random_hashes_is_roughly_uniform() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let idx_path = tmp.path().join("test.idx");
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut b = StreamingPackBuilder::new(
            cursor,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir.clone(),
        )
        .unwrap();
        for i in 0..1024u32 {
            let h = blake3::hash(&i.to_le_bytes());
            let hash = ContentHash::from_bytes(*h.as_bytes());
            b.add(hash, ObjectType::Blob, b"x".to_vec()).unwrap();
        }
        b.pack_writer.as_mut().unwrap().flush().unwrap();
        // Index entries sit in the bucket BufWriters until flushed. Without this,
        // the on-disk sizes below only reflect evicted buckets, and the bound
        // passes vacuously.
        for bucket in b.bucket_writers.iter_mut().flatten() {
            bucket.writer.flush().unwrap();
        }

        // Each entry is a tagged hash id (assumed 33 bytes) plus an 8-byte offset.
        let entry_size = 33 + 8;
        let mut max_entries = 0usize;
        let mut total_entries = 0usize;
        for path in b.bucket_paths.iter() {
            if path.exists() {
                let size = std::fs::metadata(path).unwrap().len() as usize;
                let entries = size / entry_size;
                total_entries += entries;
                max_entries = max_entries.max(entries);
            }
        }
        assert_eq!(
            total_entries, 1024,
            "every added id should be on disk after flushing the buckets"
        );
        assert!(
            max_entries <= 16,
            "max bucket has {max_entries} entries; uniform expected ~4"
        );
        let _ = finalize_cursor(b, &idx_path);
    }

    #[test]
    fn finalize_returns_correct_stats() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let payload = vec![0xabu8; 1024];
        for i in 0..5u8 {
            b.add(deterministic_hash(i), ObjectType::Blob, payload.clone())
                .unwrap();
        }
        let (_, _, stats) = finalize_cursor(b, &idx_path);
        assert_eq!(stats.object_count, 5);
        assert_eq!(stats.total_uncompressed, 5 * 1024);
        assert!(stats.total_compressed > 0);
        assert!(stats.compression_ratio > 0.0);
        assert_eq!(stats.delta_count, 0, "streaming builder never deltas");
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn streaming_compression_roundtrips_through_zstd_frame() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let payload = vec![0u8; 64 * 1024];
        let hash = deterministic_hash(0x77);
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
        assert!(
            stats.total_compressed < stats.total_uncompressed,
            "expected compression ratio < 1.0, got {}/{}",
            stats.total_compressed,
            stats.total_uncompressed
        );
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let (_ty, got) = reader
            .get_object(&PackObjectId::Hash(hash))
            .unwrap()
            .unwrap();
        assert_eq!(got, payload);
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn padded_varint_decodes_to_original_value_for_canonical_decoder() {
        let cases: &[u64] = &[0, 1, 127, 128, 4096, 1_000_000, 1_000_000_000_000, u64::MAX];
        for &value in cases {
            let mut buf = [0u8; 10];
            super::encode_varint_padded_to_10(value, &mut buf);
            let (decoded, consumed) = super::super::varint::decode_varint(&buf)
                .expect("padded varint should always decode");
            assert_eq!(decoded, value, "varint roundtrip failed for {value}");
            assert_eq!(
                consumed, 10,
                "padded encoding should consume all 10 bytes for {value}"
            );
        }
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn streaming_path_does_not_buffer_compressed_payload_in_memory() {
        let tmp = tempfile::TempDir::new().unwrap();
        let bucket_dir = tmp.path().join("buckets");
        let pack_path = tmp.path().join("pack.dat");
        let idx_path = tmp.path().join("pack.idx");
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&pack_path)
            .unwrap();
        let mut b = StreamingPackBuilder::new(
            file,
            idx_path.clone(),
            CompressionConfig::default(),
            bucket_dir,
        )
        .unwrap();
        let payload = vec![0xa5u8; 8 * 1024 * 1024];
        let hash = deterministic_hash(0x66);
        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
        let mid_size = std::fs::metadata(&pack_path).unwrap().len();
        assert!(
            mid_size > 16 + 40,
            "pack file should hold real entry data after add; size={mid_size}"
        );
        let (_, _) = b.finalize().unwrap();
        let pack_bytes = std::fs::read(&pack_path).unwrap();
        let index_bytes = std::fs::read(&idx_path).unwrap();
        let reader = PackReader::from_bytes(pack_bytes, index_bytes).unwrap();
        let (_ty, got) = reader
            .get_object(&PackObjectId::Hash(hash))
            .unwrap()
            .unwrap();
        assert_eq!(got, payload);
    }

    #[test]
    fn list_ids_returns_all_added_ids_sorted() {
        let tmp = tempfile::TempDir::new().unwrap();
        let (mut b, _, idx_path) = fresh_builder(&tmp);
        let mut added: Vec<PackObjectId> = Vec::new();
        for seed in [0x05u8, 0xa0, 0x12, 0x9f, 0x33] {
            let id = PackObjectId::Hash(deterministic_hash(seed));
            b.add_id(id, ObjectType::Blob, vec![seed; 4]).unwrap();
            added.push(id);
        }
        for seed in [0x80u8, 0x10, 0xff] {
            let id = PackObjectId::ChangeId(deterministic_change_id(seed));
            b.add_id(id, ObjectType::State, vec![seed; 4]).unwrap();
            added.push(id);
        }
        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
        let mut got = reader.list_ids();
        let mut sorted = got.clone();
        sorted.sort();
        assert_eq!(got, sorted, "list_ids must come back sorted");
        added.sort();
        got.sort();
        assert_eq!(got, added);
    }
}