broot/file_sum/
sum_computation.rs1use {
2 super::FileSum,
3 crate::{
4 app::*,
5 path::*,
6 task_sync::Dam,
7 },
8 rayon::{
9 ThreadPool,
10 ThreadPoolBuilder,
11 },
12 rustc_hash::FxHashMap,
13 std::{
14 convert::TryInto,
15 fs,
16 path::{
17 Path,
18 PathBuf,
19 },
20 sync::{
21 Arc,
22 Mutex,
23 atomic::{
24 AtomicIsize,
25 Ordering,
26 },
27 },
28 },
29 termimad::crossbeam::channel,
30};
31
32#[cfg(unix)]
33use std::os::unix::fs::MetadataExt;
34
35struct DirSummer {
36 thread_count: usize,
37 thread_pool: ThreadPool,
38}
39
/// Key identifying a filesystem node: the inode number together with the
/// device holding it. Used to recognize several hard links to the same file
/// so that it is only counted once.
#[cfg(unix)]
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct NodeId {
    /// inode number (as read from `MetadataExt::ino`)
    inode: u64,
    /// device id (as read from `MetadataExt::dev`)
    dev: u64,
}
50
51impl DirSummer {
52 pub fn new(thread_count: usize) -> Self {
53 let thread_pool = ThreadPoolBuilder::new()
54 .num_threads(thread_count)
55 .build()
56 .unwrap();
57 Self {
58 thread_count,
59 thread_pool,
60 }
61 }
62 pub fn compute_dir_sum(
67 &mut self,
68 path: &Path,
69 cache: &mut FxHashMap<PathBuf, FileSum>,
70 dam: &Dam,
71 con: &AppContext,
72 ) -> Option<FileSum> {
73 let threads_count = self.thread_count;
74
75 if con.special_paths.sum(path) == Directive::Never {
76 return Some(FileSum::zero());
77 }
78
79 if path.starts_with("/proc") {
81 debug!("not summing in /proc");
82 return Some(FileSum::zero());
83 }
84 if path.starts_with("/run") && !path.starts_with("/run/media") {
85 debug!("not summing in /run");
86 return Some(FileSum::zero());
87 }
88
89 #[cfg(unix)]
91 let nodes = Arc::new(Mutex::new(rustc_hash::FxHashSet::<NodeId>::default()));
92
93 let mut busy = 0;
96 let mut sum = compute_file_sum(path);
97
98 let (dirs_sender, dirs_receiver) = channel::unbounded();
101
102 let special_paths = con.special_paths.reduce(path);
103
104 if let Ok(entries) = fs::read_dir(path) {
109 for e in entries.flatten() {
110 if let Ok(md) = e.metadata() {
111 if md.is_dir() {
112 let entry_path = e.path();
113
114 if con.special_paths.sum(&entry_path) == Directive::Never {
115 debug!("not summing special path {entry_path:?}");
116 continue;
117 }
118
119 if let Some(entry_sum) = cache.get(&entry_path) {
121 sum += *entry_sum;
122 continue;
123 }
124
125 busy += 1;
128 dirs_sender.send(Some(entry_path)).unwrap();
129 } else {
130 #[cfg(unix)]
131 if md.nlink() > 1 {
132 let mut nodes = nodes.lock().unwrap();
133 let node_id = NodeId {
134 inode: md.ino(),
135 dev: md.dev(),
136 };
137 if !nodes.insert(node_id) {
138 continue;
140 }
141 }
142 }
143 sum += md_sum(&md);
144 }
145 }
146 }
147
148 if busy == 0 {
149 return Some(sum);
150 }
151
152 let busy = Arc::new(AtomicIsize::new(busy));
153
154 let (thread_sum_sender, thread_sum_receiver) = channel::bounded(threads_count);
157
158 for _ in 0..threads_count {
161 let busy = Arc::clone(&busy);
162 let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone());
163
164 #[cfg(unix)]
165 let nodes = nodes.clone();
166
167 let special_paths = special_paths.clone();
168
169 let observer = dam.observer();
170 let thread_sum_sender = thread_sum_sender.clone();
171 self.thread_pool.spawn(move || {
172 let mut thread_sum = FileSum::zero();
173 loop {
174 let o = dirs_receiver.recv();
175 if let Ok(Some(open_dir)) = o {
176 if let Ok(entries) = fs::read_dir(open_dir) {
177 for e in entries.flatten() {
178 if let Ok(md) = e.metadata() {
179 if md.is_dir() {
180 let path = e.path();
181
182 if special_paths.sum(&path) == Directive::Never {
183 debug!("not summing (deep) special path {path:?}");
184 continue;
185 }
186
187 busy.fetch_add(1, Ordering::Relaxed);
190 dirs_sender.send(Some(path)).unwrap();
191 } else {
192 #[cfg(unix)]
193 if md.nlink() > 1 {
194 let mut nodes = nodes.lock().unwrap();
195 let node_id = NodeId {
196 inode: md.ino(),
197 dev: md.dev(),
198 };
199 if !nodes.insert(node_id) {
200 continue;
202 }
203 }
204 }
205 thread_sum += md_sum(&md);
206 } else {
207 thread_sum.incr();
209 }
210 }
211 }
212 busy.fetch_sub(1, Ordering::Relaxed);
213 }
214 if observer.has_event() {
215 dirs_sender.send(None).unwrap(); break;
217 }
218 if busy.load(Ordering::Relaxed) < 1 {
219 dirs_sender.send(None).unwrap(); break;
221 }
222 }
223 thread_sum_sender.send(thread_sum).unwrap();
224 });
225 }
226 for _ in 0..threads_count {
228 match thread_sum_receiver.recv() {
229 Ok(thread_sum) => {
230 sum += thread_sum;
231 }
232 Err(e) => {
233 warn!("Error while recv summing thread result : {e:?}");
234 }
235 }
236 }
237 if dam.has_event() {
238 return None;
239 }
240 Some(sum)
241 }
242}
243
244pub fn compute_dir_sum(
249 path: &Path,
250 cache: &mut FxHashMap<PathBuf, FileSum>,
251 dam: &Dam,
252 con: &AppContext,
253) -> Option<FileSum> {
254 use once_cell::sync::OnceCell;
255 static DIR_SUMMER: OnceCell<Mutex<DirSummer>> = OnceCell::new();
256 DIR_SUMMER
257 .get_or_init(|| Mutex::new(DirSummer::new(con.file_sum_threads_count)))
258 .lock()
259 .unwrap()
260 .compute_dir_sum(path, cache, dam, con)
261}
262
263pub fn compute_file_sum(path: &Path) -> FileSum {
265 match fs::symlink_metadata(path) {
266 Ok(md) => {
267 let seconds = extract_seconds(&md);
268
269 #[cfg(unix)]
270 {
271 let nominal_size = md.size();
272 let block_size = md.blocks() * 512;
273 FileSum::new(
274 block_size.min(nominal_size),
275 block_size < nominal_size,
276 1,
277 seconds,
278 )
279 }
280
281 #[cfg(not(unix))]
282 FileSum::new(md.len(), false, 1, seconds)
283 }
284 Err(_) => FileSum::new(0, false, 1, 0),
285 }
286}
287
/// Returns the modification time of `md` in seconds since the Unix epoch,
/// or 0 when the `mtime` value doesn't fit in a `u32` (e.g. it is negative).
#[cfg(unix)]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    let mtime: i64 = md.mtime();
    let seconds: Result<u32, _> = mtime.try_into();
    seconds.unwrap_or(0)
}
292
/// Returns the modification time of `md` in seconds since the Unix epoch.
/// Returns 0 when the time is unavailable, lies before the epoch, or doesn't
/// fit in a `u32`.
#[cfg(not(unix))]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    md.modified()
        .ok()
        .and_then(|modified| modified.duration_since(std::time::UNIX_EPOCH).ok())
        .and_then(|elapsed| elapsed.as_secs().try_into().ok())
        .unwrap_or(0)
}
304
305fn md_sum(md: &fs::Metadata) -> FileSum {
306 #[cfg(unix)]
307 let size = md.blocks() * 512;
308
309 #[cfg(not(unix))]
310 let size = md.len();
311
312 let seconds = extract_seconds(md);
313 FileSum::new(size, false, 1, seconds)
314}