ncp_engine/lib.rs
1//! # Nucleo-picker-engine
2//!
3//! The `ncp-engine` crate is a fork of the [nucleo](https://docs.rs/nucleo) crate.
4//! It is not recommended for general use. This fork mainly exists to meet the specific
5//! requirements of [`nucleo-picker`](https://docs.rs/nucleo-picker).
6//!
7//! `ncp-engine` implements a high level matcher API that provides a performant
8//! parallel matcher worker threadpool. It is designed to allow integrating a `fzf`-like
9//! fuzzy matcher into a TUI application.
10//!
//! For a fully-featured TUI implementation, see [nucleo-picker](https://docs.rs/nucleo-picker).
12//!
13//! Matching runs in a background threadpool while providing a snapshot of the last
14//! complete match on request. That means the matcher can update the results live while
15//! the user is typing, while never blocking the main UI thread (beyond a user provided
16//! timeout). Nucleo also supports fully concurrent lock-free (and wait-free) streaming
17//! of input items.
18//!
19//! The [`Nucleo`] struct serves as the main API entrypoint for this crate.
20
21#![deny(missing_docs)]
22
23use std::ops::{Bound, RangeBounds};
24use std::sync::Arc;
25use std::sync::atomic::{self, AtomicBool, Ordering};
26use std::time::Duration;
27
28use parking_lot::Mutex;
29use rayon::ThreadPool;
30
31use crate::pattern::MultiPattern;
32pub use crate::worker::MatchListConfig;
33use crate::worker::Worker;
34pub use ncp_matcher::{Config, Matcher, Utf32Str, Utf32String, chars};
35
36mod boxcar;
37mod par_sort;
38pub mod pattern;
39mod worker;
40
41#[cfg(test)]
42mod tests;
43
44/// A match candidate stored in a [`Nucleo`] worker.
45pub struct Item<'a, T> {
46 /// A reference to the underlying item provided to the matcher.
47 pub data: &'a T,
48 /// The representation of the data within the matcher.
49 pub matcher_columns: &'a [Utf32String],
50}
51
52/// A handle that allows adding new items to a [`Nucleo`] worker.
53///
54/// An `Injector` is internally reference counted and can be cheaply
55/// cloned and sent across threads.
56pub struct Injector<T> {
57 items: Arc<boxcar::Vec<T>>,
58 notify: Arc<dyn Fn() + Sync + Send>,
59}
60
61impl<T> Clone for Injector<T> {
62 fn clone(&self) -> Self {
63 Self {
64 items: self.items.clone(),
65 notify: self.notify.clone(),
66 }
67 }
68}
69
70impl<T> Injector<T> {
71 /// Appends an element to the list of match candidates.
72 ///
73 /// This function is lock-free and wait-free. The returned `u32` is the internal index which
74 /// has been assigned to the provided value and is guaranteed to be valid unless
75 /// [`Nucleo::restart`] has been called.
76 ///
77 /// The `fill_columns` closure is called to generate the representation of the pushed value
78 /// within the matcher engine. The first argument is a reference to the provided value, and the
79 /// second argument is a slice where each entry corresponds to a column within the [`Nucleo`]
80 /// instance from which this `Injector` was created.
81 ///
82 /// ## Example
83 /// If the matcher has exactly one column and the item type `T` is a `String`, an appropriate
84 /// `fill_columns` closure might look like
85 /// ```
86 /// # use ncp_engine::Utf32String;
87 /// let fill_columns = |s: &String, cols: &mut [Utf32String]| {
88 /// cols[0] = (&**s).into();
89 /// };
90 /// ```
91 pub fn push(&self, value: T, fill_columns: impl FnOnce(&T, &mut [Utf32String])) -> u32 {
92 let idx = self.items.push(value, fill_columns);
93 (self.notify)();
94 idx
95 }
96
97 /// Appends multiple elements to the list of matched items.
98 /// This function is lock-free and wait-free.
99 ///
100 /// You should favor this function over `push` if at least one of the following is true:
101 /// - the number of items you're adding can be computed beforehand and is typically larger
102 /// than 1k
103 /// - you're able to batch incoming items
104 /// - you're adding items from multiple threads concurrently (this function results in less
105 /// contention)
106 pub fn extend_exact<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
107 where
108 I: IntoIterator<Item = T> + ExactSizeIterator,
109 {
110 self.items.extend_exact(values, fill_columns);
111 (self.notify)();
112 }
113
114 /// Returns the total number of items injected in the matcher.
115 ///
116 /// This may not match the number of items in the match snapshot if the matcher
117 /// is still running.
118 pub fn injected_items(&self) -> u32 {
119 self.items.count()
120 }
121
122 /// Returns a reference to the item at the given index.
123 ///
124 /// # Safety
125 ///
126 /// Item at `index` must be initialized. That means you must have observed
127 /// `push` returning this value or `get` returning `Some` for this value.
128 /// Just because a later index is initialized doesn't mean that this index
129 /// is initialized
130 pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
131 unsafe { self.items.get_unchecked(index) }
132 }
133
134 /// Returns a reference to the element at the given index.
135 pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
136 self.items.get(index)
137 }
138}
139
/// A single successful match produced by the [`Nucleo`] matcher.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Match {
    /// The score the matcher assigned to this match.
    pub score: u32,
    /// The index of the matched item.
    ///
    /// The index is guaranteed to correspond to a valid item in the matcher and
    /// in the same snapshot. Indices are invalidated if the matcher engine has
    /// been [restarted](Nucleo::restart).
    pub idx: u32,
}
152
/// The outcome of a single [`Nucleo::tick`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Status {
    /// Whether the current snapshot has (possibly) changed during this tick.
    pub changed: bool,
    /// Whether the matcher is still processing in the background.
    pub running: bool,
}
161
162/// A representation of the results of a [`Nucleo`] worker after finishing a
163/// [`tick`](Nucleo::tick).
164pub struct Snapshot<T: Sync + Send + 'static> {
165 item_count: u32,
166 matches: Vec<Match>,
167 pattern: MultiPattern,
168 items: Arc<boxcar::Vec<T>>,
169}
170
171impl<T: Sync + Send + 'static> Snapshot<T> {
172 fn clear(&mut self, new_items: Arc<boxcar::Vec<T>>) {
173 self.item_count = 0;
174 self.matches.clear();
175 self.items = new_items;
176 }
177
178 fn update(&mut self, worker: &Worker<T>) {
179 self.item_count = worker.item_count();
180 self.pattern.clone_from(&worker.pattern);
181 self.matches.clone_from(&worker.matches);
182 if !Arc::ptr_eq(&worker.items, &self.items) {
183 self.items = worker.items.clone();
184 }
185 }
186
187 /// Returns that total number of items
188 pub fn item_count(&self) -> u32 {
189 self.item_count
190 }
191
192 /// Returns the pattern which items were matched against
193 pub fn pattern(&self) -> &MultiPattern {
194 &self.pattern
195 }
196
197 /// Returns that number of items that matched the pattern
198 pub fn matched_item_count(&self) -> u32 {
199 self.matches.len() as u32
200 }
201
202 /// Returns an iterator over the items that correspond to a subrange of
203 /// all the matches in this snapshot.
204 ///
205 /// # Panics
206 /// Panics if `range` has a range bound that is larger than
207 /// the matched item count
208 pub fn matched_items(
209 &self,
210 range: impl RangeBounds<u32>,
211 ) -> impl ExactSizeIterator<Item = Item<'_, T>> + DoubleEndedIterator + '_ {
212 // TODO: use TAIT
213 let start = match range.start_bound() {
214 Bound::Included(&start) => start as usize,
215 Bound::Excluded(&start) => start as usize + 1,
216 Bound::Unbounded => 0,
217 };
218 let end = match range.end_bound() {
219 Bound::Included(&end) => end as usize + 1,
220 Bound::Excluded(&end) => end as usize,
221 Bound::Unbounded => self.matches.len(),
222 };
223 self.matches[start..end]
224 .iter()
225 .map(|&m| unsafe { self.items.get_unchecked(m.idx) })
226 }
227
228 /// Returns a reference to the item at the given index.
229 ///
230 /// # Safety
231 ///
232 /// Item at `index` must be initialized. That means you must have observed a
233 /// match with the corresponding index in this exact snapshot. Observing
234 /// a higher index is not enough as item indices can be non-contigously
235 /// initialized
236 #[inline]
237 pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> {
238 unsafe { self.items.get_unchecked(index) }
239 }
240
241 /// Returns a reference to the item at the given index.
242 ///
243 /// Returns `None` if the given `index` is not initialized. This function
244 /// is only guarteed to return `Some` for item indices that can be found in
245 /// the `matches` of this struct. Both smaller and larger indices may return
246 /// `None`.
247 #[inline]
248 pub fn get_item(&self, index: u32) -> Option<Item<'_, T>> {
249 self.items.get(index)
250 }
251
252 /// Returns the matches corresponding to this snapshot.
253 #[inline]
254 pub fn matches(&self) -> &[Match] {
255 &self.matches
256 }
257
258 /// A convenience function to return the [`Item`] corresponding to the
259 /// `n`th match.
260 ///
261 /// Returns `None` if `n` is greater than or equal to the match count.
262 #[inline]
263 pub fn get_matched_item(&self, n: u32) -> Option<Item<'_, T>> {
264 // SAFETY: A match index is guaranteed to corresponding to a valid global index in this
265 // snapshot.
266 unsafe { Some(self.get_item_unchecked(self.matches.get(n as usize)?.idx)) }
267 }
268}
269
/// How up to date the worker's view of the item store is.
#[repr(u8)]
#[derive(PartialEq, Eq, Clone, Copy)]
enum State {
    /// no tick has run since construction
    Init,
    /// items have been cleared but snapshot and items are still outdated
    Cleared,
    /// items are fresh
    Fresh,
}

impl State {
    /// Number of strong references to the current item store held by the
    /// matcher itself: its own handle plus the worker's, except right after a
    /// restart, when the worker still points at the old store.
    fn matcher_item_refs(self) -> usize {
        match self {
            State::Init | State::Fresh => 2,
            State::Cleared => 1,
        }
    }

    /// Whether any in-flight match must be canceled on the next tick.
    fn canceled(self) -> bool {
        !matches!(self, State::Fresh)
    }

    /// Whether the worker must be handed the current item store on the next run.
    fn cleared(self) -> bool {
        !matches!(self, State::Fresh)
    }
}
296
297/// A high level matcher worker that quickly computes matches in a background
298/// threadpool.
299///
300/// ## Example
301/// ```
302/// use std::sync::atomic::{AtomicBool, Ordering};
303/// use std::sync::Arc;
304/// use std::thread;
305///
306/// use ncp_engine::{Config, Nucleo};
307///
308/// static NEEDS_UPDATE: AtomicBool = AtomicBool::new(false);
309///
310/// // initialize a new matcher with default configuration and one column
311/// let matcher = Nucleo::new(
312/// Config::DEFAULT,
313/// Arc::new(|| NEEDS_UPDATE.store(true, Ordering::Relaxed)),
314/// None,
315/// 1
316/// );
317///
318/// // get a handle to add items to the matcher
319/// let injector = matcher.injector();
320///
321/// // add items to the matcher
322/// thread::spawn(move || {
323/// injector.push("Hello, world!".to_string(), |s, cols| {
324/// cols[0] = (&**s).into();
325/// });
326/// });
327/// ```
328pub struct Nucleo<T: Sync + Send + 'static> {
329 // the way the API is build we totally don't actually need these to be Arcs
330 // but this lets us avoid some unsafe
331 canceled: Arc<AtomicBool>,
332 should_notify: Arc<AtomicBool>,
333 worker: Arc<Mutex<Worker<T>>>,
334 pool: ThreadPool,
335 state: State,
336 items: Arc<boxcar::Vec<T>>,
337 notify: Arc<dyn Fn() + Sync + Send>,
338 snapshot: Snapshot<T>,
339 /// The pattern matched by this matcher.
340 ///
341 /// To update the match pattern, use [`MultiPattern::reparse`]. Note that
342 /// the matcher worker will only become aware of the new pattern after a
343 /// call to [`tick`](Nucleo::tick).
344 pub pattern: MultiPattern,
345}
346
347impl<T: Sync + Send + 'static> Nucleo<T> {
348 /// Constructs a new `nucleo` worker threadpool with the provided `config`.
349 ///
350 /// `notify` is called whenever new information is available and
351 /// [`tick`](Nucleo::tick) should be called. Note that `notify` is not
352 /// debounced; that should be handled by the downstream crate (for example,
353 /// debouncing to only redraw at most every 1/60 seconds).
354 ///
355 /// If `None` is passed for the number of worker threads, nucleo will use
356 /// one thread per hardware thread.
357 ///
358 /// Nucleo can match items with multiple orthogonal properties. `columns`
359 /// indicates how many matching columns each item (and the pattern) has. The
360 /// number of columns cannot be changed after construction.
361 pub fn new(
362 config: Config,
363 notify: Arc<dyn Fn() + Sync + Send>,
364 num_threads: Option<usize>,
365 columns: u32,
366 ) -> Self {
367 Self::with_match_list_config(
368 config,
369 notify,
370 num_threads,
371 columns,
372 MatchListConfig::DEFAULT,
373 )
374 }
375
376 /// Constructs a new worker threadpool with the provided configuration, with pre-defined
377 /// configuration for the match list.
378 pub fn with_match_list_config(
379 config: Config,
380 notify: Arc<dyn Fn() + Sync + Send>,
381 num_threads: Option<usize>,
382 columns: u32,
383 match_list_config: MatchListConfig,
384 ) -> Self {
385 let (pool, worker) = Worker::new(
386 num_threads,
387 config,
388 notify.clone(),
389 columns,
390 match_list_config,
391 );
392 Self {
393 canceled: worker.canceled.clone(),
394 should_notify: worker.should_notify.clone(),
395 items: worker.items.clone(),
396 pool,
397 pattern: MultiPattern::new(columns as usize),
398 snapshot: Snapshot {
399 matches: Vec::with_capacity(2 * 1024),
400 pattern: MultiPattern::new(columns as usize),
401 item_count: 0,
402 items: worker.items.clone(),
403 },
404 worker: Arc::new(Mutex::new(worker)),
405 state: State::Init,
406 notify,
407 }
408 }
409
410 /// Returns the total number of active injectors.
411 pub fn active_injectors(&self) -> usize {
412 Arc::strong_count(&self.items)
413 - self.state.matcher_item_refs()
414 - (Arc::ptr_eq(&self.snapshot.items, &self.items)) as usize
415 }
416
417 /// Returns a snapshot of the current matcher state.
418 ///
419 /// This method is very cheap and can be called every time a snapshot is required. The
420 /// snapshot will not change unless [`tick`](Nucleo::tick) is called.
421 pub fn snapshot(&self) -> &Snapshot<T> {
422 &self.snapshot
423 }
424
425 /// Returns an injector that can be used for adding candidates to the matcher.
426 pub fn injector(&self) -> Injector<T> {
427 Injector {
428 items: self.items.clone(),
429 notify: self.notify.clone(),
430 }
431 }
432
433 /// Restart the the item stream. Removes all items and disconnects all
434 /// previously created injectors from this instance. If `clear_snapshot`
435 /// is `true` then all items and matched are removed from the [`Snapshot`]
436 /// immediately. Otherwise the snapshot will keep the current matches until
437 /// the matcher has run again.
438 ///
439 /// # Note
440 ///
441 /// The injectors will continue to function but they will not affect this
442 /// instance anymore. The old items will only be dropped when all injectors
443 /// were dropped.
444 pub fn restart(&mut self, clear_snapshot: bool) {
445 self.canceled.store(true, Ordering::Relaxed);
446 self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns()));
447 self.state = State::Cleared;
448 if clear_snapshot {
449 self.snapshot.clear(self.items.clone());
450 }
451 }
452
453 /// Update the internal configuration.
454 pub fn update_config(&mut self, config: Config) {
455 self.worker.lock().update_config(config);
456 }
457
458 /// Set whether the matcher should sort search results by score after
459 /// matching. Defaults to true.
460 pub fn sort_results(&mut self, sort_results: bool) {
461 self.worker.lock().sort_results(sort_results);
462 }
463
464 /// Set whether the matcher should reverse the order of the input.
465 /// Defaults to false.
466 pub fn reverse_items(&mut self, reverse_items: bool) {
467 self.worker.lock().reverse_items(reverse_items);
468 }
469
470 /// Update the internal state to reflect any changes from the background worker
471 /// threads.
472 ///
473 /// This is the main way to interact with the matcher, and should be called
474 /// regularly (for example each time a frame is rendered). To avoid excessive
475 /// redraws this method will wait `timeout` milliseconds for the
476 /// worker thread to finish. It is recommend to set the timeout to 10ms.
477 pub fn tick(&mut self, timeout: u64) -> Status {
478 self.should_notify.store(false, atomic::Ordering::Relaxed);
479 let status = self.pattern.status();
480 let canceled = status != pattern::Status::Unchanged || self.state.canceled();
481 let mut res = self.tick_inner(timeout, canceled, status);
482 if !canceled {
483 return res;
484 }
485 self.state = State::Fresh;
486 let status2 = self.tick_inner(timeout, false, pattern::Status::Unchanged);
487 res.changed |= status2.changed;
488 res.running = status2.running;
489 res
490 }
491
492 fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status {
493 let mut inner = if canceled {
494 self.pattern.reset_status();
495 self.canceled.store(true, atomic::Ordering::Relaxed);
496 self.worker.lock_arc()
497 } else {
498 let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else {
499 self.should_notify.store(true, Ordering::Release);
500 return Status {
501 changed: false,
502 running: true,
503 };
504 };
505 worker
506 };
507
508 let changed = inner.running;
509
510 let running = canceled || self.items.count() > inner.item_count();
511 if inner.running {
512 inner.running = false;
513 if !inner.was_canceled && !self.state.canceled() {
514 self.snapshot.update(&inner);
515 }
516 }
517 if running {
518 inner.pattern.clone_from(&self.pattern);
519 self.canceled.store(false, atomic::Ordering::Relaxed);
520 if !canceled {
521 self.should_notify.store(true, atomic::Ordering::Release);
522 }
523 let cleared = self.state.cleared();
524 if cleared {
525 inner.items = self.items.clone();
526 }
527 self.pool
528 .spawn(move || unsafe { inner.run(status, cleared) });
529 }
530 Status { changed, running }
531 }
532}
533
534impl<T: Sync + Send> Drop for Nucleo<T> {
535 fn drop(&mut self) {
536 // we ensure the worker quits before dropping items to ensure that
537 // the worker can always assume the items outlive it
538 self.canceled.store(true, atomic::Ordering::Relaxed);
539 let lock = self.worker.try_lock_for(Duration::from_secs(1));
540 if lock.is_none() {
541 unreachable!("thread pool failed to shutdown properly")
542 }
543 }
544}