Skip to main content

aura_effects/
storage.rs

1//! Layer 3: Storage Effect Handlers - Production Only
2//!
3//! Stateless single-party implementations of StorageEffects from aura-core (Layer 1).
4//! These handlers provide production storage operations delegating to filesystem or cloud APIs.
5//!
6//! **Layer Constraint**: NO mock handlers - those belong in aura-testkit (Layer 8).
7//! This module contains only production-grade stateless handlers.
8
9use async_trait::async_trait;
10use aura_core::effects::{StorageCoreEffects, StorageError, StorageExtendedEffects, StorageStats};
11use std::path::PathBuf;
12use std::sync::atomic::{AtomicU64, Ordering};
13use tokio::fs;
14use tokio::fs::DirEntry;
15
16/// Filesystem-based storage handler for production use
17///
18/// This handler stores data as files on the local filesystem.
19/// It is stateless and delegates all storage operations to the filesystem.
20#[derive(Debug, Clone)]
21pub struct FilesystemStorageHandler {
22    /// Base directory for storage files
23    base_path: PathBuf,
24}
25
26static NEXT_TEMP_WRITE_ID: AtomicU64 = AtomicU64::new(0);
27
28impl FilesystemStorageHandler {
29    /// Create a new filesystem storage handler
30    pub fn new(base_path: PathBuf) -> Self {
31        Self { base_path }
32    }
33
34    /// Alias for clarity; avoids relying on `new` naming in higher layers.
35    pub fn from_path(base_path: PathBuf) -> Self {
36        Self { base_path }
37    }
38
39    /// Create a new filesystem storage handler with default path
40    pub fn with_default_path() -> Self {
41        Self::new(PathBuf::from("./storage"))
42    }
43}
44
45#[async_trait]
46impl StorageCoreEffects for FilesystemStorageHandler {
47    async fn store(&self, key: &str, value: Vec<u8>) -> Result<(), StorageError> {
48        if key.is_empty() {
49            return Err(StorageError::InvalidKey {
50                reason: "Key cannot be empty".to_string(),
51            });
52        }
53
54        let file_path = self.base_path.join(format!("{key}.dat"));
55        let temp_file_path = self.base_path.join(format!(
56            "{key}.dat.tmp-{}",
57            NEXT_TEMP_WRITE_ID.fetch_add(1, Ordering::Relaxed)
58        ));
59        if let Some(parent) = file_path.parent() {
60            fs::create_dir_all(parent).await.map_err(|e| {
61                StorageError::WriteFailed(format!("Failed to create directory: {e}"))
62            })?;
63        }
64
65        // Write via a sibling temp file so concurrent readers never observe
66        // truncated ciphertext or partially-updated journal blobs.
67        fs::write(&temp_file_path, value)
68            .await
69            .map_err(|e| StorageError::WriteFailed(format!("Failed to write temp file: {e}")))?;
70
71        if let Err(err) = fs::rename(&temp_file_path, &file_path).await {
72            if file_path.exists() {
73                fs::remove_file(&file_path).await.map_err(|remove_err| {
74                    StorageError::WriteFailed(format!(
75                        "Failed to replace existing file after rename error ({err}): {remove_err}"
76                    ))
77                })?;
78                fs::rename(&temp_file_path, &file_path)
79                    .await
80                    .map_err(|rename_err| {
81                        StorageError::WriteFailed(format!(
82                        "Failed to finalize atomic write after removing existing file: {rename_err}"
83                    ))
84                    })?;
85            } else {
86                return Err(StorageError::WriteFailed(format!(
87                    "Failed to rename temp file into place: {err}"
88                )));
89            }
90        }
91
92        Ok(())
93    }
94
95    async fn retrieve(&self, key: &str) -> Result<Option<Vec<u8>>, StorageError> {
96        let file_path = self.base_path.join(format!("{key}.dat"));
97
98        if !file_path.exists() {
99            return Ok(None);
100        }
101
102        let data = fs::read(&file_path)
103            .await
104            .map_err(|e| StorageError::ReadFailed(format!("Failed to read file: {e}")))?;
105
106        Ok(Some(data))
107    }
108
109    async fn remove(&self, key: &str) -> Result<bool, StorageError> {
110        let file_path = self.base_path.join(format!("{key}.dat"));
111
112        if !file_path.exists() {
113            return Ok(false);
114        }
115
116        fs::remove_file(&file_path)
117            .await
118            .map_err(|e| StorageError::DeleteFailed(format!("Failed to remove file: {e}")))?;
119
120        Ok(true)
121    }
122
123    async fn list_keys(&self, prefix: Option<&str>) -> Result<Vec<String>, StorageError> {
124        // Keys may contain path separators (e.g. `journal/facts/...`), so we must
125        // traverse the directory tree recursively and strip the `.dat` suffix
126        // from persisted filenames.
127        let mut keys = Vec::new();
128        let mut stack: Vec<PathBuf> = vec![self.base_path.clone()];
129
130        while let Some(dir) = stack.pop() {
131            let mut entries = match fs::read_dir(&dir).await {
132                Ok(e) => e,
133                Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
134                Err(e) => {
135                    return Err(StorageError::ReadFailed(format!(
136                        "Failed to read directory: {e}"
137                    )))
138                }
139            };
140
141            while let Some(entry) = entries.next_entry().await.map_err(|e| {
142                StorageError::ReadFailed(format!("Failed to read directory entry: {e}"))
143            })? {
144                Self::visit_entry_for_keys(&self.base_path, entry, prefix, &mut stack, &mut keys)
145                    .await?;
146            }
147        }
148
149        keys.sort();
150        Ok(keys)
151    }
152}
153
154#[async_trait]
155impl StorageExtendedEffects for FilesystemStorageHandler {
156    async fn exists(&self, key: &str) -> Result<bool, StorageError> {
157        let file_path = self.base_path.join(format!("{key}.dat"));
158        Ok(file_path.exists())
159    }
160
161    async fn store_batch(
162        &self,
163        pairs: std::collections::HashMap<String, Vec<u8>>,
164    ) -> Result<(), StorageError> {
165        for (k, v) in pairs {
166            self.store(&k, v).await?;
167        }
168        Ok(())
169    }
170
171    async fn retrieve_batch(
172        &self,
173        keys: &[String],
174    ) -> Result<std::collections::HashMap<String, Vec<u8>>, StorageError> {
175        let mut out = std::collections::HashMap::new();
176        for key in keys {
177            if let Some(val) = self.retrieve(key).await? {
178                out.insert(key.clone(), val);
179            }
180        }
181        Ok(out)
182    }
183
184    async fn clear_all(&self) -> Result<(), StorageError> {
185        match fs::remove_dir_all(&self.base_path).await {
186            Ok(()) => {}
187            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
188            Err(e) => {
189                return Err(StorageError::DeleteFailed(format!(
190                    "Failed to remove storage directory: {e}"
191                )))
192            }
193        }
194
195        fs::create_dir_all(&self.base_path).await.map_err(|e| {
196            StorageError::WriteFailed(format!("Failed to recreate storage directory: {e}"))
197        })?;
198        Ok(())
199    }
200
201    async fn stats(&self) -> Result<StorageStats, StorageError> {
202        let mut key_count: u64 = 0;
203        let mut total_size: u64 = 0;
204
205        let mut stack: Vec<PathBuf> = vec![self.base_path.clone()];
206        while let Some(dir) = stack.pop() {
207            let mut entries = match fs::read_dir(&dir).await {
208                Ok(e) => e,
209                Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
210                Err(e) => {
211                    return Err(StorageError::ReadFailed(format!(
212                        "Failed to read directory: {e}"
213                    )))
214                }
215            };
216
217            while let Ok(Some(entry)) = entries.next_entry().await {
218                let file_type = match entry.file_type().await {
219                    Ok(ft) => ft,
220                    Err(_) => continue,
221                };
222
223                if file_type.is_dir() {
224                    stack.push(entry.path());
225                    continue;
226                }
227
228                if !file_type.is_file() {
229                    continue;
230                }
231
232                let path = entry.path();
233                if path.extension().and_then(|e| e.to_str()) != Some("dat") {
234                    continue;
235                }
236
237                key_count += 1;
238                if let Ok(metadata) = entry.metadata().await {
239                    total_size = total_size.saturating_add(metadata.len());
240                }
241            }
242        }
243
244        Ok(StorageStats {
245            key_count,
246            total_size,
247            available_space: None,
248            backend_type: "filesystem".to_string(),
249        })
250    }
251}
252
253impl FilesystemStorageHandler {
254    async fn visit_entry_for_keys(
255        base: &PathBuf,
256        entry: DirEntry,
257        prefix: Option<&str>,
258        stack: &mut Vec<PathBuf>,
259        keys: &mut Vec<String>,
260    ) -> Result<(), StorageError> {
261        let file_type = entry.file_type().await.map_err(|e| {
262            StorageError::ReadFailed(format!("Failed to stat directory entry: {e}"))
263        })?;
264        let path = entry.path();
265
266        if file_type.is_dir() {
267            stack.push(path);
268            return Ok(());
269        }
270        if !file_type.is_file() {
271            return Ok(());
272        }
273
274        if path.extension().and_then(|e| e.to_str()) != Some("dat") {
275            return Ok(());
276        }
277
278        let rel = path.strip_prefix(base).map_err(|e| {
279            StorageError::ReadFailed(format!("Failed to compute relative key path: {e}"))
280        })?;
281        let rel = rel.with_extension("");
282        let mut key = rel.to_string_lossy().to_string();
283        if std::path::MAIN_SEPARATOR != '/' {
284            key = key.replace(std::path::MAIN_SEPARATOR, "/");
285        }
286
287        if let Some(prefix) = prefix {
288            if key.starts_with(prefix) {
289                keys.push(key);
290            }
291        } else {
292            keys.push(key);
293        }
294
295        Ok(())
296    }
297}
298
299#[cfg(test)]
300mod tests {
301    use super::*;
302    use tempfile::TempDir;
303
304    #[tokio::test]
305    async fn test_filesystem_storage_handler() {
306        let temp_dir = TempDir::new().unwrap();
307        let handler = FilesystemStorageHandler::new(temp_dir.path().to_path_buf());
308
309        // Test store and retrieve
310        let key = "test_key";
311        let value = b"test_value".to_vec();
312
313        handler.store(key, value.clone()).await.unwrap();
314        let retrieved = handler.retrieve(key).await.unwrap();
315        assert_eq!(retrieved, Some(value));
316
317        // Test exists
318        assert!(handler.exists(key).await.unwrap());
319
320        // Test remove
321        assert!(handler.remove(key).await.unwrap());
322        assert!(!handler.exists(key).await.unwrap());
323    }
324
325    #[tokio::test]
326    async fn test_delete_and_retrieve() {
327        let temp_dir = TempDir::new().unwrap();
328        let handler = FilesystemStorageHandler::new(temp_dir.path().to_path_buf());
329
330        let key = "test_key";
331        let data = b"test_data".to_vec();
332
333        // Store data
334        handler.store(key, data.clone()).await.unwrap();
335
336        // Verify it exists
337        let retrieved = handler.retrieve(key).await.unwrap();
338        assert_eq!(retrieved, Some(data));
339
340        // Delete it
341        let was_deleted = handler.remove(key).await.unwrap();
342        assert!(was_deleted);
343
344        // Verify it's gone
345        let retrieved_after = handler.retrieve(key).await.unwrap();
346        assert_eq!(retrieved_after, None);
347    }
348}