Skip to main content

arcly_stream/
storage.rs

1//! Reference [`StorageBackend`] implementations.
2//!
3//! Gated behind `storage-fs`. The trait itself lives in [`crate::traits`] and
4//! always compiles, so a consumer can implement their own backend (S3, GCS,
5//! in-memory) without enabling this module.
6
7use crate::traits::StorageBackend;
8use crate::{Result, StreamError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use std::path::{Path, PathBuf};
12
13/// A filesystem-backed object store rooted at a base directory.
14///
15/// Keys are treated as relative paths under `root`; parent directories are
16/// created on `put`. `put_file` is overridden to `rename` when the source and
17/// destination share a filesystem, avoiding the read-into-memory round-trip.
18#[derive(Debug, Clone)]
19pub struct FsStorage {
20    root: PathBuf,
21}
22
23impl FsStorage {
24    /// Create a backend rooted at `root`. The directory is created lazily.
25    pub fn new(root: impl Into<PathBuf>) -> Self {
26        Self { root: root.into() }
27    }
28
29    /// Resolve a key to an absolute path, rejecting traversal outside `root`.
30    fn resolve(&self, key: &str) -> Result<PathBuf> {
31        // Reject absolute keys and parent-dir escapes up front.
32        if key.starts_with('/') || key.split('/').any(|seg| seg == "..") {
33            return Err(StreamError::storage(format!("invalid key: {key}")));
34        }
35        Ok(self.root.join(key))
36    }
37}
38
39#[async_trait]
40impl StorageBackend for FsStorage {
41    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
42        let path = self.resolve(key)?;
43        if let Some(parent) = path.parent() {
44            tokio::fs::create_dir_all(parent)
45                .await
46                .map_err(|e| StreamError::storage(format!("mkdir {parent:?}: {e}")))?;
47        }
48        tokio::fs::write(&path, &data)
49            .await
50            .map_err(|e| StreamError::storage(format!("write {path:?}: {e}")))
51    }
52
53    async fn get(&self, key: &str) -> Result<Bytes> {
54        let path = self.resolve(key)?;
55        match tokio::fs::read(&path).await {
56            Ok(data) => Ok(Bytes::from(data)),
57            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
58                Err(StreamError::StorageNotFound(key.to_string()))
59            }
60            Err(e) => Err(StreamError::storage(format!("read {path:?}: {e}"))),
61        }
62    }
63
64    async fn delete(&self, key: &str) -> Result<()> {
65        let path = self.resolve(key)?;
66        match tokio::fs::remove_file(&path).await {
67            Ok(()) => Ok(()),
68            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
69            Err(e) => Err(StreamError::storage(format!("delete {path:?}: {e}"))),
70        }
71    }
72
73    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
74        let mut out = Vec::new();
75        let mut stack = vec![self.root.clone()];
76        while let Some(dir) = stack.pop() {
77            let mut entries = match tokio::fs::read_dir(&dir).await {
78                Ok(e) => e,
79                Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
80                Err(e) => return Err(StreamError::storage(format!("readdir {dir:?}: {e}"))),
81            };
82            while let Some(entry) = entries
83                .next_entry()
84                .await
85                .map_err(|e| StreamError::storage(format!("readdir entry: {e}")))?
86            {
87                let path = entry.path();
88                if path.is_dir() {
89                    stack.push(path);
90                } else if let Some(rel) =
91                    path.strip_prefix(&self.root).ok().and_then(|p| p.to_str())
92                {
93                    let key = rel.replace('\\', "/");
94                    if key.starts_with(prefix) {
95                        out.push(key);
96                    }
97                }
98            }
99        }
100        Ok(out)
101    }
102
103    async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
104        let dest = self.resolve(key)?;
105        if let Some(parent) = dest.parent() {
106            tokio::fs::create_dir_all(parent)
107                .await
108                .map_err(|e| StreamError::storage(format!("mkdir {parent:?}: {e}")))?;
109        }
110        // Try a cheap rename first; fall back to copy+remove across filesystems.
111        match tokio::fs::rename(path, &dest).await {
112            Ok(()) => Ok(()),
113            Err(_) => {
114                tokio::fs::copy(path, &dest)
115                    .await
116                    .map_err(|e| StreamError::storage(format!("copy to {dest:?}: {e}")))?;
117                tokio::fs::remove_file(path).await.ok();
118                Ok(())
119            }
120        }
121    }
122}