use crate::error::{Result, SammError};
use crate::metamodel::Aspect;
use crate::parser::parse_aspect_from_string;
use crate::serializer::serialize_aspect_to_string;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use tracing::{debug, error, info};
/// Abstraction over a cloud object store (S3-like): flat keys mapping to
/// byte blobs. Implementations must be `Send + Sync` so a storage instance
/// can be shared across tasks.
///
/// Errors are plain `String`s so backends need no dependency on this
/// crate's error type; `CloudModelStorage` wraps them into `SammError`.
#[async_trait]
pub trait CloudStorageBackend: Send + Sync {
    /// Stores `data` under `key`, overwriting any existing object.
    async fn upload(&self, key: &str, data: Vec<u8>) -> std::result::Result<(), String>;
    /// Returns the bytes stored under `key`; errors if the key is absent.
    async fn download(&self, key: &str) -> std::result::Result<Vec<u8>, String>;
    /// Reports whether an object exists under `key`.
    async fn exists(&self, key: &str) -> std::result::Result<bool, String>;
    /// Removes the object under `key` (implementations may treat a missing
    /// key as success).
    async fn delete(&self, key: &str) -> std::result::Result<(), String>;
    /// Lists all keys beginning with `prefix`.
    async fn list(&self, prefix: &str) -> std::result::Result<Vec<String>, String>;
    /// Returns metadata for `key`. The default implementation is a stub
    /// that echoes the key with zero size and no timestamp; backends that
    /// can report real metadata should override it.
    async fn get_metadata(&self, key: &str) -> std::result::Result<ObjectMetadata, String> {
        Ok(ObjectMetadata {
            key: key.to_string(),
            size: 0,
            last_modified: None,
        })
    }
}
/// High-level store for SAMM aspect models on top of a
/// [`CloudStorageBackend`], with an optional TTL-based read cache.
pub struct CloudModelStorage {
    // Boxed trait object so any backend implementation can be plugged in.
    backend: Box<dyn CloudStorageBackend>,
    // `None` disables caching entirely (see `new_without_cache`).
    cache: Option<Arc<Mutex<ModelCache>>>,
}
/// In-memory cache of parsed aspects keyed by storage key. Each entry
/// carries its insertion time; entries older than `ttl` are evicted lazily
/// on access.
#[derive(Debug)]
struct ModelCache {
    models: HashMap<String, (Aspect, SystemTime)>,
    ttl: Duration,
}
impl ModelCache {
fn new(ttl: Duration) -> Self {
Self {
models: HashMap::new(),
ttl,
}
}
fn get(&mut self, key: &str) -> Option<Aspect> {
if let Some((model, timestamp)) = self.models.get(key) {
if timestamp.elapsed().unwrap_or(Duration::MAX) < self.ttl {
debug!("Cache hit for model: {}", key);
return Some(model.clone());
} else {
debug!("Cache expired for model: {}", key);
self.models.remove(key);
}
}
None
}
fn put(&mut self, key: String, model: Aspect) {
self.models.insert(key, (model, SystemTime::now()));
}
fn clear(&mut self) {
self.models.clear();
}
}
impl CloudModelStorage {
    /// Creates a storage wrapper over `backend` with an in-memory cache
    /// whose entries expire after one hour.
    pub fn new(backend: Box<dyn CloudStorageBackend>) -> Self {
        info!("Initialized cloud model storage");
        Self {
            backend,
            // Default TTL of one hour; use `new_without_cache` to disable.
            cache: Some(Arc::new(Mutex::new(ModelCache::new(Duration::from_secs(
                3600,
            ))))),
        }
    }

    /// Creates a storage wrapper that always round-trips to the backend.
    pub fn new_without_cache(backend: Box<dyn CloudStorageBackend>) -> Self {
        info!("Initialized cloud model storage (no cache)");
        Self {
            backend,
            cache: None,
        }
    }

    /// Serializes `aspect` to Turtle and uploads it under `key`; on success
    /// the model is also inserted into the cache (when one is configured).
    ///
    /// Takes `&self` — cache mutation goes through the internal mutex — so
    /// concurrent uploads are possible; existing `&mut` callers still work.
    ///
    /// # Errors
    /// Returns an error if serialization or the backend upload fails.
    pub async fn upload_model(&self, key: &str, aspect: &Aspect) -> Result<()> {
        info!("Uploading model to cloud: {}", key);
        let ttl_content = serialize_aspect_to_string(aspect)?;
        self.backend
            .upload(key, ttl_content.into_bytes())
            .await
            .map_err(|e| SammError::cloud_error(format!("Upload failed: {}", e)))?;
        // Cache update is best-effort: a poisoned lock is skipped rather
        // than failing an otherwise successful upload.
        if let Some(cache) = &self.cache {
            if let Ok(mut cache_guard) = cache.lock() {
                cache_guard.put(key.to_string(), aspect.clone());
            }
        }
        info!("Successfully uploaded model: {}", key);
        Ok(())
    }

    /// Fetches the model stored under `key`, serving a cached copy when a
    /// fresh entry exists; otherwise downloads, parses, and caches it.
    ///
    /// # Errors
    /// Returns an error if the backend download fails, the payload is not
    /// valid UTF-8, or parsing the Turtle content fails.
    pub async fn download_model(&self, key: &str) -> Result<Aspect> {
        // Fast path: the guard is confined to this scope, so it is dropped
        // before any `.await` below (std::sync::Mutex must not be held
        // across await points).
        if let Some(cache) = &self.cache {
            if let Ok(mut cache_guard) = cache.lock() {
                if let Some(model) = cache_guard.get(key) {
                    return Ok(model);
                }
            }
        }
        info!("Downloading model from cloud: {}", key);
        let data = self
            .backend
            .download(key)
            .await
            .map_err(|e| SammError::cloud_error(format!("Download failed: {}", e)))?;
        let ttl_content = String::from_utf8(data)
            .map_err(|e| SammError::ParseError(format!("Invalid UTF-8: {}", e)))?;
        // NOTE(review): base URN is hard-coded; confirm all stored models
        // actually resolve under the `org.eclipse.esmf` namespace.
        let aspect = parse_aspect_from_string(&ttl_content, "urn:samm:org.eclipse.esmf").await?;
        if let Some(cache) = &self.cache {
            if let Ok(mut cache_guard) = cache.lock() {
                cache_guard.put(key.to_string(), aspect.clone());
            }
        }
        info!("Successfully downloaded model: {}", key);
        Ok(aspect)
    }

    /// Reports whether a model exists under `key` (never consults the cache).
    ///
    /// # Errors
    /// Returns an error if the backend existence check fails.
    pub async fn model_exists(&self, key: &str) -> Result<bool> {
        self.backend
            .exists(key)
            .await
            .map_err(|e| SammError::cloud_error(format!("Existence check failed: {}", e)))
    }

    /// Deletes the model under `key` from the backend and evicts it from
    /// the cache.
    ///
    /// # Errors
    /// Returns an error if the backend delete fails.
    pub async fn delete_model(&self, key: &str) -> Result<()> {
        info!("Deleting model from cloud: {}", key);
        self.backend
            .delete(key)
            .await
            .map_err(|e| SammError::cloud_error(format!("Delete failed: {}", e)))?;
        if let Some(cache) = &self.cache {
            if let Ok(mut cache_guard) = cache.lock() {
                cache_guard.models.remove(key);
            }
        }
        info!("Successfully deleted model: {}", key);
        Ok(())
    }

    /// Lists `.ttl` models under `prefix` with their metadata.
    ///
    /// Keys whose metadata lookup fails are silently omitted (best-effort
    /// listing rather than failing the whole call).
    ///
    /// # Errors
    /// Returns an error only if the backend list operation itself fails.
    pub async fn list_models(&self, prefix: &str) -> Result<Vec<ModelInfo>> {
        info!("Listing models with prefix: {}", prefix);
        let keys = self
            .backend
            .list(prefix)
            .await
            .map_err(|e| SammError::cloud_error(format!("List failed: {}", e)))?;
        let mut models = Vec::new();
        for key in keys {
            if key.ends_with(".ttl") {
                if let Ok(metadata) = self.backend.get_metadata(&key).await {
                    models.push(ModelInfo {
                        key: metadata.key,
                        size: metadata.size,
                        last_modified: metadata.last_modified,
                    });
                }
            }
        }
        Ok(models)
    }

    /// Uploads each `(key, aspect)` pair sequentially, collecting failures
    /// instead of aborting on the first error.
    ///
    /// # Errors
    /// Never fails as a whole; per-model failures are reported in the
    /// returned [`BatchResult::failed`] list.
    pub async fn upload_models_batch(
        &self,
        models: Vec<(String, Aspect)>,
    ) -> Result<BatchResult> {
        info!("Uploading {} models in batch", models.len());
        let mut successful = 0;
        let mut failed = Vec::new();
        for (key, aspect) in models {
            match self.upload_model(&key, &aspect).await {
                Ok(_) => successful += 1,
                Err(e) => {
                    error!("Failed to upload {}: {}", key, e);
                    failed.push((key, e.to_string()));
                }
            }
        }
        let failed_count = failed.len();
        info!(
            "Batch upload complete: {} successful, {} failed",
            successful, failed_count
        );
        Ok(BatchResult {
            successful,
            failed,
            total: successful + failed_count,
        })
    }

    /// Empties the cache; a no-op when caching is disabled or the lock is
    /// poisoned.
    pub fn clear_cache(&self) {
        if let Some(cache) = &self.cache {
            if let Ok(mut cache_guard) = cache.lock() {
                cache_guard.clear();
                info!("Cache cleared");
            }
        }
    }

    /// Returns current cache statistics, or `None` when caching is disabled
    /// or the cache lock is poisoned.
    pub fn cache_stats(&self) -> Option<CacheStats> {
        self.cache.as_ref().and_then(|cache| {
            cache.lock().ok().map(|guard| CacheStats {
                entries: guard.models.len(),
                ttl_seconds: guard.ttl.as_secs(),
            })
        })
    }
}
/// In-memory [`CloudStorageBackend`] for tests and local development.
/// The `Arc<Mutex<..>>` makes cloned handles share one object map across
/// threads.
pub struct MemoryBackend {
    storage: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}
impl MemoryBackend {
pub fn new() -> Self {
Self {
storage: Arc::new(Mutex::new(HashMap::new())),
}
}
}
impl Default for MemoryBackend {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl CloudStorageBackend for MemoryBackend {
    /// Stores `data` under `key`, replacing any previous value.
    async fn upload(&self, key: &str, data: Vec<u8>) -> std::result::Result<(), String> {
        let mut storage = self
            .storage
            .lock()
            .expect("storage mutex should not be poisoned");
        storage.insert(key.to_string(), data);
        Ok(())
    }

    /// Returns a copy of the bytes under `key`, or an error if absent.
    async fn download(&self, key: &str) -> std::result::Result<Vec<u8>, String> {
        let storage = self
            .storage
            .lock()
            .expect("storage mutex should not be poisoned");
        storage
            .get(key)
            .cloned()
            .ok_or_else(|| format!("Key not found: {}", key))
    }

    /// Reports whether `key` is present.
    async fn exists(&self, key: &str) -> std::result::Result<bool, String> {
        let storage = self
            .storage
            .lock()
            .expect("storage mutex should not be poisoned");
        Ok(storage.contains_key(key))
    }

    /// Removes `key`; deleting a missing key is a silent success.
    async fn delete(&self, key: &str) -> std::result::Result<(), String> {
        let mut storage = self
            .storage
            .lock()
            .expect("storage mutex should not be poisoned");
        storage.remove(key);
        Ok(())
    }

    /// Returns all keys starting with `prefix` (unordered — HashMap keys).
    async fn list(&self, prefix: &str) -> std::result::Result<Vec<String>, String> {
        let storage = self
            .storage
            .lock()
            .expect("storage mutex should not be poisoned");
        Ok(storage
            .keys()
            .filter(|k| k.starts_with(prefix))
            .cloned()
            .collect())
    }

    /// Returns metadata for `key`, or an error if absent.
    ///
    /// NOTE(review): this backend does not track modification times, so
    /// `last_modified` reports the *query* time, not the upload time —
    /// confirm callers do not rely on it being stable across calls.
    async fn get_metadata(&self, key: &str) -> std::result::Result<ObjectMetadata, String> {
        let storage = self
            .storage
            .lock()
            .expect("storage mutex should not be poisoned");
        storage
            .get(key)
            .map(|data| ObjectMetadata {
                key: key.to_string(),
                size: data.len(),
                last_modified: Some(SystemTime::now()),
            })
            .ok_or_else(|| format!("Key not found: {}", key))
    }
}
/// Metadata describing one stored object, as reported by a backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectMetadata {
    // Full storage key of the object.
    pub key: String,
    // Object size in bytes (0 from the trait's default stub).
    pub size: usize,
    // Last modification time, when the backend can report one.
    pub last_modified: Option<SystemTime>,
}
/// Summary of one stored model, produced by `CloudModelStorage::list_models`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelInfo {
    // Full storage key of the model file.
    pub key: String,
    // Serialized model size in bytes.
    pub size: usize,
    // Last modification time, when the backend reports one.
    pub last_modified: Option<SystemTime>,
}
/// Outcome of a batch upload: per-model failures are collected rather than
/// aborting the whole batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchResult {
    // Number of models uploaded successfully.
    pub successful: usize,
    // (key, error message) for each model that failed.
    pub failed: Vec<(String, String)>,
    // Total models attempted (successful + failed).
    pub total: usize,
}
/// Snapshot of the model cache, from `CloudModelStorage::cache_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheStats {
    // Number of entries currently cached (including not-yet-evicted stale ones).
    pub entries: usize,
    // Configured time-to-live, in seconds.
    pub ttl_seconds: u64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metamodel::ModelElement;

    /// Plain construction/field sanity check for `ModelInfo`.
    #[test]
    fn test_model_info_creation() {
        let info = ModelInfo {
            key: "models/test.ttl".to_string(),
            size: 1024,
            last_modified: Some(SystemTime::now()),
        };
        assert_eq!(info.key, "models/test.ttl");
        assert_eq!(info.size, 1024);
        assert!(info.last_modified.is_some());
    }

    /// `BatchResult` fields round-trip as constructed.
    #[test]
    fn test_batch_result() {
        let result = BatchResult {
            successful: 5,
            failed: vec![("model1.ttl".to_string(), "Error".to_string())],
            total: 6,
        };
        assert_eq!(result.successful, 5);
        assert_eq!(result.failed.len(), 1);
        assert_eq!(result.total, 6);
    }

    /// `CacheStats` fields round-trip as constructed.
    #[test]
    fn test_cache_stats() {
        let stats = CacheStats {
            entries: 10,
            ttl_seconds: 3600,
        };
        assert_eq!(stats.entries, 10);
        assert_eq!(stats.ttl_seconds, 3600);
    }

    /// Exercises the full MemoryBackend surface: upload, exists, download,
    /// prefix listing, and delete. Panic messages name the failing step.
    #[tokio::test]
    async fn test_memory_backend() {
        let backend = MemoryBackend::new();
        let data = b"test data".to_vec();
        backend
            .upload("test.txt", data.clone())
            .await
            .expect("upload of test.txt should succeed");
        assert!(backend
            .exists("test.txt")
            .await
            .expect("existence check for test.txt should succeed"));
        assert!(!backend
            .exists("nonexistent.txt")
            .await
            .expect("existence check for nonexistent.txt should succeed"));
        let downloaded = backend
            .download("test.txt")
            .await
            .expect("download of test.txt should succeed");
        assert_eq!(downloaded, data);
        backend
            .upload("dir/file1.txt", vec![])
            .await
            .expect("upload of dir/file1.txt should succeed");
        backend
            .upload("dir/file2.txt", vec![])
            .await
            .expect("upload of dir/file2.txt should succeed");
        let files = backend
            .list("dir/")
            .await
            .expect("listing prefix dir/ should succeed");
        assert_eq!(files.len(), 2);
        backend
            .delete("test.txt")
            .await
            .expect("delete of test.txt should succeed");
        assert!(!backend
            .exists("test.txt")
            .await
            .expect("existence check after delete should succeed"));
    }

    /// End-to-end model lifecycle through CloudModelStorage over the
    /// in-memory backend: upload, exists, download (round-trips the name),
    /// list, delete.
    #[tokio::test]
    async fn test_cloud_model_storage() {
        let backend = MemoryBackend::new();
        let mut storage = CloudModelStorage::new(Box::new(backend));
        let aspect = Aspect::new("urn:samm:org.test:1.0.0#TestAspect".to_string());
        storage
            .upload_model("models/test.ttl", &aspect)
            .await
            .expect("model upload should succeed");
        assert!(storage
            .model_exists("models/test.ttl")
            .await
            .expect("model existence check should succeed"));
        let downloaded = storage
            .download_model("models/test.ttl")
            .await
            .expect("model download should succeed");
        assert_eq!(downloaded.name(), aspect.name());
        let models = storage
            .list_models("models/")
            .await
            .expect("model listing should succeed");
        assert_eq!(models.len(), 1);
        storage
            .delete_model("models/test.ttl")
            .await
            .expect("model delete should succeed");
        assert!(!storage
            .model_exists("models/test.ttl")
            .await
            .expect("existence check after delete should succeed"));
    }

    /// Verifies the cache fills on download and empties on `clear_cache`.
    #[tokio::test]
    async fn test_cache_functionality() {
        let backend = MemoryBackend::new();
        let mut storage = CloudModelStorage::new(Box::new(backend));
        let aspect = Aspect::new("urn:samm:org.test:1.0.0#CachedAspect".to_string());
        storage
            .upload_model("cached/model.ttl", &aspect)
            .await
            .expect("model upload should succeed");
        let _first = storage
            .download_model("cached/model.ttl")
            .await
            .expect("first download should succeed");
        let stats = storage.cache_stats().expect("cache stats should be available");
        assert_eq!(stats.entries, 1);
        // Second download should be served from the cache (same result).
        let _second = storage
            .download_model("cached/model.ttl")
            .await
            .expect("cached download should succeed");
        storage.clear_cache();
        let stats_after_clear = storage
            .cache_stats()
            .expect("cache stats should be available after clear");
        assert_eq!(stats_after_clear.entries, 0);
    }
}