Skip to main content

zenith_session/
bundle.rs

1//! Portable, deterministic document-store bundle (`*.zenithbundle`).
2//!
3//! # Format
4//!
5//! ```text
6//! [8 bytes]  magic: b"ZNBDL1\0\0"  (written raw, NOT compressed)
//! [remaining] single zlib stream (RFC 1950: a DEFLATE body wrapped in a
//!             2-byte header and an Adler-32 trailer; flate2
//!             ZlibEncoder/ZlibDecoder, pure-Rust miniz_oxide backend) over
//!             the following payload:
9//!
10//!   doc_id        : u32 LE length  +  UTF-8 bytes
11//!   entry_count   : u32 LE
12//!   for each entry (sorted ascending by relative path bytes):
13//!     rel_path    : u32 LE length  +  UTF-8 bytes
14//!                   (forward-slash separated; relative to doc_dir)
15//!     content     : u64 LE length  +  raw bytes
16//! ```
17//!
//! All integers are little-endian. Output is deterministic because all
//! (relative_path, content) pairs from a depth-first walk of `doc_dir` are
//! collected and then sorted by relative path bytes before serialising.
//! No timestamps or filesystem metadata are included. Byte-identical output
//! across builds additionally assumes the same compressor implementation and
//! compression level (`Compression::default()`).
22//!
23//! `unbundle` writes are not transactional: a partial write is possible if
24//! the process is killed mid-way. A future unit may stage into a temp dir and
25//! rename for atomicity.
26
27use std::io::{Read, Write};
28use std::path::{Path, PathBuf};
29
30use flate2::Compression;
31use flate2::read::ZlibDecoder;
32use flate2::write::ZlibEncoder;
33
34use crate::adapter::Fs;
35use crate::error::SessionError;
36use crate::layout::StorePaths;
37
38// ── Magic constant ─────────────────────────────────────────────────────────────
39
/// Bundle file signature: the six ASCII bytes `ZNBDL1` followed by two NULs.
///
/// It is written verbatim (never compressed) in front of the zlib stream so a
/// reader can reject non-bundle input before attempting decompression.
const MAGIC: &[u8; 8] = &[b'Z', b'N', b'B', b'D', b'L', b'1', 0, 0];

/// A single bundled file: its `/`-separated path relative to the document
/// directory, paired with the file's raw bytes.
type BundleEntry = (String, Vec<u8>);
45
46// ── Public API ─────────────────────────────────────────────────────────────────
47
48/// Pack a document's entire store directory into one portable, deterministic
49/// byte blob.
50///
51/// The directory bundled is `<root>/docs/<doc_id>/` (objects/, versions.jsonl,
52/// runs.jsonl, previews.jsonl, scratch/, meta.json, …). Returns an error if
53/// `doc_dir` does not exist, since bundling a non-existent document is a user
54/// mistake and an empty bundle would silently succeed with no useful data.
55pub fn bundle(fs: &impl Fs, paths: &StorePaths, doc_id: &str) -> Result<Vec<u8>, SessionError> {
56    let doc_dir = paths.doc_dir(doc_id);
57    if !fs.exists(&doc_dir) {
58        return Err(SessionError::new(format!(
59            "bundle: document directory does not exist: {}",
60            doc_dir.display()
61        )));
62    }
63
64    // Collect all files recursively, recording relative paths.
65    let mut entries: Vec<(String, Vec<u8>)> = Vec::new();
66    collect_files(fs, &doc_dir, &doc_dir, &mut entries)?;
67
68    // Sort by relative path bytes for determinism.
69    entries.sort_by(|(a, _), (b, _)| a.as_bytes().cmp(b.as_bytes()));
70
71    // Serialise payload into a DEFLATE stream.
72    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
73    write_payload(&mut encoder, doc_id, &entries)?;
74    let compressed = encoder.finish().map_err(SessionError::from)?;
75
76    // Prepend magic.
77    let mut out = Vec::with_capacity(MAGIC.len() + compressed.len());
78    out.extend_from_slice(MAGIC);
79    out.extend_from_slice(&compressed);
80    Ok(out)
81}
82
83/// Reconstruct a document's store directory from a bundle blob.
84///
85/// Returns the `doc_id` recorded in the bundle. Writes every entry under
86/// `<root>/docs/<doc_id>/` using the injected `Fs`. The `paths` root
87/// determines where files land; pass a different `StorePaths` root than the
88/// source to restore into a separate store.
89pub fn unbundle(fs: &impl Fs, paths: &StorePaths, data: &[u8]) -> Result<String, SessionError> {
90    // Check magic.
91    let magic = data
92        .get(..MAGIC.len())
93        .ok_or_else(|| SessionError::new("unbundle: data too short to contain magic header"))?;
94    if magic != MAGIC {
95        return Err(SessionError::new(format!(
96            "unbundle: bad magic header — expected {:?}, got {:?}",
97            MAGIC, magic
98        )));
99    }
100
101    // Decompress the remainder.
102    let compressed = &data[MAGIC.len()..];
103    let mut decoder = ZlibDecoder::new(compressed);
104    let mut payload = Vec::new();
105    decoder
106        .read_to_end(&mut payload)
107        .map_err(|e| SessionError::new(format!("unbundle: decompression failed: {e}")))?;
108
109    // Parse the payload.
110    let (doc_id, entries) = parse_payload(&payload)?;
111
112    // Write every entry under doc_dir.
113    let doc_dir = paths.doc_dir(&doc_id);
114    for (rel_path, content) in &entries {
115        let abs_path = join_relative(&doc_dir, rel_path)?;
116        let parent = abs_path.parent().ok_or_else(|| {
117            SessionError::new(format!("unbundle: entry path has no parent: {rel_path}"))
118        })?;
119        fs.create_dir_all(parent)?;
120        fs.write(&abs_path, content)?;
121    }
122
123    Ok(doc_id)
124}
125
126// ── Private helpers ────────────────────────────────────────────────────────────
127
128/// Recursively walk `dir` and append (relative_path, content) pairs to `out`.
129/// `base` is the root doc_dir; relative paths are computed from it.
130fn collect_files(
131    fs: &impl Fs,
132    base: &Path,
133    dir: &Path,
134    out: &mut Vec<(String, Vec<u8>)>,
135) -> Result<(), SessionError> {
136    let children = fs.read_dir(dir)?;
137    for child in children {
138        let rel = relative_path(base, &child)?;
139        // Distinguish a directory from a file by trying read_dir on the child.
140        // The Fs contract: read_dir succeeds on directories and errors on files.
141        match fs.read_dir(&child) {
142            Ok(_) => {
143                // It's a directory — recurse.
144                collect_files(fs, base, &child, out)?;
145            }
146            Err(_) => {
147                // It's a file — read and record.
148                let content = fs.read(&child)?;
149                out.push((rel, content));
150            }
151        }
152    }
153    Ok(())
154}
155
156/// Compute the relative path (forward-slash separated) of `path` with respect
157/// to `base`. Errors if `path` is not under `base`.
158fn relative_path(base: &Path, path: &Path) -> Result<String, SessionError> {
159    let rel = path.strip_prefix(base).map_err(|_| {
160        SessionError::new(format!(
161            "bundle: path '{}' is not under base '{}'",
162            path.display(),
163            base.display()
164        ))
165    })?;
166    // Convert to forward-slash string.
167    let mut parts = Vec::new();
168    for component in rel.components() {
169        parts.push(
170            component
171                .as_os_str()
172                .to_str()
173                .ok_or_else(|| SessionError::new("bundle: non-UTF-8 path component"))?
174                .to_owned(),
175        );
176    }
177    Ok(parts.join("/"))
178}
179
180/// Re-join a forward-slash relative path onto an absolute base, rejecting any
181/// `..`, `.`, or empty component to prevent path traversal.
182fn join_relative(base: &Path, rel_path: &str) -> Result<PathBuf, SessionError> {
183    let mut result = base.to_path_buf();
184    for component in rel_path.split('/') {
185        if component == ".." || component == "." || component.is_empty() {
186            return Err(SessionError::new(format!(
187                "unbundle: invalid path component in entry: {rel_path:?}"
188            )));
189        }
190        result.push(component);
191    }
192    Ok(result)
193}
194
195/// Serialise the bundle payload into a writer.
196fn write_payload(
197    w: &mut impl Write,
198    doc_id: &str,
199    entries: &[(String, Vec<u8>)],
200) -> Result<(), SessionError> {
201    // doc_id
202    let id_bytes = doc_id.as_bytes();
203    let id_len = u32::try_from(id_bytes.len())
204        .map_err(|_| SessionError::new("bundle: doc_id is too long to encode"))?;
205    w.write_all(&id_len.to_le_bytes())
206        .map_err(SessionError::from)?;
207    w.write_all(id_bytes).map_err(SessionError::from)?;
208
209    // entry count
210    let count = u32::try_from(entries.len())
211        .map_err(|_| SessionError::new("bundle: too many entries to encode"))?;
212    w.write_all(&count.to_le_bytes())
213        .map_err(SessionError::from)?;
214
215    // entries
216    for (rel_path, content) in entries {
217        let path_bytes = rel_path.as_bytes();
218        let path_len = u32::try_from(path_bytes.len()).map_err(|_| {
219            SessionError::new(format!("bundle: relative path too long: {rel_path}"))
220        })?;
221        w.write_all(&path_len.to_le_bytes())
222            .map_err(SessionError::from)?;
223        w.write_all(path_bytes).map_err(SessionError::from)?;
224
225        let content_len = u64::try_from(content.len()).map_err(|_| {
226            SessionError::new(format!("bundle: content too large for entry: {rel_path}"))
227        })?;
228        w.write_all(&content_len.to_le_bytes())
229            .map_err(SessionError::from)?;
230        w.write_all(content).map_err(SessionError::from)?;
231    }
232    Ok(())
233}
234
235/// Parse the decompressed bundle payload; return (doc_id, entries).
236///
237/// All slice indexing is checked via `.get(..).ok_or(...)` — a truncated or
238/// corrupt payload returns a clean `SessionError` rather than panicking.
239fn parse_payload(payload: &[u8]) -> Result<(String, Vec<BundleEntry>), SessionError> {
240    let mut pos = 0usize;
241
242    // doc_id length
243    let id_len = usize::try_from(read_u32_le(payload, &mut pos, "doc_id length")?)
244        .map_err(|_| SessionError::new("unbundle: doc_id length exceeds platform usize"))?;
245
246    // doc_id bytes
247    let id_bytes = payload
248        .get(pos..pos + id_len)
249        .ok_or_else(|| SessionError::new("unbundle: truncated payload reading doc_id"))?;
250    let doc_id = std::str::from_utf8(id_bytes)
251        .map_err(|_| SessionError::new("unbundle: doc_id is not valid UTF-8"))?
252        .to_owned();
253    pos += id_len;
254
255    // entry count
256    let count = usize::try_from(read_u32_le(payload, &mut pos, "entry count")?)
257        .map_err(|_| SessionError::new("unbundle: entry count exceeds platform usize"))?;
258
259    // Guard against a maliciously large count field by capping the pre-allocation
260    // to what the remaining payload could possibly contain (each entry is at
261    // minimum 12 bytes: 4 for path_len + 8 for content_len).
262    let max_entries = payload.len().saturating_sub(pos) / 12;
263    let mut entries = Vec::with_capacity(count.min(max_entries));
264    for i in 0..count {
265        // rel_path length
266        let path_len = usize::try_from(read_u32_le(
267            payload,
268            &mut pos,
269            &format!("path length for entry {i}"),
270        )?)
271        .map_err(|_| {
272            SessionError::new(format!(
273                "unbundle: path length for entry {i} exceeds platform usize"
274            ))
275        })?;
276
277        // rel_path bytes
278        let path_bytes = payload.get(pos..pos + path_len).ok_or_else(|| {
279            SessionError::new(format!(
280                "unbundle: truncated payload reading path for entry {i}"
281            ))
282        })?;
283        let rel_path = std::str::from_utf8(path_bytes)
284            .map_err(|_| {
285                SessionError::new(format!("unbundle: path for entry {i} is not valid UTF-8"))
286            })?
287            .to_owned();
288        pos += path_len;
289
290        // content length
291        let content_len = usize::try_from(read_u64_le(
292            payload,
293            &mut pos,
294            &format!("content length for entry {i}"),
295        )?)
296        .map_err(|_| {
297            SessionError::new(format!(
298                "unbundle: content length for entry {i} exceeds platform usize"
299            ))
300        })?;
301
302        // content bytes
303        let content = payload
304            .get(pos..pos + content_len)
305            .ok_or_else(|| {
306                SessionError::new(format!(
307                    "unbundle: truncated payload reading content for entry {i}"
308                ))
309            })?
310            .to_vec();
311        pos += content_len;
312
313        entries.push((rel_path, content));
314    }
315
316    Ok((doc_id, entries))
317}
318
319/// Read a u32 little-endian from `data` at `*pos`, advance `pos` by 4.
320fn read_u32_le(data: &[u8], pos: &mut usize, field: &str) -> Result<u32, SessionError> {
321    let bytes = data
322        .get(*pos..*pos + 4)
323        .ok_or_else(|| SessionError::new(format!("unbundle: truncated payload reading {field}")))?;
324    let arr: [u8; 4] = bytes.try_into().map_err(|_| {
325        SessionError::new(format!("unbundle: internal slice error reading {field}"))
326    })?;
327    *pos += 4;
328    Ok(u32::from_le_bytes(arr))
329}
330
331/// Read a u64 little-endian from `data` at `*pos`, advance `pos` by 8.
332fn read_u64_le(data: &[u8], pos: &mut usize, field: &str) -> Result<u64, SessionError> {
333    let bytes = data
334        .get(*pos..*pos + 8)
335        .ok_or_else(|| SessionError::new(format!("unbundle: truncated payload reading {field}")))?;
336    let arr: [u8; 8] = bytes.try_into().map_err(|_| {
337        SessionError::new(format!("unbundle: internal slice error reading {field}"))
338    })?;
339    *pos += 8;
340    Ok(u64::from_le_bytes(arr))
341}
342
343// ── Tests ──────────────────────────────────────────────────────────────────────
344
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::MemFs;

    fn make_store(doc_id: &str) -> (MemFs, StorePaths) {
        let fs = MemFs::new();
        let paths = StorePaths::new("/data");
        let doc_dir = paths.doc_dir(doc_id);

        // versions.jsonl
        fs.create_dir_all(&doc_dir).unwrap();
        fs.write(&doc_dir.join("versions.jsonl"), b"{\"id\":\"v0\"}\n")
            .unwrap();

        // runs.jsonl
        fs.write(&doc_dir.join("runs.jsonl"), b"{\"run\":1}\n")
            .unwrap();

        // objects/ab/cdef...
        let obj_shard = doc_dir.join("objects").join("ab");
        fs.create_dir_all(&obj_shard).unwrap();
        fs.write(&obj_shard.join("cdef1234"), b"object-bytes")
            .unwrap();

        // scratch/index.jsonl
        let scratch_dir = doc_dir.join("scratch");
        fs.create_dir_all(&scratch_dir).unwrap();
        fs.write(&scratch_dir.join("index.jsonl"), b"{\"cand\":\"c0\"}\n")
            .unwrap();

        (fs, paths)
    }

    /// Build a store holding the same three files, written in forward or
    /// reverse order. Used to check that bundling does not depend on the
    /// order in which files were created.
    fn store_with_write_order(doc_id: &str, reverse: bool) -> (MemFs, StorePaths) {
        let fs = MemFs::new();
        let paths = StorePaths::new("/data");
        let doc_dir = paths.doc_dir(doc_id);
        let sub = doc_dir.join("sub");
        fs.create_dir_all(&sub).unwrap();

        let mut files = vec![
            (doc_dir.join("b.txt"), b"bee".to_vec()),
            (doc_dir.join("a.txt"), b"ay".to_vec()),
            (sub.join("c.txt"), b"sea".to_vec()),
        ];
        if reverse {
            files.reverse();
        }
        for (path, content) in &files {
            fs.write(path, content).unwrap();
        }
        (fs, paths)
    }

    /// Compress `payload` and prepend the magic header, producing a blob in
    /// the bundle format without touching a filesystem. This lets tests craft
    /// payloads that `bundle` itself would never emit.
    fn blob_from_payload(payload: &[u8]) -> Vec<u8> {
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(payload).unwrap();
        let mut blob = MAGIC.to_vec();
        blob.extend_from_slice(&encoder.finish().unwrap());
        blob
    }

    /// Serialise `entries` with `write_payload` and wrap them as a bundle blob.
    fn blob_from_entries(doc_id: &str, entries: &[(String, Vec<u8>)]) -> Vec<u8> {
        let mut payload = Vec::new();
        write_payload(&mut payload, doc_id, entries).unwrap();
        blob_from_payload(&payload)
    }

    #[test]
    fn bundle_unbundle_roundtrip() {
        let doc_id = "test-doc-001";
        let (fs, paths) = make_store(doc_id);

        let blob = bundle(&fs, &paths, doc_id).unwrap();

        // Unbundle into a FRESH store root.
        let fs2 = MemFs::new();
        let paths2 = StorePaths::new("/data2");
        let returned_id = unbundle(&fs2, &paths2, &blob).unwrap();

        assert_eq!(returned_id, doc_id, "returned doc_id must match");

        let doc_dir = paths.doc_dir(doc_id);
        let doc_dir2 = paths2.doc_dir(doc_id);

        // Check every original file is present and identical in the new store.
        let check = |rel: &str| {
            let orig = fs.read(&doc_dir.join(rel)).unwrap();
            let copy = fs2.read(&doc_dir2.join(rel)).unwrap();
            assert_eq!(orig, copy, "content mismatch for {rel}");
        };
        check("versions.jsonl");
        check("runs.jsonl");
        check("objects/ab/cdef1234");
        check("scratch/index.jsonl");
    }

    #[test]
    fn bundle_is_deterministic() {
        let doc_id = "det-doc";
        let (fs, paths) = make_store(doc_id);
        let blob1 = bundle(&fs, &paths, doc_id).unwrap();
        let blob2 = bundle(&fs, &paths, doc_id).unwrap();
        assert_eq!(
            blob1, blob2,
            "two bundles of the same store must be byte-identical"
        );
    }

    #[test]
    fn bundle_is_independent_of_write_order() {
        let doc_id = "order-doc";
        let (fs_fwd, paths_fwd) = store_with_write_order(doc_id, false);
        let (fs_rev, paths_rev) = store_with_write_order(doc_id, true);
        assert_eq!(
            bundle(&fs_fwd, &paths_fwd, doc_id).unwrap(),
            bundle(&fs_rev, &paths_rev, doc_id).unwrap(),
            "file creation order must not affect the bundle bytes"
        );
    }

    #[test]
    fn unbundle_bad_magic_errors() {
        // Pure garbage.
        let result = unbundle(&MemFs::new(), &StorePaths::new("/x"), b"not-a-bundle");
        assert!(result.is_err(), "garbage input must return Err");
        let msg = result.unwrap_err().message;
        assert!(
            msg.contains("magic"),
            "error must mention 'magic'; got: {msg}"
        );

        // Wrong magic prefix, correct length.
        let mut bad = b"BADMAGIC".to_vec();
        bad.extend_from_slice(b"\x78\x9c"); // zlib header (meaningless here)
        let result2 = unbundle(&MemFs::new(), &StorePaths::new("/x"), &bad);
        assert!(result2.is_err(), "wrong-magic input must return Err");
        let msg2 = result2.unwrap_err().message;
        assert!(
            msg2.contains("magic"),
            "error must mention 'magic'; got: {msg2}"
        );
    }

    #[test]
    fn unbundle_truncated_errors() {
        let doc_id = "trunc-doc";
        let (fs, paths) = make_store(doc_id);
        let blob = bundle(&fs, &paths, doc_id).unwrap();

        // Truncate to just past the magic — valid magic but truncated/empty deflate.
        let truncated = &blob[..MAGIC.len() + 2];
        let result = unbundle(&MemFs::new(), &StorePaths::new("/x"), truncated);
        assert!(result.is_err(), "truncated bundle must return Err");
    }

    #[test]
    fn unbundle_rejects_payload_missing_declared_entry() {
        // A well-formed zlib stream whose payload declares one entry but ends
        // right after the entry count.
        let mut payload = Vec::new();
        payload.extend_from_slice(&3u32.to_le_bytes());
        payload.extend_from_slice(b"doc");
        payload.extend_from_slice(&1u32.to_le_bytes());

        let result = unbundle(
            &MemFs::new(),
            &StorePaths::new("/x"),
            &blob_from_payload(&payload),
        );
        let msg = result.unwrap_err().message;
        assert!(
            msg.contains("truncated"),
            "error must report truncation; got: {msg}"
        );
    }

    #[test]
    fn unbundle_rejects_path_traversal_entry() {
        let blob = blob_from_entries(
            "evil-doc",
            &[("../escape".to_owned(), b"pwned".to_vec())],
        );
        let result = unbundle(&MemFs::new(), &StorePaths::new("/x"), &blob);
        let msg = result.unwrap_err().message;
        assert!(
            msg.contains("invalid path component"),
            "traversal entry must be rejected; got: {msg}"
        );
    }

    #[test]
    fn join_relative_rejects_unsafe_components() {
        let base = Path::new("/base");
        for bad in ["", "..", ".", "a/../b", "./a", "a//b", "a/", "/abs"] {
            assert!(
                join_relative(base, bad).is_err(),
                "{bad:?} must be rejected"
            );
        }
        assert_eq!(
            join_relative(base, "a/b").unwrap(),
            base.join("a").join("b")
        );
    }

    #[test]
    fn bundle_missing_doc_errors() {
        let fs = MemFs::new();
        let paths = StorePaths::new("/data");
        // "ghost-doc" directory was never created.
        let result = bundle(&fs, &paths, "ghost-doc");
        assert!(result.is_err(), "bundling a missing doc must return Err");
        let msg = result.unwrap_err().message;
        assert!(
            msg.contains("ghost-doc"),
            "error must mention the doc_id; got: {msg}"
        );
    }
}