use crate::error::Result;
use std::path::Path;
#[allow(clippy::module_name_repetitions)]
pub trait LocalFs: Send + Sync {
fn read_all(
&self,
path: &Path,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send + '_>>;
fn write_all(
&self,
path: &Path,
bytes: &[u8],
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + '_>>;
fn exists(
&self,
path: &Path,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send + '_>>;
}
#[non_exhaustive]
#[derive(Clone, Debug, Default)]
pub struct TokioLocalFs;
impl TokioLocalFs {
#[allow(clippy::new_without_default)]
#[must_use]
pub const fn new() -> Self {
Self
}
}
impl LocalFs for TokioLocalFs {
fn read_all(
&self,
path: &Path,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send + '_>> {
let path = path.to_path_buf();
Box::pin(async move {
tokio::time::timeout(std::time::Duration::from_secs(60), async {
tokio::fs::read(&path).await.map_err(|e| {
crate::error::Error::io("failed to read local file", e)
.with("local_path", path.display().to_string())
})
})
.await
.map_err(|_e| crate::error::Error::timeout("local file operation exceeded 60s"))?
})
}
fn write_all(
&self,
path: &Path,
bytes: &[u8],
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + '_>> {
let path = path.to_path_buf();
let bytes = bytes.to_vec();
Box::pin(async move {
tokio::time::timeout(std::time::Duration::from_secs(60), async move {
let mut file = tokio::fs::File::create(&path).await.map_err(|e| {
crate::error::Error::io("failed to create local file", e)
.with("local_path", path.display().to_string())
})?;
use tokio::io::AsyncWriteExt;
file.write_all(&bytes).await.map_err(|e| {
crate::error::Error::io("failed to write local file", e)
.with("local_path", path.display().to_string())
})?;
file.flush().await.map_err(|e| {
crate::error::Error::io("failed to flush local file", e)
.with("local_path", path.display().to_string())
})?;
Ok(())
})
.await
.map_err(|_e| crate::error::Error::timeout("local file operation exceeded 60s"))?
})
}
fn exists(
&self,
path: &Path,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send + '_>> {
let path = path.to_path_buf();
Box::pin(async move {
match tokio::time::timeout(
std::time::Duration::from_secs(60),
tokio::task::spawn_blocking(move || path.exists()),
)
.await
{
Ok(Ok(exists)) => exists,
Ok(Err(_)) => false,
Err(_) => false,
}
})
}
}
#[non_exhaustive]
#[derive(Clone, Debug, Default)]
pub struct MemoryLocalFs {
store:
std::sync::Arc<std::sync::RwLock<std::collections::HashMap<std::path::PathBuf, Vec<u8>>>>,
}
impl MemoryLocalFs {
#[allow(clippy::new_without_default)]
#[must_use]
pub fn new() -> Self {
Self {
store: std::sync::Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
}
}
}
impl LocalFs for MemoryLocalFs {
fn read_all(
&self,
path: &Path,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send + '_>> {
let path = path.to_path_buf();
let store = std::sync::Arc::clone(&self.store);
Box::pin(async move {
let guard = store.read().map_err(|e| {
crate::error::Error::io("lock poisoned", std::io::Error::other(e.to_string()))
})?;
guard.get(&path).cloned().ok_or_else(|| {
crate::error::Error::not_found(format!("file not found: {}", path.display()))
})
})
}
fn write_all(
&self,
path: &Path,
bytes: &[u8],
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + '_>> {
let path = path.to_path_buf();
let bytes = bytes.to_vec();
let store = std::sync::Arc::clone(&self.store);
Box::pin(async move {
let mut guard = store.write().map_err(|e| {
crate::error::Error::io("lock poisoned", std::io::Error::other(e.to_string()))
})?;
guard.insert(path, bytes);
Ok(())
})
}
fn exists(
&self,
path: &Path,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send + '_>> {
let path = path.to_path_buf();
let store = std::sync::Arc::clone(&self.store);
Box::pin(async move {
let guard = store.read().unwrap();
guard.contains_key(&path)
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
#[tokio::test]
async fn memory_local_fs_write_then_read() {
let fs = MemoryLocalFs::new();
let path = Path::new("/mem/test.txt");
fs.write_all(path, b"content").await.unwrap();
let bytes = fs.read_all(path).await.unwrap();
assert_eq!(bytes, b"content");
}
#[tokio::test]
async fn memory_local_fs_exists() {
let fs = MemoryLocalFs::new();
let path = Path::new("/mem/foo");
assert!(!fs.exists(path).await);
fs.write_all(path, b"x").await.unwrap();
assert!(fs.exists(path).await);
}
#[tokio::test]
async fn memory_local_fs_read_missing_returns_error() {
let fs = MemoryLocalFs::new();
let res = fs.read_all(Path::new("/mem/nonexistent")).await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind, crate::error::ErrorKind::NotFound);
}
#[tokio::test]
async fn memory_local_fs_overwrite() {
let fs = MemoryLocalFs::new();
let path = Path::new("/mem/overwrite.txt");
fs.write_all(path, b"first").await.unwrap();
fs.write_all(path, b"second").await.unwrap();
let bytes = fs.read_all(path).await.unwrap();
assert_eq!(bytes, b"second");
}
}