Skip to main content

snapdir_stores/
pack.rs

1//! SNAPPACK 1 — the snapdir pack wire format.
2//!
3//! A pack is a single self-verifying byte stream that carries raw
4//! content-addressed objects (and at most one manifest) between two snapdir
5//! processes, e.g. `snapdir send-pack | ssh host 'snapdir receive-pack'` — the
6//! acceleration path of the upcoming `ssh://` store. Both ends of the pipe are
7//! snapdir itself, so the format is deliberately minimal (no tar semantics, no
8//! entry names, no padding).
9//!
10//! # Grammar (normative)
11//!
12//! ```text
13//! stream   := "SNAPPACK 1\n" record* "end\n"
14//! record   := "obj " hex64 " " len "\n" payload(len)
15//!           | "manifest " hex64 " " len "\n" payload(len)   ; at most one; must be the LAST record
16//! hex64    := 64 lowercase hex chars, regex ^[0-9a-f]{64}$ (validated on read AND write)
17//! len      := decimal u64
18//! payload  := exactly len raw bytes, no padding/terminator
19//! ```
20//!
21//! # Invariants
22//!
23//! - **Header memory bound:** every header line (including its terminating
24//!   `\n`) is at most [`MAX_HEADER_BYTES`] bytes. The reader rejects a longer
25//!   line as soon as the cap is hit, without buffering more.
26//! - **Verify-before-file:** an `obj` payload streams through an INCREMENTAL
27//!   BLAKE3 hasher while it is staged; it is committed at its claimed
28//!   content-address only if the computed hash equals the claimed `hex64`. A
29//!   mismatch removes the staged bytes (temp file) and aborts the WHOLE stream
30//!   with [`StoreError::Integrity`] — a corrupt stream taints everything after
31//!   it, so nothing past the bad record is trusted.
32//! - **Manifest-last / commit-at-`end`:** the optional `manifest` record must
33//!   be the last record (any record after it is rejected), its payload is
34//!   buffered (capped at [`MAX_MANIFEST_BYTES`]), and it is committed to the
35//!   sink only after the `end` trailer has been read. EOF before `end` is a
36//!   hard error and the manifest is NEVER committed — so a truncated stream or
37//!   dropped connection can file (verified) objects but can never make the
38//!   snapshot observable, preserving the store-wide manifest-last invariant.
39//! - **Idempotent duplicates:** a duplicate `obj` record is skipped
40//!   (write-once), but its bytes are still read and hash-verified — the stream
41//!   cannot seek, and a hash mismatch on ANY record (present or not) aborts.
42//! - **No path input:** the on-disk location of every payload is derived
43//!   exclusively from the validated claimed checksum
44//!   ([`snapdir_core::store::object_path`] /
45//!   [`snapdir_core::store::manifest_path`]); there is no entry-name concept,
46//!   so the path-traversal class is structurally absent.
47//!
48//! # Memory profile
49//!
50//! [`read_pack`] into a [`FileSink`] is O(1) memory per record regardless of
51//! object size: payload bytes stream through a fixed-size buffer into a temp
52//! sibling of the final object path (the same temp+atomic-rename discipline as
53//! `file_store.rs`) while the incremental hasher runs. The generic
54//! [`StreamSink`] buffers ONE object record at a time (its
55//! [`StreamStore::put_object`] primitive takes whole buffers); the manifest
56//! record is always buffered, capped at [`MAX_MANIFEST_BYTES`].
57//!
58//! [`write_pack`] reads one object at a time via
59//! [`StreamStore::get_object`] (one whole object buffered at a time; the
60//! send-pack CLI layers any further streaming on top in a later gate).
61
62use std::fs;
63use std::io::{self, BufRead, BufReader, Read, Write};
64use std::path::{Path, PathBuf};
65
66use snapdir_core::manifest::Manifest;
67use snapdir_core::merkle::{snapshot_id, Blake3Hasher, Hasher};
68use snapdir_core::store::{manifest_path, object_path, StoreError};
69
70use crate::file_store::FileStore;
71use crate::stream::StreamStore;
72
/// Wire-format version spoken by this build.
///
/// This is the single source of truth for the wire: the capability line
/// (`snapdir version --capabilities`) bakes this value, and [`read_pack`]
/// accepts a stream only when its version matches this integer exactly.
pub const WIRE_VERSION: u32 = 1;

/// Plumbing capabilities advertised together with [`WIRE_VERSION`].
pub const WIRE_CAPS: &[&str] = &["objects-needed", "send-pack", "receive-pack"];

/// Magic line that opens every pack stream. The version is baked into the
/// literal, so it must be kept in step with [`WIRE_VERSION`] (a unit test
/// pins the two together).
pub const WIRE_MAGIC: &str = "SNAPPACK 1\n";

/// Upper bound on a header line, counting its terminating `\n`.
///
/// The reader gives up on a line as soon as this many bytes have been seen
/// without a newline, which bounds its memory before any validation runs.
/// The longest valid header, `manifest <hex64> <u64::MAX>\n`, is 95 bytes
/// (8 + 1 + 64 + 1 + 20 + 1), so legitimate streams fit with room to spare.
pub const MAX_HEADER_BYTES: usize = 128;

/// Upper bound (64 MiB) on the payload of a `manifest` record. Unlike `obj`
/// payloads, a manifest is held in memory until the `end` trailer commits it.
pub const MAX_MANIFEST_BYTES: u64 = 64 << 20;

/// Upper bound (8 MiB) on the up-front `Vec` capacity reserved for a buffered
/// payload. A header may claim an enormous `len` while sending only a few
/// bytes; capping the preallocation means such a header cannot force a giant
/// allocation before any payload arrives. The buffer still grows as real
/// bytes are received.
const STAGE_PREALLOC_CAP: u64 = 8 << 20;
100
/// Checks whether `s` has the shape of a snapdir content address, i.e. it
/// matches `^[0-9a-f]{64}$`: exactly 64 characters, each a decimal digit or a
/// lowercase `a`..=`f`.
///
/// Uppercase hex, any other byte (including multi-byte UTF-8), and any other
/// length are rejected. This is the checksum validator shared by the pack
/// writer (before a record is emitted) and the pack reader (on every record
/// header).
#[must_use]
pub fn is_hex64(s: &str) -> bool {
    // Length is measured in bytes; a valid address is pure ASCII, so bytes
    // and characters coincide for every accepted input.
    if s.len() != 64 {
        return false;
    }
    s.bytes()
        .all(|b| b.is_ascii_digit() || (b'a'..=b'f').contains(&b))
}
111
/// Summary returned by [`write_pack`] describing the records it emitted.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct PackWriteReport {
    /// Count of `obj` records written to the stream. Repeated ids in the
    /// input produce repeated records (the writer does not deduplicate), and
    /// each one is counted.
    pub objects_written: u64,
    /// `true` when a `manifest` record was written, which happens exactly
    /// when a `manifest_id` was supplied.
    pub manifest_written: bool,
}
121
/// Summary returned by [`read_pack`] describing what reached the sink.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct PackReadReport {
    /// Objects that were hash-verified and newly committed to the sink.
    pub objects_written: u64,
    /// Objects the sink already held: their bytes were still read and
    /// hash-verified, but nothing was rewritten (idempotent duplicates).
    pub objects_skipped: u64,
    /// `true` only when a verified `manifest` record was followed by the
    /// `end` trailer and the manifest was then committed.
    pub manifest_committed: bool,
}
134
135/// Where [`read_pack`] files verified records.
136///
137/// The reader owns the framing, the incremental hashing, and the
138/// verify-before-commit decision; a sink only stages bytes and
139/// commits/aborts on the reader's instruction:
140///
141/// 1. [`has_object`](Self::has_object) — skip-but-verify gate for duplicates.
142/// 2. [`stage_object`](Self::stage_object) — pull the (length-limited) payload
143///    into staging (temp file / memory buffer). Called only for absent
144///    objects; the reader hashes every byte the sink pulls.
145/// 3. [`commit_object`](Self::commit_object) on hash match, or
146///    [`abort_object`](Self::abort_object) on mismatch/truncation/error —
147///    abort must leave NOTHING behind (no temp files, no partial objects).
148/// 4. [`put_manifest`](Self::put_manifest) — called only after the `end`
149///    trailer, with an id the reader has already verified.
150pub trait PackSink {
151    /// Returns `true` when the sink already holds `checksum` (the record's
152    /// bytes will then be verified-and-discarded rather than re-written).
153    fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError>;
154
155    /// Stages the payload for `checksum` by reading `payload` to EOF (the
156    /// reader has already limited it to exactly `len` bytes). Must not make
157    /// the object observable at its final address yet. On error the sink must
158    /// clean up after itself or tolerate the follow-up
159    /// [`abort_object`](Self::abort_object) call.
160    fn stage_object(
161        &mut self,
162        checksum: &str,
163        len: u64,
164        payload: &mut dyn Read,
165    ) -> Result<(), StoreError>;
166
167    /// Commits the staged payload at its (reader-verified) content-address.
168    fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError>;
169
170    /// Discards any staged payload for `checksum`, leaving no trace (best
171    /// effort; must tolerate nothing being staged).
172    fn abort_object(&mut self, checksum: &str);
173
174    /// Commits the manifest under `id`. Called only after the `end` trailer of
175    /// a fully verified stream (manifest-last survives truncation).
176    fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError>;
177}
178
179/// Generic [`PackSink`] over any [`StreamStore`]: buffers one `obj` payload at
180/// a time in memory, then files it via the store's verify-before-write
181/// [`put_object`](StreamStore::put_object) (so the store's own integrity
182/// discipline re-checks the commit). Use [`FileSink`] for `file://`-rooted
183/// sinks to get O(1) memory per record.
184pub struct StreamSink<'a> {
185    store: &'a dyn StreamStore,
186    staged: Option<(String, Vec<u8>)>,
187}
188
189impl<'a> StreamSink<'a> {
190    /// Wraps `store` as a pack sink.
191    #[must_use]
192    pub fn new(store: &'a dyn StreamStore) -> Self {
193        Self {
194            store,
195            staged: None,
196        }
197    }
198}
199
200impl PackSink for StreamSink<'_> {
201    fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError> {
202        self.store.has_object(checksum)
203    }
204
205    fn stage_object(
206        &mut self,
207        checksum: &str,
208        len: u64,
209        payload: &mut dyn Read,
210    ) -> Result<(), StoreError> {
211        // Defensive: a stage while something else is staged means the reader
212        // sequencing was violated; drop the stale staging rather than leak it.
213        self.staged = None;
214        // Preallocate at most STAGE_PRELLOC_CAP so a lying `len` cannot force a
215        // huge allocation; the buffer grows with the bytes actually received
216        // (which the reader caps at the true `len`).
217        let prealloc = usize::try_from(len.min(STAGE_PREALLOC_CAP)).unwrap_or(0);
218        let mut buf = Vec::with_capacity(prealloc);
219        payload.read_to_end(&mut buf)?;
220        self.staged = Some((checksum.to_owned(), buf));
221        Ok(())
222    }
223
224    fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError> {
225        match self.staged.take() {
226            Some((staged_checksum, bytes)) if staged_checksum == checksum => {
227                // `put_object` re-verifies bytes-hash-to-address before writing
228                // (the store's own verify-before-write discipline).
229                self.store.put_object(checksum, bytes)
230            }
231            _ => Err(protocol(format!(
232                "internal pack sink error: commit of {checksum} without a matching staged payload"
233            ))),
234        }
235    }
236
237    fn abort_object(&mut self, _checksum: &str) {
238        self.staged = None;
239    }
240
241    fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
242        self.store.put_manifest(id, manifest)
243    }
244}
245
246/// File-backed [`PackSink`] over a [`FileStore`]: `obj` payloads stream
247/// through a fixed-size buffer straight into a unique temp sibling of the
248/// final object path, then an atomic rename commits on hash match — O(1)
249/// memory per record regardless of object size.
250///
251/// This mirrors `file_store.rs`'s private `temp_sibling`/persist discipline
252/// (temp file in the SAME directory so the rename is an atomic,
253/// same-filesystem move; a partially-written object is never visible at its
254/// content-address; a failed record removes its temp file).
255pub struct FileSink<'a> {
256    store: &'a FileStore,
257    staged: Option<StagedFile>,
258}
259
260/// A staged-but-uncommitted object payload on disk.
261struct StagedFile {
262    checksum: String,
263    tmp: PathBuf,
264    target: PathBuf,
265}
266
267impl<'a> FileSink<'a> {
268    /// Wraps `store` as a streaming, file-backed pack sink.
269    #[must_use]
270    pub fn new(store: &'a FileStore) -> Self {
271        Self {
272            store,
273            staged: None,
274        }
275    }
276}
277
278impl Drop for FileSink<'_> {
279    /// Last-resort cleanup: never leave a stray temp file in `.objects/` even
280    /// if the reader bails between stage and commit/abort.
281    fn drop(&mut self) {
282        if let Some(staged) = self.staged.take() {
283            let _ = fs::remove_file(&staged.tmp);
284        }
285    }
286}
287
288impl PackSink for FileSink<'_> {
289    fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError> {
290        StreamStore::has_object(self.store, checksum)
291    }
292
293    fn stage_object(
294        &mut self,
295        checksum: &str,
296        _len: u64,
297        payload: &mut dyn Read,
298    ) -> Result<(), StoreError> {
299        // Defensive: drop (and remove) any stale staging first.
300        self.abort_object(checksum);
301
302        // The on-disk location derives EXCLUSIVELY from the validated claimed
303        // checksum — never from any stream-supplied name.
304        let target = self.store.root().join(object_path(checksum));
305        if let Some(parent) = target.parent() {
306            fs::create_dir_all(parent)?;
307        }
308        let tmp = temp_sibling(&target);
309        let mut file = fs::File::create(&tmp)?;
310        // `io::copy` streams through a fixed-size buffer (O(1) memory); the
311        // reader-side incremental hasher sees every byte we pull here.
312        let copied = io::copy(payload, &mut file);
313        drop(file);
314        if let Err(err) = copied {
315            // Failed mid-write: remove the temp file, leave nothing behind.
316            let _ = fs::remove_file(&tmp);
317            return Err(err.into());
318        }
319        self.staged = Some(StagedFile {
320            checksum: checksum.to_owned(),
321            tmp,
322            target,
323        });
324        Ok(())
325    }
326
327    fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError> {
328        match self.staged.take() {
329            Some(staged) if staged.checksum == checksum => {
330                // Atomic rename into the final content-addressed location; the
331                // reader has already verified the streamed bytes hash to
332                // `checksum`, so this is the rename-on-match step.
333                fs::rename(&staged.tmp, &staged.target)?;
334                Ok(())
335            }
336            other => {
337                if let Some(staged) = other {
338                    let _ = fs::remove_file(&staged.tmp);
339                }
340                Err(protocol(format!(
341                    "internal pack sink error: commit of {checksum} without a matching staged payload"
342                )))
343            }
344        }
345    }
346
347    fn abort_object(&mut self, _checksum: &str) {
348        if let Some(staged) = self.staged.take() {
349            let _ = fs::remove_file(&staged.tmp);
350        }
351    }
352
353    fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
354        // FileStore::put_manifest re-verifies snapshot_id(manifest) == id and
355        // writes via its own temp+atomic-rename path.
356        self.store.put_manifest(id, manifest)
357    }
358}
359
360/// Emits a SNAPPACK 1 stream: magic, one `obj` record per entry of `ids` IN
361/// INPUT ORDER, then (if `manifest_id` is given) the `manifest` record LAST,
362/// then the `end` trailer.
363///
364/// Fail-closed discipline:
365///
366/// - Every id (and `manifest_id`) is validated against `^[0-9a-f]{64}$` BEFORE
367///   any byte is written.
368/// - The manifest is fetched and serialized up front (fail fast) but emitted
369///   last; its serialized bytes (`Manifest` text + trailing `\n`, exactly the
370///   stored byte form `file_store.rs` writes) must hash back to `manifest_id`.
371/// - Each object's bytes are re-verified to hash to its id after
372///   [`get_object`](StreamStore::get_object) (belt and braces over the store's
373///   own read verification) before its record is written.
374/// - Any failure — including a missing object — aborts BEFORE the `end`
375///   trailer is emitted, so a consumer of the partial stream also fails
376///   (no silent partial transfer).
377///
378/// Duplicates in `ids` emit duplicate records (the reader handles them
379/// idempotently); deduplication is the caller's job.
380pub fn write_pack(
381    source: &dyn StreamStore,
382    ids: &[String],
383    manifest_id: Option<&str>,
384    mut out: impl Write,
385) -> Result<PackWriteReport, StoreError> {
386    // Validate EVERY checksum before emitting anything (fail closed).
387    for id in ids {
388        if !is_hex64(id) {
389            return Err(protocol(format!(
390                "invalid object checksum {id:?}: expected 64 lowercase hex characters"
391            )));
392        }
393    }
394    if let Some(id) = manifest_id {
395        if !is_hex64(id) {
396            return Err(protocol(format!(
397                "invalid manifest id {id:?}: expected 64 lowercase hex characters"
398            )));
399        }
400    }
401
402    let hasher = Blake3Hasher::new();
403
404    // Fetch + serialize the manifest UP FRONT (fail fast before streaming
405    // megabytes of objects), but emit it LAST. Serialization matches
406    // `file_store.rs::write_manifest` exactly — `to_string()` + trailing `\n`,
407    // the byte form `snapshot_id` hashes — so ids round-trip.
408    let manifest_payload: Option<(&str, Vec<u8>)> = match manifest_id {
409        Some(id) => {
410            let manifest = source.get_manifest(id)?;
411            let mut text = manifest.to_string();
412            text.push('\n');
413            let bytes = text.into_bytes();
414            let actual = hasher.hash_hex(&bytes);
415            if actual != id {
416                return Err(StoreError::Integrity {
417                    address: manifest_path(id),
418                    expected: id.to_owned(),
419                    actual,
420                });
421            }
422            Some((id, bytes))
423        }
424        None => None,
425    };
426
427    out.write_all(WIRE_MAGIC.as_bytes())?;
428    let mut report = PackWriteReport::default();
429
430    for id in ids {
431        // `get_object` already verifies the read; re-verify here anyway so a
432        // non-verifying StreamStore impl can never push corrupt bytes onto the
433        // wire under a clean address (fail closed).
434        let bytes = source.get_object(id)?;
435        let actual = hasher.hash_hex(&bytes);
436        if actual != *id {
437            return Err(StoreError::Integrity {
438                address: object_path(id),
439                expected: id.clone(),
440                actual,
441            });
442        }
443        writeln!(out, "obj {id} {}", bytes.len())?;
444        out.write_all(&bytes)?;
445        report.objects_written += 1;
446    }
447
448    if let Some((id, bytes)) = manifest_payload {
449        writeln!(out, "manifest {id} {}", bytes.len())?;
450        out.write_all(&bytes)?;
451        report.manifest_written = true;
452    }
453
454    out.write_all(b"end\n")?;
455    out.flush()?;
456    Ok(report)
457}
458
459/// Consumes a SNAPPACK 1 stream from `input`, filing verified records into
460/// `sink`. See the [module docs](crate::pack) for the full invariant list;
461/// in short:
462///
463/// - every record header is validated (magic, version, `hex64`, decimal len,
464///   [`MAX_HEADER_BYTES`] cap);
465/// - every payload is incrementally BLAKE3-hashed and must match its claimed
466///   checksum (mismatch ⇒ staged bytes discarded, whole stream aborted);
467/// - duplicate objects are verified-but-skipped (write-once);
468/// - the manifest (if any) must be the last record and is committed ONLY
469///   after the `end` trailer; EOF before `end` is a hard error and never
470///   commits the manifest.
471pub fn read_pack(input: impl Read, sink: &mut dyn PackSink) -> Result<PackReadReport, StoreError> {
472    let mut input = BufReader::new(input);
473
474    check_magic(&read_header_line(&mut input)?)?;
475
476    let mut report = PackReadReport::default();
477    let mut pending_manifest: Option<(String, Manifest)> = None;
478
479    loop {
480        let line = read_header_line(&mut input)?;
481        if line == "end" {
482            // The `end` trailer is the ONLY place a manifest commits:
483            // truncation anywhere above has already errored out, so a
484            // committed manifest proves the whole stream verified.
485            if let Some((id, manifest)) = pending_manifest.take() {
486                sink.put_manifest(&id, &manifest)?;
487                report.manifest_committed = true;
488            }
489            return Ok(report);
490        }
491        if pending_manifest.is_some() {
492            return Err(protocol(format!(
493                "record after the manifest record (only the `end` trailer may follow it): {line:?}"
494            )));
495        }
496        let (kind, checksum, len) = parse_record_header(&line)?;
497        match kind {
498            RecordKind::Obj => read_obj_record(&mut input, sink, &checksum, len, &mut report)?,
499            RecordKind::Manifest => {
500                pending_manifest = Some(read_manifest_record(&mut input, &checksum, len)?);
501            }
502        }
503    }
504}
505
/// The type tag that starts a record header line.
enum RecordKind {
    /// An `obj` record: the payload is a raw content-addressed object.
    Obj,
    /// A `manifest` record: the payload is a serialized manifest.
    Manifest,
}
511
512/// Reads, verifies, and files (or verified-skips) one `obj` payload.
513fn read_obj_record(
514    input: &mut dyn Read,
515    sink: &mut dyn PackSink,
516    checksum: &str,
517    len: u64,
518    report: &mut PackReadReport,
519) -> Result<(), StoreError> {
520    let present = sink.has_object(checksum)?;
521    let mut payload = HashingTake::new(input, len);
522
523    if present {
524        // Idempotent duplicate / pre-seeded object: the stream cannot seek, so
525        // the bytes are still consumed AND hash-verified, but not re-written.
526        payload.drain()?;
527    } else if let Err(err) = sink.stage_object(checksum, len, &mut payload) {
528        sink.abort_object(checksum);
529        return Err(err);
530    }
531
532    if payload.remaining() > 0 {
533        if !present {
534            sink.abort_object(checksum);
535        }
536        return Err(if payload.hit_eof() {
537            protocol(format!(
538                "truncated pack stream: EOF inside the payload of object {checksum} \
539                 ({} of {len} bytes missing)",
540                payload.remaining()
541            ))
542        } else {
543            protocol(format!(
544                "internal pack sink error: sink consumed only {} of {len} payload bytes \
545                 for object {checksum}",
546                len - payload.remaining()
547            ))
548        });
549    }
550
551    // Verify the streamed bytes hash to the CLAIMED address. A mismatch files
552    // nothing under the claimed checksum (the staged temp is removed) and
553    // aborts the whole stream — everything after a corrupt record is tainted.
554    let actual = payload.finalize_hex();
555    if actual != checksum {
556        if !present {
557            sink.abort_object(checksum);
558        }
559        return Err(StoreError::Integrity {
560            address: object_path(checksum),
561            expected: checksum.to_owned(),
562            actual,
563        });
564    }
565
566    if present {
567        report.objects_skipped += 1;
568    } else {
569        sink.commit_object(checksum)?;
570        report.objects_written += 1;
571    }
572    Ok(())
573}
574
575/// Reads and verifies one `manifest` payload, returning it for the
576/// commit-at-`end` step (it is NEVER committed here).
577fn read_manifest_record(
578    input: &mut dyn Read,
579    id: &str,
580    len: u64,
581) -> Result<(String, Manifest), StoreError> {
582    if len > MAX_MANIFEST_BYTES {
583        return Err(protocol(format!(
584            "manifest record of {len} bytes exceeds the {MAX_MANIFEST_BYTES}-byte cap"
585        )));
586    }
587    let mut payload = HashingTake::new(input, len);
588    // Bounded by the cap check above (and the prealloc guard for lying
589    // headers), so buffering the manifest is safe.
590    let mut buf = Vec::with_capacity(usize::try_from(len.min(STAGE_PREALLOC_CAP)).unwrap_or(0));
591    payload.read_to_end(&mut buf)?;
592    if payload.remaining() > 0 {
593        return Err(protocol(format!(
594            "truncated pack stream: EOF inside the payload of manifest {id} \
595             ({} of {len} bytes missing)",
596            payload.remaining()
597        )));
598    }
599
600    // 1) The raw payload bytes must hash to the claimed snapshot id (the
601    //    stored manifest byte form is exactly what `snapshot_id` hashes).
602    let actual = payload.finalize_hex();
603    if actual != id {
604        return Err(StoreError::Integrity {
605            address: manifest_path(id),
606            expected: id.to_owned(),
607            actual,
608        });
609    }
610
611    // 2) The payload must PARSE, and the parsed manifest must re-render to the
612    //    same snapshot id — rejecting a payload that raw-hashes correctly but
613    //    is not the canonical serialization (it would not round-trip).
614    let text = String::from_utf8(buf).map_err(|err| StoreError::Backend {
615        message: format!("manifest {id} payload is not valid UTF-8"),
616        source: Some(Box::new(err)),
617    })?;
618    let manifest = Manifest::parse(&text)?;
619    let rendered_id = snapshot_id(&manifest, &Blake3Hasher::new());
620    if rendered_id != id {
621        return Err(StoreError::Integrity {
622            address: manifest_path(id),
623            expected: id.to_owned(),
624            actual: rendered_id,
625        });
626    }
627
628    Ok((id.to_owned(), manifest))
629}
630
631/// Builds a protocol-violation error (malformed/truncated stream, bad header,
632/// cap exceeded, …). Hash mismatches use [`StoreError::Integrity`] instead.
633fn protocol(message: impl Into<String>) -> StoreError {
634    StoreError::Backend {
635        message: message.into(),
636        source: None,
637    }
638}
639
640/// Reads one `\n`-terminated header line (returned WITHOUT the `\n`),
641/// enforcing the [`MAX_HEADER_BYTES`] cap while reading — an over-long line is
642/// rejected the moment the cap is hit, without buffering more. EOF at any
643/// point inside a header position is a hard truncation error (`end` is the
644/// only legitimate way to finish a stream).
645fn read_header_line(input: &mut impl BufRead) -> Result<String, StoreError> {
646    let mut line: Vec<u8> = Vec::with_capacity(32);
647    loop {
648        let mut byte = [0u8; 1];
649        let n = input.read(&mut byte)?;
650        if n == 0 {
651            return Err(protocol(if line.is_empty() {
652                "truncated pack stream: unexpected EOF before the `end` trailer".to_owned()
653            } else {
654                format!(
655                    "truncated pack stream: EOF inside a header line (read {:?} so far)",
656                    String::from_utf8_lossy(&line)
657                )
658            }));
659        }
660        if byte[0] == b'\n' {
661            break;
662        }
663        line.push(byte[0]);
664        if line.len() >= MAX_HEADER_BYTES {
665            return Err(protocol(format!(
666                "header line exceeds the {MAX_HEADER_BYTES}-byte cap"
667            )));
668        }
669    }
670    String::from_utf8(line).map_err(|err| StoreError::Backend {
671        message: "header line is not valid UTF-8".to_owned(),
672        source: Some(Box::new(err)),
673    })
674}
675
676/// Validates the magic line (already stripped of its `\n`). Negotiation is on
677/// the exact `wire` integer only: a different version — newer OR older — is
678/// rejected, and the caller falls back to the dumb path.
679fn check_magic(line: &str) -> Result<(), StoreError> {
680    let Some(version) = line.strip_prefix("SNAPPACK ") else {
681        return Err(protocol(format!(
682            "bad pack magic {line:?} (expected {:?})",
683            WIRE_MAGIC.trim_end()
684        )));
685    };
686    if version != WIRE_VERSION.to_string() {
687        return Err(protocol(format!(
688            "unsupported pack wire version {version:?}: this build speaks wire={WIRE_VERSION}"
689        )));
690    }
691    Ok(())
692}
693
694/// Parses a record header line into `(kind, hex64, len)`, enforcing the exact
695/// single-space token grammar, the `^[0-9a-f]{64}$` checksum shape, and a
696/// strictly-decimal `u64` length.
697fn parse_record_header(line: &str) -> Result<(RecordKind, String, u64), StoreError> {
698    let mut parts = line.split(' ');
699    let kind = match parts.next() {
700        Some("obj") => RecordKind::Obj,
701        Some("manifest") => RecordKind::Manifest,
702        _ => {
703            return Err(protocol(format!(
704                "unknown record header {line:?} (expected `obj`, `manifest`, or `end`)"
705            )));
706        }
707    };
708    let (Some(checksum), Some(len_token), None) = (parts.next(), parts.next(), parts.next()) else {
709        return Err(protocol(format!(
710            "malformed record header {line:?} (expected `<kind> <hex64> <len>`)"
711        )));
712    };
713    if !is_hex64(checksum) {
714        return Err(protocol(format!(
715            "invalid checksum {checksum:?} in record header: expected 64 lowercase hex characters"
716        )));
717    }
718    if len_token.is_empty() || !len_token.bytes().all(|b| b.is_ascii_digit()) {
719        return Err(protocol(format!(
720            "invalid payload length {len_token:?} in record header: expected a decimal u64"
721        )));
722    }
723    let len: u64 = len_token.parse().map_err(|_| {
724        protocol(format!(
725            "payload length {len_token:?} does not fit in a u64"
726        ))
727    })?;
728    Ok((kind, checksum.to_owned(), len))
729}
730
731/// A length-limited reader that incrementally BLAKE3-hashes everything read
732/// through it. This is how the reader keeps verification O(1)-memory while a
733/// sink streams the payload to disk: the sink pulls bytes, the hasher sees
734/// every one of them, and [`finalize_hex`](Self::finalize_hex) yields the
735/// digest once the payload is exhausted.
736struct HashingTake<'a> {
737    inner: &'a mut dyn Read,
738    remaining: u64,
739    hit_eof: bool,
740    hasher: blake3::Hasher,
741}
742
743impl<'a> HashingTake<'a> {
744    fn new(inner: &'a mut dyn Read, len: u64) -> Self {
745        Self {
746            inner,
747            remaining: len,
748            hit_eof: false,
749            hasher: blake3::Hasher::new(),
750        }
751    }
752
753    /// Payload bytes not yet read. Non-zero after EOF means truncation.
754    fn remaining(&self) -> u64 {
755        self.remaining
756    }
757
758    /// Whether the underlying stream hit EOF while payload bytes were still
759    /// owed (distinguishes a truncated stream from a sink that under-read).
760    fn hit_eof(&self) -> bool {
761        self.hit_eof
762    }
763
764    /// Lowercase hex BLAKE3 digest of every byte read through this reader.
765    fn finalize_hex(&self) -> String {
766        self.hasher.finalize().to_hex().to_string()
767    }
768
769    /// Reads (and hashes) the rest of the payload through a fixed-size buffer,
770    /// discarding the bytes — the verified-skip path for duplicate objects.
771    fn drain(&mut self) -> io::Result<()> {
772        let mut buf = [0u8; 8 * 1024];
773        loop {
774            if self.read(&mut buf)? == 0 {
775                return Ok(());
776            }
777        }
778    }
779}
780
781impl Read for HashingTake<'_> {
782    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
783        if self.remaining == 0 || buf.is_empty() {
784            return Ok(0);
785        }
786        let cap = usize::try_from(self.remaining)
787            .unwrap_or(usize::MAX)
788            .min(buf.len());
789        let n = self.inner.read(&mut buf[..cap])?;
790        if n == 0 {
791            self.hit_eof = true;
792            return Ok(0);
793        }
794        self.hasher.update(&buf[..n]);
795        self.remaining -= n as u64;
796        Ok(n)
797    }
798}
799
/// Returns a fresh temp path living next to `target`.
///
/// The name is `<file name>.<pid>.<counter>.tmp`, placed in `target`'s parent
/// directory so a later rename stays on one filesystem. The counter is
/// process-wide and only ever increases, so two calls in the same process
/// never produce the same path (the same pid + counter discipline the file
/// attributes to `file_store.rs::temp_sibling`).
fn temp_sibling(target: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let owner = std::process::id();
    // A target without a final component contributes an empty base name.
    let base = match target.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::new(),
    };
    let tmp_name = format!("{base}.{owner}.{seq}.tmp");
    target
        .parent()
        .map_or_else(|| PathBuf::from(&tmp_name), |dir| dir.join(&tmp_name))
}
819
820#[cfg(test)]
821mod tests {
822    use super::*;
823    use snapdir_core::manifest::{ManifestEntry, PathType};
824    use snapdir_core::store::Store;
825    use std::fs;
826
827    // A tiny temp-dir helper so tests don't pull in a dev-dependency (same
828    // pattern as file_store.rs tests).
829    struct TempDir {
830        path: PathBuf,
831    }
832
833    impl TempDir {
834        fn new(tag: &str) -> Self {
835            use std::sync::atomic::{AtomicU64, Ordering};
836            static COUNTER: AtomicU64 = AtomicU64::new(0);
837            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
838            let path = std::env::temp_dir().join(format!(
839                "snapdir-pack-test-{}-{tag}-{n}",
840                std::process::id()
841            ));
842            fs::create_dir_all(&path).expect("create temp dir");
843            Self { path }
844        }
845
846        fn path(&self) -> &Path {
847            &self.path
848        }
849    }
850
851    impl Drop for TempDir {
852        fn drop(&mut self) {
853            let _ = fs::remove_dir_all(&self.path);
854        }
855    }
856
857    /// Deterministic multi-MB payload (exercises the streaming path).
858    fn big_payload(len: usize) -> Vec<u8> {
859        (0..len).map(|i| u8::try_from(i % 251).unwrap()).collect()
860    }
861
862    /// Files `payloads` as objects in a fresh `FileStore`, returning the store
863    /// (+ its tempdir guard) and the content-addresses in payload order.
864    fn seed_store(tag: &str, payloads: &[Vec<u8>]) -> (TempDir, FileStore, Vec<String>) {
865        let dir = TempDir::new(tag);
866        let store = FileStore::from_root(dir.path());
867        let hasher = Blake3Hasher::new();
868        let ids = payloads
869            .iter()
870            .map(|p| {
871                let checksum = hasher.hash_hex(p);
872                store.put_object(&checksum, p.clone()).expect("seed object");
873                checksum
874            })
875            .collect();
876        (dir, store, ids)
877    }
878
879    /// Builds a manifest whose file entries reference `payloads` (real BLAKE3
880    /// checksums) and returns it with its snapshot id.
881    fn manifest_for(payloads: &[Vec<u8>]) -> (Manifest, String) {
882        let hasher = Blake3Hasher::new();
883        let mut manifest = Manifest::new();
884        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
885        for (i, payload) in payloads.iter().enumerate() {
886            manifest.push(ManifestEntry::new(
887                PathType::File,
888                "600",
889                hasher.hash_hex(payload),
890                u64::try_from(payload.len()).unwrap(),
891                format!("./obj-{i:02}"),
892            ));
893        }
894        let manifest = Manifest::from_entries(manifest.entries().to_vec());
895        let id = snapshot_id(&manifest, &hasher);
896        (manifest, id)
897    }
898
899    /// The serialized (stored) byte form of a manifest: text + trailing `\n`.
900    fn manifest_bytes(manifest: &Manifest) -> Vec<u8> {
901        let mut text = manifest.to_string();
902        text.push('\n');
903        text.into_bytes()
904    }
905
906    /// Recursively collects every regular FILE under `dir` (used to prove
907    /// no stray temp files survive a failed stream).
908    fn files_under(dir: &Path) -> Vec<PathBuf> {
909        let mut files = Vec::new();
910        let Ok(entries) = fs::read_dir(dir) else {
911            return files;
912        };
913        for entry in entries.flatten() {
914            let path = entry.path();
915            if path.is_dir() {
916                files.extend(files_under(&path));
917            } else {
918                files.push(path);
919            }
920        }
921        files
922    }
923
924    /// Hand-builds a raw record (`<kind> <hex> <len>\n<payload>`).
925    fn raw_record(kind: &str, hex: &str, payload: &[u8]) -> Vec<u8> {
926        let mut out = format!("{kind} {hex} {}\n", payload.len()).into_bytes();
927        out.extend_from_slice(payload);
928        out
929    }
930
931    /// Hand-builds a full stream: magic + records + `end\n`.
932    fn raw_stream(records: &[Vec<u8>]) -> Vec<u8> {
933        let mut out = WIRE_MAGIC.as_bytes().to_vec();
934        for record in records {
935            out.extend_from_slice(record);
936        }
937        out.extend_from_slice(b"end\n");
938        out
939    }
940
941    fn hex_of(bytes: &[u8]) -> String {
942        Blake3Hasher::new().hash_hex(bytes)
943    }
944
945    // --- wire constants ----------------------------------------------------
946
947    #[test]
948    fn pack_wire_constants_are_consistent() {
949        assert_eq!(WIRE_MAGIC, format!("SNAPPACK {WIRE_VERSION}\n"));
950        assert_eq!(WIRE_VERSION, 1);
951        assert_eq!(WIRE_CAPS, &["objects-needed", "send-pack", "receive-pack"]);
952    }
953
954    #[test]
955    fn pack_is_hex64_validates_shape() {
956        let valid = "0123456789abcdef".repeat(4);
957        assert!(is_hex64(&valid));
958        assert!(!is_hex64(&valid[..63])); // 63 chars
959        assert!(!is_hex64(&format!("{valid}0"))); // 65 chars
960        assert!(!is_hex64(&valid.to_uppercase())); // uppercase
961        assert!(!is_hex64(&format!("g{}", &valid[1..]))); // non-hex
962        assert!(!is_hex64("")); // empty
963    }
964
965    // --- roundtrips ----------------------------------------------------------
966
967    #[test]
968    fn pack_roundtrip_file_sink_streams_objects_and_manifest() {
969        // Includes a 0-byte object and a multi-MB object (streaming path).
970        let payloads = vec![
971            Vec::new(),
972            b"hello pack\n".to_vec(),
973            big_payload(3 * 1024 * 1024 + 7),
974        ];
975        let (a_dir, a, ids) = seed_store("rt-a", &payloads);
976        let (manifest, man_id) = manifest_for(&payloads);
977        a.put_manifest(&man_id, &manifest).expect("seed manifest");
978
979        let mut pack = Vec::new();
980        let wrote = write_pack(&a, &ids, Some(&man_id), &mut pack).expect("write_pack");
981        assert_eq!(wrote.objects_written, 3);
982        assert!(wrote.manifest_written);
983
984        let b_dir = TempDir::new("rt-b");
985        let b = FileStore::from_root(b_dir.path());
986        let mut sink = FileSink::new(&b);
987        let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
988        assert_eq!(read.objects_written, 3);
989        assert_eq!(read.objects_skipped, 0);
990        assert!(read.manifest_committed);
991
992        // B's objects are byte-equal to A's at the identical sharded keys.
993        for (id, payload) in ids.iter().zip(&payloads) {
994            let key = object_path(id);
995            assert_eq!(
996                fs::read(b_dir.path().join(&key)).expect("b object"),
997                *payload,
998                "object {key} byte-equal"
999            );
1000            assert_eq!(
1001                fs::read(a_dir.path().join(&key)).expect("a object"),
1002                fs::read(b_dir.path().join(&key)).expect("b object"),
1003            );
1004        }
1005        // Manifest present in B and round-trips to the same id + entries.
1006        let back = b.get_manifest(&man_id).expect("manifest in B");
1007        assert_eq!(back, manifest);
1008        assert_eq!(snapshot_id(&back, &Blake3Hasher::new()), man_id);
1009        // No temp litter anywhere.
1010        assert!(
1011            !files_under(b_dir.path())
1012                .iter()
1013                .any(|p| p.to_string_lossy().ends_with(".tmp")),
1014            "no stray temp files after a clean stream"
1015        );
1016    }
1017
1018    #[test]
1019    fn pack_roundtrip_stream_sink_generic() {
1020        let payloads = vec![b"alpha\n".to_vec(), b"beta\n".to_vec()];
1021        let (_a_dir, a, ids) = seed_store("ss-a", &payloads);
1022        let (manifest, man_id) = manifest_for(&payloads);
1023        a.put_manifest(&man_id, &manifest).expect("seed manifest");
1024
1025        let mut pack = Vec::new();
1026        write_pack(&a, &ids, Some(&man_id), &mut pack).expect("write_pack");
1027
1028        let b_dir = TempDir::new("ss-b");
1029        let b = FileStore::from_root(b_dir.path());
1030        let mut sink = StreamSink::new(&b);
1031        let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
1032        assert_eq!(read.objects_written, 2);
1033        assert!(read.manifest_committed);
1034
1035        for (id, payload) in ids.iter().zip(&payloads) {
1036            assert_eq!(b.get_object(id).expect("object"), *payload);
1037        }
1038        assert_eq!(b.get_manifest(&man_id).expect("manifest"), manifest);
1039    }
1040
1041    #[test]
1042    fn pack_empty_stream_roundtrips() {
1043        // "SNAPPACK 1\nend\n" is a valid, empty pack.
1044        let payloads: Vec<Vec<u8>> = Vec::new();
1045        let (_a_dir, a, ids) = seed_store("empty-a", &payloads);
1046        let mut pack = Vec::new();
1047        let wrote = write_pack(&a, &ids, None, &mut pack).expect("write_pack");
1048        assert_eq!(wrote, PackWriteReport::default());
1049        assert_eq!(pack, b"SNAPPACK 1\nend\n");
1050
1051        let b_dir = TempDir::new("empty-b");
1052        let b = FileStore::from_root(b_dir.path());
1053        let mut sink = FileSink::new(&b);
1054        let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
1055        assert_eq!(read, PackReadReport::default());
1056    }
1057
1058    #[test]
1059    fn pack_manifest_only_stream_completes_interrupted_push() {
1060        // Empty object set + manifest: the manifest-only pack that finishes an
1061        // interrupted push whose objects all landed earlier.
1062        let payloads = vec![b"already there\n".to_vec()];
1063        let (_a_dir, a, _ids) = seed_store("mo-a", &payloads);
1064        let (manifest, man_id) = manifest_for(&payloads);
1065        a.put_manifest(&man_id, &manifest).expect("seed manifest");
1066
1067        let mut pack = Vec::new();
1068        let wrote = write_pack(&a, &[], Some(&man_id), &mut pack).expect("write_pack");
1069        assert_eq!(wrote.objects_written, 0);
1070        assert!(wrote.manifest_written);
1071
1072        let b_dir = TempDir::new("mo-b");
1073        let b = FileStore::from_root(b_dir.path());
1074        let mut sink = FileSink::new(&b);
1075        let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
1076        assert!(read.manifest_committed);
1077        assert_eq!(b.get_manifest(&man_id).expect("manifest"), manifest);
1078    }
1079
1080    // --- header edge cases ---------------------------------------------------
1081
1082    #[test]
1083    fn pack_rejects_oversized_header_line() {
1084        let mut stream = WIRE_MAGIC.as_bytes().to_vec();
1085        stream.extend_from_slice("o".repeat(200).as_bytes());
1086        stream.extend_from_slice(b"\nend\n");
1087        let b_dir = TempDir::new("hdr-cap");
1088        let b = FileStore::from_root(b_dir.path());
1089        let mut sink = FileSink::new(&b);
1090        let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
1091        assert!(err.to_string().contains("128-byte cap"), "got: {err}");
1092    }
1093
1094    #[test]
1095    fn pack_rejects_bad_magic_and_bad_version() {
1096        let b_dir = TempDir::new("magic");
1097        let b = FileStore::from_root(b_dir.path());
1098        for stream in [
1099            &b"SNAPHACK 1\nend\n"[..],
1100            &b"GARBAGE\nend\n"[..],
1101            &b"SNAPPACK 2\nend\n"[..],
1102            &b"SNAPPACK 01\nend\n"[..], // non-canonical version token
1103            &b"SNAPPACK \nend\n"[..],
1104            &b""[..], // empty input: truncated before the magic
1105        ] {
1106            let mut sink = FileSink::new(&b);
1107            assert!(
1108                read_pack(stream, &mut sink).is_err(),
1109                "stream {:?} must be rejected",
1110                String::from_utf8_lossy(stream)
1111            );
1112        }
1113    }
1114
1115    #[test]
1116    fn pack_rejects_malformed_checksums_and_lengths() {
1117        let valid = "0123456789abcdef".repeat(4);
1118        let headers = [
1119            format!("obj {} 0", valid.to_uppercase()),   // uppercase
1120            format!("obj {} 0", &valid[..63]),           // 63 chars
1121            format!("obj {valid}0 0"),                   // 65 chars
1122            format!("obj g{} 0", &valid[1..]),           // non-hex
1123            format!("obj {valid} 12x"),                  // garbage len
1124            format!("obj {valid} +5"),                   // sign is not decimal
1125            format!("obj {valid} 99999999999999999999"), // > u64::MAX
1126            format!("obj {valid}"),                      // missing len
1127            format!("obj {valid} 0 extra"),              // trailing token
1128            format!("blob {valid} 0"),                   // unknown kind
1129            format!("obj  {valid} 0"),                   // double space
1130        ];
1131        let b_dir = TempDir::new("hdr-bad");
1132        let b = FileStore::from_root(b_dir.path());
1133        for header in headers {
1134            let mut stream = WIRE_MAGIC.as_bytes().to_vec();
1135            stream.extend_from_slice(header.as_bytes());
1136            stream.extend_from_slice(b"\nend\n");
1137            let mut sink = FileSink::new(&b);
1138            assert!(
1139                read_pack(stream.as_slice(), &mut sink).is_err(),
1140                "header {header:?} must be rejected"
1141            );
1142        }
1143    }
1144
1145    // --- security: hash-mismatch fails closed --------------------------------
1146
1147    #[test]
1148    fn pack_mismatched_object_files_nothing_and_leaves_no_temp() {
1149        // A record CLAIMS checksum X but its bytes hash to Y: hard error,
1150        // nothing filed at X's path, no manifest committed, no temp litter.
1151        let claimed = hex_of(b"good bytes");
1152        let evil = b"evil bytes"; // same length as "good bytes"
1153        let (manifest, man_id) = manifest_for(&[b"good bytes".to_vec()]);
1154        let stream = raw_stream(&[
1155            raw_record("obj", &claimed, evil),
1156            raw_record("manifest", &man_id, &manifest_bytes(&manifest)),
1157        ]);
1158
1159        let b_dir = TempDir::new("sec");
1160        let b = FileStore::from_root(b_dir.path());
1161        let mut sink = FileSink::new(&b);
1162        let err = read_pack(stream.as_slice(), &mut sink).expect_err("must abort");
1163        match err {
1164            StoreError::Integrity {
1165                expected, actual, ..
1166            } => {
1167                assert_eq!(expected, claimed);
1168                assert_eq!(actual, hex_of(evil));
1169            }
1170            other => panic!("expected Integrity, got {other:?}"),
1171        }
1172        drop(sink);
1173
1174        // NOTHING filed under the claimed checksum; no manifest; no temp files
1175        // (or any other file) anywhere in the sink store.
1176        assert!(!StreamStore::has_object(&b, &claimed).unwrap());
1177        assert!(!b_dir.path().join(object_path(&claimed)).exists());
1178        assert!(matches!(
1179            b.get_manifest(&man_id),
1180            Err(StoreError::ManifestNotFound { .. })
1181        ));
1182        assert_eq!(
1183            files_under(b_dir.path()),
1184            Vec::<PathBuf>::new(),
1185            "no file (object, manifest, or stray temp) may survive"
1186        );
1187    }
1188
1189    #[test]
1190    fn pack_mismatch_aborts_even_when_object_already_present() {
1191        // Skip-but-verify: a record for an ALREADY-PRESENT object whose stream
1192        // bytes are corrupt still aborts the whole stream (and everything
1193        // after the bad record is dropped).
1194        let good = b"present object\n".to_vec();
1195        let payloads = vec![good.clone()];
1196        let (b_dir, b, ids) = seed_store("dup-bad", &payloads);
1197
1198        let corrupt = b"PRESENT OBJECT\n"; // same length, different bytes
1199        let later = b"later payload\n";
1200        let stream = raw_stream(&[
1201            raw_record("obj", &ids[0], corrupt),
1202            raw_record("obj", &hex_of(later), later),
1203        ]);
1204
1205        let mut sink = FileSink::new(&b);
1206        let err = read_pack(stream.as_slice(), &mut sink).expect_err("must abort");
1207        assert!(matches!(err, StoreError::Integrity { .. }));
1208        drop(sink);
1209
1210        // The pre-existing object is untouched; the record AFTER the corrupt
1211        // one was never processed.
1212        assert_eq!(b.get_object(&ids[0]).unwrap(), good);
1213        assert!(!StreamStore::has_object(&b, &hex_of(later)).unwrap());
1214        assert!(
1215            !files_under(b_dir.path())
1216                .iter()
1217                .any(|p| p.to_string_lossy().ends_with(".tmp")),
1218            "no stray temp files"
1219        );
1220    }
1221
1222    // --- truncation ----------------------------------------------------------
1223
1224    #[test]
1225    fn pack_truncated_before_end_files_objects_but_never_manifest() {
1226        // Cut the stream after all records but WITHOUT the `end` trailer: the
1227        // verified objects ARE filed, the manifest is NOT committed even
1228        // though its record was fully read before the cut.
1229        let payloads = vec![b"one\n".to_vec(), b"two\n".to_vec()];
1230        let (_a_dir, a, ids) = seed_store("trunc-a", &payloads);
1231        let (manifest, man_id) = manifest_for(&payloads);
1232        a.put_manifest(&man_id, &manifest).expect("seed manifest");
1233
1234        let mut pack = Vec::new();
1235        write_pack(&a, &ids, Some(&man_id), &mut pack).expect("write_pack");
1236        assert!(pack.ends_with(b"end\n"));
1237        let cut = &pack[..pack.len() - b"end\n".len()];
1238
1239        let b_dir = TempDir::new("trunc-b");
1240        let b = FileStore::from_root(b_dir.path());
1241        let mut sink = FileSink::new(&b);
1242        let err = read_pack(cut, &mut sink).expect_err("truncation is a hard error");
1243        assert!(err.to_string().contains("truncated"), "got: {err}");
1244        drop(sink);
1245
1246        // The N verified objects ARE filed (a retry resumes incrementally)...
1247        for (id, payload) in ids.iter().zip(&payloads) {
1248            assert_eq!(b.get_object(id).unwrap(), *payload);
1249        }
1250        // ...but the manifest must NEVER be committed (manifest-last).
1251        assert!(matches!(
1252            b.get_manifest(&man_id),
1253            Err(StoreError::ManifestNotFound { .. })
1254        ));
1255        assert!(
1256            !files_under(b_dir.path())
1257                .iter()
1258                .any(|p| p.to_string_lossy().ends_with(".tmp")),
1259            "no stray temp files"
1260        );
1261    }
1262
1263    #[test]
1264    fn pack_truncated_mid_payload_keeps_earlier_objects_drops_partial() {
1265        let payloads = vec![b"first object\n".to_vec(), big_payload(256 * 1024)];
1266        let (_a_dir, a, ids) = seed_store("midcut-a", &payloads);
1267
1268        let mut pack = Vec::new();
1269        write_pack(&a, &ids, None, &mut pack).expect("write_pack");
1270        // Cut inside the SECOND object's payload.
1271        let cut = &pack[..pack.len() - 100_000];
1272
1273        let b_dir = TempDir::new("midcut-b");
1274        let b = FileStore::from_root(b_dir.path());
1275        let mut sink = FileSink::new(&b);
1276        let err = read_pack(cut, &mut sink).expect_err("mid-payload truncation");
1277        assert!(err.to_string().contains("truncated"), "got: {err}");
1278        drop(sink);
1279
1280        // Object 1 (fully verified before the cut) is filed; the partial
1281        // object 2 is NOT, and its temp file was removed.
1282        assert_eq!(b.get_object(&ids[0]).unwrap(), payloads[0]);
1283        assert!(!StreamStore::has_object(&b, &ids[1]).unwrap());
1284        assert!(
1285            !files_under(b_dir.path())
1286                .iter()
1287                .any(|p| p.to_string_lossy().ends_with(".tmp")),
1288            "partial payload temp must be removed"
1289        );
1290    }
1291
1292    // --- duplicates ------------------------------------------------------------
1293
1294    #[test]
1295    fn pack_duplicate_object_records_are_idempotent_write_once() {
1296        let payload = b"duplicated payload\n".to_vec();
1297        let checksum = hex_of(&payload);
1298        let stream = raw_stream(&[
1299            raw_record("obj", &checksum, &payload),
1300            raw_record("obj", &checksum, &payload),
1301        ]);
1302
1303        let b_dir = TempDir::new("dup");
1304        let b = FileStore::from_root(b_dir.path());
1305        let mut sink = FileSink::new(&b);
1306        let read = read_pack(stream.as_slice(), &mut sink).expect("duplicates are fine");
1307        assert_eq!(read.objects_written, 1, "write-once");
1308        assert_eq!(
1309            read.objects_skipped, 1,
1310            "second record verified-but-skipped"
1311        );
1312        assert_eq!(b.get_object(&checksum).unwrap(), payload);
1313    }
1314
1315    #[test]
1316    fn pack_preseeded_object_is_skipped_but_verified() {
1317        // The sink already holds the object: the record's bytes are consumed
1318        // (the stream cannot seek) and verified, but not rewritten.
1319        let payload = b"already in the store\n".to_vec();
1320        let (_b_dir, b, ids) = seed_store("preseed", std::slice::from_ref(&payload));
1321
1322        let stream = raw_stream(&[raw_record("obj", &ids[0], &payload)]);
1323        let mut sink = FileSink::new(&b);
1324        let read = read_pack(stream.as_slice(), &mut sink).expect("read_pack");
1325        assert_eq!(read.objects_written, 0);
1326        assert_eq!(read.objects_skipped, 1);
1327        assert_eq!(b.get_object(&ids[0]).unwrap(), payload);
1328    }
1329
1330    // --- manifest rules ----------------------------------------------------------
1331
1332    #[test]
1333    fn pack_rejects_record_after_manifest() {
1334        let payload = b"object after manifest\n".to_vec();
1335        let (manifest, man_id) = manifest_for(std::slice::from_ref(&payload));
1336        let stream = raw_stream(&[
1337            raw_record("manifest", &man_id, &manifest_bytes(&manifest)),
1338            raw_record("obj", &hex_of(&payload), &payload),
1339        ]);
1340
1341        let b_dir = TempDir::new("after-man");
1342        let b = FileStore::from_root(b_dir.path());
1343        let mut sink = FileSink::new(&b);
1344        let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
1345        assert!(err.to_string().contains("after the manifest"), "got: {err}");
1346        // The pending manifest must NOT have been committed.
1347        assert!(matches!(
1348            b.get_manifest(&man_id),
1349            Err(StoreError::ManifestNotFound { .. })
1350        ));
1351    }
1352
1353    #[test]
1354    fn pack_manifest_payload_must_hash_to_claimed_id() {
1355        // (a) claimed id != raw hash of the payload -> Integrity.
1356        let (manifest, _real_id) = manifest_for(&[b"whatever\n".to_vec()]);
1357        let wrong_id = hex_of(b"some other bytes");
1358        let stream = raw_stream(&[raw_record(
1359            "manifest",
1360            &wrong_id,
1361            &manifest_bytes(&manifest),
1362        )]);
1363        let b_dir = TempDir::new("man-bad-id");
1364        let b = FileStore::from_root(b_dir.path());
1365        let mut sink = FileSink::new(&b);
1366        let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
1367        assert!(matches!(err, StoreError::Integrity { .. }));
1368        assert!(matches!(
1369            b.get_manifest(&wrong_id),
1370            Err(StoreError::ManifestNotFound { .. })
1371        ));
1372
1373        // (b) raw hash matches the claimed id but the payload is not a
1374        // parseable manifest -> rejected, nothing committed.
1375        let garbage = b"not a manifest at all\n".to_vec();
1376        let garbage_id = hex_of(&garbage);
1377        let stream = raw_stream(&[raw_record("manifest", &garbage_id, &garbage)]);
1378        let mut sink = FileSink::new(&b);
1379        assert!(read_pack(stream.as_slice(), &mut sink).is_err());
1380        assert!(matches!(
1381            b.get_manifest(&garbage_id),
1382            Err(StoreError::ManifestNotFound { .. })
1383        ));
1384    }
1385
1386    #[test]
1387    fn pack_rejects_manifest_over_64mib_cap() {
1388        let big_len: u64 = MAX_MANIFEST_BYTES + 1;
1389        let claimed = hex_of(b"irrelevant");
1390        let mut stream = WIRE_MAGIC.as_bytes().to_vec();
1391        stream.extend_from_slice(format!("manifest {claimed} {big_len}\n").as_bytes());
1392        // No payload needed: the cap check fires on the header alone.
1393        let b_dir = TempDir::new("man-cap");
1394        let b = FileStore::from_root(b_dir.path());
1395        let mut sink = FileSink::new(&b);
1396        let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
1397        assert!(err.to_string().contains("cap"), "got: {err}");
1398    }
1399
1400    // --- write_pack fail-closed ---------------------------------------------------
1401
1402    #[test]
1403    fn pack_write_missing_object_aborts_before_end() {
1404        let payloads = vec![b"present\n".to_vec()];
1405        let (_a_dir, a, mut ids) = seed_store("wmiss-a", &payloads);
1406        // A syntactically valid but ABSENT object id.
1407        let absent = hex_of(b"never stored");
1408        ids.push(absent.clone());
1409
1410        let mut pack = Vec::new();
1411        let err = write_pack(&a, &ids, None, &mut pack).expect_err("missing object");
1412        assert!(matches!(err, StoreError::ObjectNotFound { .. }));
1413        // The partial stream has NO `end` trailer, so a consumer fails too.
1414        assert!(!pack.ends_with(b"end\n"));
1415        let b_dir = TempDir::new("wmiss-b");
1416        let b = FileStore::from_root(b_dir.path());
1417        let mut sink = FileSink::new(&b);
1418        assert!(read_pack(pack.as_slice(), &mut sink).is_err());
1419    }
1420
1421    #[test]
1422    fn pack_write_invalid_id_emits_nothing() {
1423        let (_a_dir, a, _ids) = seed_store("winv-a", &[b"x\n".to_vec()]);
1424        let mut pack = Vec::new();
1425        let err = write_pack(&a, &["NOT-HEX".to_owned()], None, &mut pack).expect_err("invalid id");
1426        assert!(
1427            err.to_string().contains("invalid object checksum"),
1428            "got: {err}"
1429        );
1430        assert!(pack.is_empty(), "fail closed: not a single byte written");
1431
1432        // Same for an invalid manifest id.
1433        let mut pack = Vec::new();
1434        let err = write_pack(&a, &[], Some("zz"), &mut pack).expect_err("invalid manifest id");
1435        assert!(
1436            err.to_string().contains("invalid manifest id"),
1437            "got: {err}"
1438        );
1439        assert!(pack.is_empty());
1440    }
1441
1442    #[test]
1443    fn pack_write_emits_records_in_input_order() {
1444        let payloads = vec![b"bbb\n".to_vec(), b"aaa\n".to_vec(), b"ccc\n".to_vec()];
1445        let (_a_dir, a, ids) = seed_store("order-a", &payloads);
1446        let mut pack = Vec::new();
1447        write_pack(&a, &ids, None, &mut pack).expect("write_pack");
1448        let text = String::from_utf8_lossy(&pack);
1449        let positions: Vec<usize> = ids
1450            .iter()
1451            .map(|id| text.find(id.as_str()).expect("record present"))
1452            .collect();
1453        let mut sorted = positions.clone();
1454        sorted.sort_unstable();
1455        assert_eq!(positions, sorted, "obj records keep input order");
1456    }
1457
1458    // --- StreamStore::objects_needed -------------------------------------------
1459
1460    #[test]
1461    fn pack_objects_needed_returns_absent_subset_in_input_order() {
1462        let p1 = b"seeded one\n".to_vec();
1463        let p3 = b"seeded three\n".to_vec();
1464        let (_dir, store, seeded) = seed_store("needed", &[p1, p3]);
1465        let absent_a = hex_of(b"absent a");
1466        let absent_b = hex_of(b"absent b");
1467
1468        // Full ordered list interleaving present + absent.
1469        let list = vec![
1470            seeded[0].clone(),
1471            absent_a.clone(),
1472            seeded[1].clone(),
1473            absent_b.clone(),
1474        ];
1475        let needed = store.objects_needed(&list).expect("objects_needed");
1476        assert_eq!(needed, vec![absent_a.clone(), absent_b.clone()]);
1477
1478        // Dedup is the caller's job: an absent checksum supplied twice is
1479        // reported twice, still in input order.
1480        let list = vec![absent_b.clone(), absent_a.clone(), absent_b.clone()];
1481        let needed = store.objects_needed(&list).expect("objects_needed");
1482        assert_eq!(needed, vec![absent_b.clone(), absent_a, absent_b]);
1483
1484        // Everything present -> empty complement.
1485        assert_eq!(
1486            store.objects_needed(&seeded).expect("ok"),
1487            Vec::<String>::new()
1488        );
1489    }
1490
1491    #[test]
1492    fn pack_objects_needed_invalid_checksum_fails_closed() {
1493        let (_dir, store, seeded) = seed_store("needed-bad", &[b"x\n".to_vec()]);
1494        let valid_absent = hex_of(b"absent");
1495        for bad in [
1496            "UPPERCASE0000000000000000000000000000000000000000000000000000AA".to_owned(),
1497            "0123456789abcdef".repeat(4)[..63].to_owned(),
1498            format!("{}0", "0123456789abcdef".repeat(4)),
1499            "not hex at all".to_owned(),
1500            String::new(),
1501        ] {
1502            // The invalid entry errors the WHOLE call even when valid entries
1503            // precede it — nothing is returned (fail closed).
1504            let list = vec![seeded[0].clone(), valid_absent.clone(), bad.clone()];
1505            let err = store.objects_needed(&list).expect_err("must fail closed");
1506            assert!(
1507                err.to_string().contains("invalid object checksum"),
1508                "checksum {bad:?}: got {err}"
1509            );
1510        }
1511    }
1512}