use std::fs;
use std::io::{self, BufRead, BufReader, Read, Write};
use std::path::{Path, PathBuf};
use snapdir_core::manifest::Manifest;
use snapdir_core::merkle::{snapshot_id, Blake3Hasher, Hasher};
use snapdir_core::store::{manifest_path, object_path, StoreError};
use crate::file_store::FileStore;
use crate::stream::StreamStore;
/// Wire-format version; `check_magic` rejects a magic line that carries any other version.
pub const WIRE_VERSION: u32 = 1;
/// Capability names associated with this wire format.
/// NOTE(review): not referenced elsewhere in this file — presumably consumed by a
/// handshake layer; confirm against the callers.
pub const WIRE_CAPS: &[&str] = &["objects-needed", "send-pack", "receive-pack"];
/// Magic line that `write_pack` emits first and `read_pack` verifies first.
pub const WIRE_MAGIC: &str = "SNAPPACK 1\n";
/// Header-line length cap enforced by `read_header_line`.
pub const MAX_HEADER_BYTES: usize = 128;
/// Largest declared manifest payload (64 MiB) that `read_manifest_record` accepts; a bigger
/// declared length is rejected before any payload byte is read.
pub const MAX_MANIFEST_BYTES: u64 = 64 * 1024 * 1024;
/// Ceiling (8 MiB) on the buffer capacity reserved up front from a declared payload length.
const STAGE_PREALLOC_CAP: u64 = 8 * 1024 * 1024;
/// Returns `true` when `s` is exactly 64 bytes long and every byte is a lowercase
/// hexadecimal digit (`0-9` or `a-f`).
#[must_use]
pub fn is_hex64(s: &str) -> bool {
    if s.len() != 64 {
        return false;
    }
    s.bytes()
        .all(|byte| byte.is_ascii_digit() || (b'a'..=b'f').contains(&byte))
}
/// Summary returned by `write_pack`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PackWriteReport {
/// Number of `obj` records emitted.
pub objects_written: u64,
/// Whether a `manifest` record was emitted.
pub manifest_written: bool,
}
/// Summary returned by `read_pack`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PackReadReport {
/// Objects that were staged, verified, and committed to the sink.
pub objects_written: u64,
/// Objects the sink already held; their payload is still hashed and verified, but not stored.
pub objects_skipped: u64,
/// Whether a manifest was handed to the sink after the `end` trailer was read.
pub manifest_committed: bool,
}
/// Destination for the records decoded by `read_pack`.
///
/// For each object record, `read_obj_record` calls `has_object`, then (only when the object
/// is absent) `stage_object`, and finally either `commit_object` once the payload hash
/// matches the claimed checksum or `abort_object` on a failure.
pub trait PackSink {
/// Returns whether the sink already holds the object named `checksum`.
fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError>;
/// Consumes `payload` (declared to be `len` bytes long) and keeps it as an uncommitted,
/// staged object for `checksum`.
fn stage_object(
&mut self,
checksum: &str,
len: u64,
payload: &mut dyn Read,
) -> Result<(), StoreError>;
/// Publishes the payload previously staged for `checksum`.
fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError>;
/// Discards any staged payload; infallible by signature.
fn abort_object(&mut self, checksum: &str);
/// Stores `manifest` under `id`; `read_pack` calls this only after reading the `end` trailer.
fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError>;
}
/// `PackSink` over any `StreamStore` that holds the staged payload in memory.
pub struct StreamSink<'a> {
// Store that receives committed objects and manifests.
store: &'a dyn StreamStore,
// At most one staged payload: (checksum, bytes).
staged: Option<(String, Vec<u8>)>,
}
impl<'a> StreamSink<'a> {
/// Creates a sink over `store` with nothing staged.
#[must_use]
pub fn new(store: &'a dyn StreamStore) -> Self {
Self {
store,
staged: None,
}
}
}
/// In-memory staging: a payload is buffered whole and handed to the store on commit.
impl PackSink for StreamSink<'_> {
    /// Delegates the presence check to the underlying store.
    fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError> {
        self.store.has_object(checksum)
    }

    /// Reads `payload` to its end into a buffer and remembers it under `checksum`.
    ///
    /// The initial capacity is taken from `len` but never exceeds `STAGE_PREALLOC_CAP`.
    fn stage_object(
        &mut self,
        checksum: &str,
        len: u64,
        payload: &mut dyn Read,
    ) -> Result<(), StoreError> {
        // Forget any earlier payload first, so a failed read leaves nothing staged.
        self.staged = None;
        let capacity = usize::try_from(len.min(STAGE_PREALLOC_CAP)).unwrap_or(0);
        let mut bytes = Vec::with_capacity(capacity);
        payload.read_to_end(&mut bytes)?;
        self.staged = Some((checksum.to_owned(), bytes));
        Ok(())
    }

    /// Hands the staged bytes to the store; fails when nothing matching `checksum` is staged.
    fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError> {
        // Taking the slot means a mismatching payload is discarded as well.
        if let Some((staged_checksum, bytes)) = self.staged.take() {
            if staged_checksum == checksum {
                return self.store.put_object(checksum, bytes);
            }
        }
        Err(protocol(format!(
            "internal pack sink error: commit of {checksum} without a matching staged payload"
        )))
    }

    /// Drops whatever payload is currently staged.
    fn abort_object(&mut self, _checksum: &str) {
        self.staged = None;
    }

    /// Delegates manifest storage to the underlying store.
    fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
        self.store.put_manifest(id, manifest)
    }
}
/// `PackSink` over a `FileStore` that stages each payload in a temp file beside its final path.
pub struct FileSink<'a> {
// Store whose root directory receives the objects.
store: &'a FileStore,
// At most one staged temp file awaiting commit or abort.
staged: Option<StagedFile>,
}
/// Bookkeeping for one payload written to disk but not yet published.
struct StagedFile {
// Checksum the payload was staged under.
checksum: String,
// Temp file that currently holds the payload.
tmp: PathBuf,
// Final object path the temp file is renamed to on commit.
target: PathBuf,
}
impl<'a> FileSink<'a> {
/// Creates a sink over `store` with nothing staged.
#[must_use]
pub fn new(store: &'a FileStore) -> Self {
Self {
store,
staged: None,
}
}
}
/// Removes a still-staged temp file when the sink goes away.
impl Drop for FileSink<'_> {
    fn drop(&mut self) {
        let Some(leftover) = self.staged.take() else {
            return;
        };
        // Best effort: a failure to delete cannot be reported from `drop`.
        let _ = fs::remove_file(&leftover.tmp);
    }
}
/// File-backed staging: a payload is streamed into a temp sibling of the object's final
/// path and published with a rename on commit.
impl PackSink for FileSink<'_> {
    /// Reports whether the backing store already holds `checksum`.
    fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError> {
        StreamStore::has_object(self.store, checksum)
    }

    /// Streams `payload` into a fresh temp file next to the object's final location.
    ///
    /// Any previously staged temp file is removed first. The declared length is not used
    /// here.
    ///
    /// # Errors
    /// Returns the I/O error from creating the parent directory, creating the temp file,
    /// or copying the payload; a temp file that was created is removed again on a copy
    /// failure.
    fn stage_object(
        &mut self,
        checksum: &str,
        _len: u64,
        payload: &mut dyn Read,
    ) -> Result<(), StoreError> {
        self.abort_object(checksum);
        let target = self.store.root().join(object_path(checksum));
        if let Some(parent) = target.parent() {
            fs::create_dir_all(parent)?;
        }
        let tmp = temp_sibling(&target);
        let mut file = fs::File::create(&tmp)?;
        let copied = io::copy(payload, &mut file);
        // Close the handle before touching the path again.
        drop(file);
        if let Err(err) = copied {
            let _ = fs::remove_file(&tmp);
            return Err(err.into());
        }
        self.staged = Some(StagedFile {
            checksum: checksum.to_owned(),
            tmp,
            target,
        });
        Ok(())
    }

    /// Renames the staged temp file onto its final object path.
    ///
    /// # Errors
    /// Fails when nothing is staged for `checksum`, or when the rename fails. In every
    /// failure case the temp file is removed (best effort): the staged entry has already
    /// been taken, so neither `abort_object` nor `Drop` could clean it up later.
    fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError> {
        match self.staged.take() {
            Some(staged) if staged.checksum == checksum => {
                if let Err(err) = fs::rename(&staged.tmp, &staged.target) {
                    // Do not leak the temp file when publishing fails.
                    let _ = fs::remove_file(&staged.tmp);
                    return Err(err.into());
                }
                Ok(())
            }
            other => {
                if let Some(staged) = other {
                    let _ = fs::remove_file(&staged.tmp);
                }
                Err(protocol(format!(
                    "internal pack sink error: commit of {checksum} without a matching staged payload"
                )))
            }
        }
    }

    /// Removes the staged temp file, if any (best effort).
    fn abort_object(&mut self, _checksum: &str) {
        if let Some(staged) = self.staged.take() {
            let _ = fs::remove_file(&staged.tmp);
        }
    }

    /// Delegates manifest storage to the backing store.
    fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
        self.store.put_manifest(id, manifest)
    }
}
pub fn write_pack(
source: &dyn StreamStore,
ids: &[String],
manifest_id: Option<&str>,
mut out: impl Write,
) -> Result<PackWriteReport, StoreError> {
for id in ids {
if !is_hex64(id) {
return Err(protocol(format!(
"invalid object checksum {id:?}: expected 64 lowercase hex characters"
)));
}
}
if let Some(id) = manifest_id {
if !is_hex64(id) {
return Err(protocol(format!(
"invalid manifest id {id:?}: expected 64 lowercase hex characters"
)));
}
}
let hasher = Blake3Hasher::new();
let manifest_payload: Option<(&str, Vec<u8>)> = match manifest_id {
Some(id) => {
let manifest = source.get_manifest(id)?;
let mut text = manifest.to_string();
text.push('\n');
let bytes = text.into_bytes();
let actual = hasher.hash_hex(&bytes);
if actual != id {
return Err(StoreError::Integrity {
address: manifest_path(id),
expected: id.to_owned(),
actual,
});
}
Some((id, bytes))
}
None => None,
};
out.write_all(WIRE_MAGIC.as_bytes())?;
let mut report = PackWriteReport::default();
for id in ids {
let bytes = source.get_object(id)?;
let actual = hasher.hash_hex(&bytes);
if actual != *id {
return Err(StoreError::Integrity {
address: object_path(id),
expected: id.clone(),
actual,
});
}
writeln!(out, "obj {id} {}", bytes.len())?;
out.write_all(&bytes)?;
report.objects_written += 1;
}
if let Some((id, bytes)) = manifest_payload {
writeln!(out, "manifest {id} {}", bytes.len())?;
out.write_all(&bytes)?;
report.manifest_written = true;
}
out.write_all(b"end\n")?;
out.flush()?;
Ok(report)
}
/// Decodes a pack stream from `input` and feeds its records to `sink`.
///
/// Objects are handed to the sink as they are read. A manifest is parsed and held back
/// until the `end` trailer arrives, and no record may follow the manifest record.
///
/// # Errors
/// Fails on a bad magic line, a malformed or truncated stream, a hash mismatch, or any
/// error reported by `sink`.
pub fn read_pack(input: impl Read, sink: &mut dyn PackSink) -> Result<PackReadReport, StoreError> {
    let mut reader = BufReader::new(input);
    let magic = read_header_line(&mut reader)?;
    check_magic(&magic)?;

    let mut report = PackReadReport::default();
    let mut held_manifest: Option<(String, Manifest)> = None;
    loop {
        let line = read_header_line(&mut reader)?;
        if line == "end" {
            break;
        }
        if held_manifest.is_some() {
            return Err(protocol(format!(
                "record after the manifest record (only the `end` trailer may follow it): {line:?}"
            )));
        }
        match parse_record_header(&line)? {
            (RecordKind::Obj, checksum, len) => {
                read_obj_record(&mut reader, sink, &checksum, len, &mut report)?;
            }
            (RecordKind::Manifest, id, len) => {
                held_manifest = Some(read_manifest_record(&mut reader, &id, len)?);
            }
        }
    }

    // Only a stream that reached its trailer gets its manifest stored.
    if let Some((id, manifest)) = held_manifest {
        sink.put_manifest(&id, &manifest)?;
        report.manifest_committed = true;
    }
    Ok(report)
}
/// Record type named by the first token of a record header line.
enum RecordKind {
// Header starts with `obj`.
Obj,
// Header starts with `manifest`.
Manifest,
}
/// Reads one object payload of `len` bytes from `input`, verifies that it hashes to
/// `checksum`, and stores it through `sink` unless the sink already has the object.
///
/// An already-present object is still read and hashed in full; it is counted in
/// `report.objects_skipped` instead of `report.objects_written`.
///
/// Errors: a short payload yields a backend error, a hash mismatch yields
/// `StoreError::Integrity`, and sink errors are passed through. When the object was being
/// staged, `abort_object` is called before a staging, short-payload, or hash-mismatch
/// error is returned.
fn read_obj_record(
input: &mut dyn Read,
sink: &mut dyn PackSink,
checksum: &str,
len: u64,
report: &mut PackReadReport,
) -> Result<(), StoreError> {
let present = sink.has_object(checksum)?;
// Limits reads to `len` bytes and hashes everything that passes through.
let mut payload = HashingTake::new(input, len);
if present {
// Consume the payload anyway so it is verified.
payload.drain()?;
} else if let Err(err) = sink.stage_object(checksum, len, &mut payload) {
sink.abort_object(checksum);
return Err(err);
}
// Fewer than `len` bytes were consumed: either the stream ended early or the sink stopped reading.
if payload.remaining() > 0 {
if !present {
sink.abort_object(checksum);
}
return Err(if payload.hit_eof() {
protocol(format!(
"truncated pack stream: EOF inside the payload of object {checksum} \
({} of {len} bytes missing)",
payload.remaining()
))
} else {
protocol(format!(
"internal pack sink error: sink consumed only {} of {len} payload bytes \
for object {checksum}",
len - payload.remaining()
))
});
}
let actual = payload.finalize_hex();
if actual != checksum {
if !present {
sink.abort_object(checksum);
}
return Err(StoreError::Integrity {
address: object_path(checksum),
expected: checksum.to_owned(),
actual,
});
}
if present {
report.objects_skipped += 1;
} else {
// The payload is verified; only now is it published.
sink.commit_object(checksum)?;
report.objects_written += 1;
}
Ok(())
}
/// Reads a manifest payload of `len` bytes from `input`, verifies it, and parses it.
///
/// The payload is checked twice against `id`: once as raw wire bytes, and once more via
/// `snapshot_id` on the parsed manifest.
///
/// # Errors
/// Fails when `len` exceeds `MAX_MANIFEST_BYTES`, the payload is truncated, either hash
/// check fails, the bytes are not UTF-8, or the text does not parse as a manifest.
fn read_manifest_record(
    input: &mut dyn Read,
    id: &str,
    len: u64,
) -> Result<(String, Manifest), StoreError> {
    if len > MAX_MANIFEST_BYTES {
        return Err(protocol(format!(
            "manifest record of {len} bytes exceeds the {MAX_MANIFEST_BYTES}-byte cap"
        )));
    }

    let capacity = usize::try_from(len.min(STAGE_PREALLOC_CAP)).unwrap_or(0);
    let mut raw = Vec::with_capacity(capacity);
    let mut limited = HashingTake::new(input, len);
    limited.read_to_end(&mut raw)?;

    let missing = limited.remaining();
    if missing > 0 {
        return Err(protocol(format!(
            "truncated pack stream: EOF inside the payload of manifest {id} \
({} of {len} bytes missing)",
            missing
        )));
    }

    // Both hash checks report a mismatch in the same shape.
    let mismatch = |actual: String| StoreError::Integrity {
        address: manifest_path(id),
        expected: id.to_owned(),
        actual,
    };

    let wire_hash = limited.finalize_hex();
    if wire_hash != id {
        return Err(mismatch(wire_hash));
    }

    let text = match String::from_utf8(raw) {
        Ok(text) => text,
        Err(err) => {
            return Err(StoreError::Backend {
                message: format!("manifest {id} payload is not valid UTF-8"),
                source: Some(Box::new(err)),
            });
        }
    };
    let manifest = Manifest::parse(&text)?;

    let rendered_id = snapshot_id(&manifest, &Blake3Hasher::new());
    if rendered_id != id {
        return Err(mismatch(rendered_id));
    }
    Ok((id.to_owned(), manifest))
}
/// Builds a `StoreError::Backend` carrying `message` and no underlying source error.
fn protocol(message: impl Into<String>) -> StoreError {
    let message: String = message.into();
    StoreError::Backend {
        message,
        source: None,
    }
}
fn read_header_line(input: &mut impl BufRead) -> Result<String, StoreError> {
let mut line: Vec<u8> = Vec::with_capacity(32);
loop {
let mut byte = [0u8; 1];
let n = input.read(&mut byte)?;
if n == 0 {
return Err(protocol(if line.is_empty() {
"truncated pack stream: unexpected EOF before the `end` trailer".to_owned()
} else {
format!(
"truncated pack stream: EOF inside a header line (read {:?} so far)",
String::from_utf8_lossy(&line)
)
}));
}
if byte[0] == b'\n' {
break;
}
line.push(byte[0]);
if line.len() >= MAX_HEADER_BYTES {
return Err(protocol(format!(
"header line exceeds the {MAX_HEADER_BYTES}-byte cap"
)));
}
}
String::from_utf8(line).map_err(|err| StoreError::Backend {
message: "header line is not valid UTF-8".to_owned(),
source: Some(Box::new(err)),
})
}
/// Validates the first line of a pack stream: it must be `SNAPPACK ` followed by exactly
/// the decimal rendering of `WIRE_VERSION`.
fn check_magic(line: &str) -> Result<(), StoreError> {
    match line.strip_prefix("SNAPPACK ") {
        None => Err(protocol(format!(
            "bad pack magic {line:?} (expected {:?})",
            WIRE_MAGIC.trim_end()
        ))),
        Some(version) if version == WIRE_VERSION.to_string() => Ok(()),
        Some(version) => Err(protocol(format!(
            "unsupported pack wire version {version:?}: this build speaks wire={WIRE_VERSION}"
        ))),
    }
}
/// Splits a record header of the form `<kind> <hex64> <len>` into its three parts.
///
/// Fields are separated by single spaces. `<kind>` is `obj` or `manifest`, `<hex64>` must
/// satisfy `is_hex64`, and `<len>` must consist of ASCII digits only and fit in a `u64`.
fn parse_record_header(line: &str) -> Result<(RecordKind, String, u64), StoreError> {
    let mut fields = line.split(' ');

    let kind = match fields.next() {
        Some("obj") => RecordKind::Obj,
        Some("manifest") => RecordKind::Manifest,
        _ => {
            return Err(protocol(format!(
                "unknown record header {line:?} (expected `obj`, `manifest`, or `end`)"
            )));
        }
    };

    // Exactly two more fields are allowed; a fourth one makes the header malformed.
    let (checksum, len_token) = match (fields.next(), fields.next(), fields.next()) {
        (Some(checksum), Some(len_token), None) => (checksum, len_token),
        _ => {
            return Err(protocol(format!(
                "malformed record header {line:?} (expected `<kind> <hex64> <len>`)"
            )));
        }
    };

    if !is_hex64(checksum) {
        return Err(protocol(format!(
            "invalid checksum {checksum:?} in record header: expected 64 lowercase hex characters"
        )));
    }

    // `str::parse` would also accept a leading `+`, so the digits are checked explicitly.
    let digits_only = !len_token.is_empty() && len_token.bytes().all(|b| b.is_ascii_digit());
    if !digits_only {
        return Err(protocol(format!(
            "invalid payload length {len_token:?} in record header: expected a decimal u64"
        )));
    }

    match len_token.parse::<u64>() {
        Ok(len) => Ok((kind, checksum.to_owned(), len)),
        Err(_) => Err(protocol(format!(
            "payload length {len_token:?} does not fit in a u64"
        ))),
    }
}
/// Reader adapter that yields at most a fixed number of bytes from `inner` and feeds every
/// byte it yields into a BLAKE3 hasher.
struct HashingTake<'a> {
// Underlying reader the payload is pulled from.
inner: &'a mut dyn Read,
// Bytes of the declared length not yet read.
remaining: u64,
// Set when `inner` returned 0 bytes while `remaining` was still non-zero.
hit_eof: bool,
// Running hash of all bytes yielded so far.
hasher: blake3::Hasher,
}
impl<'a> HashingTake<'a> {
    /// Wraps `inner`, allowing at most `len` bytes to be read through the adapter.
    fn new(inner: &'a mut dyn Read, len: u64) -> Self {
        Self {
            inner,
            remaining: len,
            hit_eof: false,
            hasher: blake3::Hasher::new(),
        }
    }

    /// Number of bytes of the declared length that have not been read yet.
    fn remaining(&self) -> u64 {
        self.remaining
    }

    /// Whether the underlying reader ended before the declared length was reached.
    fn hit_eof(&self) -> bool {
        self.hit_eof
    }

    /// Lowercase hex digest of all bytes read through the adapter so far.
    fn finalize_hex(&self) -> String {
        self.hasher.finalize().to_hex().to_string()
    }

    /// Reads and hashes the rest of the payload, discarding the bytes.
    ///
    /// Returns once the adapter reports end of input, i.e. when the declared length is
    /// exhausted or the underlying reader hits EOF. Reads interrupted with
    /// `ErrorKind::Interrupted` are retried, matching what `io::copy` and
    /// `Read::read_to_end` do on the other payload paths.
    ///
    /// # Errors
    /// Returns any other I/O error from the underlying reader.
    fn drain(&mut self) -> io::Result<()> {
        let mut scratch = [0u8; 8 * 1024];
        loop {
            match self.read(&mut scratch) {
                Ok(0) => return Ok(()),
                Ok(_) => {}
                // A signal interrupted the read; nothing was consumed, so try again.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
    }
}
/// Length-limited, hashing `Read`: never hands out more than the declared payload length.
impl Read for HashingTake<'_> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // The largest read allowed right now: bounded by both the caller's buffer and
        // what is left of the declared length.
        let budget = usize::try_from(self.remaining)
            .unwrap_or(usize::MAX)
            .min(buf.len());
        if budget == 0 {
            return Ok(0);
        }
        match self.inner.read(&mut buf[..budget])? {
            0 => {
                // The source ran dry while bytes were still owed.
                self.hit_eof = true;
                Ok(0)
            }
            n => {
                self.hasher.update(&buf[..n]);
                self.remaining -= n as u64;
                Ok(n)
            }
        }
    }
}
/// Builds a temp-file path in the same directory as `target`.
///
/// The file name is `<target file name>.<pid>.<counter>.tmp`, where the counter is a
/// process-wide value that increases with every call.
fn temp_sibling(target: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let serial = COUNTER.fetch_add(1, Ordering::Relaxed);
    let base = target
        .file_name()
        .map_or_else(String::new, |name| name.to_string_lossy().into_owned());
    let tmp_name = format!("{base}.{}.{serial}.tmp", std::process::id());

    target
        .parent()
        .map_or_else(|| PathBuf::from(&tmp_name), |dir| dir.join(&tmp_name))
}
#[cfg(test)]
mod tests {
use super::*;
use snapdir_core::manifest::{ManifestEntry, PathType};
use snapdir_core::store::Store;
use std::fs;
/// Scratch directory for a test; the directory tree is deleted when the value is dropped.
struct TempDir {
path: PathBuf,
}
impl TempDir {
/// Creates a directory under the system temp dir whose name combines the process id,
/// `tag`, and a per-process counter.
fn new(tag: &str) -> Self {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
let path = std::env::temp_dir().join(format!(
"snapdir-pack-test-{}-{tag}-{n}",
std::process::id()
));
fs::create_dir_all(&path).expect("create temp dir");
Self { path }
}
/// Path of the scratch directory.
fn path(&self) -> &Path {
&self.path
}
}
impl Drop for TempDir {
fn drop(&mut self) {
// Best-effort cleanup; a failure is ignored.
let _ = fs::remove_dir_all(&self.path);
}
}
/// Deterministic test payload of `len` bytes: the values `0..=250` repeated.
fn big_payload(len: usize) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(len);
    bytes.extend((0u8..251).cycle().take(len));
    bytes
}
/// Creates a fresh `FileStore` in a new temp dir and stores every payload in it.
///
/// Returns the temp dir guard, the store, and the payload checksums in input order.
fn seed_store(tag: &str, payloads: &[Vec<u8>]) -> (TempDir, FileStore, Vec<String>) {
    let dir = TempDir::new(tag);
    let store = FileStore::from_root(dir.path());
    let hasher = Blake3Hasher::new();
    let mut ids = Vec::with_capacity(payloads.len());
    for payload in payloads {
        let checksum = hasher.hash_hex(payload);
        store
            .put_object(&checksum, payload.clone())
            .expect("seed object");
        ids.push(checksum);
    }
    (dir, store, ids)
}
/// Builds a manifest with a root directory entry plus one file entry per payload
/// (named `./obj-NN`), and returns it together with its snapshot id.
fn manifest_for(payloads: &[Vec<u8>]) -> (Manifest, String) {
let hasher = Blake3Hasher::new();
let mut manifest = Manifest::new();
manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
for (i, payload) in payloads.iter().enumerate() {
manifest.push(ManifestEntry::new(
PathType::File,
"600",
hasher.hash_hex(payload),
u64::try_from(payload.len()).unwrap(),
format!("./obj-{i:02}"),
));
}
// Rebuild through `from_entries`.
// NOTE(review): presumably this normalizes the entry order — confirm in `Manifest`.
let manifest = Manifest::from_entries(manifest.entries().to_vec());
let id = snapshot_id(&manifest, &hasher);
(manifest, id)
}
/// Wire encoding of a manifest: its `Display` text followed by a single newline.
fn manifest_bytes(manifest: &Manifest) -> Vec<u8> {
    format!("{manifest}\n").into_bytes()
}
/// Recursively lists every non-directory path below `dir`.
///
/// An unreadable or missing directory contributes nothing, and unreadable entries are
/// skipped.
fn files_under(dir: &Path) -> Vec<PathBuf> {
    let mut found = Vec::new();
    if let Ok(entries) = fs::read_dir(dir) {
        for path in entries.flatten().map(|entry| entry.path()) {
            if path.is_dir() {
                found.extend(files_under(&path));
            } else {
                found.push(path);
            }
        }
    }
    found
}
/// Hand-builds one wire record: the `<kind> <hex> <len>` header line followed by the payload.
fn raw_record(kind: &str, hex: &str, payload: &[u8]) -> Vec<u8> {
    let header = format!("{kind} {hex} {}\n", payload.len());
    [header.as_bytes(), payload].concat()
}
/// Hand-builds a full stream: the magic line, the given records in order, then the `end` trailer.
fn raw_stream(records: &[Vec<u8>]) -> Vec<u8> {
    let mut stream = Vec::from(WIRE_MAGIC.as_bytes());
    stream.extend(records.iter().flatten());
    stream.extend_from_slice(b"end\n");
    stream
}
/// Hex digest of `bytes` as computed by `Blake3Hasher`.
fn hex_of(bytes: &[u8]) -> String {
Blake3Hasher::new().hash_hex(bytes)
}
/// The magic line must be derivable from the version constant, and the published
/// constants must keep their pinned values.
#[test]
fn pack_wire_constants_are_consistent() {
assert_eq!(WIRE_MAGIC, format!("SNAPPACK {WIRE_VERSION}\n"));
assert_eq!(WIRE_VERSION, 1);
assert_eq!(WIRE_CAPS, &["objects-needed", "send-pack", "receive-pack"]);
}
/// `is_hex64` accepts exactly 64 lowercase hex characters and nothing else.
#[test]
fn pack_is_hex64_validates_shape() {
    let valid = "0123456789abcdef".repeat(4);
    assert!(is_hex64(&valid));

    // Too short, too long, uppercase, non-hex character, empty.
    let rejected = [
        valid[..63].to_owned(),
        format!("{valid}0"),
        valid.to_uppercase(),
        format!("g{}", &valid[1..]),
        String::new(),
    ];
    for candidate in &rejected {
        assert!(!is_hex64(candidate));
    }
}
/// Full round trip through `FileSink`: an empty, a small, and a multi-megabyte object plus
/// the manifest must arrive byte-identical, with no temp files left behind.
#[test]
fn pack_roundtrip_file_sink_streams_objects_and_manifest() {
let payloads = vec![
Vec::new(),
b"hello pack\n".to_vec(),
big_payload(3 * 1024 * 1024 + 7),
];
let (a_dir, a, ids) = seed_store("rt-a", &payloads);
let (manifest, man_id) = manifest_for(&payloads);
a.put_manifest(&man_id, &manifest).expect("seed manifest");
let mut pack = Vec::new();
let wrote = write_pack(&a, &ids, Some(&man_id), &mut pack).expect("write_pack");
assert_eq!(wrote.objects_written, 3);
assert!(wrote.manifest_written);
let b_dir = TempDir::new("rt-b");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
assert_eq!(read.objects_written, 3);
assert_eq!(read.objects_skipped, 0);
assert!(read.manifest_committed);
// Compare the on-disk object files of both stores directly.
for (id, payload) in ids.iter().zip(&payloads) {
let key = object_path(id);
assert_eq!(
fs::read(b_dir.path().join(&key)).expect("b object"),
*payload,
"object {key} byte-equal"
);
assert_eq!(
fs::read(a_dir.path().join(&key)).expect("a object"),
fs::read(b_dir.path().join(&key)).expect("b object"),
);
}
let back = b.get_manifest(&man_id).expect("manifest in B");
assert_eq!(back, manifest);
assert_eq!(snapshot_id(&back, &Blake3Hasher::new()), man_id);
assert!(
!files_under(b_dir.path())
.iter()
.any(|p| p.to_string_lossy().ends_with(".tmp")),
"no stray temp files after a clean stream"
);
}
/// Same round trip as the `FileSink` test, but through the in-memory `StreamSink`.
#[test]
fn pack_roundtrip_stream_sink_generic() {
let payloads = vec![b"alpha\n".to_vec(), b"beta\n".to_vec()];
let (_a_dir, a, ids) = seed_store("ss-a", &payloads);
let (manifest, man_id) = manifest_for(&payloads);
a.put_manifest(&man_id, &manifest).expect("seed manifest");
let mut pack = Vec::new();
write_pack(&a, &ids, Some(&man_id), &mut pack).expect("write_pack");
let b_dir = TempDir::new("ss-b");
let b = FileStore::from_root(b_dir.path());
let mut sink = StreamSink::new(&b);
let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
assert_eq!(read.objects_written, 2);
assert!(read.manifest_committed);
for (id, payload) in ids.iter().zip(&payloads) {
assert_eq!(b.get_object(id).expect("object"), *payload);
}
assert_eq!(b.get_manifest(&man_id).expect("manifest"), manifest);
}
/// A pack with no objects and no manifest is just the magic line plus the trailer, and
/// reading it back reports nothing written.
#[test]
fn pack_empty_stream_roundtrips() {
let payloads: Vec<Vec<u8>> = Vec::new();
let (_a_dir, a, ids) = seed_store("empty-a", &payloads);
let mut pack = Vec::new();
let wrote = write_pack(&a, &ids, None, &mut pack).expect("write_pack");
assert_eq!(wrote, PackWriteReport::default());
assert_eq!(pack, b"SNAPPACK 1\nend\n");
let b_dir = TempDir::new("empty-b");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
assert_eq!(read, PackReadReport::default());
}
/// A pack that carries only a manifest record (no objects) is valid, and the manifest is
/// committed on the receiving side.
#[test]
fn pack_manifest_only_stream_completes_interrupted_push() {
let payloads = vec![b"already there\n".to_vec()];
let (_a_dir, a, _ids) = seed_store("mo-a", &payloads);
let (manifest, man_id) = manifest_for(&payloads);
a.put_manifest(&man_id, &manifest).expect("seed manifest");
let mut pack = Vec::new();
let wrote = write_pack(&a, &[], Some(&man_id), &mut pack).expect("write_pack");
assert_eq!(wrote.objects_written, 0);
assert!(wrote.manifest_written);
let b_dir = TempDir::new("mo-b");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
assert!(read.manifest_committed);
assert_eq!(b.get_manifest(&man_id).expect("manifest"), manifest);
}
/// A 200-byte header line must be rejected with the header-cap error.
#[test]
fn pack_rejects_oversized_header_line() {
let mut stream = WIRE_MAGIC.as_bytes().to_vec();
stream.extend_from_slice("o".repeat(200).as_bytes());
stream.extend_from_slice(b"\nend\n");
let b_dir = TempDir::new("hdr-cap");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
assert!(err.to_string().contains("128-byte cap"), "got: {err}");
}
/// Wrong magic words, other or non-canonical versions, a missing version, and an empty
/// stream must all be rejected.
#[test]
fn pack_rejects_bad_magic_and_bad_version() {
let b_dir = TempDir::new("magic");
let b = FileStore::from_root(b_dir.path());
for stream in [
&b"SNAPHACK 1\nend\n"[..],
&b"GARBAGE\nend\n"[..],
&b"SNAPPACK 2\nend\n"[..],
&b"SNAPPACK 01\nend\n"[..], &b"SNAPPACK \nend\n"[..],
&b""[..], ] {
let mut sink = FileSink::new(&b);
assert!(
read_pack(stream, &mut sink).is_err(),
"stream {:?} must be rejected",
String::from_utf8_lossy(stream)
);
}
}
/// Each header below must make `read_pack` fail.
///
/// NOTE(review): the last entry, `obj {valid} 0`, is a well-formed header; it is rejected
/// only because an empty payload does not hash to `valid`, not by header parsing. Confirm
/// that this is the intended case (it may originally have carried different whitespace).
#[test]
fn pack_rejects_malformed_checksums_and_lengths() {
let valid = "0123456789abcdef".repeat(4);
let headers = [
format!("obj {} 0", valid.to_uppercase()), format!("obj {} 0", &valid[..63]), format!("obj {valid}0 0"), format!("obj g{} 0", &valid[1..]), format!("obj {valid} 12x"), format!("obj {valid} +5"), format!("obj {valid} 99999999999999999999"), format!("obj {valid}"), format!("obj {valid} 0 extra"), format!("blob {valid} 0"), format!("obj {valid} 0"), ];
let b_dir = TempDir::new("hdr-bad");
let b = FileStore::from_root(b_dir.path());
for header in headers {
let mut stream = WIRE_MAGIC.as_bytes().to_vec();
stream.extend_from_slice(header.as_bytes());
stream.extend_from_slice(b"\nend\n");
let mut sink = FileSink::new(&b);
assert!(
read_pack(stream.as_slice(), &mut sink).is_err(),
"header {header:?} must be rejected"
);
}
}
/// An object whose bytes do not hash to the claimed checksum must abort the read with an
/// integrity error and leave the target store completely empty.
#[test]
fn pack_mismatched_object_files_nothing_and_leaves_no_temp() {
let claimed = hex_of(b"good bytes");
let evil = b"evil bytes"; let (manifest, man_id) = manifest_for(&[b"good bytes".to_vec()]);
let stream = raw_stream(&[
raw_record("obj", &claimed, evil),
raw_record("manifest", &man_id, &manifest_bytes(&manifest)),
]);
let b_dir = TempDir::new("sec");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let err = read_pack(stream.as_slice(), &mut sink).expect_err("must abort");
match err {
StoreError::Integrity {
expected, actual, ..
} => {
assert_eq!(expected, claimed);
assert_eq!(actual, hex_of(evil));
}
other => panic!("expected Integrity, got {other:?}"),
}
// Drop the sink before inspecting the directory so its cleanup has run.
drop(sink);
assert!(!StreamStore::has_object(&b, &claimed).unwrap());
assert!(!b_dir.path().join(object_path(&claimed)).exists());
assert!(matches!(
b.get_manifest(&man_id),
Err(StoreError::ManifestNotFound { .. })
));
assert_eq!(
files_under(b_dir.path()),
Vec::<PathBuf>::new(),
"no file (object, manifest, or stray temp) may survive"
);
}
/// A corrupt payload for an object the store already holds must still abort the read; the
/// stored object stays intact and later records are not processed.
#[test]
fn pack_mismatch_aborts_even_when_object_already_present() {
let good = b"present object\n".to_vec();
let payloads = vec![good.clone()];
let (b_dir, b, ids) = seed_store("dup-bad", &payloads);
let corrupt = b"PRESENT OBJECT\n"; let later = b"later payload\n";
let stream = raw_stream(&[
raw_record("obj", &ids[0], corrupt),
raw_record("obj", &hex_of(later), later),
]);
let mut sink = FileSink::new(&b);
let err = read_pack(stream.as_slice(), &mut sink).expect_err("must abort");
assert!(matches!(err, StoreError::Integrity { .. }));
drop(sink);
assert_eq!(b.get_object(&ids[0]).unwrap(), good);
assert!(!StreamStore::has_object(&b, &hex_of(later)).unwrap());
assert!(
!files_under(b_dir.path())
.iter()
.any(|p| p.to_string_lossy().ends_with(".tmp")),
"no stray temp files"
);
}
/// A stream cut just before the `end` trailer is an error: objects already read stay
/// stored, but the manifest is never committed.
#[test]
fn pack_truncated_before_end_files_objects_but_never_manifest() {
let payloads = vec![b"one\n".to_vec(), b"two\n".to_vec()];
let (_a_dir, a, ids) = seed_store("trunc-a", &payloads);
let (manifest, man_id) = manifest_for(&payloads);
a.put_manifest(&man_id, &manifest).expect("seed manifest");
let mut pack = Vec::new();
write_pack(&a, &ids, Some(&man_id), &mut pack).expect("write_pack");
assert!(pack.ends_with(b"end\n"));
// Remove only the trailer.
let cut = &pack[..pack.len() - b"end\n".len()];
let b_dir = TempDir::new("trunc-b");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let err = read_pack(cut, &mut sink).expect_err("truncation is a hard error");
assert!(err.to_string().contains("truncated"), "got: {err}");
drop(sink);
for (id, payload) in ids.iter().zip(&payloads) {
assert_eq!(b.get_object(id).unwrap(), *payload);
}
assert!(matches!(
b.get_manifest(&man_id),
Err(StoreError::ManifestNotFound { .. })
));
assert!(
!files_under(b_dir.path())
.iter()
.any(|p| p.to_string_lossy().ends_with(".tmp")),
"no stray temp files"
);
}
/// A stream cut in the middle of the second payload keeps the first object and leaves no
/// trace of the partial one.
#[test]
fn pack_truncated_mid_payload_keeps_earlier_objects_drops_partial() {
let payloads = vec![b"first object\n".to_vec(), big_payload(256 * 1024)];
let (_a_dir, a, ids) = seed_store("midcut-a", &payloads);
let mut pack = Vec::new();
write_pack(&a, &ids, None, &mut pack).expect("write_pack");
// Cutting 100 000 bytes from the end lands inside the 256 KiB payload.
let cut = &pack[..pack.len() - 100_000];
let b_dir = TempDir::new("midcut-b");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let err = read_pack(cut, &mut sink).expect_err("mid-payload truncation");
assert!(err.to_string().contains("truncated"), "got: {err}");
drop(sink);
assert_eq!(b.get_object(&ids[0]).unwrap(), payloads[0]);
assert!(!StreamStore::has_object(&b, &ids[1]).unwrap());
assert!(
!files_under(b_dir.path())
.iter()
.any(|p| p.to_string_lossy().ends_with(".tmp")),
"partial payload temp must be removed"
);
}
/// The same object appearing twice in one stream is written once; the second record is
/// verified and counted as skipped.
#[test]
fn pack_duplicate_object_records_are_idempotent_write_once() {
let payload = b"duplicated payload\n".to_vec();
let checksum = hex_of(&payload);
let stream = raw_stream(&[
raw_record("obj", &checksum, &payload),
raw_record("obj", &checksum, &payload),
]);
let b_dir = TempDir::new("dup");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let read = read_pack(stream.as_slice(), &mut sink).expect("duplicates are fine");
assert_eq!(read.objects_written, 1, "write-once");
assert_eq!(
read.objects_skipped, 1,
"second record verified-but-skipped"
);
assert_eq!(b.get_object(&checksum).unwrap(), payload);
}
/// An object that the target store already holds is counted as skipped, not written.
#[test]
fn pack_preseeded_object_is_skipped_but_verified() {
let payload = b"already in the store\n".to_vec();
let (_b_dir, b, ids) = seed_store("preseed", std::slice::from_ref(&payload));
let stream = raw_stream(&[raw_record("obj", &ids[0], &payload)]);
let mut sink = FileSink::new(&b);
let read = read_pack(stream.as_slice(), &mut sink).expect("read_pack");
assert_eq!(read.objects_written, 0);
assert_eq!(read.objects_skipped, 1);
assert_eq!(b.get_object(&ids[0]).unwrap(), payload);
}
/// Any record following the manifest record is a protocol error, and the manifest is not
/// committed.
#[test]
fn pack_rejects_record_after_manifest() {
let payload = b"object after manifest\n".to_vec();
let (manifest, man_id) = manifest_for(std::slice::from_ref(&payload));
let stream = raw_stream(&[
raw_record("manifest", &man_id, &manifest_bytes(&manifest)),
raw_record("obj", &hex_of(&payload), &payload),
]);
let b_dir = TempDir::new("after-man");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
assert!(err.to_string().contains("after the manifest"), "got: {err}");
assert!(matches!(
b.get_manifest(&man_id),
Err(StoreError::ManifestNotFound { .. })
));
}
/// A manifest record is rejected when its bytes do not hash to the claimed id, and also
/// when the bytes hash correctly but are not a valid manifest.
#[test]
fn pack_manifest_payload_must_hash_to_claimed_id() {
let (manifest, _real_id) = manifest_for(&[b"whatever\n".to_vec()]);
let wrong_id = hex_of(b"some other bytes");
let stream = raw_stream(&[raw_record(
"manifest",
&wrong_id,
&manifest_bytes(&manifest),
)]);
let b_dir = TempDir::new("man-bad-id");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
assert!(matches!(err, StoreError::Integrity { .. }));
assert!(matches!(
b.get_manifest(&wrong_id),
Err(StoreError::ManifestNotFound { .. })
));
// Second case: the id matches the bytes, but the bytes are not a manifest.
let garbage = b"not a manifest at all\n".to_vec();
let garbage_id = hex_of(&garbage);
let stream = raw_stream(&[raw_record("manifest", &garbage_id, &garbage)]);
let mut sink = FileSink::new(&b);
assert!(read_pack(stream.as_slice(), &mut sink).is_err());
assert!(matches!(
b.get_manifest(&garbage_id),
Err(StoreError::ManifestNotFound { .. })
));
}
/// A manifest header declaring one byte more than `MAX_MANIFEST_BYTES` is rejected from
/// the header alone; the stream contains no payload bytes at all.
#[test]
fn pack_rejects_manifest_over_64mib_cap() {
let big_len: u64 = MAX_MANIFEST_BYTES + 1;
let claimed = hex_of(b"irrelevant");
let mut stream = WIRE_MAGIC.as_bytes().to_vec();
stream.extend_from_slice(format!("manifest {claimed} {big_len}\n").as_bytes());
let b_dir = TempDir::new("man-cap");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
assert!(err.to_string().contains("cap"), "got: {err}");
}
/// When an object is missing from the source, `write_pack` fails without emitting the
/// trailer, so the partial output is rejected by `read_pack`.
#[test]
fn pack_write_missing_object_aborts_before_end() {
let payloads = vec![b"present\n".to_vec()];
let (_a_dir, a, mut ids) = seed_store("wmiss-a", &payloads);
let absent = hex_of(b"never stored");
ids.push(absent.clone());
let mut pack = Vec::new();
let err = write_pack(&a, &ids, None, &mut pack).expect_err("missing object");
assert!(matches!(err, StoreError::ObjectNotFound { .. }));
assert!(!pack.ends_with(b"end\n"));
let b_dir = TempDir::new("wmiss-b");
let b = FileStore::from_root(b_dir.path());
let mut sink = FileSink::new(&b);
assert!(read_pack(pack.as_slice(), &mut sink).is_err());
}
/// A malformed object checksum or manifest id makes `write_pack` fail before it writes a
/// single byte.
#[test]
fn pack_write_invalid_id_emits_nothing() {
let (_a_dir, a, _ids) = seed_store("winv-a", &[b"x\n".to_vec()]);
let mut pack = Vec::new();
let err = write_pack(&a, &["NOT-HEX".to_owned()], None, &mut pack).expect_err("invalid id");
assert!(
err.to_string().contains("invalid object checksum"),
"got: {err}"
);
assert!(pack.is_empty(), "fail closed: not a single byte written");
let mut pack = Vec::new();
let err = write_pack(&a, &[], Some("zz"), &mut pack).expect_err("invalid manifest id");
assert!(
err.to_string().contains("invalid manifest id"),
"got: {err}"
);
assert!(pack.is_empty());
}
/// Object records appear in the output in the same order as the ids passed in.
#[test]
fn pack_write_emits_records_in_input_order() {
let payloads = vec![b"bbb\n".to_vec(), b"aaa\n".to_vec(), b"ccc\n".to_vec()];
let (_a_dir, a, ids) = seed_store("order-a", &payloads);
let mut pack = Vec::new();
write_pack(&a, &ids, None, &mut pack).expect("write_pack");
let text = String::from_utf8_lossy(&pack);
// Byte offset of each id inside the stream, in input order.
let positions: Vec<usize> = ids
.iter()
.map(|id| text.find(id.as_str()).expect("record present"))
.collect();
let mut sorted = positions.clone();
sorted.sort_unstable();
assert_eq!(positions, sorted, "obj records keep input order");
}
/// `objects_needed` returns exactly the absent checksums, in input order, keeping
/// duplicates, and returns an empty list when everything is present.
#[test]
fn pack_objects_needed_returns_absent_subset_in_input_order() {
let p1 = b"seeded one\n".to_vec();
let p3 = b"seeded three\n".to_vec();
let (_dir, store, seeded) = seed_store("needed", &[p1, p3]);
let absent_a = hex_of(b"absent a");
let absent_b = hex_of(b"absent b");
let list = vec![
seeded[0].clone(),
absent_a.clone(),
seeded[1].clone(),
absent_b.clone(),
];
let needed = store.objects_needed(&list).expect("objects_needed");
assert_eq!(needed, vec![absent_a.clone(), absent_b.clone()]);
// Order and duplicates of the input are preserved.
let list = vec![absent_b.clone(), absent_a.clone(), absent_b.clone()];
let needed = store.objects_needed(&list).expect("objects_needed");
assert_eq!(needed, vec![absent_b.clone(), absent_a, absent_b]);
assert_eq!(
store.objects_needed(&seeded).expect("ok"),
Vec::<String>::new()
);
}
/// One malformed checksum anywhere in the list makes `objects_needed` fail as a whole,
/// even when the other entries are valid.
#[test]
fn pack_objects_needed_invalid_checksum_fails_closed() {
let (_dir, store, seeded) = seed_store("needed-bad", &[b"x\n".to_vec()]);
let valid_absent = hex_of(b"absent");
for bad in [
"UPPERCASE0000000000000000000000000000000000000000000000000000AA".to_owned(),
"0123456789abcdef".repeat(4)[..63].to_owned(),
format!("{}0", "0123456789abcdef".repeat(4)),
"not hex at all".to_owned(),
String::new(),
] {
// The bad checksum comes last, after a present and an absent valid one.
let list = vec![seeded[0].clone(), valid_absent.clone(), bad.clone()];
let err = store.objects_needed(&list).expect_err("must fail closed");
assert!(
err.to_string().contains("invalid object checksum"),
"checksum {bad:?}: got {err}"
);
}
}
}