use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use flate2::Compression;
use flate2::read::ZlibDecoder;
use flate2::write::ZlibEncoder;
use crate::adapter::Fs;
use crate::error::SessionError;
use crate::layout::StorePaths;
/// Eight-byte signature that `bundle` writes at the start of every blob and that
/// `unbundle` checks before attempting to decompress anything.
const MAGIC: &[u8; 8] = b"ZNBDL1\0\0";
/// One bundled file: its `/`-separated path relative to the document directory,
/// paired with the file's raw bytes.
type BundleEntry = (String, Vec<u8>);
/// Packs every file under the document directory of `doc_id` into one byte blob.
///
/// The blob is `MAGIC` followed by a zlib stream containing the payload written
/// by `write_payload`. Files are ordered by the raw bytes of their relative path
/// before encoding, so bundling an unchanged store twice yields the same bytes.
///
/// # Errors
///
/// Returns an error when the document directory does not exist, when listing or
/// reading any file fails, or when encoding/compressing the payload fails.
pub fn bundle(fs: &impl Fs, paths: &StorePaths, doc_id: &str) -> Result<Vec<u8>, SessionError> {
    let root = paths.doc_dir(doc_id);
    if !fs.exists(&root) {
        let message = format!(
            "bundle: document directory does not exist: {}",
            root.display()
        );
        return Err(SessionError::new(message));
    }

    // Gather (relative path, content) pairs, then fix their order by path bytes
    // so the output does not depend on directory listing order.
    let mut files: Vec<(String, Vec<u8>)> = Vec::new();
    collect_files(fs, &root, &root, &mut files)?;
    files.sort_by(|lhs, rhs| lhs.0.as_bytes().cmp(rhs.0.as_bytes()));

    let mut zlib = ZlibEncoder::new(Vec::new(), Compression::default());
    write_payload(&mut zlib, doc_id, &files)?;
    let body = zlib.finish().map_err(SessionError::from)?;

    // Header first, compressed payload after it.
    Ok([&MAGIC[..], &body[..]].concat())
}
pub fn unbundle(fs: &impl Fs, paths: &StorePaths, data: &[u8]) -> Result<String, SessionError> {
let magic = data
.get(..MAGIC.len())
.ok_or_else(|| SessionError::new("unbundle: data too short to contain magic header"))?;
if magic != MAGIC {
return Err(SessionError::new(format!(
"unbundle: bad magic header — expected {:?}, got {:?}",
MAGIC, magic
)));
}
let compressed = &data[MAGIC.len()..];
let mut decoder = ZlibDecoder::new(compressed);
let mut payload = Vec::new();
decoder
.read_to_end(&mut payload)
.map_err(|e| SessionError::new(format!("unbundle: decompression failed: {e}")))?;
let (doc_id, entries) = parse_payload(&payload)?;
let doc_dir = paths.doc_dir(&doc_id);
for (rel_path, content) in &entries {
let abs_path = join_relative(&doc_dir, rel_path)?;
let parent = abs_path.parent().ok_or_else(|| {
SessionError::new(format!("unbundle: entry path has no parent: {rel_path}"))
})?;
fs.create_dir_all(parent)?;
fs.write(&abs_path, content)?;
}
Ok(doc_id)
}
/// Recursively gathers every file below `dir` into `out`.
///
/// Each pushed pair is the file's path relative to `base` (as produced by
/// `relative_path`) and the bytes returned by `fs.read`. A child is treated as a
/// directory when `fs.read_dir` succeeds on it, and as a file otherwise.
///
/// # Errors
///
/// Propagates failures from listing `dir`, from computing a relative path, and
/// from reading a child that was classified as a file.
fn collect_files(
    fs: &impl Fs,
    base: &Path,
    dir: &Path,
    out: &mut Vec<(String, Vec<u8>)>,
) -> Result<(), SessionError> {
    for entry in fs.read_dir(dir)? {
        // Computed up front for every child, directories included.
        let rel = relative_path(base, &entry)?;
        if fs.read_dir(&entry).is_ok() {
            // Listable, so descend into it.
            collect_files(fs, base, &entry, out)?;
        } else {
            // Not listable, so read it as a regular file.
            out.push((rel, fs.read(&entry)?));
        }
    }
    Ok(())
}
/// Expresses `path` relative to `base` as a `/`-separated UTF-8 string.
///
/// The separator is always `/`, whatever the platform's native separator is.
/// When `path` equals `base` the result is the empty string.
///
/// # Errors
///
/// Returns an error when `path` is not located under `base`, or when any
/// remaining component is not valid UTF-8.
fn relative_path(base: &Path, path: &Path) -> Result<String, SessionError> {
    let rel = match path.strip_prefix(base) {
        Ok(rel) => rel,
        Err(_) => {
            return Err(SessionError::new(format!(
                "bundle: path '{}' is not under base '{}'",
                path.display(),
                base.display()
            )))
        }
    };

    // Convert each component to `&str`, stopping at the first non-UTF-8 one.
    let segments = rel
        .components()
        .map(|component| {
            component
                .as_os_str()
                .to_str()
                .ok_or_else(|| SessionError::new("bundle: non-UTF-8 path component"))
        })
        .collect::<Result<Vec<&str>, SessionError>>()?;

    Ok(segments.join("/"))
}
/// Appends the `/`-separated `rel_path` to `base`, refusing anything that could
/// leave `base`.
///
/// Every `/`-delimited segment must parse, on the current platform, as exactly
/// one ordinary path component whose text equals the segment itself. This
/// rejects empty segments, `.` and `..`. On Windows it also rejects segments
/// that hide a separator, root or drive prefix (for example `..\\..`, `\\abs`
/// or `C:`); pushing one of those onto a `PathBuf` would escape or replace
/// `base`.
///
/// # Errors
///
/// Returns an error naming the whole `rel_path` when any segment is rejected.
fn join_relative(base: &Path, rel_path: &str) -> Result<PathBuf, SessionError> {
    let mut result = base.to_path_buf();
    for component in rel_path.split('/') {
        // Let the platform's own path parser decide what the segment really is.
        let mut parsed = Path::new(component).components();
        let is_plain_name = matches!(
            (parsed.next(), parsed.next()),
            (Some(std::path::Component::Normal(name)), None)
                if name.to_str() == Some(component)
        );
        if !is_plain_name {
            return Err(SessionError::new(format!(
                "unbundle: invalid path component in entry: {rel_path:?}"
            )));
        }
        result.push(component);
    }
    Ok(result)
}
/// Serialises the document id and all entries into `w`.
///
/// Layout, with every integer little-endian:
/// * `u32` byte length of `doc_id`, then the id bytes;
/// * `u32` number of entries;
/// * per entry: `u32` path byte length, the path bytes, `u64` content byte
///   length, the content bytes.
///
/// # Errors
///
/// Returns an error when a length does not fit its fixed-width field or when
/// writing to `w` fails. Bytes already written before the failure stay written.
fn write_payload(
    w: &mut impl Write,
    doc_id: &str,
    entries: &[(String, Vec<u8>)],
) -> Result<(), SessionError> {
    /// Writes `bytes` in full, converting the I/O error into a `SessionError`.
    fn emit(sink: &mut impl Write, bytes: &[u8]) -> Result<(), SessionError> {
        sink.write_all(bytes).map_err(SessionError::from)
    }

    // Header: document id.
    let id_len = match u32::try_from(doc_id.len()) {
        Ok(len) => len,
        Err(_) => return Err(SessionError::new("bundle: doc_id is too long to encode")),
    };
    emit(w, &id_len.to_le_bytes())?;
    emit(w, doc_id.as_bytes())?;

    // Header: entry count.
    let count = match u32::try_from(entries.len()) {
        Ok(count) => count,
        Err(_) => return Err(SessionError::new("bundle: too many entries to encode")),
    };
    emit(w, &count.to_le_bytes())?;

    // Body: one length-prefixed path and one length-prefixed content per entry.
    for (rel_path, content) in entries {
        let path_len = match u32::try_from(rel_path.len()) {
            Ok(len) => len,
            Err(_) => {
                return Err(SessionError::new(format!(
                    "bundle: relative path too long: {rel_path}"
                )))
            }
        };
        emit(w, &path_len.to_le_bytes())?;
        emit(w, rel_path.as_bytes())?;

        let content_len = match u64::try_from(content.len()) {
            Ok(len) => len,
            Err(_) => {
                return Err(SessionError::new(format!(
                    "bundle: content too large for entry: {rel_path}"
                )))
            }
        };
        emit(w, &content_len.to_le_bytes())?;
        emit(w, content)?;
    }
    Ok(())
}
/// Decodes the payload layout produced by `write_payload`.
///
/// Returns the document id and the entries in the order they were stored.
/// The payload is treated as untrusted: every declared length is checked
/// against the remaining bytes, and the offset arithmetic uses checked addition
/// so that an absurd length (for example a `u64::MAX` content length) yields a
/// "truncated payload" error rather than an overflow panic or a wrapped offset.
///
/// # Errors
///
/// Returns an error when the payload ends before a field is complete, when a
/// length does not fit in `usize`, or when the document id or an entry path is
/// not valid UTF-8.
fn parse_payload(payload: &[u8]) -> Result<(String, Vec<BundleEntry>), SessionError> {
    /// Returns the next `len` bytes and advances `pos`, or `None` when the
    /// range overflows `usize` or runs past the end of `payload`.
    fn take<'a>(payload: &'a [u8], pos: &mut usize, len: usize) -> Option<&'a [u8]> {
        let end = (*pos).checked_add(len)?;
        let bytes = payload.get(*pos..end)?;
        *pos = end;
        Some(bytes)
    }

    let mut pos = 0usize;

    let id_len = usize::try_from(read_u32_le(payload, &mut pos, "doc_id length")?)
        .map_err(|_| SessionError::new("unbundle: doc_id length exceeds platform usize"))?;
    let id_bytes = take(payload, &mut pos, id_len)
        .ok_or_else(|| SessionError::new("unbundle: truncated payload reading doc_id"))?;
    let doc_id = std::str::from_utf8(id_bytes)
        .map_err(|_| SessionError::new("unbundle: doc_id is not valid UTF-8"))?
        .to_owned();

    let count = usize::try_from(read_u32_le(payload, &mut pos, "entry count")?)
        .map_err(|_| SessionError::new("unbundle: entry count exceeds platform usize"))?;

    // Each entry occupies at least 12 bytes (u32 path length + u64 content
    // length), so never pre-allocate for more entries than the data could hold.
    let max_entries = payload.len().saturating_sub(pos) / 12;
    let mut entries = Vec::with_capacity(count.min(max_entries));

    for i in 0..count {
        let path_len = usize::try_from(read_u32_le(
            payload,
            &mut pos,
            &format!("path length for entry {i}"),
        )?)
        .map_err(|_| {
            SessionError::new(format!(
                "unbundle: path length for entry {i} exceeds platform usize"
            ))
        })?;
        let path_bytes = take(payload, &mut pos, path_len).ok_or_else(|| {
            SessionError::new(format!(
                "unbundle: truncated payload reading path for entry {i}"
            ))
        })?;
        let rel_path = std::str::from_utf8(path_bytes)
            .map_err(|_| {
                SessionError::new(format!("unbundle: path for entry {i} is not valid UTF-8"))
            })?
            .to_owned();

        let content_len = usize::try_from(read_u64_le(
            payload,
            &mut pos,
            &format!("content length for entry {i}"),
        )?)
        .map_err(|_| {
            SessionError::new(format!(
                "unbundle: content length for entry {i} exceeds platform usize"
            ))
        })?;
        let content = take(payload, &mut pos, content_len)
            .ok_or_else(|| {
                SessionError::new(format!(
                    "unbundle: truncated payload reading content for entry {i}"
                ))
            })?
            .to_vec();

        entries.push((rel_path, content));
    }
    Ok((doc_id, entries))
}
/// Decodes a little-endian `u32` from `data` at offset `*pos`.
///
/// On success `*pos` is advanced by four bytes; on failure it is left untouched.
/// `field` names the value being read and is only used in the error message.
///
/// # Errors
///
/// Returns an error when fewer than four bytes remain at `*pos`.
fn read_u32_le(data: &[u8], pos: &mut usize, field: &str) -> Result<u32, SessionError> {
    let start = *pos;
    let window = match data.get(start..start + 4) {
        Some(window) => window,
        None => {
            return Err(SessionError::new(format!(
                "unbundle: truncated payload reading {field}"
            )))
        }
    };
    let raw: [u8; 4] = match window.try_into() {
        Ok(raw) => raw,
        Err(_) => {
            return Err(SessionError::new(format!(
                "unbundle: internal slice error reading {field}"
            )))
        }
    };
    // Move the cursor only after the bytes have been obtained.
    *pos = start + 4;
    Ok(u32::from_le_bytes(raw))
}
/// Decodes a little-endian `u64` from `data` at offset `*pos`.
///
/// On success `*pos` is advanced by eight bytes; on failure it is left untouched.
/// `field` names the value being read and is only used in the error message.
///
/// # Errors
///
/// Returns an error when fewer than eight bytes remain at `*pos`.
fn read_u64_le(data: &[u8], pos: &mut usize, field: &str) -> Result<u64, SessionError> {
    let start = *pos;
    let window = match data.get(start..start + 8) {
        Some(window) => window,
        None => {
            return Err(SessionError::new(format!(
                "unbundle: truncated payload reading {field}"
            )))
        }
    };
    let raw: [u8; 8] = match window.try_into() {
        Ok(raw) => raw,
        Err(_) => {
            return Err(SessionError::new(format!(
                "unbundle: internal slice error reading {field}"
            )))
        }
    };
    // Move the cursor only after the bytes have been obtained.
    *pos = start + 8;
    Ok(u64::from_le_bytes(raw))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::MemFs;

    /// Builds an in-memory store rooted at `/data` that holds a small document
    /// tree for `doc_id`: two top-level JSONL files, one object inside a shard
    /// directory, and a scratch index.
    fn make_store(doc_id: &str) -> (MemFs, StorePaths) {
        let paths = StorePaths::new("/data");
        let fs = MemFs::new();
        let root = paths.doc_dir(doc_id);

        // Top-level files.
        fs.create_dir_all(&root).unwrap();
        fs.write(&root.join("versions.jsonl"), b"{\"id\":\"v0\"}\n")
            .unwrap();
        fs.write(&root.join("runs.jsonl"), b"{\"run\":1}\n")
            .unwrap();

        // A nested object shard.
        let shard = root.join("objects").join("ab");
        fs.create_dir_all(&shard).unwrap();
        fs.write(&shard.join("cdef1234"), b"object-bytes").unwrap();

        // A scratch area with its own index.
        let scratch = root.join("scratch");
        fs.create_dir_all(&scratch).unwrap();
        fs.write(&scratch.join("index.jsonl"), b"{\"cand\":\"c0\"}\n")
            .unwrap();

        (fs, paths)
    }

    /// Bundling a store and unbundling into a fresh one reproduces every file.
    #[test]
    fn bundle_unbundle_roundtrip() {
        let doc_id = "test-doc-001";
        let (fs, paths) = make_store(doc_id);
        let blob = bundle(&fs, &paths, doc_id).unwrap();

        let fs2 = MemFs::new();
        let paths2 = StorePaths::new("/data2");
        let returned_id = unbundle(&fs2, &paths2, &blob).unwrap();
        assert_eq!(returned_id, doc_id, "returned doc_id must match");

        let source_dir = paths.doc_dir(doc_id);
        let restored_dir = paths2.doc_dir(doc_id);
        for rel in [
            "versions.jsonl",
            "runs.jsonl",
            "objects/ab/cdef1234",
            "scratch/index.jsonl",
        ] {
            let orig = fs.read(&source_dir.join(rel)).unwrap();
            let copy = fs2.read(&restored_dir.join(rel)).unwrap();
            assert_eq!(orig, copy, "content mismatch for {rel}");
        }
    }

    /// Bundling the same store twice gives identical bytes.
    #[test]
    fn bundle_is_deterministic() {
        let doc_id = "det-doc";
        let (fs, paths) = make_store(doc_id);
        let first = bundle(&fs, &paths, doc_id).unwrap();
        let second = bundle(&fs, &paths, doc_id).unwrap();
        assert_eq!(
            first, second,
            "two bundles of the same store must be byte-identical"
        );
    }

    /// Input that does not start with the magic header is rejected.
    #[test]
    fn unbundle_bad_magic_errors() {
        let result = unbundle(&MemFs::new(), &StorePaths::new("/x"), b"not-a-bundle");
        assert!(result.is_err(), "garbage input must return Err");
        let msg = result.unwrap_err().message;
        assert!(
            msg.contains("magic"),
            "error must mention 'magic'; got: {msg}"
        );

        // Right header length, wrong header bytes, followed by two more bytes.
        let mut bad = b"BADMAGIC".to_vec();
        bad.extend_from_slice(b"\x78\x9c");
        let result2 = unbundle(&MemFs::new(), &StorePaths::new("/x"), &bad);
        assert!(result2.is_err(), "wrong-magic input must return Err");
    }

    /// A blob cut off two bytes after the magic header is rejected.
    #[test]
    fn unbundle_truncated_errors() {
        let doc_id = "trunc-doc";
        let (fs, paths) = make_store(doc_id);
        let blob = bundle(&fs, &paths, doc_id).unwrap();

        let cut = &blob[..MAGIC.len() + 2];
        let result = unbundle(&MemFs::new(), &StorePaths::new("/x"), cut);
        assert!(result.is_err(), "truncated bundle must return Err");
    }

    /// Bundling a document that was never created fails and names the document.
    #[test]
    fn bundle_missing_doc_errors() {
        let paths = StorePaths::new("/data");
        let fs = MemFs::new();
        let result = bundle(&fs, &paths, "ghost-doc");
        assert!(result.is_err(), "bundling a missing doc must return Err");
        let msg = result.unwrap_err().message;
        assert!(
            msg.contains("ghost-doc"),
            "error must mention the doc_id; got: {msg}"
        );
    }
}