use std::path::{Path, PathBuf};
use async_trait::async_trait;
use opendal::{services, EntryMode, Operator};
use tracing::instrument;
use crate::error::StorageError;
use crate::storage_trait::{Storage, StorageEntry};
/// Storage backend that performs its I/O through an OpenDAL [`Operator`]
/// configured with the `Fs` service and rooted at a directory.
pub struct FileStorage {
/// OpenDAL operator built in `FileStorage::new`; every `Storage` method goes through it.
op: Operator,
/// Copy of the root directory passed to `FileStorage::new`, exposed via `root()`.
root: PathBuf,
}
fn validate_relative_path(path: &str) -> Result<(), StorageError> {
if path.starts_with('/') {
return Err(StorageError::InvalidPath(PathBuf::from(path)));
}
for component in std::path::Path::new(path).components() {
if component == std::path::Component::ParentDir {
return Err(StorageError::InvalidPath(PathBuf::from(path)));
}
}
Ok(())
}
impl FileStorage {
    /// Builds a storage backend on top of the local directory `root`.
    ///
    /// `root` is first handed to `crate::nfs_check::ensure_local_filesystem`,
    /// then converted to UTF-8 and used as the root of an OpenDAL `Fs`
    /// service from which the operator is created.
    ///
    /// # Errors
    ///
    /// - whatever `ensure_local_filesystem` reports for `root`;
    /// - [`StorageError::InvalidPath`] if `root` is not valid UTF-8;
    /// - [`StorageError::OpenDal`] if the operator cannot be constructed.
    pub fn new(root: &Path) -> Result<Self, StorageError> {
        crate::nfs_check::ensure_local_filesystem(root)?;

        // The Fs builder takes its root as a string, so non-UTF-8 paths are refused.
        let root_utf8 = match root.to_str() {
            Some(text) => text,
            None => return Err(StorageError::InvalidPath(root.to_path_buf())),
        };

        let operator = match Operator::new(services::Fs::default().root(root_utf8)) {
            Ok(pending) => pending.finish(),
            Err(cause) => return Err(StorageError::OpenDal(cause.to_string())),
        };

        Ok(Self {
            op: operator,
            root: root.to_path_buf(),
        })
    }

    /// Returns the root directory this storage was created with.
    #[must_use]
    pub fn root(&self) -> &Path {
        self.root.as_path()
    }
}
#[async_trait]
impl Storage for FileStorage {
#[instrument(skip(self), fields(path))]
async fn read(&self, path: &str) -> Result<Vec<u8>, StorageError> {
validate_relative_path(path)?;
self.op
.read(path)
.await
.map(|buf| buf.to_vec())
.map_err(|e| {
if e.kind() == opendal::ErrorKind::NotFound {
StorageError::NotFound(path.to_owned())
} else {
StorageError::OpenDal(e.to_string())
}
})
}
#[instrument(skip(self, content), fields(path, bytes = content.len()))]
async fn write(&self, path: &str, content: &[u8]) -> Result<(), StorageError> {
validate_relative_path(path)?;
self.op
.write(path, content.to_vec())
.await
.map_err(|e| StorageError::OpenDal(e.to_string()))
}
#[instrument(skip(self), fields(path))]
async fn delete(&self, path: &str) -> Result<(), StorageError> {
validate_relative_path(path)?;
self.op.delete(path).await.map_err(|e| {
if e.kind() == opendal::ErrorKind::NotFound {
StorageError::NotFound(path.to_owned())
} else {
StorageError::OpenDal(e.to_string())
}
})
}
#[instrument(skip(self), fields(prefix))]
async fn list(&self, prefix: &str) -> Result<Vec<StorageEntry>, StorageError> {
validate_relative_path(prefix)?;
let entries = self
.op
.list_with(prefix)
.recursive(true)
.await
.map_err(|e| StorageError::OpenDal(e.to_string()))?;
let result = entries
.into_iter()
.map(|e| {
let meta = e.metadata();
let is_dir = matches!(meta.mode(), EntryMode::DIR);
let size = if is_dir { 0 } else { meta.content_length() };
let last_modified = meta.last_modified().map(|dt| dt.timestamp_millis());
StorageEntry {
path: e.path().to_owned(),
size,
last_modified,
is_dir,
}
})
.collect();
Ok(result)
}
#[instrument(skip(self), fields(path))]
async fn stat(&self, path: &str) -> Result<StorageEntry, StorageError> {
validate_relative_path(path)?;
let meta = self.op.stat(path).await.map_err(|e| {
if e.kind() == opendal::ErrorKind::NotFound {
StorageError::NotFound(path.to_owned())
} else {
StorageError::OpenDal(e.to_string())
}
})?;
let is_dir = matches!(meta.mode(), EntryMode::DIR);
let size = if is_dir { 0 } else { meta.content_length() };
let last_modified = meta.last_modified().map(|dt| dt.timestamp_millis());
Ok(StorageEntry {
path: path.to_owned(),
size,
last_modified,
is_dir,
})
}
#[instrument(skip(self), fields(path))]
async fn exists(&self, path: &str) -> Result<bool, StorageError> {
validate_relative_path(path)?;
self.op
.exists(path)
.await
.map_err(|e| StorageError::OpenDal(e.to_string()))
}
#[instrument(skip(self), fields(path))]
async fn create_dir(&self, path: &str) -> Result<(), StorageError> {
validate_relative_path(path)?;
self.op
.create_dir(path)
.await
.map_err(|e| StorageError::OpenDal(e.to_string()))
}
}