Skip to main content

apiary_storage/
local.rs

1//! Filesystem-backed storage backend for solo mode and local development.
2//!
3//! [`LocalBackend`] implements the [`StorageBackend`] trait using the local
4//! filesystem. Atomic conditional writes use `OpenOptions::create_new(true)`
5//! which maps to `O_CREAT | O_EXCL` on POSIX systems.
6
7use std::path::{Path, PathBuf};
8
9use async_trait::async_trait;
10use bytes::Bytes;
11use tokio::fs;
12use tracing::{debug, instrument};
13
14use apiary_core::error::ApiaryError;
15use apiary_core::storage::StorageBackend;
16use apiary_core::Result;
17
/// Local-filesystem implementation of [`StorageBackend`].
///
/// Every storage key is resolved to a path beneath the configured
/// `base_dir`; `put` creates any missing parent directories before writing.
#[derive(Clone, Debug)]
pub struct LocalBackend {
    /// Root directory that storage keys are resolved against.
    base_dir: PathBuf,
}
26
27impl LocalBackend {
28    /// Create a new `LocalBackend` rooted at the given directory.
29    ///
30    /// The directory is created if it does not exist.
31    pub async fn new(base_dir: impl Into<PathBuf>) -> Result<Self> {
32        let base_dir = base_dir.into();
33        fs::create_dir_all(&base_dir).await.map_err(|e| {
34            ApiaryError::storage(
35                format!("Failed to create base directory: {}", base_dir.display()),
36                e,
37            )
38        })?;
39        debug!(base_dir = %base_dir.display(), "LocalBackend initialised");
40        Ok(Self { base_dir })
41    }
42
43    /// Return the full filesystem path for a storage key.
44    fn key_to_path(&self, key: &str) -> PathBuf {
45        self.base_dir.join(key)
46    }
47
48    /// Return the base directory.
49    pub fn base_dir(&self) -> &Path {
50        &self.base_dir
51    }
52}
53
54#[async_trait]
55impl StorageBackend for LocalBackend {
56    #[instrument(skip(self, data), fields(key = %key, size = data.len()))]
57    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
58        let path = self.key_to_path(key);
59        if let Some(parent) = path.parent() {
60            fs::create_dir_all(parent).await.map_err(|e| {
61                ApiaryError::storage(
62                    format!("Failed to create parent directories for {}", path.display()),
63                    e,
64                )
65            })?;
66        }
67        fs::write(&path, &data)
68            .await
69            .map_err(|e| ApiaryError::storage(format!("Failed to write {}", path.display()), e))?;
70        debug!("Put {} bytes to {}", data.len(), key);
71        Ok(())
72    }
73
74    #[instrument(skip(self), fields(key = %key))]
75    async fn get(&self, key: &str) -> Result<Bytes> {
76        let path = self.key_to_path(key);
77        let data = fs::read(&path).await.map_err(|e| {
78            if e.kind() == std::io::ErrorKind::NotFound {
79                ApiaryError::NotFound {
80                    key: key.to_string(),
81                }
82            } else {
83                ApiaryError::storage(format!("Failed to read {}", path.display()), e)
84            }
85        })?;
86        debug!("Get {} bytes from {}", data.len(), key);
87        Ok(Bytes::from(data))
88    }
89
90    #[instrument(skip(self), fields(prefix = %prefix))]
91    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
92        let base = &self.base_dir;
93        let mut results = Vec::new();
94        list_recursive(base, base, prefix, &mut results).await?;
95        results.sort();
96        debug!("Listed {} keys with prefix '{}'", results.len(), prefix);
97        Ok(results)
98    }
99
100    #[instrument(skip(self), fields(key = %key))]
101    async fn delete(&self, key: &str) -> Result<()> {
102        let path = self.key_to_path(key);
103        match fs::remove_file(&path).await {
104            Ok(()) => {
105                debug!("Deleted {}", key);
106                Ok(())
107            }
108            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
109                debug!("Delete {}: already absent", key);
110                Ok(())
111            }
112            Err(e) => Err(ApiaryError::storage(
113                format!("Failed to delete {}", path.display()),
114                e,
115            )),
116        }
117    }
118
119    #[instrument(skip(self, data), fields(key = %key, size = data.len()))]
120    async fn put_if_not_exists(&self, key: &str, data: Bytes) -> Result<bool> {
121        let path = self.key_to_path(key);
122        if let Some(parent) = path.parent() {
123            fs::create_dir_all(parent).await.map_err(|e| {
124                ApiaryError::storage(
125                    format!("Failed to create parent directories for {}", path.display()),
126                    e,
127                )
128            })?;
129        }
130
131        // Use std::fs::OpenOptions with create_new(true) for atomic creation.
132        // This maps to O_CREAT | O_EXCL on POSIX, providing atomicity.
133        let path_clone = path.clone();
134        let data_clone = data.clone();
135        let result = tokio::task::spawn_blocking(move || {
136            use std::io::Write;
137            match std::fs::OpenOptions::new()
138                .write(true)
139                .create_new(true)
140                .open(&path_clone)
141            {
142                Ok(mut file) => {
143                    file.write_all(&data_clone)
144                        .map_err(|e| ApiaryError::storage("Failed to write file", e))?;
145                    Ok(true)
146                }
147                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(false),
148                Err(e) => Err(ApiaryError::storage(
149                    format!("Failed to create {}", path_clone.display()),
150                    e,
151                )),
152            }
153        })
154        .await
155        .map_err(|e| ApiaryError::Internal {
156            message: format!("Blocking task panicked: {e}"),
157        })??;
158
159        debug!(
160            "put_if_not_exists {} → {}",
161            key,
162            if result { "created" } else { "already exists" }
163        );
164        Ok(result)
165    }
166
167    #[instrument(skip(self), fields(key = %key))]
168    async fn exists(&self, key: &str) -> Result<bool> {
169        let path = self.key_to_path(key);
170        let exists = path.exists();
171        debug!("exists {} → {}", key, exists);
172        Ok(exists)
173    }
174}
175
176/// Recursively list all files under `dir`, producing keys relative to `base`.
177async fn list_recursive(
178    base: &Path,
179    dir: &Path,
180    prefix: &str,
181    results: &mut Vec<String>,
182) -> Result<()> {
183    let mut entries = match fs::read_dir(dir).await {
184        Ok(entries) => entries,
185        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
186        Err(e) => {
187            return Err(ApiaryError::storage(
188                format!("Failed to read directory {}", dir.display()),
189                e,
190            ))
191        }
192    };
193
194    while let Some(entry) = entries.next_entry().await.map_err(|e| {
195        ApiaryError::storage(
196            format!("Failed to read directory entry in {}", dir.display()),
197            e,
198        )
199    })? {
200        let path = entry.path();
201        if path.is_dir() {
202            Box::pin(list_recursive(base, &path, prefix, results)).await?;
203        } else {
204            let relative = path.strip_prefix(base).map_err(|e| ApiaryError::Internal {
205                message: format!("Path prefix strip failed: {e}"),
206            })?;
207            // Normalise to forward slashes for cross-platform key consistency
208            let key = relative
209                .components()
210                .map(|c| c.as_os_str().to_string_lossy().to_string())
211                .collect::<Vec<_>>()
212                .join("/");
213            if key.starts_with(prefix) {
214                results.push(key);
215            }
216        }
217    }
218
219    Ok(())
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a backend rooted in a fresh temporary directory.
    ///
    /// The `TempDir` handle is returned alongside the backend so each test
    /// keeps it alive for as long as it uses the backend.
    async fn scratch_backend() -> (LocalBackend, TempDir) {
        let dir = TempDir::new().unwrap();
        let store = LocalBackend::new(dir.path()).await.unwrap();
        (store, dir)
    }

    /// A value written with `put` is returned unchanged by `get`.
    #[tokio::test]
    async fn test_put_and_get() {
        let (store, _guard) = scratch_backend().await;
        store
            .put("test/file.txt", Bytes::from("hello"))
            .await
            .unwrap();
        let fetched = store.get("test/file.txt").await.unwrap();
        assert_eq!(fetched, Bytes::from("hello"));
    }

    /// Reading a key that was never written reports `NotFound`.
    #[tokio::test]
    async fn test_get_not_found() {
        let (store, _guard) = scratch_backend().await;
        let outcome = store.get("nonexistent").await;
        assert!(matches!(outcome, Err(ApiaryError::NotFound { .. })));
    }

    /// Listing with a prefix returns only the keys under that prefix.
    #[tokio::test]
    async fn test_list() {
        let (store, _guard) = scratch_backend().await;
        let fixtures = [
            ("prefix/a.txt", "a"),
            ("prefix/b.txt", "b"),
            ("other/c.txt", "c"),
        ];
        for (key, body) in fixtures {
            store.put(key, Bytes::from(body)).await.unwrap();
        }

        let listed = store.list("prefix/").await.unwrap();
        assert_eq!(listed.len(), 2);
        assert!(listed.iter().any(|k| k.as_str() == "prefix/a.txt"));
        assert!(listed.iter().any(|k| k.as_str() == "prefix/b.txt"));
    }

    /// An empty prefix matches every stored key.
    #[tokio::test]
    async fn test_list_empty_prefix() {
        let (store, _guard) = scratch_backend().await;
        for (key, body) in [("a.txt", "a"), ("b/c.txt", "c")] {
            store.put(key, Bytes::from(body)).await.unwrap();
        }

        let listed = store.list("").await.unwrap();
        assert_eq!(listed.len(), 2);
    }

    /// A deleted key can no longer be read.
    #[tokio::test]
    async fn test_delete() {
        let (store, _guard) = scratch_backend().await;
        store
            .put("to_delete.txt", Bytes::from("data"))
            .await
            .unwrap();
        store.delete("to_delete.txt").await.unwrap();

        let outcome = store.get("to_delete.txt").await;
        assert!(matches!(outcome, Err(ApiaryError::NotFound { .. })));
    }

    /// Deleting an absent key succeeds.
    #[tokio::test]
    async fn test_delete_nonexistent() {
        let (store, _guard) = scratch_backend().await;
        // Should not error
        store.delete("nonexistent").await.unwrap();
    }

    /// A conditional write to a fresh key creates it with the given data.
    #[tokio::test]
    async fn test_put_if_not_exists_creates() {
        let (store, _guard) = scratch_backend().await;
        let was_created = store
            .put_if_not_exists("new_key.txt", Bytes::from("data"))
            .await
            .unwrap();
        assert!(was_created);

        let fetched = store.get("new_key.txt").await.unwrap();
        assert_eq!(fetched, Bytes::from("data"));
    }

    /// A conditional write to an existing key reports `false` and leaves the
    /// stored data untouched.
    #[tokio::test]
    async fn test_put_if_not_exists_returns_false_when_exists() {
        let (store, _guard) = scratch_backend().await;
        store
            .put("existing.txt", Bytes::from("original"))
            .await
            .unwrap();

        let was_created = store
            .put_if_not_exists("existing.txt", Bytes::from("new"))
            .await
            .unwrap();
        assert!(!was_created);

        // Original data should be unchanged
        let fetched = store.get("existing.txt").await.unwrap();
        assert_eq!(fetched, Bytes::from("original"));
    }

    /// Of two sequential conditional writes, only the first one wins.
    #[tokio::test]
    async fn test_put_if_not_exists_atomic() {
        let (store, _guard) = scratch_backend().await;

        // First call should succeed
        let first_attempt = store
            .put_if_not_exists("race.txt", Bytes::from("first"))
            .await
            .unwrap();
        assert!(first_attempt);

        // Second call should fail
        let second_attempt = store
            .put_if_not_exists("race.txt", Bytes::from("second"))
            .await
            .unwrap();
        assert!(!second_attempt);

        // Data should be from first write
        let fetched = store.get("race.txt").await.unwrap();
        assert_eq!(fetched, Bytes::from("first"));
    }

    /// `exists` is false before a write and true after it.
    #[tokio::test]
    async fn test_exists() {
        let (store, _guard) = scratch_backend().await;
        let before = store.exists("missing").await.unwrap();
        assert!(!before);

        store
            .put("present.txt", Bytes::from("data"))
            .await
            .unwrap();
        let after = store.exists("present.txt").await.unwrap();
        assert!(after);
    }

    /// `put` creates every missing directory on the way to the key.
    #[tokio::test]
    async fn test_put_creates_parent_dirs() {
        let (store, _guard) = scratch_backend().await;
        store
            .put("deep/nested/dir/file.txt", Bytes::from("deep"))
            .await
            .unwrap();
        let fetched = store.get("deep/nested/dir/file.txt").await.unwrap();
        assert_eq!(fetched, Bytes::from("deep"));
    }

    /// A second `put` to the same key replaces the earlier content.
    #[tokio::test]
    async fn test_put_overwrites() {
        let (store, _guard) = scratch_backend().await;
        for version in ["v1", "v2"] {
            store
                .put("overwrite.txt", Bytes::from(version))
                .await
                .unwrap();
        }
        let fetched = store.get("overwrite.txt").await.unwrap();
        assert_eq!(fetched, Bytes::from("v2"));
    }

    /// Ten tasks race on one key: exactly one conditional write succeeds and
    /// the stored data belongs to one of the writers.
    #[tokio::test]
    async fn test_put_if_not_exists_concurrent() {
        use futures::future::join_all;

        let (store, _guard) = scratch_backend().await;
        let shared = std::sync::Arc::new(store);

        // Spawn every writer before awaiting any of them so they overlap as
        // much as possible.
        let mut handles = Vec::with_capacity(10);
        for i in 0..10 {
            let writer = shared.clone();
            let data = format!("writer-{}", i);
            handles.push(tokio::spawn(async move {
                writer
                    .put_if_not_exists("concurrent.txt", Bytes::from(data))
                    .await
            }));
        }

        // Await all tasks concurrently
        let results = join_all(handles).await;

        // Unwrap each outcome, failing the test on a task panic or I/O error
        let mut outcomes: Vec<bool> = Vec::with_capacity(results.len());
        for join_result in results {
            let created = join_result
                .expect("Task should not panic")
                .unwrap_or_else(|e| panic!("Storage operation failed unexpectedly: {:?}", e));
            outcomes.push(created);
        }

        // Exactly one task should have succeeded
        let mut success_count = 0usize;
        for created in &outcomes {
            if *created {
                success_count += 1;
            }
        }
        assert_eq!(
            success_count, 1,
            "Expected exactly 1 successful write, got {}",
            success_count
        );

        // The file should exist and contain data from exactly one of the writers
        let stored = shared.get("concurrent.txt").await.unwrap();
        let data_str = std::str::from_utf8(&stored).expect("Data should be valid UTF-8");

        // Verify it's one of the expected writer values
        let mut expected_values: Vec<String> = Vec::with_capacity(10);
        for i in 0..10 {
            expected_values.push(format!("writer-{}", i));
        }
        assert!(
            expected_values.iter().any(|v| v.as_str() == data_str),
            "Expected one of {:?}, got '{}'",
            expected_values,
            data_str
        );
    }
}
434}