fshasher/collector/
mod.rs

1mod context;
2pub mod error;
3mod pool;
4mod worker;
5
6use crate::{
7    breaker::Breaker,
8    entry::Entry,
9    walker::{options, JobType, Progress},
10};
11use context::Context;
12pub use error::E;
13use log::{debug, error, warn};
14pub use pool::Pool;
15use std::{
16    path::PathBuf,
17    sync::mpsc::{channel, Receiver, Sender},
18    thread::{self, JoinHandle},
19    time::Instant,
20};
21pub use worker::Worker;
22
/// How the collector reacts to errors met while walking the file system, for example a folder
/// that cannot be read because of missing permissions.
///
/// The default level is [`Tolerance::LogErrors`].
#[derive(Debug, Clone, Default)]
pub enum Tolerance {
    /// Keep collecting and log every error. The paths that failed, together with their errors,
    /// are returned by `collect()`.
    #[default]
    LogErrors,
    /// Keep collecting without logging anything. The paths that failed, together with their
    /// errors, are still returned by `collect()`.
    DoNotLogErrors,
    /// Stop collecting as soon as the first error occurs.
    StopOnErrors,
}
44
45/// Message for communication between `collect()` and workers during collecting.
46#[derive(Debug)]
47pub enum Action {
48    /// Called by a worker to delegate reading of a found folder to another worker.
49    Delegate(PathBuf),
50    /// Called by a worker to report found paths to files.
51    Processed(Result<Vec<PathBuf>, (PathBuf, E)>),
52    /// Reported by a worker in case of an error.
53    ///
54    /// # Parameters
55    ///
56    /// - `PathBuf`: The path that caused the error.
57    /// - `E`: The error encountered during processing.
58    Error(PathBuf, E),
59}
60
61/// The result type for the `collect()` function.
62pub type CollectingResult = Result<(Vec<PathBuf>, Vec<(PathBuf, E)>), E>;
63
64/// Collects file paths based on the provided entry and filters.
65///
66/// # Parameters
67///
68/// - `progress`: An optional progress tracker.
69/// - `entry`: The entry point for collecting file paths.
70/// - `breaker`: A breaker to handle interruptions.
71/// - `tolerance`: The tolerance level for error handling.
72///   - `Tolerance::LogErrors`: Errors will be logged, but the collecting process will not be stopped.
73///   - `Tolerance::DoNotLogErrors`: Errors will be ignored, and the collecting process will not be stopped.
74///   - `Tolerance::StopOnErrors`: The collecting process will stop on any IO errors.
75/// - `threads`: The optional number of threads to use for processing. If this setting is not set
76///   (`None`), the number of threads will default to the number of available cores.
77///
78/// # Returns
79///
80/// - `CollectingResult`: A result containing a tuple of vectors with collected paths and ignored
81///   paths, or an error if the operation fails.
82///   - `Ok((Vec<PathBuf>, Vec<PathBuf>))` includes a list of collected file paths and a list of ignored
83///     paths (in case of tolerance: `Tolerance::LogErrors` or `Tolerance::DoNotLogErrors`). In the case of
84///     `Tolerance::StopOnErrors`, the list of ignored paths will always be empty.
85///
86/// # Errors
87///
88/// This function will return an error if the operation is interrupted or if there is an issue with
89/// threading. Returning errors is sensitive to the tolerance level. Only in the case of `Tolerance::StopOnErrors`
90/// will `collect()` return an error in case of IO errors.
91///
92/// # Examples
93///
94/// Example of tracking the collection of files
95/// ```
96/// use fshasher::{collect, Breaker, Entry, Progress, Tolerance};
97/// use std::{env::temp_dir, thread};
98/// let (progress, rx) = Progress::channel(10);
99/// let rx = rx.unwrap();
100/// thread::spawn(move || {
101///     while let Ok(tick) = rx.recv() {
102///         println!("{tick}");
103///     }
104///     println!("Collecting is finished");
105/// });
106/// let (included, ignored) = collect(
107///     &Some(progress),
108///     &Entry::from(temp_dir()).unwrap(),
109///     &Breaker::new(),
110///     &Tolerance::LogErrors,
111///     &None,
112/// )
113/// .unwrap();
114/// println!(
115///     "Found {} accessible paths to files; {} ignored",
116///     included.len(),
117///     ignored.len()
118/// );
119/// ```
120///
121/// Aborting the collecting operation with the first progress tick
122///
123/// ```
124/// use fshasher::{collect, Breaker, Entry, Progress, Tolerance};
125/// use std::{env::temp_dir, thread};
126///
127/// let (progress, rx) = Progress::channel(10);
128/// let rx = rx.unwrap();
129/// let breaker = Breaker::new();
130/// let breaker_inner = breaker.clone();
131/// thread::spawn(move || {
132///     let _ = rx.recv();
133///     println!("Breaking collecting with the first tick");
134///     breaker_inner.abort();
135/// });
136/// let result = collect(
137///     &Some(progress),
138///     &Entry::from(temp_dir()).unwrap(),
139///     &breaker,
140///     &Tolerance::LogErrors,
141///     &None,
142/// );
143/// // In case of empty dest folder, collect() will finish without errors,
144/// // because no time to check breaker state.
145/// println!("Collecting operation has been aborted: {result:?}");
146/// ```
147pub fn collect(
148    progress: &Option<Progress>,
149    entry: &Entry,
150    breaker: &Breaker,
151    tolerance: &Tolerance,
152    threads: &Option<usize>,
153) -> CollectingResult {
154    let now = Instant::now();
155    let (tx_queue, rx_queue): (Sender<Action>, Receiver<Action>) = channel();
156    tx_queue
157        .send(Action::Delegate(entry.entry.clone()))
158        .map_err(|_| E::ChannelErr(String::from("Master Queue")))?;
159    let progress = progress.clone();
160    let breaker = breaker.clone();
161    let tolerance = tolerance.clone();
162    let cores = thread::available_parallelism()
163        .ok()
164        .map(|n| n.get())
165        .ok_or(E::OptimalThreadsNumber)?;
166    if let Some(threads) = threads {
167        if cores * options::MAX_THREADS_MLT_TO_CORES < *threads {
168            return Err(E::OptimalThreadsNumber);
169        }
170    }
171    let threads = threads.unwrap_or(cores);
172    let entry_inner = entry.clone();
173    let mut context = Context::new(&entry.context);
174    let handle: JoinHandle<CollectingResult> = thread::spawn(move || {
175        let mut collected: Vec<PathBuf> = Vec::new();
176        let mut invalid: Vec<(PathBuf, E)> = Vec::new();
177        let mut workers = Pool::new(threads, entry_inner.clone(), tx_queue.clone(), &breaker);
178        debug!("Created pool with {threads} workers for paths collecting");
179        let mut pending: Option<Action> = None;
180        let mut queue: isize = 0;
181        if breaker.is_aborted() {
182            return Err(E::Aborted);
183        }
184        fn check(
185            path: PathBuf,
186            err: E,
187            invalid: &mut Vec<(PathBuf, E)>,
188            tolerance: &Tolerance,
189        ) -> Result<(), E> {
190            match tolerance {
191                Tolerance::StopOnErrors => {
192                    error!("entry: {}; error: {err}", path.display());
193                    return Err(err);
194                }
195                Tolerance::LogErrors => {
196                    warn!("entry: {}; error: {err}", path.display());
197                    invalid.push((path, err));
198                }
199                Tolerance::DoNotLogErrors => {
200                    invalid.push((path, err));
201                }
202            };
203            Ok(())
204        }
205        let result = 'listener: loop {
206            let next = if let Some(next) = pending.take() {
207                next
208            } else if let Ok(next) = rx_queue.recv() {
209                next
210            } else {
211                break 'listener Ok((collected, invalid));
212            };
213            if breaker.is_aborted() {
214                break 'listener Err(E::Aborted);
215            }
216            match next {
217                Action::Delegate(next) => {
218                    let Some(worker) = workers.get() else {
219                        break 'listener Err(E::NoAvailableWorkers);
220                    };
221                    if let Err(err) = context.consider(&next) {
222                        break 'listener Err(err);
223                    }
224                    if !context.filtered(&next) {
225                        continue;
226                    }
227                    queue += 1;
228                    worker.delegate(next);
229                    continue;
230                }
231                Action::Processed(processed) => {
232                    queue -= 1;
233                    match processed {
234                        Ok(paths) => {
235                            collected.append(
236                                &mut paths
237                                    .into_iter()
238                                    .filter(|p| context.filtered(p))
239                                    .collect::<Vec<PathBuf>>(),
240                            );
241                            if let Some(ref progress) = progress {
242                                let count = collected.len();
243                                progress.notify(JobType::Collecting, count, count);
244                            }
245                        }
246                        Err((path, err)) => {
247                            if let Err(err) = check(path, err, &mut invalid, &tolerance) {
248                                break 'listener Err(err);
249                            }
250                        }
251                    }
252                }
253                Action::Error(path, err) => {
254                    if let Err(err) = check(path, err, &mut invalid, &tolerance) {
255                        break 'listener Err(err);
256                    }
257                }
258            }
259            if let Ok(next) = rx_queue.try_recv() {
260                pending = Some(next);
261                continue;
262            }
263            if workers.is_all_done() && queue == 0 {
264                break 'listener Ok((collected, invalid));
265            }
266        };
267        workers.shutdown();
268        if breaker.is_aborted() {
269            Err(E::Aborted)
270        } else {
271            result
272        }
273    });
274    let (collected, ignored) = handle
275        .join()
276        .map_err(|e| E::JoinError(format!("{e:?}")))??;
277    debug!(
278        "Collected {} files (ignored: {}) in {}µs / {}ms / {}s; source: {}",
279        collected.len(),
280        ignored.len(),
281        now.elapsed().as_micros(),
282        now.elapsed().as_millis(),
283        now.elapsed().as_secs(),
284        entry.entry.display()
285    );
286    Ok((collected, ignored))
287}