Skip to main content

fresh/services/fs/
manager.rs

1use crate::model::filesystem::{DirEntry, FileMetadata, FileSystem};
2use std::collections::HashMap;
3use std::fmt;
4use std::io;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use tokio::sync::{oneshot, Mutex};
8
9/// Type alias for pending directory requests map
10type PendingDirRequests =
11    Arc<Mutex<HashMap<PathBuf, Vec<oneshot::Sender<io::Result<Vec<DirEntry>>>>>>>;
12
13/// Manages filesystem operations with request batching and deduplication
14///
15/// The FsManager sits between the application and the filesystem backend,
16/// providing optimizations like:
17/// - Request deduplication (multiple requests for the same path)
18/// - Batching of metadata requests
19/// - Centralized error handling
20///
21/// This wraps a `FileSystem` trait object and provides async methods
22/// using `spawn_blocking` internally.
23pub struct FsManager {
24    fs: Arc<dyn FileSystem + Send + Sync>,
25    /// Pending directory listing requests
26    /// Map of path -> list of channels waiting for the result
27    pending_dir_requests: PendingDirRequests,
28}
29
30impl fmt::Debug for FsManager {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        f.debug_struct("FsManager")
33            .field("fs", &"<dyn FileSystem>")
34            .field("pending_dir_requests", &"<mutex>")
35            .finish()
36    }
37}
38
39impl FsManager {
40    /// Create a new filesystem manager with the given filesystem implementation
41    pub fn new(fs: Arc<dyn FileSystem + Send + Sync>) -> Self {
42        Self {
43            fs,
44            pending_dir_requests: Arc::new(Mutex::new(HashMap::new())),
45        }
46    }
47
48    /// List directory contents with request deduplication
49    ///
50    /// If multiple requests for the same directory are made concurrently,
51    /// only one filesystem operation will be performed and all requesters
52    /// will receive the same result.
53    pub async fn list_dir(&self, path: PathBuf) -> io::Result<Vec<DirEntry>> {
54        // Check if there's already a pending request for this path
55        let (rx, should_execute) = {
56            let mut pending = self.pending_dir_requests.lock().await;
57
58            if let Some(senders) = pending.get_mut(&path) {
59                // There's already a request in progress, just add our channel
60                let (tx, rx) = oneshot::channel();
61                senders.push(tx);
62                (rx, false)
63            } else {
64                // We're the first request for this path
65                let (tx, rx) = oneshot::channel();
66                pending.insert(path.clone(), vec![tx]);
67                (rx, true)
68            }
69        };
70
71        if should_execute {
72            // We're responsible for executing the request
73            let fs = Arc::clone(&self.fs);
74            let path_clone = path.clone();
75            let result = tokio::task::spawn_blocking(move || fs.read_dir(&path_clone))
76                .await
77                .map_err(|e| io::Error::other(e.to_string()))?;
78
79            // Notify all waiting requesters
80            let mut pending = self.pending_dir_requests.lock().await;
81            if let Some(senders) = pending.remove(&path) {
82                for sender in senders {
83                    // Clone the result for each waiter.
84                    // Receiver may have been dropped if the requester timed out or was cancelled.
85                    #[allow(clippy::let_underscore_must_use)]
86                    let _ = sender.send(
87                        result
88                            .as_ref()
89                            .map(|v| v.clone())
90                            .map_err(|e| io::Error::new(e.kind(), e.to_string())),
91                    );
92                }
93            }
94
95            result
96        } else {
97            // Wait for the other request to complete
98            rx.await
99                .unwrap_or_else(|_| Err(io::Error::other("Request cancelled")))
100        }
101    }
102
103    /// Get metadata for multiple paths efficiently
104    ///
105    /// This fetches metadata in parallel using spawn_blocking.
106    ///
107    /// Returns a result for each path in the same order as the input.
108    pub async fn get_metadata(&self, paths: Vec<PathBuf>) -> Vec<io::Result<FileMetadata>> {
109        // Spawn parallel tasks for each path
110        let tasks: Vec<_> = paths
111            .into_iter()
112            .map(|path| {
113                let fs = Arc::clone(&self.fs);
114                tokio::task::spawn_blocking(move || fs.metadata(&path))
115            })
116            .collect();
117
118        // Collect results
119        let mut results = Vec::with_capacity(tasks.len());
120        for task in tasks {
121            match task.await {
122                Ok(result) => results.push(result),
123                Err(e) => results.push(Err(io::Error::other(e.to_string()))),
124            }
125        }
126
127        results
128    }
129
130    /// Get metadata for a single path
131    pub async fn get_single_metadata(&self, path: &Path) -> io::Result<FileMetadata> {
132        let fs = Arc::clone(&self.fs);
133        let path = path.to_path_buf();
134        tokio::task::spawn_blocking(move || fs.metadata(&path))
135            .await
136            .map_err(|e| io::Error::other(e.to_string()))?
137    }
138
139    /// Check if a path exists
140    pub async fn exists(&self, path: &Path) -> bool {
141        let fs = Arc::clone(&self.fs);
142        let path = path.to_path_buf();
143        tokio::task::spawn_blocking(move || fs.exists(&path))
144            .await
145            .unwrap_or(false)
146    }
147
148    /// Check if a path is a directory
149    pub async fn is_dir(&self, path: &Path) -> io::Result<bool> {
150        let fs = Arc::clone(&self.fs);
151        let path = path.to_path_buf();
152        tokio::task::spawn_blocking(move || fs.is_dir(&path))
153            .await
154            .map_err(|e| io::Error::other(e.to_string()))?
155    }
156
157    /// Get a complete entry for a path (with metadata)
158    pub async fn get_entry(&self, path: &Path) -> io::Result<DirEntry> {
159        let fs = Arc::clone(&self.fs);
160        let path_buf = path.to_path_buf();
161        tokio::task::spawn_blocking(move || {
162            // Handle root paths (e.g., "/" on Unix) which have no file_name component
163            let name = path_buf
164                .file_name()
165                .map(|n| n.to_string_lossy().into_owned())
166                .unwrap_or_else(|| path_buf.to_string_lossy().into_owned());
167
168            // Get symlink metadata first to check if it's a symlink
169            let symlink_meta = fs.symlink_metadata(&path_buf)?;
170
171            // Determine entry type
172            let is_symlink = {
173                #[cfg(unix)]
174                {
175                    // Check file type from permissions mode
176                    if let Some(ref perms) = symlink_meta.permissions {
177                        // S_IFLNK = 0o120000
178                        (perms.mode() & 0o170000) == 0o120000
179                    } else {
180                        false
181                    }
182                }
183                #[cfg(not(unix))]
184                {
185                    false
186                }
187            };
188
189            if is_symlink {
190                // For symlinks, check what they point to
191                let target_is_dir = fs.is_dir(&path_buf).unwrap_or(false);
192                Ok(
193                    DirEntry::new_symlink(path_buf, name, target_is_dir)
194                        .with_metadata(symlink_meta),
195                )
196            } else {
197                // Regular file or directory
198                let entry_type = if fs.is_dir(&path_buf).unwrap_or(false) {
199                    crate::model::filesystem::EntryType::Directory
200                } else {
201                    crate::model::filesystem::EntryType::File
202                };
203                Ok(DirEntry::new(path_buf, name, entry_type).with_metadata(symlink_meta))
204            }
205        })
206        .await
207        .map_err(|e| io::Error::other(e.to_string()))?
208    }
209
210    /// Get canonical path
211    pub async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
212        let fs = Arc::clone(&self.fs);
213        let path = path.to_path_buf();
214        tokio::task::spawn_blocking(move || fs.canonicalize(&path))
215            .await
216            .map_err(|e| io::Error::other(e.to_string()))?
217    }
218
219    /// List directory and fetch metadata for all entries in parallel
220    ///
221    /// This is a convenience method that combines `list_dir` with
222    /// `get_metadata` to get complete information about all entries.
223    pub async fn list_dir_with_metadata(&self, path: PathBuf) -> io::Result<Vec<DirEntry>> {
224        let mut entries = self.list_dir(path).await?;
225
226        // Collect paths for metadata batch fetch
227        let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
228
229        // Fetch metadata in parallel
230        let metadata_results = self.get_metadata(paths).await;
231
232        // Attach metadata to entries
233        for (entry, metadata_result) in entries.iter_mut().zip(metadata_results.into_iter()) {
234            if let Ok(metadata) = metadata_result {
235                entry.metadata = Some(metadata);
236            }
237        }
238
239        Ok(entries)
240    }
241
242    /// Get the underlying filesystem implementation
243    pub fn filesystem(&self) -> &Arc<dyn FileSystem + Send + Sync> {
244        &self.fs
245    }
246}
247
248impl Clone for FsManager {
249    fn clone(&self) -> Self {
250        Self {
251            fs: Arc::clone(&self.fs),
252            pending_dir_requests: Arc::clone(&self.pending_dir_requests),
253        }
254    }
255}
256
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::filesystem::{EntryType, StdFileSystem};
    use std::fs as std_fs;
    use tempfile::TempDir;

    /// Build a manager backed by the real on-disk filesystem.
    fn std_manager() -> FsManager {
        let fs = Arc::new(StdFileSystem);
        FsManager::new(fs)
    }

    /// Create `dir/name` holding `contents`.
    fn write_file(dir: &Path, name: &str, contents: &str) {
        std_fs::write(dir.join(name), contents).unwrap();
    }

    /// Listing a directory returns every file and subdirectory in it.
    #[tokio::test]
    async fn test_list_dir() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();

        write_file(root, "file1.txt", "content1");
        write_file(root, "file2.txt", "content2");
        std_fs::create_dir(root.join("subdir")).unwrap();

        let manager = std_manager();
        let listing = manager.list_dir(root.to_path_buf()).await.unwrap();

        assert_eq!(listing.len(), 3);

        for expected in ["file1.txt", "file2.txt", "subdir"] {
            assert!(listing.iter().any(|e| e.name.as_str() == expected));
        }
    }

    /// Ten concurrent listings of one directory all succeed and agree.
    #[tokio::test]
    async fn test_request_deduplication() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();

        for i in 0..10 {
            write_file(root, &format!("file{}.txt", i), &format!("content{}", i));
        }

        let manager = std_manager();

        // Start every request before awaiting any of them.
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let manager = manager.clone();
                let path = root.to_path_buf();
                tokio::spawn(async move { manager.list_dir(path).await })
            })
            .collect();

        let mut listings = Vec::new();
        for handle in handles {
            listings.push(handle.await.unwrap().unwrap());
        }

        assert_eq!(listings.len(), 10);

        // Every caller must have seen the same number of entries.
        let expected_len = listings[0].len();
        assert!(listings.iter().all(|l| l.len() == expected_len));
    }

    /// Batch metadata lookup yields one successful result per existing file.
    #[tokio::test]
    async fn test_get_metadata() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();

        write_file(root, "file1.txt", "content1");
        write_file(root, "file2.txt", "content2");

        let manager = std_manager();

        let paths = vec![root.join("file1.txt"), root.join("file2.txt")];
        let results = manager.get_metadata(paths).await;

        assert_eq!(results.len(), 2);
        assert!(results[0].is_ok());
        assert!(results[1].is_ok());
    }

    /// Single-path metadata reports the file's byte length.
    #[tokio::test]
    async fn test_get_single_metadata() {
        let scratch = TempDir::new().unwrap();
        let target = scratch.path().join("test.txt");

        std_fs::write(&target, "content").unwrap();

        let manager = std_manager();
        let metadata = manager.get_single_metadata(&target).await.unwrap();

        assert_eq!(metadata.size, 7);
    }

    /// `exists` flips from false to true once the file is created.
    #[tokio::test]
    async fn test_exists() {
        let scratch = TempDir::new().unwrap();
        let target = scratch.path().join("test.txt");

        let manager = std_manager();

        assert!(!manager.exists(&target).await);

        std_fs::write(&target, "content").unwrap();

        assert!(manager.exists(&target).await);
    }

    /// `is_dir` distinguishes a regular file from a directory.
    #[tokio::test]
    async fn test_is_dir() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();
        let file = root.join("test.txt");
        let dir = root.join("subdir");

        std_fs::write(&file, "content").unwrap();
        std_fs::create_dir(&dir).unwrap();

        let manager = std_manager();

        assert!(!manager.is_dir(&file).await.unwrap());
        assert!(manager.is_dir(&dir).await.unwrap());
    }

    /// `get_entry` on a regular file fills in name, type and metadata.
    #[tokio::test]
    async fn test_get_entry() {
        let scratch = TempDir::new().unwrap();
        let target = scratch.path().join("test.txt");

        std_fs::write(&target, "test content").unwrap();

        let manager = std_manager();
        let entry = manager.get_entry(&target).await.unwrap();

        assert_eq!(entry.name, "test.txt");
        assert_eq!(entry.entry_type, EntryType::File);
        assert!(entry.metadata.is_some());
        assert_eq!(entry.metadata.unwrap().size, 12);
    }

    /// Test that get_entry works for root path "/" (issue #902)
    #[tokio::test]
    async fn test_get_entry_root_path() {
        let manager = std_manager();

        // "/" has no file_name() component; the full path is used as the name.
        let root = PathBuf::from("/");
        let entry = manager.get_entry(&root).await.unwrap();

        assert_eq!(entry.name, "/");
        assert_eq!(entry.entry_type, EntryType::Directory);
    }

    /// The combined listing attaches metadata to every entry.
    #[tokio::test]
    async fn test_list_dir_with_metadata() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();

        write_file(root, "file1.txt", "content1");
        write_file(root, "file2.txt", "content2");
        std_fs::create_dir(root.join("subdir")).unwrap();

        let manager = std_manager();
        let listing = manager
            .list_dir_with_metadata(root.to_path_buf())
            .await
            .unwrap();

        assert_eq!(listing.len(), 3);

        // No entry may be left without metadata.
        assert!(listing.iter().all(|e| e.metadata.is_some()));

        // Spot-check one size: "content1" is 8 bytes.
        let file1 = listing.iter().find(|e| e.name == "file1.txt").unwrap();
        assert_eq!(file1.metadata.as_ref().unwrap().size, 8);
    }

    /// Concurrent listings of distinct directories do not interfere.
    #[tokio::test]
    async fn test_concurrent_different_dirs() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();

        // Five directories with three files each.
        for i in 0..5 {
            let dir = root.join(format!("dir{}", i));
            std_fs::create_dir(&dir).unwrap();
            for j in 0..3 {
                write_file(&dir, &format!("file{}.txt", j), &format!("content{}", j));
            }
        }

        let manager = std_manager();

        // Start all listings before awaiting any of them.
        let handles: Vec<_> = (0..5)
            .map(|i| {
                let manager = manager.clone();
                let path = root.join(format!("dir{}", i));
                tokio::spawn(async move { manager.list_dir(path).await })
            })
            .collect();

        for handle in handles {
            let listing = handle.await.unwrap().unwrap();
            assert_eq!(listing.len(), 3);
        }
    }
}