1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::sync::mpsc::{Receiver, Sender};
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::thread::JoinHandle;
8use std::time::{Duration, Instant};
9
10use byte_unit::Byte;
11
12use crate::entry::DirEntry;
13use crate::tree::FileTree;
14use crate::watcher::Watcher;
15use crate::{platform, EntryPath, EntrySnapshot, SnapshotConfig, TreeSnapshot};
16
/// Point-in-time statistics about a scan, as assembled by `Scanner::stats`.
#[derive(Clone, Debug)]
pub struct ScanStats {
    /// Used size as reported by the file tree's own statistics.
    pub used_size: Byte,
    /// Total size reported by `platform::get_mount_stats` for the scan root;
    /// `None` when mount statistics are unavailable.
    pub total_size: Option<Byte>,
    /// Available size reported by `platform::get_mount_stats` for the scan root;
    /// `None` when mount statistics are unavailable.
    pub available_size: Option<Byte>,
    /// Mount-point flag from the mount statistics; `false` when they are unavailable.
    pub is_mount_point: bool,
    /// Number of files, as reported by the file tree.
    pub files: u64,
    /// Number of directories, as reported by the file tree.
    pub dirs: u64,
    /// Scan duration last published by the scan thread (millisecond resolution).
    pub scan_duration: Duration,
    /// Value of `platform::get_used_memory()` at the time the stats were taken.
    pub used_memory: Option<Byte>,
}
28
/// State shared between a `Scanner` handle and its background scan thread.
#[derive(Debug)]
struct ScanState {
    /// The file tree; written by the scan thread and read when snapshots or
    /// statistics are requested.
    tree: Mutex<FileTree>,

    /// Directory the scan thread is currently processing; cleared by the scan
    /// thread once its queue is empty.
    current_path: Mutex<Option<EntryPath>>,

    /// `true` from creation and whenever a task with `reset_stopwatch` arrives
    /// while idle; set to `false` by the scan thread when its queue is empty.
    is_scanning: AtomicBool,

    /// Keep-running flag for the scan thread; `Scanner`'s `Drop` sets it to
    /// `false` to make the thread exit.
    scan_flag: AtomicBool,

    /// Elapsed scan time in milliseconds, published by the scan thread.
    scan_duration_ms: AtomicU32,
}
41
/// A unit of work for the scan thread: one directory to (re)read.
#[derive(Debug, Eq, PartialEq)]
struct ScanTask {
    /// Directory to scan.
    path: EntryPath,
    /// When `true` and no scan is in progress, the scan thread restarts its
    /// stopwatch and marks the scanner as scanning again.
    reset_stopwatch: bool,
    /// When `true`, every sub-directory found is queued as well; when `false`,
    /// only directories the tree reports as new are queued.
    recursive: bool,
}
48
/// Entry point for creating a `Scanner`.
///
/// The builder currently carries no configuration; it is marked
/// `#[non_exhaustive]`, so it has to be obtained through `Default`.
#[non_exhaustive]
#[derive(Debug, Default)]
pub struct ScannerBuilder;
52
impl ScannerBuilder {
    /// Consumes the builder and creates a `Scanner` for `path`.
    ///
    /// Delegates to `Scanner::new`, which spawns the background scan thread
    /// and queues the initial scan of the root right away.
    pub fn scan(self, path: String) -> Scanner {
        Scanner::new(path)
    }
}
58
/// Handle to a background directory scan.
///
/// Dropping the handle signals the scan thread to stop and joins it.
#[derive(Debug)]
pub struct Scanner {
    /// Root path of the scan, taken from the file tree's root entry.
    root: EntryPath,

    /// State shared with the scan thread.
    state: Arc<ScanState>,

    /// Sending half of the channel used to hand tasks to the scan thread.
    tx: Sender<ScanTask>,

    /// Join handle of the scan thread; taken (and joined) on drop.
    scan_handle: Option<JoinHandle<()>>,
}
69
70impl Scanner {
71 pub fn get_scan_path(&self) -> &EntryPath {
72 &self.root
73 }
74
75 pub fn get_current_scan_path(&self) -> Option<EntryPath> {
76 self.state.current_path.lock().unwrap().clone()
77 }
78
79 pub fn get_tree(
80 &self,
81 root: &EntryPath,
82 config: SnapshotConfig,
83 ) -> Option<TreeSnapshot<EntrySnapshot>> {
84 self.state
85 .tree
86 .lock()
87 .unwrap()
88 .make_snapshot(root, config, &Scanner::retrieve_files)
89 }
90
91 pub fn get_tree_wrapped<W: AsRef<EntrySnapshot> + AsMut<EntrySnapshot>>(
92 &self,
93 root: &EntryPath,
94 config: SnapshotConfig,
95 wrapper: &dyn Fn(EntrySnapshot) -> W,
96 ) -> Option<TreeSnapshot<W>> {
97 self.state.tree.lock().unwrap().make_snapshot_wrapped(
98 root,
99 config,
100 wrapper,
101 &Scanner::retrieve_files,
102 )
103 }
104
105 pub fn is_scanning(&self) -> bool {
106 self.state.is_scanning.load(Ordering::SeqCst)
107 }
108
109 pub fn rescan_path(&self, path: EntryPath, reset_stopwatch: bool) {
110 info!("Start rescan of '{}'", path);
111 self.tx
112 .send(ScanTask {
113 path,
114 reset_stopwatch,
115 recursive: true,
116 })
117 .unwrap();
118 }
119
120 pub fn stats(&self) -> ScanStats {
121 let scan_stats = self.state.tree.lock().unwrap().stats();
122 let scan_duration =
123 Duration::from_millis(self.state.scan_duration_ms.load(Ordering::SeqCst) as u64);
124 let (total, available, is_mount) = platform::get_mount_stats(self.root.get_path())
125 .map(|s| (Some(s.total), Some(s.available), s.is_mount_point))
126 .unwrap_or((None, None, false));
127 ScanStats {
128 used_size: scan_stats.used_size,
129 total_size: total,
130 available_size: available,
131 is_mount_point: is_mount,
132 files: scan_stats.files,
133 dirs: scan_stats.dirs,
134 scan_duration,
135 used_memory: platform::get_used_memory(),
136 }
137 }
138
139 fn merge_to_queue(queue: &mut Vec<ScanTask>, task: ScanTask) {
140 let mut i = 0;
142 let mut insert = 0;
143 while i < queue.len() {
144 let existing = &queue[i];
145
146 if &task != existing
147 && (!task.recursive
148 || existing
149 .path
150 .partial_cmp(&task.path)
151 .map(|ord| ord == std::cmp::Ordering::Less)
152 .unwrap_or(true))
153 {
154 if insert != i {
159 queue.swap(insert, i);
160 }
161 insert += 1;
162 }
163 i += 1;
164 }
165
166 queue.truncate(insert);
167 queue.push(task);
168 }
169
170 fn new(path: String) -> Self {
171 let tree = FileTree::new(path.clone());
172 let root = tree.get_root().get_path(tree.get_arena());
173 let (tx, rx) = std::sync::mpsc::channel();
174 tx.send(ScanTask {
175 path: root.clone(),
176 reset_stopwatch: true,
177 recursive: true,
178 })
179 .unwrap();
180 let state = Arc::new(ScanState {
181 tree: Mutex::new(tree),
182 current_path: Mutex::new(None),
183 is_scanning: AtomicBool::new(true),
184 scan_flag: AtomicBool::new(true),
185 scan_duration_ms: AtomicU32::new(0),
186 });
187
188 let scan_handle = Scanner::start_scan(path, Arc::clone(&state), rx);
189
190 Scanner {
191 root,
192 state,
193 tx,
194 scan_handle: Some(scan_handle),
195 }
196 }
197
198 fn retrieve_files(path: &Path) -> Vec<(String, i64)> {
201 std::fs::read_dir(path)
202 .and_then(|rd| {
203 let mut files = vec![];
204 for f in rd {
205 let f = f?;
206
207 if let Ok(metadata) = f.metadata() {
208 if !metadata.is_dir() || metadata.is_symlink() {
209 let name = f.file_name().to_str().unwrap().to_string();
210 let size = platform::get_file_size(&metadata) as i64;
211
212 files.push((name, size))
213 }
214 }
215 }
216 Ok(files)
217 })
218 .unwrap_or_default()
219 }
220
221 fn start_scan(root: String, state: Arc<ScanState>, rx: Receiver<ScanTask>) -> JoinHandle<()> {
222 thread::spawn(move || {
223 let mut watcher = crate::watcher::new_watcher(root.clone());
224
225 let mut start = Instant::now();
226
227 let mut queue: Vec<ScanTask> = vec![];
228 let mut children = vec![];
229
230 let available: HashSet<_> = platform::get_available_mounts().into_iter().collect();
231 let excluded: HashSet<_> = platform::get_excluded_paths()
234 .into_iter()
235 .filter_map(|p| p.to_str().map(|s| s.to_string()))
236 .chain(available.into_iter())
237 .filter(|p| p != &root)
238 .collect();
239
240 info!("Start scan of '{}'", root);
241
242 while state.scan_flag.load(Ordering::SeqCst) {
243 while state.scan_flag.load(Ordering::SeqCst) {
244 if let Some(w) = &mut watcher {
246 for task in w
247 .read_events()
248 .into_iter()
249 .filter_map(|e| EntryPath::from(&root, e.updated_path))
250 .map(|path| ScanTask {
251 recursive: false,
252 reset_stopwatch: false,
253 path,
254 })
255 {
256 Scanner::merge_to_queue(&mut queue, task);
257 }
258 }
259 for task in rx.try_iter() {
261 if task.reset_stopwatch && !state.is_scanning.load(Ordering::SeqCst) {
262 start = Instant::now();
263 state.is_scanning.store(true, Ordering::SeqCst);
264 }
265 Scanner::merge_to_queue(&mut queue, task);
266 }
267 if !queue.is_empty() {
268 break;
269 }
270 thread::sleep(Duration::from_millis(10));
271 }
272
273 if let Some(task) = queue.pop() {
274 let task_path = task.path.to_string();
275 if excluded.contains(&task_path) {
276 continue;
277 }
278 watcher.as_mut().map(|w| w.add_dir(task_path));
279 state
280 .current_path
281 .lock()
282 .unwrap()
283 .replace(task.path.clone());
284 let entries: Vec<_> = std::fs::read_dir(&task.path.get_path())
285 .and_then(|dir| dir.collect::<Result<_, _>>())
286 .unwrap_or_else(|_| {
287 warn!("Unable to scan '{}'", task.path);
288 vec![]
289 });
290
291 let mut file_count = 0;
292 let mut files_size = 0;
293 for entry in entries {
294 if let Ok(metadata) = entry.metadata() {
295 let name = entry.file_name().to_str().unwrap().to_string();
296 if task.recursive && metadata.is_dir() && !metadata.is_symlink() {
297 let mut path = task.path.clone();
298 path.join(name.clone());
299 queue.push(ScanTask {
300 path,
301 reset_stopwatch: false,
302 recursive: true,
303 });
304 }
305
306 if metadata.is_dir() && !metadata.is_symlink() {
307 children.push(DirEntry::new_dir(name));
309 } else {
310 file_count += 1;
311 files_size += platform::get_file_size(&metadata) as i64;
312 }
313 } else {
314 warn!("Failed to get metadata for {:?}", entry.path());
315 }
316 }
317 let new_dirs = {
318 let mut tree = state.tree.lock().unwrap();
319 tree.set_children(&task.path, children, file_count, files_size)
320 };
321
322 if let Some(new_dirs) = new_dirs {
323 if !task.recursive {
324 for dir in new_dirs {
325 let mut path = task.path.clone();
326 path.join(dir);
327 queue.push(ScanTask {
328 path,
329 reset_stopwatch: false,
330 recursive: true,
331 });
332 }
333 }
334 }
335 children = vec![];
336 }
337 if state.is_scanning.load(Ordering::SeqCst) {
338 let duration = start.elapsed().as_millis() as u32;
339 state.scan_duration_ms.store(duration, Ordering::SeqCst);
340 if queue.is_empty() {
341 let stats = state.tree.lock().unwrap().stats();
342 info!(
343 "Scan finished: {} files {} dirs in {:?}",
344 stats.files,
345 stats.dirs,
346 Duration::from_millis(duration as u64)
347 );
348 }
349 }
350 if queue.is_empty() {
351 state.is_scanning.store(false, Ordering::SeqCst);
352 state.current_path.lock().unwrap().take();
353 }
354 }
355 })
356 }
357}
358
359impl Drop for Scanner {
360 fn drop(&mut self) {
361 self.state.scan_flag.store(false, Ordering::SeqCst);
362 let _ = self.scan_handle.take().unwrap().join();
363 }
364}