use crate::args;
use ansi_term::Colour::{Green, Red, Yellow};
use anyhow::{Context, Error};
use fs_err as fs;
use human_bytes::human_bytes;
use human_format::Formatter;
use jwalk::{DirEntry, Parallelism, WalkDir};
use std::collections::HashSet;
use std::fs::read_dir;
use std::fs::Metadata;
use std::os::unix::fs::MetadataExt;
use std::path::Path;
use std::path::PathBuf;
use std::process;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
/// Process exit status used when the scan is aborted by a shutdown request.
const ERROR_EXIT: i32 = 1;

/// File-count value of 10,000.
///
/// NOTE(review): presumably the default for the alert threshold that the scan
/// compares estimates against; it is not referenced in this part of the file,
/// so confirm where `Args` is defined.
pub const ALERT_COUNT: u64 = 10_000;

/// File-count value of 100,000.
///
/// NOTE(review): presumably the default for the blacklist threshold above
/// which a directory is not descended into; confirm where `Args` is defined.
pub const BLACKLIST_COUNT: u64 = 100_000;

/// Interval of 20 seconds.
///
/// NOTE(review): presumably the default delay between status updates; it is
/// not referenced in this part of the file, so confirm at the use site.
pub const STATUS_SECONDS: u64 = 20;
/// Walks the directory tree rooted at `path` in parallel and reports
/// directories whose estimated entry count exceeds the configured thresholds.
///
/// The walk runs on a dedicated rayon pool of `args.threads` threads. Hidden
/// entries are included and entries are not sorted. Every entry yielded by the
/// walker is handed to [`process_dir_entry`], which does the reporting and
/// decides whether a directory is descended into.
///
/// When `args.updates` is greater than zero, a background thread prints the
/// number of directories processed so far every `args.updates` seconds until
/// the walk has finished.
///
/// If `shutdown` is observed set while a directory is being read, the whole
/// process exits with status `ERROR_EXIT`.
///
/// # Errors
///
/// Returns an error if the rayon thread pool cannot be built.
pub fn parallel_search(
    path: &PathBuf,
    path_metadata: Metadata,
    size_inode_ratio: u64,
    shutdown: Arc<AtomicBool>,
    args: Arc<args::Args>,
) -> Result<(), Error> {
    let skip_path = args.skip_path.iter().cloned().collect::<HashSet<_>>();
    // NOTE(review): the context message mentions "calibration" although this
    // pool drives the search; left untouched in case callers match on it.
    let pool = Arc::new(
        rayon::ThreadPoolBuilder::new()
            .num_threads(args.threads)
            .build()
            .context("Unable to spawn calibration thread pool")?,
    );
    let dir_count = Arc::new(AtomicU64::new(0));
    // Raised once the walk is over so the status reporter can stop.
    let scan_finished = Arc::new(AtomicBool::new(false));

    if args.updates > 0 {
        let dir_count_status = Arc::clone(&dir_count);
        let scan_finished_status = Arc::clone(&scan_finished);
        let sleep_delay = args.updates;
        // The reporter gets its own OS thread: it sleeps in an endless loop,
        // and running it inside the rayon pool would permanently occupy one
        // of the workers the walk depends on (all of them when the pool has a
        // single thread, in which case the scan would never start).
        std::thread::spawn(move || loop {
            sleep(Duration::from_secs(sleep_delay));
            if scan_finished_status.load(Ordering::SeqCst) {
                break;
            }
            let count = dir_count_status.load(Ordering::SeqCst);
            println!(
                "Processed {} directories so far, next update in {} seconds",
                Green.paint(count.to_string()),
                sleep_delay
            );
        });
    }

    // The iterator is drained only for its side effects; all reporting happens
    // inside the `process_read_dir` callback.
    for _ in WalkDir::new(path)
        .skip_hidden(false)
        .sort(false)
        .parallelism(Parallelism::RayonExistingPool {
            pool,
            busy_timeout: None,
        })
        .process_read_dir(move |_, _, _, children| {
            if shutdown.load(Ordering::SeqCst) {
                println!("Requested program exit, stopping scan...");
                process::exit(ERROR_EXIT);
            }
            for dir_entry_result in children.iter_mut() {
                process_dir_entry(
                    &path_metadata,
                    size_inode_ratio,
                    dir_entry_result,
                    &skip_path,
                    &args,
                    &dir_count,
                );
            }
        })
    {}

    scan_finished.store(true, Ordering::SeqCst);
    Ok(())
}
/// Examines a single entry produced by the walker and decides whether the
/// walk should descend into it.
///
/// Entries that are errors, are not directories, or have no
/// `read_children_path` are left untouched. For every remaining directory the
/// counter `dir_count_walk` is incremented, then:
///
/// * if the path is listed in `skip_path`, descent is cancelled;
/// * if `args.one_filesystem` is set and the directory's device differs from
///   that of `path_metadata`, descent is cancelled;
/// * otherwise the number of files is estimated as
///   `directory size / size_inode_ratio`; above `args.blacklist_threshold` the
///   directory is reported and descent is cancelled, above
///   `args.alert_threshold` it is reported only.
///
/// Descent is cancelled by setting `read_children_path` to `None`. Directories
/// whose metadata cannot be read are silently skipped (the walk still descends
/// into them). A `size_inode_ratio` of zero makes the estimate impossible, so
/// no report is produced in that case instead of panicking on the division.
fn process_dir_entry<E>(
    path_metadata: &Metadata,
    size_inode_ratio: u64,
    dir_entry_result: &mut Result<DirEntry<((), ())>, E>,
    skip_path: &HashSet<PathBuf>,
    args: &Arc<args::Args>,
    dir_count_walk: &Arc<AtomicU64>,
) {
    let dir_entry = match dir_entry_result {
        Ok(entry) => entry,
        Err(_) => return,
    };
    if !dir_entry.file_type.is_dir() {
        return;
    }
    let full_path = match dir_entry.read_children_path.as_ref() {
        Some(children_path) => children_path,
        None => return,
    };

    dir_count_walk.fetch_add(1, Ordering::SeqCst);

    // Look the path up by borrowed `&Path`; `HashSet<PathBuf>` accepts it via
    // `Borrow<Path>`, so no `PathBuf` has to be allocated per directory.
    if skip_path.contains(&**full_path) {
        println!(
            "Skipping further scan at {} as requested",
            full_path.display()
        );
        dir_entry.read_children_path = None;
        return;
    }

    // Best effort: a directory whose metadata is unreadable is not reported.
    let dir_entry_metadata = match fs::metadata(full_path) {
        Ok(metadata) => metadata,
        Err(_) => return,
    };

    if args.one_filesystem && dir_entry_metadata.dev() != path_metadata.dev() {
        println!(
            "Identified filesystem boundary at {}, skipping...",
            full_path.display()
        );
        dir_entry.read_children_path = None;
        return;
    }

    let size = dir_entry_metadata.size();
    // `checked_div` yields `None` for a zero ratio, where a plain `/` would
    // panic inside a worker thread.
    let approx_files = match size.checked_div(size_inode_ratio) {
        Some(estimate) => estimate,
        None => return,
    };

    if approx_files > args.blacklist_threshold {
        print_offender(full_path, size, approx_files, args.accurate, true);
        dir_entry.read_children_path = None;
    } else if approx_files > args.alert_threshold {
        print_offender(full_path, size, approx_files, args.accurate, false);
    }
}
/// Prints a one-line report for a directory that exceeded a threshold.
///
/// * `full_path` – directory being reported.
/// * `size` – size of the directory inode in bytes, printed in human form.
/// * `approx_files` – estimated number of files in the directory.
/// * `accurate` – when `true`, the directory is listed and its entries are
///   counted instead of relying on the estimate.
/// * `red_alert` – `true` paints the file count red, `false` paints it yellow.
///
/// If the exact count is requested but the directory cannot be listed, the
/// estimate is shown instead and is labelled "approx " so the output never
/// presents an estimate as an exact figure.
// The counts are converted to `f64` purely for human-readable formatting, so
// losing precision on very large values is acceptable.
#[allow(clippy::cast_precision_loss)]
fn print_offender(
    full_path: &Arc<Path>,
    size: u64,
    approx_files: u64,
    accurate: bool,
    red_alert: bool,
) {
    // `None` means no exact count is available: either it was not requested
    // or listing the directory failed.
    let exact_files = if accurate {
        read_dir(full_path)
            .ok()
            .map(|entries| entries.count() as u64)
    } else {
        None
    };
    let (file_count, qualifier) = match exact_files {
        Some(count) => (count, ""),
        None => (approx_files, "approx "),
    };

    let human_files = Formatter::new().format(file_count as f64);
    let painted_files = if red_alert {
        Red.paint(human_files)
    } else {
        Yellow.paint(human_files)
    };

    println!(
        "Found directory {} with inode size {} and {}{} files",
        full_path.display(),
        human_bytes(size as f64),
        qualifier,
        painted_files
    );
}