ncp_engine/
worker.rs

1use std::cell::UnsafeCell;
2use std::mem::take;
3use std::sync::Arc;
4use std::sync::atomic::{self, AtomicBool, AtomicU32};
5
6use ncp_matcher::Config;
7use parking_lot::Mutex;
8use rayon::{ThreadPool, prelude::*};
9
10use crate::par_sort::par_quicksort;
11use crate::pattern::{self, MultiPattern};
12use crate::{Match, boxcar};
13
/// Configuration which influences the match list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MatchListConfig {
    /// Whether the matcher should sort results by score after matching.
    ///
    /// When `false`, matches are ordered by item index only.
    pub sort_results: bool,
    /// Whether the matcher should reverse the order of the input.
    ///
    /// This flips the item-index ordering that is used as the final
    /// tie-breaker (or as the whole ordering when `sort_results` is `false`).
    pub reverse_items: bool,
}

impl Default for MatchListConfig {
    fn default() -> Self {
        Self::DEFAULT
    }
}

impl MatchListConfig {
    /// The default value, implemented as a constant since the Default trait is not const.
    ///
    /// Results are sorted by score and the input order is not reversed.
    pub const DEFAULT: Self = Self {
        sort_results: true,
        reverse_items: false,
    };
}
35
36struct Matchers(Box<[UnsafeCell<ncp_matcher::Matcher>]>);
37
38impl Matchers {
39    // this is not a true mut from ref, we use a cell here
40    #[allow(clippy::mut_from_ref)]
41    unsafe fn get(&self) -> &mut ncp_matcher::Matcher {
42        unsafe { &mut *self.0[rayon::current_thread_index().unwrap()].get() }
43    }
44}
45
46unsafe impl Sync for Matchers {}
47unsafe impl Send for Matchers {}
48
49pub(crate) struct Worker<T: Sync + Send + 'static> {
50    pub(crate) running: bool,
51    matchers: Matchers,
52    pub(crate) matches: Vec<Match>,
53    pub(crate) pattern: MultiPattern,
54    pub(crate) sort_results: bool,
55    pub(crate) reverse_items: bool,
56    pub(crate) canceled: Arc<AtomicBool>,
57    pub(crate) should_notify: Arc<AtomicBool>,
58    pub(crate) was_canceled: bool,
59    pub(crate) last_snapshot: u32,
60    notify: Arc<dyn Fn() + Sync + Send>,
61    pub(crate) items: Arc<boxcar::Vec<T>>,
62    in_flight: Vec<u32>,
63}
64
65impl<T: Sync + Send + 'static> Worker<T> {
66    pub(crate) fn item_count(&self) -> u32 {
67        self.last_snapshot - self.in_flight.len() as u32
68    }
69    pub(crate) fn update_config(&mut self, config: Config) {
70        for matcher in self.matchers.0.iter_mut() {
71            matcher.get_mut().config = config.clone();
72        }
73    }
74    pub(crate) fn sort_results(&mut self, sort_results: bool) {
75        self.sort_results = sort_results;
76    }
77    pub(crate) fn reverse_items(&mut self, reverse_items: bool) {
78        self.reverse_items = reverse_items;
79    }
80
81    pub(crate) fn new(
82        worker_threads: Option<usize>,
83        config: Config,
84        notify: Arc<dyn Fn() + Sync + Send>,
85        cols: u32,
86        match_config: MatchListConfig,
87    ) -> (ThreadPool, Self) {
88        let worker_threads = worker_threads.unwrap_or_else(|| {
89            std::thread::available_parallelism().map_or(4, std::num::NonZero::get)
90        });
91        let pool = rayon::ThreadPoolBuilder::new()
92            .thread_name(|i| format!("nucleo worker {i}"))
93            .num_threads(worker_threads)
94            .build()
95            .expect("creating threadpool failed");
96        let matchers = (0..worker_threads)
97            .map(|_| UnsafeCell::new(ncp_matcher::Matcher::new(config.clone())))
98            .collect();
99        let worker = Self {
100            running: false,
101            matchers: Matchers(matchers),
102            last_snapshot: 0,
103            matches: Vec::new(),
104            // just a placeholder
105            pattern: MultiPattern::new(cols as usize),
106            sort_results: match_config.sort_results,
107            reverse_items: match_config.reverse_items,
108            canceled: Arc::new(AtomicBool::new(false)),
109            should_notify: Arc::new(AtomicBool::new(false)),
110            was_canceled: false,
111            notify,
112            items: Arc::new(boxcar::Vec::with_capacity(2 * 1024, cols)),
113            in_flight: Vec::with_capacity(64),
114        };
115        (pool, worker)
116    }
117
118    unsafe fn process_new_items(&mut self, unmatched: &AtomicU32) {
119        unsafe {
120            let matchers = &self.matchers;
121            let pattern = &self.pattern;
122            self.matches.reserve(self.in_flight.len());
123            self.in_flight.retain(|&idx| {
124                let Some(item) = self.items.get(idx) else {
125                    return true;
126                };
127                if let Some(score) = pattern.score(item.matcher_columns, matchers.get()) {
128                    self.matches.push(Match { score, idx });
129                };
130                false
131            });
132            let new_snapshot = self.items.par_snapshot(self.last_snapshot);
133            if new_snapshot.end() != self.last_snapshot {
134                let end = new_snapshot.end();
135                let in_flight = Mutex::new(&mut self.in_flight);
136                let items = new_snapshot.map(|(idx, item)| {
137                    let Some(item) = item else {
138                        in_flight.lock().push(idx);
139                        unmatched.fetch_add(1, atomic::Ordering::Relaxed);
140                        return Match {
141                            score: 0,
142                            idx: u32::MAX,
143                        };
144                    };
145                    if self.canceled.load(atomic::Ordering::Relaxed) {
146                        return Match { score: 0, idx };
147                    }
148                    let Some(score) = pattern.score(item.matcher_columns, matchers.get()) else {
149                        unmatched.fetch_add(1, atomic::Ordering::Relaxed);
150                        return Match {
151                            score: 0,
152                            idx: u32::MAX,
153                        };
154                    };
155                    Match { score, idx }
156                });
157                self.matches.par_extend(items);
158                self.last_snapshot = end;
159            }
160        }
161    }
162
163    fn remove_in_flight_matches(&mut self) {
164        let mut off = 0;
165        self.in_flight.retain(|&i| {
166            let is_in_flight = self.items.get(i).is_none();
167            if is_in_flight {
168                self.matches.remove((i - off) as usize);
169                off += 1;
170            }
171            is_in_flight
172        });
173    }
174
175    unsafe fn process_new_items_trivial(&mut self) {
176        unsafe {
177            let new_snapshot = self.items.snapshot(self.last_snapshot);
178            if new_snapshot.end() != self.last_snapshot {
179                let end = new_snapshot.end();
180                let items = new_snapshot.filter_map(|(idx, item)| {
181                    if item.is_none() {
182                        self.in_flight.push(idx);
183                        return None;
184                    };
185                    Some(Match { score: 0, idx })
186                });
187                self.matches.extend(items);
188                self.last_snapshot = end;
189            }
190        }
191    }
192
193    pub(crate) unsafe fn run(&mut self, pattern_status: pattern::Status, cleared: bool) {
194        unsafe {
195            self.running = true;
196            self.was_canceled = false;
197
198            if cleared {
199                self.last_snapshot = 0;
200                self.in_flight.clear();
201                self.matches.clear();
202            }
203
204            // TODO: be smarter around reusing past results for rescoring
205            if self.pattern.is_empty() {
206                self.reset_matches();
207                self.process_new_items_trivial();
208                let canceled = self.sort_matches();
209                if canceled {
210                    self.was_canceled = true;
211                } else if self.should_notify.load(atomic::Ordering::Relaxed) {
212                    (self.notify)();
213                }
214                return;
215            }
216
217            if pattern_status == pattern::Status::Rescore {
218                self.reset_matches();
219            }
220
221            let mut unmatched = AtomicU32::new(0);
222            if pattern_status != pattern::Status::Unchanged && !self.matches.is_empty() {
223                self.process_new_items_trivial();
224                let matchers = &self.matchers;
225                let pattern = &self.pattern;
226                self.matches
227                    .par_iter_mut()
228                    .take_any_while(|_| !self.canceled.load(atomic::Ordering::Relaxed))
229                    .for_each(|match_| {
230                        if match_.idx == u32::MAX {
231                            debug_assert_eq!(match_.score, 0);
232                            unmatched.fetch_add(1, atomic::Ordering::Relaxed);
233                            return;
234                        }
235                        // safety: in-flight items are never added to the matches
236                        let item = self.items.get_unchecked(match_.idx);
237                        if let Some(score) = pattern.score(item.matcher_columns, matchers.get()) {
238                            match_.score = score;
239                        } else {
240                            unmatched.fetch_add(1, atomic::Ordering::Relaxed);
241                            match_.score = 0;
242                            match_.idx = u32::MAX;
243                        }
244                    });
245            } else {
246                self.process_new_items(&unmatched);
247            }
248
249            let canceled = self.sort_matches();
250            if canceled {
251                self.was_canceled = true;
252            } else {
253                self.matches
254                    .truncate(self.matches.len() - take(unmatched.get_mut()) as usize);
255                if self.should_notify.load(atomic::Ordering::Relaxed) {
256                    (self.notify)();
257                }
258            }
259        }
260    }
261
262    unsafe fn sort_matches(&mut self) -> bool {
263        unsafe {
264            if self.sort_results {
265                par_quicksort(
266                    &mut self.matches,
267                    |match1, match2| {
268                        if match1.score != match2.score {
269                            return match1.score > match2.score;
270                        }
271                        if match1.idx == u32::MAX {
272                            return false;
273                        }
274                        if match2.idx == u32::MAX {
275                            return true;
276                        }
277                        // the tie breaker is comparatively rarely needed so we keep it
278                        // in a branch especially because we need to access the items
279                        // array here which involves some pointer chasing
280                        let item1 = self.items.get_unchecked(match1.idx);
281                        let item2 = &self.items.get_unchecked(match2.idx);
282                        let len1: u32 = item1
283                            .matcher_columns
284                            .iter()
285                            .map(|haystack| haystack.len() as u32)
286                            .sum();
287                        let len2 = item2
288                            .matcher_columns
289                            .iter()
290                            .map(|haystack| haystack.len() as u32)
291                            .sum();
292                        if len1 == len2 {
293                            if self.reverse_items {
294                                match2.idx < match1.idx
295                            } else {
296                                match1.idx < match2.idx
297                            }
298                        } else {
299                            len1 < len2
300                        }
301                    },
302                    &self.canceled,
303                )
304            } else {
305                par_quicksort(
306                    &mut self.matches,
307                    |match1, match2| {
308                        if match1.idx == u32::MAX {
309                            return false;
310                        }
311                        if match2.idx == u32::MAX {
312                            return true;
313                        }
314                        if self.reverse_items {
315                            match2.idx < match1.idx
316                        } else {
317                            match1.idx < match2.idx
318                        }
319                    },
320                    &self.canceled,
321                )
322            }
323        }
324    }
325
326    fn reset_matches(&mut self) {
327        self.matches.clear();
328        self.matches
329            .extend((0..self.last_snapshot).map(|idx| Match { score: 0, idx }));
330        // there are usually only very few in flight items (one for each writer)
331        self.remove_in_flight_matches();
332    }
333}