1use std::collections::HashMap;
2use std::fs::Metadata;
3use std::os::unix::fs::MetadataExt;
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7
8use crossbeam_channel::{Receiver, Sender};
9use jwalk::WalkDir;
10
11use super::progress::{ScanMessage, ScanProgress};
12use crate::tree::{DiskTree, NodeId, NodeKind};
13
/// Options controlling how a [`Scanner`] walks the filesystem.
#[derive(Debug, Clone)]
pub struct ScanConfig {
    /// Whether the walker follows symbolic links (forwarded to jwalk's `follow_links`).
    pub follow_symlinks: bool,
    /// Deepest level to descend to; `None` means no limit.
    pub max_depth: Option<usize>,
    /// When `true`, entries on a different device than the scan root are skipped.
    pub same_filesystem: bool,
    /// Size of a dedicated rayon pool for the walk; `0` leaves jwalk's parallelism
    /// setting untouched.
    pub num_threads: usize,
}

impl Default for ScanConfig {
    /// Conservative defaults: do not follow symlinks, no depth limit, stay on the
    /// root's filesystem, and keep jwalk's default parallelism.
    fn default() -> Self {
        ScanConfig {
            num_threads: 0,
            same_filesystem: true,
            max_depth: None,
            follow_symlinks: false,
        }
    }
}
37
/// Shared flag used to ask a running scan to stop early.
///
/// Clones share one underlying flag, so cancelling through any clone is
/// observed by every other clone.
#[derive(Debug, Clone, Default)]
pub struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Creates a token that has not been cancelled.
    pub fn new() -> Self {
        Self::default()
    }

    /// Requests cancellation; calling it more than once has no further effect.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Reports whether [`cancel`](Self::cancel) has been called on this token
    /// or on any of its clones.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}
65
66struct SharedProgress {
68 files_scanned: AtomicU64,
69 dirs_scanned: AtomicU64,
70 bytes_scanned: AtomicU64,
71 errors: AtomicU64,
72 current_path: Mutex<Option<PathBuf>>,
73 done: AtomicBool,
74}
75
76impl SharedProgress {
77 fn new() -> Self {
78 Self {
79 files_scanned: AtomicU64::new(0),
80 dirs_scanned: AtomicU64::new(0),
81 bytes_scanned: AtomicU64::new(0),
82 errors: AtomicU64::new(0),
83 current_path: Mutex::new(None),
84 done: AtomicBool::new(false),
85 }
86 }
87
88 fn to_scan_progress(&self) -> ScanProgress {
89 ScanProgress {
90 files_scanned: self.files_scanned.load(Ordering::Relaxed),
91 dirs_scanned: self.dirs_scanned.load(Ordering::Relaxed),
92 bytes_scanned: self.bytes_scanned.load(Ordering::Relaxed),
93 errors: self.errors.load(Ordering::Relaxed),
94 current_path: self.current_path.lock().ok().and_then(|g| g.clone()),
95 }
96 }
97}
98
/// Root-anchored locations (virtual filesystems, mount points, OS caches) that
/// are slow to walk or meaningless to size.
///
/// Entries are compared component-wise with `Path::starts_with`, so only the
/// real top-level location matches. A user directory such as `~/dev` or
/// `~/src/sys` is left alone. A trailing `/` means "strict descendants only":
/// `/dev/` prunes everything below `/dev` but keeps the `/dev` entry itself.
const ROOT_ANCHORED_SLOW_PATHS: &[&str] = &[
    "/Volumes/",
    // macOS system-volume mount points; previously caught by the `/Volumes/`
    // substring match, kept pruned explicitly.
    "/System/Volumes/",
    "/System/Volumes/Data/.Spotlight-V100",
    "/dev/",
    "/proc/",
    "/sys/",
    "/private/var/folders",
    "/private/var/db/dyld",
    "/private/var/db/uuidtext",
];

/// Path fragments that mark slow or OS-managed locations wherever they occur
/// (e.g. per-volume indexes or simulator disk images). They are matched as plain
/// substrings of the lossily converted path.
const SLOW_PATH_FRAGMENTS: &[&str] = &[
    "/.Spotlight-V100",
    "/.fseventsd",
    "/.DocumentRevisions-V100",
    "CoreSimulator/Volumes",
    "/.MobileBackups",
    ".timemachine",
];

/// Returns `true` if `path` falls under the root-anchored `pattern`.
///
/// A trailing `/` on `pattern` excludes the pattern directory itself.
fn matches_root_anchored(path: &std::path::Path, pattern: &str) -> bool {
    let base = std::path::Path::new(pattern.trim_end_matches('/'));
    let descendants_only = pattern.ends_with('/');
    path.starts_with(base) && !(descendants_only && path == base)
}

/// Decides whether `path` should be pruned from a scan rooted at `root_path`.
///
/// A path is pruned when it matches one of the slow/virtual locations above and
/// the scan root does *not* match that same location. This lets the user still
/// scan, e.g., a temp directory or a mounted volume explicitly. The root itself
/// and its ancestors are never pruned.
fn is_virtual_or_slow_path(path: &std::path::Path, root_path: &std::path::Path) -> bool {
    // `Path::starts_with` is reflexive, so this also covers `path == root_path`.
    if root_path.starts_with(path) {
        return false;
    }

    let anchored_hit = ROOT_ANCHORED_SLOW_PATHS.iter().any(|&pattern| {
        matches_root_anchored(path, pattern) && !matches_root_anchored(root_path, pattern)
    });
    if anchored_hit {
        return true;
    }

    let path_str = path.to_string_lossy();
    let root_str = root_path.to_string_lossy();
    SLOW_PATH_FRAGMENTS
        .iter()
        .any(|&fragment| path_str.contains(fragment) && !root_str.contains(fragment))
}
139
140pub struct Scanner {
142 config: ScanConfig,
143 cancel_token: CancellationToken,
144}
145
146impl Scanner {
147 pub fn new(config: ScanConfig) -> Self {
148 Self {
149 config,
150 cancel_token: CancellationToken::new(),
151 }
152 }
153
154 pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
155 self.cancel_token = token;
156 self
157 }
158
159 pub fn scan(
162 self,
163 root_path: PathBuf,
164 ) -> (Receiver<ScanMessage>, std::thread::JoinHandle<DiskTree>) {
165 let (tx, rx) = crossbeam_channel::unbounded();
166
167 let handle = std::thread::spawn(move || self.scan_sync(root_path, tx));
168
169 (rx, handle)
170 }
171
172 fn scan_sync(self, root_path: PathBuf, tx: Sender<ScanMessage>) -> DiskTree {
174 let root_path = root_path.canonicalize().unwrap_or(root_path);
175 let mut tree = DiskTree::new(root_path.clone());
176
177 let mut path_to_id: HashMap<PathBuf, NodeId> = HashMap::new();
179 path_to_id.insert(root_path.clone(), NodeId::ROOT);
180
181 let root_dev = std::fs::metadata(&root_path).map(|m| m.dev()).unwrap_or(0);
183
184 let shared_progress = Arc::new(SharedProgress::new());
186 let progress_for_heartbeat = Arc::clone(&shared_progress);
187 let tx_for_heartbeat = tx.clone();
188 let cancel_for_heartbeat = self.cancel_token.clone();
189
190 let heartbeat_handle = std::thread::spawn(move || {
192 while !progress_for_heartbeat.done.load(Ordering::Relaxed)
193 && !cancel_for_heartbeat.is_cancelled()
194 {
195 std::thread::sleep(std::time::Duration::from_millis(100));
196 let progress = progress_for_heartbeat.to_scan_progress();
197 let _ = tx_for_heartbeat.send(ScanMessage::Progress(progress));
198 }
199 });
200
201 let _ = tx.send(ScanMessage::StartedDirectory(root_path.clone()));
202
203 let same_fs = self.config.same_filesystem;
205 let root_for_filter = root_path.clone();
206 let walker = WalkDir::new(&root_path)
207 .skip_hidden(false)
208 .follow_links(self.config.follow_symlinks)
209 .sort(false) .process_read_dir(move |_depth, path, _read_dir_state, children| {
211 if is_virtual_or_slow_path(path, &root_for_filter) {
213 children.clear();
214 return;
215 }
216
217 if same_fs {
219 children.retain(|entry| {
220 if let Ok(e) = entry {
221 if let Ok(meta) = e.metadata()
223 && meta.dev() != root_dev
224 {
225 return false;
226 }
227 if is_virtual_or_slow_path(&e.path(), &root_for_filter) {
229 return false;
230 }
231 }
232 true
233 });
234 }
235 });
236
237 let walker = if let Some(depth) = self.config.max_depth {
238 walker.max_depth(depth)
239 } else {
240 walker
241 };
242
243 let walker = if self.config.num_threads > 0 {
244 walker.parallelism(jwalk::Parallelism::RayonNewPool(self.config.num_threads))
245 } else {
246 walker
247 };
248
249 for entry_result in walker {
250 if self.cancel_token.is_cancelled() {
252 shared_progress.done.store(true, Ordering::Relaxed);
253 let _ = heartbeat_handle.join();
254 let _ = tx.send(ScanMessage::Cancelled);
255 return tree;
256 }
257
258 let entry = match entry_result {
259 Ok(e) => e,
260 Err(_e) => {
261 shared_progress.errors.fetch_add(1, Ordering::Relaxed);
262 continue;
263 }
264 };
265
266 let path = entry.path();
267
268 if path == root_path {
270 continue;
271 }
272
273 let metadata = match entry.metadata() {
275 Ok(m) => m,
276 Err(_) => {
277 shared_progress.errors.fetch_add(1, Ordering::Relaxed);
278 continue;
279 }
280 };
281
282 if self.config.same_filesystem && metadata.dev() != root_dev {
284 continue;
285 }
286
287 let file_type = entry.file_type();
289 let kind = if file_type.is_dir() {
290 NodeKind::Directory
291 } else if file_type.is_symlink() {
292 NodeKind::Symlink
293 } else {
294 NodeKind::File
295 };
296
297 let parent_path = match path.parent() {
299 Some(p) => p.to_path_buf(),
300 None => continue,
301 };
302
303 let parent_id = match path_to_id.get(&parent_path) {
304 Some(&id) => id,
305 None => continue, };
307
308 let name = path
310 .file_name()
311 .map(|s| s.to_string_lossy().to_string())
312 .unwrap_or_else(|| path.to_string_lossy().to_string());
313
314 let node_id = tree.add_node(name, kind, path.clone(), parent_id);
316
317 if kind == NodeKind::Directory {
319 path_to_id.insert(path.clone(), node_id);
320 shared_progress.dirs_scanned.fetch_add(1, Ordering::Relaxed);
321 } else {
322 shared_progress
323 .files_scanned
324 .fetch_add(1, Ordering::Relaxed);
325 }
326
327 let size = get_disk_usage(&metadata);
329 tree.set_size(node_id, size);
330 shared_progress
331 .bytes_scanned
332 .fetch_add(size, Ordering::Relaxed);
333
334 if let Ok(mut guard) = shared_progress.current_path.lock() {
336 *guard = Some(path.clone());
337 }
338 }
339
340 shared_progress.done.store(true, Ordering::Relaxed);
342 let _ = heartbeat_handle.join();
343
344 let _ = tx.send(ScanMessage::Finalizing);
346
347 tree.aggregate_sizes();
349
350 tree.sort_by_size();
352
353 let progress = shared_progress.to_scan_progress();
355 let _ = tx.send(ScanMessage::Progress(progress));
356 let _ = tx.send(ScanMessage::Completed);
357
358 tree
359 }
360}
361
/// Returns the number of bytes allocated on disk for the entry described by
/// `metadata`.
///
/// `st_blocks` is reported in 512-byte units independent of the filesystem's
/// block size, so the block count is scaled by that fixed unit.
fn get_disk_usage(metadata: &Metadata) -> u64 {
    const ST_BLOCK_UNIT: u64 = 512;
    let allocated_blocks = metadata.blocks();
    allocated_blocks * ST_BLOCK_UNIT
}
367
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Runs `scanner` on `root` to completion and returns every message plus the tree.
    fn run_scan(scanner: Scanner, root: &std::path::Path) -> (Vec<ScanMessage>, DiskTree) {
        let (rx, handle) = scanner.scan(root.to_path_buf());
        // Ends once the scan and heartbeat threads drop their senders.
        let messages: Vec<ScanMessage> = rx.iter().collect();
        let tree = handle.join().expect("scan thread panicked");
        (messages, tree)
    }

    #[test]
    fn test_scan_empty_dir() {
        let temp = TempDir::new().unwrap();
        let (messages, tree) = run_scan(Scanner::new(ScanConfig::default()), temp.path());

        // Only the root node.
        assert_eq!(tree.len(), 1);
        assert!(messages.iter().any(|m| matches!(m, ScanMessage::Completed)));
    }

    #[test]
    fn test_scan_with_files() {
        let temp = TempDir::new().unwrap();
        fs::write(temp.path().join("file1.txt"), "hello").unwrap();
        fs::write(temp.path().join("file2.txt"), "world").unwrap();
        fs::create_dir(temp.path().join("subdir")).unwrap();
        fs::write(temp.path().join("subdir/file3.txt"), "test").unwrap();

        let (_messages, tree) = run_scan(Scanner::new(ScanConfig::default()), temp.path());

        // root + file1.txt + file2.txt + subdir + subdir/file3.txt
        assert_eq!(tree.len(), 5);
    }

    #[test]
    fn test_scan_cancelled_before_start() {
        let temp = TempDir::new().unwrap();
        fs::write(temp.path().join("file.txt"), "data").unwrap();

        let token = CancellationToken::new();
        token.cancel();
        let scanner = Scanner::new(ScanConfig::default()).with_cancellation(token);
        let (messages, tree) = run_scan(scanner, temp.path());

        assert!(messages.iter().any(|m| matches!(m, ScanMessage::Cancelled)));
        assert!(!messages.iter().any(|m| matches!(m, ScanMessage::Completed)));
        // Cancellation is checked before any entry is added.
        assert_eq!(tree.len(), 1);
    }
}