Skip to main content

winterbaume_core/
vfs.rs

//! Virtual filesystem abstraction for blob storage.
//!
//! The [`Vfs`] trait abstracts over storage backends so that services can keep
//! large binary blobs out of their primary in-memory state.  Two built-in
//! implementations are provided:
//!
//! * [`MemVfs`] — stores blobs in a `HashMap` protected by an `RwLock`.
//!   This is the default and requires no external resources.
//! * [`FsVfs`] — stores each blob as a file under a base directory on the
//!   local filesystem, enabling off-heap (and cross-restart) persistence.
//!
//! # Async streaming I/O
//!
//! Every data-transfer method on [`Vfs`] is async.  [`Vfs::put`] accepts a
//! `&mut (dyn AsyncRead + Unpin + Send)` so callers can stream data directly
//! from network buffers without staging the full payload in memory themselves
//! ([`MemVfs`] necessarily buffers the whole blob on the heap; [`FsVfs`]
//! streams it to disk).  [`Vfs::get`] returns a
//! `Box<dyn AsyncRead + Send + Unpin>` — for [`FsVfs`] this is an open
//! [`tokio::fs::File`] that lets the caller read incrementally; for
//! [`MemVfs`] it is a `Cursor` over the in-memory buffer.
//!
//! The trait uses the `Pin<Box<dyn Future>>` pattern (the same as
//! [`crate::MockService::handle`]) so that `Arc<dyn Vfs>` remains object-safe
//! without requiring the `async-trait` crate.
24
25use std::collections::HashMap;
26use std::fmt;
27use std::future::Future;
28use std::io;
29use std::path::{Path, PathBuf};
30use std::pin::Pin;
31use std::sync::RwLock;
32
33use bytes::Bytes;
34use tokio::io::{AsyncRead, AsyncReadExt};
35
36// ---------------------------------------------------------------------------
37// Error type
38// ---------------------------------------------------------------------------
39
/// Failure modes shared by every [`Vfs`] backend.
#[derive(Debug)]
pub enum VfsError {
    /// No blob is stored under the requested path; carries that path.
    NotFound(String),
    /// An underlying I/O error, raised either by the storage backend or by the
    /// caller-supplied reader while its data was being consumed.
    Io(io::Error),
    /// The path failed validation (for example it contains a `..` component
    /// that would escape the storage root); carries the offending path.
    InvalidPath(String),
}
51
52impl fmt::Display for VfsError {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        match self {
55            VfsError::NotFound(p) => write!(f, "blob not found: {p}"),
56            VfsError::Io(e) => write!(f, "vfs I/O error: {e}"),
57            VfsError::InvalidPath(p) => write!(f, "invalid blob path: {p}"),
58        }
59    }
60}
61
62impl std::error::Error for VfsError {
63    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
64        match self {
65            VfsError::Io(e) => Some(e),
66            _ => None,
67        }
68    }
69}
70
71impl From<io::Error> for VfsError {
72    fn from(e: io::Error) -> Self {
73        VfsError::Io(e)
74    }
75}
76
77// ---------------------------------------------------------------------------
78// Supporting types
79// ---------------------------------------------------------------------------
80
/// Metadata describing a single stored blob, as returned by [`Vfs::stat`].
#[derive(Clone, Debug)]
pub struct VfsStat {
    /// Length of the blob in bytes.
    pub size: u64,
}
87
/// One item of a [`Vfs::list`] result.
#[derive(Clone, Debug)]
pub struct VfsEntry {
    /// Full key of the blob, relative to the store's namespace.
    pub key: String,
    /// Length of the blob in bytes.
    pub size: u64,
}
96
97// ---------------------------------------------------------------------------
98// Trait
99// ---------------------------------------------------------------------------
100
101/// A simple async key-value blob store.
102///
103/// All paths/keys are `/`-separated strings (similar to URL paths).
104/// Implementations must be `Send + Sync` so they can be shared across
105/// threads via `Arc<dyn Vfs>`.
106///
107/// The trait uses explicit `Pin<Box<dyn Future<...>>>` return types so that
108/// `dyn Vfs` remains object-safe.
109pub trait Vfs: Send + Sync {
110    /// Stream `data` into `path`, overwriting any existing entry.
111    ///
112    /// Returns the number of bytes written.
113    fn put<'a>(
114        &'a self,
115        path: &'a str,
116        data: &'a mut (dyn AsyncRead + Unpin + Send),
117    ) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>>;
118
119    /// Return a streaming reader for the blob stored at `path`.
120    ///
121    /// Returns [`VfsError::NotFound`] if the path does not exist.
122    fn get<'a>(
123        &'a self,
124        path: &'a str,
125    ) -> Pin<
126        Box<dyn Future<Output = Result<Box<dyn AsyncRead + Send + Unpin>, VfsError>> + Send + 'a>,
127    >;
128
129    /// Delete the blob at `path`.
130    ///
131    /// Returns [`VfsError::NotFound`] if the path does not exist.
132    fn delete<'a>(
133        &'a self,
134        path: &'a str,
135    ) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>>;
136
137    /// List all entries whose key starts with `prefix`.
138    ///
139    /// The prefix may be empty to list everything.
140    fn list<'a>(
141        &'a self,
142        prefix: &'a str,
143    ) -> Pin<Box<dyn Future<Output = Result<Vec<VfsEntry>, VfsError>> + Send + 'a>>;
144
145    /// Return metadata for the blob at `path`, or `None` if it does not exist.
146    fn stat<'a>(
147        &'a self,
148        path: &'a str,
149    ) -> Pin<Box<dyn Future<Output = Result<Option<VfsStat>, VfsError>> + Send + 'a>>;
150}
151
152// ---------------------------------------------------------------------------
153// Shared path validation
154// ---------------------------------------------------------------------------
155
156/// Reject paths that contain `..` components or null bytes.
157///
158/// Both [`MemVfs`] and [`FsVfs`] call this before any operation so that they
159/// behave identically regardless of backend.  Null bytes are rejected because
160/// they terminate C strings used by OS syscalls in ways that could silently
161/// truncate the intended path.
162pub(crate) fn validate_path(path: &str) -> Result<(), VfsError> {
163    if path.contains('\0') {
164        return Err(VfsError::InvalidPath(path.to_string()));
165    }
166    let clean = path.trim_start_matches('/');
167    for component in Path::new(clean).components() {
168        if component == std::path::Component::ParentDir {
169            return Err(VfsError::InvalidPath(path.to_string()));
170        }
171    }
172    Ok(())
173}
174
175// ---------------------------------------------------------------------------
176// MemVfs
177// ---------------------------------------------------------------------------
178
179/// In-memory VFS backed by a `HashMap`.
180///
181/// This is the default backend used when no explicit VFS is configured.
182/// All data is stored on the heap and is lost when the service is dropped.
183/// [`get`] returns a [`io::Cursor`] over a ref-counted [`Bytes`] clone so no
184/// extra copy of stored data is made.
185///
186/// Path validation (rejection of `..` components and null bytes) matches
187/// [`FsVfs`] so both backends enforce the same invariants.
188#[derive(Debug, Default)]
189pub struct MemVfs {
190    data: RwLock<HashMap<String, Bytes>>,
191}
192
193impl MemVfs {
194    pub fn new() -> Self {
195        Self::default()
196    }
197}
198
199impl Vfs for MemVfs {
200    fn put<'a>(
201        &'a self,
202        path: &'a str,
203        data: &'a mut (dyn AsyncRead + Unpin + Send),
204    ) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
205        if let Err(e) = validate_path(path) {
206            return Box::pin(async { Err(e) });
207        }
208        Box::pin(async move {
209            let mut buf = Vec::new();
210            let n = data.read_to_end(&mut buf).await.map_err(VfsError::Io)? as u64;
211            self.data
212                .write()
213                .unwrap()
214                .insert(path.to_string(), Bytes::from(buf));
215            Ok(n)
216        })
217    }
218
219    fn get<'a>(
220        &'a self,
221        path: &'a str,
222    ) -> Pin<
223        Box<dyn Future<Output = Result<Box<dyn AsyncRead + Send + Unpin>, VfsError>> + Send + 'a>,
224    > {
225        if let Err(e) = validate_path(path) {
226            return Box::pin(async { Err(e) });
227        }
228        Box::pin(async move {
229            let data = self
230                .data
231                .read()
232                .unwrap()
233                .get(path)
234                .cloned()
235                .ok_or_else(|| VfsError::NotFound(path.to_string()))?;
236            Ok(Box::new(io::Cursor::new(data)) as Box<dyn AsyncRead + Send + Unpin>)
237        })
238    }
239
240    fn delete<'a>(
241        &'a self,
242        path: &'a str,
243    ) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
244        if let Err(e) = validate_path(path) {
245            return Box::pin(async { Err(e) });
246        }
247        Box::pin(async move {
248            if self.data.write().unwrap().remove(path).is_none() {
249                return Err(VfsError::NotFound(path.to_string()));
250            }
251            Ok(())
252        })
253    }
254
255    fn list<'a>(
256        &'a self,
257        prefix: &'a str,
258    ) -> Pin<Box<dyn Future<Output = Result<Vec<VfsEntry>, VfsError>> + Send + 'a>> {
259        if let Err(e) = validate_path(prefix) {
260            return Box::pin(async { Err(e) });
261        }
262        Box::pin(async move {
263            let guard = self.data.read().unwrap();
264            let mut entries: Vec<VfsEntry> = guard
265                .iter()
266                .filter(|(k, _)| k.starts_with(prefix))
267                .map(|(k, v)| VfsEntry {
268                    key: k.clone(),
269                    size: v.len() as u64,
270                })
271                .collect();
272            entries.sort_by(|a, b| a.key.cmp(&b.key));
273            Ok(entries)
274        })
275    }
276
277    fn stat<'a>(
278        &'a self,
279        path: &'a str,
280    ) -> Pin<Box<dyn Future<Output = Result<Option<VfsStat>, VfsError>> + Send + 'a>> {
281        if let Err(e) = validate_path(path) {
282            return Box::pin(async { Err(e) });
283        }
284        Box::pin(async move {
285            Ok(self.data.read().unwrap().get(path).map(|v| VfsStat {
286                size: v.len() as u64,
287            }))
288        })
289    }
290}
291
292// ---------------------------------------------------------------------------
293// FsVfs
294// ---------------------------------------------------------------------------
295
/// [`Vfs`] backend that stores blobs as files, using tokio's async I/O.
///
/// Every blob lives in a regular file below `base_dir`.  The blob's path
/// (after any BlobStore namespace prefix is stripped) is mapped directly onto
/// a filesystem path under `base_dir`, with `/` becoming the OS path
/// separator.
///
/// Missing parent directories are created on [`Vfs::put`].  Deleting a blob
/// removes only its file; parent directories that become empty are left in
/// place.
///
/// [`Vfs::put`] copies from the caller's reader straight to disk with
/// [`tokio::io::copy`], and [`Vfs::get`] returns an open [`tokio::fs::File`],
/// so neither direction needs the whole blob in memory.  Listing walks the
/// directory tree with blocking `std::fs` calls inside
/// [`tokio::task::spawn_blocking`], since tokio offers no simple recursive
/// `read_dir` utility.
#[derive(Debug)]
pub struct FsVfs {
    /// Directory under which all blobs are stored.
    base_dir: PathBuf,
}
314
315impl FsVfs {
316    /// Create an `FsVfs` rooted at `base_dir`.
317    ///
318    /// The directory is created if it does not already exist.
319    pub fn new(base_dir: impl AsRef<Path>) -> io::Result<Self> {
320        let base_dir = base_dir.as_ref().to_path_buf();
321        std::fs::create_dir_all(&base_dir)?;
322        Ok(Self { base_dir })
323    }
324
325    fn full_path(&self, path: &str) -> Result<PathBuf, VfsError> {
326        validate_path(path)?;
327        Ok(self.base_dir.join(path.trim_start_matches('/')))
328    }
329}
330
331impl Vfs for FsVfs {
332    fn put<'a>(
333        &'a self,
334        path: &'a str,
335        data: &'a mut (dyn AsyncRead + Unpin + Send),
336    ) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
337        match self.full_path(path) {
338            Err(e) => Box::pin(async { Err(e) }),
339            Ok(fp) => Box::pin(async move {
340                if let Some(parent) = fp.parent() {
341                    tokio::fs::create_dir_all(parent).await?;
342                }
343                let mut file = tokio::fs::File::create(&fp).await?;
344                let n = tokio::io::copy(data, &mut file).await?;
345                Ok(n)
346            }),
347        }
348    }
349
350    fn get<'a>(
351        &'a self,
352        path: &'a str,
353    ) -> Pin<
354        Box<dyn Future<Output = Result<Box<dyn AsyncRead + Send + Unpin>, VfsError>> + Send + 'a>,
355    > {
356        match self.full_path(path) {
357            Err(e) => Box::pin(async { Err(e) }),
358            Ok(fp) => {
359                let path = path.to_string();
360                Box::pin(async move {
361                    match tokio::fs::File::open(&fp).await {
362                        Ok(f) => Ok(Box::new(f) as Box<dyn AsyncRead + Send + Unpin>),
363                        Err(e) if e.kind() == io::ErrorKind::NotFound => {
364                            Err(VfsError::NotFound(path))
365                        }
366                        Err(e) => Err(VfsError::Io(e)),
367                    }
368                })
369            }
370        }
371    }
372
373    fn delete<'a>(
374        &'a self,
375        path: &'a str,
376    ) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
377        match self.full_path(path) {
378            Err(e) => Box::pin(async { Err(e) }),
379            Ok(fp) => {
380                let path = path.to_string();
381                Box::pin(async move {
382                    match tokio::fs::remove_file(&fp).await {
383                        Ok(()) => Ok(()),
384                        Err(e) if e.kind() == io::ErrorKind::NotFound => {
385                            Err(VfsError::NotFound(path))
386                        }
387                        Err(e) => Err(VfsError::Io(e)),
388                    }
389                })
390            }
391        }
392    }
393
394    fn list<'a>(
395        &'a self,
396        prefix: &'a str,
397    ) -> Pin<Box<dyn Future<Output = Result<Vec<VfsEntry>, VfsError>> + Send + 'a>> {
398        if let Err(e) = validate_path(prefix) {
399            return Box::pin(async { Err(e) });
400        }
401        let clean_prefix = prefix.trim_start_matches('/');
402        // Walk the parent directory (or the prefix itself if it is a directory)
403        // and filter entries by string-prefix matching, matching MemVfs semantics.
404        let search_dir = if clean_prefix.is_empty() {
405            self.base_dir.clone()
406        } else {
407            let candidate = self.base_dir.join(clean_prefix);
408            if candidate.is_dir() {
409                candidate
410            } else {
411                // The prefix may be a partial path component (e.g. "ns/bucket"
412                // should match "ns/bucket-a/obj").  Walk from the parent
413                // directory so we can check every entry's key against the
414                // string prefix.
415                candidate.parent().unwrap_or(&self.base_dir).to_path_buf()
416            }
417        };
418        let base_str = self.base_dir.to_string_lossy().into_owned();
419        let prefix_owned = clean_prefix.to_string();
420        Box::pin(async move {
421            tokio::task::spawn_blocking(move || -> Result<Vec<VfsEntry>, VfsError> {
422                let mut entries = Vec::new();
423                collect_files(&search_dir, &base_str, &mut entries)?;
424                // Apply string-prefix filtering to match the Vfs trait contract.
425                entries.retain(|e| e.key.starts_with(&prefix_owned));
426                entries.sort_by(|a, b| a.key.cmp(&b.key));
427                Ok(entries)
428            })
429            .await
430            .map_err(|e| VfsError::Io(io::Error::other(e.to_string())))?
431        })
432    }
433
434    fn stat<'a>(
435        &'a self,
436        path: &'a str,
437    ) -> Pin<Box<dyn Future<Output = Result<Option<VfsStat>, VfsError>> + Send + 'a>> {
438        match self.full_path(path) {
439            Err(e) => Box::pin(async { Err(e) }),
440            Ok(fp) => Box::pin(async move {
441                match tokio::fs::metadata(&fp).await {
442                    Ok(m) => Ok(Some(VfsStat { size: m.len() })),
443                    Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
444                    Err(e) => Err(VfsError::Io(e)),
445                }
446            }),
447        }
448    }
449}
450
451/// Recursively walk `dir`, appending file entries to `out`.
452fn collect_files(dir: &Path, base: &str, out: &mut Vec<VfsEntry>) -> Result<(), VfsError> {
453    if !dir.exists() {
454        return Ok(());
455    }
456    for entry in std::fs::read_dir(dir)? {
457        let entry = entry?;
458        let ft = entry.file_type()?;
459        let path = entry.path();
460        if ft.is_dir() {
461            collect_files(&path, base, out)?;
462        } else if ft.is_file() {
463            let full = path.to_string_lossy();
464            let key = if let Some(stripped) = full.strip_prefix(base) {
465                stripped.trim_start_matches(['/', '\\']).to_string()
466            } else {
467                full.into_owned()
468            };
469            let size = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
470            out.push(VfsEntry { key, size });
471        }
472    }
473    Ok(())
474}
475
476// ---------------------------------------------------------------------------
477// Tests
478// ---------------------------------------------------------------------------
479
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs put, get, stat, list and delete against `vfs` and checks results
    /// that every backend must agree on.
    async fn roundtrip(vfs: &dyn Vfs) {
        let payload = b"hello world";

        // A stored blob reads back unchanged.
        vfs.put("ns/a/b", &mut payload.as_ref()).await.unwrap();
        let mut reader = vfs.get("ns/a/b").await.unwrap();
        let mut read_back = Vec::new();
        reader.read_to_end(&mut read_back).await.unwrap();
        assert_eq!(read_back, payload);

        // stat reports the stored size...
        let found = vfs.stat("ns/a/b").await.unwrap();
        assert_eq!(found.unwrap().size, 11);

        // ...and `None` for a key that was never written.
        let absent = vfs.stat("ns/a/missing").await.unwrap();
        assert!(absent.is_none());

        // Listing by prefix returns both blobs stored under it.
        vfs.put("ns/a/c", &mut b"other".as_ref()).await.unwrap();
        let listed = vfs.list("ns/a/").await.unwrap();
        let keys: Vec<&str> = listed.iter().map(|entry| entry.key.as_str()).collect();
        assert!(keys.contains(&"ns/a/b"), "expected ns/a/b in {keys:?}");
        assert!(keys.contains(&"ns/a/c"), "expected ns/a/c in {keys:?}");

        // Once deleted, the blob can no longer be read.
        vfs.delete("ns/a/b").await.unwrap();
        let after_delete = vfs.get("ns/a/b").await;
        assert!(matches!(after_delete, Err(VfsError::NotFound(_))));

        // Deleting a key that does not exist is reported as NotFound.
        let missing_delete = vfs.delete("ns/a/missing").await;
        assert!(matches!(missing_delete, Err(VfsError::NotFound(_))));
    }

    #[tokio::test]
    async fn mem_vfs_roundtrip() {
        roundtrip(&MemVfs::new()).await;
    }

    #[tokio::test]
    async fn fs_vfs_roundtrip() {
        // The temporary directory must outlive the store that writes into it.
        let dir = tempfile::tempdir().unwrap();
        let vfs = FsVfs::new(dir.path()).unwrap();
        roundtrip(&vfs).await;
    }

    /// Checks that every operation refuses paths containing `..`.
    async fn rejects_path_traversal(vfs: &dyn Vfs) {
        let traversal_paths = ["../escape", "a/../../escape", "a/../../../etc/passwd"];
        for path in &traversal_paths {
            let put_result = vfs.put(path, &mut b"x".as_ref()).await;
            assert!(
                matches!(put_result, Err(VfsError::InvalidPath(_))),
                "put({path}) should be rejected"
            );

            let get_result = vfs.get(path).await;
            assert!(
                matches!(get_result, Err(VfsError::InvalidPath(_))),
                "get({path}) should be rejected"
            );

            let delete_result = vfs.delete(path).await;
            assert!(
                matches!(delete_result, Err(VfsError::InvalidPath(_))),
                "delete({path}) should be rejected"
            );

            let list_result = vfs.list(path).await;
            assert!(
                matches!(list_result, Err(VfsError::InvalidPath(_))),
                "list({path}) should be rejected"
            );

            let stat_result = vfs.stat(path).await;
            assert!(
                matches!(stat_result, Err(VfsError::InvalidPath(_))),
                "stat({path}) should be rejected"
            );
        }
    }

    #[tokio::test]
    async fn mem_vfs_rejects_path_traversal() {
        rejects_path_traversal(&MemVfs::new()).await;
    }

    #[tokio::test]
    async fn fs_vfs_rejects_path_traversal() {
        let dir = tempfile::tempdir().unwrap();
        let vfs = FsVfs::new(dir.path()).unwrap();
        rejects_path_traversal(&vfs).await;
    }
}