diskscan/
scanner.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::sync::mpsc::{Receiver, Sender};
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::thread::JoinHandle;
8use std::time::{Duration, Instant};
9
10use byte_unit::Byte;
11
12use crate::entry::DirEntry;
13use crate::tree::FileTree;
14use crate::watcher::Watcher;
15use crate::{platform, EntryPath, EntrySnapshot, SnapshotConfig, TreeSnapshot};
16
/// Summary of a scan, as assembled by [`Scanner::stats`].
#[derive(Clone, Debug)]
pub struct ScanStats {
    /// Used size, taken from the file tree's own statistics.
    pub used_size: Byte,
    /// Total size reported by `platform::get_mount_stats` for the scan root;
    /// `None` when mount statistics are unavailable.
    pub total_size: Option<Byte>,
    /// Available size reported by `platform::get_mount_stats` for the scan root;
    /// `None` when mount statistics are unavailable.
    pub available_size: Option<Byte>,
    /// Whether the scan root is a mount point; `false` when mount statistics
    /// are unavailable.
    pub is_mount_point: bool,
    /// Number of files, taken from the file tree's statistics.
    pub files: u64,
    /// Number of directories, taken from the file tree's statistics.
    pub dirs: u64,
    /// Duration of the scan as last recorded by the worker thread
    /// (millisecond resolution).
    pub scan_duration: Duration,
    /// Memory usage as reported by `platform::get_used_memory`.
    pub used_memory: Option<Byte>,
}
28
/// State shared between a [`Scanner`] and its background worker thread.
#[derive(Debug)]
struct ScanState {
    /// The scanned file tree; the worker writes directory listings into it and
    /// the `Scanner` reads snapshots and statistics from it.
    tree: Mutex<FileTree>,

    /// Path of the task most recently started by the worker; cleared to `None`
    /// whenever the task queue runs empty.
    current_path: Mutex<Option<EntryPath>>,

    /// `true` from construction (and again after a task with `reset_stopwatch`
    /// arrives while idle) until the task queue runs empty.
    is_scanning: AtomicBool,

    /// Keep-running flag for the worker; cleared when the `Scanner` is dropped.
    scan_flag: AtomicBool,

    /// Elapsed time of the current scan in milliseconds, refreshed by the worker
    /// after each processed task while `is_scanning` is set.
    scan_duration_ms: AtomicU32,
}
41
/// A single unit of work for the scan worker: list one directory.
#[derive(Debug, Eq, PartialEq)]
struct ScanTask {
    /// Directory to scan.
    path: EntryPath,
    /// When set and no scan is currently in progress, the worker restarts its
    /// stopwatch and marks the scanner as scanning upon receiving the task.
    reset_stopwatch: bool,
    /// When set, every subdirectory found while processing this task is queued
    /// as a further recursive task.
    recursive: bool,
}
48
/// Entry point for creating a [`Scanner`].
#[non_exhaustive]
#[derive(Debug, Default)]
pub struct ScannerBuilder;

impl ScannerBuilder {
    /// Consumes the builder and creates a [`Scanner`] for `path`.
    ///
    /// Constructing the scanner spawns its worker thread and queues a recursive
    /// scan of the root, so scanning begins right away.
    pub fn scan(self, path: String) -> Scanner {
        Scanner::new(path)
    }
}
58
/// Handle to a background directory scan.
///
/// Owns the worker thread; dropping the `Scanner` asks the worker to stop and
/// waits for it to finish.
#[derive(Debug)]
pub struct Scanner {
    /// Path of the scan root, obtained from the root entry of the file tree.
    root: EntryPath,

    /// State shared with the worker thread.
    state: Arc<ScanState>,

    /// Sending half of the channel used to hand scan tasks to the worker.
    tx: Sender<ScanTask>,

    /// Join handle of the worker thread; wrapped in `Option` so that `Drop`
    /// can take ownership of it in order to join.
    scan_handle: Option<JoinHandle<()>>,
}
69
70impl Scanner {
    /// Returns the path of the scan root.
    pub fn get_scan_path(&self) -> &EntryPath {
        &self.root
    }
74
75    pub fn get_current_scan_path(&self) -> Option<EntryPath> {
76        self.state.current_path.lock().unwrap().clone()
77    }
78
79    pub fn get_tree(
80        &self,
81        root: &EntryPath,
82        config: SnapshotConfig,
83    ) -> Option<TreeSnapshot<EntrySnapshot>> {
84        self.state
85            .tree
86            .lock()
87            .unwrap()
88            .make_snapshot(root, config, &Scanner::retrieve_files)
89    }
90
91    pub fn get_tree_wrapped<W: AsRef<EntrySnapshot> + AsMut<EntrySnapshot>>(
92        &self,
93        root: &EntryPath,
94        config: SnapshotConfig,
95        wrapper: &dyn Fn(EntrySnapshot) -> W,
96    ) -> Option<TreeSnapshot<W>> {
97        self.state.tree.lock().unwrap().make_snapshot_wrapped(
98            root,
99            config,
100            wrapper,
101            &Scanner::retrieve_files,
102        )
103    }
104
105    pub fn is_scanning(&self) -> bool {
106        self.state.is_scanning.load(Ordering::SeqCst)
107    }
108
109    pub fn rescan_path(&self, path: EntryPath, reset_stopwatch: bool) {
110        info!("Start rescan of '{}'", path);
111        self.tx
112            .send(ScanTask {
113                path,
114                reset_stopwatch,
115                recursive: true,
116            })
117            .unwrap();
118    }
119
120    pub fn stats(&self) -> ScanStats {
121        let scan_stats = self.state.tree.lock().unwrap().stats();
122        let scan_duration =
123            Duration::from_millis(self.state.scan_duration_ms.load(Ordering::SeqCst) as u64);
124        let (total, available, is_mount) = platform::get_mount_stats(self.root.get_path())
125            .map(|s| (Some(s.total), Some(s.available), s.is_mount_point))
126            .unwrap_or((None, None, false));
127        ScanStats {
128            used_size: scan_stats.used_size,
129            total_size: total,
130            available_size: available,
131            is_mount_point: is_mount,
132            files: scan_stats.files,
133            dirs: scan_stats.dirs,
134            scan_duration,
135            used_memory: platform::get_used_memory(),
136        }
137    }
138
139    fn merge_to_queue(queue: &mut Vec<ScanTask>, task: ScanTask) {
140        // could use Vec::drain_filter, but it's unstable
141        let mut i = 0;
142        let mut insert = 0;
143        while i < queue.len() {
144            let existing = &queue[i];
145
146            if &task != existing
147                && (!task.recursive
148                    || existing
149                        .path
150                        .partial_cmp(&task.path)
151                        .map(|ord| ord == std::cmp::Ordering::Less)
152                        .unwrap_or(true))
153            {
154                // existing task is kept in queue if it is not the same as new task AND
155                // new task is not recursive OR it is recursive but existing task is not scanning
156                // some subdirectory of new task
157
158                if insert != i {
159                    queue.swap(insert, i);
160                }
161                insert += 1;
162            }
163            i += 1;
164        }
165
166        queue.truncate(insert);
167        queue.push(task);
168    }
169
170    fn new(path: String) -> Self {
171        let tree = FileTree::new(path.clone());
172        let root = tree.get_root().get_path(tree.get_arena());
173        let (tx, rx) = std::sync::mpsc::channel();
174        tx.send(ScanTask {
175            path: root.clone(),
176            reset_stopwatch: true,
177            recursive: true,
178        })
179        .unwrap();
180        let state = Arc::new(ScanState {
181            tree: Mutex::new(tree),
182            current_path: Mutex::new(None),
183            is_scanning: AtomicBool::new(true),
184            scan_flag: AtomicBool::new(true),
185            scan_duration_ms: AtomicU32::new(0),
186        });
187
188        let scan_handle = Scanner::start_scan(path, Arc::clone(&state), rx);
189
190        Scanner {
191            root,
192            state,
193            tx,
194            scan_handle: Some(scan_handle),
195        }
196    }
197
198    /// Retrieve list of all files and their sizes at specified path
199    /// Files are not sorted in any way
200    fn retrieve_files(path: &Path) -> Vec<(String, i64)> {
201        std::fs::read_dir(path)
202            .and_then(|rd| {
203                let mut files = vec![];
204                for f in rd {
205                    let f = f?;
206
207                    if let Ok(metadata) = f.metadata() {
208                        if !metadata.is_dir() || metadata.is_symlink() {
209                            let name = f.file_name().to_str().unwrap().to_string();
210                            let size = platform::get_file_size(&metadata) as i64;
211
212                            files.push((name, size))
213                        }
214                    }
215                }
216                Ok(files)
217            })
218            .unwrap_or_default()
219    }
220
221    fn start_scan(root: String, state: Arc<ScanState>, rx: Receiver<ScanTask>) -> JoinHandle<()> {
222        thread::spawn(move || {
223            let mut watcher = crate::watcher::new_watcher(root.clone());
224
225            let mut start = Instant::now();
226
227            let mut queue: Vec<ScanTask> = vec![];
228            let mut children = vec![];
229
230            let available: HashSet<_> = platform::get_available_mounts().into_iter().collect();
231            // excluded paths are all available mounts (excluding root scan path)
232            // and all unsupported mounts
233            let excluded: HashSet<_> = platform::get_excluded_paths()
234                .into_iter()
235                .filter_map(|p| p.to_str().map(|s| s.to_string()))
236                .chain(available.into_iter())
237                .filter(|p| p != &root)
238                .collect();
239
240            info!("Start scan of '{}'", root);
241
242            while state.scan_flag.load(Ordering::SeqCst) {
243                while state.scan_flag.load(Ordering::SeqCst) {
244                    // check for events
245                    if let Some(w) = &mut watcher {
246                        for task in w
247                            .read_events()
248                            .into_iter()
249                            .filter_map(|e| EntryPath::from(&root, e.updated_path))
250                            .map(|path| ScanTask {
251                                recursive: false,
252                                reset_stopwatch: false,
253                                path,
254                            })
255                        {
256                            Scanner::merge_to_queue(&mut queue, task);
257                        }
258                    }
259                    // add all tasks to queue
260                    for task in rx.try_iter() {
261                        if task.reset_stopwatch && !state.is_scanning.load(Ordering::SeqCst) {
262                            start = Instant::now();
263                            state.is_scanning.store(true, Ordering::SeqCst);
264                        }
265                        Scanner::merge_to_queue(&mut queue, task);
266                    }
267                    if !queue.is_empty() {
268                        break;
269                    }
270                    thread::sleep(Duration::from_millis(10));
271                }
272
273                if let Some(task) = queue.pop() {
274                    let task_path = task.path.to_string();
275                    if excluded.contains(&task_path) {
276                        continue;
277                    }
278                    watcher.as_mut().map(|w| w.add_dir(task_path));
279                    state
280                        .current_path
281                        .lock()
282                        .unwrap()
283                        .replace(task.path.clone());
284                    let entries: Vec<_> = std::fs::read_dir(&task.path.get_path())
285                        .and_then(|dir| dir.collect::<Result<_, _>>())
286                        .unwrap_or_else(|_| {
287                            warn!("Unable to scan '{}'", task.path);
288                            vec![]
289                        });
290
291                    let mut file_count = 0;
292                    let mut files_size = 0;
293                    for entry in entries {
294                        if let Ok(metadata) = entry.metadata() {
295                            let name = entry.file_name().to_str().unwrap().to_string();
296                            if task.recursive && metadata.is_dir() && !metadata.is_symlink() {
297                                let mut path = task.path.clone();
298                                path.join(name.clone());
299                                queue.push(ScanTask {
300                                    path,
301                                    reset_stopwatch: false,
302                                    recursive: true,
303                                });
304                            }
305
306                            if metadata.is_dir() && !metadata.is_symlink() {
307                                // treat all directories as zero sized
308                                children.push(DirEntry::new_dir(name));
309                            } else {
310                                file_count += 1;
311                                files_size += platform::get_file_size(&metadata) as i64;
312                            }
313                        } else {
314                            warn!("Failed to get metadata for {:?}", entry.path());
315                        }
316                    }
317                    let new_dirs = {
318                        let mut tree = state.tree.lock().unwrap();
319                        tree.set_children(&task.path, children, file_count, files_size)
320                    };
321
322                    if let Some(new_dirs) = new_dirs {
323                        if !task.recursive {
324                            for dir in new_dirs {
325                                let mut path = task.path.clone();
326                                path.join(dir);
327                                queue.push(ScanTask {
328                                    path,
329                                    reset_stopwatch: false,
330                                    recursive: true,
331                                });
332                            }
333                        }
334                    }
335                    children = vec![];
336                }
337                if state.is_scanning.load(Ordering::SeqCst) {
338                    let duration = start.elapsed().as_millis() as u32;
339                    state.scan_duration_ms.store(duration, Ordering::SeqCst);
340                    if queue.is_empty() {
341                        let stats = state.tree.lock().unwrap().stats();
342                        info!(
343                            "Scan finished: {} files {} dirs in {:?}",
344                            stats.files,
345                            stats.dirs,
346                            Duration::from_millis(duration as u64)
347                        );
348                    }
349                }
350                if queue.is_empty() {
351                    state.is_scanning.store(false, Ordering::SeqCst);
352                    state.current_path.lock().unwrap().take();
353                }
354            }
355        })
356    }
357}
358
359impl Drop for Scanner {
360    fn drop(&mut self) {
361        self.state.scan_flag.store(false, Ordering::SeqCst);
362        let _ = self.scan_handle.take().unwrap().join();
363    }
364}