use crate::traits::StorageBackend;
use crate::{Result, StreamError};
use async_trait::async_trait;
use bytes::Bytes;
use std::path::{Path, PathBuf};
/// Storage backend that keeps each object as a file beneath a root directory.
#[derive(Clone, Debug)]
pub struct FsStorage {
    /// Directory that every storage key is joined onto.
    root: PathBuf,
}
impl FsStorage {
pub fn new(root: impl Into<PathBuf>) -> Self {
Self { root: root.into() }
}
fn resolve(&self, key: &str) -> Result<PathBuf> {
if key.starts_with('/') || key.split('/').any(|seg| seg == "..") {
return Err(StreamError::storage(format!("invalid key: {key}")));
}
Ok(self.root.join(key))
}
}
#[async_trait]
impl StorageBackend for FsStorage {
    /// Writes `data` to the file for `key`, creating missing parent directories first.
    ///
    /// # Errors
    /// Returns a storage error if the key is rejected by `resolve`, or if creating
    /// the parent directories or writing the file fails.
    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
        let path = self.resolve(key)?;
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| StreamError::storage(format!("mkdir {parent:?}: {e}")))?;
        }
        tokio::fs::write(&path, &data)
            .await
            .map_err(|e| StreamError::storage(format!("write {path:?}: {e}")))
    }

    /// Reads the whole file stored under `key`.
    ///
    /// # Errors
    /// Returns `StreamError::StorageNotFound` carrying the key when the file does
    /// not exist, and a storage error for an invalid key or any other read failure.
    async fn get(&self, key: &str) -> Result<Bytes> {
        let path = self.resolve(key)?;
        match tokio::fs::read(&path).await {
            Ok(data) => Ok(Bytes::from(data)),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                Err(StreamError::StorageNotFound(key.to_string()))
            }
            Err(e) => Err(StreamError::storage(format!("read {path:?}: {e}"))),
        }
    }

    /// Removes the file stored under `key`.
    ///
    /// A missing file is treated as success, so repeated deletes do not fail.
    ///
    /// # Errors
    /// Returns a storage error for an invalid key or any removal failure other
    /// than "not found".
    async fn delete(&self, key: &str) -> Result<()> {
        let path = self.resolve(key)?;
        match tokio::fs::remove_file(&path).await {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(StreamError::storage(format!("delete {path:?}: {e}"))),
        }
    }

    /// Walks the root directory tree and returns every key that starts with `prefix`.
    ///
    /// Keys are root-relative paths using `/` as the separator. Entries whose
    /// relative path is not valid UTF-8 are skipped. The order of the result
    /// follows directory iteration order and is not sorted.
    ///
    /// # Errors
    /// Returns a storage error if a directory cannot be read. Directories that
    /// do not exist (including a missing root) are skipped silently.
    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        let mut out = Vec::new();
        // Explicit stack instead of recursion: async fns cannot recurse directly.
        let mut stack = vec![self.root.clone()];
        while let Some(dir) = stack.pop() {
            let mut entries = match tokio::fs::read_dir(&dir).await {
                Ok(e) => e,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
                Err(e) => return Err(StreamError::storage(format!("readdir {dir:?}: {e}"))),
            };
            while let Some(entry) = entries
                .next_entry()
                .await
                .map_err(|e| StreamError::storage(format!("readdir entry: {e}")))?
            {
                let path = entry.path();
                // `Path::is_dir` performs a blocking stat on the calling thread; use
                // the async metadata call instead. Like `is_dir`, it follows symlinks
                // and any error is treated as "not a directory".
                let is_dir = tokio::fs::metadata(&path)
                    .await
                    .map(|meta| meta.is_dir())
                    .unwrap_or(false);
                if is_dir {
                    stack.push(path);
                } else if let Some(rel) =
                    path.strip_prefix(&self.root).ok().and_then(|p| p.to_str())
                {
                    // Normalise only the platform's own separator. Replacing `\`
                    // unconditionally would corrupt Unix file names that contain a
                    // literal backslash, yielding keys that no longer map to the file.
                    let key = rel.replace(std::path::MAIN_SEPARATOR, "/");
                    if key.starts_with(prefix) {
                        out.push(key);
                    }
                }
            }
        }
        Ok(out)
    }

    /// Moves the local file at `path` into storage under `key`.
    ///
    /// A rename is attempted first. If it fails for any reason, the file is
    /// copied to the destination and the source is then removed on a
    /// best-effort basis (a failure to remove the source is ignored).
    ///
    /// # Errors
    /// Returns a storage error for an invalid key, or if creating the parent
    /// directories or the fallback copy fails.
    async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
        let dest = self.resolve(key)?;
        if let Some(parent) = dest.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| StreamError::storage(format!("mkdir {parent:?}: {e}")))?;
        }
        match tokio::fs::rename(path, &dest).await {
            Ok(()) => Ok(()),
            Err(_) => {
                // The rename error is deliberately dropped; the copy below reports
                // its own failure if the fallback does not work either.
                tokio::fs::copy(path, &dest)
                    .await
                    .map_err(|e| StreamError::storage(format!("copy to {dest:?}: {e}")))?;
                // Best effort: the data is already stored, so a leftover source
                // file is not treated as an error.
                tokio::fs::remove_file(path).await.ok();
                Ok(())
            }
        }
    }
}