1use crate::error::{Result, SammError};
78use crate::metamodel::Aspect;
79use crate::parser::parse_aspect_from_string;
80use crate::serializer::serialize_aspect_to_string;
81use async_trait::async_trait;
82use serde::{Deserialize, Serialize};
83use std::collections::HashMap;
84use std::sync::{Arc, Mutex};
85use std::time::{Duration, SystemTime};
86use tracing::{debug, error, info};
87
88#[async_trait]
93pub trait CloudStorageBackend: Send + Sync {
94 async fn upload(&self, key: &str, data: Vec<u8>) -> std::result::Result<(), String>;
96
97 async fn download(&self, key: &str) -> std::result::Result<Vec<u8>, String>;
99
100 async fn exists(&self, key: &str) -> std::result::Result<bool, String>;
102
103 async fn delete(&self, key: &str) -> std::result::Result<(), String>;
105
106 async fn list(&self, prefix: &str) -> std::result::Result<Vec<String>, String>;
108
109 async fn get_metadata(&self, key: &str) -> std::result::Result<ObjectMetadata, String> {
111 Ok(ObjectMetadata {
112 key: key.to_string(),
113 size: 0,
114 last_modified: None,
115 })
116 }
117}
118
119pub struct CloudModelStorage {
121 backend: Box<dyn CloudStorageBackend>,
122 cache: Option<Arc<Mutex<ModelCache>>>,
123}
124
125#[derive(Debug)]
127struct ModelCache {
128 models: HashMap<String, (Aspect, SystemTime)>,
129 ttl: Duration,
130}
131
132impl ModelCache {
133 fn new(ttl: Duration) -> Self {
134 Self {
135 models: HashMap::new(),
136 ttl,
137 }
138 }
139
140 fn get(&mut self, key: &str) -> Option<Aspect> {
141 if let Some((model, timestamp)) = self.models.get(key) {
142 if timestamp.elapsed().unwrap_or(Duration::MAX) < self.ttl {
143 debug!("Cache hit for model: {}", key);
144 return Some(model.clone());
145 } else {
146 debug!("Cache expired for model: {}", key);
147 self.models.remove(key);
148 }
149 }
150 None
151 }
152
153 fn put(&mut self, key: String, model: Aspect) {
154 self.models.insert(key, (model, SystemTime::now()));
155 }
156
157 fn clear(&mut self) {
158 self.models.clear();
159 }
160}
161
162impl CloudModelStorage {
163 pub fn new(backend: Box<dyn CloudStorageBackend>) -> Self {
177 info!("Initialized cloud model storage");
178 Self {
179 backend,
180 cache: Some(Arc::new(Mutex::new(ModelCache::new(Duration::from_secs(
181 3600,
182 ))))),
183 }
184 }
185
186 pub fn new_without_cache(backend: Box<dyn CloudStorageBackend>) -> Self {
188 info!("Initialized cloud model storage (no cache)");
189 Self {
190 backend,
191 cache: None,
192 }
193 }
194
195 pub async fn upload_model(&mut self, key: &str, aspect: &Aspect) -> Result<()> {
197 info!("Uploading model to cloud: {}", key);
198
199 let ttl_content = serialize_aspect_to_string(aspect)?;
201
202 self.backend
204 .upload(key, ttl_content.into_bytes())
205 .await
206 .map_err(|e| SammError::cloud_error(format!("Upload failed: {}", e)))?;
207
208 if let Some(cache) = &self.cache {
210 if let Ok(mut cache_guard) = cache.lock() {
211 cache_guard.put(key.to_string(), aspect.clone());
212 }
213 }
214
215 info!("Successfully uploaded model: {}", key);
216 Ok(())
217 }
218
219 pub async fn download_model(&mut self, key: &str) -> Result<Aspect> {
221 if let Some(cache) = &self.cache {
223 if let Ok(mut cache_guard) = cache.lock() {
224 if let Some(model) = cache_guard.get(key) {
225 return Ok(model);
226 }
227 }
228 }
229
230 info!("Downloading model from cloud: {}", key);
231
232 let data = self
234 .backend
235 .download(key)
236 .await
237 .map_err(|e| SammError::cloud_error(format!("Download failed: {}", e)))?;
238
239 let ttl_content = String::from_utf8(data)
241 .map_err(|e| SammError::ParseError(format!("Invalid UTF-8: {}", e)))?;
242
243 let aspect = parse_aspect_from_string(&ttl_content, "urn:samm:org.eclipse.esmf").await?;
245
246 if let Some(cache) = &self.cache {
248 if let Ok(mut cache_guard) = cache.lock() {
249 cache_guard.put(key.to_string(), aspect.clone());
250 }
251 }
252
253 info!("Successfully downloaded model: {}", key);
254 Ok(aspect)
255 }
256
257 pub async fn model_exists(&self, key: &str) -> Result<bool> {
259 self.backend
260 .exists(key)
261 .await
262 .map_err(|e| SammError::cloud_error(format!("Existence check failed: {}", e)))
263 }
264
265 pub async fn delete_model(&mut self, key: &str) -> Result<()> {
267 info!("Deleting model from cloud: {}", key);
268
269 self.backend
270 .delete(key)
271 .await
272 .map_err(|e| SammError::cloud_error(format!("Delete failed: {}", e)))?;
273
274 if let Some(cache) = &self.cache {
276 if let Ok(mut cache_guard) = cache.lock() {
277 cache_guard.models.remove(key);
278 }
279 }
280
281 info!("Successfully deleted model: {}", key);
282 Ok(())
283 }
284
285 pub async fn list_models(&self, prefix: &str) -> Result<Vec<ModelInfo>> {
287 info!("Listing models with prefix: {}", prefix);
288
289 let keys = self
290 .backend
291 .list(prefix)
292 .await
293 .map_err(|e| SammError::cloud_error(format!("List failed: {}", e)))?;
294
295 let mut models = Vec::new();
296 for key in keys {
297 if key.ends_with(".ttl") {
298 if let Ok(metadata) = self.backend.get_metadata(&key).await {
299 models.push(ModelInfo {
300 key: metadata.key,
301 size: metadata.size,
302 last_modified: metadata.last_modified,
303 });
304 }
305 }
306 }
307
308 Ok(models)
309 }
310
311 pub async fn upload_models_batch(
313 &mut self,
314 models: Vec<(String, Aspect)>,
315 ) -> Result<BatchResult> {
316 info!("Uploading {} models in batch", models.len());
317
318 let mut successful = 0;
319 let mut failed = Vec::new();
320
321 for (key, aspect) in models {
322 match self.upload_model(&key, &aspect).await {
323 Ok(_) => successful += 1,
324 Err(e) => {
325 error!("Failed to upload {}: {}", key, e);
326 failed.push((key, e.to_string()));
327 }
328 }
329 }
330
331 let failed_count = failed.len();
332
333 info!(
334 "Batch upload complete: {} successful, {} failed",
335 successful, failed_count
336 );
337
338 Ok(BatchResult {
339 successful,
340 failed,
341 total: successful + failed_count,
342 })
343 }
344
345 pub fn clear_cache(&mut self) {
347 if let Some(cache) = &self.cache {
348 if let Ok(mut cache_guard) = cache.lock() {
349 cache_guard.clear();
350 info!("Cache cleared");
351 }
352 }
353 }
354
355 pub fn cache_stats(&self) -> Option<CacheStats> {
357 self.cache.as_ref().and_then(|cache| {
358 cache.lock().ok().map(|guard| CacheStats {
359 entries: guard.models.len(),
360 ttl_seconds: guard.ttl.as_secs(),
361 })
362 })
363 }
364}
365
366pub struct MemoryBackend {
368 storage: Arc<Mutex<HashMap<String, Vec<u8>>>>,
369}
370
371impl MemoryBackend {
372 pub fn new() -> Self {
374 Self {
375 storage: Arc::new(Mutex::new(HashMap::new())),
376 }
377 }
378}
379
380impl Default for MemoryBackend {
381 fn default() -> Self {
382 Self::new()
383 }
384}
385
386#[async_trait]
387impl CloudStorageBackend for MemoryBackend {
388 async fn upload(&self, key: &str, data: Vec<u8>) -> std::result::Result<(), String> {
389 let mut storage = self
390 .storage
391 .lock()
392 .expect("storage mutex should not be poisoned");
393 storage.insert(key.to_string(), data);
394 Ok(())
395 }
396
397 async fn download(&self, key: &str) -> std::result::Result<Vec<u8>, String> {
398 let storage = self.storage.lock().expect("lock should not be poisoned");
399 storage
400 .get(key)
401 .cloned()
402 .ok_or_else(|| format!("Key not found: {}", key))
403 }
404
405 async fn exists(&self, key: &str) -> std::result::Result<bool, String> {
406 let storage = self.storage.lock().expect("lock should not be poisoned");
407 Ok(storage.contains_key(key))
408 }
409
410 async fn delete(&self, key: &str) -> std::result::Result<(), String> {
411 let mut storage = self
412 .storage
413 .lock()
414 .expect("storage mutex should not be poisoned");
415 storage.remove(key);
416 Ok(())
417 }
418
419 async fn list(&self, prefix: &str) -> std::result::Result<Vec<String>, String> {
420 let storage = self.storage.lock().expect("lock should not be poisoned");
421 Ok(storage
422 .keys()
423 .filter(|k| k.starts_with(prefix))
424 .cloned()
425 .collect())
426 }
427
428 async fn get_metadata(&self, key: &str) -> std::result::Result<ObjectMetadata, String> {
429 let storage = self.storage.lock().expect("lock should not be poisoned");
430 storage
431 .get(key)
432 .map(|data| ObjectMetadata {
433 key: key.to_string(),
434 size: data.len(),
435 last_modified: Some(SystemTime::now()),
436 })
437 .ok_or_else(|| format!("Key not found: {}", key))
438 }
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize)]
443pub struct ObjectMetadata {
444 pub key: String,
446 pub size: usize,
448 pub last_modified: Option<SystemTime>,
450}
451
452#[derive(Debug, Clone, Serialize, Deserialize)]
454pub struct ModelInfo {
455 pub key: String,
457 pub size: usize,
459 pub last_modified: Option<SystemTime>,
461}
462
463#[derive(Debug, Clone, Serialize, Deserialize)]
465pub struct BatchResult {
466 pub successful: usize,
468 pub failed: Vec<(String, String)>,
470 pub total: usize,
472}
473
474#[derive(Debug, Clone, Serialize, Deserialize)]
476pub struct CacheStats {
477 pub entries: usize,
479 pub ttl_seconds: u64,
481}
482
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metamodel::ModelElement;

    /// A `ModelInfo` exposes exactly the values it was built from.
    #[test]
    fn test_model_info_creation() {
        let modified_at = Some(SystemTime::now());
        let info = ModelInfo {
            key: String::from("models/test.ttl"),
            size: 1024,
            last_modified: modified_at,
        };

        assert_eq!(info.key, "models/test.ttl");
        assert_eq!(info.size, 1024);
        assert!(info.last_modified.is_some());
    }

    /// A `BatchResult` exposes its counters and failure list unchanged.
    #[test]
    fn test_batch_result() {
        let failures = vec![(String::from("model1.ttl"), String::from("Error"))];
        let result = BatchResult {
            successful: 5,
            failed: failures,
            total: 6,
        };

        assert_eq!(result.successful, 5);
        assert_eq!(result.failed.len(), 1);
        assert_eq!(result.total, 6);
    }

    /// A `CacheStats` exposes its entry count and TTL unchanged.
    #[test]
    fn test_cache_stats() {
        let stats = CacheStats {
            entries: 10,
            ttl_seconds: 3600,
        };

        assert_eq!(stats.entries, 10);
        assert_eq!(stats.ttl_seconds, 3600);
    }

    /// Exercises upload, exists, download, list, and delete on the in-memory
    /// backend.
    #[tokio::test]
    async fn test_memory_backend() {
        let backend = MemoryBackend::new();
        let payload = b"test data".to_vec();

        // Upload, then confirm presence of that key and absence of another.
        backend
            .upload("test.txt", payload.clone())
            .await
            .expect("async operation should succeed");

        let present = backend
            .exists("test.txt")
            .await
            .expect("async operation should succeed");
        assert!(present);

        let unknown_present = backend
            .exists("nonexistent.txt")
            .await
            .expect("async operation should succeed");
        assert!(!unknown_present);

        // The download returns exactly what was uploaded.
        let downloaded = backend
            .download("test.txt")
            .await
            .expect("async operation should succeed");
        assert_eq!(downloaded, payload);

        // Listing by prefix sees only the two objects under `dir/`.
        backend
            .upload("dir/file1.txt", Vec::new())
            .await
            .expect("async operation should succeed");
        backend
            .upload("dir/file2.txt", Vec::new())
            .await
            .expect("async operation should succeed");
        let listed = backend
            .list("dir/")
            .await
            .expect("async operation should succeed");
        assert_eq!(listed.len(), 2);

        // After deletion the key is gone.
        backend
            .delete("test.txt")
            .await
            .expect("async operation should succeed");
        let still_present = backend
            .exists("test.txt")
            .await
            .expect("async operation should succeed");
        assert!(!still_present);
    }

    /// Round-trips a model through `CloudModelStorage` on the in-memory
    /// backend.
    #[tokio::test]
    async fn test_cloud_model_storage() {
        const KEY: &str = "models/test.ttl";

        let mut storage = CloudModelStorage::new(Box::new(MemoryBackend::new()));
        let aspect = Aspect::new("urn:samm:org.test:1.0.0#TestAspect".to_string());

        storage
            .upload_model(KEY, &aspect)
            .await
            .expect("operation should succeed");

        let exists_after_upload = storage
            .model_exists(KEY)
            .await
            .expect("async operation should succeed");
        assert!(exists_after_upload);

        let downloaded = storage
            .download_model(KEY)
            .await
            .expect("async operation should succeed");
        assert_eq!(downloaded.name(), aspect.name());

        let listed = storage
            .list_models("models/")
            .await
            .expect("async operation should succeed");
        assert_eq!(listed.len(), 1);

        storage
            .delete_model(KEY)
            .await
            .expect("async operation should succeed");
        let exists_after_delete = storage
            .model_exists(KEY)
            .await
            .expect("async operation should succeed");
        assert!(!exists_after_delete);
    }

    /// The cache holds one entry after an upload and download, and none after
    /// being cleared.
    #[tokio::test]
    async fn test_cache_functionality() {
        const KEY: &str = "cached/model.ttl";

        let mut storage = CloudModelStorage::new(Box::new(MemoryBackend::new()));
        let aspect = Aspect::new("urn:samm:org.test:1.0.0#CachedAspect".to_string());

        storage
            .upload_model(KEY, &aspect)
            .await
            .expect("operation should succeed");

        let _first = storage
            .download_model(KEY)
            .await
            .expect("async operation should succeed");

        let stats = storage.cache_stats().expect("operation should succeed");
        assert_eq!(stats.entries, 1);

        // A repeated download must also succeed.
        let _second = storage
            .download_model(KEY)
            .await
            .expect("async operation should succeed");

        storage.clear_cache();
        let stats_after_clear = storage.cache_stats().expect("clear should succeed");
        assert_eq!(stats_after_clear.entries, 0);
    }
}