Struct broot::task_sync::DamObserver
source · pub struct DamObserver { /* private fields */ }
Implementations§
source§impl DamObserver
impl DamObserver
sourcepub fn has_event(&self) -> bool
pub fn has_event(&self) -> bool
Be careful: this can be used as a thread stop condition only before the event receiver starts being active, to avoid a race condition.
Examples found in repository?
src/file_sum/sum_computation.rs (line 225)
(source context: lines 69–252 of src/file_sum/sum_computation.rs)
/// Compute the aggregated `FileSum` of the directory at `path`, spreading
/// the traversal over this struct's thread pool.
///
/// Returns `Some(sum)` on completion, `Some(FileSum::zero())` for ignored
/// or special paths (`/proc`, `/run`), and `None` when an event arrived
/// through `dam` during the computation (the partial sum is discarded).
///
/// NOTE(review): `cache` appears to hold sums of previously computed
/// subdirectories keyed by path; only the first directory level consults
/// it — confirm against callers.
pub fn compute_dir_sum(
    &mut self,
    path: &Path,
    cache: &mut AHashMap<PathBuf, FileSum>,
    dam: &Dam,
    con: &AppContext,
) -> Option<FileSum> {
    let threads_count = self.thread_count;
    // special paths configured by the user are not summed at all
    if is_ignored(path, &con.special_paths) {
        return Some(FileSum::zero());
    }
    // there are problems in /proc - See issue #637
    if path.starts_with("/proc") {
        debug!("not summing in /proc");
        return Some(FileSum::zero());
    }
    // pseudo-filesystem, same reasoning as /proc
    if path.starts_with("/run") {
        debug!("not summing in /run");
        return Some(FileSum::zero());
    }
    // to avoid counting twice a node (hard links share an inode on unix),
    // we store their (inode, device) id in a set shared by all threads
    #[cfg(unix)]
    let nodes = Arc::new(Mutex::new(FnvHashSet::<NodeId>::default()));
    // busy is the number of directories which are either being processed or queued.
    // We use this count to determine when threads can stop waiting for tasks
    // (when it drops below 1, no more work can ever be produced).
    let mut busy = 0;
    // start with the sum of the root entry itself
    let mut sum = compute_file_sum(path);
    // this MPMC channel contains the directory paths which must be handled.
    // A None means there's nothing left and the thread may send its result and stop
    let (dirs_sender, dirs_receiver) = channel::unbounded();
    // keep only the special-path rules that can possibly apply under `path`,
    // so worker threads filter against a smaller list
    let special_paths: Vec<SpecialPath> = con.special_paths.iter()
        .filter(|sp| sp.can_have_matches_in(path))
        .cloned()
        .collect();
    // the first level is managed a little differently: we look at the cache
    // before adding. This enables faster computations in two cases:
    // - for the root line (assuming it's computed after the content)
    // - when we navigate up the tree
    if let Ok(entries) = fs::read_dir(path) {
        for e in entries.flatten() {
            if let Ok(md) = e.metadata() {
                if md.is_dir() {
                    let entry_path = e.path();
                    if is_ignored(&entry_path, &special_paths) {
                        debug!("not summing special path {:?}", entry_path);
                        continue;
                    }
                    // we check the cache
                    if let Some(entry_sum) = cache.get(&entry_path) {
                        sum += *entry_sum;
                        continue;
                    }
                    // we add the directory to the channel of dirs needing
                    // processing
                    busy += 1;
                    dirs_sender.send(Some(entry_path)).unwrap();
                } else {
                    // non-directory entry: on unix, skip hard links whose
                    // node was already counted
                    #[cfg(unix)]
                    if md.nlink() > 1 {
                        let mut nodes = nodes.lock().unwrap();
                        let node_id = NodeId {
                            inode: md.ino(),
                            dev: md.dev(),
                        };
                        if !nodes.insert(node_id) {
                            // it was already in the set
                            continue;
                        }
                    }
                }
                sum += md_sum(&md);
            }
        }
    }
    // no subdirectory was queued: the first-level scan was enough
    if busy == 0 {
        return Some(sum);
    }
    // from now on the counter is shared with the worker threads
    let busy = Arc::new(AtomicIsize::new(busy));
    // this MPMC channel is here for the threads to send their results
    // at end of computation
    let (thread_sum_sender, thread_sum_receiver) = channel::bounded(threads_count);
    // Each thread does a summation without merge and the data are merged
    // at the end (this avoids waiting for a mutex during computation)
    for _ in 0..threads_count {
        let busy = Arc::clone(&busy);
        let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone());
        #[cfg(unix)]
        let nodes = nodes.clone();
        let special_paths = special_paths.clone();
        // per-thread observer used to notice interruption events
        let observer = dam.observer();
        let thread_sum_sender = thread_sum_sender.clone();
        self.thread_pool.spawn(move || {
            let mut thread_sum = FileSum::zero();
            loop {
                // blocks until a dir (or the None sentinel) is available
                let o = dirs_receiver.recv();
                if let Ok(Some(open_dir)) = o {
                    if let Ok(entries) = fs::read_dir(&open_dir) {
                        for e in entries.flatten() {
                            if let Ok(md) = e.metadata() {
                                if md.is_dir() {
                                    let path = e.path();
                                    if is_ignored(&path, &special_paths) {
                                        debug!("not summing (deep) special path {:?}", path);
                                        continue;
                                    }
                                    // we add the directory to the channel of dirs needing
                                    // processing (incremented BEFORE sending so
                                    // busy never under-counts pending work)
                                    busy.fetch_add(1, Ordering::Relaxed);
                                    dirs_sender.send(Some(path)).unwrap();
                                } else {
                                    #[cfg(unix)]
                                    if md.nlink() > 1 {
                                        let mut nodes = nodes.lock().unwrap();
                                        let node_id = NodeId {
                                            inode: md.ino(),
                                            dev: md.dev(),
                                        };
                                        if !nodes.insert(node_id) {
                                            // it was already in the set
                                            continue;
                                        }
                                    }
                                }
                                thread_sum += md_sum(&md);
                            } else {
                                // we can't measure much but we can count the file
                                thread_sum.incr();
                            }
                        }
                    }
                    // this dir is done: it no longer counts as busy
                    busy.fetch_sub(1, Ordering::Relaxed);
                }
                // an event (e.g. a user key press) cancels the computation;
                // the caller detects this via dam.has_event() below
                if observer.has_event() {
                    dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
                    break;
                }
                // no dir is queued or being processed anywhere: all work is done
                if busy.load(Ordering::Relaxed) < 1 {
                    dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
                    break;
                }
            }
            // hand the partial sum back to the coordinating thread
            thread_sum_sender.send(thread_sum).unwrap();
        });
    }
    // Wait for the threads to finish and consolidate their results
    for _ in 0..threads_count {
        match thread_sum_receiver.recv() {
            Ok(thread_sum) => {
                sum += thread_sum;
            }
            Err(e) => {
                warn!("Error while recv summing thread result : {:?}", e);
            }
        }
    }
    // an interruption happened: partial results would be misleading
    if dam.has_event() {
        return None;
    }
    Some(sum)
}