use async_trait::async_trait;
use std::path::{Path, PathBuf};
use tokio::fs;
use uuid::Uuid;
use crate::models::workflow_packages::StorageType;
use crate::registry::error::StorageError;
use crate::registry::traits::RegistryStorage;
#[derive(Debug, Clone)]
pub struct FilesystemRegistryStorage {
storage_dir: PathBuf,
}
impl FilesystemRegistryStorage {
pub fn new<P: AsRef<Path>>(storage_dir: P) -> Result<Self, std::io::Error> {
let storage_dir = storage_dir.as_ref().to_path_buf();
std::fs::create_dir_all(&storage_dir)?;
let test_file = storage_dir.join(".write_test");
std::fs::write(&test_file, b"test")?;
std::fs::remove_file(&test_file)?;
Ok(Self { storage_dir })
}
pub fn storage_dir(&self) -> &Path {
&self.storage_dir
}
fn file_path(&self, id: &str) -> PathBuf {
self.storage_dir.join(format!("{}.so", id))
}
pub async fn check_disk_space(&self) -> Result<u64, StorageError> {
match fs::metadata(&self.storage_dir).await {
Ok(_) => {
Ok(u64::MAX)
}
Err(e) => Err(StorageError::Backend(format!(
"Failed to check disk space: {}",
e
))),
}
}
}
#[async_trait]
impl RegistryStorage for FilesystemRegistryStorage {
async fn store_binary(&mut self, data: Vec<u8>) -> Result<String, StorageError> {
let id = Uuid::new_v4();
let file_path = self.file_path(&id.to_string());
if file_path.exists() {
return Err(StorageError::Backend(format!(
"File already exists: {}",
file_path.display()
)));
}
let temp_path = file_path.with_extension("tmp");
match fs::write(&temp_path, &data).await {
Ok(()) => {
match fs::rename(&temp_path, &file_path).await {
Ok(()) => Ok(id.to_string()),
Err(e) => {
let _ = fs::remove_file(&temp_path).await;
Err(StorageError::Backend(format!(
"Failed to move file to final location: {}",
e
)))
}
}
}
Err(e) => {
let _ = fs::remove_file(&temp_path).await;
if e.kind() == std::io::ErrorKind::OutOfMemory {
Err(StorageError::QuotaExceeded {
used_bytes: data.len() as u64,
quota_bytes: 0, })
} else if e.kind() == std::io::ErrorKind::PermissionDenied {
Err(StorageError::Backend(format!(
"Permission denied writing to: {}",
file_path.display()
)))
} else {
Err(StorageError::Backend(format!(
"Failed to write file {}: {}",
file_path.display(),
e
)))
}
}
}
}
async fn retrieve_binary(&self, id: &str) -> Result<Option<Vec<u8>>, StorageError> {
if Uuid::parse_str(id).is_err() {
return Err(StorageError::InvalidId { id: id.to_string() });
}
let file_path = self.file_path(id);
match fs::read(&file_path).await {
Ok(data) => {
Ok(Some(data))
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(StorageError::Backend(format!(
"Failed to read file {}: {}",
file_path.display(),
e
))),
}
}
async fn delete_binary(&mut self, id: &str) -> Result<(), StorageError> {
if Uuid::parse_str(id).is_err() {
return Err(StorageError::InvalidId { id: id.to_string() });
}
let file_path = self.file_path(id);
match fs::remove_file(&file_path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
Ok(())
}
Err(e) => Err(StorageError::Backend(format!(
"Failed to delete file {}: {}",
file_path.display(),
e
))),
}
}
fn storage_type(&self) -> StorageType {
StorageType::Filesystem
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
async fn create_test_storage() -> (FilesystemRegistryStorage, TempDir) {
let temp_dir = TempDir::new().unwrap();
let dal = FilesystemRegistryStorage::new(temp_dir.path()).unwrap();
(dal, temp_dir)
}
#[tokio::test]
async fn test_store_and_retrieve() {
let (mut dal, _temp_dir) = create_test_storage().await;
let test_data = b"test workflow binary data".to_vec();
let id = dal.store_binary(test_data.clone()).await.unwrap();
let retrieved = dal.retrieve_binary(&id).await.unwrap();
assert_eq!(retrieved, Some(test_data));
}
#[tokio::test]
async fn test_retrieve_nonexistent() {
let (dal, _temp_dir) = create_test_storage().await;
let fake_id = Uuid::new_v4().to_string();
let result = dal.retrieve_binary(&fake_id).await.unwrap();
assert_eq!(result, None);
}
#[tokio::test]
async fn test_delete_binary() {
let (mut dal, _temp_dir) = create_test_storage().await;
let test_data = b"test data for deletion".to_vec();
let id = dal.store_binary(test_data).await.unwrap();
let retrieved = dal.retrieve_binary(&id).await.unwrap();
assert!(retrieved.is_some());
dal.delete_binary(&id).await.unwrap();
let retrieved = dal.retrieve_binary(&id).await.unwrap();
assert_eq!(retrieved, None);
dal.delete_binary(&id).await.unwrap();
}
#[tokio::test]
async fn test_invalid_uuid() {
let (dal, _temp_dir) = create_test_storage().await;
let result = dal.retrieve_binary("not-a-uuid").await;
assert!(matches!(result, Err(StorageError::InvalidId { .. })));
let mut dal = dal;
let result = dal.delete_binary("not-a-uuid").await;
assert!(matches!(result, Err(StorageError::InvalidId { .. })));
}
#[tokio::test]
async fn test_empty_file_handling() {
let (dal, temp_dir) = create_test_storage().await;
let id = Uuid::new_v4().to_string();
let file_path = temp_dir.path().join(format!("{}.so", id));
fs::write(&file_path, b"").await.unwrap();
let result = dal.retrieve_binary(&id).await;
assert!(matches!(result, Ok(Some(data)) if data.is_empty()));
}
#[tokio::test]
async fn test_atomic_write() {
let (mut dal, temp_dir) = create_test_storage().await;
let test_data = b"test atomic write".to_vec();
let id = dal.store_binary(test_data.clone()).await.unwrap();
let temp_files: Vec<_> = std::fs::read_dir(temp_dir.path())
.unwrap()
.filter_map(|entry| entry.ok())
.filter(|entry| entry.path().extension().is_some_and(|ext| ext == "tmp"))
.collect();
assert_eq!(temp_files.len(), 0, "Temporary files should be cleaned up");
let retrieved = dal.retrieve_binary(&id).await.unwrap();
assert_eq!(retrieved, Some(test_data));
}
#[tokio::test]
async fn test_file_permissions() {
let (mut dal, temp_dir) = create_test_storage().await;
let test_data = b"test permissions".to_vec();
let id = dal.store_binary(test_data.clone()).await.unwrap();
let file_path = temp_dir.path().join(format!("{}.so", id));
assert!(file_path.exists(), "File should exist");
assert_eq!(
file_path.extension().unwrap(),
"so",
"File should have .so extension"
);
let file_contents = fs::read(&file_path).await.unwrap();
assert_eq!(file_contents, test_data);
}
#[tokio::test]
async fn test_directory_creation() {
let temp_dir = TempDir::new().unwrap();
let nested_path = temp_dir.path().join("deeply").join("nested").join("path");
let dal = FilesystemRegistryStorage::new(&nested_path).unwrap();
assert!(nested_path.exists(), "Nested directories should be created");
assert!(nested_path.is_dir(), "Path should be a directory");
let mut dal = dal;
let test_data = b"test nested storage".to_vec();
let id = dal.store_binary(test_data.clone()).await.unwrap();
let retrieved = dal.retrieve_binary(&id).await.unwrap();
assert_eq!(retrieved, Some(test_data));
}
#[tokio::test]
async fn test_uuid_format() {
let (mut dal, _temp_dir) = create_test_storage().await;
let test_data = b"test data".to_vec();
let id = dal.store_binary(test_data).await.unwrap();
let parsed_uuid = Uuid::parse_str(&id);
assert!(
parsed_uuid.is_ok(),
"Returned ID should be a valid UUID: {}",
id
);
}
#[tokio::test]
async fn test_binary_data_integrity() {
let (mut dal, _temp_dir) = create_test_storage().await;
let mut binary_data = Vec::with_capacity(256);
for i in 0..=255u8 {
binary_data.push(i);
}
let id = dal.store_binary(binary_data.clone()).await.unwrap();
let retrieved = dal.retrieve_binary(&id).await.unwrap();
assert_eq!(retrieved, Some(binary_data));
}
#[tokio::test]
async fn test_very_large_file() {
let (mut dal, _temp_dir) = create_test_storage().await;
let large_data = vec![0xAB; 1024 * 1024];
let id = dal.store_binary(large_data.clone()).await.unwrap();
let retrieved = dal.retrieve_binary(&id).await.unwrap();
assert_eq!(retrieved, Some(large_data));
}
#[tokio::test]
async fn test_storage_dir_access() {
let (dal, temp_dir) = create_test_storage().await;
assert_eq!(dal.storage_dir(), temp_dir.path());
}
#[tokio::test]
async fn test_check_disk_space() {
let (dal, _temp_dir) = create_test_storage().await;
let disk_space = dal.check_disk_space().await.unwrap();
assert!(disk_space > 0, "Should report some available disk space");
}
}