use std::collections::BTreeSet;
use std::fs::{self, File};
use std::io::{Read, Write};
use std::path::{Component, Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use crate::kv::{VersionToken, WatchCursor};
use crate::snapshot::SnapshotError;
/// Manifest format version written by this build; `manifest_from_slice` accepts no other.
pub const ARTIFACT_SCHEMA_VERSION: u32 = 1;
/// File name of the manifest stored at the artifact root.
pub const MANIFEST_FILE: &str = "MANIFEST.json";
/// Directory, relative to the artifact root, that holds every payload file.
pub(crate) const PAYLOAD_DIR: &str = "data";
/// Size in bytes of the scratch buffer used when hashing and copying payload files (1 MiB).
const HASH_BUF: usize = 1024 * 1024;
/// Parsed, validated contents of an artifact's `MANIFEST.json`.
#[derive(Clone, Debug)]
pub struct ExportManifest {
    /// Manifest format version; reading accepts only `ARTIFACT_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Name of the backend that produced the artifact; import requires an exact match.
    pub backend: String,
    /// Backend-specific version string, vetted on import by a caller-supplied check.
    pub backend_version: String,
    /// Watch cursor recorded with the export (stored on disk as hex in `cursor_hex`).
    pub cursor: WatchCursor,
    /// Creation time in whole seconds since the Unix epoch.
    pub created_at_unix: u64,
    /// Payload files contained in the artifact.
    pub files: Vec<ArtifactFile>,
}
/// One payload file recorded in a manifest.
#[derive(Clone, Debug)]
pub struct ArtifactFile {
    /// Path relative to the artifact root, `/`-separated and starting with `data/`
    /// (checked on read by `validate_payload_path`).
    pub path: String,
    /// File length in bytes.
    pub size: u64,
    /// Hex-encoded BLAKE3 digest of the file contents.
    pub blake3: String,
}
/// On-disk JSON shape of `MANIFEST.json`.
///
/// Fields are serialized in declaration order, so reordering them changes the bytes written.
/// Unknown fields are rejected on read (`deny_unknown_fields`).
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct ManifestWire {
    schema_version: u32,
    backend: String,
    backend_version: String,
    /// Hex encoding of the cursor's raw version bytes (see `cursor_to_hex`).
    cursor_hex: String,
    created_at_unix: u64,
    files: Vec<FileWire>,
}
/// On-disk JSON shape of one entry in `ManifestWire::files`.
///
/// Mirrors `ArtifactFile`; unknown fields are rejected on read.
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct FileWire {
    path: String,
    size: u64,
    blake3: String,
}
/// Shorthand for building a `SnapshotError::ArtifactInvalid` from any string-like message.
fn invalid(msg: impl Into<String>) -> SnapshotError {
    let text: String = msg.into();
    SnapshotError::ArtifactInvalid(text)
}
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
pub(crate) fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut acc, &byte| {
            acc.push(char::from(DIGITS[usize::from(byte >> 4)]));
            acc.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
            acc
        })
}
/// Decodes a hexadecimal string (upper- or lowercase digits) into bytes.
///
/// Returns `None` for an odd-length input or any character that is not an ASCII hex digit.
/// Digits are decoded one at a time rather than with `u8::from_str_radix`. That function
/// accepts a leading `+`, which let inputs such as `"+f"` decode to `0x0f`.
pub(crate) fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit; `None` for anything else (signs, whitespace, non-ASCII).
    fn nibble(c: u8) -> Option<u8> {
        match c {
            b'0'..=b'9' => Some(c - b'0'),
            b'a'..=b'f' => Some(c - b'a' + 10),
            b'A'..=b'F' => Some(c - b'A' + 10),
            _ => None,
        }
    }
    let bytes = s.as_bytes();
    if !bytes.len().is_multiple_of(2) {
        return None;
    }
    bytes
        .chunks_exact(2)
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
/// Serializes a cursor as the hex encoding of its raw version-token bytes.
fn cursor_to_hex(cursor: &WatchCursor) -> String {
    let version = cursor.version();
    hex_encode(version.as_bytes())
}
/// Parses the manifest's `cursor_hex` back into a `WatchCursor`.
///
/// # Errors
/// `ArtifactInvalid` if `s` is not valid hex, or if `VersionToken::from_raw` rejects the
/// decoded bytes (too many for a version token).
fn cursor_from_hex(s: &str) -> Result<WatchCursor, SnapshotError> {
    let Some(bytes) = hex_decode(s) else {
        return Err(invalid(format!("malformed cursor_hex: {s:?}")));
    };
    match VersionToken::from_raw(&bytes) {
        Some(token) => Ok(WatchCursor::from_version(token)),
        None => Err(invalid(format!(
            "cursor_hex decodes to {} bytes, exceeds version token capacity",
            bytes.len()
        ))),
    }
}
pub(crate) fn write_manifest(
artifact_root: &Path,
manifest: &ExportManifest,
) -> Result<(), SnapshotError> {
let wire = ManifestWire {
schema_version: manifest.schema_version,
backend: manifest.backend.clone(),
backend_version: manifest.backend_version.clone(),
cursor_hex: cursor_to_hex(&manifest.cursor),
created_at_unix: manifest.created_at_unix,
files: manifest
.files
.iter()
.map(|f| FileWire {
path: f.path.clone(),
size: f.size,
blake3: f.blake3.clone(),
})
.collect(),
};
let json = serde_json::to_vec_pretty(&wire)
.map_err(|e| SnapshotError::Backend(format!("manifest serialization failed: {e}")))?;
let mut tmp = tempfile::NamedTempFile::new_in(artifact_root)?;
tmp.write_all(&json)?;
tmp.as_file().sync_all()?;
tmp.persist(artifact_root.join(MANIFEST_FILE))
.map_err(|e| SnapshotError::Io(e.error))?;
Ok(())
}
/// Reads and validates `<artifact_dir>/MANIFEST.json`.
///
/// # Errors
/// `ArtifactInvalid` if the manifest file is absent or fails `manifest_from_slice`;
/// `Io` for any other read failure.
pub(crate) fn read_manifest(artifact_dir: &Path) -> Result<ExportManifest, SnapshotError> {
    let data = match fs::read(artifact_dir.join(MANIFEST_FILE)) {
        Ok(bytes) => bytes,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Err(invalid(format!(
                "no {MANIFEST_FILE} in {}",
                artifact_dir.display()
            )));
        }
        Err(e) => return Err(SnapshotError::Io(e)),
    };
    manifest_from_slice(&data)
}
pub(crate) fn manifest_from_slice(data: &[u8]) -> Result<ExportManifest, SnapshotError> {
let wire: ManifestWire =
serde_json::from_slice(data).map_err(|e| invalid(format!("malformed manifest: {e}")))?;
if wire.schema_version != ARTIFACT_SCHEMA_VERSION {
return Err(invalid(format!(
"unsupported artifact schema_version {} (this build supports {})",
wire.schema_version, ARTIFACT_SCHEMA_VERSION
)));
}
for f in &wire.files {
validate_payload_path(&f.path)?;
}
Ok(ExportManifest {
schema_version: wire.schema_version,
backend: wire.backend,
backend_version: wire.backend_version,
cursor: cursor_from_hex(&wire.cursor_hex)?,
created_at_unix: wire.created_at_unix,
files: wire
.files
.into_iter()
.map(|f| ArtifactFile {
path: f.path,
size: f.size,
blake3: f.blake3,
})
.collect(),
})
}
/// Checks that a manifest path is a canonical, relative path inside the payload directory.
///
/// Accepted paths look like `data/<seg>[/<seg>...]`, where every segment is non-empty and is
/// neither `.` nor `..`, and no backslash appears anywhere.
///
/// The raw `/` segments are inspected in addition to `Path::components`. `components`
/// silently drops `.` segments and repeated or trailing separators, so `data/./x`, `data//x`
/// and `data/x/` would otherwise pass. The exporter never writes such names.
///
/// # Errors
/// `ArtifactInvalid` naming the offending path.
fn validate_payload_path(p: &str) -> Result<(), SnapshotError> {
    let Some(rest) = p
        .strip_prefix(PAYLOAD_DIR)
        .and_then(|r| r.strip_prefix('/'))
        .filter(|r| !r.is_empty())
    else {
        return Err(invalid(format!(
            "manifest path {p:?} is not under {PAYLOAD_DIR}/"
        )));
    };
    if p.contains('\\') {
        return Err(invalid(format!("manifest path {p:?} contains a backslash")));
    }
    let bad_segment = rest
        .split('/')
        .any(|seg| seg.is_empty() || seg == "." || seg == "..");
    // Keep the platform-aware check too; it catches root and prefix components.
    let bad_component = Path::new(p)
        .components()
        .any(|c| !matches!(c, Component::Normal(_)));
    if bad_segment || bad_component {
        return Err(invalid(format!(
            "manifest path {p:?} contains a non-normal component"
        )));
    }
    Ok(())
}
/// Streams the file at `path` through BLAKE3 using `buf` as scratch space.
///
/// Returns the still-open file handle (so the caller can fsync it), the number of bytes read,
/// and the hex digest.
fn hash_file(path: &Path, buf: &mut [u8]) -> Result<(File, u64, String), SnapshotError> {
    let mut file = File::open(path)?;
    let mut hasher = blake3::Hasher::new();
    let mut total: u64 = 0;
    loop {
        let chunk = match file.read(buf)? {
            0 => break,
            n => &buf[..n],
        };
        total += chunk.len() as u64;
        hasher.update(chunk);
    }
    let digest = hasher.finalize().to_hex().to_string();
    Ok((file, total, digest))
}
/// Recursively collects every regular file under `<root>/data`, returned sorted.
///
/// Directories are descended into. Any other entry type is an error, including symlinks,
/// because `DirEntry::file_type` does not follow them. A symlink inside the payload tree is
/// therefore reported rather than followed.
///
/// # Errors
/// `ArtifactInvalid` for a non-regular entry; `Io` for directory read failures (including a
/// missing `data` directory).
fn list_payload_files(root: &Path) -> Result<Vec<PathBuf>, SnapshotError> {
    let mut found = Vec::new();
    let mut pending = vec![root.join(PAYLOAD_DIR)];
    while let Some(dir) = pending.pop() {
        for entry in fs::read_dir(&dir)? {
            let entry = entry?;
            let kind = entry.file_type()?;
            let path = entry.path();
            match (kind.is_dir(), kind.is_file()) {
                (true, _) => pending.push(path),
                (false, true) => found.push(path),
                (false, false) => {
                    return Err(invalid(format!(
                        "payload contains a non-regular file: {}",
                        path.display()
                    )));
                }
            }
        }
    }
    found.sort();
    Ok(found)
}
/// Hashes every payload file under `<root>/data` and makes the payload durable.
///
/// Each file is fsynced after hashing, and the whole `data` directory tree is fsynced at the
/// end. Returns one `ArtifactFile` per file, with its path relative to `root`, in the sorted
/// order produced by `list_payload_files`.
///
/// # Errors
/// `ArtifactInvalid` for non-regular entries or non-UTF-8 paths; `Backend` if a listed path
/// is somehow not under `root`; `Io` for filesystem failures.
pub(crate) fn hash_payload(root: &Path) -> Result<Vec<ArtifactFile>, SnapshotError> {
    let mut scratch = vec![0u8; HASH_BUF];
    let entries = list_payload_files(root)?;
    let mut files = Vec::with_capacity(entries.len());
    for abs in entries {
        let (handle, size, blake3) = hash_file(&abs, &mut scratch)?;
        handle.sync_all()?;
        let Ok(rel) = abs.strip_prefix(root) else {
            return Err(SnapshotError::Backend(
                "payload path escaped artifact root".into(),
            ));
        };
        let Some(path) = rel.to_str() else {
            return Err(invalid(format!(
                "non-UTF-8 payload path: {}",
                rel.display()
            )));
        };
        files.push(ArtifactFile {
            path: path.to_owned(),
            size,
            blake3,
        });
    }
    fsync_dir_tree(&root.join(PAYLOAD_DIR))?;
    Ok(files)
}
/// Opens `path` (expected to be a directory) and fsyncs it, making entry changes inside it
/// (creations, renames) durable.
///
/// NOTE(review): opening a directory with `File::open` is a Unix idiom and likely fails on
/// Windows. Confirm if Windows support matters.
fn fsync_dir(path: &Path) -> Result<(), SnapshotError> {
    File::open(path)
        .and_then(|dir| dir.sync_all())
        .map_err(Into::into)
}
fn fsync_dir_tree(root: &Path) -> Result<(), SnapshotError> {
let mut stack = vec![root.to_path_buf()];
while let Some(dir) = stack.pop() {
fsync_dir(&dir)?;
for entry in fs::read_dir(&dir)? {
let entry = entry?;
if entry.file_type()?.is_dir() {
stack.push(entry.path());
}
}
}
Ok(())
}
/// Checks that `dest` can receive an artifact: it must not exist, or be an empty directory.
///
/// `dest` is inspected with `symlink_metadata` (no symlink resolution). Any symlink at
/// `dest`, dangling or not, therefore counts as an existing entry. Following it would treat a
/// dangling link as "absent", and a later file rename would then silently replace the link.
///
/// # Errors
/// `ArtifactInvalid` if `dest` exists and is not an empty real directory; `Io` if it cannot
/// be inspected.
pub(crate) fn check_dest_available(dest: &Path) -> Result<(), SnapshotError> {
    let meta = match fs::symlink_metadata(dest) {
        Ok(meta) => meta,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(SnapshotError::Io(e)),
    };
    if !meta.is_dir() {
        return Err(invalid(format!(
            "destination {} already exists",
            dest.display()
        )));
    }
    if fs::read_dir(dest)?.next().is_some() {
        return Err(invalid(format!(
            "destination {} exists and is not empty",
            dest.display()
        )));
    }
    Ok(())
}
/// Renames `from` to `dest`, then fsyncs the directory containing `dest`.
///
/// An empty directory at `dest` (allowed by `check_dest_available`) is removed first so the
/// rename can take its place.
///
/// For a relative single-component `dest` such as `"out"`, `Path::parent` yields an empty
/// path, which means the current directory. That case is mapped to `"."`; passing `""` to
/// `fsync_dir` would fail *after* the rename had already succeeded.
///
/// # Errors
/// `Io` if removing the empty directory, the rename, or the parent fsync fails.
pub(crate) fn rename_into_place(from: &Path, dest: &Path) -> Result<(), SnapshotError> {
    if dest.is_dir() {
        fs::remove_dir(dest)?;
    }
    fs::rename(from, dest)?;
    // Make the rename itself durable by fsyncing the directory that now holds `dest`.
    let parent = match dest.parent() {
        Some(p) if p.as_os_str().is_empty() => Path::new("."),
        Some(p) => p,
        None => return Ok(()),
    };
    fsync_dir(parent)
}
/// Creates a hidden, uniquely named staging directory (prefix `.slipstream-artifact-`)
/// inside `parent`.
fn stage_dir_in(parent: &Path) -> Result<tempfile::TempDir, SnapshotError> {
    let staging = tempfile::Builder::new()
        .prefix(".slipstream-artifact-")
        .tempdir_in(parent)?;
    Ok(staging)
}
/// Returns the directory containing `dest`.
///
/// # Errors
/// `ArtifactInvalid` if `dest` has no parent, or only an empty one (for example a bare
/// relative name like `"out"`).
fn dest_parent(dest: &Path) -> Result<&Path, SnapshotError> {
    match dest.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => Ok(parent),
        _ => Err(invalid(format!(
            "destination {} has no parent directory",
            dest.display()
        ))),
    }
}
/// Staging area for building an export artifact.
///
/// The backend writes payload files under `payload()`. `seal_and_finalize` then records them
/// in a manifest and renames the whole staging directory to the destination. Dropping an
/// `ExportStage` without a successful seal deletes the staging directory, because it is held
/// as a `tempfile::TempDir`.
pub(crate) struct ExportStage {
    /// Hidden staging directory created in the destination's parent directory.
    dir: tempfile::TempDir,
    /// Final artifact location.
    dest: PathBuf,
}
impl ExportStage {
    /// Checks that `dest_dir` is available and creates a staging directory next to it.
    ///
    /// Staging in the same parent keeps the final rename within one directory, and normally
    /// within one filesystem.
    ///
    /// # Errors
    /// `ArtifactInvalid` if `dest_dir` is occupied or has no parent; `Io` on filesystem errors.
    pub(crate) fn new(dest_dir: &Path) -> Result<Self, SnapshotError> {
        check_dest_available(dest_dir)?;
        let parent = dest_parent(dest_dir)?;
        let dir = stage_dir_in(parent)?;
        Ok(Self {
            dir,
            dest: dest_dir.to_path_buf(),
        })
    }
    /// Directory the payload should be written into (`<stage>/data`). This only computes the
    /// path; the directory is not created here.
    pub(crate) fn payload(&self) -> PathBuf {
        self.dir.path().join(PAYLOAD_DIR)
    }
    /// Seals the staged payload and moves it into place.
    ///
    /// Steps: hash and fsync the payload, write `MANIFEST.json`, fsync the stage, re-check the
    /// destination, then rename the staging directory to it. Returns the manifest that was
    /// written.
    ///
    /// # Errors
    /// - If the destination became occupied since `new`, returns `ArtifactInvalid` and leaves
    ///   the destination untouched.
    /// - Any failure before or during the rename deletes the staging directory.
    /// - A failure after the rename (the parent fsync) leaves the artifact at the destination.
    pub(crate) fn seal_and_finalize(
        self,
        backend: &str,
        backend_version: &str,
        cursor: &WatchCursor,
    ) -> Result<ExportManifest, SnapshotError> {
        let root = self.dir.path();
        let files = hash_payload(root)?;
        let manifest = ExportManifest {
            schema_version: ARTIFACT_SCHEMA_VERSION,
            backend: backend.to_string(),
            backend_version: backend_version.to_string(),
            cursor: cursor.clone(),
            created_at_unix: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
            files,
        };
        write_manifest(root, &manifest)?;
        fsync_dir(root)?;
        check_dest_available(&self.dest)?;
        // Keep the TempDir armed across the rename so a failed rename still cleans up the
        // staging directory. The previous code disarmed it first and leaked the directory.
        rename_into_place(root, &self.dest)?;
        // The staging directory now lives at `dest`. Disarm the TempDir so its Drop does not
        // try to delete the old staging path.
        let _ = self.dir.keep();
        Ok(manifest)
    }
}
/// A verified copy of an artifact's payload, staged next to its final destination.
///
/// Produced by `verify_and_stage_import`. Dropping it without finalizing deletes the staging
/// directory, because it is held as a `tempfile::TempDir`.
pub(crate) struct ImportStage {
    /// Staging directory; the copied payload sits in its `data` subdirectory.
    dir: tempfile::TempDir,
    /// Where the payload is moved when finalized.
    dest: PathBuf,
}
impl ImportStage {
    /// Path of the staged payload directory (`<stage>/data`).
    pub(crate) fn payload(&self) -> PathBuf {
        let mut staged = self.dir.path().to_path_buf();
        staged.push(PAYLOAD_DIR);
        staged
    }
    /// Moves the whole staged payload directory to the destination.
    ///
    /// # Errors
    /// `ArtifactInvalid` if the destination is no longer available; `Io` on rename/fsync errors.
    #[cfg(any(feature = "fjall", feature = "rocksdb"))]
    pub(crate) fn finalize_dir(self) -> Result<(), SnapshotError> {
        let staged = self.payload();
        check_dest_available(&self.dest)?;
        rename_into_place(&staged, &self.dest)
    }
    /// Moves the single staged file `data/<rel>` to the destination.
    ///
    /// # Errors
    /// `ArtifactInvalid` if the destination is no longer available; `Io` on rename/fsync errors.
    pub(crate) fn finalize_file(self, rel: &str) -> Result<(), SnapshotError> {
        let staged = self.payload().join(rel);
        check_dest_available(&self.dest)?;
        rename_into_place(&staged, &self.dest)
    }
}
pub(crate) fn verify_and_stage_import(
artifact_dir: &Path,
dest: &Path,
expected_backend: &str,
check_backend_version: impl Fn(&str) -> Result<(), SnapshotError>,
) -> Result<(ExportManifest, ImportStage), SnapshotError> {
let manifest = read_manifest(artifact_dir)?;
if manifest.backend != expected_backend {
return Err(invalid(format!(
"artifact backend is {:?}, expected {:?}",
manifest.backend, expected_backend
)));
}
check_backend_version(&manifest.backend_version)?;
check_dest_available(dest)?;
let declared: BTreeSet<&str> = manifest.files.iter().map(|f| f.path.as_str()).collect();
for abs in list_payload_files(artifact_dir)? {
let rel = abs
.strip_prefix(artifact_dir)
.map_err(|_| SnapshotError::Backend("payload path escaped artifact dir".into()))?;
let rel = rel
.to_str()
.ok_or_else(|| invalid(format!("non-UTF-8 payload path: {}", rel.display())))?;
if !declared.contains(rel) {
return Err(invalid(format!("payload contains undeclared file: {rel}")));
}
}
let parent = dest_parent(dest)?;
let dir = stage_dir_in(parent)?;
let stage = ImportStage {
dir,
dest: dest.to_path_buf(),
};
let mut buf = vec![0u8; HASH_BUF];
for f in &manifest.files {
let src_path = artifact_dir.join(&f.path);
let dst_path = stage.dir.path().join(&f.path);
if let Some(p) = dst_path.parent() {
fs::create_dir_all(p)?;
}
let mut src = File::open(&src_path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
invalid(format!("payload file missing: {}", f.path))
} else {
SnapshotError::Io(e)
}
})?;
let mut dst = File::create(&dst_path)?;
let mut hasher = blake3::Hasher::new();
let mut size = 0u64;
loop {
let n = src.read(&mut buf)?;
if n == 0 {
break;
}
size += n as u64;
hasher.update(&buf[..n]);
dst.write_all(&buf[..n])?;
}
dst.sync_all()?;
if size != f.size {
return Err(invalid(format!(
"payload file {} is {size} bytes, manifest says {}",
f.path, f.size
)));
}
let digest = hasher.finalize().to_hex().to_string();
if digest != f.blake3 {
return Err(invalid(format!(
"payload file {} checksum mismatch (got {digest}, manifest says {})",
f.path, f.blake3
)));
}
}
fsync_dir_tree(&stage.payload())?;
Ok((manifest, stage))
}
#[cfg(test)]
mod tests {
    // Unit tests for manifest round-tripping and validation, payload hashing, export staging,
    // the hex helpers, and destination preconditions.
    use super::*;
    use tempfile::TempDir;
    /// Builds an `append-log` manifest (backend_version "2", fixed timestamp) holding the given
    /// files and cursor.
    fn manifest_with(files: Vec<ArtifactFile>, cursor: WatchCursor) -> ExportManifest {
        ExportManifest {
            schema_version: ARTIFACT_SCHEMA_VERSION,
            backend: "append-log".into(),
            backend_version: "2".into(),
            cursor,
            created_at_unix: 1_765_400_000,
            files,
        }
    }
    // Every field written by `write_manifest` comes back unchanged from `read_manifest`.
    #[test]
    fn manifest_round_trips() {
        let dir = TempDir::new().unwrap();
        let m = manifest_with(
            vec![ArtifactFile {
                path: "data/fold.snap".into(),
                size: 42,
                blake3: "ab".repeat(32),
            }],
            WatchCursor::from_u64(184_467),
        );
        write_manifest(dir.path(), &m).unwrap();
        let got = read_manifest(dir.path()).unwrap();
        assert_eq!(got.schema_version, m.schema_version);
        assert_eq!(got.backend, m.backend);
        assert_eq!(got.backend_version, m.backend_version);
        assert_eq!(got.cursor, m.cursor);
        assert_eq!(got.created_at_unix, m.created_at_unix);
        assert_eq!(got.files.len(), 1);
        assert_eq!(got.files[0].path, "data/fold.snap");
        assert_eq!(got.files[0].size, 42);
    }
    // An empty ("none") cursor survives serialization.
    #[test]
    fn manifest_round_trips_none_cursor() {
        let dir = TempDir::new().unwrap();
        let m = manifest_with(vec![], WatchCursor::none());
        write_manifest(dir.path(), &m).unwrap();
        let got = read_manifest(dir.path()).unwrap();
        assert!(got.cursor.is_none(), "none cursor survives the round trip");
    }
    // A 10-byte raw version token (wider than a u64) round-trips through `cursor_hex`.
    #[test]
    fn manifest_round_trips_fdb_width_cursor() {
        let raw = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let cursor = WatchCursor::from_version(VersionToken::from_raw(&raw).unwrap());
        let dir = TempDir::new().unwrap();
        write_manifest(dir.path(), &manifest_with(vec![], cursor.clone())).unwrap();
        let got = read_manifest(dir.path()).unwrap();
        assert_eq!(got.cursor, cursor);
    }
    /// Writes `json` verbatim as the manifest in `dir`, bypassing `write_manifest`.
    fn write_raw_manifest(dir: &Path, json: &str) {
        fs::write(dir.join(MANIFEST_FILE), json).unwrap();
    }
    /// Builds manifest JSON with the given `cursor_hex`, raw `files` array text, and schema
    /// version; the other fields are fixed.
    fn wire_json(cursor_hex: &str, files: &str, schema: u32) -> String {
        format!(
            r#"{{"schema_version":{schema},"backend":"append-log","backend_version":"2",
"cursor_hex":"{cursor_hex}","created_at_unix":0,"files":{files}}}"#
        )
    }
    // Bad cursors: non-hex digits, odd length, and an 11-byte value that
    // `VersionToken::from_raw` rejects.
    #[test]
    fn rejects_bad_cursor_hex() {
        let dir = TempDir::new().unwrap();
        for bad in ["zz", "abc", "0102030405060708090a0b"] {
            write_raw_manifest(dir.path(), &wire_json(bad, "[]", ARTIFACT_SCHEMA_VERSION));
            match read_manifest(dir.path()) {
                Err(SnapshotError::ArtifactInvalid(_)) => {}
                other => panic!("cursor_hex {bad:?}: expected ArtifactInvalid, got {other:?}"),
            }
        }
    }
    // A schema version other than the supported one is rejected with a message naming it.
    #[test]
    fn rejects_wrong_schema_version() {
        let dir = TempDir::new().unwrap();
        write_raw_manifest(
            dir.path(),
            &wire_json("", "[]", ARTIFACT_SCHEMA_VERSION + 1),
        );
        match read_manifest(dir.path()) {
            Err(SnapshotError::ArtifactInvalid(msg)) => {
                assert!(msg.contains("schema_version"), "{msg}");
            }
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }
    }
    // Manifest paths must stay inside `data/`: traversal, absolute paths, backslashes, other
    // prefixes, and the bare payload directory are all rejected.
    #[test]
    fn rejects_path_traversal() {
        let dir = TempDir::new().unwrap();
        for bad in [
            "../escape",
            "/abs/path",
            "data/../escape",
            "data/a\\b",
            "nondata/x",
            "data/",
            "data",
        ] {
            // Escape backslashes so the path survives embedding in a JSON string.
            let files = format!(
                r#"[{{"path":"{}","size":0,"blake3":""}}]"#,
                bad.replace('\\', "\\\\")
            );
            write_raw_manifest(dir.path(), &wire_json("", &files, ARTIFACT_SCHEMA_VERSION));
            match read_manifest(dir.path()) {
                Err(SnapshotError::ArtifactInvalid(_)) => {}
                other => panic!("path {bad:?}: expected ArtifactInvalid, got {other:?}"),
            }
        }
    }
    // Non-JSON manifest bytes surface as ArtifactInvalid("malformed ..."), not a serde error.
    #[test]
    fn rejects_malformed_manifest_json() {
        let dir = TempDir::new().unwrap();
        write_raw_manifest(dir.path(), "not json at all {{{");
        match read_manifest(dir.path()) {
            Err(SnapshotError::ArtifactInvalid(msg)) => {
                assert!(msg.contains("malformed"), "{msg}");
            }
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }
    }
    // A symlink inside the payload (pointing outside it) is reported, not followed.
    #[cfg(unix)]
    #[test]
    fn hash_payload_rejects_symlink() {
        let dir = TempDir::new().unwrap();
        let payload = dir.path().join(PAYLOAD_DIR);
        fs::create_dir(&payload).unwrap();
        fs::write(payload.join("real"), b"data").unwrap();
        let target = dir.path().join("outside");
        fs::write(&target, b"outside the payload").unwrap();
        std::os::unix::fs::symlink(&target, payload.join("link")).unwrap();
        match hash_payload(dir.path()) {
            Err(SnapshotError::ArtifactInvalid(msg)) => {
                assert!(msg.contains("non-regular"), "{msg}");
            }
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }
    }
    // If the destination gets occupied between `ExportStage::new` and sealing, sealing fails
    // and leaves the destination's contents alone.
    #[test]
    fn export_stage_fails_closed_when_dest_appears_before_seal() {
        let dir = TempDir::new().unwrap();
        let dest = dir.path().join("artifact");
        let stage = ExportStage::new(&dest).unwrap();
        fs::create_dir(stage.payload()).unwrap();
        fs::write(stage.payload().join("fold.snap"), b"data").unwrap();
        fs::create_dir(&dest).unwrap();
        fs::write(dest.join("stray"), b"x").unwrap();
        let err = stage
            .seal_and_finalize("append-log", "2", &WatchCursor::from_u64(1))
            .unwrap_err();
        assert!(matches!(err, SnapshotError::ArtifactInvalid(_)));
        assert!(
            dest.join("stray").exists(),
            "occupied destination is untouched"
        );
    }
    // hex_encode/hex_decode invert each other; invalid digits and odd lengths decode to None.
    #[test]
    fn hex_round_trips() {
        for bytes in [&[][..], &[0u8][..], &[0xde, 0xad, 0xbe, 0xef][..]] {
            assert_eq!(hex_decode(&hex_encode(bytes)).unwrap(), bytes);
        }
        assert!(hex_decode("0g").is_none());
        assert!(hex_decode("a").is_none());
    }
    // Absent and empty-directory destinations are available; a non-empty directory or an
    // existing regular file is not.
    #[test]
    fn dest_preconditions() {
        let dir = TempDir::new().unwrap();
        check_dest_available(&dir.path().join("absent")).unwrap();
        let empty = dir.path().join("empty");
        fs::create_dir(&empty).unwrap();
        check_dest_available(&empty).unwrap();
        let full = dir.path().join("full");
        fs::create_dir(&full).unwrap();
        fs::write(full.join("x"), b"x").unwrap();
        assert!(matches!(
            check_dest_available(&full),
            Err(SnapshotError::ArtifactInvalid(_))
        ));
        let file = dir.path().join("file");
        fs::write(&file, b"x").unwrap();
        assert!(matches!(
            check_dest_available(&file),
            Err(SnapshotError::ArtifactInvalid(_))
        ));
    }
}