fshasher/walker/
mod.rs

1mod error;
2pub(crate) mod options;
3mod pool;
4mod progress;
5#[cfg(feature = "tracking")]
6mod tracking;
7mod worker;
8
9use crate::{
10    collector::collect,
11    entry::{Entry, Filter},
12    Breaker, Hasher, Reader, Tolerance,
13};
14pub use error::E;
15use log::{debug, error, warn};
16pub use options::{Options, ReadingStrategy};
17use pool::Pool;
18pub use progress::{JobType, Progress, ProgressChannel, Tick};
19use std::{
20    io, mem,
21    path::PathBuf,
22    sync::mpsc::{channel, Receiver, Sender},
23    thread::{self, JoinHandle},
24    time::Instant,
25};
26#[cfg(feature = "tracking")]
27pub use tracking::Tracking;
28pub use worker::Worker;
29
/// Lower bound for the number of paths bundled into a single job for a hash worker.
///
/// `Walker::hash()` sizes a job as 5% of the collected paths and clamps the result to the range
/// `MIN_PATHS_PER_JOB..=MAX_PATHS_PER_JOB`.
const MIN_PATHS_PER_JOB: usize = 2;
/// Upper bound for the number of paths bundled into a single job for a hash worker.
///
/// `Walker::hash()` sizes a job as 5% of the collected paths and clamps the result to the range
/// `MIN_PATHS_PER_JOB..=MAX_PATHS_PER_JOB`.
const MAX_PATHS_PER_JOB: usize = 500;
34
/// Outcome of an attempt to build the next batch of paths and hand it over to hash workers.
enum JobCollecting {
    /// Building a batch failed; carries the error that has to stop the hashing.
    Err(E),
    /// There were no paths left to build a batch from.
    NoJobs,
    /// A non-empty batch was built without errors (and delegated to a worker when one was
    /// available).
    Success,
}
40
/// Message sent by workers to `Walker` through the queue channel during hashing.
pub enum Action {
    /// Used by workers to report the results of hashing a batch of files to `Walker`.
    /// After receiving it, `Walker` tries to delegate the next batch to the reporting worker.
    ///
    /// # Parameters
    /// - `u16`: Worker's ID.
    /// - `Vec<(PathBuf, Vec<u8>)>`: A vector of tuples where each tuple contains
    ///   a file path and its corresponding hash.
    /// - `Vec<(PathBuf, E)>`: A vector of tuples where each tuple contains
    ///   a file path and the related error.
    Processed(u16, Vec<(PathBuf, Vec<u8>)>, Vec<(PathBuf, E)>),

    /// Used by workers to notify `Walker` about the closing of a worker's thread.
    /// `Walker` takes no direct action on it; it only re-checks whether the whole pool is down.
    WorkerShutdownNotification,

    /// Used by workers to report an error. Depending on the tolerance level, `Walker` either
    /// stops hashing with this error or stores it next to the path and continues.
    ///
    /// # Parameters
    ///
    /// - `PathBuf`: The path to the file that caused the error.
    /// - `E`: The error encountered during processing.
    Error(PathBuf, E),
}
64
/// `HashItem` contains the path to the file and the state of its hashing.
///
/// # Fields
///
/// * `PathBuf` - Full file path.
/// * `Option<Result<Vec<u8>, E>>` - The state of hashing for this path.
///
/// # Values of `Option<Result<Vec<u8>, E>>`
///
/// * `None` - The file was accepted during collecting without errors, but the hashing operation
///   hasn't been applied to the file yet. This is the state right after `collect()`.
/// * `Some(Err(E))` - An error that occurred either during collecting (while attempting to access
///   the file) or during hashing.
/// * `Some(Ok(Vec<u8>))` - Collecting and hashing were successful; contains the hash of the file.
type HashItem = (PathBuf, Option<Result<Vec<u8>, E>>);
84
/// `Walker` collects file paths according to a given pattern, then calculates the hash for each
/// file and provides a combined hash for all files.
///
/// `Walker` collects file paths recursively, traversing all nested folders. If a symlink is
/// encountered, `Walker` reads it and, if it is linked to a file, includes it. If the symlink
/// leads to a folder, `Walker` reads this folder as nested. Other entities (except for files,
/// folders, and symlinks) are ignored.
///
/// The operations of path collection and hashing are separated. Reading the folder and collecting
/// paths is done using the `collect()` method; hashing is done with the `hash()` method. If
/// `hash()` is called without a prior call to `collect()`, it will not result in an error but
/// will return an empty hash.
///
/// The `progress()` method provides a `Receiver<Tick>` to track the progress of the operation.
/// A repeated call to `hash()` will recreate the channel. Therefore, before calling `hash()`
/// again, you need to get a new `Receiver<Tick>` using the `progress()` method.
///
/// Path collection and subsequent hashing are interruptible operations. To interrupt, you need to
/// get a `Breaker` by calling the `breaker()` method. Interruption is done by calling the `abort()`
/// method. The operation will be interrupted at the earliest possible time but not instantaneously.
///
/// In case of interruption, both the `collect()` and `hash()` methods will return an `E::Aborted`
/// error.
///
/// The built-in interruption function can be used to implement timeout support.
///
/// When an instance of `Walker` is dropped, the background threads are not stopped automatically.
/// To stop all running background threads, you need to use `Breaker` and call `abort()`. Otherwise,
/// there is a risk of resource leakage.
///
/// The most efficient way to create an instance of `Walker` is to use `Options`, which allows
/// flexible configuration of `Walker`.
///
/// # Example
///
/// ```
/// use fshasher::{Options, Entry, Tolerance, hasher, reader};
/// use std::env::temp_dir;
///
/// let mut walker = Options::new()
///     .entry(Entry::from(temp_dir()).unwrap()).unwrap()
///     .tolerance(Tolerance::LogErrors)
///     .walker().unwrap();
/// let hash = walker.collect().unwrap()
///     .hash::<hasher::blake::Blake, reader::buffering::Buffering>().unwrap();
/// println!("Hash of {}: {:?}", temp_dir().display(), hash);
/// ```
#[derive(Debug)]
pub struct Walker {
    /// Settings for the `Walker`. `collect()` and `hash()` return `E::IsNotInited` when it is
    /// `None`.
    opt: Option<Options>,

    /// `Breaker` structure for interrupting the path collection and hashing operations.
    breaker: Breaker,

    /// Paths collected during the recursive traversal of the paths specified in `Options` and
    /// according to the patterns. This field is populated when `collect()` is called and is
    /// replaced with the hashing results when `hash()` is called.
    ///
    /// Results are stored as `type HashItem = (PathBuf, Option<Result<Vec<u8>, E>>)`:
    ///
    /// * `PathBuf` - Full file path.
    /// * `Option<Result<Vec<u8>, E>>` - The state of hashing for this path:
    ///   * `None` - The file was accepted during collecting without errors, but the hashing
    ///     operation hasn't been applied to the file yet (the state right after `collect()`).
    ///   * `Some(Err(E))` - An error that occurred during collecting (while attempting to access
    ///     the file) or during hashing.
    ///   * `Some(Ok(Vec<u8>))` - Collecting and hashing were successful; contains the hash of the
    ///     file.
    pub paths: Vec<HashItem>,

    /// The resulting hash. Set when `hash()` is called; cleared by `collect()`.
    hash: Option<Vec<u8>>,

    /// An instance of the channel for tracking the progress of path collection and hashing.
    /// `None` when progress tracking is not configured in `Options`.
    progress: Option<ProgressChannel>,
}
172impl Walker {
173    /// Creates a new instance of `Walker`.
174    ///
175    /// # Parameters
176    ///
177    /// - `opt`: An instance of `Options` containing the configuration for `Walker`.
178    ///
179    /// # Returns
180    ///
181    /// - A new instance of `Walker`.
182    pub fn new(opt: Options) -> Self {
183        let progress = opt.progress.map(Progress::channel);
184        Self {
185            opt: Some(opt),
186            breaker: Breaker::new(),
187            paths: Vec::new(),
188            hash: None,
189            progress,
190        }
191    }
192
193    /// Collects file paths and saves them in the `paths` field for further hashing.
194    ///
195    /// # Returns
196    ///
197    /// - A mutable reference to the instance of `Walker`.
198    ///
199    /// # Errors
200    ///
201    /// This method will return an error if the operation is interrupted. By default, `Walker` has
202    /// a tolerance level of `Tolerance::LogErrors`, which means that the collection process will
203    /// not stop on an IO error; instead, the problematic path will be ignored. To change this strategy,
204    /// set the tolerance level to `Tolerance::StopOnErrors`. With `Tolerance::StopOnErrors`, the `collect()`
205    /// method will return an error for any IO error encountered.
206    ///
207    /// Paths that caused errors will be available in the `paths` field or during iteration.
208    pub fn collect(&mut self) -> Result<&mut Self, E> {
209        let now = Instant::now();
210        self.reset();
211        let opt = self.opt.as_mut().ok_or(E::IsNotInited)?;
212        let progress = self.progress.as_ref().map(|(progress, _)| progress.clone());
213        for entry in opt.entries.iter() {
214            let (collected, invalid) = collect(
215                &progress,
216                entry,
217                &self.breaker,
218                &opt.tolerance,
219                &opt.threads,
220            )?;
221            self.paths
222                .append(&mut collected.into_iter().map(|p| (p, None)).collect());
223            self.paths.append(
224                &mut invalid
225                    .into_iter()
226                    .map(|(p, e)| (p, Some(Err(e.into()))))
227                    .collect(),
228            );
229        }
230        debug!(
231            "collected {} paths in {}µs / {}ms / {}s",
232            self.paths.len(),
233            now.elapsed().as_micros(),
234            now.elapsed().as_millis(),
235            now.elapsed().as_secs()
236        );
237        Ok(self)
238    }
239
    /// Returns a `Breaker` which can be used to abort collecting and hashing operations.
    /// Interruption is done by calling the `abort()` method. The operation will be interrupted
    /// at the earliest possible time but not instantaneously.
    ///
    /// In case of interruption, both the `collect()` and `hash()` methods will return an `E::Aborted`
    /// error.
    ///
    /// When an instance of `Walker` is dropped, the background threads are not stopped automatically.
    /// To stop all running background threads, you need to use `Breaker` and call `abort()`. Otherwise,
    /// there is a risk of resource leakage.
    ///
    /// # Returns
    ///
    /// - A clone of the walker's `Breaker`.
    ///
    /// # Example
    ///
    /// ```
    /// use fshasher::{hasher, reader, walker::E, Entry, Options, Tolerance};
    /// use std::{env::temp_dir, thread};
    ///
    /// let mut walker = Options::new()
    ///     .entry(Entry::from(temp_dir()).unwrap())
    ///     .unwrap()
    ///     .tolerance(Tolerance::LogErrors)
    ///     .progress(10)
    ///     .walker()
    ///     .unwrap();
    /// let progress = walker.progress().unwrap();
    /// let breaker = walker.breaker();
    /// thread::spawn(move || {
    ///     let _ = progress.recv();
    ///     // Abort collecting as soon as it has started
    ///     breaker.abort();
    /// });
    /// let result = walker.collect();
    /// // If the destination folder is empty, collect() may finish without errors,
    /// // because there is no time to check the breaker state.
    /// println!("Collecting operation has been aborted: {result:?}");
    /// ```
    pub fn breaker(&self) -> Breaker {
        self.breaker.clone()
    }
283
    /// Returns the number of items currently stored in the `paths` field.
    ///
    /// After `collect()` this is equal to the number of paths found, including not accepted
    /// paths. After `hash()` this is the number of items that `hash()` stored back into `paths`.
    ///
    /// # Returns
    ///
    /// - The length of the `paths` vector.
    pub fn count(&self) -> usize {
        self.paths.len()
    }
292
293    /// Returns a channel for tracking the progress of collecting and hashing. A repeated call to `hash()`
294    /// will recreate the channel. Therefore, before calling `hash()` again, you need to get a new
295    /// `Receiver<Tick>` using the `progress()` method.
296    ///
297    /// # Returns
298    ///
299    /// - `Option<Receiver<Tick>>`: A channel for tracking progress, or `None` if the channel is not available.
300    pub fn progress(&mut self) -> Option<Receiver<Tick>> {
301        self.progress.as_mut().and_then(|(_, rx)| rx.take())
302    }
303
304    /// Calculates a common hash and returns it. `hash()` should always be used in pair with `collect()`,
305    /// because `collect()` gathers the paths to files that will be hashed.
306    ///
307    /// # Returns
308    ///
309    /// - `Result<&[u8], E>`: A hash calculated based on the paths collected with the given patterns.
310    ///
311    /// # Errors
312    ///
313    /// This method, like `collect()`, is sensitive to tolerance settings. By default, `Walker` has
314    /// a tolerance level of `Tolerance::LogErrors`, which means that the hashing process will
315    /// not stop on an IO error (caused by hasher or reader); instead, the problematic path will be ignored.
316    /// To change this strategy, set the tolerance level to `Tolerance::StopOnErrors`. With
317    /// `Tolerance::StopOnErrors`, the `hash()` method will return an error for any IO error encountered.
318    ///
319    /// All ignored paths will stay in the `paths` field (vector of `HashItem`), but instead of a hash, they will include
320    /// an `Err(E)`.
321    pub fn hash<H: Hasher + 'static, R: Reader + 'static>(&mut self) -> Result<&[u8], E>
322    where
323        E: From<<H as Hasher>::Error> + From<<R as Reader>::Error>,
324    {
325        let now = Instant::now();
326        if self.paths.is_empty() {
327            return Ok(&[]);
328        }
329        let opt = self.opt.as_mut().ok_or(E::IsNotInited)?;
330        let tolerance = opt.tolerance.clone();
331        let (tx_queue, rx_queue): (Sender<Action>, Receiver<Action>) = channel();
332        let progress = self.progress.as_ref().map(|(progress, _)| progress.clone());
333        let breaker = self.breaker.clone();
334        let cores = thread::available_parallelism()
335            .ok()
336            .map(|n| n.get())
337            .ok_or(E::OptimalThreadsNumber)?;
338        if let Some(threads) = &opt.threads {
339            if cores * options::MAX_THREADS_MLT_TO_CORES < *threads {
340                return Err(E::OptimalThreadsNumber);
341            }
342        }
343        let threads = opt.threads.unwrap_or(cores);
344        let mut pool: Pool = Pool::new::<H, R>(
345            threads,
346            tx_queue.clone(),
347            &opt.reading_strategy,
348            &opt.tolerance,
349            &self.breaker,
350        );
351        debug!("Created pool with {threads} workers for hashing");
352        let mut paths = mem::take(&mut self.paths);
353        let total = paths.len();
354        let paths_per_jobs =
355            ((total as f64 * 0.05).ceil() as usize).clamp(MIN_PATHS_PER_JOB, MAX_PATHS_PER_JOB);
356
357        type HashingResult<T> = Result<(T, Vec<HashItem>), E>;
358
359        let handle: JoinHandle<HashingResult<H>> = thread::spawn(move || {
360            fn check_err(
361                path: PathBuf,
362                err: E,
363                tolerance: &Tolerance,
364                hashes: &mut Vec<HashItem>,
365            ) -> Result<(), E> {
366                match tolerance {
367                    Tolerance::StopOnErrors => {
368                        error!("entry: {}; error: {err}", path.display());
369                        Err(E::Bound(path, Box::new(err)))
370                    }
371                    Tolerance::LogErrors => {
372                        warn!("entry: {}; error: {err}", path.display());
373                        hashes.push((path, Some(Err(err))));
374                        Ok(())
375                    }
376                    Tolerance::DoNotLogErrors => {
377                        hashes.push((path, Some(Err(err))));
378                        Ok(())
379                    }
380                }
381            }
382            fn get_next_job(
383                paths: &mut Vec<HashItem>,
384                paths_per_jobs: usize,
385                tolerance: &Tolerance,
386                hashes: &mut Vec<HashItem>,
387            ) -> Result<Vec<PathBuf>, E> {
388                let mut jobs = Vec::new();
389                while jobs.len() < paths_per_jobs && !paths.is_empty() {
390                    let (path, state) = paths.remove(0);
391                    if state.is_some() {
392                        // Path marked by collector as caused error
393                        continue;
394                    }
395                    if !path.exists() {
396                        check_err(
397                            path,
398                            io::Error::new(io::ErrorKind::NotFound, "File not found").into(),
399                            tolerance,
400                            hashes,
401                        )?;
402                        continue;
403                    }
404                    jobs.push(path);
405                }
406                Ok(jobs)
407            }
408            fn deligate(
409                workers: Vec<&Worker>,
410                paths: &mut Vec<HashItem>,
411                paths_per_jobs: usize,
412                tolerance: &Tolerance,
413                hashes: &mut Vec<HashItem>,
414                worker_id: Option<u16>,
415            ) -> JobCollecting {
416                if paths.is_empty() {
417                    return JobCollecting::NoJobs;
418                }
419                if let Some(id) = worker_id {
420                    let Some(worker) = workers.iter().find(|w| w.id == id) else {
421                        unreachable!("Worker with given ID always exists");
422                    };
423                    match get_next_job(paths, paths_per_jobs, tolerance, hashes) {
424                        Ok(jobs) => {
425                            if jobs.is_empty() {
426                                return JobCollecting::NoJobs;
427                            } else if worker.is_available() {
428                                worker.delegate(jobs);
429                            } else if let Some(worker) = workers.iter().find(|w| w.is_available()) {
430                                error!(
431                                    "Hasher worker #{id} cannot accept a job, because it's down. Jobs deligated to another worker"
432                                );
433                                worker.delegate(jobs);
434                            } else {
435                                error!(
436                                    "Hasher worker #{id} cannot accept a job, because it's down. No other available workers"
437                                );
438                            }
439                        }
440                        Err(err) => {
441                            return JobCollecting::Err(err);
442                        }
443                    }
444                } else {
445                    for (i, worker) in workers.iter().enumerate() {
446                        match get_next_job(paths, paths_per_jobs, tolerance, hashes) {
447                            Ok(jobs) => {
448                                if jobs.is_empty() && i == 0 {
449                                    // No any worker got a job
450                                    return JobCollecting::NoJobs;
451                                } else if jobs.is_empty() && i != 0 {
452                                    // At least one worker got a job
453                                    break;
454                                } else {
455                                    worker.delegate(jobs);
456                                }
457                            }
458                            Err(err) => {
459                                return JobCollecting::Err(err);
460                            }
461                        }
462                    }
463                }
464                JobCollecting::Success
465            }
466            let mut summary = H::new();
467            let mut hashes: Vec<HashItem> = Vec::new();
468            let initialization = deligate(
469                pool.workers(),
470                &mut paths,
471                paths_per_jobs,
472                &tolerance,
473                &mut hashes,
474                // Deligate jobs to all workers
475                None,
476            );
477            if !matches!(initialization, JobCollecting::Success) {
478                pool.shutdown().wait();
479                summary.finish()?;
480                return if let JobCollecting::Err(err) = initialization {
481                    Err(err)
482                } else {
483                    Ok((summary, hashes))
484                };
485            }
486            let mut pending: Option<Action> = None;
487            let outer: Result<(), E> = 'outer: loop {
488                let next = if let Some(next) = pending.take() {
489                    next
490                } else if let Ok(next) = rx_queue.recv() {
491                    next
492                } else {
493                    break 'outer Ok(());
494                };
495                if breaker.is_aborted() {
496                    break 'outer Err(E::Aborted);
497                }
498                match next {
499                    Action::Processed(worker_id, processed, reports) => {
500                        for (path, err) in reports.into_iter() {
501                            // If error reported by Worker, it's already not Tolerance::StopOnErrors
502                            let _ = check_err(path, err, &tolerance, &mut hashes);
503                        }
504                        hashes.append(
505                            &mut processed
506                                .into_iter()
507                                .map(|(p, h)| (p, Some(Ok(h))))
508                                .collect(),
509                        );
510                        if let Some(ref progress) = progress {
511                            progress.notify(JobType::Hashing, hashes.len(), total)
512                        }
513                        match deligate(
514                            pool.workers(),
515                            &mut paths,
516                            paths_per_jobs,
517                            &tolerance,
518                            &mut hashes,
519                            Some(worker_id),
520                        ) {
521                            JobCollecting::Err(err) => {
522                                break 'outer Err(err);
523                            }
524                            JobCollecting::Success => {}
525                            JobCollecting::NoJobs => {
526                                pool.shutdown();
527                            }
528                        };
529                    }
530                    Action::WorkerShutdownNotification => {
531                        // One of workers reported shutdowning state
532                    }
533                    Action::Error(path, err) => {
534                        if let Err(err) = check_err(path, err, &tolerance, &mut hashes) {
535                            break 'outer Err(err);
536                        }
537                    }
538                }
539                if pool.is_all_down() {
540                    if let Ok(next) = rx_queue.try_recv() {
541                        pending = Some(next);
542                        continue;
543                    } else {
544                        break 'outer Ok(());
545                    }
546                }
547            };
548            pool.shutdown().wait();
549            if let Err(err) = outer {
550                Err(err)
551            } else {
552                hashes.sort_by(|(a, _), (b, _)| a.cmp(b));
553                for (_, hash) in hashes.iter() {
554                    if let Some(Ok(hash)) = hash {
555                        summary.absorb(hash)?;
556                    }
557                }
558                summary.finish()?;
559                Ok((summary, hashes))
560            }
561        });
562        self.progress = opt.progress.map(Progress::channel);
563        let (summary, mut hashes) = handle
564            .join()
565            .map_err(|e| E::JoinError(format!("{e:?}")))??;
566        self.paths = mem::take(&mut hashes);
567        let valid = self
568            .paths
569            .iter()
570            .filter(|(_, h)| if let Some(h) = h { h.is_ok() } else { false })
571            .count();
572        self.hash = Some(if valid == 0 || self.paths.is_empty() {
573            Vec::new()
574        } else {
575            summary.hash()?.to_vec()
576        });
577        self.progress = opt.progress.map(Progress::channel);
578        let hash = if let Some(ref hash) = self.hash {
579            hash
580        } else {
581            unreachable!("Hash has been stored");
582        };
583        debug!(
584            "hashing of {} paths in {}µs / {}ms / {}s",
585            total,
586            now.elapsed().as_micros(),
587            now.elapsed().as_millis(),
588            now.elapsed().as_secs()
589        );
590        Ok(hash)
591    }
592
593    /// Returns an iterator to iterate over the collected `HashItem`s.
594    ///
595    /// # Returns
596    ///
597    /// - `WalkerIter<'_, H, R>`: An iterator to iterate over the collected `HashItem`s.
598    pub fn iter(&self) -> WalkerIter<'_> {
599        WalkerIter {
600            walker: self,
601            pos: 0,
602        }
603    }
604
605    /// This method is used each time before `collect()` is called. It resets the previous state to default.
606    fn reset(&mut self) {
607        self.paths = Vec::new();
608        self.hash = None;
609        self.breaker.reset();
610    }
611}
/// An iterator over the `HashItem`s stored in a `Walker`.
///
/// `WalkerIter` is used to iterate over `HashItem`s that represent the paths and their
/// corresponding hashes calculated by the `Walker`, or the errors related to the paths.
/// Items are yielded by reference in the order they are stored in `Walker::paths`.
pub struct WalkerIter<'a> {
    /// A reference to the `Walker` instance.
    walker: &'a Walker,
    /// The index in the `paths` vector of the next item to yield.
    pos: usize,
}
622
623impl<'a> Iterator for WalkerIter<'a> {
624    type Item = &'a HashItem;
625
626    /// Advances the iterator and returns the next `HashItem`.
627    ///
628    /// # Returns
629    ///
630    /// - `Some(&HashItem)` if there is another `HashItem` to return.
631    /// - `None` if there are no more items to return.
632    fn next(&mut self) -> Option<Self::Item> {
633        if self.pos >= self.walker.paths.len() {
634            None
635        } else {
636            self.pos += 1;
637            Some(&self.walker.paths[self.pos - 1])
638        }
639    }
640}
641
642impl<'a> IntoIterator for &'a Walker {
643    type Item = &'a HashItem;
644    type IntoIter = WalkerIter<'a>;
645
646    /// Creates an iterator over the calculated hashes in the `Walker`.
647    ///
648    /// # Returns
649    ///
650    /// - `WalkerIter<'a>`: An iterator to iterate over the `HashItem` items.
651    fn into_iter(self) -> Self::IntoIter {
652        WalkerIter {
653            walker: self,
654            pos: 0,
655        }
656    }
657}