broot/file_sum/
sum_computation.rs1use {
2 super::FileSum,
3 crate::{
4 app::*,
5 path::*,
6 task_sync::Dam,
7 },
8 rayon::{
9 ThreadPool,
10 ThreadPoolBuilder,
11 },
12 rustc_hash::{
13 FxHashMap,
14 },
15 std::{
16 convert::TryInto,
17 fs,
18 path::{
19 Path,
20 PathBuf,
21 },
22 sync::{
23 atomic::{
24 AtomicIsize,
25 Ordering,
26 },
27 Arc,
28 Mutex,
29 },
30 },
31 termimad::crossbeam::channel,
32};
33
34#[cfg(unix)]
35use {
36 std::os::unix::fs::MetadataExt,
37};
38
39struct DirSummer {
40 thread_count: usize,
41 thread_pool: ThreadPool,
42}
43
/// Identity of a file node on unix.
///
/// An inode number alone is only meaningful on its own device, so the pair
/// (inode, device) is what recognizes several hard links to the same file.
#[cfg(unix)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct NodeId {
    /// inode number, as read with `MetadataExt::ino`
    inode: u64,
    /// id of the device holding the inode, as read with `MetadataExt::dev`
    dev: u64,
}
54
55impl DirSummer {
56 pub fn new(thread_count: usize) -> Self {
57 let thread_pool = ThreadPoolBuilder::new()
58 .num_threads(thread_count)
59 .build()
60 .unwrap();
61 Self {
62 thread_count,
63 thread_pool,
64 }
65 }
66 pub fn compute_dir_sum(
71 &mut self,
72 path: &Path,
73 cache: &mut FxHashMap<PathBuf, FileSum>,
74 dam: &Dam,
75 con: &AppContext,
76 ) -> Option<FileSum> {
77 let threads_count = self.thread_count;
78
79 if con.special_paths.sum(path) == Directive::Never {
80 return Some(FileSum::zero());
81 }
82
83 if path.starts_with("/proc") {
85 debug!("not summing in /proc");
86 return Some(FileSum::zero());
87 }
88 if path.starts_with("/run") && !path.starts_with("/run/media") {
89 debug!("not summing in /run");
90 return Some(FileSum::zero());
91 }
92
93 #[cfg(unix)]
95 let nodes = Arc::new(Mutex::new(rustc_hash::FxHashSet::<NodeId>::default()));
96
97 let mut busy = 0;
100 let mut sum = compute_file_sum(path);
101
102 let (dirs_sender, dirs_receiver) = channel::unbounded();
105
106 let special_paths = con.special_paths.reduce(path);
107
108 if let Ok(entries) = fs::read_dir(path) {
113 for e in entries.flatten() {
114 if let Ok(md) = e.metadata() {
115 if md.is_dir() {
116 let entry_path = e.path();
117
118 if con.special_paths.sum(&entry_path) == Directive::Never {
119 debug!("not summing special path {:?}", entry_path);
120 continue;
121 }
122
123 if let Some(entry_sum) = cache.get(&entry_path) {
125 sum += *entry_sum;
126 continue;
127 }
128
129 busy += 1;
132 dirs_sender.send(Some(entry_path)).unwrap();
133 } else {
134 #[cfg(unix)]
135 if md.nlink() > 1 {
136 let mut nodes = nodes.lock().unwrap();
137 let node_id = NodeId {
138 inode: md.ino(),
139 dev: md.dev(),
140 };
141 if !nodes.insert(node_id) {
142 continue;
144 }
145 }
146 }
147 sum += md_sum(&md);
148 }
149 }
150 }
151
152 if busy == 0 {
153 return Some(sum);
154 }
155
156 let busy = Arc::new(AtomicIsize::new(busy));
157
158 let (thread_sum_sender, thread_sum_receiver) = channel::bounded(threads_count);
161
162 for _ in 0..threads_count {
165 let busy = Arc::clone(&busy);
166 let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone());
167
168 #[cfg(unix)]
169 let nodes = nodes.clone();
170
171 let special_paths = special_paths.clone();
172
173 let observer = dam.observer();
174 let thread_sum_sender = thread_sum_sender.clone();
175 self.thread_pool.spawn(move || {
176 let mut thread_sum = FileSum::zero();
177 loop {
178 let o = dirs_receiver.recv();
179 if let Ok(Some(open_dir)) = o {
180 if let Ok(entries) = fs::read_dir(open_dir) {
181 for e in entries.flatten() {
182 if let Ok(md) = e.metadata() {
183 if md.is_dir() {
184 let path = e.path();
185
186 if special_paths.sum(&path) == Directive::Never {
187 debug!("not summing (deep) special path {:?}", path);
188 continue;
189 }
190
191 busy.fetch_add(1, Ordering::Relaxed);
194 dirs_sender.send(Some(path)).unwrap();
195 } else {
196 #[cfg(unix)]
197 if md.nlink() > 1 {
198 let mut nodes = nodes.lock().unwrap();
199 let node_id = NodeId {
200 inode: md.ino(),
201 dev: md.dev(),
202 };
203 if !nodes.insert(node_id) {
204 continue;
206 }
207 }
208 }
209 thread_sum += md_sum(&md);
210 } else {
211 thread_sum.incr();
213 }
214 }
215 }
216 busy.fetch_sub(1, Ordering::Relaxed);
217 }
218 if observer.has_event() {
219 dirs_sender.send(None).unwrap(); break;
221 }
222 if busy.load(Ordering::Relaxed) < 1 {
223 dirs_sender.send(None).unwrap(); break;
225 }
226 }
227 thread_sum_sender.send(thread_sum).unwrap();
228 });
229 }
230 for _ in 0..threads_count {
232 match thread_sum_receiver.recv() {
233 Ok(thread_sum) => {
234 sum += thread_sum;
235 }
236 Err(e) => {
237 warn!("Error while recv summing thread result : {:?}", e);
238 }
239 }
240 }
241 if dam.has_event() {
242 return None;
243 }
244 Some(sum)
245 }
246}
247
248pub fn compute_dir_sum(
253 path: &Path,
254 cache: &mut FxHashMap<PathBuf, FileSum>,
255 dam: &Dam,
256 con: &AppContext,
257) -> Option<FileSum> {
258 use once_cell::sync::OnceCell;
259 static DIR_SUMMER: OnceCell<Mutex<DirSummer>> = OnceCell::new();
260 DIR_SUMMER
261 .get_or_init(|| Mutex::new(DirSummer::new(con.file_sum_threads_count)))
262 .lock()
263 .unwrap()
264 .compute_dir_sum(path, cache, dam, con)
265}
266
267pub fn compute_file_sum(path: &Path) -> FileSum {
269 match fs::symlink_metadata(path) {
270 Ok(md) => {
271 let seconds = extract_seconds(&md);
272
273 #[cfg(unix)]
274 {
275 let nominal_size = md.size();
276 let block_size = md.blocks() * 512;
277 FileSum::new(
278 block_size.min(nominal_size),
279 block_size < nominal_size,
280 1,
281 seconds,
282 )
283 }
284
285 #[cfg(not(unix))]
286 FileSum::new(md.len(), false, 1, seconds)
287 }
288 Err(_) => FileSum::new(0, false, 1, 0),
289 }
290}
291
/// Returns the modification time of `md` in whole seconds since the Unix epoch.
///
/// Returns 0 when the value doesn't fit in a `u32`: either before 1970
/// (negative) or after early 2106.
#[cfg(unix)]
#[inline(always)]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    match md.mtime().try_into() {
        Ok(seconds) => seconds,
        Err(_) => 0,
    }
}
297
/// Returns the modification time of `md` in whole seconds since the Unix epoch.
///
/// Returns 0 in any of these cases:
/// - the platform can't report the modification time;
/// - the time is before the epoch;
/// - the value doesn't fit in a `u32`.
#[cfg(not(unix))]
#[inline(always)]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    md.modified()
        .ok()
        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
        .and_then(|elapsed| elapsed.as_secs().try_into().ok())
        .unwrap_or(0)
}
310
311#[inline(always)]
312fn md_sum(md: &fs::Metadata) -> FileSum {
313 #[cfg(unix)]
314 let size = md.blocks() * 512;
315
316 #[cfg(not(unix))]
317 let size = md.len();
318
319 let seconds = extract_seconds(md);
320 FileSum::new(size, false, 1, seconds)
321}