use crate::model::filesystem::{DirEntry, FileMetadata, FileSystem};
use std::collections::HashMap;
use std::fmt;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
type PendingDirRequests =
Arc<Mutex<HashMap<PathBuf, Vec<oneshot::Sender<io::Result<Vec<DirEntry>>>>>>>;
pub struct FsManager {
fs: Arc<dyn FileSystem + Send + Sync>,
pending_dir_requests: PendingDirRequests,
}
impl fmt::Debug for FsManager {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FsManager")
.field("fs", &"<dyn FileSystem>")
.field("pending_dir_requests", &"<mutex>")
.finish()
}
}
impl FsManager {
pub fn new(fs: Arc<dyn FileSystem + Send + Sync>) -> Self {
Self {
fs,
pending_dir_requests: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn list_dir(&self, path: PathBuf) -> io::Result<Vec<DirEntry>> {
let (rx, should_execute) = {
let mut pending = self.pending_dir_requests.lock().await;
if let Some(senders) = pending.get_mut(&path) {
let (tx, rx) = oneshot::channel();
senders.push(tx);
(rx, false)
} else {
let (tx, rx) = oneshot::channel();
pending.insert(path.clone(), vec![tx]);
(rx, true)
}
};
if should_execute {
let fs = Arc::clone(&self.fs);
let path_clone = path.clone();
let result = tokio::task::spawn_blocking(move || fs.read_dir(&path_clone))
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let mut pending = self.pending_dir_requests.lock().await;
if let Some(senders) = pending.remove(&path) {
for sender in senders {
let _ = sender.send(
result
.as_ref()
.map(|v| v.clone())
.map_err(|e| io::Error::new(e.kind(), e.to_string())),
);
}
}
result
} else {
rx.await
.unwrap_or_else(|_| Err(io::Error::other("Request cancelled")))
}
}
pub async fn get_metadata(&self, paths: Vec<PathBuf>) -> Vec<io::Result<FileMetadata>> {
let tasks: Vec<_> = paths
.into_iter()
.map(|path| {
let fs = Arc::clone(&self.fs);
tokio::task::spawn_blocking(move || fs.metadata(&path))
})
.collect();
let mut results = Vec::with_capacity(tasks.len());
for task in tasks {
match task.await {
Ok(result) => results.push(result),
Err(e) => results.push(Err(io::Error::new(io::ErrorKind::Other, e.to_string()))),
}
}
results
}
pub async fn get_single_metadata(&self, path: &Path) -> io::Result<FileMetadata> {
let fs = Arc::clone(&self.fs);
let path = path.to_path_buf();
tokio::task::spawn_blocking(move || fs.metadata(&path))
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
}
pub async fn exists(&self, path: &Path) -> bool {
let fs = Arc::clone(&self.fs);
let path = path.to_path_buf();
tokio::task::spawn_blocking(move || fs.exists(&path))
.await
.unwrap_or(false)
}
pub async fn is_dir(&self, path: &Path) -> io::Result<bool> {
let fs = Arc::clone(&self.fs);
let path = path.to_path_buf();
tokio::task::spawn_blocking(move || fs.is_dir(&path))
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
}
pub async fn get_entry(&self, path: &Path) -> io::Result<DirEntry> {
let fs = Arc::clone(&self.fs);
let path_buf = path.to_path_buf();
tokio::task::spawn_blocking(move || {
let name = path_buf
.file_name()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Invalid path"))?
.to_string_lossy()
.into_owned();
let symlink_meta = fs.symlink_metadata(&path_buf)?;
let is_symlink = {
#[cfg(unix)]
{
if let Some(ref perms) = symlink_meta.permissions {
(perms.mode() & 0o170000) == 0o120000
} else {
false
}
}
#[cfg(not(unix))]
{
false
}
};
if is_symlink {
let target_is_dir = fs.is_dir(&path_buf).unwrap_or(false);
Ok(
DirEntry::new_symlink(path_buf, name, target_is_dir)
.with_metadata(symlink_meta),
)
} else {
let entry_type = if fs.is_dir(&path_buf).unwrap_or(false) {
crate::model::filesystem::EntryType::Directory
} else {
crate::model::filesystem::EntryType::File
};
Ok(DirEntry::new(path_buf, name, entry_type).with_metadata(symlink_meta))
}
})
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
}
pub async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
let fs = Arc::clone(&self.fs);
let path = path.to_path_buf();
tokio::task::spawn_blocking(move || fs.canonicalize(&path))
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
}
pub async fn list_dir_with_metadata(&self, path: PathBuf) -> io::Result<Vec<DirEntry>> {
let mut entries = self.list_dir(path).await?;
let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
let metadata_results = self.get_metadata(paths).await;
for (entry, metadata_result) in entries.iter_mut().zip(metadata_results.into_iter()) {
if let Ok(metadata) = metadata_result {
entry.metadata = Some(metadata);
}
}
Ok(entries)
}
pub fn filesystem(&self) -> &Arc<dyn FileSystem + Send + Sync> {
&self.fs
}
}
impl Clone for FsManager {
fn clone(&self) -> Self {
Self {
fs: Arc::clone(&self.fs),
pending_dir_requests: Arc::clone(&self.pending_dir_requests),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::filesystem::{EntryType, StdFileSystem};
use std::fs as std_fs;
use tempfile::TempDir;
#[tokio::test]
async fn test_list_dir() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
std_fs::write(temp_path.join("file1.txt"), "content1").unwrap();
std_fs::write(temp_path.join("file2.txt"), "content2").unwrap();
std_fs::create_dir(temp_path.join("subdir")).unwrap();
let fs = Arc::new(StdFileSystem);
let manager = FsManager::new(fs);
let entries = manager.list_dir(temp_path.to_path_buf()).await.unwrap();
assert_eq!(entries.len(), 3);
let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
assert!(names.contains(&"file1.txt"));
assert!(names.contains(&"file2.txt"));
assert!(names.contains(&"subdir"));
}
#[tokio::test]
async fn test_request_deduplication() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
for i in 0..10 {
std_fs::write(
temp_path.join(format!("file{}.txt", i)),
format!("content{}", i),
)
.unwrap();
}
let fs = Arc::new(StdFileSystem);
let manager = FsManager::new(fs);
let mut handles = vec![];
for _ in 0..10 {
let manager = manager.clone();
let path = temp_path.to_path_buf();
handles.push(tokio::spawn(async move { manager.list_dir(path).await }));
}
let mut results = vec![];
for handle in handles {
let result = handle.await.unwrap().unwrap();
results.push(result);
}
assert_eq!(results.len(), 10);
let first_len = results[0].len();
assert!(results.iter().all(|r| r.len() == first_len));
}
#[tokio::test]
async fn test_get_metadata() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
std_fs::write(temp_path.join("file1.txt"), "content1").unwrap();
std_fs::write(temp_path.join("file2.txt"), "content2").unwrap();
let fs = Arc::new(StdFileSystem);
let manager = FsManager::new(fs);
let paths = vec![temp_path.join("file1.txt"), temp_path.join("file2.txt")];
let results = manager.get_metadata(paths).await;
assert_eq!(results.len(), 2);
assert!(results[0].is_ok());
assert!(results[1].is_ok());
}
#[tokio::test]
async fn test_get_single_metadata() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
let file_path = temp_path.join("test.txt");
std_fs::write(&file_path, "content").unwrap();
let fs = Arc::new(StdFileSystem);
let manager = FsManager::new(fs);
let metadata = manager.get_single_metadata(&file_path).await.unwrap();
assert_eq!(metadata.size, 7);
}
#[tokio::test]
async fn test_exists() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
let file_path = temp_path.join("test.txt");
let fs = Arc::new(StdFileSystem);
let manager = FsManager::new(fs);
assert!(!manager.exists(&file_path).await);
std_fs::write(&file_path, "content").unwrap();
assert!(manager.exists(&file_path).await);
}
#[tokio::test]
async fn test_is_dir() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
let file_path = temp_path.join("test.txt");
let dir_path = temp_path.join("subdir");
std_fs::write(&file_path, "content").unwrap();
std_fs::create_dir(&dir_path).unwrap();
let fs = Arc::new(StdFileSystem);
let manager = FsManager::new(fs);
assert!(!manager.is_dir(&file_path).await.unwrap());
assert!(manager.is_dir(&dir_path).await.unwrap());
}
#[tokio::test]
async fn test_get_entry() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
let file_path = temp_path.join("test.txt");
std_fs::write(&file_path, "test content").unwrap();
let fs = Arc::new(StdFileSystem);
let manager = FsManager::new(fs);
let entry = manager.get_entry(&file_path).await.unwrap();
assert_eq!(entry.name, "test.txt");
assert_eq!(entry.entry_type, EntryType::File);
assert!(entry.metadata.is_some());
assert_eq!(entry.metadata.unwrap().size, 12);
}
#[tokio::test]
async fn test_list_dir_with_metadata() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
std_fs::write(temp_path.join("file1.txt"), "content1").unwrap();
std_fs::write(temp_path.join("file2.txt"), "content2").unwrap();
std_fs::create_dir(temp_path.join("subdir")).unwrap();
let fs = Arc::new(StdFileSystem);
let manager = FsManager::new(fs);
let entries = manager
.list_dir_with_metadata(temp_path.to_path_buf())
.await
.unwrap();
assert_eq!(entries.len(), 3);
assert!(entries.iter().all(|e| e.metadata.is_some()));
let file1 = entries.iter().find(|e| e.name == "file1.txt").unwrap();
assert_eq!(file1.metadata.as_ref().unwrap().size, 8);
}
#[tokio::test]
async fn test_concurrent_different_dirs() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path();
for i in 0..5 {
let dir_path = temp_path.join(format!("dir{}", i));
std_fs::create_dir(&dir_path).unwrap();
for j in 0..3 {
std_fs::write(
dir_path.join(format!("file{}.txt", j)),
format!("content{}", j),
)
.unwrap();
}
}
let fs = Arc::new(StdFileSystem);
let manager = FsManager::new(fs);
let mut handles = vec![];
for i in 0..5 {
let manager = manager.clone();
let path = temp_path.join(format!("dir{}", i));
handles.push(tokio::spawn(async move { manager.list_dir(path).await }));
}
for handle in handles {
let result = handle.await.unwrap().unwrap();
assert_eq!(result.len(), 3);
}
}
}