use {
super::FileSum,
crate::{
app::*,
path::*,
task_sync::Dam,
},
ahash::AHashMap,
crossbeam::channel,
rayon::{ThreadPool, ThreadPoolBuilder},
std::{
convert::TryInto,
fs,
path::{Path, PathBuf},
sync::{
atomic::{AtomicIsize, Ordering},
Arc,
Mutex,
},
},
};
#[cfg(unix)]
use {
fnv::FnvHashSet,
std::{
os::unix::fs::MetadataExt,
},
};
/// Computes directory sums using a dedicated pool of worker threads.
///
/// The pool is built once, in `DirSummer::new`, and reused by every call
/// to `DirSummer::compute_dir_sum`.
struct DirSummer {
/// number of worker tasks spawned on the pool for each computation
/// (also the capacity of the channel collecting their partial sums)
thread_count: usize,
/// the rayon pool on which the worker tasks are spawned
thread_pool: ThreadPool,
}
/// Identity of a filesystem node, used as key of the set which prevents
/// counting a hard-linked file more than once.
///
/// The device is part of the identity because two nodes on different
/// devices may share the same inode number.
#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
struct NodeId {
/// inode number (filled from `MetadataExt::ino`)
inode: u64,
/// number of the device holding the node (filled from `MetadataExt::dev`)
dev: u64,
}
/// Tell whether `path` must be left out of the sums.
///
/// A path is ignored when the handling found for it in `special_paths`
/// is either `NoEnter` or `Hide`.
#[inline(always)]
fn is_ignored(path: &Path, special_paths: &[SpecialPath]) -> bool {
    let handling = special_paths.find(path);
    matches!(
        handling,
        SpecialHandling::NoEnter | SpecialHandling::Hide
    )
}
impl DirSummer {
    /// Build a summer backed by its own rayon thread pool.
    ///
    /// `thread_count` is the requested number of worker threads. Rayon
    /// interprets a request of `0` as "choose the number automatically",
    /// so the count which is stored is the one reported by the pool once
    /// built, not the raw argument.
    ///
    /// # Panics
    ///
    /// Panics if the thread pool can't be built.
    pub fn new(thread_count: usize) -> Self {
        let thread_pool = ThreadPoolBuilder::new()
            .num_threads(thread_count)
            .build()
            .unwrap();
        // Storing a raw 0 would make `compute_dir_sum` spawn no worker and
        // wait for no result, hence return a sum silently missing all the
        // sub-directories. We ask the pool for its real size instead.
        let thread_count = thread_pool.current_num_threads();
        Self {
            thread_count,
            thread_pool,
        }
    }

    /// compute the consolidated numbers for a directory, with implementation
    /// varying depending on the OS:
    /// On unix, the computation is done on blocks of 512 bytes
    /// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks
    ///
    /// * `path`: the directory to sum
    /// * `cache`: already computed sums; only consulted (never written here)
    ///   for the direct child directories of `path`
    /// * `dam`: polled for events; a pending event makes the workers stop
    ///   early and the function return `None`
    /// * `con`: provides the special paths
    ///
    /// Returns a zero sum when `path` is ignored according to the special
    /// paths or is located in `/proc` or `/run`.
    ///
    /// # Panics
    ///
    /// Panics if a channel send fails or if the set of node ids is poisoned.
    pub fn compute_dir_sum(
        &mut self,
        path: &Path,
        cache: &mut AHashMap<PathBuf, FileSum>,
        dam: &Dam,
        con: &AppContext,
    ) -> Option<FileSum> {
        let threads_count = self.thread_count;

        if is_ignored(path, &con.special_paths) {
            return Some(FileSum::zero());
        }

        // there are problems in /proc - See issue #637
        if path.starts_with("/proc") {
            debug!("not summing in /proc");
            return Some(FileSum::zero());
        }
        if path.starts_with("/run") {
            debug!("not summing in /run");
            return Some(FileSum::zero());
        }

        // to avoid counting twice a node, we store their id in a set
        #[cfg(unix)]
        let nodes = Arc::new(Mutex::new(FnvHashSet::<NodeId>::default()));

        // busy is the number of directories which are either being processed or queued
        // We use this count to determine when threads can stop waiting for tasks
        let mut busy = 0;
        // the sum starts with the numbers of the directory itself
        let mut sum = compute_file_sum(path);

        // this MPMC channel contains the directory paths which must be handled.
        // A None means there's nothing left and the thread may send its result and stop
        let (dirs_sender, dirs_receiver) = channel::unbounded();

        // only the special paths which may match below `path` are kept
        let special_paths: Vec<SpecialPath> = con.special_paths.iter()
            .filter(|sp| sp.can_have_matches_in(path))
            .cloned()
            .collect();

        // the first level is managed a little differently: we look at the cache
        // before adding. This enables faster computations in two cases:
        // - for the root line (assuming it's computed after the content)
        // - when we navigate up the tree
        if let Ok(entries) = fs::read_dir(path) {
            for e in entries.flatten() {
                if let Ok(md) = e.metadata() {
                    if md.is_dir() {
                        let entry_path = e.path();

                        if is_ignored(&entry_path, &special_paths) {
                            debug!("not summing special path {:?}", entry_path);
                            continue;
                        }

                        // we check the cache
                        if let Some(entry_sum) = cache.get(&entry_path) {
                            sum += *entry_sum;
                            continue;
                        }

                        // we add the directory to the channel of dirs needing
                        // processing
                        busy += 1;
                        dirs_sender.send(Some(entry_path)).unwrap();
                    } else {
                        // a file with several hard links is counted only the
                        // first time its node id is met
                        #[cfg(unix)]
                        if md.nlink() > 1 {
                            let mut nodes = nodes.lock().unwrap();
                            let node_id = NodeId {
                                inode: md.ino(),
                                dev: md.dev(),
                            };
                            if !nodes.insert(node_id) {
                                // it was already in the set
                                continue;
                            }
                        }
                    }
                    // reached for files and for directories which were queued
                    sum += md_sum(&md);
                }
            }
        }

        // no directory was queued: no need for the workers
        if busy == 0 {
            return Some(sum);
        }

        let busy = Arc::new(AtomicIsize::new(busy));

        // this MPMC channel is here for the threads to send their results
        // at end of computation
        let (thread_sum_sender, thread_sum_receiver) = channel::bounded(threads_count);

        // Each thread does a summation without merge and the data are merged
        // at the end (this avoids waiting for a mutex during computation)
        for _ in 0..threads_count {
            let busy = Arc::clone(&busy);
            let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone());

            #[cfg(unix)]
            let nodes = nodes.clone();

            let special_paths = special_paths.clone();

            let observer = dam.observer();
            let thread_sum_sender = thread_sum_sender.clone();
            self.thread_pool.spawn(move || {
                let mut thread_sum = FileSum::zero();
                loop {
                    let o = dirs_receiver.recv();
                    if let Ok(Some(open_dir)) = o {
                        if let Ok(entries) = fs::read_dir(&open_dir) {
                            for e in entries.flatten() {
                                if let Ok(md) = e.metadata() {
                                    if md.is_dir() {
                                        let path = e.path();

                                        if is_ignored(&path, &special_paths) {
                                            debug!("not summing (deep) special path {:?}", path);
                                            continue;
                                        }

                                        // we add the directory to the channel of dirs needing
                                        // processing
                                        // (the counter is incremented before the send so
                                        // that it never misses a queued directory)
                                        busy.fetch_add(1, Ordering::Relaxed);
                                        dirs_sender.send(Some(path)).unwrap();
                                    } else {
                                        #[cfg(unix)]
                                        if md.nlink() > 1 {
                                            let mut nodes = nodes.lock().unwrap();
                                            let node_id = NodeId {
                                                inode: md.ino(),
                                                dev: md.dev(),
                                            };
                                            if !nodes.insert(node_id) {
                                                // it was already in the set
                                                continue;
                                            }
                                        }
                                    }
                                    thread_sum += md_sum(&md);
                                } else {
                                    // we can't measure much but we can count the file
                                    thread_sum.incr();
                                }
                            }
                        }
                        // the received directory is done, readable or not
                        busy.fetch_sub(1, Ordering::Relaxed);
                    }
                    if observer.has_event() {
                        dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
                        break;
                    }
                    if busy.load(Ordering::Relaxed) < 1 {
                        dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
                        break;
                    }
                }
                thread_sum_sender.send(thread_sum).unwrap();
            });
        }

        // Wait for the threads to finish and consolidate their results
        for _ in 0..threads_count {
            match thread_sum_receiver.recv() {
                Ok(thread_sum) => {
                    sum += thread_sum;
                }
                Err(e) => {
                    warn!("Error while recv summing thread result : {:?}", e);
                }
            }
        }

        // an interrupted computation is partial: it's not returned
        if dam.has_event() {
            return None;
        }
        Some(sum)
    }
}
/// Compute the consolidated numbers for a directory, using a
/// process-wide `DirSummer`.
///
/// The summer is created on the first call, with the thread count read
/// from `con.file_sum_threads_count`, and reused by all the following
/// calls. Access to it is serialized by a mutex.
///
/// On unix, the computation is done on blocks of 512 bytes
/// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks
///
/// Returns whatever `DirSummer::compute_dir_sum` returns for the same
/// arguments.
///
/// # Panics
///
/// Panics if the mutex guarding the shared summer is poisoned.
pub fn compute_dir_sum(
    path: &Path,
    cache: &mut AHashMap<PathBuf, FileSum>,
    dam: &Dam,
    con: &AppContext,
) -> Option<FileSum> {
    use once_cell::sync::OnceCell;
    static DIR_SUMMER: OnceCell<Mutex<DirSummer>> = OnceCell::new();

    // lazily build the shared summer, then hold its lock for the whole computation
    let shared = DIR_SUMMER.get_or_init(|| {
        let summer = DirSummer::new(con.file_sum_threads_count);
        Mutex::new(summer)
    });
    let mut summer = shared.lock().unwrap();
    summer.compute_dir_sum(path, cache, dam, con)
}
/// Compute the sum for a single file (not the content of a folder).
///
/// The metadata are read without following symbolic links. On unix the
/// size is the smaller of the nominal size and the allocated size
/// (512 bytes per block), and the flag given to `FileSum::new` is set
/// when the allocated size is smaller than the nominal one (presumably
/// a sparse file marker - confirm against `FileSum`). On other systems
/// the size is the file length and the flag is never set.
///
/// When the metadata can't be read, the returned sum still counts one
/// file, with a size and a time of 0.
pub fn compute_file_sum(path: &Path) -> FileSum {
    let md = match fs::symlink_metadata(path) {
        Ok(md) => md,
        Err(_) => return FileSum::new(0, false, 1, 0),
    };
    let seconds = extract_seconds(&md);

    #[cfg(unix)]
    let (size, smaller_on_disk) = {
        let nominal_size = md.size();
        let allocated_size = md.blocks() * 512;
        (
            allocated_size.min(nominal_size),
            allocated_size < nominal_size,
        )
    };

    #[cfg(not(unix))]
    let (size, smaller_on_disk) = (md.len(), false);

    FileSum::new(size, smaller_on_disk, 1, seconds)
}
/// Modification time of the file, in seconds, as a `u32`.
///
/// Returns 0 when the time reported by the metadata doesn't fit
/// in a `u32` (negative or too big).
#[cfg(unix)]
#[inline(always)]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    let mtime: i64 = md.mtime();
    match mtime.try_into() {
        Ok(seconds) => seconds,
        Err(_) => 0,
    }
}
/// Modification time of the file, in seconds since the unix epoch,
/// as a `u32`.
///
/// Returns 0 when the modification time isn't available, is before
/// the epoch, or doesn't fit in a `u32`.
#[cfg(not(unix))]
#[inline(always)]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    let seconds: Option<u32> = md
        .modified()
        .ok()
        .and_then(|modified| modified.duration_since(std::time::UNIX_EPOCH).ok())
        .and_then(|since_epoch| since_epoch.as_secs().try_into().ok());
    seconds.unwrap_or(0)
}
/// Build the sum of one entry from its already read metadata.
///
/// On unix the size is the allocated size (number of blocks times 512
/// bytes), on other systems it's the file length. The sum counts one
/// entry and carries its modification time.
#[inline(always)]
fn md_sum(md: &fs::Metadata) -> FileSum {
    /// size of the entry on unix: what's allocated on disk
    #[cfg(unix)]
    fn entry_size(md: &fs::Metadata) -> u64 {
        md.blocks() * 512
    }
    /// size of the entry when there's no block information
    #[cfg(not(unix))]
    fn entry_size(md: &fs::Metadata) -> u64 {
        md.len()
    }
    FileSum::new(entry_size(md), false, 1, extract_seconds(md))
}