use crate::{
error::{AiFormatError, Result},
AiArtifact, ArtifactId, ArtifactMetadata, ArtifactRef, StorageLocation,
};
use async_trait::async_trait;
use dashmap::DashMap;
use reqwest::Client;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tracing::{debug, info, warn};
#[async_trait]
/// Common async interface implemented by every artifact storage backend in this file
/// (local directory, HuggingFace, IPFS gateway, peer node).
pub trait StorageBackend: Send + Sync {
/// Reports whether the backend holds an artifact with the given id.
async fn exists(&self, id: &ArtifactId) -> Result<bool>;
/// Returns the metadata of the artifact identified by `id`.
async fn get_metadata(&self, id: &ArtifactId) -> Result<ArtifactMetadata>;
/// Makes the artifact available on disk and returns the path of the resulting file.
///
/// The remote backends in this file write into the directory `dest`; the local
/// backend ignores `dest` and returns the path it already stores the artifact at.
async fn download(&self, id: &ArtifactId, dest: &Path) -> Result<PathBuf>;
/// Stores `artifact` in the backend and returns the location it was stored at.
async fn upload(&self, artifact: &AiArtifact) -> Result<StorageLocation>;
/// Lists references to the artifacts the backend knows about.
async fn list(&self) -> Result<Vec<ArtifactRef>>;
/// Removes the artifact identified by `id`; some backends in this file always
/// return an error because they cannot delete.
async fn delete(&self, id: &ArtifactId) -> Result<()>;
/// Short static tag naming the backend kind (e.g. "local", "ipfs").
fn location_type(&self) -> &'static str;
}
/// Storage manager that combines a mandatory local directory with optional
/// remote backends and an in-memory cache of resolved artifact paths.
pub struct Storage {
// Local directory backend; always present and the target of every download.
local: LocalStorage,
// Optional HuggingFace backend.
huggingface: Option<HuggingFaceStorage>,
// Optional IPFS gateway backend.
ipfs: Option<IpfsStorage>,
// Peer nodes, queried in the order they were added.
node_storage: Vec<NodeStorage>,
// Artifact id -> on-disk path of artifacts that were already resolved.
cache: Arc<DashMap<ArtifactId, PathBuf>>,
}
impl Storage {
pub fn new(local_path: impl Into<PathBuf>) -> Self {
Self {
local: LocalStorage::new(local_path),
huggingface: None,
ipfs: None,
node_storage: Vec::new(),
cache: Arc::new(DashMap::new()),
}
}
pub fn with_hf_fallback(local_path: impl Into<PathBuf>, hf_token: Option<String>) -> Self {
Self {
local: LocalStorage::new(local_path),
huggingface: Some(HuggingFaceStorage::new(hf_token)),
ipfs: None,
node_storage: Vec::new(),
cache: Arc::new(DashMap::new()),
}
}
pub fn with_ipfs(mut self, gateway: impl Into<String>) -> Self {
self.ipfs = Some(IpfsStorage::new(gateway));
self
}
pub fn with_node_storage(mut self, peer_id: impl Into<String>, endpoint: impl Into<String>) -> Self {
self.node_storage.push(NodeStorage::new(peer_id, endpoint));
self
}
pub async fn get(&self, id: &ArtifactId) -> Result<PathBuf> {
if let Some(path) = self.cache.get(id) {
if path.exists() {
return Ok(path.clone());
}
}
if self.local.exists(id).await? {
let path = self.local.get_path(id);
self.cache.insert(id.clone(), path.clone());
return Ok(path);
}
for node in &self.node_storage {
if let Ok(true) = node.exists(id).await {
let path = node.download(id, &self.local.base_path).await?;
self.cache.insert(id.clone(), path.clone());
return Ok(path);
}
}
if let Some(hf) = &self.huggingface {
if let Ok(true) = hf.exists(id).await {
let path = hf.download(id, &self.local.base_path).await?;
self.cache.insert(id.clone(), path.clone());
return Ok(path);
}
}
if let Some(ipfs) = &self.ipfs {
if let Ok(true) = ipfs.exists(id).await {
let path = ipfs.download(id, &self.local.base_path).await?;
self.cache.insert(id.clone(), path.clone());
return Ok(path);
}
}
Err(AiFormatError::artifact_not_found(id))
}
pub async fn store(&self, artifact: &AiArtifact, replicate: bool) -> Result<Vec<StorageLocation>> {
let mut locations = Vec::new();
let local_loc = self.local.upload(artifact).await?;
locations.push(local_loc);
if replicate {
if let Some(hf) = &self.huggingface {
match hf.upload(artifact).await {
Ok(loc) => locations.push(loc),
Err(e) => warn!("Failed to replicate to HuggingFace: {}", e),
}
}
if let Some(ipfs) = &self.ipfs {
match ipfs.upload(artifact).await {
Ok(loc) => locations.push(loc),
Err(e) => warn!("Failed to replicate to IPFS: {}", e),
}
}
}
Ok(locations)
}
pub async fn load(&self, id: &ArtifactId) -> Result<AiArtifact> {
let path = self.get(id).await?;
AiArtifact::load(&path).await
}
pub fn local_path(&self) -> &Path {
&self.local.base_path
}
pub fn clear_cache(&self) {
self.cache.clear();
}
}
/// Storage backend that keeps each artifact as an `<id>.ai` file in one directory.
pub struct LocalStorage {
// Directory that holds the artifact files.
base_path: PathBuf,
}
impl LocalStorage {
    /// Builds a local backend whose artifact files live under `base_path`.
    pub fn new(base_path: impl Into<PathBuf>) -> Self {
        let base_path = base_path.into();
        Self { base_path }
    }

    /// File path for the artifact `id`: `<base_path>/<id>.ai`.
    pub fn get_path(&self, id: &ArtifactId) -> PathBuf {
        let file_name = format!("{}.ai", id);
        let mut full = self.base_path.clone();
        full.push(file_name);
        full
    }

    /// Default artifact directory: `hanzo/artifacts` under the platform's local
    /// data directory, or under `.` when that directory cannot be determined.
    pub fn default_path() -> PathBuf {
        let mut root = match dirs::data_local_dir() {
            Some(dir) => dir,
            None => PathBuf::from("."),
        };
        root.push("hanzo");
        root.push("artifacts");
        root
    }
}
#[async_trait]
impl StorageBackend for LocalStorage {
async fn exists(&self, id: &ArtifactId) -> Result<bool> {
Ok(self.get_path(id).exists())
}
async fn get_metadata(&self, id: &ArtifactId) -> Result<ArtifactMetadata> {
let artifact = AiArtifact::load(self.get_path(id)).await?;
Ok(artifact.metadata)
}
async fn download(&self, id: &ArtifactId, _dest: &Path) -> Result<PathBuf> {
let path = self.get_path(id);
if path.exists() {
Ok(path)
} else {
Err(AiFormatError::artifact_not_found(id))
}
}
async fn upload(&self, artifact: &AiArtifact) -> Result<StorageLocation> {
fs::create_dir_all(&self.base_path).await?;
let path = self.get_path(&artifact.metadata.id);
let mut artifact = artifact.clone();
artifact.save(&path).await?;
info!("Saved artifact {} to {:?}", artifact.metadata.id, path);
Ok(StorageLocation::local(path.to_string_lossy()))
}
async fn list(&self) -> Result<Vec<ArtifactRef>> {
let mut refs = Vec::new();
if !self.base_path.exists() {
return Ok(refs);
}
let mut entries = fs::read_dir(&self.base_path).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().map(|e| e == "ai").unwrap_or(false) {
if let Ok(artifact) = AiArtifact::load(&path).await {
refs.push(ArtifactRef::new(
artifact.metadata.content_hash,
artifact.metadata.name,
artifact.metadata.version,
));
}
}
}
Ok(refs)
}
async fn delete(&self, id: &ArtifactId) -> Result<()> {
let path = self.get_path(id);
if path.exists() {
fs::remove_file(path).await?;
}
Ok(())
}
fn location_type(&self) -> &'static str {
"local"
}
}
/// Storage backend that reads artifacts from HuggingFace model repositories.
pub struct HuggingFaceStorage {
// HTTP client used for every request.
client: Client,
// Optional access token, sent as bearer auth when present.
token: Option<String>,
// Base URL of the API that the `/models/...` paths are appended to.
api_base: String,
}
impl HuggingFaceStorage {
    /// Builds a backend that targets `https://huggingface.co/api`, optionally
    /// authenticating with `token`.
    pub fn new(token: Option<String>) -> Self {
        let api_base = String::from("https://huggingface.co/api");
        Self {
            client: Client::new(),
            token,
            api_base,
        }
    }

    /// Replaces the API base URL with `endpoint` and returns the updated backend.
    pub fn with_endpoint(self, endpoint: impl Into<String>) -> Self {
        Self {
            api_base: endpoint.into(),
            ..self
        }
    }

    /// Maps an artifact id to a repository id: ids that already contain a `/`
    /// are used as-is, bare names are placed under the `hanzo-ai` namespace.
    fn parse_repo_id(id: &ArtifactId) -> String {
        if !id.contains('/') {
            return format!("hanzo-ai/{}", id);
        }
        id.clone()
    }
}
#[async_trait]
impl StorageBackend for HuggingFaceStorage {
async fn exists(&self, id: &ArtifactId) -> Result<bool> {
let repo_id = Self::parse_repo_id(id);
let url = format!("{}/models/{}", self.api_base, repo_id);
let mut req = self.client.head(&url);
if let Some(token) = &self.token {
req = req.bearer_auth(token);
}
match req.send().await {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
}
}
async fn get_metadata(&self, id: &ArtifactId) -> Result<ArtifactMetadata> {
let repo_id = Self::parse_repo_id(id);
let url = format!("{}/models/{}", self.api_base, repo_id);
let mut req = self.client.get(&url);
if let Some(token) = &self.token {
req = req.bearer_auth(token);
}
let resp = req.send().await?;
if !resp.status().is_success() {
return Err(AiFormatError::artifact_not_found(id));
}
let data: serde_json::Value = resp.json().await?;
let name = data["modelId"].as_str().unwrap_or(id).to_string();
let metadata = ArtifactMetadata::new(&name, crate::ArtifactType::Model);
Ok(metadata)
}
async fn download(&self, id: &ArtifactId, dest: &Path) -> Result<PathBuf> {
let repo_id = Self::parse_repo_id(id);
debug!("Downloading from HuggingFace: {}", repo_id);
let url = format!(
"https://huggingface.co/{}/resolve/main/artifact.ai",
repo_id
);
let mut req = self.client.get(&url);
if let Some(token) = &self.token {
req = req.bearer_auth(token);
}
let resp = req.send().await?;
if !resp.status().is_success() {
return Err(AiFormatError::HuggingFace(format!(
"Failed to download {}: {}",
repo_id,
resp.status()
)));
}
let bytes = resp.bytes().await?;
let dest_path = dest.join(format!("{}.ai", id.replace('/', "_")));
fs::create_dir_all(dest).await?;
fs::write(&dest_path, bytes).await?;
info!("Downloaded {} to {:?}", repo_id, dest_path);
Ok(dest_path)
}
async fn upload(&self, artifact: &AiArtifact) -> Result<StorageLocation> {
let _token = self.token.as_ref().ok_or_else(|| {
AiFormatError::HuggingFace("HuggingFace token required for upload".to_string())
})?;
let repo_id = format!("hanzo-ai/{}", artifact.metadata.name);
debug!("Would upload to HuggingFace repo: {}", repo_id);
Ok(StorageLocation::huggingface(&repo_id))
}
async fn list(&self) -> Result<Vec<ArtifactRef>> {
let url = format!("{}/models?author=hanzo-ai", self.api_base);
let mut req = self.client.get(&url);
if let Some(token) = &self.token {
req = req.bearer_auth(token);
}
let resp = req.send().await?;
if !resp.status().is_success() {
return Ok(Vec::new());
}
let data: Vec<serde_json::Value> = resp.json().await?;
let refs = data
.iter()
.filter_map(|model| {
let id = model["modelId"].as_str()?;
Some(ArtifactRef::new(
String::new(), id.to_string(),
"latest".to_string(),
))
})
.collect();
Ok(refs)
}
async fn delete(&self, _id: &ArtifactId) -> Result<()> {
Err(AiFormatError::HuggingFace(
"Deletion not supported via API".to_string(),
))
}
fn location_type(&self) -> &'static str {
"huggingface"
}
}
/// Storage backend that fetches artifacts through an IPFS HTTP gateway.
pub struct IpfsStorage {
// HTTP client used for every request.
client: Client,
// Gateway base URL; content is requested at `<gateway>/ipfs/<id>`.
gateway: String,
}
impl IpfsStorage {
    /// Builds a backend that requests content from the given gateway base URL.
    pub fn new(gateway: impl Into<String>) -> Self {
        let gateway = gateway.into();
        let client = Client::new();
        Self { client, gateway }
    }

    /// Backend that uses the `https://ipfs.io` gateway.
    pub fn default_gateway() -> Self {
        let gateway = "https://ipfs.io";
        Self::new(gateway)
    }
}
#[async_trait]
impl StorageBackend for IpfsStorage {
    /// Probes `<gateway>/ipfs/<id>` with a HEAD request; transport errors count
    /// as "not present".
    async fn exists(&self, id: &ArtifactId) -> Result<bool> {
        let url = format!("{}/ipfs/{}", self.gateway, id);
        let reachable = self
            .client
            .head(&url)
            .send()
            .await
            .map(|resp| resp.status().is_success())
            .unwrap_or(false);
        Ok(reachable)
    }

    /// Downloads the artifact through the gateway and returns its metadata.
    async fn get_metadata(&self, id: &ArtifactId) -> Result<ArtifactMetadata> {
        self.download_artifact(id)
            .await
            .map(|artifact| artifact.metadata)
    }

    /// Fetches the content behind `id` and writes it to `<dest>/<id>.ai`.
    async fn download(&self, id: &ArtifactId, dest: &Path) -> Result<PathBuf> {
        let url = format!("{}/ipfs/{}", self.gateway, id);
        debug!("Downloading from IPFS: {}", url);

        let resp = self.client.get(&url).send().await?;
        let status = resp.status();
        if !status.is_success() {
            let message = format!("IPFS download failed: {}", status);
            return Err(AiFormatError::Network(message));
        }

        let payload = resp.bytes().await?;
        let file_name = format!("{}.ai", id);
        let dest_path = dest.join(file_name);
        fs::create_dir_all(dest).await?;
        fs::write(&dest_path, payload).await?;
        info!("Downloaded IPFS {} to {:?}", id, dest_path);
        Ok(dest_path)
    }

    /// Always fails: uploading is not implemented for this backend.
    async fn upload(&self, _artifact: &AiArtifact) -> Result<StorageLocation> {
        let reason = "IPFS upload requires pinning service configuration".to_string();
        Err(AiFormatError::Network(reason))
    }

    /// Always returns an empty list.
    async fn list(&self) -> Result<Vec<ArtifactRef>> {
        let nothing = Vec::new();
        Ok(nothing)
    }

    /// Always fails: this backend does not delete content.
    async fn delete(&self, _id: &ArtifactId) -> Result<()> {
        let reason = "IPFS content is immutable".to_string();
        Err(AiFormatError::Network(reason))
    }

    fn location_type(&self) -> &'static str {
        "ipfs"
    }
}
impl IpfsStorage {
    /// Fetches the content behind `id` from the gateway and loads it as an artifact.
    ///
    /// The bytes are staged in `<temp dir>/<id>.ai` because loading works on a
    /// file path. The staging file is removed on every exit path, including
    /// failed writes and failed loads.
    ///
    /// # Errors
    ///
    /// Returns the "artifact not found" error for any non-success status, and
    /// propagates transport, I/O and load errors.
    async fn download_artifact(&self, id: &ArtifactId) -> Result<AiArtifact> {
        let url = format!("{}/ipfs/{}", self.gateway, id);
        let resp = self.client.get(&url).send().await?;
        if !resp.status().is_success() {
            return Err(AiFormatError::artifact_not_found(id));
        }
        let bytes = resp.bytes().await?;
        let temp_path = std::env::temp_dir().join(format!("{}.ai", id));
        if let Err(e) = fs::write(&temp_path, &bytes).await {
            // A failed write may leave a partial file behind.
            let _ = fs::remove_file(&temp_path).await;
            return Err(e.into());
        }
        // Clean up before inspecting the result so a load failure cannot leak the file.
        let loaded = AiArtifact::load(&temp_path).await;
        let _ = fs::remove_file(&temp_path).await;
        loaded
    }
}
/// Storage backend that talks to a peer node over HTTP.
pub struct NodeStorage {
// Identifier of the peer; used in log messages, errors and returned locations.
peer_id: String,
// Base URL that the `/artifacts/...` routes are appended to.
endpoint: String,
// HTTP client used for every request.
client: Client,
}
impl NodeStorage {
    /// Builds a backend for the peer `peer_id` reachable at the base URL `endpoint`.
    pub fn new(peer_id: impl Into<String>, endpoint: impl Into<String>) -> Self {
        let peer_id = peer_id.into();
        let endpoint = endpoint.into();
        let client = Client::new();
        Self {
            peer_id,
            endpoint,
            client,
        }
    }
}
#[async_trait]
impl StorageBackend for NodeStorage {
async fn exists(&self, id: &ArtifactId) -> Result<bool> {
let url = format!("{}/artifacts/{}/exists", self.endpoint, id);
match self.client.get(&url).send().await {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
}
}
async fn get_metadata(&self, id: &ArtifactId) -> Result<ArtifactMetadata> {
let url = format!("{}/artifacts/{}/metadata", self.endpoint, id);
let resp = self.client.get(&url).send().await?;
if !resp.status().is_success() {
return Err(AiFormatError::artifact_not_found(id));
}
let metadata: ArtifactMetadata = resp.json().await?;
Ok(metadata)
}
async fn download(&self, id: &ArtifactId, dest: &Path) -> Result<PathBuf> {
let url = format!("{}/artifacts/{}/download", self.endpoint, id);
debug!("Downloading from node {}: {}", self.peer_id, id);
let resp = self.client.get(&url).send().await?;
if !resp.status().is_success() {
return Err(AiFormatError::PeerNotFound(self.peer_id.clone()));
}
let bytes = resp.bytes().await?;
let dest_path = dest.join(format!("{}.ai", id));
fs::create_dir_all(dest).await?;
fs::write(&dest_path, bytes).await?;
info!("Downloaded {} from node {}", id, self.peer_id);
Ok(dest_path)
}
async fn upload(&self, artifact: &AiArtifact) -> Result<StorageLocation> {
let url = format!("{}/artifacts/upload", self.endpoint);
let temp_path = std::env::temp_dir().join(format!("{}.ai", artifact.metadata.id));
let mut artifact = artifact.clone();
artifact.save(&temp_path).await?;
let bytes = fs::read(&temp_path).await?;
let _ = fs::remove_file(&temp_path).await;
let resp = self.client
.post(&url)
.body(bytes)
.send()
.await?;
if !resp.status().is_success() {
return Err(AiFormatError::storage("Node upload failed"));
}
Ok(StorageLocation::NodeStorage {
peer_id: self.peer_id.clone(),
path: artifact.metadata.id.clone(),
})
}
async fn list(&self) -> Result<Vec<ArtifactRef>> {
let url = format!("{}/artifacts", self.endpoint);
let resp = self.client.get(&url).send().await?;
if !resp.status().is_success() {
return Ok(Vec::new());
}
let refs: Vec<ArtifactRef> = resp.json().await?;
Ok(refs)
}
async fn delete(&self, id: &ArtifactId) -> Result<()> {
let url = format!("{}/artifacts/{}", self.endpoint, id);
let resp = self.client.delete(&url).send().await?;
if !resp.status().is_success() {
return Err(AiFormatError::storage("Node delete failed"));
}
Ok(())
}
fn location_type(&self) -> &'static str {
"node"
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// Round trip through the local backend: an uploaded artifact must be reported
// by `exists` and show up in `list`.
#[tokio::test]
async fn test_local_storage() {
let dir = tempdir().unwrap();
let storage = LocalStorage::new(dir.path());
let artifact = crate::AiArtifact::builder()
.name("test-model")
.artifact_type(crate::ArtifactType::Model)
.add_config("config.json", b"{}".to_vec())
.build()
.unwrap();
let id = artifact.metadata.id.clone();
storage.upload(&artifact).await.unwrap();
assert!(storage.exists(&id).await.unwrap());
let list = storage.list().await.unwrap();
assert!(!list.is_empty());
}
// Storing without replication must yield exactly one local location, and the
// artifact must then resolve to an existing file through `get`.
#[tokio::test]
async fn test_storage_manager() {
let dir = tempdir().unwrap();
let storage = Storage::new(dir.path());
let artifact = crate::AiArtifact::builder()
.name("test-artifact")
.artifact_type(crate::ArtifactType::Weights)
.add_file("weights.bin", vec![1, 2, 3, 4])
.build()
.unwrap();
let id = artifact.metadata.id.clone();
let locations = storage.store(&artifact, false).await.unwrap();
assert_eq!(locations.len(), 1);
assert!(matches!(locations[0], StorageLocation::Local(_)));
let path = storage.get(&id).await.unwrap();
assert!(path.exists());
}
// Ids that already contain a namespace are kept; bare names get the
// `hanzo-ai/` prefix.
#[test]
fn test_hf_repo_id_parsing() {
assert_eq!(
HuggingFaceStorage::parse_repo_id(&"meta-llama/Llama-2-7b".to_string()),
"meta-llama/Llama-2-7b"
);
assert_eq!(
HuggingFaceStorage::parse_repo_id(&"my-model".to_string()),
"hanzo-ai/my-model"
);
}
}