#![warn(clippy::all, missing_docs, nonstandard_style, future_incompatible)]
use std::fs::File;
use std::io::{self, Read};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Receiver, Sender};
use threadpool::ThreadPool;
use walkdir::WalkDir;
/// Collects the files and directories to process, together with the settings
/// used when processing them.
///
/// Paths are registered with `add_dirs` / `add_files`; the `estimate*` and
/// `warm*` methods then work on a snapshot of those lists.
#[derive(Default)]
pub struct Warmer {
/// Directories that are walked recursively to discover files.
dirs: Vec<PathBuf>,
/// Individual file paths registered directly by the caller.
files: Vec<PathBuf>,
/// Number of worker threads requested for the thread pool.
num_threads: usize,
/// Whether directory traversal follows symbolic links.
follow_links: bool,
}
/// Iterator over the `u64` values reported by a running [`Warmer`] operation.
///
/// Returned by `Warmer::iter_estimate` and `Warmer::iter_warm`.
pub struct Iter {
/// Receiving end of the channel that the worker jobs send their values on.
rx: Receiver<u64>,
}
impl Warmer {
    /// Creates a warmer with no registered paths.
    ///
    /// `num_threads` is the size of the worker pool used by
    /// [`iter_estimate`](Self::iter_estimate) and [`iter_warm`](Self::iter_warm);
    /// a value of `0` is treated as `1`. `follow_links` controls whether
    /// directory traversal follows symbolic links.
    pub fn new(num_threads: usize, follow_links: bool) -> Self {
        Self {
            num_threads,
            follow_links,
            ..Default::default()
        }
    }

    /// Registers directories whose files will be discovered recursively.
    pub fn add_dirs(&mut self, paths: &[impl AsRef<Path>]) {
        self.dirs
            .extend(paths.iter().map(|p| p.as_ref().to_path_buf()));
    }

    /// Registers individual file paths.
    pub fn add_files(&mut self, paths: &[impl AsRef<Path>]) {
        self.files
            .extend(paths.iter().map(|p| p.as_ref().to_path_buf()));
    }

    /// Returns the total size in bytes, as reported by file metadata, of all
    /// registered files and of all files found under the registered
    /// directories. Blocks until every file has been inspected.
    pub fn estimate(&self) -> u64 {
        self.iter_estimate().sum()
    }

    /// Reads every registered file and every file found under the registered
    /// directories, returning the total number of bytes read. Blocks until
    /// all reads have finished.
    pub fn warm(&self) -> u64 {
        self.iter_warm().sum()
    }

    /// Starts a background size estimation and returns immediately.
    ///
    /// The returned [`Iter`] yields one item per file whose metadata could be
    /// read: that file's length in bytes. Files that cannot be resolved or
    /// inspected are skipped silently. The iterator ends once all jobs have
    /// completed.
    pub fn iter_estimate(&self) -> Iter {
        let (tx, rx) = channel();
        let dirs = self.dirs.clone();
        let files = self.files.clone();
        // `ThreadPool::new` panics on 0, which is what a default-constructed
        // `Warmer` holds; the panic would happen on the detached thread and
        // surface only as an empty result, so clamp to one worker instead.
        let num_threads = self.num_threads.max(1);
        let follow_links = self.follow_links;
        std::thread::spawn(move || {
            let pool = ThreadPool::new(num_threads);
            for file in files {
                let tx = tx.clone();
                pool.execute(move || {
                    if let Ok(Some(file)) = resolve_file(file) {
                        if let Ok(size) = file.metadata().map(|m| m.len()) {
                            // The receiver may already be gone; that is not an error here.
                            tx.send(size).ok();
                        }
                    }
                });
            }
            for dir in dirs {
                for entry in walker(dir, follow_links) {
                    let tx = tx.clone();
                    pool.execute(move || {
                        if let Ok(size) = entry.metadata().map(|m| m.len()) {
                            tx.send(size).ok();
                        }
                    });
                }
            }
        });
        Iter { rx }
    }

    /// Starts warming in the background and returns immediately.
    ///
    /// The returned [`Iter`] yields byte counts as the files are read, so the
    /// running sum of its items is the number of bytes read so far. Files that
    /// cannot be resolved or opened are skipped silently. The iterator ends
    /// once all jobs have completed.
    pub fn iter_warm(&self) -> Iter {
        let (tx, rx) = channel();
        let dirs = self.dirs.clone();
        let files = self.files.clone();
        // See `iter_estimate`: a pool size of 0 would panic inside the
        // background thread and silently produce no work.
        let num_threads = self.num_threads.max(1);
        let follow_links = self.follow_links;
        std::thread::spawn(move || {
            let pool = ThreadPool::new(num_threads);
            for file in files {
                let tx = tx.clone();
                pool.execute(move || {
                    if let Ok(Some(file)) = resolve_file(file) {
                        warm_file(file, tx);
                    }
                });
            }
            for dir in dirs {
                for entry in walker(dir, follow_links) {
                    let tx = tx.clone();
                    pool.execute(move || warm_file(entry.path(), tx));
                }
            }
        });
        Iter { rx }
    }
}
/// Resolves `path` to a regular file, if it refers to one.
///
/// Returns `Ok(Some(path))` unchanged when `path` is a regular file (including
/// a symlink that points at one, since `Path::is_file` follows links). For any
/// other symlink the link is canonicalized and the target is returned only if
/// it is a regular file; targets such as directories or devices yield
/// `Ok(None)`. Paths that are neither files nor symlinks yield `Ok(None)`.
///
/// # Errors
///
/// Returns the error from `canonicalize` when a symlink cannot be resolved,
/// for example when it is dangling.
fn resolve_file(path: PathBuf) -> io::Result<Option<PathBuf>> {
    if path.is_file() {
        return Ok(Some(path));
    }
    if !path.is_symlink() {
        return Ok(None);
    }
    let target = path.canonicalize()?;
    // Reaching this point means the link did not resolve to a regular file a
    // moment ago, so re-check instead of handing back a directory or device.
    Ok(if target.is_file() { Some(target) } else { None })
}
/// Reads the file at `path` from start to end, discarding the data, and sends
/// the size of each successful read over `tx`.
///
/// Nothing is sent if the file cannot be opened. Reading stops at end of file
/// or at the first error other than an interrupted read. Send failures (the
/// receiver having been dropped) are ignored and reading continues.
fn warm_file(path: impl AsRef<Path>, tx: Sender<u64>) {
    if let Ok(mut file) = File::open(path) {
        let mut buffer = [0u8; 1024];
        loop {
            match file.read(&mut buffer) {
                Ok(0) => break,
                Ok(count) => {
                    tx.send(count as u64).ok();
                }
                // An interrupted read is transient; retrying avoids mistaking
                // it for end of file and leaving the rest of the file unread.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(_) => break,
            }
        }
    }
}
/// Walks the tree rooted at `path` and yields only the entries whose file type
/// is a regular file.
///
/// Entries that fail to be read during traversal are dropped. `follow_links`
/// is passed to the walker to control whether symbolic links are followed.
fn walker(path: impl AsRef<Path>, follow_links: bool) -> impl Iterator<Item = walkdir::DirEntry> {
    WalkDir::new(path)
        .follow_links(follow_links)
        .into_iter()
        .filter_map(Result::ok)
        .filter(|entry| entry.file_type().is_file())
}
/// Yields values as they arrive on the channel, blocking while none is
/// available, and finishes once every sender has been dropped.
impl Iterator for Iter {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        match self.rx.recv() {
            Ok(value) => Some(value),
            // A receive error means all senders are gone: the stream is done.
            Err(_) => None,
        }
    }
}