1use std::cell::UnsafeCell;
2use std::mem::take;
3use std::sync::Arc;
4use std::sync::atomic::{self, AtomicBool, AtomicU32};
5
6use ncp_matcher::Config;
7use parking_lot::Mutex;
8use rayon::{ThreadPool, prelude::*};
9
10use crate::par_sort::par_quicksort;
11use crate::pattern::{self, MultiPattern};
12use crate::{Match, boxcar};
13
/// Options that control how the worker orders its match list.
pub struct MatchListConfig {
    /// When `true`, matches are ordered by score first (ties are broken by
    /// total haystack length and then by item index). When `false`, matches
    /// are ordered by item index only.
    pub sort_results: bool,
    /// When `true`, the item-index ordering is inverted, so items with a
    /// larger index come before items with a smaller index.
    pub reverse_items: bool,
}

impl MatchListConfig {
    /// The configuration returned by [`Default::default`]: results are
    /// sorted and the item order is not reversed.
    pub const DEFAULT: Self = MatchListConfig {
        sort_results: true,
        reverse_items: false,
    };
}

impl Default for MatchListConfig {
    /// Returns [`MatchListConfig::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
35
36struct Matchers(Box<[UnsafeCell<ncp_matcher::Matcher>]>);
37
38impl Matchers {
39 #[allow(clippy::mut_from_ref)]
41 unsafe fn get(&self) -> &mut ncp_matcher::Matcher {
42 unsafe { &mut *self.0[rayon::current_thread_index().unwrap()].get() }
43 }
44}
45
46unsafe impl Sync for Matchers {}
47unsafe impl Send for Matchers {}
48
49pub(crate) struct Worker<T: Sync + Send + 'static> {
50 pub(crate) running: bool,
51 matchers: Matchers,
52 pub(crate) matches: Vec<Match>,
53 pub(crate) pattern: MultiPattern,
54 pub(crate) sort_results: bool,
55 pub(crate) reverse_items: bool,
56 pub(crate) canceled: Arc<AtomicBool>,
57 pub(crate) should_notify: Arc<AtomicBool>,
58 pub(crate) was_canceled: bool,
59 pub(crate) last_snapshot: u32,
60 notify: Arc<dyn Fn() + Sync + Send>,
61 pub(crate) items: Arc<boxcar::Vec<T>>,
62 in_flight: Vec<u32>,
63}
64
65impl<T: Sync + Send + 'static> Worker<T> {
66 pub(crate) fn item_count(&self) -> u32 {
67 self.last_snapshot - self.in_flight.len() as u32
68 }
69 pub(crate) fn update_config(&mut self, config: Config) {
70 for matcher in self.matchers.0.iter_mut() {
71 matcher.get_mut().config = config.clone();
72 }
73 }
74 pub(crate) fn sort_results(&mut self, sort_results: bool) {
75 self.sort_results = sort_results;
76 }
77 pub(crate) fn reverse_items(&mut self, reverse_items: bool) {
78 self.reverse_items = reverse_items;
79 }
80
81 pub(crate) fn new(
82 worker_threads: Option<usize>,
83 config: Config,
84 notify: Arc<dyn Fn() + Sync + Send>,
85 cols: u32,
86 match_config: MatchListConfig,
87 ) -> (ThreadPool, Self) {
88 let worker_threads = worker_threads.unwrap_or_else(|| {
89 std::thread::available_parallelism().map_or(4, std::num::NonZero::get)
90 });
91 let pool = rayon::ThreadPoolBuilder::new()
92 .thread_name(|i| format!("nucleo worker {i}"))
93 .num_threads(worker_threads)
94 .build()
95 .expect("creating threadpool failed");
96 let matchers = (0..worker_threads)
97 .map(|_| UnsafeCell::new(ncp_matcher::Matcher::new(config.clone())))
98 .collect();
99 let worker = Self {
100 running: false,
101 matchers: Matchers(matchers),
102 last_snapshot: 0,
103 matches: Vec::new(),
104 pattern: MultiPattern::new(cols as usize),
106 sort_results: match_config.sort_results,
107 reverse_items: match_config.reverse_items,
108 canceled: Arc::new(AtomicBool::new(false)),
109 should_notify: Arc::new(AtomicBool::new(false)),
110 was_canceled: false,
111 notify,
112 items: Arc::new(boxcar::Vec::with_capacity(2 * 1024, cols)),
113 in_flight: Vec::with_capacity(64),
114 };
115 (pool, worker)
116 }
117
118 unsafe fn process_new_items(&mut self, unmatched: &AtomicU32) {
119 unsafe {
120 let matchers = &self.matchers;
121 let pattern = &self.pattern;
122 self.matches.reserve(self.in_flight.len());
123 self.in_flight.retain(|&idx| {
124 let Some(item) = self.items.get(idx) else {
125 return true;
126 };
127 if let Some(score) = pattern.score(item.matcher_columns, matchers.get()) {
128 self.matches.push(Match { score, idx });
129 };
130 false
131 });
132 let new_snapshot = self.items.par_snapshot(self.last_snapshot);
133 if new_snapshot.end() != self.last_snapshot {
134 let end = new_snapshot.end();
135 let in_flight = Mutex::new(&mut self.in_flight);
136 let items = new_snapshot.map(|(idx, item)| {
137 let Some(item) = item else {
138 in_flight.lock().push(idx);
139 unmatched.fetch_add(1, atomic::Ordering::Relaxed);
140 return Match {
141 score: 0,
142 idx: u32::MAX,
143 };
144 };
145 if self.canceled.load(atomic::Ordering::Relaxed) {
146 return Match { score: 0, idx };
147 }
148 let Some(score) = pattern.score(item.matcher_columns, matchers.get()) else {
149 unmatched.fetch_add(1, atomic::Ordering::Relaxed);
150 return Match {
151 score: 0,
152 idx: u32::MAX,
153 };
154 };
155 Match { score, idx }
156 });
157 self.matches.par_extend(items);
158 self.last_snapshot = end;
159 }
160 }
161 }
162
163 fn remove_in_flight_matches(&mut self) {
164 let mut off = 0;
165 self.in_flight.retain(|&i| {
166 let is_in_flight = self.items.get(i).is_none();
167 if is_in_flight {
168 self.matches.remove((i - off) as usize);
169 off += 1;
170 }
171 is_in_flight
172 });
173 }
174
175 unsafe fn process_new_items_trivial(&mut self) {
176 unsafe {
177 let new_snapshot = self.items.snapshot(self.last_snapshot);
178 if new_snapshot.end() != self.last_snapshot {
179 let end = new_snapshot.end();
180 let items = new_snapshot.filter_map(|(idx, item)| {
181 if item.is_none() {
182 self.in_flight.push(idx);
183 return None;
184 };
185 Some(Match { score: 0, idx })
186 });
187 self.matches.extend(items);
188 self.last_snapshot = end;
189 }
190 }
191 }
192
193 pub(crate) unsafe fn run(&mut self, pattern_status: pattern::Status, cleared: bool) {
194 unsafe {
195 self.running = true;
196 self.was_canceled = false;
197
198 if cleared {
199 self.last_snapshot = 0;
200 self.in_flight.clear();
201 self.matches.clear();
202 }
203
204 if self.pattern.is_empty() {
206 self.reset_matches();
207 self.process_new_items_trivial();
208 let canceled = self.sort_matches();
209 if canceled {
210 self.was_canceled = true;
211 } else if self.should_notify.load(atomic::Ordering::Relaxed) {
212 (self.notify)();
213 }
214 return;
215 }
216
217 if pattern_status == pattern::Status::Rescore {
218 self.reset_matches();
219 }
220
221 let mut unmatched = AtomicU32::new(0);
222 if pattern_status != pattern::Status::Unchanged && !self.matches.is_empty() {
223 self.process_new_items_trivial();
224 let matchers = &self.matchers;
225 let pattern = &self.pattern;
226 self.matches
227 .par_iter_mut()
228 .take_any_while(|_| !self.canceled.load(atomic::Ordering::Relaxed))
229 .for_each(|match_| {
230 if match_.idx == u32::MAX {
231 debug_assert_eq!(match_.score, 0);
232 unmatched.fetch_add(1, atomic::Ordering::Relaxed);
233 return;
234 }
235 let item = self.items.get_unchecked(match_.idx);
237 if let Some(score) = pattern.score(item.matcher_columns, matchers.get()) {
238 match_.score = score;
239 } else {
240 unmatched.fetch_add(1, atomic::Ordering::Relaxed);
241 match_.score = 0;
242 match_.idx = u32::MAX;
243 }
244 });
245 } else {
246 self.process_new_items(&unmatched);
247 }
248
249 let canceled = self.sort_matches();
250 if canceled {
251 self.was_canceled = true;
252 } else {
253 self.matches
254 .truncate(self.matches.len() - take(unmatched.get_mut()) as usize);
255 if self.should_notify.load(atomic::Ordering::Relaxed) {
256 (self.notify)();
257 }
258 }
259 }
260 }
261
262 unsafe fn sort_matches(&mut self) -> bool {
263 unsafe {
264 if self.sort_results {
265 par_quicksort(
266 &mut self.matches,
267 |match1, match2| {
268 if match1.score != match2.score {
269 return match1.score > match2.score;
270 }
271 if match1.idx == u32::MAX {
272 return false;
273 }
274 if match2.idx == u32::MAX {
275 return true;
276 }
277 let item1 = self.items.get_unchecked(match1.idx);
281 let item2 = &self.items.get_unchecked(match2.idx);
282 let len1: u32 = item1
283 .matcher_columns
284 .iter()
285 .map(|haystack| haystack.len() as u32)
286 .sum();
287 let len2 = item2
288 .matcher_columns
289 .iter()
290 .map(|haystack| haystack.len() as u32)
291 .sum();
292 if len1 == len2 {
293 if self.reverse_items {
294 match2.idx < match1.idx
295 } else {
296 match1.idx < match2.idx
297 }
298 } else {
299 len1 < len2
300 }
301 },
302 &self.canceled,
303 )
304 } else {
305 par_quicksort(
306 &mut self.matches,
307 |match1, match2| {
308 if match1.idx == u32::MAX {
309 return false;
310 }
311 if match2.idx == u32::MAX {
312 return true;
313 }
314 if self.reverse_items {
315 match2.idx < match1.idx
316 } else {
317 match1.idx < match2.idx
318 }
319 },
320 &self.canceled,
321 )
322 }
323 }
324 }
325
326 fn reset_matches(&mut self) {
327 self.matches.clear();
328 self.matches
329 .extend((0..self.last_snapshot).map(|idx| Match { score: 0, idx }));
330 self.remove_in_flight_matches();
332 }
333}