fshasher/walker/mod.rs
1mod error;
2pub(crate) mod options;
3mod pool;
4mod progress;
5#[cfg(feature = "tracking")]
6mod tracking;
7mod worker;
8
9use crate::{
10 collector::collect,
11 entry::{Entry, Filter},
12 Breaker, Hasher, Reader, Tolerance,
13};
14pub use error::E;
15use log::{debug, error, warn};
16pub use options::{Options, ReadingStrategy};
17use pool::Pool;
18pub use progress::{JobType, Progress, ProgressChannel, Tick};
19use std::{
20 io, mem,
21 path::PathBuf,
22 sync::mpsc::{channel, Receiver, Sender},
23 thread::{self, JoinHandle},
24 time::Instant,
25};
26#[cfg(feature = "tracking")]
27pub use tracking::Tracking;
28pub use worker::Worker;
29
/// Lower bound for the number of paths handed to a hash worker in a single job.
///
/// `Walker::hash()` sizes a job as 5% of the collected paths (rounded up) and clamps the result
/// to the range `MIN_PATHS_PER_JOB..=MAX_PATHS_PER_JOB`.
const MIN_PATHS_PER_JOB: usize = 2;
/// Upper bound for the number of paths handed to a hash worker in a single job.
///
/// See `MIN_PATHS_PER_JOB` for how the job size is derived.
const MAX_PATHS_PER_JOB: usize = 500;
34
/// Outcome of an attempt to build hashing jobs and hand them to workers.
///
/// Used internally by `Walker::hash()` to decide whether hashing continues, the pool has to be
/// shut down, or the whole operation has to fail.
enum JobCollecting {
    /// Building a job failed and the error must be propagated to the caller of `hash()`.
    Err(E),
    /// There are no paths left that could be given to a worker.
    NoJobs,
    /// The delegation attempt finished without an error.
    Success,
}
40
/// Message for communication between `Walker` and workers during hashing.
///
/// Messages travel over an `mpsc` channel created by `Walker::hash()`; the receiving side is
/// the thread that distributes jobs.
pub enum Action {
    /// Used by workers to report the results of hashing files to `Walker`.
    ///
    /// # Parameters
    /// - `u16`: Worker's ID. `Walker` uses it to give the next job to the same worker.
    /// - `Vec<(PathBuf, Vec<u8>)>`: A vector of tuples where each tuple contains
    ///   a file path and its corresponding hash.
    /// - `Vec<(PathBuf, E)>`: A vector of tuples where each tuple contains
    ///   a file path and the related error.
    Processed(u16, Vec<(PathBuf, Vec<u8>)>, Vec<(PathBuf, E)>),

    /// Used by workers to notify `Walker` about the closing of a worker's thread.
    ///
    /// `Walker::hash()` takes no action on this message itself; it only checks afterwards
    /// whether all workers are down.
    WorkerShutdownNotification,

    /// Used by workers to report an error.
    ///
    /// # Parameters
    ///
    /// - `PathBuf`: The path to the file that caused the error.
    /// - `E`: The error encountered during processing.
    Error(PathBuf, E),
}
64
/// `HashItem` contains the path to the file and the state of its hashing.
///
/// # Fields
///
/// * `PathBuf` - Full file path.
/// * `Option<Result<Vec<u8>, E>>` - The state of the file:
///     * `None` - The file was accepted during collecting without errors, but the hashing
///       operation hasn't been applied to it yet (the state right after `collect()`).
///     * `Some(Err(E))` - An error that occurred either during collecting (while attempting to
///       access the file) or during hashing. In both cases, it is stored in the item.
///     * `Some(Ok(Vec<u8>))` - Collecting and hashing were successful; contains the hash of the
///       file.
type HashItem = (PathBuf, Option<Result<Vec<u8>, E>>);
84
/// `Walker` collects file paths according to a given pattern, then calculates the hash for each
/// file and provides a combined hash for all files.
///
/// `Walker` collects file paths recursively, traversing all nested folders. If a symlink is
/// encountered, `Walker` reads it and, if it is linked to a file, includes it. If the symlink
/// leads to a folder, `Walker` reads this folder as nested. Other entities (except for files,
/// folders, and symlinks) are ignored.
///
/// The operations of path collection and hashing are separated. Reading the folder and collecting
/// paths is done using the `collect()` method; hashing is done with the `hash()` method. If
/// `hash()` is called without a prior call to `collect()`, it will not result in an error but
/// will return an empty hash.
///
/// The `progress()` method provides a `Receiver<Tick>` to track the progress of the operation.
/// Each call to `hash()` recreates the channel. Therefore, before calling `hash()` again, you
/// need to get a new `Receiver<Tick>` using the `progress()` method.
///
/// Path collection and subsequent hashing are interruptible operations. To interrupt, you need to
/// get a `Breaker` by calling the `breaker()` method. Interruption is done by calling the `abort()`
/// method. The operation will be interrupted at the earliest possible time but not instantaneously.
///
/// In case of interruption, both the `collect()` and `hash()` methods will return an `E::Aborted`
/// error.
///
/// The built-in interruption function can be used to implement timeout support.
///
/// Background threads are not stopped automatically when an instance of `Walker` is dropped.
/// To stop all running background threads, you need to use `Breaker` and call `abort()`.
/// Otherwise, there is a risk of resource leakage.
///
/// The most efficient way to create an instance of `Walker` is to use `Options`, which allows
/// flexible configuration of `Walker`.
///
/// # Example
///
/// ```
/// use fshasher::{Options, Entry, Tolerance, hasher, reader};
/// use std::env::temp_dir;
///
/// let mut walker = Options::new()
///     .entry(Entry::from(temp_dir()).unwrap()).unwrap()
///     .tolerance(Tolerance::LogErrors)
///     .walker().unwrap();
/// let hash = walker.collect().unwrap()
///     .hash::<hasher::blake::Blake, reader::buffering::Buffering>().unwrap();
/// println!("Hash of {}: {:?}", temp_dir().display(), hash);
/// ```
#[derive(Debug)]
pub struct Walker {
    /// Settings for the `Walker`. `collect()` and `hash()` return `E::IsNotInited` when this
    /// is `None`.
    opt: Option<Options>,

    /// `Breaker` structure for interrupting the path collection and hashing operations.
    breaker: Breaker,

    /// Paths collected during the recursive traversal of the paths specified in `Options` and
    /// according to the patterns. This field is populated when `collect()` is called and
    /// replaced with the hashing results when `hash()` finishes successfully.
    ///
    /// Results are stored as `type HashItem = (PathBuf, Option<Result<Vec<u8>, E>>)`: the path
    /// to the file and the state of its hashing.
    ///
    /// # Fields
    ///
    /// * `PathBuf` - Full file path.
    /// * `Option<Result<Vec<u8>, E>>` - The state of the file:
    ///     * `None` - The file was accepted during collecting without errors, but the hashing
    ///       operation hasn't been applied to it yet (the state right after `collect()`).
    ///     * `Some(Err(E))` - An error that occurred either during collecting (while attempting
    ///       to access the file) or during hashing. In both cases, it is stored in the item.
    ///     * `Some(Ok(Vec<u8>))` - Collecting and hashing were successful; contains the hash of
    ///       the file.
    pub paths: Vec<HashItem>,

    /// The resulting hash. Set when `hash()` is called; cleared by `collect()`.
    hash: Option<Vec<u8>>,

    /// An instance of the channel for tracking the progress of path collection and hashing.
    /// `None` when progress tracking is not configured in `Options`.
    progress: Option<ProgressChannel>,
}
172impl Walker {
173 /// Creates a new instance of `Walker`.
174 ///
175 /// # Parameters
176 ///
177 /// - `opt`: An instance of `Options` containing the configuration for `Walker`.
178 ///
179 /// # Returns
180 ///
181 /// - A new instance of `Walker`.
182 pub fn new(opt: Options) -> Self {
183 let progress = opt.progress.map(Progress::channel);
184 Self {
185 opt: Some(opt),
186 breaker: Breaker::new(),
187 paths: Vec::new(),
188 hash: None,
189 progress,
190 }
191 }
192
193 /// Collects file paths and saves them in the `paths` field for further hashing.
194 ///
195 /// # Returns
196 ///
197 /// - A mutable reference to the instance of `Walker`.
198 ///
199 /// # Errors
200 ///
201 /// This method will return an error if the operation is interrupted. By default, `Walker` has
202 /// a tolerance level of `Tolerance::LogErrors`, which means that the collection process will
203 /// not stop on an IO error; instead, the problematic path will be ignored. To change this strategy,
204 /// set the tolerance level to `Tolerance::StopOnErrors`. With `Tolerance::StopOnErrors`, the `collect()`
205 /// method will return an error for any IO error encountered.
206 ///
207 /// Paths that caused errors will be available in the `paths` field or during iteration.
208 pub fn collect(&mut self) -> Result<&mut Self, E> {
209 let now = Instant::now();
210 self.reset();
211 let opt = self.opt.as_mut().ok_or(E::IsNotInited)?;
212 let progress = self.progress.as_ref().map(|(progress, _)| progress.clone());
213 for entry in opt.entries.iter() {
214 let (collected, invalid) = collect(
215 &progress,
216 entry,
217 &self.breaker,
218 &opt.tolerance,
219 &opt.threads,
220 )?;
221 self.paths
222 .append(&mut collected.into_iter().map(|p| (p, None)).collect());
223 self.paths.append(
224 &mut invalid
225 .into_iter()
226 .map(|(p, e)| (p, Some(Err(e.into()))))
227 .collect(),
228 );
229 }
230 debug!(
231 "collected {} paths in {}µs / {}ms / {}s",
232 self.paths.len(),
233 now.elapsed().as_micros(),
234 now.elapsed().as_millis(),
235 now.elapsed().as_secs()
236 );
237 Ok(self)
238 }
239
240 /// Returns a `Breaker` which can be used to abort collecting and hashing operations.
241 /// Interruption is done by calling the `abort()` method. The operation will be interrupted
242 /// at the earliest possible time but not instantaneously.
243 ///
244 /// In case of interruption, both the `collect()` and `hash()` methods will return an `E::Aborted`
245 /// error.
246 ///
247 /// When an instance of `E::Aborted` is dropped, the background threads are not stopped automatically.
248 /// To stop all running background threads, you need to use `Breaker` and call `abort()`. Otherwise,
249 /// there is a risk of resource leakage.
250 ///
251 /// # Returns
252 ///
253 /// - A new instance of `Breaker`.
254 ///
255 /// # Example
256 ///
257 /// ```
258 /// use fshasher::{hasher, reader, walker::E, Entry, Options, Tolerance};
259 /// use std::{env::temp_dir, thread};
260 ///
261 /// let mut walker = Options::new()
262 /// .entry(Entry::from(temp_dir()).unwrap())
263 /// .unwrap()
264 /// .tolerance(Tolerance::LogErrors)
265 /// .progress(10)
266 /// .walker()
267 /// .unwrap();
268 /// let progress = walker.progress().unwrap();
269 /// let breaker = walker.breaker();
270 /// thread::spawn(move || {
271 /// let _ = progress.recv();
272 /// // Abort collecting as soon as it's started
273 /// breaker.abort();
274 /// });
275 /// let result = walker.collect();
276 /// // In case of empty dest folder, collect() will finish without errors,
277 /// // because no time to check breaker state.
278 /// println!("Collecting operation has been aborted: {result:?}");
279 /// ```
280 pub fn breaker(&self) -> Breaker {
281 self.breaker.clone()
282 }
283
284 /// This is equal to the number of paths found by `collect()`, including not accepted paths.
285 ///
286 /// # Returns
287 ///
288 /// - The number of calculated hashes.
289 pub fn count(&self) -> usize {
290 self.paths.len()
291 }
292
293 /// Returns a channel for tracking the progress of collecting and hashing. A repeated call to `hash()`
294 /// will recreate the channel. Therefore, before calling `hash()` again, you need to get a new
295 /// `Receiver<Tick>` using the `progress()` method.
296 ///
297 /// # Returns
298 ///
299 /// - `Option<Receiver<Tick>>`: A channel for tracking progress, or `None` if the channel is not available.
300 pub fn progress(&mut self) -> Option<Receiver<Tick>> {
301 self.progress.as_mut().and_then(|(_, rx)| rx.take())
302 }
303
304 /// Calculates a common hash and returns it. `hash()` should always be used in pair with `collect()`,
305 /// because `collect()` gathers the paths to files that will be hashed.
306 ///
307 /// # Returns
308 ///
309 /// - `Result<&[u8], E>`: A hash calculated based on the paths collected with the given patterns.
310 ///
311 /// # Errors
312 ///
313 /// This method, like `collect()`, is sensitive to tolerance settings. By default, `Walker` has
314 /// a tolerance level of `Tolerance::LogErrors`, which means that the hashing process will
315 /// not stop on an IO error (caused by hasher or reader); instead, the problematic path will be ignored.
316 /// To change this strategy, set the tolerance level to `Tolerance::StopOnErrors`. With
317 /// `Tolerance::StopOnErrors`, the `hash()` method will return an error for any IO error encountered.
318 ///
319 /// All ignored paths will stay in the `paths` field (vector of `HashItem`), but instead of a hash, they will include
320 /// an `Err(E)`.
321 pub fn hash<H: Hasher + 'static, R: Reader + 'static>(&mut self) -> Result<&[u8], E>
322 where
323 E: From<<H as Hasher>::Error> + From<<R as Reader>::Error>,
324 {
325 let now = Instant::now();
326 if self.paths.is_empty() {
327 return Ok(&[]);
328 }
329 let opt = self.opt.as_mut().ok_or(E::IsNotInited)?;
330 let tolerance = opt.tolerance.clone();
331 let (tx_queue, rx_queue): (Sender<Action>, Receiver<Action>) = channel();
332 let progress = self.progress.as_ref().map(|(progress, _)| progress.clone());
333 let breaker = self.breaker.clone();
334 let cores = thread::available_parallelism()
335 .ok()
336 .map(|n| n.get())
337 .ok_or(E::OptimalThreadsNumber)?;
338 if let Some(threads) = &opt.threads {
339 if cores * options::MAX_THREADS_MLT_TO_CORES < *threads {
340 return Err(E::OptimalThreadsNumber);
341 }
342 }
343 let threads = opt.threads.unwrap_or(cores);
344 let mut pool: Pool = Pool::new::<H, R>(
345 threads,
346 tx_queue.clone(),
347 &opt.reading_strategy,
348 &opt.tolerance,
349 &self.breaker,
350 );
351 debug!("Created pool with {threads} workers for hashing");
352 let mut paths = mem::take(&mut self.paths);
353 let total = paths.len();
354 let paths_per_jobs =
355 ((total as f64 * 0.05).ceil() as usize).clamp(MIN_PATHS_PER_JOB, MAX_PATHS_PER_JOB);
356
357 type HashingResult<T> = Result<(T, Vec<HashItem>), E>;
358
359 let handle: JoinHandle<HashingResult<H>> = thread::spawn(move || {
360 fn check_err(
361 path: PathBuf,
362 err: E,
363 tolerance: &Tolerance,
364 hashes: &mut Vec<HashItem>,
365 ) -> Result<(), E> {
366 match tolerance {
367 Tolerance::StopOnErrors => {
368 error!("entry: {}; error: {err}", path.display());
369 Err(E::Bound(path, Box::new(err)))
370 }
371 Tolerance::LogErrors => {
372 warn!("entry: {}; error: {err}", path.display());
373 hashes.push((path, Some(Err(err))));
374 Ok(())
375 }
376 Tolerance::DoNotLogErrors => {
377 hashes.push((path, Some(Err(err))));
378 Ok(())
379 }
380 }
381 }
382 fn get_next_job(
383 paths: &mut Vec<HashItem>,
384 paths_per_jobs: usize,
385 tolerance: &Tolerance,
386 hashes: &mut Vec<HashItem>,
387 ) -> Result<Vec<PathBuf>, E> {
388 let mut jobs = Vec::new();
389 while jobs.len() < paths_per_jobs && !paths.is_empty() {
390 let (path, state) = paths.remove(0);
391 if state.is_some() {
392 // Path marked by collector as caused error
393 continue;
394 }
395 if !path.exists() {
396 check_err(
397 path,
398 io::Error::new(io::ErrorKind::NotFound, "File not found").into(),
399 tolerance,
400 hashes,
401 )?;
402 continue;
403 }
404 jobs.push(path);
405 }
406 Ok(jobs)
407 }
408 fn deligate(
409 workers: Vec<&Worker>,
410 paths: &mut Vec<HashItem>,
411 paths_per_jobs: usize,
412 tolerance: &Tolerance,
413 hashes: &mut Vec<HashItem>,
414 worker_id: Option<u16>,
415 ) -> JobCollecting {
416 if paths.is_empty() {
417 return JobCollecting::NoJobs;
418 }
419 if let Some(id) = worker_id {
420 let Some(worker) = workers.iter().find(|w| w.id == id) else {
421 unreachable!("Worker with given ID always exists");
422 };
423 match get_next_job(paths, paths_per_jobs, tolerance, hashes) {
424 Ok(jobs) => {
425 if jobs.is_empty() {
426 return JobCollecting::NoJobs;
427 } else if worker.is_available() {
428 worker.delegate(jobs);
429 } else if let Some(worker) = workers.iter().find(|w| w.is_available()) {
430 error!(
431 "Hasher worker #{id} cannot accept a job, because it's down. Jobs deligated to another worker"
432 );
433 worker.delegate(jobs);
434 } else {
435 error!(
436 "Hasher worker #{id} cannot accept a job, because it's down. No other available workers"
437 );
438 }
439 }
440 Err(err) => {
441 return JobCollecting::Err(err);
442 }
443 }
444 } else {
445 for (i, worker) in workers.iter().enumerate() {
446 match get_next_job(paths, paths_per_jobs, tolerance, hashes) {
447 Ok(jobs) => {
448 if jobs.is_empty() && i == 0 {
449 // No any worker got a job
450 return JobCollecting::NoJobs;
451 } else if jobs.is_empty() && i != 0 {
452 // At least one worker got a job
453 break;
454 } else {
455 worker.delegate(jobs);
456 }
457 }
458 Err(err) => {
459 return JobCollecting::Err(err);
460 }
461 }
462 }
463 }
464 JobCollecting::Success
465 }
466 let mut summary = H::new();
467 let mut hashes: Vec<HashItem> = Vec::new();
468 let initialization = deligate(
469 pool.workers(),
470 &mut paths,
471 paths_per_jobs,
472 &tolerance,
473 &mut hashes,
474 // Deligate jobs to all workers
475 None,
476 );
477 if !matches!(initialization, JobCollecting::Success) {
478 pool.shutdown().wait();
479 summary.finish()?;
480 return if let JobCollecting::Err(err) = initialization {
481 Err(err)
482 } else {
483 Ok((summary, hashes))
484 };
485 }
486 let mut pending: Option<Action> = None;
487 let outer: Result<(), E> = 'outer: loop {
488 let next = if let Some(next) = pending.take() {
489 next
490 } else if let Ok(next) = rx_queue.recv() {
491 next
492 } else {
493 break 'outer Ok(());
494 };
495 if breaker.is_aborted() {
496 break 'outer Err(E::Aborted);
497 }
498 match next {
499 Action::Processed(worker_id, processed, reports) => {
500 for (path, err) in reports.into_iter() {
501 // If error reported by Worker, it's already not Tolerance::StopOnErrors
502 let _ = check_err(path, err, &tolerance, &mut hashes);
503 }
504 hashes.append(
505 &mut processed
506 .into_iter()
507 .map(|(p, h)| (p, Some(Ok(h))))
508 .collect(),
509 );
510 if let Some(ref progress) = progress {
511 progress.notify(JobType::Hashing, hashes.len(), total)
512 }
513 match deligate(
514 pool.workers(),
515 &mut paths,
516 paths_per_jobs,
517 &tolerance,
518 &mut hashes,
519 Some(worker_id),
520 ) {
521 JobCollecting::Err(err) => {
522 break 'outer Err(err);
523 }
524 JobCollecting::Success => {}
525 JobCollecting::NoJobs => {
526 pool.shutdown();
527 }
528 };
529 }
530 Action::WorkerShutdownNotification => {
531 // One of workers reported shutdowning state
532 }
533 Action::Error(path, err) => {
534 if let Err(err) = check_err(path, err, &tolerance, &mut hashes) {
535 break 'outer Err(err);
536 }
537 }
538 }
539 if pool.is_all_down() {
540 if let Ok(next) = rx_queue.try_recv() {
541 pending = Some(next);
542 continue;
543 } else {
544 break 'outer Ok(());
545 }
546 }
547 };
548 pool.shutdown().wait();
549 if let Err(err) = outer {
550 Err(err)
551 } else {
552 hashes.sort_by(|(a, _), (b, _)| a.cmp(b));
553 for (_, hash) in hashes.iter() {
554 if let Some(Ok(hash)) = hash {
555 summary.absorb(hash)?;
556 }
557 }
558 summary.finish()?;
559 Ok((summary, hashes))
560 }
561 });
562 self.progress = opt.progress.map(Progress::channel);
563 let (summary, mut hashes) = handle
564 .join()
565 .map_err(|e| E::JoinError(format!("{e:?}")))??;
566 self.paths = mem::take(&mut hashes);
567 let valid = self
568 .paths
569 .iter()
570 .filter(|(_, h)| if let Some(h) = h { h.is_ok() } else { false })
571 .count();
572 self.hash = Some(if valid == 0 || self.paths.is_empty() {
573 Vec::new()
574 } else {
575 summary.hash()?.to_vec()
576 });
577 self.progress = opt.progress.map(Progress::channel);
578 let hash = if let Some(ref hash) = self.hash {
579 hash
580 } else {
581 unreachable!("Hash has been stored");
582 };
583 debug!(
584 "hashing of {} paths in {}µs / {}ms / {}s",
585 total,
586 now.elapsed().as_micros(),
587 now.elapsed().as_millis(),
588 now.elapsed().as_secs()
589 );
590 Ok(hash)
591 }
592
593 /// Returns an iterator to iterate over the collected `HashItem`s.
594 ///
595 /// # Returns
596 ///
597 /// - `WalkerIter<'_, H, R>`: An iterator to iterate over the collected `HashItem`s.
598 pub fn iter(&self) -> WalkerIter<'_> {
599 WalkerIter {
600 walker: self,
601 pos: 0,
602 }
603 }
604
605 /// This method is used each time before `collect()` is called. It resets the previous state to default.
606 fn reset(&mut self) {
607 self.paths = Vec::new();
608 self.hash = None;
609 self.breaker.reset();
610 }
611}
/// An iterator over the `HashItem`s stored in a `Walker`.
///
/// `WalkerIter` yields references to the items of `Walker::paths`: paths together with their
/// hashes, or with the errors related to them. It is created by `Walker::iter()` or by
/// iterating over `&Walker`.
pub struct WalkerIter<'a> {
    /// A reference to the `Walker` instance.
    walker: &'a Walker,
    /// The index in the `paths` vector of the item that will be returned next.
    pos: usize,
}
622
623impl<'a> Iterator for WalkerIter<'a> {
624 type Item = &'a HashItem;
625
626 /// Advances the iterator and returns the next `HashItem`.
627 ///
628 /// # Returns
629 ///
630 /// - `Some(&HashItem)` if there is another `HashItem` to return.
631 /// - `None` if there are no more items to return.
632 fn next(&mut self) -> Option<Self::Item> {
633 if self.pos >= self.walker.paths.len() {
634 None
635 } else {
636 self.pos += 1;
637 Some(&self.walker.paths[self.pos - 1])
638 }
639 }
640}
641
642impl<'a> IntoIterator for &'a Walker {
643 type Item = &'a HashItem;
644 type IntoIter = WalkerIter<'a>;
645
646 /// Creates an iterator over the calculated hashes in the `Walker`.
647 ///
648 /// # Returns
649 ///
650 /// - `WalkerIter<'a>`: An iterator to iterate over the `HashItem` items.
651 fn into_iter(self) -> Self::IntoIter {
652 WalkerIter {
653 walker: self,
654 pos: 0,
655 }
656 }
657}