1use std::io::{Read, Write};
28use std::path::{Path, PathBuf};
29
30use flate2::Compression;
31use flate2::read::ZlibDecoder;
32use flate2::write::ZlibEncoder;
33
34use crate::adapter::Fs;
35use crate::error::SessionError;
36use crate::layout::StorePaths;
37
/// Eight-byte header that `bundle` writes at the start of every blob and that `unbundle`
/// compares against before it attempts to decompress the rest of the data.
const MAGIC: &[u8; 8] = b"ZNBDL1\0\0";
42
/// One bundled file as decoded by `parse_payload`: its `/`-separated path relative to the
/// document directory, followed by the file's raw bytes.
type BundleEntry = (String, Vec<u8>);
45
46pub fn bundle(fs: &impl Fs, paths: &StorePaths, doc_id: &str) -> Result<Vec<u8>, SessionError> {
56 let doc_dir = paths.doc_dir(doc_id);
57 if !fs.exists(&doc_dir) {
58 return Err(SessionError::new(format!(
59 "bundle: document directory does not exist: {}",
60 doc_dir.display()
61 )));
62 }
63
64 let mut entries: Vec<(String, Vec<u8>)> = Vec::new();
66 collect_files(fs, &doc_dir, &doc_dir, &mut entries)?;
67
68 entries.sort_by(|(a, _), (b, _)| a.as_bytes().cmp(b.as_bytes()));
70
71 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
73 write_payload(&mut encoder, doc_id, &entries)?;
74 let compressed = encoder.finish().map_err(SessionError::from)?;
75
76 let mut out = Vec::with_capacity(MAGIC.len() + compressed.len());
78 out.extend_from_slice(MAGIC);
79 out.extend_from_slice(&compressed);
80 Ok(out)
81}
82
83pub fn unbundle(fs: &impl Fs, paths: &StorePaths, data: &[u8]) -> Result<String, SessionError> {
90 let magic = data
92 .get(..MAGIC.len())
93 .ok_or_else(|| SessionError::new("unbundle: data too short to contain magic header"))?;
94 if magic != MAGIC {
95 return Err(SessionError::new(format!(
96 "unbundle: bad magic header — expected {:?}, got {:?}",
97 MAGIC, magic
98 )));
99 }
100
101 let compressed = &data[MAGIC.len()..];
103 let mut decoder = ZlibDecoder::new(compressed);
104 let mut payload = Vec::new();
105 decoder
106 .read_to_end(&mut payload)
107 .map_err(|e| SessionError::new(format!("unbundle: decompression failed: {e}")))?;
108
109 let (doc_id, entries) = parse_payload(&payload)?;
111
112 let doc_dir = paths.doc_dir(&doc_id);
114 for (rel_path, content) in &entries {
115 let abs_path = join_relative(&doc_dir, rel_path)?;
116 let parent = abs_path.parent().ok_or_else(|| {
117 SessionError::new(format!("unbundle: entry path has no parent: {rel_path}"))
118 })?;
119 fs.create_dir_all(parent)?;
120 fs.write(&abs_path, content)?;
121 }
122
123 Ok(doc_id)
124}
125
126fn collect_files(
131 fs: &impl Fs,
132 base: &Path,
133 dir: &Path,
134 out: &mut Vec<(String, Vec<u8>)>,
135) -> Result<(), SessionError> {
136 let children = fs.read_dir(dir)?;
137 for child in children {
138 let rel = relative_path(base, &child)?;
139 match fs.read_dir(&child) {
142 Ok(_) => {
143 collect_files(fs, base, &child, out)?;
145 }
146 Err(_) => {
147 let content = fs.read(&child)?;
149 out.push((rel, content));
150 }
151 }
152 }
153 Ok(())
154}
155
156fn relative_path(base: &Path, path: &Path) -> Result<String, SessionError> {
159 let rel = path.strip_prefix(base).map_err(|_| {
160 SessionError::new(format!(
161 "bundle: path '{}' is not under base '{}'",
162 path.display(),
163 base.display()
164 ))
165 })?;
166 let mut parts = Vec::new();
168 for component in rel.components() {
169 parts.push(
170 component
171 .as_os_str()
172 .to_str()
173 .ok_or_else(|| SessionError::new("bundle: non-UTF-8 path component"))?
174 .to_owned(),
175 );
176 }
177 Ok(parts.join("/"))
178}
179
180fn join_relative(base: &Path, rel_path: &str) -> Result<PathBuf, SessionError> {
183 let mut result = base.to_path_buf();
184 for component in rel_path.split('/') {
185 if component == ".." || component == "." || component.is_empty() {
186 return Err(SessionError::new(format!(
187 "unbundle: invalid path component in entry: {rel_path:?}"
188 )));
189 }
190 result.push(component);
191 }
192 Ok(result)
193}
194
195fn write_payload(
197 w: &mut impl Write,
198 doc_id: &str,
199 entries: &[(String, Vec<u8>)],
200) -> Result<(), SessionError> {
201 let id_bytes = doc_id.as_bytes();
203 let id_len = u32::try_from(id_bytes.len())
204 .map_err(|_| SessionError::new("bundle: doc_id is too long to encode"))?;
205 w.write_all(&id_len.to_le_bytes())
206 .map_err(SessionError::from)?;
207 w.write_all(id_bytes).map_err(SessionError::from)?;
208
209 let count = u32::try_from(entries.len())
211 .map_err(|_| SessionError::new("bundle: too many entries to encode"))?;
212 w.write_all(&count.to_le_bytes())
213 .map_err(SessionError::from)?;
214
215 for (rel_path, content) in entries {
217 let path_bytes = rel_path.as_bytes();
218 let path_len = u32::try_from(path_bytes.len()).map_err(|_| {
219 SessionError::new(format!("bundle: relative path too long: {rel_path}"))
220 })?;
221 w.write_all(&path_len.to_le_bytes())
222 .map_err(SessionError::from)?;
223 w.write_all(path_bytes).map_err(SessionError::from)?;
224
225 let content_len = u64::try_from(content.len()).map_err(|_| {
226 SessionError::new(format!("bundle: content too large for entry: {rel_path}"))
227 })?;
228 w.write_all(&content_len.to_le_bytes())
229 .map_err(SessionError::from)?;
230 w.write_all(content).map_err(SessionError::from)?;
231 }
232 Ok(())
233}
234
235fn parse_payload(payload: &[u8]) -> Result<(String, Vec<BundleEntry>), SessionError> {
240 let mut pos = 0usize;
241
242 let id_len = usize::try_from(read_u32_le(payload, &mut pos, "doc_id length")?)
244 .map_err(|_| SessionError::new("unbundle: doc_id length exceeds platform usize"))?;
245
246 let id_bytes = payload
248 .get(pos..pos + id_len)
249 .ok_or_else(|| SessionError::new("unbundle: truncated payload reading doc_id"))?;
250 let doc_id = std::str::from_utf8(id_bytes)
251 .map_err(|_| SessionError::new("unbundle: doc_id is not valid UTF-8"))?
252 .to_owned();
253 pos += id_len;
254
255 let count = usize::try_from(read_u32_le(payload, &mut pos, "entry count")?)
257 .map_err(|_| SessionError::new("unbundle: entry count exceeds platform usize"))?;
258
259 let max_entries = payload.len().saturating_sub(pos) / 12;
263 let mut entries = Vec::with_capacity(count.min(max_entries));
264 for i in 0..count {
265 let path_len = usize::try_from(read_u32_le(
267 payload,
268 &mut pos,
269 &format!("path length for entry {i}"),
270 )?)
271 .map_err(|_| {
272 SessionError::new(format!(
273 "unbundle: path length for entry {i} exceeds platform usize"
274 ))
275 })?;
276
277 let path_bytes = payload.get(pos..pos + path_len).ok_or_else(|| {
279 SessionError::new(format!(
280 "unbundle: truncated payload reading path for entry {i}"
281 ))
282 })?;
283 let rel_path = std::str::from_utf8(path_bytes)
284 .map_err(|_| {
285 SessionError::new(format!("unbundle: path for entry {i} is not valid UTF-8"))
286 })?
287 .to_owned();
288 pos += path_len;
289
290 let content_len = usize::try_from(read_u64_le(
292 payload,
293 &mut pos,
294 &format!("content length for entry {i}"),
295 )?)
296 .map_err(|_| {
297 SessionError::new(format!(
298 "unbundle: content length for entry {i} exceeds platform usize"
299 ))
300 })?;
301
302 let content = payload
304 .get(pos..pos + content_len)
305 .ok_or_else(|| {
306 SessionError::new(format!(
307 "unbundle: truncated payload reading content for entry {i}"
308 ))
309 })?
310 .to_vec();
311 pos += content_len;
312
313 entries.push((rel_path, content));
314 }
315
316 Ok((doc_id, entries))
317}
318
319fn read_u32_le(data: &[u8], pos: &mut usize, field: &str) -> Result<u32, SessionError> {
321 let bytes = data
322 .get(*pos..*pos + 4)
323 .ok_or_else(|| SessionError::new(format!("unbundle: truncated payload reading {field}")))?;
324 let arr: [u8; 4] = bytes.try_into().map_err(|_| {
325 SessionError::new(format!("unbundle: internal slice error reading {field}"))
326 })?;
327 *pos += 4;
328 Ok(u32::from_le_bytes(arr))
329}
330
331fn read_u64_le(data: &[u8], pos: &mut usize, field: &str) -> Result<u64, SessionError> {
333 let bytes = data
334 .get(*pos..*pos + 8)
335 .ok_or_else(|| SessionError::new(format!("unbundle: truncated payload reading {field}")))?;
336 let arr: [u8; 8] = bytes.try_into().map_err(|_| {
337 SessionError::new(format!("unbundle: internal slice error reading {field}"))
338 })?;
339 *pos += 8;
340 Ok(u64::from_le_bytes(arr))
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::MemFs;

    /// Builds an in-memory store rooted at `/data` holding one document with two top-level
    /// files, one file nested under `objects/ab`, and one file under `scratch`.
    fn make_store(doc_id: &str) -> (MemFs, StorePaths) {
        let fs = MemFs::new();
        let paths = StorePaths::new("/data");
        let root = paths.doc_dir(doc_id);
        fs.create_dir_all(&root).unwrap();

        let versions = root.join("versions.jsonl");
        fs.write(&versions, b"{\"id\":\"v0\"}\n").unwrap();

        let runs = root.join("runs.jsonl");
        fs.write(&runs, b"{\"run\":1}\n").unwrap();

        let shard = root.join("objects").join("ab");
        fs.create_dir_all(&shard).unwrap();
        fs.write(&shard.join("cdef1234"), b"object-bytes").unwrap();

        let scratch = root.join("scratch");
        fs.create_dir_all(&scratch).unwrap();
        fs.write(&scratch.join("index.jsonl"), b"{\"cand\":\"c0\"}\n")
            .unwrap();

        (fs, paths)
    }

    /// Bundling a store and unbundling into a fresh one reproduces the id and every file.
    #[test]
    fn bundle_unbundle_roundtrip() {
        let doc_id = "test-doc-001";
        let (fs, paths) = make_store(doc_id);
        let blob = bundle(&fs, &paths, doc_id).unwrap();

        let fs2 = MemFs::new();
        let paths2 = StorePaths::new("/data2");
        let returned_id = unbundle(&fs2, &paths2, &blob).unwrap();
        assert_eq!(returned_id, doc_id, "returned doc_id must match");

        let source_dir = paths.doc_dir(doc_id);
        let restored_dir = paths2.doc_dir(doc_id);
        for rel in [
            "versions.jsonl",
            "runs.jsonl",
            "objects/ab/cdef1234",
            "scratch/index.jsonl",
        ] {
            let orig = fs.read(&source_dir.join(rel)).unwrap();
            let copy = fs2.read(&restored_dir.join(rel)).unwrap();
            assert_eq!(orig, copy, "content mismatch for {rel}");
        }
    }

    /// Bundling the same store twice yields identical bytes.
    #[test]
    fn bundle_is_deterministic() {
        let doc_id = "det-doc";
        let (fs, paths) = make_store(doc_id);
        let first = bundle(&fs, &paths, doc_id).unwrap();
        let second = bundle(&fs, &paths, doc_id).unwrap();
        assert_eq!(
            first, second,
            "two bundles of the same store must be byte-identical"
        );
    }

    /// Input without the expected magic header is rejected.
    #[test]
    fn unbundle_bad_magic_errors() {
        let garbage = unbundle(&MemFs::new(), &StorePaths::new("/x"), b"not-a-bundle");
        assert!(garbage.is_err(), "garbage input must return Err");
        let msg = garbage.unwrap_err().message;
        assert!(
            msg.contains("magic"),
            "error must mention 'magic'; got: {msg}"
        );

        // Eight wrong header bytes followed by two bytes that look like a zlib stream start.
        let mut wrong_magic = b"BADMAGIC".to_vec();
        wrong_magic.extend_from_slice(b"\x78\x9c");
        let rejected = unbundle(&MemFs::new(), &StorePaths::new("/x"), &wrong_magic);
        assert!(rejected.is_err(), "wrong-magic input must return Err");
    }

    /// A blob cut off two bytes after the magic header is rejected.
    #[test]
    fn unbundle_truncated_errors() {
        let doc_id = "trunc-doc";
        let (fs, paths) = make_store(doc_id);
        let blob = bundle(&fs, &paths, doc_id).unwrap();

        let cut = &blob[..MAGIC.len() + 2];
        let outcome = unbundle(&MemFs::new(), &StorePaths::new("/x"), cut);
        assert!(outcome.is_err(), "truncated bundle must return Err");
    }

    /// Bundling a document that was never created fails and names the document.
    #[test]
    fn bundle_missing_doc_errors() {
        let empty_fs = MemFs::new();
        let paths = StorePaths::new("/data");
        let outcome = bundle(&empty_fs, &paths, "ghost-doc");
        assert!(outcome.is_err(), "bundling a missing doc must return Err");
        let msg = outcome.unwrap_err().message;
        assert!(
            msg.contains("ghost-doc"),
            "error must mention the doc_id; got: {msg}"
        );
    }
}