Skip to main content

torsh_package/
storage.rs

1//! Cloud storage abstraction for package distribution
2//!
3//! This module provides a unified interface for storing and retrieving packages
4//! from various storage backends including local file systems, S3, GCS, and Azure Blob Storage.
5//!
6//! # Architecture
7//!
8//! The storage system is designed with the following components:
9//! - **StorageBackend**: Trait defining common operations for all storage backends
10//! - **LocalStorage**: Local file system implementation
11//! - **S3Storage**: AWS S3 storage implementation (feature-gated)
12//! - **GcsStorage**: Google Cloud Storage implementation (feature-gated)
13//! - **AzureStorage**: Azure Blob Storage implementation (feature-gated)
14//! - **StorageManager**: High-level interface with caching and retry logic
15//!
16//! # Example
17//!
18//! ```rust,no_run
19//! use torsh_package::storage::{LocalStorage, StorageBackend, StorageManager};
20//! use std::path::PathBuf;
21//!
22//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
23//! // Create local storage backend
24//! let local_storage = LocalStorage::new(PathBuf::from("/var/packages"))?;
25//!
26//! // Create storage manager with caching
27//! let mut manager = StorageManager::new(Box::new(local_storage))
28//!     .with_cache_size(1024 * 1024 * 100) // 100MB cache
29//!     .with_retry_count(3);
30//!
31//! // Store a package
32//! let package_data = b"package contents";
33//! manager.put("models/my-model/v1.0.0.torshpkg", package_data)?;
34//!
35//! // Retrieve a package
36//! let retrieved = manager.get("models/my-model/v1.0.0.torshpkg")?;
37//! assert_eq!(retrieved, package_data);
38//!
39//! // List packages
40//! let packages = manager.list("models/my-model/")?;
41//! for package in packages {
42//!     println!("Found package: {}", package.key);
43//! }
44//! # Ok(())
45//! # }
46//! ```
47
48use serde::{Deserialize, Serialize};
49use std::collections::HashMap;
50use std::fs;
51use std::io::{Read, Write};
52use std::path::{Path, PathBuf};
53use std::time::SystemTime;
54use torsh_core::error::{Result, TorshError};
55
56/// Storage backend trait for package storage
57///
58/// This trait defines the interface that all storage backends must implement.
59/// Implementations should handle connection management, authentication, and
60/// error handling internally.
61pub trait StorageBackend: Send + Sync {
62    /// Store data at the specified key
63    ///
64    /// # Arguments
65    /// * `key` - The storage key (path) for the data
66    /// * `data` - The data to store
67    ///
68    /// # Returns
69    /// * `Ok(())` on success
70    /// * `Err(TorshError)` on failure
71    fn put(&mut self, key: &str, data: &[u8]) -> Result<()>;
72
73    /// Retrieve data from the specified key
74    ///
75    /// # Arguments
76    /// * `key` - The storage key (path) to retrieve
77    ///
78    /// # Returns
79    /// * `Ok(Vec<u8>)` containing the data on success
80    /// * `Err(TorshError)` if the key doesn't exist or retrieval fails
81    fn get(&self, key: &str) -> Result<Vec<u8>>;
82
83    /// Delete data at the specified key
84    ///
85    /// # Arguments
86    /// * `key` - The storage key (path) to delete
87    ///
88    /// # Returns
89    /// * `Ok(())` on success or if key doesn't exist
90    /// * `Err(TorshError)` on failure
91    fn delete(&mut self, key: &str) -> Result<()>;
92
93    /// Check if a key exists
94    ///
95    /// # Arguments
96    /// * `key` - The storage key (path) to check
97    ///
98    /// # Returns
99    /// * `Ok(true)` if the key exists
100    /// * `Ok(false)` if the key doesn't exist
101    /// * `Err(TorshError)` on error checking existence
102    fn exists(&self, key: &str) -> Result<bool>;
103
104    /// List all keys with the specified prefix
105    ///
106    /// # Arguments
107    /// * `prefix` - The prefix to filter keys (e.g., "models/")
108    ///
109    /// # Returns
110    /// * `Ok(Vec<StorageObject>)` containing metadata about matching objects
111    /// * `Err(TorshError)` on failure
112    fn list(&self, prefix: &str) -> Result<Vec<StorageObject>>;
113
114    /// Get metadata about a stored object
115    ///
116    /// # Arguments
117    /// * `key` - The storage key (path) to get metadata for
118    ///
119    /// # Returns
120    /// * `Ok(StorageObject)` containing object metadata
121    /// * `Err(TorshError)` if the key doesn't exist or retrieval fails
122    fn get_metadata(&self, key: &str) -> Result<StorageObject>;
123
124    /// Copy an object from one key to another
125    ///
126    /// # Arguments
127    /// * `from_key` - Source key
128    /// * `to_key` - Destination key
129    ///
130    /// # Returns
131    /// * `Ok(())` on success
132    /// * `Err(TorshError)` on failure
133    fn copy(&mut self, from_key: &str, to_key: &str) -> Result<()> {
134        let data = self.get(from_key)?;
135        self.put(to_key, &data)
136    }
137
138    /// Get the storage backend type
139    fn backend_type(&self) -> &str;
140}
141
142/// Metadata about a stored object
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct StorageObject {
145    /// Storage key (path)
146    pub key: String,
147    /// Size in bytes
148    pub size: u64,
149    /// Last modified timestamp
150    pub last_modified: SystemTime,
151    /// Content type (MIME type)
152    pub content_type: Option<String>,
153    /// ETag or version identifier
154    pub etag: Option<String>,
155    /// Additional metadata
156    pub metadata: HashMap<String, String>,
157}
158
/// Local file system storage backend
///
/// Stores each object as a regular file below `base_path`, mirroring the key's
/// `/`-separated structure as nested directories. Useful for development,
/// testing, and single-machine deployments.
#[derive(Debug, Clone)]
pub struct LocalStorage {
    /// Root directory under which all objects are stored.
    base_path: PathBuf,
}
167
168impl LocalStorage {
169    /// Create a new local storage backend
170    ///
171    /// # Arguments
172    /// * `base_path` - Base directory for storing packages
173    ///
174    /// # Returns
175    /// * `Ok(LocalStorage)` on success
176    /// * `Err(TorshError)` if the directory cannot be created or accessed
177    pub fn new(base_path: PathBuf) -> Result<Self> {
178        // Create base directory if it doesn't exist
179        if !base_path.exists() {
180            fs::create_dir_all(&base_path).map_err(|e| {
181                TorshError::IoError(format!(
182                    "Failed to create storage directory {}: {}",
183                    base_path.display(),
184                    e
185                ))
186            })?;
187        }
188
189        Ok(Self { base_path })
190    }
191
192    /// Get the full path for a storage key
193    fn get_path(&self, key: &str) -> PathBuf {
194        self.base_path.join(key)
195    }
196
197    /// Ensure parent directory exists for a key
198    fn ensure_parent_dir(&self, key: &str) -> Result<()> {
199        let path = self.get_path(key);
200        if let Some(parent) = path.parent() {
201            if !parent.exists() {
202                fs::create_dir_all(parent).map_err(|e| {
203                    TorshError::IoError(format!("Failed to create parent directory: {}", e))
204                })?;
205            }
206        }
207        Ok(())
208    }
209}
210
211impl StorageBackend for LocalStorage {
212    fn put(&mut self, key: &str, data: &[u8]) -> Result<()> {
213        self.ensure_parent_dir(key)?;
214        let path = self.get_path(key);
215
216        let mut file = fs::File::create(&path).map_err(|e| {
217            TorshError::IoError(format!("Failed to create file {}: {}", path.display(), e))
218        })?;
219
220        file.write_all(data).map_err(|e| {
221            TorshError::IoError(format!("Failed to write to file {}: {}", path.display(), e))
222        })?;
223
224        Ok(())
225    }
226
227    fn get(&self, key: &str) -> Result<Vec<u8>> {
228        let path = self.get_path(key);
229
230        if !path.exists() {
231            return Err(TorshError::InvalidArgument(format!(
232                "Storage key not found: {}",
233                key
234            )));
235        }
236
237        let mut file = fs::File::open(&path).map_err(|e| {
238            TorshError::IoError(format!("Failed to open file {}: {}", path.display(), e))
239        })?;
240
241        let mut data = Vec::new();
242        file.read_to_end(&mut data).map_err(|e| {
243            TorshError::IoError(format!("Failed to read file {}: {}", path.display(), e))
244        })?;
245
246        Ok(data)
247    }
248
249    fn delete(&mut self, key: &str) -> Result<()> {
250        let path = self.get_path(key);
251
252        if path.exists() {
253            fs::remove_file(&path).map_err(|e| {
254                TorshError::IoError(format!("Failed to delete file {}: {}", path.display(), e))
255            })?;
256        }
257
258        Ok(())
259    }
260
261    fn exists(&self, key: &str) -> Result<bool> {
262        let path = self.get_path(key);
263        Ok(path.exists())
264    }
265
266    fn list(&self, prefix: &str) -> Result<Vec<StorageObject>> {
267        let prefix_path = self.get_path(prefix);
268
269        if !prefix_path.exists() {
270            return Ok(Vec::new());
271        }
272
273        let mut objects = Vec::new();
274
275        // Walk directory recursively
276        fn walk_dir(dir: &Path, base: &Path, objects: &mut Vec<StorageObject>) -> Result<()> {
277            if dir.is_dir() {
278                for entry in fs::read_dir(dir)
279                    .map_err(|e| TorshError::IoError(format!("Failed to read directory: {}", e)))?
280                {
281                    let entry = entry.map_err(|e| {
282                        TorshError::IoError(format!("Failed to read directory entry: {}", e))
283                    })?;
284                    let path = entry.path();
285
286                    if path.is_file() {
287                        let metadata = fs::metadata(&path).map_err(|e| {
288                            TorshError::IoError(format!("Failed to get metadata: {}", e))
289                        })?;
290
291                        let relative_path = path
292                            .strip_prefix(base)
293                            .map_err(|e| {
294                                TorshError::InvalidArgument(format!("Invalid path: {}", e))
295                            })?
296                            .to_string_lossy()
297                            .to_string();
298
299                        objects.push(StorageObject {
300                            key: relative_path,
301                            size: metadata.len(),
302                            last_modified: metadata
303                                .modified()
304                                .unwrap_or_else(|_| SystemTime::now()),
305                            content_type: None,
306                            etag: None,
307                            metadata: HashMap::new(),
308                        });
309                    } else if path.is_dir() {
310                        walk_dir(&path, base, objects)?;
311                    }
312                }
313            }
314            Ok(())
315        }
316
317        walk_dir(&prefix_path, &self.base_path, &mut objects)?;
318
319        Ok(objects)
320    }
321
322    fn get_metadata(&self, key: &str) -> Result<StorageObject> {
323        let path = self.get_path(key);
324
325        if !path.exists() {
326            return Err(TorshError::InvalidArgument(format!(
327                "Storage key not found: {}",
328                key
329            )));
330        }
331
332        let metadata = fs::metadata(&path).map_err(|e| {
333            TorshError::IoError(format!("Failed to get metadata for {}: {}", key, e))
334        })?;
335
336        Ok(StorageObject {
337            key: key.to_string(),
338            size: metadata.len(),
339            last_modified: metadata.modified().unwrap_or_else(|_| SystemTime::now()),
340            content_type: None,
341            etag: None,
342            metadata: HashMap::new(),
343        })
344    }
345
346    fn backend_type(&self) -> &str {
347        "local"
348    }
349}
350
351/// Storage manager with caching and retry logic
352///
353/// Provides a high-level interface for storage operations with:
354/// - In-memory caching for frequently accessed objects
355/// - Automatic retry on transient failures
356/// - Bandwidth throttling
357/// - Metrics collection
358pub struct StorageManager {
359    backend: Box<dyn StorageBackend>,
360    cache: HashMap<String, CachedObject>,
361    cache_size_limit: usize,
362    current_cache_size: usize,
363    retry_count: u32,
364    stats: StorageStats,
365}
366
/// One entry of the `StorageManager` in-memory cache.
#[derive(Clone)]
struct CachedObject {
    /// Wall-clock time of the most recent insertion or cache hit; the entry
    /// with the oldest value is the first to be evicted.
    accessed_at: SystemTime,
    /// Number of accesses, counting the insertion as the first.
    access_count: u64,
    /// The cached object bytes.
    data: Vec<u8>,
}
374
/// Storage operation statistics
///
/// Counters accumulated by a `StorageManager` since it was created or since the
/// last `reset_stats`. Snapshots can be compared with `==`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Number of successful get operations (cache hits included)
    pub gets: u64,
    /// Number of successful put operations
    pub puts: u64,
    /// Number of successful delete operations
    pub deletes: u64,
    /// Total bytes read from the backend (cache hits are not counted)
    pub bytes_read: u64,
    /// Total bytes written to the backend
    pub bytes_written: u64,
    /// Number of gets served from the cache
    pub cache_hits: u64,
    /// Number of gets that had to query the backend
    pub cache_misses: u64,
}
393
394impl StorageManager {
395    /// Create a new storage manager
396    ///
397    /// # Arguments
398    /// * `backend` - The storage backend to use
399    pub fn new(backend: Box<dyn StorageBackend>) -> Self {
400        Self {
401            backend,
402            cache: HashMap::new(),
403            cache_size_limit: 100 * 1024 * 1024, // 100MB default
404            current_cache_size: 0,
405            retry_count: 3,
406            stats: StorageStats::default(),
407        }
408    }
409
410    /// Set the cache size limit
411    pub fn with_cache_size(mut self, size_bytes: usize) -> Self {
412        self.cache_size_limit = size_bytes;
413        self
414    }
415
416    /// Set the retry count for failed operations
417    pub fn with_retry_count(mut self, count: u32) -> Self {
418        self.retry_count = count;
419        self
420    }
421
422    /// Store data with retry logic
423    pub fn put(&mut self, key: &str, data: &[u8]) -> Result<()> {
424        let mut last_error = None;
425
426        for attempt in 0..=self.retry_count {
427            match self.backend.put(key, data) {
428                Ok(()) => {
429                    self.stats.puts += 1;
430                    self.stats.bytes_written += data.len() as u64;
431
432                    // Update cache if key exists in cache
433                    if self.cache.contains_key(key) {
434                        self.put_in_cache(key, data);
435                    }
436
437                    return Ok(());
438                }
439                Err(e) => {
440                    last_error = Some(e);
441                    if attempt < self.retry_count {
442                        // Exponential backoff
443                        let backoff_ms = 100 * 2u64.pow(attempt);
444                        std::thread::sleep(std::time::Duration::from_millis(backoff_ms));
445                    }
446                }
447            }
448        }
449
450        Err(last_error.expect("last_error is set when retries exhausted"))
451    }
452
453    /// Retrieve data with caching and retry logic
454    pub fn get(&mut self, key: &str) -> Result<Vec<u8>> {
455        // Check cache first
456        if let Some(cached) = self.cache.get_mut(key) {
457            cached.accessed_at = SystemTime::now();
458            cached.access_count += 1;
459            self.stats.cache_hits += 1;
460            self.stats.gets += 1;
461            return Ok(cached.data.clone());
462        }
463
464        self.stats.cache_misses += 1;
465
466        // Try to fetch from backend with retry
467        let mut last_error = None;
468
469        for attempt in 0..=self.retry_count {
470            match self.backend.get(key) {
471                Ok(data) => {
472                    self.stats.gets += 1;
473                    self.stats.bytes_read += data.len() as u64;
474
475                    // Add to cache
476                    self.put_in_cache(key, &data);
477
478                    return Ok(data);
479                }
480                Err(e) => {
481                    last_error = Some(e);
482                    if attempt < self.retry_count {
483                        let backoff_ms = 100 * 2u64.pow(attempt);
484                        std::thread::sleep(std::time::Duration::from_millis(backoff_ms));
485                    }
486                }
487            }
488        }
489
490        Err(last_error.expect("last_error is set when retries exhausted"))
491    }
492
493    /// Delete data with retry logic
494    pub fn delete(&mut self, key: &str) -> Result<()> {
495        // Remove from cache
496        if let Some(cached) = self.cache.remove(key) {
497            self.current_cache_size -= cached.data.len();
498        }
499
500        let mut last_error = None;
501
502        for attempt in 0..=self.retry_count {
503            match self.backend.delete(key) {
504                Ok(()) => {
505                    self.stats.deletes += 1;
506                    return Ok(());
507                }
508                Err(e) => {
509                    last_error = Some(e);
510                    if attempt < self.retry_count {
511                        let backoff_ms = 100 * 2u64.pow(attempt);
512                        std::thread::sleep(std::time::Duration::from_millis(backoff_ms));
513                    }
514                }
515            }
516        }
517
518        Err(last_error.expect("last_error is set when retries exhausted"))
519    }
520
521    /// Check if a key exists
522    pub fn exists(&self, key: &str) -> Result<bool> {
523        if self.cache.contains_key(key) {
524            return Ok(true);
525        }
526        self.backend.exists(key)
527    }
528
529    /// List objects with the specified prefix
530    pub fn list(&self, prefix: &str) -> Result<Vec<StorageObject>> {
531        self.backend.list(prefix)
532    }
533
534    /// Get metadata about a stored object
535    pub fn get_metadata(&self, key: &str) -> Result<StorageObject> {
536        self.backend.get_metadata(key)
537    }
538
539    /// Copy an object
540    pub fn copy(&mut self, from_key: &str, to_key: &str) -> Result<()> {
541        self.backend.copy(from_key, to_key)
542    }
543
544    /// Clear the cache
545    pub fn clear_cache(&mut self) {
546        self.cache.clear();
547        self.current_cache_size = 0;
548    }
549
550    /// Get storage statistics
551    pub fn stats(&self) -> &StorageStats {
552        &self.stats
553    }
554
555    /// Reset statistics
556    pub fn reset_stats(&mut self) {
557        self.stats = StorageStats::default();
558    }
559
560    /// Add data to cache with eviction if needed
561    fn put_in_cache(&mut self, key: &str, data: &[u8]) {
562        // Check if we need to evict
563        while self.current_cache_size + data.len() > self.cache_size_limit && !self.cache.is_empty()
564        {
565            // Evict least recently used item
566            if let Some(lru_key) = self.find_lru_key() {
567                if let Some(removed) = self.cache.remove(&lru_key) {
568                    self.current_cache_size -= removed.data.len();
569                }
570            } else {
571                break;
572            }
573        }
574
575        // Only cache if it fits
576        if data.len() <= self.cache_size_limit {
577            self.current_cache_size += data.len();
578            self.cache.insert(
579                key.to_string(),
580                CachedObject {
581                    data: data.to_vec(),
582                    accessed_at: SystemTime::now(),
583                    access_count: 1,
584                },
585            );
586        }
587    }
588
589    /// Find the least recently used cache key
590    fn find_lru_key(&self) -> Option<String> {
591        self.cache
592            .iter()
593            .min_by_key(|(_, obj)| obj.accessed_at)
594            .map(|(key, _)| key.clone())
595    }
596}
597
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Create a fresh temporary directory and a `LocalStorage` rooted in it.
    ///
    /// The returned `TempDir` must stay alive (bind it to a named variable, not
    /// `_`) for as long as the storage is used, or the directory is removed.
    fn local_storage() -> (TempDir, LocalStorage) {
        let temp_dir = TempDir::new().expect("Failed to create temp directory for test");
        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
            .expect("Failed to create storage path");
        (temp_dir, storage)
    }

    #[test]
    fn test_local_storage_creation() {
        let (_temp_dir, storage) = local_storage();
        assert_eq!(storage.backend_type(), "local");
    }

    #[test]
    fn test_local_storage_put_get() {
        let (_temp_dir, mut storage) = local_storage();

        let data = b"test package data";
        storage.put("test/package.bin", data).unwrap();

        let retrieved = storage.get("test/package.bin").unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_local_storage_exists() {
        let (_temp_dir, mut storage) = local_storage();

        assert!(!storage.exists("nonexistent").unwrap());

        storage.put("exists", b"data").unwrap();
        assert!(storage.exists("exists").unwrap());
    }

    #[test]
    fn test_local_storage_delete() {
        let (_temp_dir, mut storage) = local_storage();

        storage.put("to_delete", b"data").unwrap();
        assert!(storage.exists("to_delete").unwrap());

        storage.delete("to_delete").unwrap();
        assert!(!storage.exists("to_delete").unwrap());
    }

    #[test]
    fn test_local_storage_list() {
        let (_temp_dir, mut storage) = local_storage();

        storage.put("models/model1.bin", b"data1").unwrap();
        storage.put("models/model2.bin", b"data2").unwrap();
        storage.put("other/file.txt", b"data3").unwrap();

        let models = storage.list("models/").unwrap();
        assert_eq!(models.len(), 2);

        // Listed keys must be the same keys that were used with `put`.
        let mut model_keys: Vec<String> = models.into_iter().map(|object| object.key).collect();
        model_keys.sort();
        assert_eq!(model_keys, vec!["models/model1.bin", "models/model2.bin"]);

        let all = storage.list("").unwrap();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn test_local_storage_metadata() {
        let (_temp_dir, mut storage) = local_storage();

        let data = b"test data";
        storage.put("metadata_test", data).unwrap();

        let metadata = storage.get_metadata("metadata_test").unwrap();
        assert_eq!(metadata.size, data.len() as u64);
        assert_eq!(metadata.key, "metadata_test");
    }

    #[test]
    fn test_storage_manager_caching() {
        let (_temp_dir, storage) = local_storage();
        let mut manager = StorageManager::new(Box::new(storage)).with_cache_size(1024 * 1024);

        let data = b"cached data";
        manager.put("cache_test", data).unwrap();

        // First get - should be cache miss
        let retrieved1 = manager.get("cache_test").unwrap();
        assert_eq!(retrieved1, data);
        assert_eq!(manager.stats().cache_misses, 1);
        assert_eq!(manager.stats().cache_hits, 0);

        // Second get - should be cache hit
        let retrieved2 = manager.get("cache_test").unwrap();
        assert_eq!(retrieved2, data);
        assert_eq!(manager.stats().cache_misses, 1);
        assert_eq!(manager.stats().cache_hits, 1);
    }

    #[test]
    fn test_storage_manager_cache_eviction() {
        let (_temp_dir, storage) = local_storage();
        let mut manager = StorageManager::new(Box::new(storage)).with_cache_size(100); // Small cache

        // Two 60-byte objects: each fits in the 100-byte cache, but not both together.
        manager.put("large1", &[1u8; 60]).unwrap();
        manager.put("large2", &[2u8; 60]).unwrap();

        // Load both into cache
        manager.get("large1").unwrap();
        manager.get("large2").unwrap();

        // Loading "large2" must have evicted the least recently used entry, "large1".
        assert_eq!(manager.current_cache_size, 60);
        assert_eq!(manager.cache.len(), 1);
        assert!(manager.cache.contains_key("large2"));
    }

    #[test]
    fn test_storage_manager_stats() {
        let (_temp_dir, storage) = local_storage();
        let mut manager = StorageManager::new(Box::new(storage));

        let data = b"test data";
        manager.put("stats_test", data).unwrap();
        manager.get("stats_test").unwrap();
        manager.delete("stats_test").unwrap();

        let stats = manager.stats();
        assert_eq!(stats.puts, 1);
        assert_eq!(stats.gets, 1);
        assert_eq!(stats.deletes, 1);
        assert_eq!(stats.bytes_written, data.len() as u64);
        assert_eq!(stats.bytes_read, data.len() as u64);
    }

    #[test]
    fn test_storage_manager_copy() {
        let (_temp_dir, storage) = local_storage();
        let mut manager = StorageManager::new(Box::new(storage));

        let data = b"copy test data";
        manager.put("source", data).unwrap();
        manager.copy("source", "destination").unwrap();

        let copied = manager.get("destination").unwrap();
        assert_eq!(copied, data);
    }
}