1use async_trait::async_trait;
10use aura_core::effects::{StorageCoreEffects, StorageError, StorageExtendedEffects, StorageStats};
11use std::path::PathBuf;
12use std::sync::atomic::{AtomicU64, Ordering};
13use tokio::fs;
14use tokio::fs::DirEntry;
15
16#[derive(Debug, Clone)]
21pub struct FilesystemStorageHandler {
22 base_path: PathBuf,
24}
25
26static NEXT_TEMP_WRITE_ID: AtomicU64 = AtomicU64::new(0);
27
28impl FilesystemStorageHandler {
29 pub fn new(base_path: PathBuf) -> Self {
31 Self { base_path }
32 }
33
34 pub fn from_path(base_path: PathBuf) -> Self {
36 Self { base_path }
37 }
38
39 pub fn with_default_path() -> Self {
41 Self::new(PathBuf::from("./storage"))
42 }
43}
44
45#[async_trait]
46impl StorageCoreEffects for FilesystemStorageHandler {
47 async fn store(&self, key: &str, value: Vec<u8>) -> Result<(), StorageError> {
48 if key.is_empty() {
49 return Err(StorageError::InvalidKey {
50 reason: "Key cannot be empty".to_string(),
51 });
52 }
53
54 let file_path = self.base_path.join(format!("{key}.dat"));
55 let temp_file_path = self.base_path.join(format!(
56 "{key}.dat.tmp-{}",
57 NEXT_TEMP_WRITE_ID.fetch_add(1, Ordering::Relaxed)
58 ));
59 if let Some(parent) = file_path.parent() {
60 fs::create_dir_all(parent).await.map_err(|e| {
61 StorageError::WriteFailed(format!("Failed to create directory: {e}"))
62 })?;
63 }
64
65 fs::write(&temp_file_path, value)
68 .await
69 .map_err(|e| StorageError::WriteFailed(format!("Failed to write temp file: {e}")))?;
70
71 if let Err(err) = fs::rename(&temp_file_path, &file_path).await {
72 if file_path.exists() {
73 fs::remove_file(&file_path).await.map_err(|remove_err| {
74 StorageError::WriteFailed(format!(
75 "Failed to replace existing file after rename error ({err}): {remove_err}"
76 ))
77 })?;
78 fs::rename(&temp_file_path, &file_path)
79 .await
80 .map_err(|rename_err| {
81 StorageError::WriteFailed(format!(
82 "Failed to finalize atomic write after removing existing file: {rename_err}"
83 ))
84 })?;
85 } else {
86 return Err(StorageError::WriteFailed(format!(
87 "Failed to rename temp file into place: {err}"
88 )));
89 }
90 }
91
92 Ok(())
93 }
94
95 async fn retrieve(&self, key: &str) -> Result<Option<Vec<u8>>, StorageError> {
96 let file_path = self.base_path.join(format!("{key}.dat"));
97
98 if !file_path.exists() {
99 return Ok(None);
100 }
101
102 let data = fs::read(&file_path)
103 .await
104 .map_err(|e| StorageError::ReadFailed(format!("Failed to read file: {e}")))?;
105
106 Ok(Some(data))
107 }
108
109 async fn remove(&self, key: &str) -> Result<bool, StorageError> {
110 let file_path = self.base_path.join(format!("{key}.dat"));
111
112 if !file_path.exists() {
113 return Ok(false);
114 }
115
116 fs::remove_file(&file_path)
117 .await
118 .map_err(|e| StorageError::DeleteFailed(format!("Failed to remove file: {e}")))?;
119
120 Ok(true)
121 }
122
123 async fn list_keys(&self, prefix: Option<&str>) -> Result<Vec<String>, StorageError> {
124 let mut keys = Vec::new();
128 let mut stack: Vec<PathBuf> = vec![self.base_path.clone()];
129
130 while let Some(dir) = stack.pop() {
131 let mut entries = match fs::read_dir(&dir).await {
132 Ok(e) => e,
133 Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
134 Err(e) => {
135 return Err(StorageError::ReadFailed(format!(
136 "Failed to read directory: {e}"
137 )))
138 }
139 };
140
141 while let Some(entry) = entries.next_entry().await.map_err(|e| {
142 StorageError::ReadFailed(format!("Failed to read directory entry: {e}"))
143 })? {
144 Self::visit_entry_for_keys(&self.base_path, entry, prefix, &mut stack, &mut keys)
145 .await?;
146 }
147 }
148
149 keys.sort();
150 Ok(keys)
151 }
152}
153
154#[async_trait]
155impl StorageExtendedEffects for FilesystemStorageHandler {
156 async fn exists(&self, key: &str) -> Result<bool, StorageError> {
157 let file_path = self.base_path.join(format!("{key}.dat"));
158 Ok(file_path.exists())
159 }
160
161 async fn store_batch(
162 &self,
163 pairs: std::collections::HashMap<String, Vec<u8>>,
164 ) -> Result<(), StorageError> {
165 for (k, v) in pairs {
166 self.store(&k, v).await?;
167 }
168 Ok(())
169 }
170
171 async fn retrieve_batch(
172 &self,
173 keys: &[String],
174 ) -> Result<std::collections::HashMap<String, Vec<u8>>, StorageError> {
175 let mut out = std::collections::HashMap::new();
176 for key in keys {
177 if let Some(val) = self.retrieve(key).await? {
178 out.insert(key.clone(), val);
179 }
180 }
181 Ok(out)
182 }
183
184 async fn clear_all(&self) -> Result<(), StorageError> {
185 match fs::remove_dir_all(&self.base_path).await {
186 Ok(()) => {}
187 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
188 Err(e) => {
189 return Err(StorageError::DeleteFailed(format!(
190 "Failed to remove storage directory: {e}"
191 )))
192 }
193 }
194
195 fs::create_dir_all(&self.base_path).await.map_err(|e| {
196 StorageError::WriteFailed(format!("Failed to recreate storage directory: {e}"))
197 })?;
198 Ok(())
199 }
200
201 async fn stats(&self) -> Result<StorageStats, StorageError> {
202 let mut key_count: u64 = 0;
203 let mut total_size: u64 = 0;
204
205 let mut stack: Vec<PathBuf> = vec![self.base_path.clone()];
206 while let Some(dir) = stack.pop() {
207 let mut entries = match fs::read_dir(&dir).await {
208 Ok(e) => e,
209 Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
210 Err(e) => {
211 return Err(StorageError::ReadFailed(format!(
212 "Failed to read directory: {e}"
213 )))
214 }
215 };
216
217 while let Ok(Some(entry)) = entries.next_entry().await {
218 let file_type = match entry.file_type().await {
219 Ok(ft) => ft,
220 Err(_) => continue,
221 };
222
223 if file_type.is_dir() {
224 stack.push(entry.path());
225 continue;
226 }
227
228 if !file_type.is_file() {
229 continue;
230 }
231
232 let path = entry.path();
233 if path.extension().and_then(|e| e.to_str()) != Some("dat") {
234 continue;
235 }
236
237 key_count += 1;
238 if let Ok(metadata) = entry.metadata().await {
239 total_size = total_size.saturating_add(metadata.len());
240 }
241 }
242 }
243
244 Ok(StorageStats {
245 key_count,
246 total_size,
247 available_space: None,
248 backend_type: "filesystem".to_string(),
249 })
250 }
251}
252
253impl FilesystemStorageHandler {
254 async fn visit_entry_for_keys(
255 base: &PathBuf,
256 entry: DirEntry,
257 prefix: Option<&str>,
258 stack: &mut Vec<PathBuf>,
259 keys: &mut Vec<String>,
260 ) -> Result<(), StorageError> {
261 let file_type = entry.file_type().await.map_err(|e| {
262 StorageError::ReadFailed(format!("Failed to stat directory entry: {e}"))
263 })?;
264 let path = entry.path();
265
266 if file_type.is_dir() {
267 stack.push(path);
268 return Ok(());
269 }
270 if !file_type.is_file() {
271 return Ok(());
272 }
273
274 if path.extension().and_then(|e| e.to_str()) != Some("dat") {
275 return Ok(());
276 }
277
278 let rel = path.strip_prefix(base).map_err(|e| {
279 StorageError::ReadFailed(format!("Failed to compute relative key path: {e}"))
280 })?;
281 let rel = rel.with_extension("");
282 let mut key = rel.to_string_lossy().to_string();
283 if std::path::MAIN_SEPARATOR != '/' {
284 key = key.replace(std::path::MAIN_SEPARATOR, "/");
285 }
286
287 if let Some(prefix) = prefix {
288 if key.starts_with(prefix) {
289 keys.push(key);
290 }
291 } else {
292 keys.push(key);
293 }
294
295 Ok(())
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
302 use tempfile::TempDir;
303
304 #[tokio::test]
305 async fn test_filesystem_storage_handler() {
306 let temp_dir = TempDir::new().unwrap();
307 let handler = FilesystemStorageHandler::new(temp_dir.path().to_path_buf());
308
309 let key = "test_key";
311 let value = b"test_value".to_vec();
312
313 handler.store(key, value.clone()).await.unwrap();
314 let retrieved = handler.retrieve(key).await.unwrap();
315 assert_eq!(retrieved, Some(value));
316
317 assert!(handler.exists(key).await.unwrap());
319
320 assert!(handler.remove(key).await.unwrap());
322 assert!(!handler.exists(key).await.unwrap());
323 }
324
325 #[tokio::test]
326 async fn test_delete_and_retrieve() {
327 let temp_dir = TempDir::new().unwrap();
328 let handler = FilesystemStorageHandler::new(temp_dir.path().to_path_buf());
329
330 let key = "test_key";
331 let data = b"test_data".to_vec();
332
333 handler.store(key, data.clone()).await.unwrap();
335
336 let retrieved = handler.retrieve(key).await.unwrap();
338 assert_eq!(retrieved, Some(data));
339
340 let was_deleted = handler.remove(key).await.unwrap();
342 assert!(was_deleted);
343
344 let retrieved_after = handler.retrieve(key).await.unwrap();
346 assert_eq!(retrieved_after, None);
347 }
348}