Skip to main content

camgrab_core/storage/
mod.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::path::PathBuf;
4use thiserror::Error;
5use tokio::fs;
6use tokio::io::AsyncWriteExt;
7
8/// Result of a storage operation
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct StorageResult {
11    pub key: String,
12    pub size_bytes: u64,
13    pub timestamp: DateTime<Utc>,
14    pub backend_name: String,
15}
16
17/// Entry in storage list
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct StorageEntry {
20    pub key: String,
21    pub size_bytes: u64,
22    pub last_modified: DateTime<Utc>,
23}
24
25/// Errors that can occur during storage operations
26#[derive(Debug, Error)]
27pub enum StorageError {
28    #[error("I/O error: {0}")]
29    IoError(#[from] std::io::Error),
30
31    #[error("S3 error: {0}")]
32    S3Error(String),
33
34    #[error("Not found: {0}")]
35    NotFound(String),
36
37    #[error("Permission denied: {0}")]
38    PermissionDenied(String),
39}
40
41/// Trait for storage backends
42#[async_trait::async_trait]
43pub trait StorageBackend: Send + Sync {
44    /// Store data with the given key
45    async fn store(&self, key: &str, data: &[u8]) -> Result<StorageResult, StorageError>;
46
47    /// Retrieve data by key
48    async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError>;
49
50    /// Delete data by key
51    async fn delete(&self, key: &str) -> Result<(), StorageError>;
52
53    /// List all entries with the given prefix
54    async fn list(&self, prefix: &str) -> Result<Vec<StorageEntry>, StorageError>;
55
56    /// Get the name of this storage backend
57    fn name(&self) -> &str;
58}
59
60/// Local filesystem storage backend
61pub struct LocalStorage {
62    base_path: PathBuf,
63}
64
65impl LocalStorage {
66    pub async fn new(base_path: PathBuf) -> Result<Self, StorageError> {
67        // Create base directory if it doesn't exist
68        fs::create_dir_all(&base_path).await?;
69
70        Ok(Self { base_path })
71    }
72
73    fn get_full_path(&self, key: &str) -> PathBuf {
74        self.base_path.join(key)
75    }
76}
77
78#[async_trait::async_trait]
79impl StorageBackend for LocalStorage {
80    async fn store(&self, key: &str, data: &[u8]) -> Result<StorageResult, StorageError> {
81        let full_path = self.get_full_path(key);
82
83        // Create parent directories if needed
84        if let Some(parent) = full_path.parent() {
85            fs::create_dir_all(parent).await?;
86        }
87
88        // Atomic write: write to temp file, then rename
89        let temp_path = full_path.with_extension("tmp");
90
91        let mut file = fs::File::create(&temp_path).await?;
92        file.write_all(data).await?;
93        file.sync_all().await?;
94        drop(file);
95
96        fs::rename(&temp_path, &full_path).await?;
97
98        let metadata = fs::metadata(&full_path).await?;
99
100        Ok(StorageResult {
101            key: key.to_string(),
102            size_bytes: metadata.len(),
103            timestamp: Utc::now(),
104            backend_name: self.name().to_string(),
105        })
106    }
107
108    async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError> {
109        let full_path = self.get_full_path(key);
110
111        if !full_path.exists() {
112            return Err(StorageError::NotFound(key.to_string()));
113        }
114
115        let data = fs::read(&full_path).await?;
116        Ok(data)
117    }
118
119    async fn delete(&self, key: &str) -> Result<(), StorageError> {
120        let full_path = self.get_full_path(key);
121
122        if !full_path.exists() {
123            return Err(StorageError::NotFound(key.to_string()));
124        }
125
126        fs::remove_file(&full_path).await?;
127        Ok(())
128    }
129
130    async fn list(&self, prefix: &str) -> Result<Vec<StorageEntry>, StorageError> {
131        let prefix_path = self.base_path.join(prefix);
132
133        if !prefix_path.exists() {
134            return Ok(Vec::new());
135        }
136
137        let mut entries = Vec::new();
138        let mut read_dir = fs::read_dir(&prefix_path).await?;
139
140        while let Some(entry) = read_dir.next_entry().await? {
141            let metadata = entry.metadata().await?;
142
143            if metadata.is_file() {
144                let key = entry
145                    .path()
146                    .strip_prefix(&self.base_path)
147                    .map_err(|e| {
148                        StorageError::IoError(std::io::Error::new(
149                            std::io::ErrorKind::Other,
150                            e.to_string(),
151                        ))
152                    })?
153                    .to_string_lossy()
154                    .to_string();
155
156                let modified = metadata.modified()?;
157                let datetime: DateTime<Utc> = modified.into();
158
159                entries.push(StorageEntry {
160                    key,
161                    size_bytes: metadata.len(),
162                    last_modified: datetime,
163                });
164            }
165        }
166
167        entries.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
168
169        Ok(entries)
170    }
171
172    fn name(&self) -> &str {
173        "local"
174    }
175}
176
177/// S3 storage backend
178pub struct S3Storage {
179    bucket: Box<s3::Bucket>,
180    prefix: String,
181}
182
183impl S3Storage {
184    pub fn new(
185        bucket_name: String,
186        region: String,
187        prefix: String,
188        access_key: Option<String>,
189        secret_key: Option<String>,
190    ) -> Result<Self, StorageError> {
191        let region = region
192            .parse::<s3::Region>()
193            .map_err(|e| StorageError::S3Error(format!("Invalid region: {e}")))?;
194
195        let credentials = if let (Some(access), Some(secret)) = (access_key, secret_key) {
196            s3::creds::Credentials::new(Some(&access), Some(&secret), None, None, None)
197                .map_err(|e| StorageError::S3Error(format!("Invalid credentials: {e}")))?
198        } else {
199            s3::creds::Credentials::default().map_err(|e| {
200                StorageError::S3Error(format!("Failed to get default credentials: {e}"))
201            })?
202        };
203
204        let bucket = s3::Bucket::new(&bucket_name, region, credentials)
205            .map_err(|e| StorageError::S3Error(format!("Failed to create bucket: {e}")))?
206            .with_path_style();
207
208        Ok(Self { bucket, prefix })
209    }
210
211    fn get_s3_key(&self, key: &str) -> String {
212        if self.prefix.is_empty() {
213            key.to_string()
214        } else {
215            format!("{}/{}", self.prefix.trim_end_matches('/'), key)
216        }
217    }
218}
219
220#[async_trait::async_trait]
221impl StorageBackend for S3Storage {
222    async fn store(&self, key: &str, data: &[u8]) -> Result<StorageResult, StorageError> {
223        let s3_key = self.get_s3_key(key);
224
225        let response = self
226            .bucket
227            .put_object(&s3_key, data)
228            .await
229            .map_err(|e| StorageError::S3Error(format!("Failed to put object: {e}")))?;
230
231        if response.status_code() != 200 {
232            return Err(StorageError::S3Error(format!(
233                "S3 returned status code: {}",
234                response.status_code()
235            )));
236        }
237
238        Ok(StorageResult {
239            key: key.to_string(),
240            size_bytes: data.len() as u64,
241            timestamp: Utc::now(),
242            backend_name: self.name().to_string(),
243        })
244    }
245
246    async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError> {
247        let s3_key = self.get_s3_key(key);
248
249        let response = self
250            .bucket
251            .get_object(&s3_key)
252            .await
253            .map_err(|e| StorageError::S3Error(format!("Failed to get object: {e}")))?;
254
255        if response.status_code() == 404 {
256            return Err(StorageError::NotFound(key.to_string()));
257        }
258
259        if response.status_code() != 200 {
260            return Err(StorageError::S3Error(format!(
261                "S3 returned status code: {}",
262                response.status_code()
263            )));
264        }
265
266        Ok(response.bytes().to_vec())
267    }
268
269    async fn delete(&self, key: &str) -> Result<(), StorageError> {
270        let s3_key = self.get_s3_key(key);
271
272        let response = self
273            .bucket
274            .delete_object(&s3_key)
275            .await
276            .map_err(|e| StorageError::S3Error(format!("Failed to delete object: {e}")))?;
277
278        if response.status_code() == 404 {
279            return Err(StorageError::NotFound(key.to_string()));
280        }
281
282        if response.status_code() != 204 && response.status_code() != 200 {
283            return Err(StorageError::S3Error(format!(
284                "S3 returned status code: {}",
285                response.status_code()
286            )));
287        }
288
289        Ok(())
290    }
291
292    async fn list(&self, prefix: &str) -> Result<Vec<StorageEntry>, StorageError> {
293        let s3_prefix = self.get_s3_key(prefix);
294
295        let results = self
296            .bucket
297            .list(s3_prefix, None)
298            .await
299            .map_err(|e| StorageError::S3Error(format!("Failed to list objects: {e}")))?;
300
301        let mut entries = Vec::new();
302
303        for result in results {
304            for object in result.contents {
305                // Strip the prefix from the key
306                let key = if self.prefix.is_empty() {
307                    object.key.clone()
308                } else {
309                    object
310                        .key
311                        .strip_prefix(&format!("{}/", self.prefix.trim_end_matches('/')))
312                        .unwrap_or(&object.key)
313                        .to_string()
314                };
315
316                let last_modified = DateTime::parse_from_rfc3339(&object.last_modified)
317                    .map(|dt| dt.with_timezone(&Utc))
318                    .unwrap_or_else(|_| Utc::now());
319
320                entries.push(StorageEntry {
321                    key,
322                    size_bytes: object.size,
323                    last_modified,
324                });
325            }
326        }
327
328        entries.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
329
330        Ok(entries)
331    }
332
333    fn name(&self) -> &str {
334        "s3"
335    }
336}
337
338/// Manager that coordinates multiple storage backends
339pub struct StorageManager {
340    backends: Vec<Box<dyn StorageBackend + Send + Sync>>,
341}
342
343impl StorageManager {
344    pub fn new() -> Self {
345        Self {
346            backends: Vec::new(),
347        }
348    }
349
350    /// Add a storage backend
351    pub fn add_backend(&mut self, backend: Box<dyn StorageBackend + Send + Sync>) {
352        self.backends.push(backend);
353    }
354
355    /// Generate a key for a snapshot with timestamp
356    fn generate_snapshot_key(camera: &str, format: &str) -> String {
357        let now = Utc::now();
358        let date = now.format("%Y-%m-%d");
359        let time = now.format("%H%M%S");
360        format!("{camera}/{date}/snap_{time}.{format}")
361    }
362
363    /// Generate a key for a clip with timestamp
364    fn generate_clip_key(camera: &str, format: &str) -> String {
365        let now = Utc::now();
366        let date = now.format("%Y-%m-%d");
367        let time = now.format("%H%M%S");
368        format!("{camera}/{date}/clip_{time}.{format}")
369    }
370
371    /// Store a snapshot to all backends
372    pub async fn store_snapshot(
373        &self,
374        camera: &str,
375        data: &[u8],
376        format: &str,
377    ) -> Result<Vec<StorageResult>, StorageError> {
378        let key = Self::generate_snapshot_key(camera, format);
379        let mut results = Vec::new();
380
381        for backend in &self.backends {
382            let result = backend.store(&key, data).await?;
383            results.push(result);
384        }
385
386        Ok(results)
387    }
388
389    /// Store a clip to all backends
390    pub async fn store_clip(
391        &self,
392        camera: &str,
393        data: &[u8],
394        format: &str,
395    ) -> Result<Vec<StorageResult>, StorageError> {
396        let key = Self::generate_clip_key(camera, format);
397        let mut results = Vec::new();
398
399        for backend in &self.backends {
400            let result = backend.store(&key, data).await?;
401            results.push(result);
402        }
403
404        Ok(results)
405    }
406
407    /// Get number of backends
408    pub fn backend_count(&self) -> usize {
409        self.backends.len()
410    }
411}
412
413impl Default for StorageManager {
414    fn default() -> Self {
415        Self::new()
416    }
417}
418
419#[cfg(test)]
420mod tests {
421    use super::*;
422    use tempfile::TempDir;
423
424    #[tokio::test]
425    async fn test_local_storage_roundtrip() {
426        let temp_dir = TempDir::new().unwrap();
427        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
428            .await
429            .unwrap();
430
431        let key = "test/file.txt";
432        let data = b"Hello, World!";
433
434        // Store
435        let result = storage.store(key, data).await.unwrap();
436        assert_eq!(result.key, key);
437        assert_eq!(result.size_bytes, data.len() as u64);
438        assert_eq!(result.backend_name, "local");
439
440        // Retrieve
441        let retrieved = storage.retrieve(key).await.unwrap();
442        assert_eq!(retrieved, data);
443
444        // List
445        let entries = storage.list("test").await.unwrap();
446        assert_eq!(entries.len(), 1);
447        assert_eq!(entries[0].size_bytes, data.len() as u64);
448
449        // Delete
450        storage.delete(key).await.unwrap();
451
452        // Verify deleted
453        let result = storage.retrieve(key).await;
454        assert!(matches!(result, Err(StorageError::NotFound(_))));
455    }
456
457    #[tokio::test]
458    async fn test_local_storage_nested_paths() {
459        let temp_dir = TempDir::new().unwrap();
460        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
461            .await
462            .unwrap();
463
464        let key = "camera1/2024-01-15/snap_143022.jpg";
465        let data = b"image data";
466
467        let result = storage.store(key, data).await.unwrap();
468        assert_eq!(result.key, key);
469
470        let retrieved = storage.retrieve(key).await.unwrap();
471        assert_eq!(retrieved, data);
472    }
473
474    #[tokio::test]
475    async fn test_local_storage_atomic_write() {
476        let temp_dir = TempDir::new().unwrap();
477        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
478            .await
479            .unwrap();
480
481        let key = "test/atomic.txt";
482        let data = b"atomic data";
483
484        storage.store(key, data).await.unwrap();
485
486        // Verify no .tmp files remain
487        let full_path = storage.get_full_path(key);
488        let temp_path = full_path.with_extension("tmp");
489        assert!(!temp_path.exists());
490        assert!(full_path.exists());
491    }
492
493    #[tokio::test]
494    async fn test_storage_manager_snapshot_key_generation() {
495        let key = StorageManager::generate_snapshot_key("front-door", "jpg");
496        assert!(key.contains("front-door"));
497        assert!(key.contains("snap_"));
498        assert!(key.ends_with(".jpg"));
499
500        // Check date format YYYY-MM-DD
501        let parts: Vec<&str> = key.split('/').collect();
502        assert_eq!(parts.len(), 3);
503        assert_eq!(parts[0], "front-door");
504        assert_eq!(parts[1].len(), 10); // YYYY-MM-DD
505    }
506
507    #[tokio::test]
508    async fn test_storage_manager_clip_key_generation() {
509        let key = StorageManager::generate_clip_key("back-yard", "mp4");
510        assert!(key.contains("back-yard"));
511        assert!(key.contains("clip_"));
512        assert!(key.ends_with(".mp4"));
513    }
514
515    #[tokio::test]
516    async fn test_storage_manager_multiple_backends() {
517        let temp_dir1 = TempDir::new().unwrap();
518        let temp_dir2 = TempDir::new().unwrap();
519
520        let storage1 = LocalStorage::new(temp_dir1.path().to_path_buf())
521            .await
522            .unwrap();
523        let storage2 = LocalStorage::new(temp_dir2.path().to_path_buf())
524            .await
525            .unwrap();
526
527        let mut manager = StorageManager::new();
528        manager.add_backend(Box::new(storage1));
529        manager.add_backend(Box::new(storage2));
530
531        assert_eq!(manager.backend_count(), 2);
532
533        let data = b"test snapshot data";
534        let results = manager
535            .store_snapshot("test-camera", data, "jpg")
536            .await
537            .unwrap();
538
539        assert_eq!(results.len(), 2);
540        assert_eq!(results[0].backend_name, "local");
541        assert_eq!(results[1].backend_name, "local");
542    }
543
544    #[tokio::test]
545    async fn test_local_storage_list_empty() {
546        let temp_dir = TempDir::new().unwrap();
547        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
548            .await
549            .unwrap();
550
551        let entries = storage.list("nonexistent").await.unwrap();
552        assert_eq!(entries.len(), 0);
553    }
554
555    #[tokio::test]
556    async fn test_local_storage_delete_not_found() {
557        let temp_dir = TempDir::new().unwrap();
558        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
559            .await
560            .unwrap();
561
562        let result = storage.delete("nonexistent/file.txt").await;
563        assert!(matches!(result, Err(StorageError::NotFound(_))));
564    }
565}