Skip to main content

objects/store/pack/
streaming_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Streaming pack builder for bounded-memory imports.
3//!
4//! `PackBuilder` accumulates every `(id, type, data)` tuple in memory
5//! before producing a pack. That's fine for sync-protocol packets and
6//! small batches, but the import path can produce millions of objects
7//! and would OOM on large repos.
8//!
9//! `StreamingPackBuilder` removes the in-memory buffering by:
10//!
//! 1. **Streaming pack data to disk** as objects are added. Compression
//!    runs per-object: with the `zstd` feature each payload is piped
//!    through a streaming encoder straight into the pack writer, so the
//!    compressed bytes are never collected in a `Vec<u8>`. The builder
//!    never holds more than the caller's one uncompressed object plus
//!    its `BufWriter` capacity.
16//!
17//! 2. **External sorting the index** via 512 hash-prefix bucket files
18//!    on disk (256 for `Hash` ids, 256 for `ChangeId` ids). Each
19//!    `add()` appends one fixed-shape `(id, offset)` record to the
20//!    bucket whose first byte matches the id's first inner byte. At
21//!    finalize, each bucket is small enough to sort in memory; the
22//!    concatenation of `Hash` buckets followed by `ChangeId` buckets
23//!    in byte order produces the exact same global sort `PackBuilder`
24//!    would have via `entries.sort_by_key(|e| e.id)`.
25//!
26//! ## Memory bound
27//!
//! - Pack data on disk: streamed; only the one uncompressed object
//!   currently being added is held in memory at a time.
30//! - Index entries in bucket buffers: at most 32 bucket files are held
31//!   open at once, each behind a default-capacity `BufWriter` (~8 KB),
32//!   so peak buffering is ~256 KB.
33//! - Sort scratch at finalize: O(largest bucket). For uniformly-
34//!   distributed BLAKE3 hashes / ULID change-ids and N total objects,
35//!   the largest bucket is ~N/256 entries ≈ 40 bytes each. Even at
36//!   100 M objects that's ~16 MB peak.
37//!
//! Net peak memory: ~20 MB regardless of repo size, modulo the size
//! of the largest single object (which is unavoidable because each
//! object is handed to the builder as a fully materialized `Vec<u8>`).
41//!
42//! ## Trade-offs vs `PackBuilder`
43//!
44//! - **No delta encoding.** Streaming and sliding-window deltas are
45//!   incompatible — delta search needs random access to recently-
46//!   written objects. The import path runs with deltas disabled
47//!   anyway (the cost-benefit is bad on real Heddle history), so this
48//!   is a non-issue for the call site that motivated this builder.
49//! - **No path-grouped reordering.** Entries land in the order added.
50//! - **Output is a pack file at a path** rather than `(Vec<u8>, Vec<u8>)`.
51//!   Callers pair this with [`crate::store::ObjectStore::install_pack_from_path`]
52//!   which moves/installs the pack without copying it through RAM.
53//! - **Re-reads the pack at finalize** to compute the BLAKE3 trailer
54//!   checksum (the pack format hashes header+body, and the count goes
55//!   in the header — we patch it on finalize via seek-back, then
56//!   re-stream the body to the hasher). 2× sequential disk I/O on the
57//!   pack data is the cost of sticking with the current format. A
58//!   future format change could put the count in the footer to avoid
59//!   the second pass.
60
61use std::{
62    fs::{File, OpenOptions},
63    io::{BufWriter, Read, Seek, SeekFrom, Write},
64    path::PathBuf,
65};
66
67use heddle_format::compression::CompressionConfig;
68
69use super::{ObjectType, PackObjectId, PackStats, pack_container_spec, write_container_header};
70
/// Width, in bytes, of the placeholder reserved for the compressed-size
/// varint in the streaming path. 10 is enough to encode any `u64`
/// (9 continuation bytes carrying 7 bits each, plus 1 terminator
/// byte). After streaming we patch the placeholder with a
/// non-canonical varint that pads to exactly this length. Only the
/// zstd-enabled compress path uses it.
#[cfg(feature = "zstd")]
const CSIZE_PLACEHOLDER_LEN: usize = 10;
78use crate::{
79    object::ContentHash,
80    store::{Result, StoreError},
81};
82
/// Number of buckets per id variant. 256 = one bucket per first byte
/// of the inner id. We want the bucket boundaries to align with the
/// `PackObjectId`'s `Ord` derivation (variant tag major, inner bytes
/// minor) so the concatenated bucket output matches what
/// `PackIndex::sort()` would have produced.
const BUCKETS_PER_VARIANT: usize = 256;
/// Total bucket count: 256 for `Hash` ids + 256 for `ChangeId` ids.
const TOTAL_BUCKETS: usize = BUCKETS_PER_VARIANT * 2;
/// Cap on concurrently-open index-bucket files. macOS GUI-launched
/// processes commonly inherit a 256-fd soft limit; imports also need
/// room for Git pack/index files, sqlite maps, the output pack, etc.
const MAX_OPEN_BUCKET_WRITERS: usize = 32;

/// Variant index of `Hash` ids in the bucket arrays. `Hash` ids fill
/// the lower half (matches the variant order in `PackObjectId` which
/// makes `Hash(_) < ChangeId(_)`).
const HASH_VARIANT: usize = 0;
/// Variant index of `ChangeId` ids in the bucket arrays; they fill the
/// upper half.
const CHANGEID_VARIANT: usize = 1;
101
/// Streaming pack builder. Held generic over the pack writer (`File`
/// in production, `Cursor<Vec<u8>>` in tests).
///
/// Objects are appended to the pack writer as they are added, while
/// `(id, offset)` index records are spilled to per-prefix bucket files
/// that `finalize` sorts and concatenates into the index.
pub struct StreamingPackBuilder<W: Write + Read + Seek> {
    /// Writer for the pack's `[header][body]` content. The trailer
    /// checksum is appended to the same writer at `finalize`.
    /// Wrapped in `Option` so `finalize` can `.take()` it out without
    /// running afoul of the `Drop` impl's restriction on moving fields.
    /// `None` once `finalize` has taken it.
    pack_writer: Option<BufWriter<W>>,
    /// Position in the pack writer where the header was written, so
    /// we can seek back at finalize and patch the real `object_count`
    /// into bytes 8..16.
    header_offset: u64,
    /// Logical append position. Avoids flushing the buffered writer before
    /// every object just to ask the file for its current offset.
    pack_position: u64,
    /// Number of objects added so far; bumped once per successful
    /// `add_id`.
    object_count: u64,
    /// Object count promised by `new_with_object_count`, if any.
    /// `finalize` rejects the pack when it differs from `object_count`.
    declared_object_count: Option<u64>,
    /// Sum of the input `data.len()` of every object passed to `add_id`.
    total_uncompressed: u64,
    /// Sum of the payload bytes written to the pack (raw length for
    /// uncompressed entries, encoder output length for compressed ones).
    total_compressed: u64,
    /// Compression knobs. Only consulted when the `zstd` feature is on
    /// (`enabled` and `min_size` decide whether each entry compresses;
    /// `level` parameterizes the encoder). Without `zstd` every entry
    /// takes the raw branch and this field is just along for the ride.
    #[cfg_attr(not(feature = "zstd"), allow(dead_code))]
    compression: CompressionConfig,
    /// Directory holding the (up to 512) bucket files. Owned by the
    /// builder so we can clean up on `Drop` if `finalize` is never
    /// called.
    bucket_dir: PathBuf,
    /// Buckets `[variant][prefix_byte]` → optional buffered file.
    /// Lazily opened on first write and capped with LRU eviction so a
    /// large import cannot exhaust the process fd limit.
    bucket_writers: Vec<Option<BucketWriter>>,
    /// Number of `Some` slots in `bucket_writers`, compared against
    /// `MAX_OPEN_BUCKET_WRITERS` before opening another bucket.
    open_bucket_writers: usize,
    /// Counter bumped on every bucket access; its value is stamped on
    /// the accessed bucket so the least-recently-used one can be found.
    bucket_access_tick: u64,
    /// On-disk path of each bucket, indexed the same way as
    /// `bucket_writers`.
    bucket_paths: Vec<PathBuf>,
    /// File path where the pack index is materialized at `finalize`.
    /// Bytes are written incrementally as buckets are sorted, so the
    /// index never sits in memory in its entirety.
    index_path: PathBuf,
    /// Set true on `finalize` so `Drop` knows the bucket dir was
    /// already cleaned and shouldn't be removed again.
    finalized: bool,
}
146
/// One open index-bucket file plus the bookkeeping needed to pick an
/// eviction victim when too many buckets are open at once.
struct BucketWriter {
    /// Buffered append handle for the bucket file.
    writer: BufWriter<File>,
    /// Value of the builder's access tick when this bucket was last
    /// requested; the smallest value marks the least-recently-used bucket.
    last_used: u64,
}
151
/// `Write` adapter that forwards everything to a borrowed writer while
/// tallying how many bytes that writer reported as accepted. The
/// streaming compress path reads `written` after the encoder finishes
/// to learn the compressed size.
#[cfg(feature = "zstd")]
struct CountingWriter<'a, W: Write> {
    /// Destination that actually receives the bytes.
    inner: &'a mut W,
    /// Running total of bytes accepted by `inner` (saturating).
    written: u64,
}

#[cfg(feature = "zstd")]
impl<'a, W: Write> CountingWriter<'a, W> {
    /// Wrap `inner` with the byte tally starting at zero.
    fn new(inner: &'a mut W) -> Self {
        let written = 0;
        Self { inner, written }
    }
}

#[cfg(feature = "zstd")]
impl<W: Write> Write for CountingWriter<'_, W> {
    /// Forward `buf` to the inner writer and add however many bytes it
    /// accepted to the tally. Errors pass through without touching the
    /// tally.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.inner.write(buf).map(|accepted| {
            self.written = self.written.saturating_add(accepted as u64);
            accepted
        })
    }

    /// Flushing is delegated unchanged; it does not affect the tally.
    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
177
178impl<W: Write + Read + Seek> StreamingPackBuilder<W> {
179    /// Open a streaming builder against `pack_writer`, using
180    /// `bucket_dir` for transient index buckets and writing the
181    /// finalized index to `index_path`. The bucket dir is created if
182    /// it doesn't exist; on a successful `finalize` it's removed
183    /// (along with any bucket files left in it).
184    ///
185    /// `index_path` is *not* created by `new` — opening happens at
186    /// finalize so a misconfigured caller doesn't leave an empty index
187    /// file behind on early failure. It's still recorded here so
188    /// `finalize` can write to a known location and the caller can
189    /// install the file by path.
190    ///
191    /// The `pack_writer` must support `Read` because finalize re-streams
192    /// the body to compute the trailer checksum — see the module-level
193    /// note on the format.
194    pub fn new(
195        pack_writer: W,
196        index_path: PathBuf,
197        compression: CompressionConfig,
198        bucket_dir: PathBuf,
199    ) -> Result<Self> {
200        Self::new_inner(pack_writer, index_path, compression, bucket_dir, None)
201    }
202
203    /// Open a streaming builder whose final object count is known up front.
204    ///
205    /// This writes the real count into the pack header immediately, so callers
206    /// may safely stream already-flushed pack bytes before `finalize()` appends
207    /// the trailer checksum. `finalize()` still verifies that exactly this many
208    /// objects were added before producing the index.
209    pub fn new_with_object_count(
210        pack_writer: W,
211        index_path: PathBuf,
212        compression: CompressionConfig,
213        bucket_dir: PathBuf,
214        object_count: u64,
215    ) -> Result<Self> {
216        Self::new_inner(
217            pack_writer,
218            index_path,
219            compression,
220            bucket_dir,
221            Some(object_count),
222        )
223    }
224
225    fn new_inner(
226        mut pack_writer: W,
227        index_path: PathBuf,
228        compression: CompressionConfig,
229        bucket_dir: PathBuf,
230        declared_object_count: Option<u64>,
231    ) -> Result<Self> {
232        std::fs::create_dir_all(&bucket_dir).map_err(StoreError::from)?;
233        let header_offset = pack_writer.stream_position().map_err(StoreError::from)?;
234
235        // Write a placeholder header with `count = 0` unless the caller knows
236        // the count up front. The known-count path lets network senders tail
237        // flushed pack bytes before finalize without later mutating bytes they
238        // already sent.
239        let mut header_bytes = Vec::with_capacity(16);
240        write_container_header(
241            &mut header_bytes,
242            pack_container_spec(),
243            declared_object_count.unwrap_or(0),
244        );
245        pack_writer
246            .write_all(&header_bytes)
247            .map_err(StoreError::from)?;
248
249        let bucket_paths: Vec<PathBuf> = (0..TOTAL_BUCKETS)
250            .map(|i| {
251                let variant = if i < BUCKETS_PER_VARIANT { 'h' } else { 'c' };
252                let prefix = i % BUCKETS_PER_VARIANT;
253                bucket_dir.join(format!("bucket-{variant}-{prefix:02x}"))
254            })
255            .collect();
256        for path in &bucket_paths {
257            let _ = std::fs::remove_file(path);
258        }
259
260        Ok(Self {
261            pack_writer: Some(BufWriter::new(pack_writer)),
262            header_offset,
263            pack_position: header_offset + header_bytes.len() as u64,
264            object_count: 0,
265            declared_object_count,
266            total_uncompressed: 0,
267            total_compressed: 0,
268            compression,
269            bucket_dir,
270            bucket_writers: (0..TOTAL_BUCKETS).map(|_| None).collect(),
271            open_bucket_writers: 0,
272            bucket_access_tick: 0,
273            bucket_paths,
274            index_path,
275            finalized: false,
276        })
277    }
278
279    /// Flush pack bytes written so far to the underlying writer.
280    ///
281    /// Used by hosted sync's interleaved build/send path: after each complete
282    /// entry is added, the sender flushes and drains full chunks from the file.
283    pub fn flush_pack(&mut self) -> Result<()> {
284        if let Some(writer) = self.pack_writer.as_mut() {
285            writer.flush().map_err(StoreError::from)?;
286        }
287        Ok(())
288    }
289
290    /// Add an object with a content-hash id.
291    pub fn add(&mut self, hash: ContentHash, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
292        self.add_id(PackObjectId::Hash(hash), obj_type, data)
293    }
294
    /// Add an object with an explicit id. Mirrors [`super::PackBuilder::add_id`].
    ///
    /// Writes one pack entry (tagged id, type+size varint, compressed-size
    /// varint, payload) at the current append position, then appends an
    /// `(id, offset)` record to the index bucket selected by the id.
    ///
    /// # Memory shape
    ///
    /// Per-entry, the notable allocations are:
    ///
    /// - `data: Vec<u8>` (the input, owned by the caller — comes from
    ///   gix' `find_object` and isn't ours to stream further).
    /// - A small `Vec` scratch (capacity 40) for the entry header, plus a
    ///   similarly small one for the compressed-size varint on the raw
    ///   path.
    /// - zstd's internal compression context (~128 KB constant).
    /// - One index-bucket record (tagged id + 8-byte offset) buffered
    ///   into the bucket's `BufWriter`.
    ///
    /// The compressed payload is **never materialized** as a `Vec<u8>` —
    /// it streams directly through `zstd::stream::write::Encoder` into
    /// the pack writer. The pack format requires a `compressed_size`
    /// varint *before* the compressed bytes, which we don't know yet
    /// when we write the header; we reserve a 10-byte placeholder and
    /// seek-back to patch it after the encoder finishes. Heddle's
    /// varint decoder accepts non-canonical encodings (it walks
    /// continuation bits without enforcing minimum-byte form), so the
    /// padded write decodes back to the same value any reader expects.
    ///
    /// # Errors
    ///
    /// Returns a `StoreError` on any write, seek, or encoder failure, and
    /// `StoreError::InvalidObject` if the logical pack position would
    /// overflow `u64`. After an error the internal position bookkeeping
    /// may no longer match the writer, so the builder should be dropped.
    ///
    /// # Panics
    ///
    /// Panics if the pack writer has already been taken, or if the
    /// logical position is somehow before `header_offset`.
    pub fn add_id(&mut self, id: PackObjectId, obj_type: ObjectType, data: Vec<u8>) -> Result<()> {
        // Compute the entry's offset relative to the header from our logical
        // append cursor. Asking the underlying file for its position would
        // flush the BufWriter on every object, which defeats the streaming
        // sender's chunk-sized drain cadence.
        let pw = self
            .pack_writer
            .as_mut()
            .expect("add_id called after finalize");
        let entry_start = self.pack_position;
        let offset = entry_start
            .checked_sub(self.header_offset)
            .expect("header_offset should never be past current position");

        self.total_uncompressed += data.len() as u64;

        // Phase 1: write the entry header up to (but not including) the
        // compressed-size varint. Always small, fits in `header_buf`.
        let mut header_buf = Vec::with_capacity(40);
        id.encode_tagged(&mut header_buf);
        super::varint::encode_type_and_size(obj_type, data.len() as u64, &mut header_buf);
        pw.write_all(&header_buf).map_err(StoreError::from)?;
        self.pack_position = self
            .pack_position
            .checked_add(header_buf.len() as u64)
            .ok_or_else(|| {
                StoreError::InvalidObject("streaming pack position overflow".to_string())
            })?;
        // Absolute position of the compressed-size placeholder. Only
        // consumed by the zstd-enabled streaming branch below, but we
        // compute it here while we already have `header_buf`'s length
        // in scope.
        #[cfg(feature = "zstd")]
        let csize_pos = entry_start + header_buf.len() as u64;

        // Phase 2: stream the compressed payload. We branch here on
        // whether to compress at all — for tiny objects (`< min_size`)
        // the bulk path traditionally wrote raw bytes to skip zstd
        // overhead, and the reader's existing `compressed_size ==
        // uncompressed_size` heuristic in `pack_reader.rs:128` reads
        // raw entries back unchanged. We preserve that policy.
        // `want_compress` gates the zstd path. Even with the feature
        // enabled we fall through to raw for tiny entries (where
        // zstd's frame overhead dominates) or when the caller
        // explicitly disabled compression in `CompressionConfig`.
        // Without the `zstd` Cargo feature, every entry takes the raw
        // branch — same fallback shape as `compress_pack_payload`.
        //
        // NOTE(review): if the reader really treats `compressed_size ==
        // uncompressed_size` as "raw", a zstd frame whose length happens
        // to equal `data.len()` would be misread. The streaming branch
        // below has no fallback for that case — confirm against the
        // reader whether it can occur.
        let want_compress: bool;
        #[cfg(feature = "zstd")]
        {
            want_compress = self.compression.enabled && data.len() >= self.compression.min_size;
        }
        #[cfg(not(feature = "zstd"))]
        {
            want_compress = false;
        }
        if !want_compress {
            // Raw entry: known compressed_size = data.len(). One canonical
            // varint + the data itself. No seek-back needed.
            let mut csize_buf = Vec::with_capacity(10);
            super::varint::encode_varint(data.len() as u64, &mut csize_buf);
            pw.write_all(&csize_buf).map_err(StoreError::from)?;
            self.pack_position = self
                .pack_position
                .checked_add(csize_buf.len() as u64)
                .ok_or_else(|| {
                    StoreError::InvalidObject("streaming pack position overflow".to_string())
                })?;
            pw.write_all(&data).map_err(StoreError::from)?;
            self.pack_position = self
                .pack_position
                .checked_add(data.len() as u64)
                .ok_or_else(|| {
                    StoreError::InvalidObject("streaming pack position overflow".to_string())
                })?;
            self.total_compressed += data.len() as u64;
        } else {
            #[cfg(feature = "zstd")]
            {
                // Streaming entry: reserve 10 bytes for compressed_size,
                // stream-compress the payload, then seek back to patch.
                pw.write_all(&[0u8; CSIZE_PLACEHOLDER_LEN])
                    .map_err(StoreError::from)?;
                self.pack_position = self
                    .pack_position
                    .checked_add(CSIZE_PLACEHOLDER_LEN as u64)
                    .ok_or_else(|| {
                        StoreError::InvalidObject("streaming pack position overflow".to_string())
                    })?;
                let body_start = self.pack_position;
                let compressed_size;
                {
                    // The counting wrapper is how we learn the compressed
                    // size: it tallies every byte the encoder emits.
                    let mut counting = CountingWriter::new(&mut *pw);
                    let mut enc =
                        zstd::stream::write::Encoder::new(&mut counting, self.compression.level)
                            .map_err(StoreError::from)?;
                    // Pass the source size so the zstd frame's optional
                    // Frame Content Size field is set — lets decoders
                    // preallocate output buffers and validates that we
                    // wrote exactly what we promised at finish().
                    enc.set_pledged_src_size(Some(data.len() as u64))
                        .map_err(StoreError::from)?;
                    enc.write_all(&data).map_err(StoreError::from)?;
                    enc.finish().map_err(StoreError::from)?;
                    compressed_size = counting.written;
                }
                self.pack_position =
                    self.pack_position
                        .checked_add(compressed_size)
                        .ok_or_else(|| {
                            StoreError::InvalidObject(
                                "streaming pack position overflow".to_string(),
                            )
                        })?;
                let body_end = body_start.checked_add(compressed_size).ok_or_else(|| {
                    StoreError::InvalidObject("streaming pack position overflow".to_string())
                })?;
                self.total_compressed += compressed_size;

                // Seek back over the placeholder, write a 10-byte
                // non-canonical varint encoding the actual compressed_size,
                // then seek forward to where we left off so subsequent
                // adds append correctly. The flush is required first: the
                // patch goes through the inner writer directly, so nothing
                // may still be sitting in the `BufWriter`'s buffer.
                let mut csize_bytes = [0u8; CSIZE_PLACEHOLDER_LEN];
                encode_varint_padded_to_10(compressed_size, &mut csize_bytes);
                pw.flush().map_err(StoreError::from)?;
                let inner = pw.get_mut();
                inner
                    .seek(SeekFrom::Start(csize_pos))
                    .map_err(StoreError::from)?;
                inner.write_all(&csize_bytes).map_err(StoreError::from)?;
                inner
                    .seek(SeekFrom::Start(body_end))
                    .map_err(StoreError::from)?;
            }
            #[cfg(not(feature = "zstd"))]
            {
                // Unreachable: `want_compress` is forced to `false`
                // when the `zstd` feature is off.
                unreachable!("compression branch reached without `zstd` feature");
            }
        }

        // Append the index entry (id || u64 BE offset) to the bucket
        // matching the id's first inner byte. The bucket file is opened
        // lazily so a sparse pack only creates files it actually uses.
        let bucket_idx = bucket_index_for(&id);
        let bucket = self.get_or_open_bucket(bucket_idx)?;
        let mut idx_entry = Vec::with_capacity(33 + 8);
        id.encode_tagged(&mut idx_entry);
        idx_entry.extend_from_slice(&offset.to_be_bytes());
        bucket.write_all(&idx_entry).map_err(StoreError::from)?;

        self.object_count += 1;
        Ok(())
    }
472
473    fn get_or_open_bucket(&mut self, idx: usize) -> Result<&mut BufWriter<File>> {
474        self.bucket_access_tick = self.bucket_access_tick.wrapping_add(1);
475        let last_used = self.bucket_access_tick;
476        if self.bucket_writers[idx].is_none() {
477            if self.open_bucket_writers >= MAX_OPEN_BUCKET_WRITERS {
478                self.evict_lru_bucket()?;
479            }
480            let path = &self.bucket_paths[idx];
481            let f = OpenOptions::new()
482                .create(true)
483                .append(true)
484                .open(path)
485                .map_err(StoreError::from)?;
486            self.bucket_writers[idx] = Some(BucketWriter {
487                writer: BufWriter::new(f),
488                last_used,
489            });
490            self.open_bucket_writers += 1;
491        } else if let Some(bucket) = self.bucket_writers[idx].as_mut() {
492            bucket.last_used = last_used;
493        }
494        Ok(&mut self.bucket_writers[idx]
495            .as_mut()
496            .expect("just inserted above")
497            .writer)
498    }
499
500    fn evict_lru_bucket(&mut self) -> Result<()> {
501        let Some((idx, _)) = self
502            .bucket_writers
503            .iter()
504            .enumerate()
505            .filter_map(|(idx, bucket)| bucket.as_ref().map(|bucket| (idx, bucket.last_used)))
506            .min_by_key(|(_, last_used)| *last_used)
507        else {
508            return Ok(());
509        };
510
511        if let Some(mut bucket) = self.bucket_writers[idx].take() {
512            bucket.writer.flush().map_err(StoreError::from)?;
513            self.open_bucket_writers -= 1;
514        }
515        Ok(())
516    }
517
518    /// Close the pack: patch the header count, append the BLAKE3
519    /// trailer, build the sorted index from bucket files, and clean up
520    /// the bucket directory. Returns `(pack_writer, index_bytes,
521    /// stats)` so the caller can install the pack into its store.
522    ///
523    /// On any failure the bucket dir is left in place; rerunning the
524    /// import will overwrite stale bucket files (they're keyed by
525    /// fixed name, not content) so this isn't a correctness issue —
526    /// just a small amount of disk churn until the next clean
527    /// finalize.
528    pub fn finalize(mut self) -> Result<(W, PackStats)> {
529        // 1. Flush every bucket so reads in the next phase see all
530        //    queued entries. `flatten()` skips the never-opened slots.
531        for bucket in self.bucket_writers.iter_mut().flatten() {
532            bucket.writer.flush().map_err(StoreError::from)?;
533        }
534        // Drop the writers so the OS file handles close before we
535        // re-open the same paths for reading.
536        for slot in self.bucket_writers.iter_mut() {
537            *slot = None;
538        }
539        self.open_bucket_writers = 0;
540
541        // 2. Patch the pack header with the real object count unless the
542        //    caller declared it up front, then re-stream the [header][body]
543        //    bytes to compute the trailer checksum.
544        let bw = self
545            .pack_writer
546            .take()
547            .expect("finalize called twice — pack_writer already consumed");
548        let mut writer = bw
549            .into_inner()
550            .map_err(|e| StoreError::from(std::io::Error::other(e.to_string())))?;
551        if let Some(expected) = self.declared_object_count {
552            if expected != self.object_count {
553                return Err(StoreError::InvalidObject(format!(
554                    "streaming pack declared {expected} object(s) but added {}",
555                    self.object_count
556                )));
557            }
558        } else {
559            writer
560                .seek(SeekFrom::Start(self.header_offset))
561                .map_err(StoreError::from)?;
562            let mut header_bytes = Vec::with_capacity(16);
563            write_container_header(&mut header_bytes, pack_container_spec(), self.object_count);
564            writer.write_all(&header_bytes).map_err(StoreError::from)?;
565        }
566
567        // 3. Hash the on-disk content from header_offset to current
568        //    position (which is just past the body). One sequential
569        //    pass; the BufWriter we drained is gone so this read is
570        //    on the raw writer.
571        writer
572            .seek(SeekFrom::Start(self.header_offset))
573            .map_err(StoreError::from)?;
574        let mut hasher = blake3::Hasher::new();
575        let mut buf = vec![0u8; 64 * 1024];
576        loop {
577            let n = writer.read(&mut buf).map_err(StoreError::from)?;
578            if n == 0 {
579                break;
580            }
581            hasher.update(&buf[..n]);
582        }
583        let checksum = hasher.finalize();
584
585        // 4. Append the trailer checksum.
586        writer.seek(SeekFrom::End(0)).map_err(StoreError::from)?;
587        writer
588            .write_all(checksum.as_bytes())
589            .map_err(StoreError::from)?;
590        writer.flush().map_err(StoreError::from)?;
591
592        // 5. Stream the final sorted index directly to disk. We open
593        //    a `BufWriter` against `index_path`, write the index
594        //    container header (magic + version + count — count is
595        //    already known from the per-add bookkeeping), then walk
596        //    the 512 buckets in `(variant, prefix)` order, sorting
597        //    each in memory and writing entries to the file as they
598        //    come off the sort. The intermediate `PackIndex` Vec —
599        //    O(K) in the previous implementation — is gone; the
600        //    largest in-memory state is one bucket's worth of entries.
601        //    Bucket distribution is uniform via BLAKE3 so each bucket
602        //    is ~K/256 entries × ~50 bytes; even at 100M objects that's
603        //    a ~16 MB sort scratch.
604        let idx_file = File::create(&self.index_path).map_err(StoreError::from)?;
605        let mut idx_writer = BufWriter::new(idx_file);
606        write_index_header(&mut idx_writer, self.object_count)?;
607        let mut entries_written: u64 = 0;
608        for path in self.bucket_paths.iter() {
609            if !path.exists() {
610                continue;
611            }
612            let bucket_bytes = std::fs::read(path).map_err(StoreError::from)?;
613            let mut entries = decode_bucket_file(&bucket_bytes)?;
614            // Local sort by `PackObjectId` matches the global sort
615            // because all entries in a bucket share the same variant
616            // tag *and* the same first inner byte; only the remaining
617            // bytes differ between them.
618            entries.sort_by_key(|(id, _)| *id);
619            for (id, offset) in entries {
620                write_index_entry(&mut idx_writer, id, offset)?;
621                entries_written += 1;
622            }
623        }
624        idx_writer.flush().map_err(StoreError::from)?;
625        debug_assert_eq!(
626            entries_written, self.object_count,
627            "streaming index entry count drifted from add() count"
628        );
629
630        // 6. Clean up the bucket dir so the heddle store doesn't carry
631        //    transient artifacts. Deletion failures are non-fatal —
632        //    the dir is uniquely named per import so leftovers are at
633        //    worst stale, not corrupting.
634        for path in self.bucket_paths.iter() {
635            let _ = std::fs::remove_file(path);
636        }
637        let _ = std::fs::remove_dir(&self.bucket_dir);
638        self.finalized = true;
639
640        let stats = PackStats {
641            object_count: self.object_count,
642            total_uncompressed: self.total_uncompressed,
643            total_compressed: self.total_compressed,
644            delta_count: 0,
645            compression_ratio: if self.total_uncompressed == 0 {
646                0.0
647            } else {
648                self.total_compressed as f64 / self.total_uncompressed as f64
649            },
650        };
651
652        Ok((writer, stats))
653    }
654}
655
656/// Write the index container header to `out`. Mirrors
657/// [`PackIndex::to_bytes`]'s prefix exactly (4-byte magic, 4-byte
658/// big-endian version, 8-byte big-endian count) so a reader written
659/// against the existing format works without modification.
660fn write_index_header<W: Write>(out: &mut W, count: u64) -> Result<()> {
661    super::pack_index::index_header().write_to(out, count)
662}
663
664/// Append one `(id, offset)` index entry to `out`. The encoding
665/// matches [`PackIndex::to_bytes`]: tagged id immediately followed by
666/// an 8-byte big-endian offset.
667fn write_index_entry<W: Write>(out: &mut W, id: PackObjectId, offset: u64) -> Result<()> {
668    let mut buf = Vec::with_capacity(33 + 8);
669    id.encode_tagged(&mut buf);
670    buf.extend_from_slice(&offset.to_be_bytes());
671    out.write_all(&buf).map_err(StoreError::from)
672}
673
674/// Encode a `u64` as a non-canonical 10-byte LEB128 varint. The first
675/// 9 bytes always set the continuation bit (`0x80`), the 10th never
676/// does — so the decoder reads exactly 10 bytes regardless of the
677/// value. Used by the streaming path to reserve a fixed-width
678/// placeholder for `compressed_size` before stream-compressing the
679/// payload, then patch the placeholder with the actual size after.
680///
681/// `decode_varint` ignores the canonicalness of the encoding (it
682/// walks continuation bits without checking minimum-byte form), so
683/// the value round-trips exactly. Cost is up to 9 wasted bytes per
684/// entry, ~115 KB on a 13 K-entry import — negligible relative to
685/// the pack body.
686#[cfg(feature = "zstd")]
fn encode_varint_padded_to_10(value: u64, out: &mut [u8; 10]) {
    for (index, slot) in out.iter_mut().enumerate() {
        // Each output byte carries the next 7-bit group, least
        // significant group first. The largest shift is 9 * 7 = 63,
        // which is still a valid shift amount for a u64.
        let group = ((value >> (7 * index)) & 0x7F) as u8;
        // Bytes 0..=8 always carry the continuation bit; only the
        // final byte leaves it clear, so the encoding is always
        // exactly ten bytes wide.
        *slot = if index < 9 { group | 0x80 } else { group };
    }
}
695
696impl<W: Write + Read + Seek> Drop for StreamingPackBuilder<W> {
697    fn drop(&mut self) {
698        if self.finalized {
699            return;
700        }
701        // Best-effort cleanup of bucket dir on abort. Errors here are
702        // suppressed because Drop can't propagate them.
703        for path in self.bucket_paths.iter() {
704            let _ = std::fs::remove_file(path);
705        }
706        let _ = std::fs::remove_dir(&self.bucket_dir);
707    }
708}
709
710/// Map a `PackObjectId` to one of `TOTAL_BUCKETS` buckets. The variant
711/// (Hash vs ChangeId) picks the upper half; the first byte of the
712/// inner id picks the slot within the half.
713fn bucket_index_for(id: &PackObjectId) -> usize {
714    match id {
715        PackObjectId::Hash(h) => HASH_VARIANT * BUCKETS_PER_VARIANT + h.as_bytes()[0] as usize,
716        PackObjectId::ChangeId(c) => {
717            CHANGEID_VARIANT * BUCKETS_PER_VARIANT + c.as_bytes()[0] as usize
718        }
719    }
720}
721
722/// Decode `(id, offset)` records from a bucket file. The format
723/// matches `PackObjectId::encode_tagged` followed by a u64 BE offset,
724/// repeated. Unrecognized tags or truncated trailers fail loudly —
725/// we wrote the bytes, so any corruption is a bug, not user input.
726fn decode_bucket_file(bytes: &[u8]) -> Result<Vec<(PackObjectId, u64)>> {
727    let mut out = Vec::new();
728    let mut pos = 0;
729    while pos < bytes.len() {
730        let (id, id_len) = PackObjectId::decode_tagged(&bytes[pos..])?;
731        pos += id_len;
732        if pos + 8 > bytes.len() {
733            return Err(StoreError::InvalidObject(
734                "streaming bucket entry truncated at offset".to_string(),
735            ));
736        }
737        let offset = u64::from_be_bytes(bytes[pos..pos + 8].try_into().map_err(|_| {
738            StoreError::InvalidObject("streaming bucket bad offset slice".to_string())
739        })?);
740        pos += 8;
741        out.push((id, offset));
742    }
743    Ok(out)
744}
745
746// ---------------------- Tests ----------------------
747
748#[cfg(test)]
749mod tests {
750    use std::io::Cursor;
751
752    use super::*;
753    use crate::{
754        object::ChangeId,
755        store::pack::{PackReader, PackStats},
756    };
757
758    fn deterministic_hash(seed: u8) -> ContentHash {
759        // Spread `seed` across the high byte so different seeds end up
760        // in different hash-prefix buckets. We don't actually want
761        // collisions in the tests that check distribution.
762        let mut bytes = [0u8; 32];
763        bytes[0] = seed;
764        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
765            *b = seed.wrapping_mul(31).wrapping_add(i as u8);
766        }
767        ContentHash::from_bytes(bytes)
768    }
769
770    fn deterministic_change_id(seed: u8) -> ChangeId {
771        let mut bytes = [0u8; 16];
772        bytes[0] = seed;
773        for (i, b) in bytes.iter_mut().enumerate().skip(1) {
774            *b = seed.wrapping_add(i as u8 * 7);
775        }
776        ChangeId::from_bytes(bytes)
777    }
778
779    /// Test rig: returns the builder, the bucket dir (for cleanup
780    /// inspection), and the index path the builder will write at
781    /// finalize. The index path lives in the temp dir so it gets
782    /// auto-cleaned with `tmp`.
783    fn fresh_builder(
784        tmp: &tempfile::TempDir,
785    ) -> (StreamingPackBuilder<Cursor<Vec<u8>>>, PathBuf, PathBuf) {
786        let bucket_dir = tmp.path().join("buckets");
787        let index_path = tmp.path().join("test.idx");
788        let cursor = Cursor::new(Vec::<u8>::new());
789        let b = StreamingPackBuilder::new(
790            cursor,
791            index_path.clone(),
792            CompressionConfig::default(),
793            bucket_dir.clone(),
794        )
795        .unwrap();
796        (b, bucket_dir, index_path)
797    }
798
799    /// Finalize the builder and return `(pack_bytes, index_bytes, stats)`.
800    /// The index bytes are read back from the file the builder wrote
801    /// to — verifying that the streaming index path actually produced
802    /// readable bytes.
803    fn finalize_cursor(
804        b: StreamingPackBuilder<Cursor<Vec<u8>>>,
805        index_path: &std::path::Path,
806    ) -> (Vec<u8>, Vec<u8>, PackStats) {
807        let (cursor, stats) = b.finalize().unwrap();
808        let index_bytes = std::fs::read(index_path).unwrap();
809        (cursor.into_inner(), index_bytes, stats)
810    }
811
812    #[test]
813    fn empty_pack_finalizes_to_valid_zero_count_pack() {
814        let tmp = tempfile::TempDir::new().unwrap();
815        let (b, bucket_dir, idx_path) = fresh_builder(&tmp);
816        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
817
818        assert_eq!(stats.object_count, 0);
819        // PackReader can parse the empty pack and reports zero objects.
820        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
821        assert!(reader.list_ids().is_empty());
822        // Bucket dir was removed.
823        assert!(
824            !bucket_dir.exists(),
825            "bucket dir should be cleaned on successful finalize"
826        );
827    }
828
829    #[test]
830    fn single_blob_with_hash_id_round_trips() {
831        let tmp = tempfile::TempDir::new().unwrap();
832        let (mut b, _, idx_path) = fresh_builder(&tmp);
833        let hash = deterministic_hash(0x42);
834        let payload = b"hello, streaming pack".to_vec();
835        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
836        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
837
838        assert_eq!(stats.object_count, 1);
839        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
840        let id = PackObjectId::Hash(hash);
841        assert!(reader.has_object(&id));
842        let (got_type, got_data) = reader.get_object(&id).unwrap().unwrap();
843        assert_eq!(got_type, ObjectType::Blob);
844        assert_eq!(got_data, payload);
845    }
846
847    #[test]
848    fn single_state_with_change_id_round_trips() {
849        let tmp = tempfile::TempDir::new().unwrap();
850        let (mut b, _, idx_path) = fresh_builder(&tmp);
851        let cid = deterministic_change_id(0xa5);
852        let payload = b"serialized-state-bytes".to_vec();
853        b.add_id(
854            PackObjectId::ChangeId(cid),
855            ObjectType::State,
856            payload.clone(),
857        )
858        .unwrap();
859        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
860
861        assert_eq!(stats.object_count, 1);
862        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
863        let id = PackObjectId::ChangeId(cid);
864        let (ty, data) = reader.get_object(&id).unwrap().unwrap();
865        assert_eq!(ty, ObjectType::State);
866        assert_eq!(data, payload);
867    }
868
869    #[test]
870    fn mixed_hash_and_changeid_ids_all_retrievable() {
871        let tmp = tempfile::TempDir::new().unwrap();
872        let (mut b, _, idx_path) = fresh_builder(&tmp);
873        let blob_hash = deterministic_hash(0x10);
874        let tree_hash = deterministic_hash(0x20);
875        let state_cid = deterministic_change_id(0x80);
876
877        b.add(blob_hash, ObjectType::Blob, b"blob-bytes".to_vec())
878            .unwrap();
879        b.add(tree_hash, ObjectType::Tree, b"serialized-tree".to_vec())
880            .unwrap();
881        b.add_id(
882            PackObjectId::ChangeId(state_cid),
883            ObjectType::State,
884            b"serialized-state".to_vec(),
885        )
886        .unwrap();
887
888        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
889        assert_eq!(stats.object_count, 3);
890        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
891        assert_eq!(
892            reader
893                .get_object(&PackObjectId::Hash(blob_hash))
894                .unwrap()
895                .unwrap()
896                .1,
897            b"blob-bytes".to_vec()
898        );
899        assert_eq!(
900            reader
901                .get_object(&PackObjectId::Hash(tree_hash))
902                .unwrap()
903                .unwrap()
904                .1,
905            b"serialized-tree".to_vec()
906        );
907        assert_eq!(
908            reader
909                .get_object(&PackObjectId::ChangeId(state_cid))
910                .unwrap()
911                .unwrap()
912                .1,
913            b"serialized-state".to_vec()
914        );
915    }
916
917    #[test]
918    fn ten_thousand_objects_round_trip_correctly() {
919        // Stresses the bucket sort: 10K objects spread across
920        // 256 hash buckets averages 40 entries per bucket — well
921        // within in-memory sort capacity but covers every bucket.
922        let tmp = tempfile::TempDir::new().unwrap();
923        let (mut b, _, idx_path) = fresh_builder(&tmp);
924        let mut hashes = Vec::with_capacity(10_000);
925        for i in 0..10_000u32 {
926            // Use BLAKE3 over the index so first-byte distribution is
927            // pseudo-uniform across the 256 hash buckets.
928            let h = blake3::hash(&i.to_le_bytes());
929            let hash = ContentHash::from_bytes(*h.as_bytes());
930            hashes.push(hash);
931            b.add(hash, ObjectType::Blob, format!("payload-{i}").into_bytes())
932                .unwrap();
933        }
934        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
935        assert_eq!(stats.object_count, 10_000);
936
937        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
938        assert_eq!(reader.list_ids().len(), 10_000);
939        // Spot-check ten across the range.
940        for i in [0, 1, 99, 1234, 5_000, 9_999] {
941            let id = PackObjectId::Hash(hashes[i]);
942            let (_ty, data) = reader.get_object(&id).unwrap().unwrap();
943            assert_eq!(data, format!("payload-{i}").into_bytes());
944        }
945    }
946
947    #[test]
948    fn bucket_writers_are_lru_capped_below_fd_limit() {
949        let tmp = tempfile::TempDir::new().unwrap();
950        let (mut b, _bucket_dir, idx_path) = fresh_builder(&tmp);
951        let mut ids = Vec::new();
952
953        for i in 0..BUCKETS_PER_VARIANT {
954            let hash = deterministic_hash(i as u8);
955            ids.push(PackObjectId::Hash(hash));
956            b.add(hash, ObjectType::Blob, format!("hash-{i}").into_bytes())
957                .unwrap();
958            assert!(
959                b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
960                "open bucket writers should stay capped"
961            );
962        }
963
964        for i in 0..BUCKETS_PER_VARIANT {
965            let cid = deterministic_change_id(i as u8);
966            ids.push(PackObjectId::ChangeId(cid));
967            b.add_id(
968                PackObjectId::ChangeId(cid),
969                ObjectType::State,
970                format!("state-{i}").into_bytes(),
971            )
972            .unwrap();
973            assert!(
974                b.open_bucket_writers <= MAX_OPEN_BUCKET_WRITERS,
975                "open bucket writers should stay capped"
976            );
977        }
978
979        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
980        assert_eq!(stats.object_count, TOTAL_BUCKETS as u64);
981        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
982        for id in ids {
983            assert!(reader.has_object(&id), "missing id {id:?}");
984        }
985    }
986
987    #[test]
988    fn index_id_sort_order_matches_packbuilder_output() {
989        // PackBuilder groups objects by `ObjectType` before encoding,
990        // which changes the byte offsets relative to a streaming builder
991        // that writes in added-order. The bytes of the two indices
992        // therefore can't match exactly. What MUST match is the
993        // **sort order of ids** — both builders ultimately call
994        // `PackIndex::sort()` (or the bucket-equivalent), and any
995        // reader binary-searches against that order.
996        use crate::store::pack::PackBuilder;
997        let payloads: Vec<(PackObjectId, ObjectType, Vec<u8>)> = (0..200u32)
998            .map(|i| {
999                let h = blake3::hash(&i.to_le_bytes());
1000                (
1001                    PackObjectId::Hash(ContentHash::from_bytes(*h.as_bytes())),
1002                    if i % 3 == 0 {
1003                        ObjectType::Tree
1004                    } else {
1005                        ObjectType::Blob
1006                    },
1007                    format!("body-{i}").into_bytes(),
1008                )
1009            })
1010            .collect();
1011
1012        // Disable delta encoding so the classic builder produces a pack
1013        // shape comparable to the streaming one (which never deltas).
1014        let compression = CompressionConfig {
1015            max_delta_size: 0,
1016            ..CompressionConfig::default()
1017        };
1018        let mut classic = PackBuilder::new(compression);
1019        for (id, ty, data) in payloads.iter() {
1020            classic.add_id(*id, *ty, data.clone());
1021        }
1022        let (classic_pack, classic_index, _) = classic.build().unwrap();
1023        let classic_reader = PackReader::from_bytes(classic_pack, classic_index).unwrap();
1024
1025        let tmp = tempfile::TempDir::new().unwrap();
1026        let bucket_dir = tmp.path().join("buckets");
1027        let idx_path = tmp.path().join("test.idx");
1028        let cursor = Cursor::new(Vec::<u8>::new());
1029        let mut streaming =
1030            StreamingPackBuilder::new(cursor, idx_path.clone(), compression, bucket_dir).unwrap();
1031        for (id, ty, data) in payloads.iter() {
1032            streaming.add_id(*id, *ty, data.clone()).unwrap();
1033        }
1034        let (streaming_pack, streaming_index, _) = finalize_cursor(streaming, &idx_path);
1035        let streaming_reader = PackReader::from_bytes(streaming_pack, streaming_index).unwrap();
1036
1037        // Same set of ids in the same sorted order — that's the
1038        // contract for binary search to work.
1039        assert_eq!(
1040            streaming_reader.list_ids(),
1041            classic_reader.list_ids(),
1042            "streaming and classic indices should report the same id sequence"
1043        );
1044        // Spot-check that each id resolves to a payload that matches
1045        // the classic builder's output (equal bytes after decompression).
1046        for (id, _ty, want) in payloads.iter().take(10).chain(payloads.iter().skip(190)) {
1047            let (_, got) = streaming_reader.get_object(id).unwrap().unwrap();
1048            assert_eq!(&got, want);
1049            let (_, classic_got) = classic_reader.get_object(id).unwrap().unwrap();
1050            assert_eq!(got, classic_got);
1051        }
1052    }
1053
1054    #[test]
1055    fn corrupted_pack_fails_checksum_verification() {
1056        let tmp = tempfile::TempDir::new().unwrap();
1057        let (mut b, _, idx_path) = fresh_builder(&tmp);
1058        b.add(
1059            deterministic_hash(0x01),
1060            ObjectType::Blob,
1061            b"some bytes".to_vec(),
1062        )
1063        .unwrap();
1064        let (mut pack_data, index_data, _) = finalize_cursor(b, &idx_path);
1065        // Flip one byte in the body. The trailer checksum must reject.
1066        let body_byte = 18; // past the 16-byte header
1067        pack_data[body_byte] ^= 0xff;
1068        let result = PackReader::from_bytes(pack_data, index_data);
1069        assert!(
1070            result.is_err(),
1071            "PackReader should reject pack with mutated body"
1072        );
1073    }
1074
1075    #[test]
1076    fn pack_count_in_header_matches_index_entry_count() {
1077        let tmp = tempfile::TempDir::new().unwrap();
1078        let (mut b, _, idx_path) = fresh_builder(&tmp);
1079        for i in 0..7u8 {
1080            b.add(
1081                deterministic_hash(i),
1082                ObjectType::Blob,
1083                format!("p{i}").into_bytes(),
1084            )
1085            .unwrap();
1086        }
1087        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
1088        // Header count is bytes 8..16 (big-endian).
1089        let count = u64::from_be_bytes(pack_data[8..16].try_into().unwrap());
1090        assert_eq!(count, 7);
1091        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1092        assert_eq!(reader.list_ids().len(), 7);
1093    }
1094
1095    #[test]
1096    fn declared_pack_count_is_written_before_finalize() {
1097        let tmp = tempfile::TempDir::new().unwrap();
1098        let bucket_dir = tmp.path().join("buckets");
1099        let idx_path = tmp.path().join("test.idx");
1100        let cursor = Cursor::new(Vec::<u8>::new());
1101        let mut b = StreamingPackBuilder::new_with_object_count(
1102            cursor,
1103            idx_path.clone(),
1104            CompressionConfig::default(),
1105            bucket_dir,
1106            2,
1107        )
1108        .unwrap();
1109
1110        b.flush_pack().unwrap();
1111        let initial = b.pack_writer.as_ref().unwrap().get_ref().get_ref().clone();
1112        assert_eq!(u64::from_be_bytes(initial[8..16].try_into().unwrap()), 2);
1113
1114        let hash = deterministic_hash(0x40);
1115        b.add(hash, ObjectType::Blob, b"known-count-entry".to_vec())
1116            .unwrap();
1117        b.flush_pack().unwrap();
1118        let after_add = b.pack_writer.as_ref().unwrap().get_ref().get_ref().clone();
1119        assert_eq!(u64::from_be_bytes(after_add[8..16].try_into().unwrap()), 2);
1120
1121        let second_hash = deterministic_hash(0x41);
1122        b.add(second_hash, ObjectType::Blob, b"second-entry".to_vec())
1123            .unwrap();
1124        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
1125
1126        assert_eq!(stats.object_count, 2);
1127        assert_eq!(u64::from_be_bytes(pack_data[8..16].try_into().unwrap()), 2);
1128        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1129        assert!(reader.has_object(&PackObjectId::Hash(hash)));
1130        assert!(reader.has_object(&PackObjectId::Hash(second_hash)));
1131    }
1132
1133    #[test]
1134    fn declared_pack_count_mismatch_fails_finalize() {
1135        let tmp = tempfile::TempDir::new().unwrap();
1136        let bucket_dir = tmp.path().join("buckets");
1137        let idx_path = tmp.path().join("test.idx");
1138        let cursor = Cursor::new(Vec::<u8>::new());
1139        let mut b = StreamingPackBuilder::new_with_object_count(
1140            cursor,
1141            idx_path,
1142            CompressionConfig::default(),
1143            bucket_dir,
1144            2,
1145        )
1146        .unwrap();
1147
1148        b.add(
1149            deterministic_hash(0x50),
1150            ObjectType::Blob,
1151            b"only-entry".to_vec(),
1152        )
1153        .unwrap();
1154        let error = b.finalize().unwrap_err();
1155
1156        assert!(
1157            error
1158                .to_string()
1159                .contains("streaming pack declared 2 object(s) but added 1")
1160        );
1161    }
1162
1163    #[test]
1164    fn bucket_files_are_cleaned_on_successful_finalize() {
1165        let tmp = tempfile::TempDir::new().unwrap();
1166        let bucket_dir = tmp.path().join("buckets");
1167        let idx_path = tmp.path().join("test.idx");
1168        let cursor = Cursor::new(Vec::<u8>::new());
1169        let mut b = StreamingPackBuilder::new(
1170            cursor,
1171            idx_path.clone(),
1172            CompressionConfig::default(),
1173            bucket_dir.clone(),
1174        )
1175        .unwrap();
1176        for i in 0..50u8 {
1177            b.add(deterministic_hash(i), ObjectType::Blob, vec![i; 32])
1178                .unwrap();
1179        }
1180        // Buckets exist and contain data.
1181        assert!(bucket_dir.exists());
1182        let bucket_count = std::fs::read_dir(&bucket_dir).unwrap().count();
1183        assert!(bucket_count > 0, "bucket dir should hold some files");
1184        let _ = finalize_cursor(b, &idx_path);
1185        assert!(
1186            !bucket_dir.exists(),
1187            "bucket dir should be removed on finalize"
1188        );
1189    }
1190
1191    #[test]
1192    fn bucket_files_are_cleaned_on_drop_without_finalize() {
1193        let tmp = tempfile::TempDir::new().unwrap();
1194        let bucket_dir = tmp.path().join("buckets");
1195        let idx_path = tmp.path().join("test.idx");
1196        {
1197            let cursor = Cursor::new(Vec::<u8>::new());
1198            let mut b = StreamingPackBuilder::new(
1199                cursor,
1200                idx_path.clone(),
1201                CompressionConfig::default(),
1202                bucket_dir.clone(),
1203            )
1204            .unwrap();
1205            for i in 0..10u8 {
1206                b.add(deterministic_hash(i), ObjectType::Blob, vec![0; 32])
1207                    .unwrap();
1208            }
1209            assert!(bucket_dir.exists());
1210            // Drop without finalize — Drop impl should clean up.
1211        }
1212        assert!(
1213            !idx_path.exists(),
1214            "no index file should have been created without finalize"
1215        );
1216        assert!(
1217            !bucket_dir.exists(),
1218            "bucket dir should be removed on Drop when finalize never ran"
1219        );
1220    }
1221
1222    #[test]
1223    fn large_blob_streams_to_disk_without_double_buffering() {
1224        // 4 MiB blob — well under the actual streaming target but big
1225        // enough to confirm we're not buffering the entire pack body in
1226        // RAM. The pack data on disk should be at least 4 MiB; the
1227        // builder's in-memory state is per-object only.
1228        let tmp = tempfile::TempDir::new().unwrap();
1229        let bucket_dir = tmp.path().join("buckets");
1230        let pack_path = tmp.path().join("pack.dat");
1231        let idx_path = tmp.path().join("pack.idx");
1232        let file = std::fs::OpenOptions::new()
1233            .read(true)
1234            .write(true)
1235            .create(true)
1236            .truncate(true)
1237            .open(&pack_path)
1238            .unwrap();
1239        let mut b = StreamingPackBuilder::new(
1240            file,
1241            idx_path.clone(),
1242            CompressionConfig::default(),
1243            bucket_dir,
1244        )
1245        .unwrap();
1246        let payload: Vec<u8> = (0..4 * 1024 * 1024u32).map(|i| (i & 0xff) as u8).collect();
1247        let hash = deterministic_hash(0xff);
1248        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1249        let (_, stats) = b.finalize().unwrap();
1250        let index_data = std::fs::read(&idx_path).unwrap();
1251        assert_eq!(stats.object_count, 1);
1252        let pack_bytes = std::fs::read(&pack_path).unwrap();
1253        // Pack on disk holds the whole compressed payload + headers
1254        // + trailer. Confirm it round-trips.
1255        let reader = PackReader::from_bytes(pack_bytes, index_data).unwrap();
1256        let (_ty, got) = reader
1257            .get_object(&PackObjectId::Hash(hash))
1258            .unwrap()
1259            .unwrap();
1260        assert_eq!(got, payload);
1261    }
1262
1263    #[test]
1264    fn bucket_distribution_for_random_hashes_is_roughly_uniform() {
1265        // Confirms our sort-time peak memory bound. We accumulate
1266        // 1024 random hashes through a builder and check that no
1267        // single bucket holds more than ~3× the average. (BLAKE3 hash
1268        // first-byte distribution is uniform; this is mostly a
1269        // sanity check that we route to the right bucket and aren't
1270        // accidentally collapsing.)
1271        let tmp = tempfile::TempDir::new().unwrap();
1272        let bucket_dir = tmp.path().join("buckets");
1273        let idx_path = tmp.path().join("test.idx");
1274        let cursor = Cursor::new(Vec::<u8>::new());
1275        let mut b = StreamingPackBuilder::new(
1276            cursor,
1277            idx_path.clone(),
1278            CompressionConfig::default(),
1279            bucket_dir.clone(),
1280        )
1281        .unwrap();
1282        for i in 0..1024u32 {
1283            let h = blake3::hash(&i.to_le_bytes());
1284            let hash = ContentHash::from_bytes(*h.as_bytes());
1285            b.add(hash, ObjectType::Blob, b"x".to_vec()).unwrap();
1286        }
1287        // Inspect bucket file sizes BEFORE finalize (which deletes them).
1288        b.pack_writer.as_mut().unwrap().flush().unwrap();
1289        let mut max_entries = 0usize;
1290        let entry_size = 33 + 8; // tagged-hash + u64 offset
1291        for path in b.bucket_paths.iter() {
1292            if path.exists() {
1293                let size = std::fs::metadata(path).unwrap().len() as usize;
1294                let entries = size / entry_size;
1295                if entries > max_entries {
1296                    max_entries = entries;
1297                }
1298            }
1299        }
1300        // Average is 1024 / 256 = 4 entries per bucket. Allow up to 16
1301        // (4× average) — uniformity isn't perfect on small samples.
1302        assert!(
1303            max_entries <= 16,
1304            "max bucket has {max_entries} entries; uniform expected ~4"
1305        );
1306        let _ = finalize_cursor(b, &idx_path);
1307    }
1308
1309    #[test]
1310    fn finalize_returns_correct_stats() {
1311        let tmp = tempfile::TempDir::new().unwrap();
1312        let (mut b, _, idx_path) = fresh_builder(&tmp);
1313        let payload = vec![0xabu8; 1024];
1314        for i in 0..5u8 {
1315            b.add(deterministic_hash(i), ObjectType::Blob, payload.clone())
1316                .unwrap();
1317        }
1318        let (_, _, stats) = finalize_cursor(b, &idx_path);
1319        assert_eq!(stats.object_count, 5);
1320        assert_eq!(stats.total_uncompressed, 5 * 1024);
1321        assert!(stats.total_compressed > 0);
1322        assert!(stats.compression_ratio > 0.0);
1323        assert_eq!(stats.delta_count, 0, "streaming builder never deltas");
1324    }
1325
1326    #[cfg(feature = "zstd")]
1327    #[test]
1328    fn streaming_compression_roundtrips_through_zstd_frame() {
1329        // Force the streaming path with a payload that compresses
1330        // well (long runs of identical bytes). Verifies:
1331        //  1. Streaming output decodes back to the original bytes.
1332        //  2. The compressed body is genuinely smaller than the
1333        //     uncompressed input (proving zstd ran), and
1334        //  3. The non-canonical 10-byte varint patched into the
1335        //     compressed_size slot decodes to the right value.
1336        let tmp = tempfile::TempDir::new().unwrap();
1337        let (mut b, _, idx_path) = fresh_builder(&tmp);
1338        // 64 KiB of zeros — compresses to a tiny zstd frame, well
1339        // above the default `min_size` so we hit the streaming branch.
1340        let payload = vec![0u8; 64 * 1024];
1341        let hash = deterministic_hash(0x77);
1342        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1343        let (pack_data, index_data, stats) = finalize_cursor(b, &idx_path);
1344        assert!(
1345            stats.total_compressed < stats.total_uncompressed,
1346            "expected compression ratio < 1.0, got {}/{}",
1347            stats.total_compressed,
1348            stats.total_uncompressed
1349        );
1350        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1351        let (_ty, got) = reader
1352            .get_object(&PackObjectId::Hash(hash))
1353            .unwrap()
1354            .unwrap();
1355        assert_eq!(got, payload);
1356    }
1357
1358    #[cfg(feature = "zstd")]
1359    #[test]
1360    fn padded_varint_decodes_to_original_value_for_canonical_decoder() {
1361        // Sanity for the seek-back scheme: for every value we'd want
1362        // to encode (small, mid, large), confirm the existing
1363        // `decode_varint` returns the same `value` from a 10-byte
1364        // padded encoding. If this ever fails the streaming path's
1365        // patched compressed_size would be misread by readers.
1366        let cases: &[u64] = &[0, 1, 127, 128, 4096, 1_000_000, 1_000_000_000_000, u64::MAX];
1367        for &value in cases {
1368            let mut buf = [0u8; 10];
1369            super::encode_varint_padded_to_10(value, &mut buf);
1370            let (decoded, consumed) = super::super::varint::decode_varint(&buf)
1371                .expect("padded varint should always decode");
1372            assert_eq!(decoded, value, "varint roundtrip failed for {value}");
1373            assert_eq!(
1374                consumed, 10,
1375                "padded encoding should consume all 10 bytes for {value}"
1376            );
1377        }
1378    }
1379
1380    #[cfg(feature = "zstd")]
1381    #[test]
1382    fn streaming_path_does_not_buffer_compressed_payload_in_memory() {
1383        // Smoke check: write a single 8 MiB payload, observe the
1384        // pack file size on disk during/after the add. The pack file
1385        // grows incrementally during the streaming compression — if
1386        // we were buffering an intermediate compressed `Vec<u8>` the
1387        // on-disk size would jump by ~8 MiB at finalize, not stay
1388        // bounded as the encoder pumps bytes through.
1389        //
1390        // We can't easily measure peak heap from inside Rust without
1391        // a custom allocator. What we *can* verify is that calling
1392        // `add` returns control with the pack file already at its
1393        // final body size, demonstrating the encoder wrote through
1394        // and didn't accumulate.
1395        let tmp = tempfile::TempDir::new().unwrap();
1396        let bucket_dir = tmp.path().join("buckets");
1397        let pack_path = tmp.path().join("pack.dat");
1398        let idx_path = tmp.path().join("pack.idx");
1399        let file = std::fs::OpenOptions::new()
1400            .read(true)
1401            .write(true)
1402            .create(true)
1403            .truncate(true)
1404            .open(&pack_path)
1405            .unwrap();
1406        let mut b = StreamingPackBuilder::new(
1407            file,
1408            idx_path.clone(),
1409            CompressionConfig::default(),
1410            bucket_dir,
1411        )
1412        .unwrap();
1413        let payload = vec![0xa5u8; 8 * 1024 * 1024];
1414        let hash = deterministic_hash(0x66);
1415        b.add(hash, ObjectType::Blob, payload.clone()).unwrap();
1416        // Pack file already on disk holds at least the entry header +
1417        // compressed payload (excluding the 32-byte trailer the builder
1418        // appends at finalize).
1419        let mid_size = std::fs::metadata(&pack_path).unwrap().len();
1420        assert!(
1421            mid_size > 16 + 40,
1422            "pack file should hold real entry data after add; size={mid_size}"
1423        );
1424        let (_, _) = b.finalize().unwrap();
1425        let pack_bytes = std::fs::read(&pack_path).unwrap();
1426        let index_bytes = std::fs::read(&idx_path).unwrap();
1427        let reader = PackReader::from_bytes(pack_bytes, index_bytes).unwrap();
1428        let (_ty, got) = reader
1429            .get_object(&PackObjectId::Hash(hash))
1430            .unwrap()
1431            .unwrap();
1432        assert_eq!(got, payload);
1433    }
1434
1435    #[test]
1436    fn list_ids_returns_all_added_ids_sorted() {
1437        let tmp = tempfile::TempDir::new().unwrap();
1438        let (mut b, _, idx_path) = fresh_builder(&tmp);
1439        let mut added: Vec<PackObjectId> = Vec::new();
1440        // Mix of Hash and ChangeId in a non-sorted order on input.
1441        for seed in [0x05u8, 0xa0, 0x12, 0x9f, 0x33] {
1442            let id = PackObjectId::Hash(deterministic_hash(seed));
1443            b.add_id(id, ObjectType::Blob, vec![seed; 4]).unwrap();
1444            added.push(id);
1445        }
1446        for seed in [0x80u8, 0x10, 0xff] {
1447            let id = PackObjectId::ChangeId(deterministic_change_id(seed));
1448            b.add_id(id, ObjectType::State, vec![seed; 4]).unwrap();
1449            added.push(id);
1450        }
1451        let (pack_data, index_data, _) = finalize_cursor(b, &idx_path);
1452        let reader = PackReader::from_bytes(pack_data, index_data).unwrap();
1453        let mut got = reader.list_ids();
1454        // PackReader's list_ids returns index order — should already be
1455        // sorted because we sort on finalize.
1456        let mut sorted = got.clone();
1457        sorted.sort();
1458        assert_eq!(got, sorted, "list_ids must come back sorted");
1459        // And every added id should appear.
1460        added.sort();
1461        got.sort();
1462        assert_eq!(got, added);
1463    }
1464}