Skip to main content

broot/file_sum/
sum_computation.rs

1use {
2    super::FileSum,
3    crate::{
4        app::*,
5        path::*,
6        task_sync::Dam,
7    },
8    rayon::{
9        ThreadPool,
10        ThreadPoolBuilder,
11    },
12    rustc_hash::FxHashMap,
13    std::{
14        convert::TryInto,
15        fs,
16        path::{
17            Path,
18            PathBuf,
19        },
20        sync::{
21            Arc,
22            Mutex,
23            atomic::{
24                AtomicIsize,
25                Ordering,
26            },
27        },
28    },
29    termimad::crossbeam::channel,
30};
31
32#[cfg(unix)]
33use std::os::unix::fs::MetadataExt;
34
35struct DirSummer {
36    thread_count: usize,
37    thread_pool: ThreadPool,
38}
39
/// Identifier of a filesystem node.
///
/// The device number is part of the identity because two nodes
/// living on different devices may share the same inode number.
#[cfg(unix)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct NodeId {
    /// inode number of the node
    inode: u64,
    /// number of the device holding the node
    dev: u64,
}
50
51impl DirSummer {
52    pub fn new(thread_count: usize) -> Self {
53        let thread_pool = ThreadPoolBuilder::new()
54            .num_threads(thread_count)
55            .build()
56            .unwrap();
57        Self {
58            thread_count,
59            thread_pool,
60        }
61    }
62    /// compute the consolidated numbers for a directory, with implementation
63    /// varying depending on the OS:
64    /// On unix, the computation is done on blocks of 512 bytes
65    /// see <https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks>
66    pub fn compute_dir_sum(
67        &mut self,
68        path: &Path,
69        cache: &mut FxHashMap<PathBuf, FileSum>,
70        dam: &Dam,
71        con: &AppContext,
72    ) -> Option<FileSum> {
73        let threads_count = self.thread_count;
74
75        if con.special_paths.sum(path) == Directive::Never {
76            return Some(FileSum::zero());
77        }
78
79        // there are problems in /proc - See issue #637
80        if path.starts_with("/proc") {
81            debug!("not summing in /proc");
82            return Some(FileSum::zero());
83        }
84        if path.starts_with("/run") && !path.starts_with("/run/media") {
85            debug!("not summing in /run");
86            return Some(FileSum::zero());
87        }
88
89        // to avoid counting twice a node, we store their id in a set
90        #[cfg(unix)]
91        let nodes = Arc::new(Mutex::new(rustc_hash::FxHashSet::<NodeId>::default()));
92
93        // busy is the number of directories which are either being processed or queued
94        // We use this count to determine when threads can stop waiting for tasks
95        let mut busy = 0;
96        let mut sum = compute_file_sum(path);
97
98        // this MPMC channel contains the directory paths which must be handled.
99        // A None means there's nothing left and the thread may send its result and stop
100        let (dirs_sender, dirs_receiver) = channel::unbounded();
101
102        let special_paths = con.special_paths.reduce(path);
103
104        // the first level is managed a little differently: we look at the cache
105        // before adding. This enables faster computations in two cases:
106        // - for the root line (assuming it's computed after the content)
107        // - when we navigate up the tree
108        if let Ok(entries) = fs::read_dir(path) {
109            for e in entries.flatten() {
110                if let Ok(md) = e.metadata() {
111                    if md.is_dir() {
112                        let entry_path = e.path();
113
114                        if con.special_paths.sum(&entry_path) == Directive::Never {
115                            debug!("not summing special path {entry_path:?}");
116                            continue;
117                        }
118
119                        // we check the cache
120                        if let Some(entry_sum) = cache.get(&entry_path) {
121                            sum += *entry_sum;
122                            continue;
123                        }
124
125                        // we add the directory to the channel of dirs needing
126                        // processing
127                        busy += 1;
128                        dirs_sender.send(Some(entry_path)).unwrap();
129                    } else {
130                        #[cfg(unix)]
131                        if md.nlink() > 1 {
132                            let mut nodes = nodes.lock().unwrap();
133                            let node_id = NodeId {
134                                inode: md.ino(),
135                                dev: md.dev(),
136                            };
137                            if !nodes.insert(node_id) {
138                                // it was already in the set
139                                continue;
140                            }
141                        }
142                    }
143                    sum += md_sum(&md);
144                }
145            }
146        }
147
148        if busy == 0 {
149            return Some(sum);
150        }
151
152        let busy = Arc::new(AtomicIsize::new(busy));
153
154        // this MPMC channel is here for the threads to send their results
155        // at end of computation
156        let (thread_sum_sender, thread_sum_receiver) = channel::bounded(threads_count);
157
158        // Each  thread does a summation without merge and the data are merged
159        // at the end (this avoids waiting for a mutex during computation)
160        for _ in 0..threads_count {
161            let busy = Arc::clone(&busy);
162            let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone());
163
164            #[cfg(unix)]
165            let nodes = nodes.clone();
166
167            let special_paths = special_paths.clone();
168
169            let observer = dam.observer();
170            let thread_sum_sender = thread_sum_sender.clone();
171            self.thread_pool.spawn(move || {
172                let mut thread_sum = FileSum::zero();
173                loop {
174                    let o = dirs_receiver.recv();
175                    if let Ok(Some(open_dir)) = o {
176                        if let Ok(entries) = fs::read_dir(open_dir) {
177                            for e in entries.flatten() {
178                                if let Ok(md) = e.metadata() {
179                                    if md.is_dir() {
180                                        let path = e.path();
181
182                                        if special_paths.sum(&path) == Directive::Never {
183                                            debug!("not summing (deep) special path {path:?}");
184                                            continue;
185                                        }
186
187                                        // we add the directory to the channel of dirs needing
188                                        // processing
189                                        busy.fetch_add(1, Ordering::Relaxed);
190                                        dirs_sender.send(Some(path)).unwrap();
191                                    } else {
192                                        #[cfg(unix)]
193                                        if md.nlink() > 1 {
194                                            let mut nodes = nodes.lock().unwrap();
195                                            let node_id = NodeId {
196                                                inode: md.ino(),
197                                                dev: md.dev(),
198                                            };
199                                            if !nodes.insert(node_id) {
200                                                // it was already in the set
201                                                continue;
202                                            }
203                                        }
204                                    }
205                                    thread_sum += md_sum(&md);
206                                } else {
207                                    // we can't measure much but we can count the file
208                                    thread_sum.incr();
209                                }
210                            }
211                        }
212                        busy.fetch_sub(1, Ordering::Relaxed);
213                    }
214                    if observer.has_event() {
215                        dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
216                        break;
217                    }
218                    if busy.load(Ordering::Relaxed) < 1 {
219                        dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
220                        break;
221                    }
222                }
223                thread_sum_sender.send(thread_sum).unwrap();
224            });
225        }
226        // Wait for the threads to finish and consolidate their results
227        for _ in 0..threads_count {
228            match thread_sum_receiver.recv() {
229                Ok(thread_sum) => {
230                    sum += thread_sum;
231                }
232                Err(e) => {
233                    warn!("Error while recv summing thread result : {e:?}");
234                }
235            }
236        }
237        if dam.has_event() {
238            return None;
239        }
240        Some(sum)
241    }
242}
243
244/// compute the consolidated numbers for a directory, with implementation
245/// varying depending on the OS:
246/// On unix, the computation is done on blocks of 512 bytes
247/// see <https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks>
248pub fn compute_dir_sum(
249    path: &Path,
250    cache: &mut FxHashMap<PathBuf, FileSum>,
251    dam: &Dam,
252    con: &AppContext,
253) -> Option<FileSum> {
254    use once_cell::sync::OnceCell;
255    static DIR_SUMMER: OnceCell<Mutex<DirSummer>> = OnceCell::new();
256    DIR_SUMMER
257        .get_or_init(|| Mutex::new(DirSummer::new(con.file_sum_threads_count)))
258        .lock()
259        .unwrap()
260        .compute_dir_sum(path, cache, dam, con)
261}
262
263/// compute the sum for a regular file (not a folder)
264pub fn compute_file_sum(path: &Path) -> FileSum {
265    match fs::symlink_metadata(path) {
266        Ok(md) => {
267            let seconds = extract_seconds(&md);
268
269            #[cfg(unix)]
270            {
271                let nominal_size = md.size();
272                let block_size = md.blocks() * 512;
273                FileSum::new(
274                    block_size.min(nominal_size),
275                    block_size < nominal_size,
276                    1,
277                    seconds,
278                )
279            }
280
281            #[cfg(not(unix))]
282            FileSum::new(md.len(), false, 1, seconds)
283        }
284        Err(_) => FileSum::new(0, false, 1, 0),
285    }
286}
287
/// Give the modification time of the entry, in seconds since the unix
/// epoch, or 0 when it doesn't fit in a `u32`.
#[cfg(unix)]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    let seconds: Result<u32, _> = md.mtime().try_into();
    seconds.unwrap_or(0)
}

/// Give the modification time of the entry, in seconds since the unix
/// epoch, or 0 when it's unavailable, before the epoch, or doesn't fit
/// in a `u32`.
#[cfg(not(unix))]
fn extract_seconds(md: &fs::Metadata) -> u32 {
    let seconds: Option<u32> = md
        .modified()
        .ok()
        .and_then(|modified| modified.duration_since(std::time::UNIX_EPOCH).ok())
        .and_then(|elapsed| elapsed.as_secs().try_into().ok());
    seconds.unwrap_or(0)
}
304
305fn md_sum(md: &fs::Metadata) -> FileSum {
306    #[cfg(unix)]
307    let size = md.blocks() * 512;
308
309    #[cfg(not(unix))]
310    let size = md.len();
311
312    let seconds = extract_seconds(md);
313    FileSum::new(size, false, 1, seconds)
314}