// broot/file_sum/sum_computation.rs

1use {
2    super::FileSum,
3    crate::{
4        app::*,
5        path::*,
6        task_sync::Dam,
7    },
8    rayon::{
9        ThreadPool,
10        ThreadPoolBuilder,
11    },
12    rustc_hash::{
13        FxHashMap,
14    },
15    std::{
16        convert::TryInto,
17        fs,
18        path::{
19            Path,
20            PathBuf,
21        },
22        sync::{
23            atomic::{
24                AtomicIsize,
25                Ordering,
26            },
27            Arc,
28            Mutex,
29        },
30    },
31    termimad::crossbeam::channel,
32};
33
34#[cfg(unix)]
35use {
36    std::os::unix::fs::MetadataExt,
37};
38
39struct DirSummer {
40    thread_count: usize,
41    thread_pool: ThreadPool,
42}
43
/// Identity of a filesystem node.
///
/// The inode number alone isn't enough: two nodes living on different
/// devices may share it, so the device number is part of the key.
#[cfg(unix)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct NodeId {
    /// inode number (as read with `MetadataExt::ino` in this file)
    inode: u64,
    /// device number (as read with `MetadataExt::dev` in this file)
    dev: u64,
}
54
55impl DirSummer {
56    pub fn new(thread_count: usize) -> Self {
57        let thread_pool = ThreadPoolBuilder::new()
58            .num_threads(thread_count)
59            .build()
60            .unwrap();
61        Self {
62            thread_count,
63            thread_pool,
64        }
65    }
66    /// compute the consolidated numbers for a directory, with implementation
67    /// varying depending on the OS:
68    /// On unix, the computation is done on blocks of 512 bytes
69    /// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks
70    pub fn compute_dir_sum(
71        &mut self,
72        path: &Path,
73        cache: &mut FxHashMap<PathBuf, FileSum>,
74        dam: &Dam,
75        con: &AppContext,
76    ) -> Option<FileSum> {
77        let threads_count = self.thread_count;
78
79        if con.special_paths.sum(path) == Directive::Never {
80            return Some(FileSum::zero());
81        }
82
83        // there are problems in /proc - See issue #637
84        if path.starts_with("/proc") {
85            debug!("not summing in /proc");
86            return Some(FileSum::zero());
87        }
88        if path.starts_with("/run") && !path.starts_with("/run/media") {
89            debug!("not summing in /run");
90            return Some(FileSum::zero());
91        }
92
93        // to avoid counting twice a node, we store their id in a set
94        #[cfg(unix)]
95        let nodes = Arc::new(Mutex::new(rustc_hash::FxHashSet::<NodeId>::default()));
96
97        // busy is the number of directories which are either being processed or queued
98        // We use this count to determine when threads can stop waiting for tasks
99        let mut busy = 0;
100        let mut sum = compute_file_sum(path);
101
102        // this MPMC channel contains the directory paths which must be handled.
103        // A None means there's nothing left and the thread may send its result and stop
104        let (dirs_sender, dirs_receiver) = channel::unbounded();
105
106        let special_paths = con.special_paths.reduce(path);
107
108        // the first level is managed a little differently: we look at the cache
109        // before adding. This enables faster computations in two cases:
110        // - for the root line (assuming it's computed after the content)
111        // - when we navigate up the tree
112        if let Ok(entries) = fs::read_dir(path) {
113            for e in entries.flatten() {
114                if let Ok(md) = e.metadata() {
115                    if md.is_dir() {
116                        let entry_path = e.path();
117
118                        if con.special_paths.sum(&entry_path) == Directive::Never {
119                            debug!("not summing special path {:?}", entry_path);
120                            continue;
121                        }
122
123                        // we check the cache
124                        if let Some(entry_sum) = cache.get(&entry_path) {
125                            sum += *entry_sum;
126                            continue;
127                        }
128
129                        // we add the directory to the channel of dirs needing
130                        // processing
131                        busy += 1;
132                        dirs_sender.send(Some(entry_path)).unwrap();
133                    } else {
134                        #[cfg(unix)]
135                        if md.nlink() > 1 {
136                            let mut nodes = nodes.lock().unwrap();
137                            let node_id = NodeId {
138                                inode: md.ino(),
139                                dev: md.dev(),
140                            };
141                            if !nodes.insert(node_id) {
142                                // it was already in the set
143                                continue;
144                            }
145                        }
146                    }
147                    sum += md_sum(&md);
148                }
149            }
150        }
151
152        if busy == 0 {
153            return Some(sum);
154        }
155
156        let busy = Arc::new(AtomicIsize::new(busy));
157
158        // this MPMC channel is here for the threads to send their results
159        // at end of computation
160        let (thread_sum_sender, thread_sum_receiver) = channel::bounded(threads_count);
161
162        // Each  thread does a summation without merge and the data are merged
163        // at the end (this avoids waiting for a mutex during computation)
164        for _ in 0..threads_count {
165            let busy = Arc::clone(&busy);
166            let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone());
167
168            #[cfg(unix)]
169            let nodes = nodes.clone();
170
171            let special_paths = special_paths.clone();
172
173            let observer = dam.observer();
174            let thread_sum_sender = thread_sum_sender.clone();
175            self.thread_pool.spawn(move || {
176                let mut thread_sum = FileSum::zero();
177                loop {
178                    let o = dirs_receiver.recv();
179                    if let Ok(Some(open_dir)) = o {
180                        if let Ok(entries) = fs::read_dir(open_dir) {
181                            for e in entries.flatten() {
182                                if let Ok(md) = e.metadata() {
183                                    if md.is_dir() {
184                                        let path = e.path();
185
186                                        if special_paths.sum(&path) == Directive::Never {
187                                            debug!("not summing (deep) special path {:?}", path);
188                                            continue;
189                                        }
190
191                                        // we add the directory to the channel of dirs needing
192                                        // processing
193                                        busy.fetch_add(1, Ordering::Relaxed);
194                                        dirs_sender.send(Some(path)).unwrap();
195                                    } else {
196                                        #[cfg(unix)]
197                                        if md.nlink() > 1 {
198                                            let mut nodes = nodes.lock().unwrap();
199                                            let node_id = NodeId {
200                                                inode: md.ino(),
201                                                dev: md.dev(),
202                                            };
203                                            if !nodes.insert(node_id) {
204                                                // it was already in the set
205                                                continue;
206                                            }
207                                        }
208                                    }
209                                    thread_sum += md_sum(&md);
210                                } else {
211                                    // we can't measure much but we can count the file
212                                    thread_sum.incr();
213                                }
214                            }
215                        }
216                        busy.fetch_sub(1, Ordering::Relaxed);
217                    }
218                    if observer.has_event() {
219                        dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
220                        break;
221                    }
222                    if busy.load(Ordering::Relaxed) < 1 {
223                        dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
224                        break;
225                    }
226                }
227                thread_sum_sender.send(thread_sum).unwrap();
228            });
229        }
230        // Wait for the threads to finish and consolidate their results
231        for _ in 0..threads_count {
232            match thread_sum_receiver.recv() {
233                Ok(thread_sum) => {
234                    sum += thread_sum;
235                }
236                Err(e) => {
237                    warn!("Error while recv summing thread result : {:?}", e);
238                }
239            }
240        }
241        if dam.has_event() {
242            return None;
243        }
244        Some(sum)
245    }
246}
247
248/// compute the consolidated numbers for a directory, with implementation
249/// varying depending on the OS:
250/// On unix, the computation is done on blocks of 512 bytes
251/// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks
252pub fn compute_dir_sum(
253    path: &Path,
254    cache: &mut FxHashMap<PathBuf, FileSum>,
255    dam: &Dam,
256    con: &AppContext,
257) -> Option<FileSum> {
258    use once_cell::sync::OnceCell;
259    static DIR_SUMMER: OnceCell<Mutex<DirSummer>> = OnceCell::new();
260    DIR_SUMMER
261        .get_or_init(|| Mutex::new(DirSummer::new(con.file_sum_threads_count)))
262        .lock()
263        .unwrap()
264        .compute_dir_sum(path, cache, dam, con)
265}
266
267/// compute the sum for a regular file (not a folder)
268pub fn compute_file_sum(path: &Path) -> FileSum {
269    match fs::symlink_metadata(path) {
270        Ok(md) => {
271            let seconds = extract_seconds(&md);
272
273            #[cfg(unix)]
274            {
275                let nominal_size = md.size();
276                let block_size = md.blocks() * 512;
277                FileSum::new(
278                    block_size.min(nominal_size),
279                    block_size < nominal_size,
280                    1,
281                    seconds,
282                )
283            }
284
285            #[cfg(not(unix))]
286            FileSum::new(md.len(), false, 1, seconds)
287        }
288        Err(_) => FileSum::new(0, false, 1, 0),
289    }
290}
291
/// Modification time of `md` in seconds since the unix epoch, or 0 when
/// that value doesn't fit in a `u32` (e.g. a time before 1970).
#[cfg(unix)]
#[inline(always)]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    match md.mtime().try_into() {
        Ok(secs) => secs,
        Err(_) => 0,
    }
}
297
/// Modification time of `md` in seconds since the unix epoch.
///
/// Returns 0 when the time is unavailable, earlier than the epoch, or too
/// large for a `u32`.
#[cfg(not(unix))]
#[inline(always)]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    md.modified()
        .ok()
        .and_then(|modified| modified.duration_since(std::time::UNIX_EPOCH).ok())
        .and_then(|elapsed| {
            let secs: Result<u32, _> = elapsed.as_secs().try_into();
            secs.ok()
        })
        .unwrap_or(0)
}
310
311#[inline(always)]
312fn md_sum(md: &fs::Metadata) -> FileSum {
313    #[cfg(unix)]
314    let size = md.blocks() * 512;
315
316    #[cfg(not(unix))]
317    let size = md.len();
318
319    let seconds = extract_seconds(md);
320    FileSum::new(size, false, 1, seconds)
321}