Skip to main content

oxirs_samm/
cloud_storage.rs

//! Cloud Storage Integration for SAMM Models
//!
//! This module provides a flexible trait-based cloud storage abstraction for SAMM models.
//! Users can implement their own cloud storage backends; an in-memory backend
//! ([`MemoryBackend`]) is included for testing.
//!
//! # Features
//!
//! - **Trait-Based Design**: Implement `CloudStorageBackend` for any storage provider
//! - **Model Caching**: Optional local, TTL-bounded caching of uploaded and downloaded
//!   models (enabled by default)
//! - **Batch Operations**: Upload multiple models in one call, with per-model failure reporting
//! - **Async Support**: Full async/await support for I/O operations
//!
//! # Examples
//!
//! ```rust,no_run
//! use oxirs_samm::cloud_storage::{CloudModelStorage, MemoryBackend};
//! use oxirs_samm::metamodel::Aspect;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Create an in-memory storage backend for testing
//! let backend = MemoryBackend::new();
//! let mut storage = CloudModelStorage::new(Box::new(backend));
//!
//! // Upload a model
//! let aspect = Aspect::new("urn:samm:org.example:1.0.0#Vehicle".to_string());
//! storage.upload_model("models/vehicle.ttl", &aspect).await?;
//!
//! // Download a model
//! let downloaded = storage.download_model("models/vehicle.ttl").await?;
//!
//! // List the `.ttl` models stored under a prefix
//! let models = storage.list_models("models/").await?;
//! println!("Found {} models", models.len());
//! # Ok(())
//! # }
//! ```
//!
//! # Implementing Custom Backends
//!
//! Implement the five required methods below; `get_metadata` has a default
//! implementation that returns placeholder metadata and can be overridden.
//!
//! ```rust
//! use oxirs_samm::cloud_storage::CloudStorageBackend;
//! use async_trait::async_trait;
//!
//! struct MyS3Backend {
//!     // Your AWS S3 client
//! }
//!
//! #[async_trait]
//! impl CloudStorageBackend for MyS3Backend {
//!     async fn upload(&self, key: &str, data: Vec<u8>) -> std::result::Result<(), String> {
//!         // Upload to S3
//!         Ok(())
//!     }
//!
//!     async fn download(&self, key: &str) -> std::result::Result<Vec<u8>, String> {
//!         // Download from S3
//!         Ok(vec![])
//!     }
//!
//!     async fn exists(&self, key: &str) -> std::result::Result<bool, String> {
//!         // Check if object exists in S3
//!         Ok(false)
//!     }
//!
//!     async fn delete(&self, key: &str) -> std::result::Result<(), String> {
//!         // Delete from S3
//!         Ok(())
//!     }
//!
//!     async fn list(&self, prefix: &str) -> std::result::Result<Vec<String>, String> {
//!         // List objects in S3
//!         Ok(vec![])
//!     }
//! }
//! ```
76
use crate::error::{Result, SammError};
use crate::metamodel::Aspect;
use crate::parser::parse_aspect_from_string;
use crate::serializer::serialize_aspect_to_string;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant, SystemTime};
use tracing::{debug, error, info, warn};
87
88/// Trait for cloud storage backends
89///
90/// Implement this trait to add support for different cloud storage providers
91/// (AWS S3, Google Cloud Storage, Azure Blob Storage, etc.)
92#[async_trait]
93pub trait CloudStorageBackend: Send + Sync {
94    /// Upload data to cloud storage
95    async fn upload(&self, key: &str, data: Vec<u8>) -> std::result::Result<(), String>;
96
97    /// Download data from cloud storage
98    async fn download(&self, key: &str) -> std::result::Result<Vec<u8>, String>;
99
100    /// Check if an object exists
101    async fn exists(&self, key: &str) -> std::result::Result<bool, String>;
102
103    /// Delete an object
104    async fn delete(&self, key: &str) -> std::result::Result<(), String>;
105
106    /// List objects with a given prefix
107    async fn list(&self, prefix: &str) -> std::result::Result<Vec<String>, String>;
108
109    /// Get object metadata (optional, returns empty metadata by default)
110    async fn get_metadata(&self, key: &str) -> std::result::Result<ObjectMetadata, String> {
111        Ok(ObjectMetadata {
112            key: key.to_string(),
113            size: 0,
114            last_modified: None,
115        })
116    }
117}
118
119/// Cloud storage client for SAMM models
120pub struct CloudModelStorage {
121    backend: Box<dyn CloudStorageBackend>,
122    cache: Option<Arc<Mutex<ModelCache>>>,
123}
124
125/// Local cache for cloud models
126#[derive(Debug)]
127struct ModelCache {
128    models: HashMap<String, (Aspect, SystemTime)>,
129    ttl: Duration,
130}
131
132impl ModelCache {
133    fn new(ttl: Duration) -> Self {
134        Self {
135            models: HashMap::new(),
136            ttl,
137        }
138    }
139
140    fn get(&mut self, key: &str) -> Option<Aspect> {
141        if let Some((model, timestamp)) = self.models.get(key) {
142            if timestamp.elapsed().unwrap_or(Duration::MAX) < self.ttl {
143                debug!("Cache hit for model: {}", key);
144                return Some(model.clone());
145            } else {
146                debug!("Cache expired for model: {}", key);
147                self.models.remove(key);
148            }
149        }
150        None
151    }
152
153    fn put(&mut self, key: String, model: Aspect) {
154        self.models.insert(key, (model, SystemTime::now()));
155    }
156
157    fn clear(&mut self) {
158        self.models.clear();
159    }
160}
161
162impl CloudModelStorage {
163    /// Create a new cloud model storage client
164    ///
165    /// # Arguments
166    ///
167    /// * `backend` - Cloud storage backend implementation
168    ///
169    /// # Example
170    ///
171    /// ```rust
172    /// # use oxirs_samm::cloud_storage::{CloudModelStorage, MemoryBackend};
173    /// let backend = MemoryBackend::new();
174    /// let storage = CloudModelStorage::new(Box::new(backend));
175    /// ```
176    pub fn new(backend: Box<dyn CloudStorageBackend>) -> Self {
177        info!("Initialized cloud model storage");
178        Self {
179            backend,
180            cache: Some(Arc::new(Mutex::new(ModelCache::new(Duration::from_secs(
181                3600,
182            ))))),
183        }
184    }
185
186    /// Create storage without caching
187    pub fn new_without_cache(backend: Box<dyn CloudStorageBackend>) -> Self {
188        info!("Initialized cloud model storage (no cache)");
189        Self {
190            backend,
191            cache: None,
192        }
193    }
194
195    /// Upload a SAMM model to cloud storage
196    pub async fn upload_model(&mut self, key: &str, aspect: &Aspect) -> Result<()> {
197        info!("Uploading model to cloud: {}", key);
198
199        // Serialize aspect to Turtle format
200        let ttl_content = serialize_aspect_to_string(aspect)?;
201
202        // Upload to cloud
203        self.backend
204            .upload(key, ttl_content.into_bytes())
205            .await
206            .map_err(|e| SammError::cloud_error(format!("Upload failed: {}", e)))?;
207
208        // Update cache
209        if let Some(cache) = &self.cache {
210            if let Ok(mut cache_guard) = cache.lock() {
211                cache_guard.put(key.to_string(), aspect.clone());
212            }
213        }
214
215        info!("Successfully uploaded model: {}", key);
216        Ok(())
217    }
218
219    /// Download a SAMM model from cloud storage
220    pub async fn download_model(&mut self, key: &str) -> Result<Aspect> {
221        // Check cache first
222        if let Some(cache) = &self.cache {
223            if let Ok(mut cache_guard) = cache.lock() {
224                if let Some(model) = cache_guard.get(key) {
225                    return Ok(model);
226                }
227            }
228        }
229
230        info!("Downloading model from cloud: {}", key);
231
232        // Download from cloud
233        let data = self
234            .backend
235            .download(key)
236            .await
237            .map_err(|e| SammError::cloud_error(format!("Download failed: {}", e)))?;
238
239        // Parse the Turtle content
240        let ttl_content = String::from_utf8(data)
241            .map_err(|e| SammError::ParseError(format!("Invalid UTF-8: {}", e)))?;
242
243        // Use a dummy base URI for parsing
244        let aspect = parse_aspect_from_string(&ttl_content, "urn:samm:org.eclipse.esmf").await?;
245
246        // Update cache
247        if let Some(cache) = &self.cache {
248            if let Ok(mut cache_guard) = cache.lock() {
249                cache_guard.put(key.to_string(), aspect.clone());
250            }
251        }
252
253        info!("Successfully downloaded model: {}", key);
254        Ok(aspect)
255    }
256
257    /// Check if a model exists in cloud storage
258    pub async fn model_exists(&self, key: &str) -> Result<bool> {
259        self.backend
260            .exists(key)
261            .await
262            .map_err(|e| SammError::cloud_error(format!("Existence check failed: {}", e)))
263    }
264
265    /// Delete a model from cloud storage
266    pub async fn delete_model(&mut self, key: &str) -> Result<()> {
267        info!("Deleting model from cloud: {}", key);
268
269        self.backend
270            .delete(key)
271            .await
272            .map_err(|e| SammError::cloud_error(format!("Delete failed: {}", e)))?;
273
274        // Remove from cache
275        if let Some(cache) = &self.cache {
276            if let Ok(mut cache_guard) = cache.lock() {
277                cache_guard.models.remove(key);
278            }
279        }
280
281        info!("Successfully deleted model: {}", key);
282        Ok(())
283    }
284
285    /// List all models in a directory/prefix
286    pub async fn list_models(&self, prefix: &str) -> Result<Vec<ModelInfo>> {
287        info!("Listing models with prefix: {}", prefix);
288
289        let keys = self
290            .backend
291            .list(prefix)
292            .await
293            .map_err(|e| SammError::cloud_error(format!("List failed: {}", e)))?;
294
295        let mut models = Vec::new();
296        for key in keys {
297            if key.ends_with(".ttl") {
298                if let Ok(metadata) = self.backend.get_metadata(&key).await {
299                    models.push(ModelInfo {
300                        key: metadata.key,
301                        size: metadata.size,
302                        last_modified: metadata.last_modified,
303                    });
304                }
305            }
306        }
307
308        Ok(models)
309    }
310
311    /// Upload multiple models in batch
312    pub async fn upload_models_batch(
313        &mut self,
314        models: Vec<(String, Aspect)>,
315    ) -> Result<BatchResult> {
316        info!("Uploading {} models in batch", models.len());
317
318        let mut successful = 0;
319        let mut failed = Vec::new();
320
321        for (key, aspect) in models {
322            match self.upload_model(&key, &aspect).await {
323                Ok(_) => successful += 1,
324                Err(e) => {
325                    error!("Failed to upload {}: {}", key, e);
326                    failed.push((key, e.to_string()));
327                }
328            }
329        }
330
331        let failed_count = failed.len();
332
333        info!(
334            "Batch upload complete: {} successful, {} failed",
335            successful, failed_count
336        );
337
338        Ok(BatchResult {
339            successful,
340            failed,
341            total: successful + failed_count,
342        })
343    }
344
345    /// Clear the local cache
346    pub fn clear_cache(&mut self) {
347        if let Some(cache) = &self.cache {
348            if let Ok(mut cache_guard) = cache.lock() {
349                cache_guard.clear();
350                info!("Cache cleared");
351            }
352        }
353    }
354
355    /// Get cache statistics
356    pub fn cache_stats(&self) -> Option<CacheStats> {
357        self.cache.as_ref().and_then(|cache| {
358            cache.lock().ok().map(|guard| CacheStats {
359                entries: guard.models.len(),
360                ttl_seconds: guard.ttl.as_secs(),
361            })
362        })
363    }
364}
365
366/// In-memory storage backend for testing
367pub struct MemoryBackend {
368    storage: Arc<Mutex<HashMap<String, Vec<u8>>>>,
369}
370
371impl MemoryBackend {
372    /// Create a new in-memory backend
373    pub fn new() -> Self {
374        Self {
375            storage: Arc::new(Mutex::new(HashMap::new())),
376        }
377    }
378}
379
380impl Default for MemoryBackend {
381    fn default() -> Self {
382        Self::new()
383    }
384}
385
386#[async_trait]
387impl CloudStorageBackend for MemoryBackend {
388    async fn upload(&self, key: &str, data: Vec<u8>) -> std::result::Result<(), String> {
389        let mut storage = self
390            .storage
391            .lock()
392            .expect("storage mutex should not be poisoned");
393        storage.insert(key.to_string(), data);
394        Ok(())
395    }
396
397    async fn download(&self, key: &str) -> std::result::Result<Vec<u8>, String> {
398        let storage = self.storage.lock().expect("lock should not be poisoned");
399        storage
400            .get(key)
401            .cloned()
402            .ok_or_else(|| format!("Key not found: {}", key))
403    }
404
405    async fn exists(&self, key: &str) -> std::result::Result<bool, String> {
406        let storage = self.storage.lock().expect("lock should not be poisoned");
407        Ok(storage.contains_key(key))
408    }
409
410    async fn delete(&self, key: &str) -> std::result::Result<(), String> {
411        let mut storage = self
412            .storage
413            .lock()
414            .expect("storage mutex should not be poisoned");
415        storage.remove(key);
416        Ok(())
417    }
418
419    async fn list(&self, prefix: &str) -> std::result::Result<Vec<String>, String> {
420        let storage = self.storage.lock().expect("lock should not be poisoned");
421        Ok(storage
422            .keys()
423            .filter(|k| k.starts_with(prefix))
424            .cloned()
425            .collect())
426    }
427
428    async fn get_metadata(&self, key: &str) -> std::result::Result<ObjectMetadata, String> {
429        let storage = self.storage.lock().expect("lock should not be poisoned");
430        storage
431            .get(key)
432            .map(|data| ObjectMetadata {
433                key: key.to_string(),
434                size: data.len(),
435                last_modified: Some(SystemTime::now()),
436            })
437            .ok_or_else(|| format!("Key not found: {}", key))
438    }
439}
440
441/// Object metadata
442#[derive(Debug, Clone, Serialize, Deserialize)]
443pub struct ObjectMetadata {
444    /// Object key
445    pub key: String,
446    /// Size in bytes
447    pub size: usize,
448    /// Last modification time
449    pub last_modified: Option<SystemTime>,
450}
451
452/// Information about a cloud-stored model
453#[derive(Debug, Clone, Serialize, Deserialize)]
454pub struct ModelInfo {
455    /// Cloud storage key
456    pub key: String,
457    /// File size in bytes
458    pub size: usize,
459    /// Last modification timestamp
460    pub last_modified: Option<SystemTime>,
461}
462
463/// Batch operation result
464#[derive(Debug, Clone, Serialize, Deserialize)]
465pub struct BatchResult {
466    /// Number of successful operations
467    pub successful: usize,
468    /// Failed operations with error messages
469    pub failed: Vec<(String, String)>,
470    /// Total operations attempted
471    pub total: usize,
472}
473
474/// Cache statistics
475#[derive(Debug, Clone, Serialize, Deserialize)]
476pub struct CacheStats {
477    /// Number of cached entries
478    pub entries: usize,
479    /// Cache TTL in seconds
480    pub ttl_seconds: u64,
481}
482
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metamodel::ModelElement;

    #[test]
    fn test_model_info_creation() {
        let info = ModelInfo {
            key: "models/test.ttl".to_string(),
            size: 1024,
            last_modified: Some(SystemTime::now()),
        };

        assert_eq!(info.key, "models/test.ttl");
        assert_eq!(info.size, 1024);
        assert!(info.last_modified.is_some());
    }

    #[test]
    fn test_batch_result() {
        let result = BatchResult {
            successful: 5,
            failed: vec![("model1.ttl".to_string(), "Error".to_string())],
            total: 6,
        };

        assert_eq!(result.successful, 5);
        assert_eq!(result.failed.len(), 1);
        assert_eq!(result.total, 6);
    }

    #[test]
    fn test_cache_stats() {
        let stats = CacheStats {
            entries: 10,
            ttl_seconds: 3600,
        };

        assert_eq!(stats.entries, 10);
        assert_eq!(stats.ttl_seconds, 3600);
    }

    #[tokio::test]
    async fn test_memory_backend() {
        let backend = MemoryBackend::new();

        // Test upload
        let data = b"test data".to_vec();
        backend
            .upload("test.txt", data.clone())
            .await
            .expect("async operation should succeed");

        // Test exists
        assert!(backend
            .exists("test.txt")
            .await
            .expect("async operation should succeed"));
        assert!(!backend
            .exists("nonexistent.txt")
            .await
            .expect("async operation should succeed"));

        // Test download
        let downloaded = backend
            .download("test.txt")
            .await
            .expect("async operation should succeed");
        assert_eq!(downloaded, data);

        // Test list
        backend
            .upload("dir/file1.txt", vec![])
            .await
            .expect("async operation should succeed");
        backend
            .upload("dir/file2.txt", vec![])
            .await
            .expect("async operation should succeed");
        let files = backend
            .list("dir/")
            .await
            .expect("async operation should succeed");
        assert_eq!(files.len(), 2);

        // Test delete
        backend
            .delete("test.txt")
            .await
            .expect("async operation should succeed");
        assert!(!backend
            .exists("test.txt")
            .await
            .expect("async operation should succeed"));
    }

    #[tokio::test]
    async fn test_cloud_model_storage() {
        let backend = MemoryBackend::new();
        let mut storage = CloudModelStorage::new(Box::new(backend));

        // Create a test aspect
        let aspect = Aspect::new("urn:samm:org.test:1.0.0#TestAspect".to_string());

        // Test upload
        storage
            .upload_model("models/test.ttl", &aspect)
            .await
            .expect("operation should succeed");

        // Test exists
        assert!(storage
            .model_exists("models/test.ttl")
            .await
            .expect("async operation should succeed"));

        // Test download
        let downloaded = storage
            .download_model("models/test.ttl")
            .await
            .expect("async operation should succeed");
        assert_eq!(downloaded.name(), aspect.name());

        // Test list
        let models = storage
            .list_models("models/")
            .await
            .expect("async operation should succeed");
        assert_eq!(models.len(), 1);

        // Test delete
        storage
            .delete_model("models/test.ttl")
            .await
            .expect("async operation should succeed");
        assert!(!storage
            .model_exists("models/test.ttl")
            .await
            .expect("async operation should succeed"));
    }

    #[tokio::test]
    async fn test_cache_functionality() {
        let backend = MemoryBackend::new();
        let mut storage = CloudModelStorage::new(Box::new(backend));

        let aspect = Aspect::new("urn:samm:org.test:1.0.0#CachedAspect".to_string());

        // Upload model
        storage
            .upload_model("cached/model.ttl", &aspect)
            .await
            .expect("operation should succeed");

        // First download
        let _first = storage
            .download_model("cached/model.ttl")
            .await
            .expect("async operation should succeed");

        // Check cache stats
        let stats = storage.cache_stats().expect("operation should succeed");
        assert_eq!(stats.entries, 1);

        // Second download (from cache)
        let _second = storage
            .download_model("cached/model.ttl")
            .await
            .expect("async operation should succeed");

        // Clear cache
        storage.clear_cache();
        let stats_after_clear = storage.cache_stats().expect("clear should succeed");
        assert_eq!(stats_after_clear.entries, 0);
    }

    #[tokio::test]
    async fn test_download_missing_model_fails() {
        let mut storage = CloudModelStorage::new(Box::new(MemoryBackend::new()));

        assert!(storage
            .download_model("models/missing.ttl")
            .await
            .is_err());
    }

    #[tokio::test]
    async fn test_storage_without_cache() {
        let mut storage = CloudModelStorage::new_without_cache(Box::new(MemoryBackend::new()));
        assert!(storage.cache_stats().is_none());

        let aspect = Aspect::new("urn:samm:org.test:1.0.0#UncachedAspect".to_string());
        storage
            .upload_model("uncached/model.ttl", &aspect)
            .await
            .expect("upload should succeed");

        let downloaded = storage
            .download_model("uncached/model.ttl")
            .await
            .expect("download should succeed");
        assert_eq!(downloaded.name(), aspect.name());

        // Uploading and downloading must not create a cache.
        assert!(storage.cache_stats().is_none());
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let mut storage = CloudModelStorage::new(Box::new(MemoryBackend::new()));
        let models = vec![
            (
                "batch/a.ttl".to_string(),
                Aspect::new("urn:samm:org.test:1.0.0#AspectA".to_string()),
            ),
            (
                "batch/b.ttl".to_string(),
                Aspect::new("urn:samm:org.test:1.0.0#AspectB".to_string()),
            ),
        ];

        let result = storage
            .upload_models_batch(models)
            .await
            .expect("batch upload should succeed");
        assert_eq!(result.successful, 2);
        assert!(result.failed.is_empty());
        assert_eq!(result.total, 2);

        let listed = storage
            .list_models("batch/")
            .await
            .expect("listing should succeed");
        assert_eq!(listed.len(), 2);
    }

    #[tokio::test]
    async fn test_list_models_skips_non_ttl_keys() {
        let mut storage = CloudModelStorage::new(Box::new(MemoryBackend::new()));
        let aspect = Aspect::new("urn:samm:org.test:1.0.0#ListedAspect".to_string());

        storage
            .upload_model("mixed/model.ttl", &aspect)
            .await
            .expect("upload should succeed");
        storage
            .upload_model("mixed/notes.txt", &aspect)
            .await
            .expect("upload should succeed");

        let listed = storage
            .list_models("mixed/")
            .await
            .expect("listing should succeed");
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].key, "mixed/model.ttl");
    }

    #[tokio::test]
    async fn test_delete_evicts_cached_model() {
        let mut storage = CloudModelStorage::new(Box::new(MemoryBackend::new()));
        let aspect = Aspect::new("urn:samm:org.test:1.0.0#EvictedAspect".to_string());

        storage
            .upload_model("evict/model.ttl", &aspect)
            .await
            .expect("upload should succeed");
        assert_eq!(storage.cache_stats().expect("cache is enabled").entries, 1);

        storage
            .delete_model("evict/model.ttl")
            .await
            .expect("delete should succeed");
        assert_eq!(storage.cache_stats().expect("cache is enabled").entries, 0);

        // With the cache entry gone, the download must hit the (now empty) backend.
        assert!(storage.download_model("evict/model.ttl").await.is_err());
    }
}