use std::path::PathBuf;
use bytes::Bytes;
use object_store::path::Path as ObjectPath;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use crate::error::{JammiError, Result};
use crate::storage::{JammiObjectStore, Scheme, StorageError, StorageRegistry, StorageUrl};
/// Object name, relative to an artifact prefix, under which the artifact's
/// JSON manifest is stored and looked up.
const MANIFEST_NAME: &str = "manifest.json";
/// One file recorded in an artifact manifest.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct ManifestEntry {
/// Name of the file, relative to the artifact prefix.
name: String,
/// Hex-encoded SHA-256 digest of the file's bytes; compared verbatim
/// against a freshly computed digest when the file is fetched.
sha256: String,
}
/// Serialized listing of every file that belongs to one artifact.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct Manifest {
    /// Entries in the order they were recorded.
    files: Vec<ManifestEntry>,
}

impl Manifest {
    /// Derives a single hex-encoded SHA-256 fingerprint for the whole manifest.
    ///
    /// Every entry contributes its name and its digest string, each followed
    /// by a NUL byte, in manifest order. The result therefore depends on both
    /// the contents and the ordering of `files`.
    fn combined_hash(&self) -> String {
        const FIELD_END: &[u8] = b"\0";
        let digest = self
            .files
            .iter()
            .fold(Sha256::new(), |mut acc, file| {
                for field in [file.name.as_bytes(), file.sha256.as_bytes()] {
                    acc.update(field);
                    acc.update(FIELD_END);
                }
                acc
            })
            .finalize();
        hex::encode(digest)
    }
}
/// Returns the hex-encoded SHA-256 digest of `bytes`.
fn sha256_hex(bytes: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(bytes);
    let digest = hasher.finalize();
    hex::encode(digest)
}
/// Handle to an artifact whose files are available in a local directory.
#[derive(Debug, Clone)]
pub struct LocalArtifact {
    /// Directory that holds the artifact's files.
    dir: PathBuf,
}

impl LocalArtifact {
    /// Borrows the directory that holds the artifact's files.
    pub fn dir(&self) -> &std::path::Path {
        self.dir.as_path()
    }
}
/// Stores and retrieves multi-file artifacts, each described by a manifest,
/// under prefixes derived from a root storage URL.
pub struct ArtifactStore {
/// Base URL under which artifact prefixes are built.
root: StorageUrl,
/// Registry used to resolve a storage driver for a given prefix.
registry: StorageRegistry,
/// Local directory in which fetched artifacts from non-file schemes are
/// materialized, one sub-directory per manifest fingerprint.
cache_root: PathBuf,
}
impl ArtifactStore {
/// Builds a store rooted at `root`, creating the directories it relies on.
///
/// For a `file` root the root directory itself is created first; the
/// local cache directory is always created.
///
/// # Errors
///
/// Returns an error when either directory cannot be created.
pub fn with_root(
    root: StorageUrl,
    registry: StorageRegistry,
    cache_root: PathBuf,
) -> Result<Self> {
    // Only a file-backed root has a local directory to materialize.
    if matches!(root.scheme(), Scheme::File) {
        std::fs::create_dir_all(root.path())?;
    }
    std::fs::create_dir_all(&cache_root)?;

    let store = Self {
        root,
        registry,
        cache_root,
    };
    Ok(store)
}
pub async fn put_artifact(
&self,
prefix_segments: &[&str],
files: &[(String, Bytes)],
) -> Result<StorageUrl> {
let prefix = self.prefix_url(prefix_segments)?;
let handle = self.handle(&prefix)?;
let mut sorted: Vec<&(String, Bytes)> = files.iter().collect();
sorted.sort_by(|a, b| a.0.cmp(&b.0));
let mut entries = Vec::with_capacity(sorted.len());
for (name, bytes) in &sorted {
let path = self.child(&prefix, name)?;
handle.put_bytes(&path, bytes.clone()).await?;
entries.push(ManifestEntry {
name: (*name).clone(),
sha256: sha256_hex(bytes),
});
}
let manifest = Manifest { files: entries };
let manifest_bytes = Bytes::from(serde_json::to_vec(&manifest)?);
let manifest_path = self.child(&prefix, MANIFEST_NAME)?;
handle.put_bytes(&manifest_path, manifest_bytes).await?;
Ok(prefix)
}
pub async fn fetch_artifact(&self, prefix: &StorageUrl) -> Result<LocalArtifact> {
let handle = self.handle(prefix)?;
let manifest = self.read_manifest(&handle, prefix).await?;
if prefix.scheme() == Scheme::File {
self.verify_files(&handle, prefix, &manifest).await?;
return Ok(LocalArtifact {
dir: PathBuf::from(prefix.path()),
});
}
let cache_dir = self.cache_root.join(manifest.combined_hash());
if cache_dir.is_dir() {
return Ok(LocalArtifact { dir: cache_dir });
}
let tmp = tempfile::tempdir_in(&self.cache_root)?;
for entry in &manifest.files {
let path = self.child(prefix, &entry.name)?;
let bytes = handle.get_bytes(&path).await?;
verify_sha256(prefix, entry, &bytes)?;
std::fs::write(tmp.path().join(&entry.name), &bytes)?;
}
match std::fs::rename(tmp.path(), &cache_dir) {
Ok(()) => Ok(LocalArtifact { dir: cache_dir }),
Err(_) if cache_dir.is_dir() => Ok(LocalArtifact { dir: cache_dir }),
Err(e) => Err(JammiError::Io(e)),
}
}
pub async fn delete_artifact_prefix(&self, prefix: &StorageUrl) -> Result<()> {
let handle = self.handle(prefix)?;
let manifest_path = self.child(prefix, MANIFEST_NAME)?;
let manifest = if handle.exists(&manifest_path).await? {
self.read_manifest(&handle, prefix).await.ok()
} else {
None
};
if let Some(manifest) = manifest {
for entry in &manifest.files {
let path = self.child(prefix, &entry.name)?;
handle.delete_if_exists(&path).await?;
}
}
handle.delete_if_exists(&manifest_path).await?;
Ok(())
}
/// Loads and parses the manifest stored under `prefix`.
///
/// # Errors
///
/// Propagates the storage error when the manifest cannot be read, and
/// returns a layout error when its bytes are not a valid manifest.
async fn read_manifest(
    &self,
    handle: &JammiObjectStore,
    prefix: &StorageUrl,
) -> Result<Manifest> {
    let location = self.child(prefix, MANIFEST_NAME)?;
    let raw = handle.get_bytes(&location).await?;
    match serde_json::from_slice::<Manifest>(&raw) {
        Ok(manifest) => Ok(manifest),
        Err(e) => Err(JammiError::Storage(StorageError::layout(
            prefix.as_str(),
            format!("malformed artifact manifest: {e}"),
        ))),
    }
}
/// Reads every file listed in `manifest` from `prefix` and checks its
/// SHA-256 digest against the manifest, stopping at the first failure.
async fn verify_files(
    &self,
    handle: &JammiObjectStore,
    prefix: &StorageUrl,
    manifest: &Manifest,
) -> Result<()> {
    for expected in manifest.files.iter() {
        let object = self.child(prefix, &expected.name)?;
        let contents = handle.get_bytes(&object).await?;
        verify_sha256(prefix, expected, &contents)?;
    }
    Ok(())
}
/// Resolves a driver for `prefix` through the registry and wraps it,
/// together with a copy of the prefix, in an object-store handle.
fn handle(&self, prefix: &StorageUrl) -> Result<JammiObjectStore> {
    let store = JammiObjectStore::new(self.registry.driver_for(prefix, None)?, prefix.clone());
    Ok(store)
}
/// Builds the object path of `name` beneath `prefix`.
///
/// The prefix path and `name` are joined with `/`. For `file` and `memory`
/// prefixes all leading slashes are dropped; for every other scheme
/// everything up to and including the first `/` is dropped.
///
/// # Errors
///
/// Returns a layout error, tagged with the joined key, when the resulting
/// string is not a valid object path.
fn child(&self, prefix: &StorageUrl, name: &str) -> Result<ObjectPath> {
    let key = format!("{}/{}", prefix.path(), name);
    let relative: &str = match prefix.scheme() {
        Scheme::File | Scheme::Memory => key.trim_start_matches('/'),
        _ => key.split_once('/').map_or("", |(_, rest)| rest),
    };
    match ObjectPath::parse(relative) {
        Ok(path) => Ok(path),
        Err(e) => Err(JammiError::Storage(StorageError::layout(
            &key,
            e.to_string(),
        ))),
    }
}
/// Builds the URL of an artifact prefix by appending each sanitized
/// segment, separated by `/`, to the root URL (trailing slashes removed).
fn prefix_url(&self, segments: &[&str]) -> Result<StorageUrl> {
    let base = self.root.as_str().trim_end_matches('/').to_owned();
    let url = segments.iter().fold(base, |mut acc, segment| {
        acc.push('/');
        acc.push_str(&sanitize_segment(segment));
        acc
    });
    StorageUrl::parse(&url).map_err(JammiError::from)
}
}
/// Checks that `bytes` hash to the digest recorded in `entry`.
///
/// # Errors
///
/// Returns a layout error naming the file, the computed digest and the
/// expected digest when the two strings differ.
fn verify_sha256(prefix: &StorageUrl, entry: &ManifestEntry, bytes: &[u8]) -> Result<()> {
    let actual = sha256_hex(bytes);
    if actual == entry.sha256 {
        return Ok(());
    }
    let detail = format!(
        "artifact file '{}' sha256 {actual} does not match manifest {}",
        entry.name, entry.sha256
    );
    Err(JammiError::Storage(StorageError::layout(
        prefix.as_str(),
        detail,
    )))
}
/// Turns an arbitrary string into a single, safe URL path segment.
///
/// * `/`, `\`, `:` and spaces are each replaced by `_`, so the result never
///   introduces an extra path level or scheme separator.
/// * An empty segment becomes `_`, so joining it never yields a doubled or
///   trailing `/`.
/// * `.` and `..` become `_` and `__`, so a segment can never navigate to
///   the current or parent directory of the prefix being built.
///
/// Every other string is returned unchanged.
fn sanitize_segment(seg: &str) -> String {
    let cleaned: String = seg
        .chars()
        .map(|c| match c {
            '/' | '\\' | ':' | ' ' => '_',
            other => other,
        })
        .collect();

    match cleaned.as_str() {
        "" => "_".to_owned(),
        // Keep the length so `.` and `..` still map to distinct names.
        "." | ".." => "_".repeat(cleaned.len()),
        _ => cleaned,
    }
}
// Unit tests covering the put / fetch / delete round trip for memory- and
// file-backed roots, plus manifest hashing.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
// Builds a store over `root` with a fresh registry and the given cache dir.
fn store_with_root(root: StorageUrl, cache: PathBuf) -> ArtifactStore {
ArtifactStore::with_root(root, StorageRegistry::new(), cache).unwrap()
}
// Two small files used as the artifact payload in most tests.
fn sample_files() -> Vec<(String, Bytes)> {
vec![
(
"adapter.safetensors".to_string(),
Bytes::from_static(b"weights-bytes"),
),
(
"adapter_config.json".to_string(),
Bytes::from_static(b"{\"adapter_type\":\"x\"}"),
),
]
}
// A memory-backed artifact is fetched into a local directory that contains
// exactly the listed files and not the manifest itself.
#[tokio::test]
async fn memory_round_trip_fetches_manifest_keys() {
let cache = tempfile::tempdir().unwrap();
let store = store_with_root(StorageUrl::memory("artifacts"), cache.path().to_path_buf());
let files = sample_files();
let prefix = store
.put_artifact(&["job-1", "worker-a", "0"], &files)
.await
.unwrap();
assert!(prefix.as_str().ends_with("artifacts/job-1/worker-a/0"));
let fetched = store.fetch_artifact(&prefix).await.unwrap();
for (name, bytes) in &files {
let got = std::fs::read(fetched.dir().join(name)).unwrap();
assert_eq!(&got[..], &bytes[..], "fetched file '{name}' differs");
}
assert!(!fetched.dir().join(MANIFEST_NAME).exists());
}
// A file-backed artifact is served from its own directory, not the cache.
#[tokio::test]
async fn file_scheme_reads_in_place_without_copy() {
let root_dir = tempfile::tempdir().unwrap();
let cache = tempfile::tempdir().unwrap();
let root = StorageUrl::parse(root_dir.path().to_str().unwrap()).unwrap();
let store = store_with_root(root, cache.path().to_path_buf());
let files = sample_files();
let prefix = store
.put_artifact(&["job-2", "worker-b", "1"], &files)
.await
.unwrap();
let fetched = store.fetch_artifact(&prefix).await.unwrap();
assert_eq!(fetched.dir(), std::path::Path::new(prefix.path()));
assert!(fetched.dir().starts_with(root_dir.path()));
assert!(!fetched.dir().starts_with(cache.path()));
for (name, bytes) in &files {
let got = std::fs::read(fetched.dir().join(name)).unwrap();
assert_eq!(&got[..], &bytes[..]);
}
}
// Overwriting a stored file after the manifest was written makes the fetch
// fail with a digest-mismatch error.
#[tokio::test]
async fn sha256_mismatch_is_a_hard_error() {
let cache = tempfile::tempdir().unwrap();
let store = store_with_root(
StorageUrl::memory("artifacts-corrupt"),
cache.path().to_path_buf(),
);
let prefix = store
.put_artifact(&["job-3", "worker-c", "0"], &sample_files())
.await
.unwrap();
let handle = store.handle(&prefix).unwrap();
let path = store.child(&prefix, "adapter.safetensors").unwrap();
handle
.put_bytes(&path, Bytes::from_static(b"tampered"))
.await
.unwrap();
let err = store.fetch_artifact(&prefix).await.unwrap_err();
assert!(
err.to_string().contains("does not match manifest"),
"expected a sha256 mismatch error, got: {err}"
);
}
// Fetching a prefix that was never written fails instead of returning an
// empty artifact. Only a non-empty error message is asserted.
#[tokio::test]
async fn missing_manifest_is_a_hard_error() {
let cache = tempfile::tempdir().unwrap();
let store = store_with_root(
StorageUrl::memory("artifacts-nomanifest"),
cache.path().to_path_buf(),
);
let prefix = store.prefix_url(&["ghost", "worker", "0"]).unwrap();
let err = store.fetch_artifact(&prefix).await.unwrap_err();
assert!(!err.to_string().is_empty());
}
// A second fetch returns the existing cache directory untouched: a local
// edit made after the first fetch is still present afterwards.
#[tokio::test]
async fn cache_hit_avoids_redownload() {
let cache = tempfile::tempdir().unwrap();
let store = store_with_root(
StorageUrl::memory("artifacts-cache"),
cache.path().to_path_buf(),
);
let prefix = store
.put_artifact(&["job-4", "worker-d", "0"], &sample_files())
.await
.unwrap();
let first = store.fetch_artifact(&prefix).await.unwrap();
let marker = first.dir().join("adapter.safetensors");
std::fs::write(&marker, b"locally-edited").unwrap();
let second = store.fetch_artifact(&prefix).await.unwrap();
assert_eq!(first.dir(), second.dir());
assert_eq!(std::fs::read(&marker).unwrap(), b"locally-edited");
}
// Eight fetches of the same prefix run as separate tasks; every one must
// succeed and observe complete, correct file contents.
#[tokio::test]
async fn concurrent_fetch_is_torn_free() {
let cache = tempfile::tempdir().unwrap();
let store = Arc::new(store_with_root(
StorageUrl::memory("artifacts-concurrent"),
cache.path().to_path_buf(),
));
let files = sample_files();
let prefix = store
.put_artifact(&["job-5", "worker-e", "0"], &files)
.await
.unwrap();
let mut handles = Vec::new();
for _ in 0..8 {
let store = Arc::clone(&store);
let prefix = prefix.clone();
handles.push(tokio::spawn(
async move { store.fetch_artifact(&prefix).await },
));
}
for h in handles {
let fetched = h.await.unwrap().unwrap();
for (name, bytes) in &files {
let got = std::fs::read(fetched.dir().join(name)).unwrap();
assert_eq!(&got[..], &bytes[..], "torn fetch of '{name}'");
}
}
}
// After a delete the artifact can no longer be fetched, and deleting the
// same prefix a second time still succeeds.
#[tokio::test]
async fn delete_prefix_removes_objects_and_is_idempotent() {
let cache = tempfile::tempdir().unwrap();
let store = store_with_root(
StorageUrl::memory("artifacts-delete"),
cache.path().to_path_buf(),
);
let prefix = store
.put_artifact(&["job-6", "worker-f", "0"], &sample_files())
.await
.unwrap();
store.delete_artifact_prefix(&prefix).await.unwrap();
assert!(store.fetch_artifact(&prefix).await.is_err());
store.delete_artifact_prefix(&prefix).await.unwrap();
}
// A manifest and its clone produce the same combined hash.
// NOTE(review): this only compares a value with its own clone; it does not
// check that differing manifests hash differently.
#[test]
fn combined_hash_is_stable_for_identical_content() {
let m1 = Manifest {
files: vec![
ManifestEntry {
name: "a".into(),
sha256: "11".into(),
},
ManifestEntry {
name: "b".into(),
sha256: "22".into(),
},
],
};
let m2 = m1.clone();
assert_eq!(m1.combined_hash(), m2.combined_hash());
}
}