1#![warn(clippy::all, missing_docs, nonstandard_style, future_incompatible)]
20
21use std::fs::File;
22use std::io::{self, Read};
23use std::path::{Path, PathBuf};
24use std::sync::mpsc::{channel, Receiver, Sender};
25use threadpool::ThreadPool;
26use walkdir::WalkDir;
27
28#[derive(Default)]
30pub struct Warmer {
31 dirs: Vec<PathBuf>,
32 files: Vec<PathBuf>,
33 num_threads: usize,
34 follow_links: bool,
35}
36
37pub struct Iter {
39 rx: Receiver<u64>,
40}
41
42impl Warmer {
43 pub fn new(num_threads: usize, follow_links: bool) -> Self {
45 Self {
46 num_threads,
47 follow_links,
48 ..Default::default()
49 }
50 }
51
52 pub fn add_dirs(&mut self, paths: &[impl AsRef<Path>]) {
54 let mut paths: Vec<_> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
55 self.dirs.append(&mut paths);
56 }
57
58 pub fn add_files(&mut self, paths: &[impl AsRef<Path>]) {
60 let mut paths: Vec<_> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
61 self.files.append(&mut paths);
62 }
63
64 pub fn estimate(&self) -> u64 {
66 self.iter_estimate().sum()
67 }
68
69 pub fn warm(&self) -> u64 {
71 self.iter_warm().sum()
72 }
73
74 pub fn iter_estimate(&self) -> Iter {
76 let (tx, rx) = channel();
77 let dirs = self.dirs.clone();
78 let files = self.files.clone();
79 let num_threads = self.num_threads;
80 let follow_links = self.follow_links;
81 std::thread::spawn(move || {
82 let pool = ThreadPool::new(num_threads);
83 for file in files {
84 let tx = tx.clone();
85 pool.execute(move || {
86 if let Ok(Some(file)) = resolve_file(file) {
87 if let Ok(size) = file.metadata().map(|m| m.len()) {
88 tx.send(size).ok();
89 }
90 }
91 });
92 }
93 for dir in dirs {
94 for entry in walker(dir, follow_links) {
95 let tx = tx.clone();
96 pool.execute(move || {
97 if let Ok(size) = entry.metadata().map(|m| m.len()) {
98 tx.send(size).ok();
99 }
100 });
101 }
102 }
103 });
104 Iter { rx }
105 }
106
107 pub fn iter_warm(&self) -> Iter {
109 let (tx, rx) = channel();
110 let dirs = self.dirs.clone();
111 let files = self.files.clone();
112 let num_threads = self.num_threads;
113 let follow_links = self.follow_links;
114 std::thread::spawn(move || {
115 let pool = ThreadPool::new(num_threads);
116 for file in files {
117 let tx = tx.clone();
118 pool.execute(move || {
119 if let Ok(Some(file)) = resolve_file(file) {
120 warm_file(file, tx);
121 }
122 });
123 }
124 for dir in dirs {
125 for entry in walker(dir, follow_links) {
126 let tx = tx.clone();
127 pool.execute(move || warm_file(entry.path(), tx));
128 }
129 }
130 });
131 Iter { rx }
132 }
133}
134
135fn resolve_file(path: PathBuf) -> io::Result<Option<PathBuf>> {
137 if path.is_file() {
138 Ok(Some(path))
139 } else if path.is_symlink() {
140 path.canonicalize().map(Some)
141 } else {
142 Ok(None)
143 }
144}
145
146fn warm_file(path: impl AsRef<Path>, tx: Sender<u64>) {
148 if let Ok(mut file) = File::open(path) {
149 let mut buffer = [0; 1024];
150 loop {
151 let count = file.read(&mut buffer).unwrap_or_default();
152 if count == 0 {
153 break;
154 }
155 tx.send(count as u64).ok();
156 }
157 }
158}
159
160fn walker(path: impl AsRef<Path>, follow_links: bool) -> impl Iterator<Item = walkdir::DirEntry> {
162 let mut w = WalkDir::new(path);
163 if follow_links {
164 w = w.follow_links(true);
165 }
166 w.into_iter()
167 .filter_map(|e| e.ok())
168 .filter(|e| e.file_type().is_file())
169}
170
171impl Iterator for Iter {
172 type Item = u64;
173
174 fn next(&mut self) -> Option<Self::Item> {
176 self.rx.recv().ok()
177 }
178}