Skip to main content

objects/store/pack/
streaming_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Streaming pack builder for bounded-memory imports.
3//!
4//! `PackBuilder` accumulates every `(id, type, data)` tuple in memory
5//! before producing a pack. That's fine for sync-protocol packets and
6//! small batches, but the import path can produce millions of objects
7//! and would OOM on large repos.
8//!
9//! `StreamingPackBuilder` removes the in-memory buffering by:
10//!
//! 1. **Streaming pack data to disk** as objects are added. Compression
//!    (when enabled) runs per-object through
//!    `zstd::stream::write::Encoder` directly into the pack writer, so
//!    the compressed payload is never materialized; the writer holds
//!    at most its `BufWriter` capacity on top of the caller-supplied
//!    uncompressed object.
16//!
17//! 2. **External sorting the index** via 512 hash-prefix bucket files
18//!    on disk (256 for `Hash` ids, 256 for `ChangeId` ids). Each
19//!    `add()` appends one fixed-shape `(id, offset)` record to the
20//!    bucket whose first byte matches the id's first inner byte. At
21//!    finalize, each bucket is small enough to sort in memory; the
22//!    concatenation of `Hash` buckets followed by `ChangeId` buckets
23//!    in byte order produces the exact same global sort `PackBuilder`
24//!    would have via `entries.sort_by_key(|e| e.id)`.
25//!
26//! ## Memory bound
27//!
//! - Pack data on disk: streamed; only one object's uncompressed
//!   input is held in memory at a time.
30//! - Index entries in bucket buffers: at most 32 bucket files are held
31//!   open at once, each behind a default-capacity `BufWriter` (~8 KB),
32//!   so peak buffering is ~256 KB.
33//! - Sort scratch at finalize: O(largest bucket). For uniformly-
34//!   distributed BLAKE3 hashes / ULID change-ids and N total objects,
35//!   the largest bucket is ~N/256 entries ≈ 40 bytes each. Even at
36//!   100 M objects that's ~16 MB peak.
37//!
//! Net peak memory: ~20 MB regardless of repo size, modulo the size
//! of the largest single object (unavoidable while `add_id` takes the
//! uncompressed payload as one `Vec<u8>`).
41//!
42//! ## Trade-offs vs `PackBuilder`
43//!
44//! - **No delta encoding.** Streaming and sliding-window deltas are
45//!   incompatible — delta search needs random access to recently-
46//!   written objects. The import path runs with deltas disabled
47//!   anyway (the cost-benefit is bad on real Heddle history), so this
48//!   is a non-issue for the call site that motivated this builder.
49//! - **No path-grouped reordering.** Entries land in the order added.
50//! - **Output is a pack file at a path** rather than `(Vec<u8>, Vec<u8>)`.
51//!   Callers pair this with [`crate::store::ObjectStore::install_pack_from_path`]
52//!   which moves/installs the pack without copying it through RAM.
53//! - **Re-reads the pack at finalize** to compute the BLAKE3 trailer
54//!   checksum (the pack format hashes header+body, and the count goes
55//!   in the header — we patch it on finalize via seek-back, then
56//!   re-stream the body to the hasher). 2× sequential disk I/O on the
57//!   pack data is the cost of sticking with the current format. A
58//!   future format change could put the count in the footer to avoid
59//!   the second pass.
60
61use std::{
62    fs::{File, OpenOptions},
63    io::{BufWriter, Read, Seek, SeekFrom, Write},
64    path::PathBuf,
65};
66
67use super::{ObjectType, PackObjectId, PackStats, pack_container_spec, write_container_header};
68
/// Width, in bytes, of the placeholder reserved for an entry's
/// compressed-size varint on the streaming path. A `u64` splits into
/// `ceil(64 / 7) = 10` seven-bit groups, so 10 bytes always suffice
/// (nine continuation bytes plus one terminator). Once the payload has
/// been streamed, the placeholder is overwritten with a deliberately
/// non-canonical varint padded to exactly this width. Only the
/// zstd-enabled compress path uses it.
#[cfg(feature = "zstd")]
const CSIZE_PLACEHOLDER_LEN: usize = (u64::BITS as usize + 6) / 7;
76use crate::{
77    object::ContentHash,
78    store::{Result, StoreError, compression::CompressionConfig},
79};
80
/// Bucket count for a single id variant: one bucket for each possible
/// value of the inner id's first byte. Keeping bucket boundaries
/// aligned with `PackObjectId`'s derived `Ord` (variant tag first,
/// inner bytes second) is what lets the concatenated bucket output
/// match what `PackIndex::sort()` would have produced.
const BUCKETS_PER_VARIANT: usize = 256;
/// Two variants (`Hash` and `ChangeId`), each with its own 256 buckets.
const TOTAL_BUCKETS: usize = 2 * BUCKETS_PER_VARIANT;
/// Upper bound on index-bucket files kept open simultaneously. macOS
/// GUI-launched processes commonly inherit a 256-fd soft limit, and an
/// import also needs descriptors for Git pack/index files, sqlite
/// maps, the output pack, etc.
const MAX_OPEN_BUCKET_WRITERS: usize = 32;

/// Variant indices into the `bucket_*` arrays. `Hash` ids occupy the
/// lower half, mirroring the variant order in `PackObjectId` under
/// which `Hash(_) < ChangeId(_)`.
const HASH_VARIANT: usize = 0;
const CHANGEID_VARIANT: usize = 1;
99
100/// Streaming pack builder. Held generic over the pack writer (`File`
101/// in production, `Cursor<Vec<u8>>` in tests).
102pub struct StreamingPackBuilder<W: Write + Read + Seek> {
103    /// Writer for the pack's `[header][body]` content. The trailer
104    /// checksum is appended to the same writer at `finalize`.
105    /// Wrapped in `Option` so `finalize` can `.take()` it out without
106    /// running afoul of the `Drop` impl's restriction on moving fields.
107    /// `None` after `finalize` succeeds.
108    pack_writer: Option<BufWriter<W>>,
109    /// Position in the pack writer where the header was written, so
110    /// we can seek back at finalize and patch the real `object_count`
111    /// into bytes 8..16.
112    header_offset: u64,
113    object_count: u64,
114    total_uncompressed: u64,
115    total_compressed: u64,
116    /// Compression knobs. Only consulted when the `zstd` feature is on
117    /// (`enabled` and `min_size` decide whether each entry compresses;
118    /// `level` parameterizes the encoder). Without `zstd` every entry
119    /// takes the raw branch and this field is just along for the ride.
120    #[cfg_attr(not(feature = "zstd"), allow(dead_code))]
121    compression: CompressionConfig,
122    /// Directory holding the 512 bucket files. Owned by the builder
123    /// so we can clean up on `Drop` if `finalize` is never called.
124    bucket_dir: PathBuf,
125    /// Buckets `[variant][prefix_byte]` → optional buffered file.
126    /// Lazily opened on first write and capped with LRU eviction so a
127    /// large import cannot exhaust the process fd limit.
128    bucket_writers: Vec<Option<BucketWriter>>,
129    open_bucket_writers: usize,
130    bucket_access_tick: u64,
131    bucket_paths: Vec<PathBuf>,
132    /// File path where the pack index is materialized at `finalize`.
133    /// Bytes are written incrementally as buckets are sorted, so the
134    /// index never sits in memory in its entirety.
135    index_path: PathBuf,
136    /// Set true on `finalize` so `Drop` knows the bucket dir was
137    /// already cleaned and shouldn't be removed again.
138    finalized: bool,
139}
140
/// An open index-bucket file together with its LRU bookkeeping.
struct BucketWriter {
    /// Buffered append handle to the bucket file.
    writer: BufWriter<File>,
    /// Value of the builder's access tick when this bucket was last
    /// requested; the open bucket with the smallest value is evicted
    /// first.
    last_used: u64,
}
145
146impl<W: Write + Read + Seek> StreamingPackBuilder<W> {
147    /// Open a streaming builder against `pack_writer`, using
148    /// `bucket_dir` for transient index buckets and writing the
149    /// finalized index to `index_path`. The bucket dir is created if
150    /// it doesn't exist; on a successful `finalize` it's removed
151    /// (along with any bucket files left in it).
152    ///
153    /// `index_path` is *not* created by `new` — opening happens at
154    /// finalize so a misconfigured caller doesn't leave an empty index
155    /// file behind on early failure. It's still recorded here so
156    /// `finalize` can write to a known location and the caller can
157    /// install the file by path.
158    ///
159    /// The `pack_writer` must support `Read` because finalize re-streams
160    /// the body to compute the trailer checksum — see the module-level
161    /// note on the format.
162    pub fn new(
163        mut pack_writer: W,
164        index_path: PathBuf,
165        compression: CompressionConfig,
166        bucket_dir: PathBuf,
167    ) -> Result<Self> {
168        std::fs::create_dir_all(&bucket_dir).map_err(StoreError::from)?;
169        let header_offset = pack_writer.stream_position().map_err(StoreError::from)?;
170
171        // Write a placeholder header with `count = 0`; finalize seeks
172        // back here and rewrites the real count.
173        let mut header_bytes = Vec::with_capacity(16);
174        write_container_header(&mut header_bytes, pack_container_spec(), 0);
175        pack_writer
176            .write_all(&header_bytes)
177            .map_err(StoreError::from)?;
178
179        let bucket_paths: Vec<PathBuf> = (0..TOTAL_BUCKETS)
180            .map(|i| {
181                let variant = if i < BUCKETS_PER_VARIANT { 'h' } else { 'c' };
182                let prefix = i % BUCKETS_PER_VARIANT;
183                bucket_dir.join(format!("bucket-{variant}-{prefix:02x}"))
184            })
185            .collect();
186        for path in &bucket_paths {
187            let _ = std::fs::remove_file(path);
188        }
189
190        Ok(Self {
191            pack_writer: Some(BufWriter::new(pack_writer)),
192            header_offset,
193            object_count: 0,
194            total_uncompressed: 0,
195            total_compressed: 0,
196            compression,
197            bucket_dir,
198            bucket_writers: (0..TOTAL_BUCKETS).map(|_| None).collect(),
199            open_bucket_writers: 0,
200            bucket_access_tick: 0,
201            bucket_paths,
202            index_path,
203            finalized: false,
204        })
205    }
206
207    /// Add an object with a content-hash id.
208    pub fn add(&mut self, hash: ContentHash, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
209        self.add_id(PackObjectId::Hash(hash), obj_type, data)
210    }
211
212    /// Add an object with an explicit id. Mirrors [`super::PackBuilder::add_id`].
213    ///
214    /// # Memory shape
215    ///
216    /// Per-entry, the only allocations are:
217    ///
218    /// - `data: Vec<u8>` (the input, owned by the caller — comes from
219    ///   gix' `find_object` and isn't ours to stream further).
220    /// - A ~40-byte stack scratch for the entry header.
221    /// - zstd's internal compression context (~128 KB constant).
222    /// - One 50-byte index-bucket entry buffered into the bucket's
223    ///   `BufWriter`.
224    ///
225    /// The compressed payload is **never materialized** as a `Vec<u8>` —
226    /// it streams directly through `zstd::stream::write::Encoder` into
227    /// the pack writer. The pack format requires a `compressed_size`
228    /// varint *before* the compressed bytes, which we don't know yet
229    /// when we write the header; we reserve a 10-byte placeholder and
230    /// seek-back to patch it after the encoder finishes. Heddle's
231    /// varint decoder accepts non-canonical encodings (it walks
232    /// continuation bits without enforcing minimum-byte form), so the
233    /// padded write decodes back to the same value any reader expects.
234    pub fn add_id(&mut self, id: PackObjectId, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
235        // Compute the entry's offset relative to the header. Flush the
236        // BufWriter first so `stream_position` reflects bytes actually
237        // committed to the underlying writer.
238        let pw = self
239            .pack_writer
240            .as_mut()
241            .expect("add_id called after finalize");
242        pw.flush().map_err(StoreError::from)?;
243        let entry_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
244        let offset = entry_start
245            .checked_sub(self.header_offset)
246            .expect("header_offset should never be past current position");
247
248        self.total_uncompressed += data.len() as u64;
249
250        // Phase 1: write the entry header up to (but not including) the
251        // compressed-size varint. Always small, fits in `entry_header_buf`.
252        let mut header_buf = Vec::with_capacity(40);
253        id.encode_tagged(&mut header_buf);
254        super::varint::encode_type_and_size(obj_type, data.len() as u64, &mut header_buf);
255        pw.write_all(&header_buf).map_err(StoreError::from)?;
256        // Only consumed by the zstd-enabled streaming branch below, but
257        // we compute it here while we already have `header_buf`'s length
258        // in scope.
259        #[cfg(feature = "zstd")]
260        let csize_pos = entry_start + header_buf.len() as u64;
261
262        // Phase 2: stream the compressed payload. We branch here on
263        // whether to compress at all — for tiny objects (`< min_size`)
264        // the bulk path traditionally wrote raw bytes to skip zstd
265        // overhead, and the reader's existing `compressed_size ==
266        // uncompressed_size` heuristic in `pack_reader.rs:128` reads
267        // raw entries back unchanged. We preserve that policy.
268        // `want_compress` gates the zstd path. Even with the feature
269        // enabled we fall through to raw for tiny entries (where
270        // zstd's frame overhead dominates) or when the caller
271        // explicitly disabled compression in `CompressionConfig`.
272        // Without the `zstd` Cargo feature, every entry takes the raw
273        // branch — same fallback shape as `compress_pack_payload`.
274        let want_compress: bool;
275        #[cfg(feature = "zstd")]
276        {
277            want_compress = self.compression.enabled && data.len() >= self.compression.min_size;
278        }
279        #[cfg(not(feature = "zstd"))]
280        {
281            want_compress = false;
282        }
283        if !want_compress {
284            // Raw entry: known compressed_size = data.len(). One canonical
285            // varint + the data itself. No seek-back needed.
286            let mut csize_buf = Vec::with_capacity(10);
287            super::varint::encode_varint(data.len() as u64, &mut csize_buf);
288            pw.write_all(&csize_buf).map_err(StoreError::from)?;
289            pw.write_all(&data).map_err(StoreError::from)?;
290            self.total_compressed += data.len() as u64;
291        } else {
292            #[cfg(feature = "zstd")]
293            {
294                // Streaming entry: reserve 10 bytes for compressed_size,
295                // stream-compress the payload, then seek back to patch.
296                pw.write_all(&[0u8; CSIZE_PLACEHOLDER_LEN])
297                    .map_err(StoreError::from)?;
298                pw.flush().map_err(StoreError::from)?;
299                let body_start = pw.get_mut().stream_position().map_err(StoreError::from)?;
300                {
301                    let mut enc =
302                        zstd::stream::write::Encoder::new(&mut *pw, self.compression.level)
303                            .map_err(StoreError::from)?;
304                    // Pass the source size so the zstd frame's optional
305                    // Frame Content Size field is set — lets decoders
306                    // preallocate output buffers and validates that we
307                    // wrote exactly what we promised at finish().
308                    enc.set_pledged_src_size(Some(data.len() as u64))
309                        .map_err(StoreError::from)?;
310                    enc.write_all(&data).map_err(StoreError::from)?;
311                    enc.finish().map_err(StoreError::from)?;
312                }
313                pw.flush().map_err(StoreError::from)?;
314                let body_end = pw.get_mut().stream_position().map_err(StoreError::from)?;
315                let compressed_size = body_end - body_start;
316                self.total_compressed += compressed_size;
317
318                // Seek back over the placeholder, write a 10-byte
319                // non-canonical varint encoding the actual compressed_size,
320                // then seek forward to where we left off so subsequent
321                // adds append correctly.
322                let mut csize_bytes = [0u8; CSIZE_PLACEHOLDER_LEN];
323                encode_varint_padded_to_10(compressed_size, &mut csize_bytes);
324                let inner = pw.get_mut();
325                inner
326                    .seek(SeekFrom::Start(csize_pos))
327                    .map_err(StoreError::from)?;
328                inner.write_all(&csize_bytes).map_err(StoreError::from)?;
329                inner
330                    .seek(SeekFrom::Start(body_end))
331                    .map_err(StoreError::from)?;
332            }
333            #[cfg(not(feature = "zstd"))]
334            {
335                // Unreachable: `want_compress` is forced to `false`
336                // when the `zstd` feature is off.
337                unreachable!("compression branch reached without `zstd` feature");
338            }
339        }
340
341        // Append the index entry (id || u64 BE offset) to the bucket
342        // matching the id's first inner byte. The bucket file is opened
343        // lazily so a sparse pack only creates files it actually uses.
344        let bucket_idx = bucket_index_for(&id);
345        let bucket = self.get_or_open_bucket(bucket_idx)?;
346        let mut idx_entry = Vec::with_capacity(33 + 8);
347        id.encode_tagged(&mut idx_entry);
348        idx_entry.extend_from_slice(&offset.to_be_bytes());
349        bucket.write_all(&idx_entry).map_err(StoreError::from)?;
350
351        self.object_count += 1;
352        Ok(())
353    }
354
355    fn get_or_open_bucket(&mut self, idx: usize) -> Result<&mut BufWriter<File>> {
356        self.bucket_access_tick = self.bucket_access_tick.wrapping_add(1);
357        let last_used = self.bucket_access_tick;
358        if self.bucket_writers[idx].is_none() {
359            if self.open_bucket_writers >= MAX_OPEN_BUCKET_WRITERS {
360                self.evict_lru_bucket()?;
361            }
362            let path = &self.bucket_paths[idx];
363            let f = OpenOptions::new()
364                .create(true)
365                .append(true)
366                .open(path)
367                .map_err(StoreError::from)?;
368            self.bucket_writers[idx] = Some(BucketWriter {
369                writer: BufWriter::new(f),
370                last_used,
371            });
372            self.open_bucket_writers += 1;
373        } else if let Some(bucket) = self.bucket_writers[idx].as_mut() {
374            bucket.last_used = last_used;
375        }
376        Ok(&mut self.bucket_writers[idx]
377            .as_mut()
378            .expect("just inserted above")
379            .writer)
380    }
381
382    fn evict_lru_bucket(&mut self) -> Result<()> {
383        let Some((idx, _)) = self
384            .bucket_writers
385            .iter()
386            .enumerate()
387            .filter_map(|(idx, bucket)| bucket.as_ref().map(|bucket| (idx, bucket.last_used)))
388            .min_by_key(|(_, last_used)| *last_used)
389        else {
390            return Ok(());
391        };
392
393        if let Some(mut bucket) = self.bucket_writers[idx].take() {
394            bucket.writer.flush().map_err(StoreError::from)?;
395            self.open_bucket_writers -= 1;
396        }
397        Ok(())
398    }
399
400    /// Close the pack: patch the header count, append the BLAKE3
401    /// trailer, build the sorted index from bucket files, and clean up
402    /// the bucket directory. Returns `(pack_writer, index_bytes,
403    /// stats)` so the caller can install the pack into its store.
404    ///
405    /// On any failure the bucket dir is left in place; rerunning the
406    /// import will overwrite stale bucket files (they're keyed by
407    /// fixed name, not content) so this isn't a correctness issue —
408    /// just a small amount of disk churn until the next clean
409    /// finalize.
410    pub fn finalize(mut self) -> Result<(W, PackStats)> {
411        // 1. Flush every bucket so reads in the next phase see all
412        //    queued entries. `flatten()` skips the never-opened slots.
413        for bucket in self.bucket_writers.iter_mut().flatten() {
414            bucket.writer.flush().map_err(StoreError::from)?;
415        }
416        // Drop the writers so the OS file handles close before we
417        // re-open the same paths for reading.
418        for slot in self.bucket_writers.iter_mut() {
419            *slot = None;
420        }
421        self.open_bucket_writers = 0;
422
423        // 2. Patch the pack header with the real object count, then
424        //    re-stream the [header][body] bytes to compute the
425        //    trailer checksum.
426        let bw = self
427            .pack_writer
428            .take()
429            .expect("finalize called twice — pack_writer already consumed");
430        let mut writer = bw
431            .into_inner()
432            .map_err(|e| StoreError::from(std::io::Error::other(e.to_string())))?;
433        writer
434            .seek(SeekFrom::Start(self.header_offset))
435            .map_err(StoreError::from)?;
436        let mut header_bytes = Vec::with_capacity(16);
437        write_container_header(&mut header_bytes, pack_container_spec(), self.object_count);
438        writer.write_all(&header_bytes).map_err(StoreError::from)?;
439
440        // 3. Hash the on-disk content from header_offset to current
441        //    position (which is just past the body). One sequential
442        //    pass; the BufWriter we drained is gone so this read is
443        //    on the raw writer.
444        writer
445            .seek(SeekFrom::Start(self.header_offset))
446            .map_err(StoreError::from)?;
447        let mut hasher = blake3::Hasher::new();
448        let mut buf = vec![0u8; 64 * 1024];
449        loop {
450            let n = writer.read(&mut buf).map_err(StoreError::from)?;
451            if n == 0 {
452                break;
453            }
454            hasher.update(&buf[..n]);
455        }
456        let checksum = hasher.finalize();
457
458        // 4. Append the trailer checksum.
459        writer.seek(SeekFrom::End(0)).map_err(StoreError::from)?;
460        writer
461            .write_all(checksum.as_bytes())
462            .map_err(StoreError::from)?;
463        writer.flush().map_err(StoreError::from)?;
464
465        // 5. Stream the final sorted index directly to disk. We open
466        //    a `BufWriter` against `index_path`, write the index
467        //    container header (magic + version + count — count is
468        //    already known from the per-add bookkeeping), then walk
469        //    the 512 buckets in `(variant, prefix)` order, sorting
470        //    each in memory and writing entries to the file as they
471        //    come off the sort. The intermediate `PackIndex` Vec —
472        //    O(K) in the previous implementation — is gone; the
473        //    largest in-memory state is one bucket's worth of entries.
474        //    Bucket distribution is uniform via BLAKE3 so each bucket
475        //    is ~K/256 entries × ~50 bytes; even at 100M objects that's
476        //    a ~16 MB sort scratch.
477        let idx_file = File::create(&self.index_path).map_err(StoreError::from)?;
478        let mut idx_writer = BufWriter::new(idx_file);
479        write_index_header(&mut idx_writer, self.object_count)?;
480        let mut entries_written: u64 = 0;
481        for path in self.bucket_paths.iter() {
482            if !path.exists() {
483                continue;
484            }
485            let bucket_bytes = std::fs::read(path).map_err(StoreError::from)?;
486            let mut entries = decode_bucket_file(&bucket_bytes)?;
487            // Local sort by `PackObjectId` matches the global sort
488            // because all entries in a bucket share the same variant
489            // tag *and* the same first inner byte; only the remaining
490            // bytes differ between them.
491            entries.sort_by_key(|(id, _)| *id);
492            for (id, offset) in entries {
493                write_index_entry(&mut idx_writer, id, offset)?;
494                entries_written += 1;
495            }
496        }
497        idx_writer.flush().map_err(StoreError::from)?;
498        debug_assert_eq!(
499            entries_written, self.object_count,
500            "streaming index entry count drifted from add() count"
501        );
502
503        // 6. Clean up the bucket dir so the heddle store doesn't carry
504        //    transient artifacts. Deletion failures are non-fatal —
505        //    the dir is uniquely named per import so leftovers are at
506        //    worst stale, not corrupting.
507        for path in self.bucket_paths.iter() {
508            let _ = std::fs::remove_file(path);
509        }
510        let _ = std::fs::remove_dir(&self.bucket_dir);
511        self.finalized = true;
512
513        let stats = PackStats {
514            object_count: self.object_count,
515            total_uncompressed: self.total_uncompressed,
516            total_compressed: self.total_compressed,
517            delta_count: 0,
518            compression_ratio: if self.total_uncompressed == 0 {
519                0.0
520            } else {
521                self.total_compressed as f64 / self.total_uncompressed as f64
522            },
523        };
524
525        Ok((writer, stats))
526    }
527}
528
529/// Write the index container header to `out`. Mirrors
530/// [`PackIndex::to_bytes`]'s prefix exactly (4-byte magic, 4-byte
531/// big-endian version, 8-byte big-endian count) so a reader written
532/// against the existing format works without modification.
533fn write_index_header<W: Write>(out: &mut W, count: u64) -> Result<()> {
534    super::pack_index::index_header().write_to(out, count)
535}
536
537/// Append one `(id, offset)` index entry to `out`. The encoding
538/// matches [`PackIndex::to_bytes`]: tagged id immediately followed by
539/// an 8-byte big-endian offset.
540fn write_index_entry<W: Write>(out: &mut W, id: PackObjectId, offset: u64) -> Result<()> {
541    let mut buf = Vec::with_capacity(33 + 8);
542    id.encode_tagged(&mut buf);
543    buf.extend_from_slice(&offset.to_be_bytes());
544    out.write_all(&buf).map_err(StoreError::from)
545}
546
/// Encode `value` as a fixed-width, non-canonical 10-byte LEB128
/// varint. Byte `i` carries bits `7*i .. 7*i + 7` of `value`; bytes
/// 0 through 8 always have the continuation bit (`0x80`) set and byte
/// 9 never does, so a decoder consumes exactly 10 bytes whatever the
/// value is.
///
/// The streaming path uses this to overwrite the fixed-size
/// `compressed_size` placeholder once the real size is known. It
/// relies on the pack's varint decoder tolerating non-minimal
/// encodings. The price is up to 9 padding bytes per entry.
#[cfg(feature = "zstd")]
fn encode_varint_padded_to_10(value: u64, out: &mut [u8; 10]) {
    let last = out.len() - 1;
    for (i, slot) in out.iter_mut().enumerate() {
        let septet = ((value >> (7 * i)) & 0x7F) as u8;
        *slot = if i < last { septet | 0x80 } else { septet };
    }
}
568
569impl<W: Write + Read + Seek> Drop for StreamingPackBuilder<W> {
570    fn drop(&mut self) {
571        if self.finalized {
572            return;
573        }
574        // Best-effort cleanup of bucket dir on abort. Errors here are
575        // suppressed because Drop can't propagate them.
576        for path in self.bucket_paths.iter() {
577            let _ = std::fs::remove_file(path);
578        }
579        let _ = std::fs::remove_dir(&self.bucket_dir);
580    }
581}
582
583/// Map a `PackObjectId` to one of `TOTAL_BUCKETS` buckets. The variant
584/// (Hash vs ChangeId) picks the upper half; the first byte of the
585/// inner id picks the slot within the half.
586fn bucket_index_for(id: &PackObjectId) -> usize {
587    match id {
588        PackObjectId::Hash(h) => HASH_VARIANT * BUCKETS_PER_VARIANT + h.as_bytes()[0] as usize,
589        PackObjectId::ChangeId(c) => {
590            CHANGEID_VARIANT * BUCKETS_PER_VARIANT + c.as_bytes()[0] as usize
591        }
592    }
593}
594
595/// Decode `(id, offset)` records from a bucket file. The format
596/// matches `PackObjectId::encode_tagged` followed by a u64 BE offset,
597/// repeated. Unrecognized tags or truncated trailers fail loudly —
598/// we wrote the bytes, so any corruption is a bug, not user input.
599fn decode_bucket_file(bytes: &[u8]) -> Result<Vec<(PackObjectId, u64)>> {
600    let mut out = Vec::new();
601    let mut pos = 0;
602    while pos < bytes.len() {
603        let (id, id_len) = PackObjectId::decode_tagged(&bytes[pos..])?;
604        pos += id_len;
605        if pos + 8 > bytes.len() {
606            return Err(StoreError::InvalidObject(
607                "streaming bucket entry truncated at offset".to_string(),
608            ));
609        }
610        let offset = u64::from_be_bytes(bytes[pos..pos + 8].try_into().map_err(|_| {
611            StoreError::InvalidObject("streaming bucket bad offset slice".to_string())
612        })?);
613        pos += 8;
614        out.push((id, offset));
615    }
616    Ok(out)
617}
618
619// ---------------------- Tests ----------------------
620
621#[cfg(test)]
622mod tests {
623    use std::io::Cursor;
624
625    use super::*;
626    use crate::{
627        object::ChangeId,
628        store::pack::{PackReader, PackStats},
629    };
630
631    fn deterministic_hash(seed: u8) -> ContentHash {
632        // Spread `seed` across the high byte so different seeds end up
633        // in different hash-prefix buckets. We don't actually want
634        // collisions in the tests that check distribution.
635        let mut bytes = [0u8; 32];
636        bytes[0] = seed;
637        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
638            *b = seed.wrapping_mul(31).wrapping_add(i as u8);
639        }
640        ContentHash::from_bytes(bytes)
641    }
642
643    fn deterministic_change_id(seed: u8) -> ChangeId {
644        let mut bytes = [0u8; 16];
645        bytes[0] = seed;
646        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
647            *b = seed.wrapping_add(i as u8 * 7);
648        }
649        ChangeId::from_bytes(bytes)
650    }
651
652    /// Test rig: returns the builder, the bucket dir (for cleanup
653    /// inspection), and the index path the builder will write at
654    /// finalize. The index path lives in the temp dir so it gets
655    /// auto-cleaned with `tmp`.
656    fn fresh_builder(
657        tmp: &tempfile::TempDir,
658    ) -> (StreamingPackBuilder<Cursor<Vec<u8>>>, PathBuf, PathBuf) {
659        let bucket_dir = tmp.path().join("buckets");
660        let index_path = tmp.path().join("test.idx");
661        let cursor = Cursor::new(Vec::<u8>::new());
662        let b = StreamingPackBuilder::new(
663            cursor,
664            index_path.clone(),
665            CompressionConfig::default(),
666            bucket_dir.clone(),
667        )
668        .unwrap();
669        (b, bucket_dir, index_path)
670    }
671
672    /// Finalize the builder and return `(pack_bytes, index_bytes, stats)`.
673    /// The index bytes are read back from the file the builder wrote
674    /// to — verifying that the streaming index path actually produced
675    /// readable bytes.
676    fn finalize_cursor(
677        b: StreamingPackBuilder<Cursor<Vec<u8>>>,
678        index_path: &std::path::Path,
679    ) -> (Vec<u8>, Vec<u8>, PackStats) {
680        let (cursor, stats) = b.finalize().unwrap();
681        let index_bytes = std::fs::read(index_path).unwrap();
682        (cursor.into_inner(), index_bytes, stats)
683    }
684
685    #[test]
686    fn empty_pack_finalizes_to_valid_zero_count_pack() {
687        let tmp = tempfile::TempDir::new().unwrap();
688        let (b, bucket_dir, idx_path) = fresh_builder(&tmp);
689        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
690
691        assert_eq!(stats.object_count, 0);
692        // PackReader can parse the empty pack and reports zero objects.
693        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
694        assert!(reader.list_ids().is_empty());
695        // Bucket dir was removed.
696        assert!(
697            !bucket_dir.exists(),
698            "bucket dir should be cleaned on successful finalize"
699        );
700    }
701
702    #[test]
703    fn single_blob_with_hash_id_round_trips() {
704        let tmp = tempfile::TempDir::new().unwrap();
705        let (mut b, _, idx_path) = fresh_builder(&tmp);
706        let hash = deterministic_hash(0x42);
707        let payload = b"hello, streaming pack".to_vec();
708        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
709        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
710
711        assert_eq!(stats.object_count, 1);
712        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
713        let id = PackObjectId::Hash(hash);
714        assert!(reader.has_object(&id));
715        let (got_type, got_data) = reader.get_object(&id).unwrap().unwrap();
716        assert_eq!(got_type, ObjectType::Blob);
717        assert_eq!(got_data, payload);
718    }
719
720    #[test]
721    fn single_state_with_change_id_round_trips() {
722        let tmp = tempfile::TempDir::new().unwrap();
723        let (mut b, _, idx_path) = fresh_builder(&tmp);
724        let cid = deterministic_change_id(0xa5);
725        let payload = b"serialized-state-bytes".to_vec();
726        b.add_id(
727            PackObjectId::ChangeId(cid),
728            ObjectType::State,
729            payload.clone(),
730        )
731        .unwrap();
732        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
733
734        assert_eq!(stats.object_count, 1);
735        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
736        let id = PackObjectId::ChangeId(cid);
737        let (ty, data) = reader.get_object(&id).unwrap().unwrap();
738        assert_eq!(ty, ObjectType::State);
739        assert_eq!(data, payload);
740    }
741
742    #[test]
743    fn mixed_hash_and_changeid_ids_all_retrievable() {
744        let tmp = tempfile::TempDir::new().unwrap();
745        let (mut b, _, idx_path) = fresh_builder(&tmp);
746        let blob_hash = deterministic_hash(0x10);
747        let tree_hash = deterministic_hash(0x20);
748        let state_cid = deterministic_change_id(0x80);
749
750        b.add(blob_hash, ObjectType::Blob, b"blob-bytes".to_vec())
751            .unwrap();
752        b.add(tree_hash, ObjectType::Tree, b"serialized-tree".to_vec())
753            .unwrap();
754        b.add_id(
755            PackObjectId::ChangeId(state_cid),
756            ObjectType::State,
757            b"serialized-state".to_vec(),
758        )
759        .unwrap();
760
761        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
762        assert_eq!(stats.object_count, 3);
763        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
764        assert_eq!(
765            reader
766                .get_object(&PackObjectId::Hash(blob_hash))
767                .unwrap()
768                .unwrap()
769                .1,
770            b"blob-bytes".to_vec()
771        );
772        assert_eq!(
773            reader
774                .get_object(&PackObjectId::Hash(tree_hash))
775                .unwrap()
776                .unwrap()
777                .1,
778            b"serialized-tree".to_vec()
779        );
780        assert_eq!(
781            reader
782                .get_object(&PackObjectId::ChangeId(state_cid))
783                .unwrap()
784                .unwrap()
785                .1,
786            b"serialized-state".to_vec()
787        );
788    }
789
790    #[test]
791    fn ten_thousand_objects_round_trip_correctly() {
792        // Stresses the bucket sort: 10K objects spread across
793        // 256 hash buckets averages 40 entries per bucket — well
794        // within in-memory sort capacity but covers every bucket.
795        let tmp = tempfile::TempDir::new().unwrap();
796        let (mut b, _, idx_path) = fresh_builder(&tmp);
797        let mut hashes = Vec::with_capacity(10_000);
798        for i in 0..10_000u32 {
799            // Use BLAKE3 over the index so first-byte distribution is
800            // pseudo-uniform across the 256 hash buckets.
801            let h = blake3::hash(&i.to_le_bytes());
802            let hash = ContentHash::from_bytes(*h.as_bytes());
803            hashes.push(hash);
804            b.add(hash, ObjectType::Blob, format!("payload-{i}").into_bytes())
805                .unwrap();
806        }
807        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
808        assert_eq!(stats.object_count, 10_000);
809
810        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
811        assert_eq!(reader.list_ids().len(), 10_000);
812        // Spot-check ten across the range.
813        for i in [0, 1, 99, 1234, 5_000, 9_999] {
814            let id = PackObjectId::Hash(hashes[i]);
815            let (_ty, data) = reader.get_object(&id).unwrap().unwrap();
816            assert_eq!(data, format!("payload-{i}").into_bytes());
817        }
818    }
819
820    #[test]
821    fn bucket_writers_are_lru_capped_below_fd_limit() {
822        let tmp = tempfile::TempDir::new().unwrap();
823        let (mut b, _bucket_dir, idx_path) = fresh_builder(&tmp);
824        let mut ids = Vec::new();
825
826        for i in 0..BUCKETS_PER_VARIANT {
827            let hash = deterministic_hash(i as u8);
828            ids.push(PackObjectId::Hash(hash));
829            b.add(hash, ObjectType::Blob, format!("hash-{i}").into_bytes())
830                .unwrap();
831            assert!(
832                b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
833                "open bucket writers should stay capped"
834            );
835        }
836
837        for i in 0..BUCKETS_PER_VARIANT {
838            let cid = deterministic_change_id(i as u8);
839            ids.push(PackObjectId::ChangeId(cid));
840            b.add_id(
841                PackObjectId::ChangeId(cid),
842                ObjectType::State,
843                format!("state-{i}").into_bytes(),
844            )
845            .unwrap();
846            assert!(
847                b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
848                "open bucket writers should stay capped"
849            );
850        }
851
852        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
853        assert_eq!(stats.object_count, TOTAL_BUCKETS as u64);
854        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
855        for id in ids {
856            assert!(reader.has_object(&id), "missing id {id:?}");
857        }
858    }
859
860    #[test]
861    fn index_id_sort_order_matches_packbuilder_output() {
862        // PackBuilder groups objects by `ObjectType` before encoding,
863        // which changes the byte offsets relative to a streaming builder
864        // that writes in added-order. The bytes of the two indices
865        // therefore can't match exactly. What MUST match is the
866        // **sort order of ids** — both builders ultimately call
867        // `PackIndex::sort()` (or the bucket-equivalent), and any
868        // reader binary-searches against that order.
869        use crate::store::pack::PackBuilder;
870        let payloads: Vec<(PackObjectId, ObjectType, Vec<u8>)> = (0..200u32)
871            .map(|i| {
872                let h = blake3::hash(&i.to_le_bytes());
873                (
874                    PackObjectId::Hash(ContentHash::from_bytes(*h.as_bytes())),
875                    if i % 3 == 0 {
876                        ObjectType::Tree
877                    } else {
878                        ObjectType::Blob
879                    },
880                    format!("body-{i}").into_bytes(),
881                )
882            })
883            .collect();
884
885        // Disable delta encoding so the classic builder produces a pack
886        // shape comparable to the streaming one (which never deltas).
887        let compression = CompressionConfig {
888            max_delta_size: 0,
889            ..CompressionConfig::default()
890        };
891        let mut classic = PackBuilder::new(compression);
892        for (id, ty, data) in payloads.iter() {
893            classic.add_id(*id, *ty, data.clone());
894        }
895        let (classic_pack, classic_index, _) = classic.build().unwrap();
896        let classic_reader = PackReader::from_bytes(classic_pack, classic_index).unwrap();
897
898        let tmp = tempfile::TempDir::new().unwrap();
899        let bucket_dir = tmp.path().join("buckets");
900        let idx_path = tmp.path().join("test.idx");
901        let cursor = Cursor::new(Vec::<u8>::new());
902        let mut streaming =
903            StreamingPackBuilder::new(cursor, idx_path.clone(), compression, bucket_dir).unwrap();
904        for (id, ty, data) in payloads.iter() {
905            streaming.add_id(*id, *ty, data.clone()).unwrap();
906        }
907        let (streaming_pack, streaming_index, _) = finalize_cursor(streaming, &idx_path);
908        let streaming_reader = PackReader::from_bytes(streaming_pack, streaming_index).unwrap();
909
910        // Same set of ids in the same sorted order — that's the
911        // contract for binary search to work.
912        assert_eq!(
913            streaming_reader.list_ids(),
914            classic_reader.list_ids(),
915            "streaming and classic indices should report the same id sequence"
916        );
917        // Spot-check that each id resolves to a payload that matches
918        // the classic builder's output (equal bytes after decompression).
919        for (id, _ty, want) in payloads.iter().take(10).chain(payloads.iter().skip(190)) {
920            let (_, got) = streaming_reader.get_object(id).unwrap().unwrap();
921            assert_eq!(&got, want);
922            let (_, classic_got) = classic_reader.get_object(id).unwrap().unwrap();
923            assert_eq!(got, classic_got);
924        }
925    }
926
927    #[test]
928    fn corrupted_pack_fails_checksum_verification() {
929        let tmp = tempfile::TempDir::new().unwrap();
930        let (mut b, _, idx_path) = fresh_builder(&tmp);
931        b.add(
932            deterministic_hash(0x01),
933            ObjectType::Blob,
934            b"some bytes".to_vec(),
935        )
936        .unwrap();
937        let (mut pack_data, index_data, _) = finalize_cursor(b, &idx_path);
938        // Flip one byte in the body. The trailer checksum must reject.
939        let body_byte = 18; // past the 16-byte header
940        pack_data[body_byte] ^= 0xff;
941        let result = PackReader::from_bytes(pack_data, index_data);
942        assert!(
943            result.is_err(),
944            "PackReader should reject pack with mutated body"
945        );
946    }
947
948    #[test]
949    fn pack_count_in_header_matches_index_entry_count() {
950        let tmp = tempfile::TempDir::new().unwrap();
951        let (mut b, _, idx_path) = fresh_builder(&tmp);
952        for i in 0..7u8 {
953            b.add(
954                deterministic_hash(i),
955                ObjectType::Blob,
956                format!("p{i}").into_bytes(),
957            )
958            .unwrap();
959        }
960        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
961        // Header count is bytes 8..16 (big-endian).
962        let count = u64::from_be_bytes(pack_data[8..16].try_into().unwrap());
963        assert_eq!(count, 7);
964        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
965        assert_eq!(reader.list_ids().len(), 7);
966    }
967
968    #[test]
969    fn bucket_files_are_cleaned_on_successful_finalize() {
970        let tmp = tempfile::TempDir::new().unwrap();
971        let bucket_dir = tmp.path().join("buckets");
972        let idx_path = tmp.path().join("test.idx");
973        let cursor = Cursor::new(Vec::<u8>::new());
974        let mut b = StreamingPackBuilder::new(
975            cursor,
976            idx_path.clone(),
977            CompressionConfig::default(),
978            bucket_dir.clone(),
979        )
980        .unwrap();
981        for i in 0..50u8 {
982            b.add(deterministic_hash(i), ObjectType::Blob, vec![i; 32])
983                .unwrap();
984        }
985        // Buckets exist and contain data.
986        assert!(bucket_dir.exists());
987        let bucket_count = std::fs::read_dir(&bucket_dir).unwrap().count();
988        assert!(bucket_count > 0, "bucket dir should hold some files");
989        let _ = finalize_cursor(b, &idx_path);
990        assert!(
991            !bucket_dir.exists(),
992            "bucket dir should be removed on finalize"
993        );
994    }
995
996    #[test]
997    fn bucket_files_are_cleaned_on_drop_without_finalize() {
998        let tmp = tempfile::TempDir::new().unwrap();
999        let bucket_dir = tmp.path().join("buckets");
1000        let idx_path = tmp.path().join("test.idx");
1001        {
1002            let cursor = Cursor::new(Vec::<u8>::new());
1003            let mut b = StreamingPackBuilder::new(
1004                cursor,
1005                idx_path.clone(),
1006                CompressionConfig::default(),
1007                bucket_dir.clone(),
1008            )
1009            .unwrap();
1010            for i in 0..10u8 {
1011                b.add(deterministic_hash(i), ObjectType::Blob, vec![0; 32])
1012                    .unwrap();
1013            }
1014            assert!(bucket_dir.exists());
1015            // Drop without finalize — Drop impl should clean up.
1016        }
1017        assert!(
1018            !idx_path.exists(),
1019            "no index file should have been created without finalize"
1020        );
1021        assert!(
1022            !bucket_dir.exists(),
1023            "bucket dir should be removed on Drop when finalize never ran"
1024        );
1025    }
1026
1027    #[test]
1028    fn large_blob_streams_to_disk_without_double_buffering() {
1029        // 4 MiB blob — well under the actual streaming target but big
1030        // enough to confirm we're not buffering the entire pack body in
1031        // RAM. The pack data on disk should be at least 4 MiB; the
1032        // builder's in-memory state is per-object only.
1033        let tmp = tempfile::TempDir::new().unwrap();
1034        let bucket_dir = tmp.path().join("buckets");
1035        let pack_path = tmp.path().join("pack.dat");
1036        let idx_path = tmp.path().join("pack.idx");
1037        let file = std::fs::OpenOptions::new()
1038            .read(true)
1039            .write(true)
1040            .create(true)
1041            .truncate(true)
1042            .open(&pack_path)
1043            .unwrap();
1044        let mut b = StreamingPackBuilder::new(
1045            file,
1046            idx_path.clone(),
1047            CompressionConfig::default(),
1048            bucket_dir,
1049        )
1050        .unwrap();
1051        let payload: Vec<u8> = (0..4 * 1024 * 1024u32).map(|i| (i & 0xff) as u8).collect();
1052        let hash = deterministic_hash(0xff);
1053        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1054        let (_, stats) = b.finalize().unwrap();
1055        let index_data = std::fs::read(&idx_path).unwrap();
1056        assert_eq!(stats.object_count, 1);
1057        let pack_bytes = std::fs::read(&pack_path).unwrap();
1058        // Pack on disk holds the whole compressed payload + headers
1059        // + trailer. Confirm it round-trips.
1060        let reader = PackReader::from_bytes(pack_bytes, index_data).unwrap();
1061        let (_ty, got) = reader
1062            .get_object(&PackObjectId::Hash(hash))
1063            .unwrap()
1064            .unwrap();
1065        assert_eq!(got, payload);
1066    }
1067
1068    #[test]
1069    fn bucket_distribution_for_random_hashes_is_roughly_uniform() {
1070        // Confirms our sort-time peak memory bound. We accumulate
1071        // 1024 random hashes through a builder and check that no
1072        // single bucket holds more than ~3× the average. (BLAKE3 hash
1073        // first-byte distribution is uniform; this is mostly a
1074        // sanity check that we route to the right bucket and aren't
1075        // accidentally collapsing.)
1076        let tmp = tempfile::TempDir::new().unwrap();
1077        let bucket_dir = tmp.path().join("buckets");
1078        let idx_path = tmp.path().join("test.idx");
1079        let cursor = Cursor::new(Vec::<u8>::new());
1080        let mut b = StreamingPackBuilder::new(
1081            cursor,
1082            idx_path.clone(),
1083            CompressionConfig::default(),
1084            bucket_dir.clone(),
1085        )
1086        .unwrap();
1087        for i in 0..1024u32 {
1088            let h = blake3::hash(&i.to_le_bytes());
1089            let hash = ContentHash::from_bytes(*h.as_bytes());
1090            b.add(hash, ObjectType::Blob, b"x".to_vec()).unwrap();
1091        }
1092        // Inspect bucket file sizes BEFORE finalize (which deletes them).
1093        b.pack_writer.as_mut().unwrap().flush().unwrap();
1094        let mut max_entries = 0usize;
1095        let entry_size = 33 + 8; // tagged-hash + u64 offset
1096        for path in b.bucket_paths.iter() {
1097            if path.exists() {
1098                let size = std::fs::metadata(path).unwrap().len() as usize;
1099                let entries = size / entry_size;
1100                if entries > max_entries {
1101                    max_entries = entries;
1102                }
1103            }
1104        }
1105        // Average is 1024 / 256 = 4 entries per bucket. Allow up to 16
1106        // (4× average) — uniformity isn't perfect on small samples.
1107        assert!(
1108            max_entries <= 16,
1109            "max bucket has {max_entries} entries; uniform expected ~4"
1110        );
1111        let _ = finalize_cursor(b, &idx_path);
1112    }
1113
1114    #[test]
1115    fn finalize_returns_correct_stats() {
1116        let tmp = tempfile::TempDir::new().unwrap();
1117        let (mut b, _, idx_path) = fresh_builder(&tmp);
1118        let payload = vec![0xabu8; 1024];
1119        for i in 0..5u8 {
1120            b.add(deterministic_hash(i), ObjectType::Blob, payload.clone())
1121                .unwrap();
1122        }
1123        let (_, _, stats) = finalize_cursor(b, &idx_path);
1124        assert_eq!(stats.object_count, 5);
1125        assert_eq!(stats.total_uncompressed, 5 * 1024);
1126        assert!(stats.total_compressed > 0);
1127        assert!(stats.compression_ratio > 0.0);
1128        assert_eq!(stats.delta_count, 0, "streaming builder never deltas");
1129    }
1130
1131    #[cfg(feature = "zstd")]
1132    #[test]
1133    fn streaming_compression_roundtrips_through_zstd_frame() {
1134        // Force the streaming path with a payload that compresses
1135        // well (long runs of identical bytes). Verifies:
1136        //  1. Streaming output decodes back to the original bytes.
1137        //  2. The compressed body is genuinely smaller than the
1138        //     uncompressed input (proving zstd ran), and
1139        //  3. The non-canonical 10-byte varint patched into the
1140        //     compressed_size slot decodes to the right value.
1141        let tmp = tempfile::TempDir::new().unwrap();
1142        let (mut b, _, idx_path) = fresh_builder(&tmp);
1143        // 64 KiB of zeros — compresses to a tiny zstd frame, well
1144        // above the default `min_size` so we hit the streaming branch.
1145        let payload = vec![0u8; 64 * 1024];
1146        let hash = deterministic_hash(0x77);
1147        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1148        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
1149        assert!(
1150            stats.total_compressed < stats.total_uncompressed,
1151            "expected compression ratio < 1.0, got {}/{}",
1152            stats.total_compressed,
1153            stats.total_uncompressed
1154        );
1155        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1156        let (_ty, got) = reader
1157            .get_object(&PackObjectId::Hash(hash))
1158            .unwrap()
1159            .unwrap();
1160        assert_eq!(got, payload);
1161    }
1162
1163    #[cfg(feature = "zstd")]
1164    #[test]
1165    fn padded_varint_decodes_to_original_value_for_canonical_decoder() {
1166        // Sanity for the seek-back scheme: for every value we'd want
1167        // to encode (small, mid, large), confirm the existing
1168        // `decode_varint` returns the same `value` from a 10-byte
1169        // padded encoding. If this ever fails the streaming path's
1170        // patched compressed_size would be misread by readers.
1171        let cases: &[u64] = &[0, 1, 127, 128, 4096, 1_000_000, 1_000_000_000_000, u64::MAX];
1172        for &value in cases {
1173            let mut buf = [0u8; 10];
1174            super::encode_varint_padded_to_10(value, &mut buf);
1175            let (decoded, consumed) = super::super::varint::decode_varint(&buf)
1176                .expect("padded varint should always decode");
1177            assert_eq!(decoded, value, "varint roundtrip failed for {value}");
1178            assert_eq!(
1179                consumed, 10,
1180                "padded encoding should consume all 10 bytes for {value}"
1181            );
1182        }
1183    }
1184
1185    #[cfg(feature = "zstd")]
1186    #[test]
1187    fn streaming_path_does_not_buffer_compressed_payload_in_memory() {
1188        // Smoke check: write a single 8 MiB payload, observe the
1189        // pack file size on disk during/after the add. The pack file
1190        // grows incrementally during the streaming compression — if
1191        // we were buffering an intermediate compressed `Vec<u8>` the
1192        // on-disk size would jump by ~8 MiB at finalize, not stay
1193        // bounded as the encoder pumps bytes through.
1194        //
1195        // We can't easily measure peak heap from inside Rust without
1196        // a custom allocator. What we *can* verify is that calling
1197        // `add` returns control with the pack file already at its
1198        // final body size, demonstrating the encoder wrote through
1199        // and didn't accumulate.
1200        let tmp = tempfile::TempDir::new().unwrap();
1201        let bucket_dir = tmp.path().join("buckets");
1202        let pack_path = tmp.path().join("pack.dat");
1203        let idx_path = tmp.path().join("pack.idx");
1204        let file = std::fs::OpenOptions::new()
1205            .read(true)
1206            .write(true)
1207            .create(true)
1208            .truncate(true)
1209            .open(&pack_path)
1210            .unwrap();
1211        let mut b = StreamingPackBuilder::new(
1212            file,
1213            idx_path.clone(),
1214            CompressionConfig::default(),
1215            bucket_dir,
1216        )
1217        .unwrap();
1218        let payload = vec![0xa5u8; 8 * 1024 * 1024];
1219        let hash = deterministic_hash(0x66);
1220        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1221        // Pack file already on disk holds at least the entry header +
1222        // compressed payload (excluding the 32-byte trailer the builder
1223        // appends at finalize).
1224        let mid_size = std::fs::metadata(&pack_path).unwrap().len();
1225        assert!(
1226            mid_size > 16 + 40,
1227            "pack file should hold real entry data after add; size={mid_size}"
1228        );
1229        let (_, _) = b.finalize().unwrap();
1230        let pack_bytes = std::fs::read(&pack_path).unwrap();
1231        let index_bytes = std::fs::read(&idx_path).unwrap();
1232        let reader = PackReader::from_bytes(pack_bytes, index_bytes).unwrap();
1233        let (_ty, got) = reader
1234            .get_object(&PackObjectId::Hash(hash))
1235            .unwrap()
1236            .unwrap();
1237        assert_eq!(got, payload);
1238    }
1239
1240    #[test]
1241    fn list_ids_returns_all_added_ids_sorted() {
1242        let tmp = tempfile::TempDir::new().unwrap();
1243        let (mut b, _, idx_path) = fresh_builder(&tmp);
1244        let mut added: Vec<PackObjectId> = Vec::new();
1245        // Mix of Hash and ChangeId in a non-sorted order on input.
1246        for seed in [0x05u8, 0xa0, 0x12, 0x9f, 0x33] {
1247            let id = PackObjectId::Hash(deterministic_hash(seed));
1248            b.add_id(id, ObjectType::Blob, vec![seed; 4]).unwrap();
1249            added.push(id);
1250        }
1251        for seed in [0x80u8, 0x10, 0xff] {
1252            let id = PackObjectId::ChangeId(deterministic_change_id(seed));
1253            b.add_id(id, ObjectType::State, vec![seed; 4]).unwrap();
1254            added.push(id);
1255        }
1256        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
1257        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1258        let mut got = reader.list_ids();
1259        // PackReader's list_ids returns index order — should already be
1260        // sorted because we sort on finalize.
1261        let mut sorted = got.clone();
1262        sorted.sort();
1263        assert_eq!(got, sorted, "list_ids must come back sorted");
1264        // And every added id should appear.
1265        added.sort();
1266        got.sort();
1267        assert_eq!(got, added);
1268    }
1269}