dux_core/scanner/
walker.rs

1use std::collections::HashMap;
2use std::fs::Metadata;
3use std::path::PathBuf;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6
7#[cfg(unix)]
8use std::os::unix::fs::MetadataExt;
9
10use crossbeam_channel::{Receiver, Sender};
11use jwalk::WalkDir;
12
13use super::progress::{ScanMessage, ScanProgress};
14use crate::tree::{DiskTree, NodeId, NodeKind};
15
/// Options that control how a scan walks the filesystem.
#[derive(Debug, Clone)]
pub struct ScanConfig {
    /// Whether symbolic links are followed during the walk.
    pub follow_symlinks: bool,
    /// Deepest level to descend to; `None` places no limit on depth.
    pub max_depth: Option<usize>,
    /// When `true`, the scan does not cross onto other filesystems (mount points).
    pub same_filesystem: bool,
    /// Number of parallel worker threads; `0` means "choose automatically".
    pub num_threads: usize,
}

impl Default for ScanConfig {
    /// Returns the default configuration: symlinks are not followed, depth is
    /// unlimited, the scan stays on one filesystem, and the thread count is
    /// chosen automatically.
    fn default() -> Self {
        // `0` is the sentinel for an automatically chosen thread count.
        const AUTO_THREADS: usize = 0;

        ScanConfig {
            follow_symlinks: false,
            max_depth: None,
            same_filesystem: true,
            num_threads: AUTO_THREADS,
        }
    }
}
39
/// Cloneable flag used to ask a running scan to stop.
///
/// Clones share one underlying flag, so cancelling through any clone is
/// observed by all of them.
#[derive(Debug, Clone, Default)]
pub struct CancellationToken {
    /// Shared "stop requested" flag; starts out `false`.
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Creates a token that has not been cancelled.
    pub fn new() -> Self {
        Self::default()
    }

    /// Requests cancellation; visible through every clone of this token.
    pub fn cancel(&self) {
        let flag: &AtomicBool = &self.cancelled;
        flag.store(true, Ordering::SeqCst);
    }

    /// Returns `true` once `cancel` has been called on this token or any clone.
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::SeqCst)
    }
}
67
68/// Shared progress state for heartbeat updates
69struct SharedProgress {
70    files_scanned: AtomicU64,
71    dirs_scanned: AtomicU64,
72    bytes_scanned: AtomicU64,
73    errors: AtomicU64,
74    current_path: Mutex<Option<PathBuf>>,
75    done: AtomicBool,
76}
77
78impl SharedProgress {
79    fn new() -> Self {
80        Self {
81            files_scanned: AtomicU64::new(0),
82            dirs_scanned: AtomicU64::new(0),
83            bytes_scanned: AtomicU64::new(0),
84            errors: AtomicU64::new(0),
85            current_path: Mutex::new(None),
86            done: AtomicBool::new(false),
87        }
88    }
89
90    fn to_scan_progress(&self) -> ScanProgress {
91        ScanProgress {
92            files_scanned: self.files_scanned.load(Ordering::Relaxed),
93            dirs_scanned: self.dirs_scanned.load(Ordering::Relaxed),
94            bytes_scanned: self.bytes_scanned.load(Ordering::Relaxed),
95            errors: self.errors.load(Ordering::Relaxed),
96            current_path: self.current_path.lock().ok().and_then(|g| g.clone()),
97        }
98    }
99}
100
/// Substrings that mark a path as potentially slow or problematic to scan.
///
/// These are matched anywhere in the path, because the directories they name
/// can appear beneath any volume or user directory.
const SLOW_PATTERNS: &[&str] = &[
    "/Volumes/",                // Mounted volumes (might be network/external)
    "/.Spotlight-V100",         // Spotlight index
    "/.fseventsd",              // FSEvents
    "/.DocumentRevisions-V100", // Document versions
    "/System/Volumes/Data/.Spotlight-V100",
    "CoreSimulator/Volumes",    // iOS Simulator disk images
    "/.MobileBackups",          // Mobile backups
    ".timemachine",             // Time Machine
    "/private/var/folders",     // macOS temp folders (can hang)
    "/private/var/db/dyld",     // dyld cache (permission issues)
    "/private/var/db/uuidtext", // UUID text (slow)
];

/// Prefixes of root-level virtual filesystems.
///
/// These are matched only at the very start of the path, so ordinary
/// directories that merely share a name (`~/dev/`, `src/sys/`, ...) are not
/// mistaken for them.
const VIRTUAL_ROOT_PREFIXES: &[&str] = &[
    "/dev/",  // Device files
    "/proc/", // Linux proc filesystem
    "/sys/",  // Linux sys filesystem
];

/// Checks whether `path` looks like a virtual or otherwise problematic
/// location that a scan rooted at `root_path` should skip.
///
/// Returns `false` for the root itself and for any ancestor of the root.
/// Otherwise returns `true` when `path` matches a pattern that the root does
/// not match, so a scan deliberately started inside such a location (for
/// example in `/private/var/folders/...`, or at `/Volumes` itself) is still
/// allowed to proceed.
fn is_virtual_or_slow_path(path: &std::path::Path, root_path: &std::path::Path) -> bool {
    // The root and its ancestors are never skipped.
    if path == root_path || root_path.starts_with(path) {
        return false;
    }

    let path_str = path.to_string_lossy();

    // Give the root a trailing separator before comparing, so that a root of
    // exactly `/Volumes` or `/dev` counts as being inside `/Volumes/` or
    // `/dev/` rather than causing everything beneath it to be skipped.
    let mut root_str = root_path.to_string_lossy().into_owned();
    if !root_str.ends_with('/') {
        root_str.push('/');
    }

    // Substring patterns: present in the path but not already in the root.
    let matches_slow_pattern = SLOW_PATTERNS
        .iter()
        .any(|&pattern| path_str.contains(pattern) && !root_str.contains(pattern));

    // Anchored patterns: the path lives under a root-level virtual filesystem
    // that the scan was not started in.
    let under_virtual_root = VIRTUAL_ROOT_PREFIXES
        .iter()
        .any(|&prefix| path_str.starts_with(prefix) && !root_str.starts_with(prefix));

    matches_slow_pattern || under_virtual_root
}
141
142/// Filesystem scanner
143pub struct Scanner {
144    config: ScanConfig,
145    cancel_token: CancellationToken,
146}
147
148impl Scanner {
149    pub fn new(config: ScanConfig) -> Self {
150        Self {
151            config,
152            cancel_token: CancellationToken::new(),
153        }
154    }
155
156    pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
157        self.cancel_token = token;
158        self
159    }
160
161    /// Scan a directory and build a tree
162    /// Returns a receiver for progress updates and spawns scanning in background
163    pub fn scan(
164        self,
165        root_path: PathBuf,
166    ) -> (Receiver<ScanMessage>, std::thread::JoinHandle<DiskTree>) {
167        let (tx, rx) = crossbeam_channel::unbounded();
168
169        let handle = std::thread::spawn(move || self.scan_sync(root_path, tx));
170
171        (rx, handle)
172    }
173
174    /// Synchronous scan (runs in thread)
175    fn scan_sync(self, root_path: PathBuf, tx: Sender<ScanMessage>) -> DiskTree {
176        let root_path = root_path.canonicalize().unwrap_or(root_path);
177        let mut tree = DiskTree::new(root_path.clone());
178
179        // Map from path to node ID for parent lookups
180        let mut path_to_id: HashMap<PathBuf, NodeId> = HashMap::new();
181        path_to_id.insert(root_path.clone(), NodeId::ROOT);
182
183        // Get root device for same-filesystem check
184        let root_dev = std::fs::metadata(&root_path)
185            .map(|m| get_device_id(&m))
186            .unwrap_or(0);
187
188        // Shared progress state
189        let shared_progress = Arc::new(SharedProgress::new());
190        let progress_for_heartbeat = Arc::clone(&shared_progress);
191        let tx_for_heartbeat = tx.clone();
192        let cancel_for_heartbeat = self.cancel_token.clone();
193
194        // Spawn heartbeat thread that sends progress every 100ms
195        let heartbeat_handle = std::thread::spawn(move || {
196            while !progress_for_heartbeat.done.load(Ordering::Relaxed)
197                && !cancel_for_heartbeat.is_cancelled()
198            {
199                std::thread::sleep(std::time::Duration::from_millis(100));
200                let progress = progress_for_heartbeat.to_scan_progress();
201                let _ = tx_for_heartbeat.send(ScanMessage::Progress(progress));
202            }
203        });
204
205        let _ = tx.send(ScanMessage::StartedDirectory(root_path.clone()));
206
207        // Configure walker with process_read_dir to skip problematic directories
208        let same_fs = self.config.same_filesystem;
209        let root_for_filter = root_path.clone();
210        let walker = WalkDir::new(&root_path)
211            .skip_hidden(false)
212            .follow_links(self.config.follow_symlinks)
213            .sort(false) // We'll sort by size later
214            .process_read_dir(move |_depth, path, _read_dir_state, children| {
215                // Skip children in virtual/slow directories
216                if is_virtual_or_slow_path(path, &root_for_filter) {
217                    children.clear();
218                    return;
219                }
220
221                // Filter out children that are on different filesystems or are virtual
222                if same_fs {
223                    children.retain(|entry| {
224                        if let Ok(e) = entry {
225                            // Check if child is on same filesystem
226                            if let Ok(meta) = e.metadata()
227                                && get_device_id(&meta) != root_dev
228                            {
229                                return false;
230                            }
231                            // Check if child path is virtual/slow
232                            if is_virtual_or_slow_path(&e.path(), &root_for_filter) {
233                                return false;
234                            }
235                        }
236                        true
237                    });
238                }
239            });
240
241        let walker = if let Some(depth) = self.config.max_depth {
242            walker.max_depth(depth)
243        } else {
244            walker
245        };
246
247        let walker = if self.config.num_threads > 0 {
248            walker.parallelism(jwalk::Parallelism::RayonNewPool(self.config.num_threads))
249        } else {
250            walker
251        };
252
253        for entry_result in walker {
254            // Check for cancellation
255            if self.cancel_token.is_cancelled() {
256                shared_progress.done.store(true, Ordering::Relaxed);
257                let _ = heartbeat_handle.join();
258                let _ = tx.send(ScanMessage::Cancelled);
259                return tree;
260            }
261
262            let entry = match entry_result {
263                Ok(e) => e,
264                Err(_e) => {
265                    shared_progress.errors.fetch_add(1, Ordering::Relaxed);
266                    continue;
267                }
268            };
269
270            let path = entry.path();
271
272            // Skip root (already added)
273            if path == root_path {
274                continue;
275            }
276
277            // Get metadata
278            let metadata = match entry.metadata() {
279                Ok(m) => m,
280                Err(_) => {
281                    shared_progress.errors.fetch_add(1, Ordering::Relaxed);
282                    continue;
283                }
284            };
285
286            // Check filesystem boundary
287            if self.config.same_filesystem && get_device_id(&metadata) != root_dev {
288                continue;
289            }
290
291            // Determine node kind
292            let file_type = entry.file_type();
293            let kind = if file_type.is_dir() {
294                NodeKind::Directory
295            } else if file_type.is_symlink() {
296                NodeKind::Symlink
297            } else {
298                NodeKind::File
299            };
300
301            // Get parent path and node ID
302            let parent_path = match path.parent() {
303                Some(p) => p.to_path_buf(),
304                None => continue,
305            };
306
307            let parent_id = match path_to_id.get(&parent_path) {
308                Some(&id) => id,
309                None => continue, // Parent not in tree (skipped?)
310            };
311
312            // Get name
313            let name = path
314                .file_name()
315                .map(|s| s.to_string_lossy().to_string())
316                .unwrap_or_else(|| path.to_string_lossy().to_string());
317
318            // Add node
319            let node_id = tree.add_node(name, kind, path.clone(), parent_id);
320
321            // Track path for directories
322            if kind == NodeKind::Directory {
323                path_to_id.insert(path.clone(), node_id);
324                shared_progress.dirs_scanned.fetch_add(1, Ordering::Relaxed);
325            } else {
326                shared_progress
327                    .files_scanned
328                    .fetch_add(1, Ordering::Relaxed);
329            }
330
331            // Set size for files
332            let size = get_disk_usage(&metadata);
333            tree.set_size(node_id, size);
334            shared_progress
335                .bytes_scanned
336                .fetch_add(size, Ordering::Relaxed);
337
338            // Update current path
339            if let Ok(mut guard) = shared_progress.current_path.lock() {
340                *guard = Some(path.clone());
341            }
342        }
343
344        // Stop heartbeat thread
345        shared_progress.done.store(true, Ordering::Relaxed);
346        let _ = heartbeat_handle.join();
347
348        // Send finalizing message (aggregation can take time on large trees)
349        let _ = tx.send(ScanMessage::Finalizing);
350
351        // Aggregate sizes from children to parents
352        tree.aggregate_sizes();
353
354        // Sort all children by size
355        tree.sort_by_size();
356
357        // Send final progress
358        let progress = shared_progress.to_scan_progress();
359        let _ = tx.send(ScanMessage::Progress(progress));
360        let _ = tx.send(ScanMessage::Completed);
361
362        tree
363    }
364}
365
/// Returns the space a file occupies on disk, in bytes.
///
/// Computed from the allocated block count rather than the logical length,
/// so sparse files and block-size rounding are accounted for.
#[cfg(unix)]
fn get_disk_usage(metadata: &Metadata) -> u64 {
    // `st_blocks` is expressed in 512-byte units.
    const BLOCK_UNIT_BYTES: u64 = 512;

    let allocated_blocks = metadata.blocks();
    allocated_blocks * BLOCK_UNIT_BYTES
}
372
/// Returns the size used for a file on non-Unix platforms (e.g. Windows).
///
/// Falls back to the file's logical length.
#[cfg(not(unix))]
fn get_disk_usage(metadata: &Metadata) -> u64 {
    let logical_len = Metadata::len(metadata);
    logical_len
}
378
/// Returns the ID of the device containing the file, used to tell whether
/// two entries live on the same filesystem.
#[cfg(unix)]
fn get_device_id(metadata: &Metadata) -> u64 {
    MetadataExt::dev(metadata)
}
384
/// Returns a device ID on non-Unix platforms (e.g. Windows).
///
/// Device IDs are not supported here, so every file reports the same
/// constant value.
#[cfg(not(unix))]
fn get_device_id(_metadata: &Metadata) -> u64 {
    // Placeholder shared by all files; comparisons between IDs never differ.
    const UNSUPPORTED_DEVICE_ID: u64 = 0;
    UNSUPPORTED_DEVICE_ID
}
390
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Scans `root` with the default configuration, drains every message
    /// from the channel, and returns the tree produced by the scan thread.
    fn scan_to_completion(root: PathBuf) -> DiskTree {
        let (messages, worker) = Scanner::new(ScanConfig::default()).scan(root);

        // Consume messages until the scan thread drops its sender.
        for _message in messages {}

        worker.join().unwrap()
    }

    #[test]
    fn test_scan_empty_dir() {
        let workspace = TempDir::new().unwrap();

        let tree = scan_to_completion(workspace.path().to_path_buf());

        // Only the root node is present.
        assert_eq!(tree.len(), 1);
    }

    #[test]
    fn test_scan_with_files() {
        let workspace = TempDir::new().unwrap();
        let base = workspace.path();

        // Two files at the top level, plus one file inside a subdirectory.
        for (file_name, contents) in [("file1.txt", "hello"), ("file2.txt", "world")] {
            fs::write(base.join(file_name), contents).unwrap();
        }
        fs::create_dir(base.join("subdir")).unwrap();
        fs::write(base.join("subdir/file3.txt"), "test").unwrap();

        let tree = scan_to_completion(base.to_path_buf());

        // Created: root + 2 files + subdir + 1 file; the check is a lower bound.
        assert!(tree.len() >= 4);
    }
}