use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::io;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::RwLock;
use bytes::Bytes;
use tokio::io::{AsyncRead, AsyncReadExt};
/// Errors produced by [`Vfs`] operations.
#[derive(Debug)]
pub enum VfsError {
    /// No blob is stored under the given key (carried as the caller passed it).
    NotFound(String),
    /// An underlying I/O operation failed.
    Io(io::Error),
    /// The key was rejected as unsafe, e.g. it contains `..` or a NUL byte.
    InvalidPath(String),
}

impl fmt::Display for VfsError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NotFound(p) => write!(f, "blob not found: {p}"),
            Self::Io(e) => write!(f, "vfs I/O error: {e}"),
            Self::InvalidPath(p) => write!(f, "invalid blob path: {p}"),
        }
    }
}

impl std::error::Error for VfsError {
    /// Only the I/O variant wraps a lower-level error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(inner) => Some(inner),
            Self::NotFound(_) | Self::InvalidPath(_) => None,
        }
    }
}

impl From<io::Error> for VfsError {
    /// Wraps a raw I/O error so `?` can propagate it from backend code.
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}
/// Metadata for one stored blob, as returned by [`Vfs::stat`].
///
/// Derives `PartialEq`/`Eq` so results can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VfsStat {
    /// Blob length in bytes.
    pub size: u64,
}

/// One blob reported by [`Vfs::list`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VfsEntry {
    /// Full key of the blob (not relative to the listing prefix).
    pub key: String,
    /// Blob length in bytes.
    pub size: u64,
}
/// Boxed `Send` future resolving to `Result<T, VfsError>`; every [`Vfs`] method returns one.
type VfsFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, VfsError>> + Send + 'a>>;

/// Asynchronous blob store addressed by string keys.
///
/// Each method returns a boxed future so the trait can be used as `&dyn Vfs`.
/// The implementations in this module reject unsafe keys with
/// [`VfsError::InvalidPath`] before touching any storage.
pub trait Vfs: Send + Sync {
    /// Stores every byte read from `data` under `path`, replacing any existing
    /// blob, and resolves to the number of bytes stored.
    fn put<'a>(
        &'a self,
        path: &'a str,
        data: &'a mut (dyn AsyncRead + Unpin + Send),
    ) -> VfsFuture<'a, u64>;

    /// Opens the blob at `path` for reading, or fails with [`VfsError::NotFound`].
    fn get<'a>(&'a self, path: &'a str) -> VfsFuture<'a, Box<dyn AsyncRead + Send + Unpin>>;

    /// Removes the blob at `path`, or fails with [`VfsError::NotFound`].
    fn delete<'a>(&'a self, path: &'a str) -> VfsFuture<'a, ()>;

    /// Lists blobs whose key starts with `prefix`; the implementations here
    /// return them sorted by key.
    fn list<'a>(&'a self, prefix: &'a str) -> VfsFuture<'a, Vec<VfsEntry>>;

    /// Returns the blob's metadata, or `Ok(None)` when nothing is stored at `path`.
    fn stat<'a>(&'a self, path: &'a str) -> VfsFuture<'a, Option<VfsStat>>;
}
/// Checks that `path` is safe to use as a blob key.
///
/// Leading `/` characters are ignored (`FsVfs` strips them before joining the
/// key onto its base directory). What remains may contain only ordinary name
/// segments and `.`. Any `..` segment, any root or drive prefix, and any
/// embedded NUL byte are rejected.
///
/// # Errors
/// Returns [`VfsError::InvalidPath`] carrying the original `path` on rejection.
pub(crate) fn validate_path(path: &str) -> Result<(), VfsError> {
    if path.contains('\0') {
        return Err(VfsError::InvalidPath(path.to_string()));
    }
    // `..` could climb out of a backend's base directory. A root or drive prefix
    // (e.g. `\x` or `C:\x` on Windows, which survive the `/` trim) would make
    // `Path::join` replace the base directory altogether. Allow-list instead.
    let confined = Path::new(path.trim_start_matches('/'))
        .components()
        .all(|part| {
            matches!(
                part,
                std::path::Component::Normal(_) | std::path::Component::CurDir
            )
        });
    if confined {
        Ok(())
    } else {
        Err(VfsError::InvalidPath(path.to_string()))
    }
}
/// In-memory [`Vfs`] that keeps every blob in a `HashMap` behind an `RwLock`.
///
/// Leading `/` characters are stripped from keys. This is the same
/// normalization [`FsVfs`] applies before joining a key onto its base
/// directory, so `"/a/b"` and `"a/b"` address the same blob in both backends.
#[derive(Debug, Default)]
pub struct MemVfs {
    data: RwLock<HashMap<String, Bytes>>,
}

impl MemVfs {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Maps a caller-supplied path to the key it is stored under.
    fn storage_key(path: &str) -> &str {
        path.trim_start_matches('/')
    }

    /// Shared access to the map.
    ///
    /// A poisoned lock is recovered rather than turned into a panic. Every
    /// critical section in this type is a single `HashMap` call (insert, remove,
    /// lookup or iteration), so there is no multi-step update a panic could
    /// have interrupted.
    fn read_map(&self) -> std::sync::RwLockReadGuard<'_, HashMap<String, Bytes>> {
        self.data.read().unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Exclusive access to the map; poisoning is recovered as in [`MemVfs::read_map`].
    fn write_map(&self) -> std::sync::RwLockWriteGuard<'_, HashMap<String, Bytes>> {
        self.data.write().unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

impl Vfs for MemVfs {
    /// Buffers the whole stream, then stores it under the normalized key.
    fn put<'a>(
        &'a self,
        path: &'a str,
        data: &'a mut (dyn AsyncRead + Unpin + Send),
    ) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
        if let Err(e) = validate_path(path) {
            return Box::pin(async { Err(e) });
        }
        Box::pin(async move {
            // Drain the reader before locking. No await happens while the std lock
            // is held, and a failing reader leaves any existing blob untouched.
            let mut buf = Vec::new();
            let n = data.read_to_end(&mut buf).await? as u64;
            self.write_map()
                .insert(Self::storage_key(path).to_owned(), Bytes::from(buf));
            Ok(n)
        })
    }

    /// Returns a cursor over a cheap (`Bytes`) clone of the stored blob.
    fn get<'a>(
        &'a self,
        path: &'a str,
    ) -> Pin<
        Box<dyn Future<Output = Result<Box<dyn AsyncRead + Send + Unpin>, VfsError>> + Send + 'a>,
    > {
        if let Err(e) = validate_path(path) {
            return Box::pin(async { Err(e) });
        }
        Box::pin(async move {
            let blob = self
                .read_map()
                .get(Self::storage_key(path))
                .cloned()
                .ok_or_else(|| VfsError::NotFound(path.to_string()))?;
            Ok(Box::new(io::Cursor::new(blob)) as Box<dyn AsyncRead + Send + Unpin>)
        })
    }

    /// Removes the blob, reporting `NotFound` when the key was absent.
    fn delete<'a>(
        &'a self,
        path: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
        if let Err(e) = validate_path(path) {
            return Box::pin(async { Err(e) });
        }
        Box::pin(async move {
            let removed = self.write_map().remove(Self::storage_key(path));
            match removed {
                Some(_) => Ok(()),
                None => Err(VfsError::NotFound(path.to_string())),
            }
        })
    }

    /// Returns every key starting with the normalized `prefix`, sorted by key.
    fn list<'a>(
        &'a self,
        prefix: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<VfsEntry>, VfsError>> + Send + 'a>> {
        if let Err(e) = validate_path(prefix) {
            return Box::pin(async { Err(e) });
        }
        Box::pin(async move {
            let prefix = Self::storage_key(prefix);
            let mut entries: Vec<VfsEntry> = self
                .read_map()
                .iter()
                .filter(|(key, _)| key.starts_with(prefix))
                .map(|(key, blob)| VfsEntry {
                    key: key.clone(),
                    size: blob.len() as u64,
                })
                .collect();
            // Map keys are unique, so an unstable sort yields the same order.
            entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
            Ok(entries)
        })
    }

    /// Returns the blob size, or `None` when the key is absent.
    fn stat<'a>(
        &'a self,
        path: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<Option<VfsStat>, VfsError>> + Send + 'a>> {
        if let Err(e) = validate_path(path) {
            return Box::pin(async { Err(e) });
        }
        Box::pin(async move {
            let size = self
                .read_map()
                .get(Self::storage_key(path))
                .map(|blob| blob.len() as u64);
            Ok(size.map(|size| VfsStat { size }))
        })
    }
}
/// [`Vfs`] that stores each blob as a regular file beneath `base_dir`.
///
/// A key resolves to `base_dir` joined with the key (leading `/` removed), and
/// `put` creates any missing parent directories. Directories are never treated
/// as blobs: `get`, `delete` and `stat` report a key that names a directory as
/// absent. This matches [`MemVfs`], which only knows keys that were `put`.
#[derive(Debug)]
pub struct FsVfs {
    base_dir: PathBuf,
}

impl FsVfs {
    /// File-name prefix of the staging files that `put` writes before renaming
    /// them into place. `list` hides files with this prefix so in-flight or
    /// abandoned uploads are never reported as blobs.
    const STAGING_PREFIX: &'static str = ".vfs-staging-";

    /// Creates a store rooted at `base_dir`, creating the directory and any
    /// missing ancestors.
    ///
    /// # Errors
    /// Returns the error from `std::fs::create_dir_all` if the directory cannot
    /// be created.
    pub fn new(base_dir: impl AsRef<Path>) -> io::Result<Self> {
        let base_dir = base_dir.as_ref().to_path_buf();
        std::fs::create_dir_all(&base_dir)?;
        Ok(Self { base_dir })
    }

    /// Validates `path` with `validate_path` and resolves it beneath `base_dir`.
    fn full_path(&self, path: &str) -> Result<PathBuf, VfsError> {
        validate_path(path)?;
        Ok(self.base_dir.join(path.trim_start_matches('/')))
    }

    /// Returns a fresh staging path in the same directory as `target`, so the
    /// final rename never crosses a filesystem boundary. Uniqueness comes from
    /// the process id plus a per-process counter.
    fn staging_path(&self, target: &Path) -> PathBuf {
        static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let pid = std::process::id();
        let name = format!("{}{pid}-{id}.tmp", Self::STAGING_PREFIX);
        target.parent().unwrap_or(self.base_dir.as_path()).join(name)
    }

    /// Whether the final `/`- or `\`-separated segment of `key` is a staging file.
    fn is_staging_key(key: &str) -> bool {
        key.rsplit(['/', '\\'])
            .next()
            .is_some_and(|name| name.starts_with(Self::STAGING_PREFIX))
    }
}

impl Vfs for FsVfs {
    /// Streams `data` into a staging file, then renames it over the target.
    fn put<'a>(
        &'a self,
        path: &'a str,
        data: &'a mut (dyn AsyncRead + Unpin + Send),
    ) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
        let fp = match self.full_path(path) {
            Ok(fp) => fp,
            Err(e) => return Box::pin(async { Err(e) }),
        };
        // A blob key must end in a file name. Keys such as "" or "." resolve to a
        // directory, which can never hold a blob's bytes.
        let names_a_file = matches!(
            Path::new(path.trim_start_matches('/')).components().next_back(),
            Some(std::path::Component::Normal(_))
        );
        if !names_a_file {
            let e = VfsError::InvalidPath(path.to_string());
            return Box::pin(async { Err(e) });
        }
        Box::pin(async move {
            if let Some(parent) = fp.parent() {
                tokio::fs::create_dir_all(parent).await?;
            }
            // Write a sibling staging file and rename it over the target only after
            // every byte is written and flushed. A failing `data` stream then leaves
            // the previous blob untouched, and on POSIX (atomic rename) concurrent
            // readers see either the old blob or the new one, never a partial file.
            let staging = self.staging_path(&fp);
            let outcome = async {
                let mut file = tokio::fs::File::create(&staging).await?;
                let n = tokio::io::copy(data, &mut file).await?;
                tokio::io::AsyncWriteExt::flush(&mut file).await?;
                drop(file);
                tokio::fs::rename(&staging, &fp).await?;
                Ok::<u64, VfsError>(n)
            }
            .await;
            if outcome.is_err() {
                // Best effort: the staging file may not exist if `create` itself failed.
                let _ = tokio::fs::remove_file(&staging).await;
            }
            outcome
        })
    }

    /// Opens the blob's file. A missing file or a directory yields `NotFound`.
    fn get<'a>(
        &'a self,
        path: &'a str,
    ) -> Pin<
        Box<dyn Future<Output = Result<Box<dyn AsyncRead + Send + Unpin>, VfsError>> + Send + 'a>,
    > {
        let fp = match self.full_path(path) {
            Ok(fp) => fp,
            Err(e) => return Box::pin(async { Err(e) }),
        };
        Box::pin(async move {
            let file = match tokio::fs::File::open(&fp).await {
                Ok(file) => file,
                Err(e) if e.kind() == io::ErrorKind::NotFound => {
                    return Err(VfsError::NotFound(path.to_string()));
                }
                Err(e) => return Err(VfsError::Io(e)),
            };
            // Opening a directory succeeds on Unix, but a directory is only a shared
            // key prefix, never a blob.
            if !file.metadata().await?.is_file() {
                return Err(VfsError::NotFound(path.to_string()));
            }
            Ok(Box::new(file) as Box<dyn AsyncRead + Send + Unpin>)
        })
    }

    /// Removes the blob's file. A missing file or a directory yields `NotFound`.
    fn delete<'a>(
        &'a self,
        path: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
        let fp = match self.full_path(path) {
            Ok(fp) => fp,
            Err(e) => return Box::pin(async { Err(e) }),
        };
        Box::pin(async move {
            match tokio::fs::remove_file(&fp).await {
                Ok(()) => Ok(()),
                Err(e) if e.kind() == io::ErrorKind::NotFound => {
                    Err(VfsError::NotFound(path.to_string()))
                }
                // `remove_file` refuses directories. A directory is only a key prefix,
                // so report it as a missing blob rather than as an I/O failure.
                Err(e) => match tokio::fs::metadata(&fp).await {
                    Ok(meta) if meta.is_dir() => Err(VfsError::NotFound(path.to_string())),
                    _ => Err(VfsError::Io(e)),
                },
            }
        })
    }

    /// Lists keys that start with `prefix` as a plain string prefix (like
    /// `MemVfs`), sorted by key.
    ///
    /// Only the directory part of `prefix` (everything before its last `/`) is
    /// walked. That walk, including the existence check, runs on a blocking
    /// thread.
    fn list<'a>(
        &'a self,
        prefix: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<VfsEntry>, VfsError>> + Send + 'a>> {
        if let Err(e) = validate_path(prefix) {
            return Box::pin(async { Err(e) });
        }
        let clean_prefix = prefix.trim_start_matches('/');
        // "ns/a" must also match "ns/ab", so walk "ns/" rather than "ns/a/".
        let dir_part = clean_prefix.rfind('/').map_or("", |slash| &clean_prefix[..slash]);
        let search_dir = if dir_part.is_empty() {
            self.base_dir.clone()
        } else {
            self.base_dir.join(dir_part)
        };
        let base_str = self.base_dir.to_string_lossy().into_owned();
        let prefix_owned = clean_prefix.to_string();
        Box::pin(async move {
            tokio::task::spawn_blocking(move || -> Result<Vec<VfsEntry>, VfsError> {
                let mut entries = Vec::new();
                // A missing directory part, or one naming a regular file, holds no
                // keys that could match the prefix.
                if search_dir.is_dir() {
                    collect_files(&search_dir, &base_str, &mut entries)?;
                }
                entries.retain(|e| {
                    e.key.starts_with(&prefix_owned) && !Self::is_staging_key(&e.key)
                });
                entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
                Ok(entries)
            })
            .await
            .map_err(|e| VfsError::Io(io::Error::other(e.to_string())))?
        })
    }

    /// Returns the file size, or `None` if no regular file exists at `path`.
    fn stat<'a>(
        &'a self,
        path: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<Option<VfsStat>, VfsError>> + Send + 'a>> {
        let fp = match self.full_path(path) {
            Ok(fp) => fp,
            Err(e) => return Box::pin(async { Err(e) }),
        };
        Box::pin(async move {
            match tokio::fs::metadata(&fp).await {
                // Only regular files are blobs; a directory is just a shared key prefix.
                Ok(meta) if meta.is_file() => Ok(Some(VfsStat { size: meta.len() })),
                Ok(_) => Ok(None),
                Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
                Err(e) => Err(VfsError::Io(e)),
            }
        })
    }
}
fn collect_files(dir: &Path, base: &str, out: &mut Vec<VfsEntry>) -> Result<(), VfsError> {
if !dir.exists() {
return Ok(());
}
for entry in std::fs::read_dir(dir)? {
let entry = entry?;
let ft = entry.file_type()?;
let path = entry.path();
if ft.is_dir() {
collect_files(&path, base, out)?;
} else if ft.is_file() {
let full = path.to_string_lossy();
let key = if let Some(stripped) = full.strip_prefix(base) {
stripped.trim_start_matches(['/', '\\']).to_string()
} else {
full.into_owned()
};
let size = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
out.push(VfsEntry { key, size });
}
}
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Drains the blob stored at `key`, panicking if it cannot be opened or read.
    async fn read_blob(vfs: &dyn Vfs, key: &str) -> Vec<u8> {
        let mut reader = vfs.get(key).await.unwrap();
        let mut contents = Vec::new();
        reader.read_to_end(&mut contents).await.unwrap();
        contents
    }

    /// Backend-agnostic put/get/stat/list/delete scenario shared by every `Vfs`.
    async fn roundtrip(vfs: &dyn Vfs) {
        let payload = b"hello world";
        vfs.put("ns/a/b", &mut payload.as_ref()).await.unwrap();
        assert_eq!(read_blob(vfs, "ns/a/b").await, payload);

        let size = vfs.stat("ns/a/b").await.unwrap().unwrap().size;
        assert_eq!(size, 11);
        assert!(vfs.stat("ns/a/missing").await.unwrap().is_none());

        vfs.put("ns/a/c", &mut b"other".as_ref()).await.unwrap();
        let listing = vfs.list("ns/a/").await.unwrap();
        let keys: Vec<&str> = listing.iter().map(|entry| entry.key.as_str()).collect();
        assert!(keys.contains(&"ns/a/b"), "expected ns/a/b in {keys:?}");
        assert!(keys.contains(&"ns/a/c"), "expected ns/a/c in {keys:?}");

        vfs.delete("ns/a/b").await.unwrap();
        let after_delete = vfs.get("ns/a/b").await;
        assert!(matches!(after_delete, Err(VfsError::NotFound(_))));
        let missing_delete = vfs.delete("ns/a/missing").await;
        assert!(matches!(missing_delete, Err(VfsError::NotFound(_))));
    }

    #[tokio::test]
    async fn mem_vfs_roundtrip() {
        roundtrip(&MemVfs::new()).await;
    }

    #[tokio::test]
    async fn fs_vfs_roundtrip() {
        let root = tempfile::tempdir().unwrap();
        roundtrip(&FsVfs::new(root.path()).unwrap()).await;
    }

    /// True when an operation was refused because its key is unsafe.
    fn is_invalid_path<T>(result: &Result<T, VfsError>) -> bool {
        matches!(result, Err(VfsError::InvalidPath(_)))
    }

    /// Every operation must refuse keys that climb out of the store with `..`.
    async fn rejects_path_traversal(vfs: &dyn Vfs) {
        for path in ["../escape", "a/../../escape", "a/../../../etc/passwd"] {
            let put = vfs.put(path, &mut b"x".as_ref()).await;
            assert!(is_invalid_path(&put), "put({path}) should be rejected");
            let get = vfs.get(path).await;
            assert!(is_invalid_path(&get), "get({path}) should be rejected");
            let delete = vfs.delete(path).await;
            assert!(is_invalid_path(&delete), "delete({path}) should be rejected");
            let list = vfs.list(path).await;
            assert!(is_invalid_path(&list), "list({path}) should be rejected");
            let stat = vfs.stat(path).await;
            assert!(is_invalid_path(&stat), "stat({path}) should be rejected");
        }
    }

    #[tokio::test]
    async fn mem_vfs_rejects_path_traversal() {
        rejects_path_traversal(&MemVfs::new()).await;
    }

    #[tokio::test]
    async fn fs_vfs_rejects_path_traversal() {
        let root = tempfile::tempdir().unwrap();
        rejects_path_traversal(&FsVfs::new(root.path()).unwrap()).await;
    }
}