1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::path::PathBuf;
4use thiserror::Error;
5use tokio::fs;
6use tokio::io::AsyncWriteExt;
7
/// Outcome of a successful [`StorageBackend::store`] call on one backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageResult {
    /// Key the payload was stored under, exactly as passed to `store`.
    pub key: String,
    /// Size of the stored payload in bytes.
    pub size_bytes: u64,
    /// UTC time captured by the backend once the store had completed.
    pub timestamp: DateTime<Utc>,
    /// Value of the backend's `name()`, e.g. `"local"` or `"s3"`.
    pub backend_name: String,
}
16
/// One item returned by [`StorageBackend::list`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageEntry {
    /// Key of the stored object, relative to the backend's root or prefix.
    pub key: String,
    /// Size of the object in bytes, as reported by the backend.
    pub size_bytes: u64,
    /// Last-modification time of the object, in UTC.
    pub last_modified: DateTime<Utc>,
}
24
/// Errors produced by the storage backends in this module.
#[derive(Debug, Error)]
pub enum StorageError {
    /// An underlying filesystem or I/O operation failed.
    ///
    /// Thanks to `#[from]`, `?` converts a `std::io::Error` into this variant.
    #[error("I/O error: {0}")]
    IoError(#[from] std::io::Error),

    /// The S3 client failed or S3 answered with an unexpected status.
    ///
    /// Carries a human-readable description of the failure.
    #[error("S3 error: {0}")]
    S3Error(String),

    /// No object exists under the given key; the payload is that key.
    #[error("Not found: {0}")]
    NotFound(String),

    /// Access to the requested key or resource was refused.
    #[error("Permission denied: {0}")]
    PermissionDenied(String),
}
40
/// Common interface implemented by every storage backend in this module.
///
/// Keys are plain strings. The `Send + Sync` supertraits let a backend be
/// shared between threads and async tasks.
#[async_trait::async_trait]
pub trait StorageBackend: Send + Sync {
    /// Stores `data` under `key` and returns a description of what was written.
    async fn store(&self, key: &str, data: &[u8]) -> Result<StorageResult, StorageError>;

    /// Returns the complete contents stored under `key`.
    ///
    /// The implementations in this file report a missing key as
    /// [`StorageError::NotFound`].
    async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError>;

    /// Removes the object stored under `key`.
    ///
    /// The implementations in this file return [`StorageError::NotFound`]
    /// when they can tell that the key does not exist.
    async fn delete(&self, key: &str) -> Result<(), StorageError>;

    /// Lists the stored entries selected by `prefix`.
    ///
    /// NOTE(review): the implementations in this file interpret `prefix`
    /// differently. `LocalStorage` reads the single directory named by
    /// `prefix` without recursing, while `S3Storage` forwards it to S3 as an
    /// object-key prefix. Both sort the result newest first. Confirm which
    /// contract callers should rely on.
    async fn list(&self, prefix: &str) -> Result<Vec<StorageEntry>, StorageError>;

    /// Short identifier of this backend; copied into
    /// [`StorageResult::backend_name`] by the implementations in this file.
    fn name(&self) -> &str;
}
59
/// Storage backend that keeps objects as files on the local filesystem.
pub struct LocalStorage {
    /// Directory that every key is resolved against.
    base_path: PathBuf,
}
64
65impl LocalStorage {
66 pub async fn new(base_path: PathBuf) -> Result<Self, StorageError> {
67 fs::create_dir_all(&base_path).await?;
69
70 Ok(Self { base_path })
71 }
72
73 fn get_full_path(&self, key: &str) -> PathBuf {
74 self.base_path.join(key)
75 }
76}
77
78#[async_trait::async_trait]
79impl StorageBackend for LocalStorage {
80 async fn store(&self, key: &str, data: &[u8]) -> Result<StorageResult, StorageError> {
81 let full_path = self.get_full_path(key);
82
83 if let Some(parent) = full_path.parent() {
85 fs::create_dir_all(parent).await?;
86 }
87
88 let temp_path = full_path.with_extension("tmp");
90
91 let mut file = fs::File::create(&temp_path).await?;
92 file.write_all(data).await?;
93 file.sync_all().await?;
94 drop(file);
95
96 fs::rename(&temp_path, &full_path).await?;
97
98 let metadata = fs::metadata(&full_path).await?;
99
100 Ok(StorageResult {
101 key: key.to_string(),
102 size_bytes: metadata.len(),
103 timestamp: Utc::now(),
104 backend_name: self.name().to_string(),
105 })
106 }
107
108 async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError> {
109 let full_path = self.get_full_path(key);
110
111 if !full_path.exists() {
112 return Err(StorageError::NotFound(key.to_string()));
113 }
114
115 let data = fs::read(&full_path).await?;
116 Ok(data)
117 }
118
119 async fn delete(&self, key: &str) -> Result<(), StorageError> {
120 let full_path = self.get_full_path(key);
121
122 if !full_path.exists() {
123 return Err(StorageError::NotFound(key.to_string()));
124 }
125
126 fs::remove_file(&full_path).await?;
127 Ok(())
128 }
129
130 async fn list(&self, prefix: &str) -> Result<Vec<StorageEntry>, StorageError> {
131 let prefix_path = self.base_path.join(prefix);
132
133 if !prefix_path.exists() {
134 return Ok(Vec::new());
135 }
136
137 let mut entries = Vec::new();
138 let mut read_dir = fs::read_dir(&prefix_path).await?;
139
140 while let Some(entry) = read_dir.next_entry().await? {
141 let metadata = entry.metadata().await?;
142
143 if metadata.is_file() {
144 let key = entry
145 .path()
146 .strip_prefix(&self.base_path)
147 .map_err(|e| {
148 StorageError::IoError(std::io::Error::new(
149 std::io::ErrorKind::Other,
150 e.to_string(),
151 ))
152 })?
153 .to_string_lossy()
154 .to_string();
155
156 let modified = metadata.modified()?;
157 let datetime: DateTime<Utc> = modified.into();
158
159 entries.push(StorageEntry {
160 key,
161 size_bytes: metadata.len(),
162 last_modified: datetime,
163 });
164 }
165 }
166
167 entries.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
168
169 Ok(entries)
170 }
171
172 fn name(&self) -> &str {
173 "local"
174 }
175}
176
/// Storage backend that keeps objects in an S3 bucket via the `s3` crate.
pub struct S3Storage {
    /// Configured bucket handle used for every request.
    bucket: Box<s3::Bucket>,
    /// Key prefix placed in front of every object key; empty for none.
    prefix: String,
}
182
183impl S3Storage {
184 pub fn new(
185 bucket_name: String,
186 region: String,
187 prefix: String,
188 access_key: Option<String>,
189 secret_key: Option<String>,
190 ) -> Result<Self, StorageError> {
191 let region = region
192 .parse::<s3::Region>()
193 .map_err(|e| StorageError::S3Error(format!("Invalid region: {e}")))?;
194
195 let credentials = if let (Some(access), Some(secret)) = (access_key, secret_key) {
196 s3::creds::Credentials::new(Some(&access), Some(&secret), None, None, None)
197 .map_err(|e| StorageError::S3Error(format!("Invalid credentials: {e}")))?
198 } else {
199 s3::creds::Credentials::default().map_err(|e| {
200 StorageError::S3Error(format!("Failed to get default credentials: {e}"))
201 })?
202 };
203
204 let bucket = s3::Bucket::new(&bucket_name, region, credentials)
205 .map_err(|e| StorageError::S3Error(format!("Failed to create bucket: {e}")))?
206 .with_path_style();
207
208 Ok(Self { bucket, prefix })
209 }
210
211 fn get_s3_key(&self, key: &str) -> String {
212 if self.prefix.is_empty() {
213 key.to_string()
214 } else {
215 format!("{}/{}", self.prefix.trim_end_matches('/'), key)
216 }
217 }
218}
219
220#[async_trait::async_trait]
221impl StorageBackend for S3Storage {
222 async fn store(&self, key: &str, data: &[u8]) -> Result<StorageResult, StorageError> {
223 let s3_key = self.get_s3_key(key);
224
225 let response = self
226 .bucket
227 .put_object(&s3_key, data)
228 .await
229 .map_err(|e| StorageError::S3Error(format!("Failed to put object: {e}")))?;
230
231 if response.status_code() != 200 {
232 return Err(StorageError::S3Error(format!(
233 "S3 returned status code: {}",
234 response.status_code()
235 )));
236 }
237
238 Ok(StorageResult {
239 key: key.to_string(),
240 size_bytes: data.len() as u64,
241 timestamp: Utc::now(),
242 backend_name: self.name().to_string(),
243 })
244 }
245
246 async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError> {
247 let s3_key = self.get_s3_key(key);
248
249 let response = self
250 .bucket
251 .get_object(&s3_key)
252 .await
253 .map_err(|e| StorageError::S3Error(format!("Failed to get object: {e}")))?;
254
255 if response.status_code() == 404 {
256 return Err(StorageError::NotFound(key.to_string()));
257 }
258
259 if response.status_code() != 200 {
260 return Err(StorageError::S3Error(format!(
261 "S3 returned status code: {}",
262 response.status_code()
263 )));
264 }
265
266 Ok(response.bytes().to_vec())
267 }
268
269 async fn delete(&self, key: &str) -> Result<(), StorageError> {
270 let s3_key = self.get_s3_key(key);
271
272 let response = self
273 .bucket
274 .delete_object(&s3_key)
275 .await
276 .map_err(|e| StorageError::S3Error(format!("Failed to delete object: {e}")))?;
277
278 if response.status_code() == 404 {
279 return Err(StorageError::NotFound(key.to_string()));
280 }
281
282 if response.status_code() != 204 && response.status_code() != 200 {
283 return Err(StorageError::S3Error(format!(
284 "S3 returned status code: {}",
285 response.status_code()
286 )));
287 }
288
289 Ok(())
290 }
291
292 async fn list(&self, prefix: &str) -> Result<Vec<StorageEntry>, StorageError> {
293 let s3_prefix = self.get_s3_key(prefix);
294
295 let results = self
296 .bucket
297 .list(s3_prefix, None)
298 .await
299 .map_err(|e| StorageError::S3Error(format!("Failed to list objects: {e}")))?;
300
301 let mut entries = Vec::new();
302
303 for result in results {
304 for object in result.contents {
305 let key = if self.prefix.is_empty() {
307 object.key.clone()
308 } else {
309 object
310 .key
311 .strip_prefix(&format!("{}/", self.prefix.trim_end_matches('/')))
312 .unwrap_or(&object.key)
313 .to_string()
314 };
315
316 let last_modified = DateTime::parse_from_rfc3339(&object.last_modified)
317 .map(|dt| dt.with_timezone(&Utc))
318 .unwrap_or_else(|_| Utc::now());
319
320 entries.push(StorageEntry {
321 key,
322 size_bytes: object.size,
323 last_modified,
324 });
325 }
326 }
327
328 entries.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
329
330 Ok(entries)
331 }
332
333 fn name(&self) -> &str {
334 "s3"
335 }
336}
337
/// Fans snapshot and clip uploads out to a list of storage backends.
pub struct StorageManager {
    /// Registered backends, in the order they were added.
    backends: Vec<Box<dyn StorageBackend + Send + Sync>>,
}
342
343impl StorageManager {
344 pub fn new() -> Self {
345 Self {
346 backends: Vec::new(),
347 }
348 }
349
350 pub fn add_backend(&mut self, backend: Box<dyn StorageBackend + Send + Sync>) {
352 self.backends.push(backend);
353 }
354
355 fn generate_snapshot_key(camera: &str, format: &str) -> String {
357 let now = Utc::now();
358 let date = now.format("%Y-%m-%d");
359 let time = now.format("%H%M%S");
360 format!("{camera}/{date}/snap_{time}.{format}")
361 }
362
363 fn generate_clip_key(camera: &str, format: &str) -> String {
365 let now = Utc::now();
366 let date = now.format("%Y-%m-%d");
367 let time = now.format("%H%M%S");
368 format!("{camera}/{date}/clip_{time}.{format}")
369 }
370
371 pub async fn store_snapshot(
373 &self,
374 camera: &str,
375 data: &[u8],
376 format: &str,
377 ) -> Result<Vec<StorageResult>, StorageError> {
378 let key = Self::generate_snapshot_key(camera, format);
379 let mut results = Vec::new();
380
381 for backend in &self.backends {
382 let result = backend.store(&key, data).await?;
383 results.push(result);
384 }
385
386 Ok(results)
387 }
388
389 pub async fn store_clip(
391 &self,
392 camera: &str,
393 data: &[u8],
394 format: &str,
395 ) -> Result<Vec<StorageResult>, StorageError> {
396 let key = Self::generate_clip_key(camera, format);
397 let mut results = Vec::new();
398
399 for backend in &self.backends {
400 let result = backend.store(&key, data).await?;
401 results.push(result);
402 }
403
404 Ok(results)
405 }
406
407 pub fn backend_count(&self) -> usize {
409 self.backends.len()
410 }
411}
412
413impl Default for StorageManager {
414 fn default() -> Self {
415 Self::new()
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a `LocalStorage` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned as well and must be kept alive for the
    /// duration of the test.
    async fn temp_storage() -> (TempDir, LocalStorage) {
        let dir = TempDir::new().unwrap();
        let storage = LocalStorage::new(dir.path().to_path_buf())
            .await
            .unwrap();
        (dir, storage)
    }

    /// Store, retrieve, list and delete a single object.
    #[tokio::test]
    async fn test_local_storage_roundtrip() {
        let (_dir, storage) = temp_storage().await;

        let key = "test/file.txt";
        let payload = b"Hello, World!";
        let expected_len = payload.len() as u64;

        let stored = storage.store(key, payload).await.unwrap();
        assert_eq!(stored.key, key);
        assert_eq!(stored.size_bytes, expected_len);
        assert_eq!(stored.backend_name, "local");

        let fetched = storage.retrieve(key).await.unwrap();
        assert_eq!(fetched, payload);

        let listing = storage.list("test").await.unwrap();
        assert_eq!(listing.len(), 1);
        assert_eq!(listing[0].size_bytes, expected_len);

        storage.delete(key).await.unwrap();

        let after_delete = storage.retrieve(key).await;
        assert!(matches!(after_delete, Err(StorageError::NotFound(_))));
    }

    /// Keys with several directory levels are created on demand.
    #[tokio::test]
    async fn test_local_storage_nested_paths() {
        let (_dir, storage) = temp_storage().await;

        let key = "camera1/2024-01-15/snap_143022.jpg";
        let payload = b"image data";

        let stored = storage.store(key, payload).await.unwrap();
        assert_eq!(stored.key, key);

        let fetched = storage.retrieve(key).await.unwrap();
        assert_eq!(fetched, payload);
    }

    /// After a store, the final file exists and no `.tmp` sibling is left.
    #[tokio::test]
    async fn test_local_storage_atomic_write() {
        let (_dir, storage) = temp_storage().await;

        let key = "test/atomic.txt";
        storage.store(key, b"atomic data").await.unwrap();

        let final_path = storage.get_full_path(key);
        let leftover = final_path.with_extension("tmp");
        assert!(!leftover.exists());
        assert!(final_path.exists());
    }

    /// Snapshot keys have the shape `camera/date/snap_*.ext`.
    #[tokio::test]
    async fn test_storage_manager_snapshot_key_generation() {
        let key = StorageManager::generate_snapshot_key("front-door", "jpg");
        assert!(key.contains("front-door"));
        assert!(key.contains("snap_"));
        assert!(key.ends_with(".jpg"));

        let segments: Vec<&str> = key.split('/').collect();
        assert_eq!(segments.len(), 3);
        assert_eq!(segments[0], "front-door");
        // The date segment is formatted as YYYY-MM-DD.
        assert_eq!(segments[1].len(), 10);
    }

    /// Clip keys carry the camera name, the `clip_` marker and the extension.
    #[tokio::test]
    async fn test_storage_manager_clip_key_generation() {
        let key = StorageManager::generate_clip_key("back-yard", "mp4");
        assert!(key.contains("back-yard"));
        assert!(key.contains("clip_"));
        assert!(key.ends_with(".mp4"));
    }

    /// A snapshot is stored once per registered backend.
    #[tokio::test]
    async fn test_storage_manager_multiple_backends() {
        let (_first_dir, first) = temp_storage().await;
        let (_second_dir, second) = temp_storage().await;

        let mut manager = StorageManager::new();
        manager.add_backend(Box::new(first));
        manager.add_backend(Box::new(second));

        assert_eq!(manager.backend_count(), 2);

        let outcomes = manager
            .store_snapshot("test-camera", b"test snapshot data", "jpg")
            .await
            .unwrap();

        assert_eq!(outcomes.len(), 2);
        assert_eq!(outcomes[0].backend_name, "local");
        assert_eq!(outcomes[1].backend_name, "local");
    }

    /// Listing a prefix that does not exist yields an empty result.
    #[tokio::test]
    async fn test_local_storage_list_empty() {
        let (_dir, storage) = temp_storage().await;

        let listing = storage.list("nonexistent").await.unwrap();
        assert_eq!(listing.len(), 0);
    }

    /// Deleting a missing key reports `NotFound`.
    #[tokio::test]
    async fn test_local_storage_delete_not_found() {
        let (_dir, storage) = temp_storage().await;

        let outcome = storage.delete("nonexistent/file.txt").await;
        assert!(matches!(outcome, Err(StorageError::NotFound(_))));
    }
}