Skip to main content

objects/store/pack/
streaming_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Streaming pack builder for bounded-memory imports.
3//!
4//! `PackBuilder` accumulates every `(id, type, data)` tuple in memory
5//! before producing a pack. That's fine for sync-protocol packets and
6//! small batches, but the import path can produce millions of objects
7//! and would OOM on large repos.
8//!
9//! `StreamingPackBuilder` removes the in-memory buffering by:
10//!
//! 1. **Streaming pack data to disk** as objects are added. Compression
//!    runs per-object through zstd's streaming encoder, which writes
//!    straight into the pack writer, so the compressed payload is
//!    never held in a `Vec<u8>`; the writer buffers at most its
//!    `BufWriter` capacity on top of the caller-supplied object bytes.
16//!
17//! 2. **External sorting the index** via 512 hash-prefix bucket files
18//!    on disk (256 for `Hash` ids, 256 for `ChangeId` ids). Each
19//!    `add()` appends one fixed-shape `(id, offset)` record to the
20//!    bucket whose first byte matches the id's first inner byte. At
21//!    finalize, each bucket is small enough to sort in memory; the
22//!    concatenation of `Hash` buckets followed by `ChangeId` buckets
23//!    in byte order produces the exact same global sort `PackBuilder`
24//!    would have via `entries.sort_by_key(|e| e.id)`.
25//!
26//! ## Memory bound
27//!
//! - Pack data on disk: streamed; only one object's uncompressed
//!   bytes are held in memory at a time.
30//! - Index entries in bucket buffers: each bucket is a `BufWriter`
31//!   with default capacity (~8 KB), so 512 buckets at peak is ~4 MB.
32//! - Sort scratch at finalize: O(largest bucket). For uniformly-
33//!   distributed BLAKE3 hashes / ULID change-ids and N total objects,
34//!   the largest bucket is ~N/256 entries ≈ 40 bytes each. Even at
35//!   100 M objects that's ~16 MB peak.
36//!
//! Net peak memory: ~20 MB regardless of repo size, modulo the size
//! of the largest single object (which is unavoidable while `add_id`
//! receives each object's uncompressed bytes as one `Vec<u8>`).
40//!
41//! ## Trade-offs vs `PackBuilder`
42//!
43//! - **No delta encoding.** Streaming and sliding-window deltas are
44//!   incompatible — delta search needs random access to recently-
45//!   written objects. The import path runs with deltas disabled
46//!   anyway (the cost-benefit is bad on real Heddle history), so this
47//!   is a non-issue for the call site that motivated this builder.
48//! - **No path-grouped reordering.** Entries land in the order added.
49//! - **Output is a pack file at a path** rather than `(Vec<u8>, Vec<u8>)`.
50//!   Callers pair this with [`crate::store::ObjectStore::install_pack_from_path`]
51//!   which moves/installs the pack without copying it through RAM.
52//! - **Re-reads the pack at finalize** to compute the BLAKE3 trailer
53//!   checksum (the pack format hashes header+body, and the count goes
54//!   in the header — we patch it on finalize via seek-back, then
55//!   re-stream the body to the hasher). 2× sequential disk I/O on the
56//!   pack data is the cost of sticking with the current format. A
57//!   future format change could put the count in the footer to avoid
58//!   the second pass.
59
60use std::{
61    fs::File,
62    io::{BufWriter, Read, Seek, SeekFrom, Write},
63    path::PathBuf,
64};
65
66use super::{ObjectType, PackObjectId, PackStats, pack_container_spec, write_container_header};
67
/// Width, in bytes, of the placeholder reserved for the
/// compressed-size varint in the streaming path. 10 bytes is enough
/// to encode any `u64` at 7 payload bits per byte (nine bytes with
/// the continuation bit set plus one terminator). Once the payload
/// has been streamed, the placeholder is overwritten with a
/// non-canonical varint padded to exactly this length. Only the
/// zstd-enabled compress path uses it, hence the feature gate.
#[cfg(feature = "zstd")]
const CSIZE_PLACEHOLDER_LEN: usize = 10;
75use crate::{
76    object::ContentHash,
77    store::{Result, StoreError, compression::CompressionConfig},
78};
79
/// Number of buckets per id variant. 256 = one bucket per possible
/// value of the inner id's first byte. The bucket boundaries are meant
/// to align with `PackObjectId`'s `Ord` derivation (variant tag major,
/// inner bytes minor) so the concatenated bucket output matches what
/// `PackIndex::sort()` would have produced.
const BUCKETS_PER_VARIANT: usize = 256;
/// Total bucket count: 256 for `Hash` ids + 256 for `ChangeId` ids.
const TOTAL_BUCKETS: usize = BUCKETS_PER_VARIANT * 2;

/// Variant index of `Hash` ids into the bucket arrays. `Hash` ids
/// fill the lower half (intended to match the variant order in
/// `PackObjectId`, which makes `Hash(_) < ChangeId(_)`).
const HASH_VARIANT: usize = 0;
/// Variant index of `ChangeId` ids; they fill the upper half.
const CHANGEID_VARIANT: usize = 1;
94
/// Streaming pack builder. Generic over the pack writer (`File` in
/// production, `Cursor<Vec<u8>>` in tests). The writer must also be
/// readable and seekable because `finalize` seeks back to rewrite the
/// header and re-reads the written bytes to compute the trailer.
pub struct StreamingPackBuilder<W: Write + Read + Seek> {
    /// Writer for the pack's `[header][body]` content. The trailer
    /// checksum is appended to the same writer at `finalize`.
    /// Wrapped in `Option` so `finalize` can `.take()` it out without
    /// running afoul of the `Drop` impl's restriction on moving fields.
    /// `None` once `finalize` has taken it.
    pack_writer: Option<BufWriter<W>>,
    /// Position in the pack writer where the header was written, so
    /// `finalize` can seek back and rewrite the header with the real
    /// `object_count`. Entry offsets recorded in the index are
    /// relative to this position.
    header_offset: u64,
    /// Number of entries successfully added so far.
    object_count: u64,
    /// Sum of the uncompressed payload lengths passed to `add_id`.
    total_uncompressed: u64,
    /// Sum of the payload bytes written to the pack (raw length for
    /// uncompressed entries, compressed length otherwise).
    total_compressed: u64,
    /// Compression knobs. Only consulted when the `zstd` feature is on
    /// (`enabled` and `min_size` decide whether each entry compresses;
    /// `level` parameterizes the encoder). Without `zstd` every entry
    /// takes the raw branch and this field is unused.
    #[cfg_attr(not(feature = "zstd"), allow(dead_code))]
    compression: CompressionConfig,
    /// Directory holding the 512 bucket files. Kept by the builder
    /// so it can be removed at `finalize` or, on abort, in `Drop`.
    bucket_dir: PathBuf,
    /// One slot per bucket, indexed `variant * 256 + prefix_byte`.
    /// Each slot is lazily opened on first write to avoid creating
    /// 512 empty files for sparse imports.
    bucket_writers: Vec<Option<BufWriter<File>>>,
    /// On-disk path of every bucket, in the same order as
    /// `bucket_writers`; computed up front in `new`.
    bucket_paths: Vec<PathBuf>,
    /// File path where the pack index is materialized at `finalize`.
    /// Bytes are written incrementally as buckets are sorted, so the
    /// index never sits in memory in its entirety.
    index_path: PathBuf,
    /// Set true at the end of `finalize` so `Drop` knows the bucket
    /// dir was already cleaned and shouldn't be removed again.
    finalized: bool,
}
133
134impl<W: Write + Read + Seek> StreamingPackBuilder<W> {
135    /// Open a streaming builder against `pack_writer`, using
136    /// `bucket_dir` for transient index buckets and writing the
137    /// finalized index to `index_path`. The bucket dir is created if
138    /// it doesn't exist; on a successful `finalize` it's removed
139    /// (along with any bucket files left in it).
140    ///
141    /// `index_path` is *not* created by `new` — opening happens at
142    /// finalize so a misconfigured caller doesn't leave an empty index
143    /// file behind on early failure. It's still recorded here so
144    /// `finalize` can write to a known location and the caller can
145    /// install the file by path.
146    ///
147    /// The `pack_writer` must support `Read` because finalize re-streams
148    /// the body to compute the trailer checksum — see the module-level
149    /// note on the format.
150    pub fn new(
151        mut pack_writer: W,
152        index_path: PathBuf,
153        compression: CompressionConfig,
154        bucket_dir: PathBuf,
155    ) -> Result<Self> {
156        std::fs::create_dir_all(&bucket_dir).map_err(StoreError::from)?;
157        let header_offset = pack_writer.stream_position().map_err(StoreError::from)?;
158
159        // Write a placeholder header with `count = 0`; finalize seeks
160        // back here and rewrites the real count.
161        let mut header_bytes = Vec::with_capacity(16);
162        write_container_header(&mut header_bytes, pack_container_spec(), 0);
163        pack_writer
164            .write_all(&header_bytes)
165            .map_err(StoreError::from)?;
166
167        let bucket_paths: Vec<PathBuf> = (0..TOTAL_BUCKETS)
168            .map(|i| {
169                let variant = if i < BUCKETS_PER_VARIANT { 'h' } else { 'c' };
170                let prefix = i % BUCKETS_PER_VARIANT;
171                bucket_dir.join(format!("bucket-{variant}-{prefix:02x}"))
172            })
173            .collect();
174
175        Ok(Self {
176            pack_writer: Some(BufWriter::new(pack_writer)),
177            header_offset,
178            object_count: 0,
179            total_uncompressed: 0,
180            total_compressed: 0,
181            compression,
182            bucket_dir,
183            bucket_writers: (0..TOTAL_BUCKETS).map(|_| None).collect(),
184            bucket_paths,
185            index_path,
186            finalized: false,
187        })
188    }
189
190    /// Add an object with a content-hash id.
191    pub fn add(&mut self, hash: ContentHash, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
192        self.add_id(PackObjectId::Hash(hash), obj_type, data)
193    }
194
195    /// Add an object with an explicit id. Mirrors [`super::PackBuilder::add_id`].
196    ///
197    /// # Memory shape
198    ///
199    /// Per-entry, the only allocations are:
200    ///
201    /// - `data: Vec<u8>` (the input, owned by the caller — comes from
202    ///   gix' `find_object` and isn't ours to stream further).
203    /// - A ~40-byte stack scratch for the entry header.
204    /// - zstd's internal compression context (~128 KB constant).
205    /// - One 50-byte index-bucket entry buffered into the bucket's
206    ///   `BufWriter`.
207    ///
208    /// The compressed payload is **never materialized** as a `Vec<u8>` —
209    /// it streams directly through `zstd::stream::write::Encoder` into
210    /// the pack writer. The pack format requires a `compressed_size`
211    /// varint *before* the compressed bytes, which we don't know yet
212    /// when we write the header; we reserve a 10-byte placeholder and
213    /// seek-back to patch it after the encoder finishes. Heddle's
214    /// varint decoder accepts non-canonical encodings (it walks
215    /// continuation bits without enforcing minimum-byte form), so the
216    /// padded write decodes back to the same value any reader expects.
217    pub fn add_id(&mut self, id: PackObjectId, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
218        // Compute the entry's offset relative to the header. Flush the
219        // BufWriter first so `stream_position` reflects bytes actually
220        // committed to the underlying writer.
221        let pw = self
222            .pack_writer
223            .as_mut()
224            .expect("add_id called after finalize");
225        pw.flush().map_err(StoreError::from)?;
226        let entry_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
227        let offset = entry_start
228            .checked_sub(self.header_offset)
229            .expect("header_offset should never be past current position");
230
231        self.total_uncompressed += data.len() as u64;
232
233        // Phase 1: write the entry header up to (but not including) the
234        // compressed-size varint. Always small, fits in `entry_header_buf`.
235        let mut header_buf = Vec::with_capacity(40);
236        id.encode_tagged(&mut header_buf);
237        super::varint::encode_type_and_size(obj_type, data.len() as u64, &mut header_buf);
238        pw.write_all(&header_buf).map_err(StoreError::from)?;
239        // Only consumed by the zstd-enabled streaming branch below, but
240        // we compute it here while we already have `header_buf`'s length
241        // in scope.
242        #[cfg(feature = "zstd")]
243        let csize_pos = entry_start + header_buf.len() as u64;
244
245        // Phase 2: stream the compressed payload. We branch here on
246        // whether to compress at all — for tiny objects (`< min_size`)
247        // the bulk path traditionally wrote raw bytes to skip zstd
248        // overhead, and the reader's existing `compressed_size ==
249        // uncompressed_size` heuristic in `pack_reader.rs:128` reads
250        // raw entries back unchanged. We preserve that policy.
251        // `want_compress` gates the zstd path. Even with the feature
252        // enabled we fall through to raw for tiny entries (where
253        // zstd's frame overhead dominates) or when the caller
254        // explicitly disabled compression in `CompressionConfig`.
255        // Without the `zstd` Cargo feature, every entry takes the raw
256        // branch — same fallback shape as `compress_pack_payload`.
257        let want_compress: bool;
258        #[cfg(feature = "zstd")]
259        {
260            want_compress = self.compression.enabled && data.len() >= self.compression.min_size;
261        }
262        #[cfg(not(feature = "zstd"))]
263        {
264            want_compress = false;
265        }
266        if !want_compress {
267            // Raw entry: known compressed_size = data.len(). One canonical
268            // varint + the data itself. No seek-back needed.
269            let mut csize_buf = Vec::with_capacity(10);
270            super::varint::encode_varint(data.len() as u64, &mut csize_buf);
271            pw.write_all(&csize_buf).map_err(StoreError::from)?;
272            pw.write_all(&data).map_err(StoreError::from)?;
273            self.total_compressed += data.len() as u64;
274        } else {
275            #[cfg(feature = "zstd")]
276            {
277                // Streaming entry: reserve 10 bytes for compressed_size,
278                // stream-compress the payload, then seek back to patch.
279                pw.write_all(&[0u8; CSIZE_PLACEHOLDER_LEN])
280                    .map_err(StoreError::from)?;
281                pw.flush().map_err(StoreError::from)?;
282                let body_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
283                {
284                    let mut enc =
285                        zstd::stream::write::Encoder::new(&mut *pw, self.compression.level)
286                            .map_err(StoreError::from)?;
287                    // Pass the source size so the zstd frame's optional
288                    // Frame Content Size field is set — lets decoders
289                    // preallocate output buffers and validates that we
290                    // wrote exactly what we promised at finish().
291                    enc.set_pledged_src_size(Some(data.len() as u64))
292                        .map_err(StoreError::from)?;
293                    enc.write_all(&data).map_err(StoreError::from)?;
294                    enc.finish().map_err(StoreError::from)?;
295                }
296                pw.flush().map_err(StoreError::from)?;
297                let body_end = pw.get_mut().stream_position().map_err(StoreError::from)?;
298                let compressed_size = body_end - body_start;
299                self.total_compressed += compressed_size;
300
301                // Seek back over the placeholder, write a 10-byte
302                // non-canonical varint encoding the actual compressed_size,
303                // then seek forward to where we left off so subsequent
304                // adds append correctly.
305                let mut csize_bytes = [0u8; CSIZE_PLACEHOLDER_LEN];
306                encode_varint_padded_to_10(compressed_size, &mut csize_bytes);
307                let inner = pw.get_mut();
308                inner
309                    .seek(SeekFrom::Start(csize_pos))
310                    .map_err(StoreError::from)?;
311                inner.write_all(&csize_bytes).map_err(StoreError::from)?;
312                inner
313                    .seek(SeekFrom::Start(body_end))
314                    .map_err(StoreError::from)?;
315            }
316            #[cfg(not(feature = "zstd"))]
317            {
318                // Unreachable: `want_compress` is forced to `false`
319                // when the `zstd` feature is off.
320                unreachable!("compression branch reached without `zstd` feature");
321            }
322        }
323
324        // Append the index entry (id || u64 BE offset) to the bucket
325        // matching the id's first inner byte. The bucket file is opened
326        // lazily so a sparse pack only creates files it actually uses.
327        let bucket_idx = bucket_index_for(&id);
328        let bucket = self.get_or_open_bucket(bucket_idx)?;
329        let mut idx_entry = Vec::with_capacity(33 + 8);
330        id.encode_tagged(&mut idx_entry);
331        idx_entry.extend_from_slice(&offset.to_be_bytes());
332        bucket.write_all(&idx_entry).map_err(StoreError::from)?;
333
334        self.object_count += 1;
335        Ok(())
336    }
337
338    fn get_or_open_bucket(&mut self, idx: usize) -> Result<&mut BufWriter<File>> {
339        if self.bucket_writers[idx].is_none() {
340            let path = &self.bucket_paths[idx];
341            let f = File::create(path).map_err(StoreError::from)?;
342            self.bucket_writers[idx] = Some(BufWriter::new(f));
343        }
344        Ok(self.bucket_writers[idx]
345            .as_mut()
346            .expect("just inserted above"))
347    }
348
349    /// Close the pack: patch the header count, append the BLAKE3
350    /// trailer, build the sorted index from bucket files, and clean up
351    /// the bucket directory. Returns `(pack_writer, index_bytes,
352    /// stats)` so the caller can install the pack into its store.
353    ///
354    /// On any failure the bucket dir is left in place; rerunning the
355    /// import will overwrite stale bucket files (they're keyed by
356    /// fixed name, not content) so this isn't a correctness issue —
357    /// just a small amount of disk churn until the next clean
358    /// finalize.
359    pub fn finalize(mut self) -> Result<(W, PackStats)> {
360        // 1. Flush every bucket so reads in the next phase see all
361        //    queued entries. `flatten()` skips the never-opened slots.
362        for w in self.bucket_writers.iter_mut().flatten() {
363            w.flush().map_err(StoreError::from)?;
364        }
365        // Drop the writers so the OS file handles close before we
366        // re-open the same paths for reading.
367        for slot in self.bucket_writers.iter_mut() {
368            *slot = None;
369        }
370
371        // 2. Patch the pack header with the real object count, then
372        //    re-stream the [header][body] bytes to compute the
373        //    trailer checksum.
374        let bw = self
375            .pack_writer
376            .take()
377            .expect("finalize called twice — pack_writer already consumed");
378        let mut writer = bw
379            .into_inner()
380            .map_err(|e| StoreError::from(std::io::Error::other(e.to_string())))?;
381        writer
382            .seek(SeekFrom::Start(self.header_offset))
383            .map_err(StoreError::from)?;
384        let mut header_bytes = Vec::with_capacity(16);
385        write_container_header(&mut header_bytes, pack_container_spec(), self.object_count);
386        writer.write_all(&header_bytes).map_err(StoreError::from)?;
387
388        // 3. Hash the on-disk content from header_offset to current
389        //    position (which is just past the body). One sequential
390        //    pass; the BufWriter we drained is gone so this read is
391        //    on the raw writer.
392        writer
393            .seek(SeekFrom::Start(self.header_offset))
394            .map_err(StoreError::from)?;
395        let mut hasher = blake3::Hasher::new();
396        let mut buf = vec![0u8; 64 * 1024];
397        loop {
398            let n = writer.read(&mut buf).map_err(StoreError::from)?;
399            if n == 0 {
400                break;
401            }
402            hasher.update(&buf[..n]);
403        }
404        let checksum = hasher.finalize();
405
406        // 4. Append the trailer checksum.
407        writer.seek(SeekFrom::End(0)).map_err(StoreError::from)?;
408        writer
409            .write_all(checksum.as_bytes())
410            .map_err(StoreError::from)?;
411        writer.flush().map_err(StoreError::from)?;
412
413        // 5. Stream the final sorted index directly to disk. We open
414        //    a `BufWriter` against `index_path`, write the index
415        //    container header (magic + version + count — count is
416        //    already known from the per-add bookkeeping), then walk
417        //    the 512 buckets in `(variant, prefix)` order, sorting
418        //    each in memory and writing entries to the file as they
419        //    come off the sort. The intermediate `PackIndex` Vec —
420        //    O(K) in the previous implementation — is gone; the
421        //    largest in-memory state is one bucket's worth of entries.
422        //    Bucket distribution is uniform via BLAKE3 so each bucket
423        //    is ~K/256 entries × ~50 bytes; even at 100M objects that's
424        //    a ~16 MB sort scratch.
425        let idx_file = File::create(&self.index_path).map_err(StoreError::from)?;
426        let mut idx_writer = BufWriter::new(idx_file);
427        write_index_header(&mut idx_writer, self.object_count)?;
428        let mut entries_written: u64 = 0;
429        for path in self.bucket_paths.iter() {
430            if !path.exists() {
431                continue;
432            }
433            let bucket_bytes = std::fs::read(path).map_err(StoreError::from)?;
434            let mut entries = decode_bucket_file(&bucket_bytes)?;
435            // Local sort by `PackObjectId` matches the global sort
436            // because all entries in a bucket share the same variant
437            // tag *and* the same first inner byte; only the remaining
438            // bytes differ between them.
439            entries.sort_by_key(|(id, _)| *id);
440            for (id, offset) in entries {
441                write_index_entry(&mut idx_writer, id, offset)?;
442                entries_written += 1;
443            }
444        }
445        idx_writer.flush().map_err(StoreError::from)?;
446        debug_assert_eq!(
447            entries_written, self.object_count,
448            "streaming index entry count drifted from add() count"
449        );
450
451        // 6. Clean up the bucket dir so the heddle store doesn't carry
452        //    transient artifacts. Deletion failures are non-fatal —
453        //    the dir is uniquely named per import so leftovers are at
454        //    worst stale, not corrupting.
455        for path in self.bucket_paths.iter() {
456            let _ = std::fs::remove_file(path);
457        }
458        let _ = std::fs::remove_dir(&self.bucket_dir);
459        self.finalized = true;
460
461        let stats = PackStats {
462            object_count: self.object_count,
463            total_uncompressed: self.total_uncompressed,
464            total_compressed: self.total_compressed,
465            delta_count: 0,
466            compression_ratio: if self.total_uncompressed == 0 {
467                0.0
468            } else {
469                self.total_compressed as f64 / self.total_uncompressed as f64
470            },
471        };
472
473        Ok((writer, stats))
474    }
475}
476
477/// Write the index container header to `out`. Mirrors
478/// [`PackIndex::to_bytes`]'s prefix exactly (4-byte magic, 4-byte
479/// big-endian version, 8-byte big-endian count) so a reader written
480/// against the existing format works without modification.
481fn write_index_header<W: Write>(out: &mut W, count: u64) -> Result<()> {
482    out.write_all(super::pack_index::INDEX_MAGIC)
483        .map_err(StoreError::from)?;
484    out.write_all(&super::pack_index::INDEX_VERSION.to_be_bytes())
485        .map_err(StoreError::from)?;
486    out.write_all(&count.to_be_bytes())
487        .map_err(StoreError::from)?;
488    Ok(())
489}
490
491/// Append one `(id, offset)` index entry to `out`. The encoding
492/// matches [`PackIndex::to_bytes`]: tagged id immediately followed by
493/// an 8-byte big-endian offset.
494fn write_index_entry<W: Write>(out: &mut W, id: PackObjectId, offset: u64) -> Result<()> {
495    let mut buf = Vec::with_capacity(33 + 8);
496    id.encode_tagged(&mut buf);
497    buf.extend_from_slice(&offset.to_be_bytes());
498    out.write_all(&buf).map_err(StoreError::from)
499}
500
501/// Encode a `u64` as a non-canonical 10-byte LEB128 varint. The first
502/// 9 bytes always set the continuation bit (`0x80`), the 10th never
503/// does — so the decoder reads exactly 10 bytes regardless of the
504/// value. Used by the streaming path to reserve a fixed-width
505/// placeholder for `compressed_size` before stream-compressing the
506/// payload, then patch the placeholder with the actual size after.
507///
508/// `decode_varint` ignores the canonicalness of the encoding (it
509/// walks continuation bits without checking minimum-byte form), so
510/// the value round-trips exactly. Cost is up to 9 wasted bytes per
511/// entry, ~115 KB on a 13 K-entry import — negligible relative to
512/// the pack body.
513#[cfg(feature = "zstd")]
fn encode_varint_padded_to_10(value: u64, out: &mut [u8; 10]) {
    // Index of the terminator byte: the only one without the
    // continuation bit.
    const LAST: usize = 9;
    for (i, slot) in out.iter_mut().enumerate() {
        // Little-endian groups of 7 bits; group 9 carries bit 63.
        let septet = ((value >> (7 * i)) & 0x7F) as u8;
        *slot = if i < LAST { septet | 0x80 } else { septet };
    }
}
522
523impl<W: Write + Read + Seek> Drop for StreamingPackBuilder<W> {
524    fn drop(&mut self) {
525        if self.finalized {
526            return;
527        }
528        // Best-effort cleanup of bucket dir on abort. Errors here are
529        // suppressed because Drop can't propagate them.
530        for path in self.bucket_paths.iter() {
531            let _ = std::fs::remove_file(path);
532        }
533        let _ = std::fs::remove_dir(&self.bucket_dir);
534    }
535}
536
537/// Map a `PackObjectId` to one of `TOTAL_BUCKETS` buckets. The variant
538/// (Hash vs ChangeId) picks the upper half; the first byte of the
539/// inner id picks the slot within the half.
540fn bucket_index_for(id: &PackObjectId) -> usize {
541    match id {
542        PackObjectId::Hash(h) => HASH_VARIANT * BUCKETS_PER_VARIANT + h.as_bytes()[0] as usize,
543        PackObjectId::ChangeId(c) => {
544            CHANGEID_VARIANT * BUCKETS_PER_VARIANT + c.as_bytes()[0] as usize
545        }
546    }
547}
548
549/// Decode `(id, offset)` records from a bucket file. The format
550/// matches `PackObjectId::encode_tagged` followed by a u64 BE offset,
551/// repeated. Unrecognized tags or truncated trailers fail loudly —
552/// we wrote the bytes, so any corruption is a bug, not user input.
553fn decode_bucket_file(bytes: &[u8]) -> Result<Vec<(PackObjectId, u64)>> {
554    let mut out = Vec::new();
555    let mut pos = 0;
556    while pos < bytes.len() {
557        let (id, id_len) = PackObjectId::decode_tagged(&bytes[pos..])?;
558        pos += id_len;
559        if pos + 8 > bytes.len() {
560            return Err(StoreError::InvalidObject(
561                "streaming bucket entry truncated at offset".to_string(),
562            ));
563        }
564        let offset = u64::from_be_bytes(bytes[pos..pos + 8].try_into().map_err(|_| {
565            StoreError::InvalidObject("streaming bucket bad offset slice".to_string())
566        })?);
567        pos += 8;
568        out.push((id, offset));
569    }
570    Ok(out)
571}
572
573// ---------------------- Tests ----------------------
574
575#[cfg(test)]
576mod tests {
577    use std::io::Cursor;
578
579    use super::*;
580    use crate::{
581        object::ChangeId,
582        store::pack::{PackReader, PackStats},
583    };
584
585    fn deterministic_hash(seed: u8) -> ContentHash {
586        // Spread `seed` across the high byte so different seeds end up
587        // in different hash-prefix buckets. We don't actually want
588        // collisions in the tests that check distribution.
589        let mut bytes = [0u8; 32];
590        bytes[0] = seed;
591        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
592            *b = seed.wrapping_mul(31).wrapping_add(i as u8);
593        }
594        ContentHash::from_bytes(bytes)
595    }
596
597    fn deterministic_change_id(seed: u8) -> ChangeId {
598        let mut bytes = [0u8; 16];
599        bytes[0] = seed;
600        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
601            *b = seed.wrapping_add(i as u8 * 7);
602        }
603        ChangeId::from_bytes(bytes)
604    }
605
606    /// Test rig: returns the builder, the bucket dir (for cleanup
607    /// inspection), and the index path the builder will write at
608    /// finalize. The index path lives in the temp dir so it gets
609    /// auto-cleaned with `tmp`.
610    fn fresh_builder(
611        tmp: &tempfile::TempDir,
612    ) -> (StreamingPackBuilder<Cursor<Vec<u8>>>, PathBuf, PathBuf) {
613        let bucket_dir = tmp.path().join("buckets");
614        let index_path = tmp.path().join("test.idx");
615        let cursor = Cursor::new(Vec::<u8>::new());
616        let b = StreamingPackBuilder::new(
617            cursor,
618            index_path.clone(),
619            CompressionConfig::default(),
620            bucket_dir.clone(),
621        )
622        .unwrap();
623        (b, bucket_dir, index_path)
624    }
625
626    /// Finalize the builder and return `(pack_bytes, index_bytes, stats)`.
627    /// The index bytes are read back from the file the builder wrote
628    /// to — verifying that the streaming index path actually produced
629    /// readable bytes.
630    fn finalize_cursor(
631        b: StreamingPackBuilder<Cursor<Vec<u8>>>,
632        index_path: &std::path::Path,
633    ) -> (Vec<u8>, Vec<u8>, PackStats) {
634        let (cursor, stats) = b.finalize().unwrap();
635        let index_bytes = std::fs::read(index_path).unwrap();
636        (cursor.into_inner(), index_bytes, stats)
637    }
638
639    #[test]
640    fn empty_pack_finalizes_to_valid_zero_count_pack() {
641        let tmp = tempfile::TempDir::new().unwrap();
642        let (b, bucket_dir, idx_path) = fresh_builder(&tmp);
643        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
644
645        assert_eq!(stats.object_count, 0);
646        // PackReader can parse the empty pack and reports zero objects.
647        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
648        assert!(reader.list_ids().is_empty());
649        // Bucket dir was removed.
650        assert!(
651            !bucket_dir.exists(),
652            "bucket dir should be cleaned on successful finalize"
653        );
654    }
655
656    #[test]
657    fn single_blob_with_hash_id_round_trips() {
658        let tmp = tempfile::TempDir::new().unwrap();
659        let (mut b, _, idx_path) = fresh_builder(&tmp);
660        let hash = deterministic_hash(0x42);
661        let payload = b"hello, streaming pack".to_vec();
662        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
663        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
664
665        assert_eq!(stats.object_count, 1);
666        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
667        let id = PackObjectId::Hash(hash);
668        assert!(reader.has_object(&id));
669        let (got_type, got_data) = reader.get_object(&id).unwrap().unwrap();
670        assert_eq!(got_type, ObjectType::Blob);
671        assert_eq!(got_data, payload);
672    }
673
674    #[test]
675    fn single_state_with_change_id_round_trips() {
676        let tmp = tempfile::TempDir::new().unwrap();
677        let (mut b, _, idx_path) = fresh_builder(&tmp);
678        let cid = deterministic_change_id(0xa5);
679        let payload = b"serialized-state-bytes".to_vec();
680        b.add_id(
681            PackObjectId::ChangeId(cid),
682            ObjectType::State,
683            payload.clone(),
684        )
685        .unwrap();
686        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
687
688        assert_eq!(stats.object_count, 1);
689        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
690        let id = PackObjectId::ChangeId(cid);
691        let (ty, data) = reader.get_object(&id).unwrap().unwrap();
692        assert_eq!(ty, ObjectType::State);
693        assert_eq!(data, payload);
694    }
695
696    #[test]
697    fn mixed_hash_and_changeid_ids_all_retrievable() {
698        let tmp = tempfile::TempDir::new().unwrap();
699        let (mut b, _, idx_path) = fresh_builder(&tmp);
700        let blob_hash = deterministic_hash(0x10);
701        let tree_hash = deterministic_hash(0x20);
702        let state_cid = deterministic_change_id(0x80);
703
704        b.add(blob_hash, ObjectType::Blob, b"blob-bytes".to_vec())
705            .unwrap();
706        b.add(tree_hash, ObjectType::Tree, b"serialized-tree".to_vec())
707            .unwrap();
708        b.add_id(
709            PackObjectId::ChangeId(state_cid),
710            ObjectType::State,
711            b"serialized-state".to_vec(),
712        )
713        .unwrap();
714
715        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
716        assert_eq!(stats.object_count, 3);
717        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
718        assert_eq!(
719            reader
720                .get_object(&PackObjectId::Hash(blob_hash))
721                .unwrap()
722                .unwrap()
723                .1,
724            b"blob-bytes".to_vec()
725        );
726        assert_eq!(
727            reader
728                .get_object(&PackObjectId::Hash(tree_hash))
729                .unwrap()
730                .unwrap()
731                .1,
732            b"serialized-tree".to_vec()
733        );
734        assert_eq!(
735            reader
736                .get_object(&PackObjectId::ChangeId(state_cid))
737                .unwrap()
738                .unwrap()
739                .1,
740            b"serialized-state".to_vec()
741        );
742    }
743
744    #[test]
745    fn ten_thousand_objects_round_trip_correctly() {
746        // Stresses the bucket sort: 10K objects spread across
747        // 256 hash buckets averages 40 entries per bucket — well
748        // within in-memory sort capacity but covers every bucket.
749        let tmp = tempfile::TempDir::new().unwrap();
750        let (mut b, _, idx_path) = fresh_builder(&tmp);
751        let mut hashes = Vec::with_capacity(10_000);
752        for i in 0..10_000u32 {
753            // Use BLAKE3 over the index so first-byte distribution is
754            // pseudo-uniform across the 256 hash buckets.
755            let h = blake3::hash(&i.to_le_bytes());
756            let hash = ContentHash::from_bytes(*h.as_bytes());
757            hashes.push(hash);
758            b.add(hash, ObjectType::Blob, format!("payload-{i}").into_bytes())
759                .unwrap();
760        }
761        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
762        assert_eq!(stats.object_count, 10_000);
763
764        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
765        assert_eq!(reader.list_ids().len(), 10_000);
766        // Spot-check ten across the range.
767        for i in [0, 1, 99, 1234, 5_000, 9_999] {
768            let id = PackObjectId::Hash(hashes[i]);
769            let (_ty, data) = reader.get_object(&id).unwrap().unwrap();
770            assert_eq!(data, format!("payload-{i}").into_bytes());
771        }
772    }
773
774    #[test]
775    fn index_id_sort_order_matches_packbuilder_output() {
776        // PackBuilder groups objects by `ObjectType` before encoding,
777        // which changes the byte offsets relative to a streaming builder
778        // that writes in added-order. The bytes of the two indices
779        // therefore can't match exactly. What MUST match is the
780        // **sort order of ids** — both builders ultimately call
781        // `PackIndex::sort()` (or the bucket-equivalent), and any
782        // reader binary-searches against that order.
783        use crate::store::pack::PackBuilder;
784        let payloads: Vec<(PackObjectId, ObjectType, Vec<u8>)> = (0..200u32)
785            .map(|i| {
786                let h = blake3::hash(&i.to_le_bytes());
787                (
788                    PackObjectId::Hash(ContentHash::from_bytes(*h.as_bytes())),
789                    if i % 3 == 0 {
790                        ObjectType::Tree
791                    } else {
792                        ObjectType::Blob
793                    },
794                    format!("body-{i}").into_bytes(),
795                )
796            })
797            .collect();
798
799        // Disable delta encoding so the classic builder produces a pack
800        // shape comparable to the streaming one (which never deltas).
801        let compression = CompressionConfig {
802            max_delta_size: 0,
803            ..CompressionConfig::default()
804        };
805        let mut classic = PackBuilder::new(compression);
806        for (id, ty, data) in payloads.iter() {
807            classic.add_id(*id, *ty, data.clone());
808        }
809        let (classic_pack, classic_index, _) = classic.build().unwrap();
810        let classic_reader = PackReader::from_bytes(classic_pack, classic_index).unwrap();
811
812        let tmp = tempfile::TempDir::new().unwrap();
813        let bucket_dir = tmp.path().join("buckets");
814        let idx_path = tmp.path().join("test.idx");
815        let cursor = Cursor::new(Vec::<u8>::new());
816        let mut streaming =
817            StreamingPackBuilder::new(cursor, idx_path.clone(), compression, bucket_dir).unwrap();
818        for (id, ty, data) in payloads.iter() {
819            streaming.add_id(*id, *ty, data.clone()).unwrap();
820        }
821        let (streaming_pack, streaming_index, _) = finalize_cursor(streaming, &idx_path);
822        let streaming_reader = PackReader::from_bytes(streaming_pack, streaming_index).unwrap();
823
824        // Same set of ids in the same sorted order — that's the
825        // contract for binary search to work.
826        assert_eq!(
827            streaming_reader.list_ids(),
828            classic_reader.list_ids(),
829            "streaming and classic indices should report the same id sequence"
830        );
831        // Spot-check that each id resolves to a payload that matches
832        // the classic builder's output (equal bytes after decompression).
833        for (id, _ty, want) in payloads.iter().take(10).chain(payloads.iter().skip(190)) {
834            let (_, got) = streaming_reader.get_object(id).unwrap().unwrap();
835            assert_eq!(&got, want);
836            let (_, classic_got) = classic_reader.get_object(id).unwrap().unwrap();
837            assert_eq!(got, classic_got);
838        }
839    }
840
841    #[test]
842    fn corrupted_pack_fails_checksum_verification() {
843        let tmp = tempfile::TempDir::new().unwrap();
844        let (mut b, _, idx_path) = fresh_builder(&tmp);
845        b.add(
846            deterministic_hash(0x01),
847            ObjectType::Blob,
848            b"some bytes".to_vec(),
849        )
850        .unwrap();
851        let (mut pack_data, index_data, _) = finalize_cursor(b, &idx_path);
852        // Flip one byte in the body. The trailer checksum must reject.
853        let body_byte = 18; // past the 16-byte header
854        pack_data[body_byte] ^= 0xff;
855        let result = PackReader::from_bytes(pack_data, index_data);
856        assert!(
857            result.is_err(),
858            "PackReader should reject pack with mutated body"
859        );
860    }
861
862    #[test]
863    fn pack_count_in_header_matches_index_entry_count() {
864        let tmp = tempfile::TempDir::new().unwrap();
865        let (mut b, _, idx_path) = fresh_builder(&tmp);
866        for i in 0..7u8 {
867            b.add(
868                deterministic_hash(i),
869                ObjectType::Blob,
870                format!("p{i}").into_bytes(),
871            )
872            .unwrap();
873        }
874        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
875        // Header count is bytes 8..16 (big-endian).
876        let count = u64::from_be_bytes(pack_data[8..16].try_into().unwrap());
877        assert_eq!(count, 7);
878        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
879        assert_eq!(reader.list_ids().len(), 7);
880    }
881
882    #[test]
883    fn bucket_files_are_cleaned_on_successful_finalize() {
884        let tmp = tempfile::TempDir::new().unwrap();
885        let bucket_dir = tmp.path().join("buckets");
886        let idx_path = tmp.path().join("test.idx");
887        let cursor = Cursor::new(Vec::<u8>::new());
888        let mut b = StreamingPackBuilder::new(
889            cursor,
890            idx_path.clone(),
891            CompressionConfig::default(),
892            bucket_dir.clone(),
893        )
894        .unwrap();
895        for i in 0..50u8 {
896            b.add(deterministic_hash(i), ObjectType::Blob, vec![i; 32])
897                .unwrap();
898        }
899        // Buckets exist and contain data.
900        assert!(bucket_dir.exists());
901        let bucket_count = std::fs::read_dir(&bucket_dir).unwrap().count();
902        assert!(bucket_count > 0, "bucket dir should hold some files");
903        let _ = finalize_cursor(b, &idx_path);
904        assert!(
905            !bucket_dir.exists(),
906            "bucket dir should be removed on finalize"
907        );
908    }
909
910    #[test]
911    fn bucket_files_are_cleaned_on_drop_without_finalize() {
912        let tmp = tempfile::TempDir::new().unwrap();
913        let bucket_dir = tmp.path().join("buckets");
914        let idx_path = tmp.path().join("test.idx");
915        {
916            let cursor = Cursor::new(Vec::<u8>::new());
917            let mut b = StreamingPackBuilder::new(
918                cursor,
919                idx_path.clone(),
920                CompressionConfig::default(),
921                bucket_dir.clone(),
922            )
923            .unwrap();
924            for i in 0..10u8 {
925                b.add(deterministic_hash(i), ObjectType::Blob, vec![0; 32])
926                    .unwrap();
927            }
928            assert!(bucket_dir.exists());
929            // Drop without finalize — Drop impl should clean up.
930        }
931        assert!(
932            !idx_path.exists(),
933            "no index file should have been created without finalize"
934        );
935        assert!(
936            !bucket_dir.exists(),
937            "bucket dir should be removed on Drop when finalize never ran"
938        );
939    }
940
941    #[test]
942    fn large_blob_streams_to_disk_without_double_buffering() {
943        // 4 MiB blob — well under the actual streaming target but big
944        // enough to confirm we're not buffering the entire pack body in
945        // RAM. The pack data on disk should be at least 4 MiB; the
946        // builder's in-memory state is per-object only.
947        let tmp = tempfile::TempDir::new().unwrap();
948        let bucket_dir = tmp.path().join("buckets");
949        let pack_path = tmp.path().join("pack.dat");
950        let idx_path = tmp.path().join("pack.idx");
951        let file = std::fs::OpenOptions::new()
952            .read(true)
953            .write(true)
954            .create(true)
955            .truncate(true)
956            .open(&pack_path)
957            .unwrap();
958        let mut b = StreamingPackBuilder::new(
959            file,
960            idx_path.clone(),
961            CompressionConfig::default(),
962            bucket_dir,
963        )
964        .unwrap();
965        let payload: Vec<u8> = (0..4 * 1024 * 1024u32).map(|i| (i & 0xff) as u8).collect();
966        let hash = deterministic_hash(0xff);
967        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
968        let (_, stats) = b.finalize().unwrap();
969        let index_data = std::fs::read(&idx_path).unwrap();
970        assert_eq!(stats.object_count, 1);
971        let pack_bytes = std::fs::read(&pack_path).unwrap();
972        // Pack on disk holds the whole compressed payload + headers
973        // + trailer. Confirm it round-trips.
974        let reader = PackReader::from_bytes(pack_bytes, index_data).unwrap();
975        let (_ty, got) = reader
976            .get_object(&PackObjectId::Hash(hash))
977            .unwrap()
978            .unwrap();
979        assert_eq!(got, payload);
980    }
981
982    #[test]
983    fn bucket_distribution_for_random_hashes_is_roughly_uniform() {
984        // Confirms our sort-time peak memory bound. We accumulate
985        // 1024 random hashes through a builder and check that no
986        // single bucket holds more than ~3× the average. (BLAKE3 hash
987        // first-byte distribution is uniform; this is mostly a
988        // sanity check that we route to the right bucket and aren't
989        // accidentally collapsing.)
990        let tmp = tempfile::TempDir::new().unwrap();
991        let bucket_dir = tmp.path().join("buckets");
992        let idx_path = tmp.path().join("test.idx");
993        let cursor = Cursor::new(Vec::<u8>::new());
994        let mut b = StreamingPackBuilder::new(
995            cursor,
996            idx_path.clone(),
997            CompressionConfig::default(),
998            bucket_dir.clone(),
999        )
1000        .unwrap();
1001        for i in 0..1024u32 {
1002            let h = blake3::hash(&i.to_le_bytes());
1003            let hash = ContentHash::from_bytes(*h.as_bytes());
1004            b.add(hash, ObjectType::Blob, b"x".to_vec()).unwrap();
1005        }
1006        // Inspect bucket file sizes BEFORE finalize (which deletes them).
1007        b.pack_writer.as_mut().unwrap().flush().unwrap();
1008        let mut max_entries = 0usize;
1009        let entry_size = 33 + 8; // tagged-hash + u64 offset
1010        for path in b.bucket_paths.iter() {
1011            if path.exists() {
1012                let size = std::fs::metadata(path).unwrap().len() as usize;
1013                let entries = size / entry_size;
1014                if entries > max_entries {
1015                    max_entries = entries;
1016                }
1017            }
1018        }
1019        // Average is 1024 / 256 = 4 entries per bucket. Allow up to 16
1020        // (4× average) — uniformity isn't perfect on small samples.
1021        assert!(
1022            max_entries <= 16,
1023            "max bucket has {max_entries} entries; uniform expected ~4"
1024        );
1025        let _ = finalize_cursor(b, &idx_path);
1026    }
1027
1028    #[test]
1029    fn finalize_returns_correct_stats() {
1030        let tmp = tempfile::TempDir::new().unwrap();
1031        let (mut b, _, idx_path) = fresh_builder(&tmp);
1032        let payload = vec![0xabu8; 1024];
1033        for i in 0..5u8 {
1034            b.add(deterministic_hash(i), ObjectType::Blob, payload.clone())
1035                .unwrap();
1036        }
1037        let (_, _, stats) = finalize_cursor(b, &idx_path);
1038        assert_eq!(stats.object_count, 5);
1039        assert_eq!(stats.total_uncompressed, 5 * 1024);
1040        assert!(stats.total_compressed > 0);
1041        assert!(stats.compression_ratio > 0.0);
1042        assert_eq!(stats.delta_count, 0, "streaming builder never deltas");
1043    }
1044
1045    #[cfg(feature = "zstd")]
1046    #[test]
1047    fn streaming_compression_roundtrips_through_zstd_frame() {
1048        // Force the streaming path with a payload that compresses
1049        // well (long runs of identical bytes). Verifies:
1050        //  1. Streaming output decodes back to the original bytes.
1051        //  2. The compressed body is genuinely smaller than the
1052        //     uncompressed input (proving zstd ran), and
1053        //  3. The non-canonical 10-byte varint patched into the
1054        //     compressed_size slot decodes to the right value.
1055        let tmp = tempfile::TempDir::new().unwrap();
1056        let (mut b, _, idx_path) = fresh_builder(&tmp);
1057        // 64 KiB of zeros — compresses to a tiny zstd frame, well
1058        // above the default `min_size` so we hit the streaming branch.
1059        let payload = vec![0u8; 64 * 1024];
1060        let hash = deterministic_hash(0x77);
1061        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1062        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
1063        assert!(
1064            stats.total_compressed < stats.total_uncompressed,
1065            "expected compression ratio < 1.0, got {}/{}",
1066            stats.total_compressed,
1067            stats.total_uncompressed
1068        );
1069        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1070        let (_ty, got) = reader
1071            .get_object(&PackObjectId::Hash(hash))
1072            .unwrap()
1073            .unwrap();
1074        assert_eq!(got, payload);
1075    }
1076
1077    #[cfg(feature = "zstd")]
1078    #[test]
1079    fn padded_varint_decodes_to_original_value_for_canonical_decoder() {
1080        // Sanity for the seek-back scheme: for every value we'd want
1081        // to encode (small, mid, large), confirm the existing
1082        // `decode_varint` returns the same `value` from a 10-byte
1083        // padded encoding. If this ever fails the streaming path's
1084        // patched compressed_size would be misread by readers.
1085        let cases: &[u64] = &[0, 1, 127, 128, 4096, 1_000_000, 1_000_000_000_000, u64::MAX];
1086        for &value in cases {
1087            let mut buf = [0u8; 10];
1088            super::encode_varint_padded_to_10(value, &mut buf);
1089            let (decoded, consumed) = super::super::varint::decode_varint(&buf)
1090                .expect("padded varint should always decode");
1091            assert_eq!(decoded, value, "varint roundtrip failed for {value}");
1092            assert_eq!(
1093                consumed, 10,
1094                "padded encoding should consume all 10 bytes for {value}"
1095            );
1096        }
1097    }
1098
1099    #[cfg(feature = "zstd")]
1100    #[test]
1101    fn streaming_path_does_not_buffer_compressed_payload_in_memory() {
1102        // Smoke check: write a single 8 MiB payload, observe the
1103        // pack file size on disk during/after the add. The pack file
1104        // grows incrementally during the streaming compression — if
1105        // we were buffering an intermediate compressed `Vec<u8>` the
1106        // on-disk size would jump by ~8 MiB at finalize, not stay
1107        // bounded as the encoder pumps bytes through.
1108        //
1109        // We can't easily measure peak heap from inside Rust without
1110        // a custom allocator. What we *can* verify is that calling
1111        // `add` returns control with the pack file already at its
1112        // final body size, demonstrating the encoder wrote through
1113        // and didn't accumulate.
1114        let tmp = tempfile::TempDir::new().unwrap();
1115        let bucket_dir = tmp.path().join("buckets");
1116        let pack_path = tmp.path().join("pack.dat");
1117        let idx_path = tmp.path().join("pack.idx");
1118        let file = std::fs::OpenOptions::new()
1119            .read(true)
1120            .write(true)
1121            .create(true)
1122            .truncate(true)
1123            .open(&pack_path)
1124            .unwrap();
1125        let mut b = StreamingPackBuilder::new(
1126            file,
1127            idx_path.clone(),
1128            CompressionConfig::default(),
1129            bucket_dir,
1130        )
1131        .unwrap();
1132        let payload = vec![0xa5u8; 8 * 1024 * 1024];
1133        let hash = deterministic_hash(0x66);
1134        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1135        // Pack file already on disk holds at least the entry header +
1136        // compressed payload (excluding the 32-byte trailer the builder
1137        // appends at finalize).
1138        let mid_size = std::fs::metadata(&pack_path).unwrap().len();
1139        assert!(
1140            mid_size > 16 + 40,
1141            "pack file should hold real entry data after add; size={mid_size}"
1142        );
1143        let (_, _) = b.finalize().unwrap();
1144        let pack_bytes = std::fs::read(&pack_path).unwrap();
1145        let index_bytes = std::fs::read(&idx_path).unwrap();
1146        let reader = PackReader::from_bytes(pack_bytes, index_bytes).unwrap();
1147        let (_ty, got) = reader
1148            .get_object(&PackObjectId::Hash(hash))
1149            .unwrap()
1150            .unwrap();
1151        assert_eq!(got, payload);
1152    }
1153
1154    #[test]
1155    fn list_ids_returns_all_added_ids_sorted() {
1156        let tmp = tempfile::TempDir::new().unwrap();
1157        let (mut b, _, idx_path) = fresh_builder(&tmp);
1158        let mut added: Vec<PackObjectId> = Vec::new();
1159        // Mix of Hash and ChangeId in a non-sorted order on input.
1160        for seed in [0x05u8, 0xa0, 0x12, 0x9f, 0x33] {
1161            let id = PackObjectId::Hash(deterministic_hash(seed));
1162            b.add_id(id, ObjectType::Blob, vec![seed; 4]).unwrap();
1163            added.push(id);
1164        }
1165        for seed in [0x80u8, 0x10, 0xff] {
1166            let id = PackObjectId::ChangeId(deterministic_change_id(seed));
1167            b.add_id(id, ObjectType::State, vec![seed; 4]).unwrap();
1168            added.push(id);
1169        }
1170        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
1171        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1172        let mut got = reader.list_ids();
1173        // PackReader's list_ids returns index order — should already be
1174        // sorted because we sort on finalize.
1175        let mut sorted = got.clone();
1176        sorted.sort();
1177        assert_eq!(got, sorted, "list_ids must come back sorted");
1178        // And every added id should appear.
1179        added.sort();
1180        got.sort();
1181        assert_eq!(got, added);
1182    }
1183}