fshasher/collector/mod.rs
1mod context;
2pub mod error;
3mod pool;
4mod worker;
5
6use crate::{
7 breaker::Breaker,
8 entry::Entry,
9 walker::{options, JobType, Progress},
10};
11use context::Context;
12pub use error::E;
13use log::{debug, error, warn};
14pub use pool::Pool;
15use std::{
16 path::PathBuf,
17 sync::mpsc::{channel, Receiver, Sender},
18 thread::{self, JoinHandle},
19 time::Instant,
20};
21pub use worker::Worker;
22
/// Defines tolerance levels for errors during the collection of file paths.
///
/// An attempt to read a file or folder can fail (for example, because of a permissions
/// error). This setting tells the collector how to react to such failures.
///
/// The default level is [`Tolerance::LogErrors`].
#[derive(Debug, Clone, Default)]
pub enum Tolerance {
    /// All errors during collection will be logged but will not stop the collecting
    /// process. A list of paths that caused errors will be returned by `collect()`.
    #[default]
    LogErrors,
    /// All errors during collection will be ignored without logging. The collecting
    /// process will not be stopped. A list of paths that caused errors will be returned
    /// by `collect()`.
    DoNotLogErrors,
    /// The collecting process will be stopped on the first error.
    StopOnErrors,
}
44
45/// Message for communication between `collect()` and workers during collecting.
46#[derive(Debug)]
47pub enum Action {
48 /// Called by a worker to delegate reading of a found folder to another worker.
49 Delegate(PathBuf),
50 /// Called by a worker to report found paths to files.
51 Processed(Result<Vec<PathBuf>, (PathBuf, E)>),
52 /// Reported by a worker in case of an error.
53 ///
54 /// # Parameters
55 ///
56 /// - `PathBuf`: The path that caused the error.
57 /// - `E`: The error encountered during processing.
58 Error(PathBuf, E),
59}
60
61/// The result type for the `collect()` function.
62pub type CollectingResult = Result<(Vec<PathBuf>, Vec<(PathBuf, E)>), E>;
63
64/// Collects file paths based on the provided entry and filters.
65///
66/// # Parameters
67///
68/// - `progress`: An optional progress tracker.
69/// - `entry`: The entry point for collecting file paths.
70/// - `breaker`: A breaker to handle interruptions.
71/// - `tolerance`: The tolerance level for error handling.
72/// - `Tolerance::LogErrors`: Errors will be logged, but the collecting process will not be stopped.
73/// - `Tolerance::DoNotLogErrors`: Errors will be ignored, and the collecting process will not be stopped.
74/// - `Tolerance::StopOnErrors`: The collecting process will stop on any IO errors.
75/// - `threads`: The optional number of threads to use for processing. If this setting is not set
76/// (`None`), the number of threads will default to the number of available cores.
77///
78/// # Returns
79///
80/// - `CollectingResult`: A result containing a tuple of vectors with collected paths and ignored
81/// paths, or an error if the operation fails.
82/// - `Ok((Vec<PathBuf>, Vec<PathBuf>))` includes a list of collected file paths and a list of ignored
83/// paths (in case of tolerance: `Tolerance::LogErrors` or `Tolerance::DoNotLogErrors`). In the case of
84/// `Tolerance::StopOnErrors`, the list of ignored paths will always be empty.
85///
86/// # Errors
87///
88/// This function will return an error if the operation is interrupted or if there is an issue with
89/// threading. Returning errors is sensitive to the tolerance level. Only in the case of `Tolerance::StopOnErrors`
90/// will `collect()` return an error in case of IO errors.
91///
92/// # Examples
93///
94/// Example of tracking the collection of files
95/// ```
96/// use fshasher::{collect, Breaker, Entry, Progress, Tolerance};
97/// use std::{env::temp_dir, thread};
98/// let (progress, rx) = Progress::channel(10);
99/// let rx = rx.unwrap();
100/// thread::spawn(move || {
101/// while let Ok(tick) = rx.recv() {
102/// println!("{tick}");
103/// }
104/// println!("Collecting is finished");
105/// });
106/// let (included, ignored) = collect(
107/// &Some(progress),
108/// &Entry::from(temp_dir()).unwrap(),
109/// &Breaker::new(),
110/// &Tolerance::LogErrors,
111/// &None,
112/// )
113/// .unwrap();
114/// println!(
115/// "Found {} accessible paths to files; {} ignored",
116/// included.len(),
117/// ignored.len()
118/// );
119/// ```
120///
121/// Aborting the collecting operation with the first progress tick
122///
123/// ```
124/// use fshasher::{collect, Breaker, Entry, Progress, Tolerance};
125/// use std::{env::temp_dir, thread};
126///
127/// let (progress, rx) = Progress::channel(10);
128/// let rx = rx.unwrap();
129/// let breaker = Breaker::new();
130/// let breaker_inner = breaker.clone();
131/// thread::spawn(move || {
132/// let _ = rx.recv();
133/// println!("Breaking collecting with the first tick");
134/// breaker_inner.abort();
135/// });
136/// let result = collect(
137/// &Some(progress),
138/// &Entry::from(temp_dir()).unwrap(),
139/// &breaker,
140/// &Tolerance::LogErrors,
141/// &None,
142/// );
143/// // In case of empty dest folder, collect() will finish without errors,
144/// // because no time to check breaker state.
145/// println!("Collecting operation has been aborted: {result:?}");
146/// ```
147pub fn collect(
148 progress: &Option<Progress>,
149 entry: &Entry,
150 breaker: &Breaker,
151 tolerance: &Tolerance,
152 threads: &Option<usize>,
153) -> CollectingResult {
154 let now = Instant::now();
155 let (tx_queue, rx_queue): (Sender<Action>, Receiver<Action>) = channel();
156 tx_queue
157 .send(Action::Delegate(entry.entry.clone()))
158 .map_err(|_| E::ChannelErr(String::from("Master Queue")))?;
159 let progress = progress.clone();
160 let breaker = breaker.clone();
161 let tolerance = tolerance.clone();
162 let cores = thread::available_parallelism()
163 .ok()
164 .map(|n| n.get())
165 .ok_or(E::OptimalThreadsNumber)?;
166 if let Some(threads) = threads {
167 if cores * options::MAX_THREADS_MLT_TO_CORES < *threads {
168 return Err(E::OptimalThreadsNumber);
169 }
170 }
171 let threads = threads.unwrap_or(cores);
172 let entry_inner = entry.clone();
173 let mut context = Context::new(&entry.context);
174 let handle: JoinHandle<CollectingResult> = thread::spawn(move || {
175 let mut collected: Vec<PathBuf> = Vec::new();
176 let mut invalid: Vec<(PathBuf, E)> = Vec::new();
177 let mut workers = Pool::new(threads, entry_inner.clone(), tx_queue.clone(), &breaker);
178 debug!("Created pool with {threads} workers for paths collecting");
179 let mut pending: Option<Action> = None;
180 let mut queue: isize = 0;
181 if breaker.is_aborted() {
182 return Err(E::Aborted);
183 }
184 fn check(
185 path: PathBuf,
186 err: E,
187 invalid: &mut Vec<(PathBuf, E)>,
188 tolerance: &Tolerance,
189 ) -> Result<(), E> {
190 match tolerance {
191 Tolerance::StopOnErrors => {
192 error!("entry: {}; error: {err}", path.display());
193 return Err(err);
194 }
195 Tolerance::LogErrors => {
196 warn!("entry: {}; error: {err}", path.display());
197 invalid.push((path, err));
198 }
199 Tolerance::DoNotLogErrors => {
200 invalid.push((path, err));
201 }
202 };
203 Ok(())
204 }
205 let result = 'listener: loop {
206 let next = if let Some(next) = pending.take() {
207 next
208 } else if let Ok(next) = rx_queue.recv() {
209 next
210 } else {
211 break 'listener Ok((collected, invalid));
212 };
213 if breaker.is_aborted() {
214 break 'listener Err(E::Aborted);
215 }
216 match next {
217 Action::Delegate(next) => {
218 let Some(worker) = workers.get() else {
219 break 'listener Err(E::NoAvailableWorkers);
220 };
221 if let Err(err) = context.consider(&next) {
222 break 'listener Err(err);
223 }
224 if !context.filtered(&next) {
225 continue;
226 }
227 queue += 1;
228 worker.delegate(next);
229 continue;
230 }
231 Action::Processed(processed) => {
232 queue -= 1;
233 match processed {
234 Ok(paths) => {
235 collected.append(
236 &mut paths
237 .into_iter()
238 .filter(|p| context.filtered(p))
239 .collect::<Vec<PathBuf>>(),
240 );
241 if let Some(ref progress) = progress {
242 let count = collected.len();
243 progress.notify(JobType::Collecting, count, count);
244 }
245 }
246 Err((path, err)) => {
247 if let Err(err) = check(path, err, &mut invalid, &tolerance) {
248 break 'listener Err(err);
249 }
250 }
251 }
252 }
253 Action::Error(path, err) => {
254 if let Err(err) = check(path, err, &mut invalid, &tolerance) {
255 break 'listener Err(err);
256 }
257 }
258 }
259 if let Ok(next) = rx_queue.try_recv() {
260 pending = Some(next);
261 continue;
262 }
263 if workers.is_all_done() && queue == 0 {
264 break 'listener Ok((collected, invalid));
265 }
266 };
267 workers.shutdown();
268 if breaker.is_aborted() {
269 Err(E::Aborted)
270 } else {
271 result
272 }
273 });
274 let (collected, ignored) = handle
275 .join()
276 .map_err(|e| E::JoinError(format!("{e:?}")))??;
277 debug!(
278 "Collected {} files (ignored: {}) in {}µs / {}ms / {}s; source: {}",
279 collected.len(),
280 ignored.len(),
281 now.elapsed().as_micros(),
282 now.elapsed().as_millis(),
283 now.elapsed().as_secs(),
284 entry.entry.display()
285 );
286 Ok((collected, ignored))
287}