fresh/services/fs/
manager.rs1use crate::model::filesystem::{DirEntry, FileMetadata, FileSystem};
2use std::collections::HashMap;
3use std::fmt;
4use std::io;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use tokio::sync::{oneshot, Mutex};
8
9type PendingDirRequests =
11 Arc<Mutex<HashMap<PathBuf, Vec<oneshot::Sender<io::Result<Vec<DirEntry>>>>>>>;
12
13pub struct FsManager {
24 fs: Arc<dyn FileSystem + Send + Sync>,
25 pending_dir_requests: PendingDirRequests,
28}
29
30impl fmt::Debug for FsManager {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 f.debug_struct("FsManager")
33 .field("fs", &"<dyn FileSystem>")
34 .field("pending_dir_requests", &"<mutex>")
35 .finish()
36 }
37}
38
39impl FsManager {
40 pub fn new(fs: Arc<dyn FileSystem + Send + Sync>) -> Self {
42 Self {
43 fs,
44 pending_dir_requests: Arc::new(Mutex::new(HashMap::new())),
45 }
46 }
47
48 pub async fn list_dir(&self, path: PathBuf) -> io::Result<Vec<DirEntry>> {
54 let (rx, should_execute) = {
56 let mut pending = self.pending_dir_requests.lock().await;
57
58 if let Some(senders) = pending.get_mut(&path) {
59 let (tx, rx) = oneshot::channel();
61 senders.push(tx);
62 (rx, false)
63 } else {
64 let (tx, rx) = oneshot::channel();
66 pending.insert(path.clone(), vec![tx]);
67 (rx, true)
68 }
69 };
70
71 if should_execute {
72 let fs = Arc::clone(&self.fs);
74 let path_clone = path.clone();
75 let result = tokio::task::spawn_blocking(move || fs.read_dir(&path_clone))
76 .await
77 .map_err(|e| io::Error::other(e.to_string()))?;
78
79 let mut pending = self.pending_dir_requests.lock().await;
81 if let Some(senders) = pending.remove(&path) {
82 for sender in senders {
83 #[allow(clippy::let_underscore_must_use)]
86 let _ = sender.send(
87 result
88 .as_ref()
89 .map(|v| v.clone())
90 .map_err(|e| io::Error::new(e.kind(), e.to_string())),
91 );
92 }
93 }
94
95 result
96 } else {
97 rx.await
99 .unwrap_or_else(|_| Err(io::Error::other("Request cancelled")))
100 }
101 }
102
103 pub async fn get_metadata(&self, paths: Vec<PathBuf>) -> Vec<io::Result<FileMetadata>> {
109 let tasks: Vec<_> = paths
111 .into_iter()
112 .map(|path| {
113 let fs = Arc::clone(&self.fs);
114 tokio::task::spawn_blocking(move || fs.metadata(&path))
115 })
116 .collect();
117
118 let mut results = Vec::with_capacity(tasks.len());
120 for task in tasks {
121 match task.await {
122 Ok(result) => results.push(result),
123 Err(e) => results.push(Err(io::Error::other(e.to_string()))),
124 }
125 }
126
127 results
128 }
129
130 pub async fn get_single_metadata(&self, path: &Path) -> io::Result<FileMetadata> {
132 let fs = Arc::clone(&self.fs);
133 let path = path.to_path_buf();
134 tokio::task::spawn_blocking(move || fs.metadata(&path))
135 .await
136 .map_err(|e| io::Error::other(e.to_string()))?
137 }
138
139 pub async fn exists(&self, path: &Path) -> bool {
141 let fs = Arc::clone(&self.fs);
142 let path = path.to_path_buf();
143 tokio::task::spawn_blocking(move || fs.exists(&path))
144 .await
145 .unwrap_or(false)
146 }
147
148 pub async fn is_dir(&self, path: &Path) -> io::Result<bool> {
150 let fs = Arc::clone(&self.fs);
151 let path = path.to_path_buf();
152 tokio::task::spawn_blocking(move || fs.is_dir(&path))
153 .await
154 .map_err(|e| io::Error::other(e.to_string()))?
155 }
156
157 pub async fn get_entry(&self, path: &Path) -> io::Result<DirEntry> {
159 let fs = Arc::clone(&self.fs);
160 let path_buf = path.to_path_buf();
161 tokio::task::spawn_blocking(move || {
162 let name = path_buf
164 .file_name()
165 .map(|n| n.to_string_lossy().into_owned())
166 .unwrap_or_else(|| path_buf.to_string_lossy().into_owned());
167
168 let symlink_meta = fs.symlink_metadata(&path_buf)?;
170
171 let is_symlink = {
173 #[cfg(unix)]
174 {
175 if let Some(ref perms) = symlink_meta.permissions {
177 (perms.mode() & 0o170000) == 0o120000
179 } else {
180 false
181 }
182 }
183 #[cfg(not(unix))]
184 {
185 false
186 }
187 };
188
189 if is_symlink {
190 let target_is_dir = fs.is_dir(&path_buf).unwrap_or(false);
192 Ok(
193 DirEntry::new_symlink(path_buf, name, target_is_dir)
194 .with_metadata(symlink_meta),
195 )
196 } else {
197 let entry_type = if fs.is_dir(&path_buf).unwrap_or(false) {
199 crate::model::filesystem::EntryType::Directory
200 } else {
201 crate::model::filesystem::EntryType::File
202 };
203 Ok(DirEntry::new(path_buf, name, entry_type).with_metadata(symlink_meta))
204 }
205 })
206 .await
207 .map_err(|e| io::Error::other(e.to_string()))?
208 }
209
210 pub async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
212 let fs = Arc::clone(&self.fs);
213 let path = path.to_path_buf();
214 tokio::task::spawn_blocking(move || fs.canonicalize(&path))
215 .await
216 .map_err(|e| io::Error::other(e.to_string()))?
217 }
218
219 pub async fn list_dir_with_metadata(&self, path: PathBuf) -> io::Result<Vec<DirEntry>> {
224 let mut entries = self.list_dir(path).await?;
225
226 let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
228
229 let metadata_results = self.get_metadata(paths).await;
231
232 for (entry, metadata_result) in entries.iter_mut().zip(metadata_results.into_iter()) {
234 if let Ok(metadata) = metadata_result {
235 entry.metadata = Some(metadata);
236 }
237 }
238
239 Ok(entries)
240 }
241
242 pub fn filesystem(&self) -> &Arc<dyn FileSystem + Send + Sync> {
244 &self.fs
245 }
246}
247
248impl Clone for FsManager {
249 fn clone(&self) -> Self {
250 Self {
251 fs: Arc::clone(&self.fs),
252 pending_dir_requests: Arc::clone(&self.pending_dir_requests),
253 }
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use super::*;
260 use crate::model::filesystem::{EntryType, StdFileSystem};
261 use std::fs as std_fs;
262 use tempfile::TempDir;
263
264 #[tokio::test]
265 async fn test_list_dir() {
266 let temp_dir = TempDir::new().unwrap();
267 let temp_path = temp_dir.path();
268
269 std_fs::write(temp_path.join("file1.txt"), "content1").unwrap();
271 std_fs::write(temp_path.join("file2.txt"), "content2").unwrap();
272 std_fs::create_dir(temp_path.join("subdir")).unwrap();
273
274 let fs = Arc::new(StdFileSystem);
275 let manager = FsManager::new(fs);
276
277 let entries = manager.list_dir(temp_path.to_path_buf()).await.unwrap();
278
279 assert_eq!(entries.len(), 3);
280
281 let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
282 assert!(names.contains(&"file1.txt"));
283 assert!(names.contains(&"file2.txt"));
284 assert!(names.contains(&"subdir"));
285 }
286
287 #[tokio::test]
288 async fn test_request_deduplication() {
289 let temp_dir = TempDir::new().unwrap();
290 let temp_path = temp_dir.path();
291
292 for i in 0..10 {
294 std_fs::write(
295 temp_path.join(format!("file{}.txt", i)),
296 format!("content{}", i),
297 )
298 .unwrap();
299 }
300
301 let fs = Arc::new(StdFileSystem);
302 let manager = FsManager::new(fs);
303
304 let mut handles = vec![];
306 for _ in 0..10 {
307 let manager = manager.clone();
308 let path = temp_path.to_path_buf();
309 handles.push(tokio::spawn(async move { manager.list_dir(path).await }));
310 }
311
312 let mut results = vec![];
314 for handle in handles {
315 let result = handle.await.unwrap().unwrap();
316 results.push(result);
317 }
318
319 assert_eq!(results.len(), 10);
320
321 let first_len = results[0].len();
323 assert!(results.iter().all(|r| r.len() == first_len));
324 }
325
326 #[tokio::test]
327 async fn test_get_metadata() {
328 let temp_dir = TempDir::new().unwrap();
329 let temp_path = temp_dir.path();
330
331 std_fs::write(temp_path.join("file1.txt"), "content1").unwrap();
332 std_fs::write(temp_path.join("file2.txt"), "content2").unwrap();
333
334 let fs = Arc::new(StdFileSystem);
335 let manager = FsManager::new(fs);
336
337 let paths = vec![temp_path.join("file1.txt"), temp_path.join("file2.txt")];
338
339 let results = manager.get_metadata(paths).await;
340
341 assert_eq!(results.len(), 2);
342 assert!(results[0].is_ok());
343 assert!(results[1].is_ok());
344 }
345
346 #[tokio::test]
347 async fn test_get_single_metadata() {
348 let temp_dir = TempDir::new().unwrap();
349 let temp_path = temp_dir.path();
350 let file_path = temp_path.join("test.txt");
351
352 std_fs::write(&file_path, "content").unwrap();
353
354 let fs = Arc::new(StdFileSystem);
355 let manager = FsManager::new(fs);
356
357 let metadata = manager.get_single_metadata(&file_path).await.unwrap();
358 assert_eq!(metadata.size, 7);
359 }
360
361 #[tokio::test]
362 async fn test_exists() {
363 let temp_dir = TempDir::new().unwrap();
364 let temp_path = temp_dir.path();
365 let file_path = temp_path.join("test.txt");
366
367 let fs = Arc::new(StdFileSystem);
368 let manager = FsManager::new(fs);
369
370 assert!(!manager.exists(&file_path).await);
371
372 std_fs::write(&file_path, "content").unwrap();
373
374 assert!(manager.exists(&file_path).await);
375 }
376
377 #[tokio::test]
378 async fn test_is_dir() {
379 let temp_dir = TempDir::new().unwrap();
380 let temp_path = temp_dir.path();
381 let file_path = temp_path.join("test.txt");
382 let dir_path = temp_path.join("subdir");
383
384 std_fs::write(&file_path, "content").unwrap();
385 std_fs::create_dir(&dir_path).unwrap();
386
387 let fs = Arc::new(StdFileSystem);
388 let manager = FsManager::new(fs);
389
390 assert!(!manager.is_dir(&file_path).await.unwrap());
391 assert!(manager.is_dir(&dir_path).await.unwrap());
392 }
393
394 #[tokio::test]
395 async fn test_get_entry() {
396 let temp_dir = TempDir::new().unwrap();
397 let temp_path = temp_dir.path();
398 let file_path = temp_path.join("test.txt");
399
400 std_fs::write(&file_path, "test content").unwrap();
401
402 let fs = Arc::new(StdFileSystem);
403 let manager = FsManager::new(fs);
404
405 let entry = manager.get_entry(&file_path).await.unwrap();
406
407 assert_eq!(entry.name, "test.txt");
408 assert_eq!(entry.entry_type, EntryType::File);
409 assert!(entry.metadata.is_some());
410 assert_eq!(entry.metadata.unwrap().size, 12);
411 }
412
413 #[tokio::test]
415 async fn test_get_entry_root_path() {
416 let fs = Arc::new(StdFileSystem);
417 let manager = FsManager::new(fs);
418
419 let root = PathBuf::from("/");
421 let entry = manager.get_entry(&root).await.unwrap();
422
423 assert_eq!(entry.name, "/");
424 assert_eq!(entry.entry_type, EntryType::Directory);
425 }
426
427 #[tokio::test]
428 async fn test_list_dir_with_metadata() {
429 let temp_dir = TempDir::new().unwrap();
430 let temp_path = temp_dir.path();
431
432 std_fs::write(temp_path.join("file1.txt"), "content1").unwrap();
433 std_fs::write(temp_path.join("file2.txt"), "content2").unwrap();
434 std_fs::create_dir(temp_path.join("subdir")).unwrap();
435
436 let fs = Arc::new(StdFileSystem);
437 let manager = FsManager::new(fs);
438
439 let entries = manager
440 .list_dir_with_metadata(temp_path.to_path_buf())
441 .await
442 .unwrap();
443
444 assert_eq!(entries.len(), 3);
445
446 assert!(entries.iter().all(|e| e.metadata.is_some()));
448
449 let file1 = entries.iter().find(|e| e.name == "file1.txt").unwrap();
451 assert_eq!(file1.metadata.as_ref().unwrap().size, 8);
452 }
453
454 #[tokio::test]
455 async fn test_concurrent_different_dirs() {
456 let temp_dir = TempDir::new().unwrap();
457 let temp_path = temp_dir.path();
458
459 for i in 0..5 {
461 let dir_path = temp_path.join(format!("dir{}", i));
462 std_fs::create_dir(&dir_path).unwrap();
463 for j in 0..3 {
464 std_fs::write(
465 dir_path.join(format!("file{}.txt", j)),
466 format!("content{}", j),
467 )
468 .unwrap();
469 }
470 }
471
472 let fs = Arc::new(StdFileSystem);
473 let manager = FsManager::new(fs);
474
475 let mut handles = vec![];
477 for i in 0..5 {
478 let manager = manager.clone();
479 let path = temp_path.join(format!("dir{}", i));
480 handles.push(tokio::spawn(async move { manager.list_dir(path).await }));
481 }
482
483 for handle in handles {
485 let result = handle.await.unwrap().unwrap();
486 assert_eq!(result.len(), 3);
487 }
488 }
489}