ncp_engine/
lib.rs

1//! # Nucleo-picker-engine
2//!
3//! The `ncp-engine` crate is a fork of the [nucleo](https://docs.rs/nucleo) crate.
4//! It is not recommended for general use. This fork mainly exists to meet the specific
5//! requirements of [`nucleo-picker`](https://docs.rs/nucleo-picker).
6//!
7//! `ncp-engine` implements a high level matcher API that provides a performant
8//! parallel matcher worker threadpool. It is designed to allow integrating a `fzf`-like
9//! fuzzy matcher into a TUI application.
10//!
//! For a fully-featured TUI implementation, see [nucleo-picker](https://docs.rs/nucleo-picker).
12//!
13//! Matching runs in a background threadpool while providing a snapshot of the last
14//! complete match on request. That means the matcher can update the results live while
15//! the user is typing, while never blocking the main UI thread (beyond a user provided
16//! timeout). Nucleo also supports fully concurrent lock-free (and wait-free) streaming
17//! of input items.
18//!
19//! The [`Nucleo`] struct serves as the main API entrypoint for this crate.
20
21#![deny(missing_docs)]
22
23use std::ops::{Bound, RangeBounds};
24use std::sync::Arc;
25use std::sync::atomic::{self, AtomicBool, Ordering};
26use std::time::Duration;
27
28use parking_lot::Mutex;
29use rayon::ThreadPool;
30
31use crate::pattern::MultiPattern;
32pub use crate::worker::MatchListConfig;
33use crate::worker::Worker;
34pub use ncp_matcher::{Config, Matcher, Utf32Str, Utf32String, chars};
35
36mod boxcar;
37mod par_sort;
38pub mod pattern;
39mod worker;
40
41#[cfg(test)]
42mod tests;
43
44/// A match candidate stored in a [`Nucleo`] worker.
45pub struct Item<'a, T> {
46    /// A reference to the underlying item provided to the matcher.
47    pub data: &'a T,
48    /// The representation of the data within the matcher.
49    pub matcher_columns: &'a [Utf32String],
50}
51
52/// A handle that allows adding new items to a [`Nucleo`] worker.
53///
54/// An `Injector` is internally reference counted and can be cheaply
55/// cloned and sent across threads.
56pub struct Injector<T> {
57    items: Arc<boxcar::Vec<T>>,
58    notify: Arc<dyn Fn() + Sync + Send>,
59}
60
61impl<T> Clone for Injector<T> {
62    fn clone(&self) -> Self {
63        Self {
64            items: self.items.clone(),
65            notify: self.notify.clone(),
66        }
67    }
68}
69
70impl<T> Injector<T> {
71    /// Appends an element to the list of match candidates.
72    ///
73    /// This function is lock-free and wait-free. The returned `u32` is the internal index which
74    /// has been assigned to the provided value and is guaranteed to be valid unless
75    /// [`Nucleo::restart`] has been called.
76    ///
77    /// The `fill_columns` closure is called to generate the representation of the pushed value
78    /// within the matcher engine. The first argument is a reference to the provided value, and the
79    /// second argument is a slice where each entry corresponds to a column within the [`Nucleo`]
80    /// instance from which this `Injector` was created.
81    ///
82    /// ## Example
83    /// If the matcher has exactly one column and the item type `T` is a `String`, an appropriate
84    /// `fill_columns` closure might look like
85    /// ```
86    /// # use ncp_engine::Utf32String;
87    /// let fill_columns = |s: &String, cols: &mut [Utf32String]| {
88    ///      cols[0] = (&**s).into();
89    /// };
90    /// ```
91    pub fn push(&self, value: T, fill_columns: impl FnOnce(&T, &mut [Utf32String])) -> u32 {
92        let idx = self.items.push(value, fill_columns);
93        (self.notify)();
94        idx
95    }
96
97    /// Appends multiple elements to the list of matched items.
98    /// This function is lock-free and wait-free.
99    ///
100    /// You should favor this function over `push` if at least one of the following is true:
101    /// - the number of items you're adding can be computed beforehand and is typically larger
102    ///   than 1k
103    /// - you're able to batch incoming items
104    /// - you're adding items from multiple threads concurrently (this function results in less
105    ///   contention)
106    pub fn extend_exact<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
107    where
108        I: IntoIterator<Item = T>,
109        <I as IntoIterator>::IntoIter: ExactSizeIterator,
110    {
111        self.items.extend_exact(values, fill_columns);
112        (self.notify)();
113    }
114
115    /// Returns the total number of items injected in the matcher.
116    ///
117    /// This may not match the number of items in the match snapshot if the matcher
118    /// is still running.
119    pub fn injected_items(&self) -> u32 {
120        self.items.count()
121    }
122
123    /// Returns a reference to the item at the given index.
124    ///
125    /// # Safety
126    ///
127    /// Item at `index` must be initialized. That means you must have observed
128    /// `push` returning this value or `get` returning `Some` for this value.
129    /// Just because a later index is initialized doesn't mean that this index
130    /// is initialized
131    pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
132        unsafe { self.items.get_unchecked(index) }
133    }
134
135    /// Returns a reference to the element at the given index.
136    pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
137        self.items.get(index)
138    }
139}
140
/// A single successful match produced by the [`Nucleo`] matcher.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Match {
    /// Score the matcher assigned to this match.
    pub score: u32,
    /// Index of the matched item.
    ///
    /// Within the snapshot that produced it, this index always refers to a valid item of
    /// the matcher. Indices become invalid once the matcher engine has been
    /// [restarted](Nucleo::restart).
    pub idx: u32,
}
153
/// What a [`Nucleo`] worker reports after a call to [`tick`](Nucleo::tick).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Status {
    /// `true` if the current snapshot was updated.
    pub changed: bool,
    /// `true` if matching is still in progress in the background.
    pub running: bool,
}
162
163/// A representation of the results of a [`Nucleo`] worker after finishing a
164/// [`tick`](Nucleo::tick).
165pub struct Snapshot<T: Sync + Send + 'static> {
166    item_count: u32,
167    matches: Vec<Match>,
168    pattern: MultiPattern,
169    items: Arc<boxcar::Vec<T>>,
170}
171
172impl<T: Sync + Send + 'static> Snapshot<T> {
173    fn clear(&mut self, new_items: Arc<boxcar::Vec<T>>) {
174        self.item_count = 0;
175        self.matches.clear();
176        self.items = new_items;
177    }
178
179    fn update(&mut self, worker: &Worker<T>) {
180        self.item_count = worker.item_count();
181        self.pattern.clone_from(&worker.pattern);
182        self.matches.clone_from(&worker.matches);
183        if !Arc::ptr_eq(&worker.items, &self.items) {
184            self.items = worker.items.clone();
185        }
186    }
187
188    /// Returns that total number of items
189    pub fn item_count(&self) -> u32 {
190        self.item_count
191    }
192
193    /// Returns the pattern which items were matched against
194    pub fn pattern(&self) -> &MultiPattern {
195        &self.pattern
196    }
197
198    /// Returns that number of items that matched the pattern
199    pub fn matched_item_count(&self) -> u32 {
200        self.matches.len() as u32
201    }
202
203    /// Returns an iterator over the items that correspond to a subrange of
204    /// all the matches in this snapshot.
205    ///
206    /// # Panics
207    /// Panics if `range` has a range bound that is larger than
208    /// the matched item count
209    pub fn matched_items(
210        &self,
211        range: impl RangeBounds<u32>,
212    ) -> impl ExactSizeIterator<Item = Item<'_, T>> + DoubleEndedIterator + '_ {
213        // TODO: use TAIT
214        let start = match range.start_bound() {
215            Bound::Included(&start) => start as usize,
216            Bound::Excluded(&start) => start as usize + 1,
217            Bound::Unbounded => 0,
218        };
219        let end = match range.end_bound() {
220            Bound::Included(&end) => end as usize + 1,
221            Bound::Excluded(&end) => end as usize,
222            Bound::Unbounded => self.matches.len(),
223        };
224        self.matches[start..end]
225            .iter()
226            .map(|&m| unsafe { self.items.get_unchecked(m.idx) })
227    }
228
229    /// Returns a reference to the item at the given index.
230    ///
231    /// # Safety
232    ///
233    /// Item at `index` must be initialized. That means you must have observed a
234    /// match with the corresponding index in this exact snapshot. Observing
235    /// a higher index is not enough as item indices can be non-contigously
236    /// initialized
237    #[inline]
238    pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> {
239        unsafe { self.items.get_unchecked(index) }
240    }
241
242    /// Returns a reference to the item at the given index.
243    ///
244    /// Returns `None` if the given `index` is not initialized. This function
245    /// is only guarteed to return `Some` for item indices that can be found in
246    /// the `matches` of this struct. Both smaller and larger indices may return
247    /// `None`.
248    #[inline]
249    pub fn get_item(&self, index: u32) -> Option<Item<'_, T>> {
250        self.items.get(index)
251    }
252
253    /// Returns the matches corresponding to this snapshot.
254    #[inline]
255    pub fn matches(&self) -> &[Match] {
256        &self.matches
257    }
258
259    /// A convenience function to return the [`Item`] corresponding to the
260    /// `n`th match.
261    ///
262    /// Returns `None` if `n` is greater than or equal to the match count.
263    #[inline]
264    pub fn get_matched_item(&self, n: u32) -> Option<Item<'_, T>> {
265        // SAFETY: A match index is guaranteed to corresponding to a valid global index in this
266        // snapshot.
267        unsafe { Some(self.get_item_unchecked(self.matches.get(n as usize)?.idx)) }
268    }
269}
270
/// Where the matcher's item list stands relative to the background worker.
#[repr(u8)]
#[derive(Copy, Clone, Eq, PartialEq)]
enum State {
    /// Freshly constructed; no tick has run yet.
    Init,
    /// A restart replaced the items, but the snapshot and the worker's items
    /// are still outdated.
    Cleared,
    /// The worker is operating on the current items.
    Fresh,
}
280
281impl State {
282    fn matcher_item_refs(self) -> usize {
283        match self {
284            Self::Cleared => 1,
285            Self::Init | Self::Fresh => 2,
286        }
287    }
288
289    fn canceled(self) -> bool {
290        self != Self::Fresh
291    }
292
293    fn cleared(self) -> bool {
294        self != Self::Fresh
295    }
296}
297
/// A high level matcher worker that quickly computes matches in a background
/// threadpool.
///
/// Candidates are added through [`Injector`]s obtained from [`Nucleo::injector`].
/// Results are read from the [`Snapshot`] returned by [`Nucleo::snapshot`], which
/// is refreshed by calling [`Nucleo::tick`].
///
/// ## Example
/// ```
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::Arc;
/// use std::thread;
///
/// use ncp_engine::{Config, Nucleo};
///
/// static NEEDS_UPDATE: AtomicBool = AtomicBool::new(false);
///
/// // initialize a new matcher with default configuration and one column
/// let matcher = Nucleo::new(
///     Config::DEFAULT,
///     Arc::new(|| NEEDS_UPDATE.store(true, Ordering::Relaxed)),
///     None,
///     1
/// );
///
/// // get a handle to add items to the matcher
/// let injector = matcher.injector();
///
/// // add items to the matcher
/// thread::spawn(move || {
///     injector.push("Hello, world!".to_string(), |s, cols| {
///         cols[0] = (&**s).into();
///     });
/// });
/// ```
// NOTE: fields are dropped in declaration order after `Drop::drop` runs;
// review the `Drop` impl before reordering them.
pub struct Nucleo<T: Sync + Send + 'static> {
    // the way the API is built we don't actually need these to be Arcs,
    // but this lets us avoid some unsafe
    /// Cancellation flag shared with the worker (cloned from the worker at construction).
    canceled: Arc<AtomicBool>,
    /// Flag shared with the worker; set by `tick_inner` when a notification should be
    /// sent (presumably read by the worker before calling `notify` — confirm in worker.rs).
    should_notify: Arc<AtomicBool>,
    /// Background worker state; its lock guard is moved into each spawned run, so the
    /// lock is held for as long as a run is in progress.
    worker: Arc<Mutex<Worker<T>>>,
    /// Thread pool on which matcher runs are spawned.
    pool: ThreadPool,
    /// Whether `items` has been replaced by a restart and not yet handed to the worker.
    state: State,
    /// The current list of match candidates, shared with all live injectors.
    items: Arc<boxcar::Vec<T>>,
    /// Callback handed to injectors and to the worker.
    notify: Arc<dyn Fn() + Sync + Send>,
    /// Results of the last completed run, exposed through [`Nucleo::snapshot`].
    snapshot: Snapshot<T>,
    /// The pattern matched by this matcher.
    ///
    /// To update the match pattern, use [`MultiPattern::reparse`]. Note that
    /// the matcher worker will only become aware of the new pattern after a
    /// call to [`tick`](Nucleo::tick).
    pub pattern: MultiPattern,
}
347
impl<T: Sync + Send + 'static> Nucleo<T> {
    /// Constructs a new `nucleo` worker threadpool with the provided `config`.
    ///
    /// `notify` is called whenever new information is available and
    /// [`tick`](Nucleo::tick) should be called. Note that `notify` is not
    /// debounced; that should be handled by the downstream crate (for example,
    /// debouncing to only redraw at most every 1/60 seconds).
    ///
    /// If `None` is passed for the number of worker threads, nucleo will use
    /// one thread per hardware thread.
    ///
    /// Nucleo can match items with multiple orthogonal properties. `columns`
    /// indicates how many matching columns each item (and the pattern) has. The
    /// number of columns cannot be changed after construction.
    ///
    /// This is equivalent to [`Nucleo::with_match_list_config`] with
    /// [`MatchListConfig::DEFAULT`].
    pub fn new(
        config: Config,
        notify: Arc<dyn Fn() + Sync + Send>,
        num_threads: Option<usize>,
        columns: u32,
    ) -> Self {
        Self::with_match_list_config(
            config,
            notify,
            num_threads,
            columns,
            MatchListConfig::DEFAULT,
        )
    }

    /// Constructs a new worker threadpool like [`Nucleo::new`], but with an
    /// explicit [`MatchListConfig`] for the match list instead of
    /// [`MatchListConfig::DEFAULT`].
    pub fn with_match_list_config(
        config: Config,
        notify: Arc<dyn Fn() + Sync + Send>,
        num_threads: Option<usize>,
        columns: u32,
        match_list_config: MatchListConfig,
    ) -> Self {
        let (pool, worker) = Worker::new(
            num_threads,
            config,
            notify.clone(),
            columns,
            match_list_config,
        );
        // Struct fields are evaluated in the order written: every shared handle is
        // cloned out of `worker` before `worker` itself is moved into the mutex below.
        // The matcher, the worker and the snapshot all start out sharing one item list.
        Self {
            canceled: worker.canceled.clone(),
            should_notify: worker.should_notify.clone(),
            items: worker.items.clone(),
            pool,
            pattern: MultiPattern::new(columns as usize),
            snapshot: Snapshot {
                matches: Vec::with_capacity(2 * 1024),
                pattern: MultiPattern::new(columns as usize),
                item_count: 0,
                items: worker.items.clone(),
            },
            worker: Arc::new(Mutex::new(worker)),
            state: State::Init,
            notify,
        }
    }

    /// Returns the total number of active injectors.
    pub fn active_injectors(&self) -> usize {
        // Every strong reference to the current item list that is not held
        // internally belongs to an injector. The internal holders are
        // `self.items`, the worker (unless a restart just replaced the list, see
        // `State::matcher_item_refs`), and the snapshot if it points at the same list.
        Arc::strong_count(&self.items)
            - self.state.matcher_item_refs()
            - (Arc::ptr_eq(&self.snapshot.items, &self.items)) as usize
    }

    /// Returns a snapshot of the current matcher state.
    ///
    /// This method is very cheap and can be called every time a snapshot is required. The
    /// snapshot will not change unless [`tick`](Nucleo::tick) is called.
    pub fn snapshot(&self) -> &Snapshot<T> {
        &self.snapshot
    }

    /// Returns an injector that can be used for adding candidates to the matcher.
    pub fn injector(&self) -> Injector<T> {
        Injector {
            items: self.items.clone(),
            notify: self.notify.clone(),
        }
    }

    /// Restarts the item stream. Removes all items and disconnects all
    /// previously created injectors from this instance. If `clear_snapshot`
    /// is `true`, then all items and matches are removed from the [`Snapshot`]
    /// immediately. Otherwise the snapshot will keep the current matches until
    /// the matcher has run again.
    ///
    /// # Note
    ///
    /// The injectors will continue to function, but they will not affect this
    /// instance anymore. The old items will only be dropped when all injectors
    /// have been dropped.
    pub fn restart(&mut self, clear_snapshot: bool) {
        // Ask any in-flight run to stop; while the state is `Cleared`, `tick`
        // will not copy its results into the snapshot.
        self.canceled.store(true, Ordering::Relaxed);
        // Existing injectors keep the old list alive through their own `Arc`s.
        self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns()));
        self.state = State::Cleared;
        if clear_snapshot {
            self.snapshot.clear(self.items.clone());
        }
    }

    /// Update the internal configuration.
    ///
    /// This locks the worker, so it waits for any in-progress matcher run to finish.
    pub fn update_config(&mut self, config: Config) {
        self.worker.lock().update_config(config);
    }

    /// Set whether the matcher should sort search results by score after
    /// matching. Defaults to true.
    ///
    /// This locks the worker, so it waits for any in-progress matcher run to finish.
    pub fn sort_results(&mut self, sort_results: bool) {
        self.worker.lock().sort_results(sort_results);
    }

    /// Set whether the matcher should reverse the order of the input.
    /// Defaults to false.
    ///
    /// This locks the worker, so it waits for any in-progress matcher run to finish.
    pub fn reverse_items(&mut self, reverse_items: bool) {
        self.worker.lock().reverse_items(reverse_items);
    }

    /// Update the internal state to reflect any changes from the background worker
    /// threads.
    ///
    /// This is the main way to interact with the matcher, and should be called
    /// regularly (for example each time a frame is rendered). To avoid excessive
    /// redraws this method will wait `timeout` milliseconds for the
    /// worker thread to finish. It is recommended to set the timeout to 10ms.
    pub fn tick(&mut self, timeout: u64) -> Status {
        self.should_notify.store(false, atomic::Ordering::Relaxed);
        let status = self.pattern.status();
        // A changed pattern, or any state other than `Fresh`, invalidates the current run.
        let canceled = status != pattern::Status::Unchanged || self.state.canceled();
        let mut res = self.tick_inner(timeout, canceled, status);
        if !canceled {
            return res;
        }
        // The canceling pass above always spawns a new run on the current items.
        // Poll once more (waiting up to `timeout` ms) so that run's results can be
        // picked up within this same tick.
        self.state = State::Fresh;
        let status2 = self.tick_inner(timeout, false, pattern::Status::Unchanged);
        res.changed |= status2.changed;
        res.running = status2.running;
        res
    }

    /// Performs one synchronization step with the background worker.
    ///
    /// When `canceled` is `true`, this resets the pattern status, raises the shared
    /// cancellation flag and acquires the worker lock without a timeout. Otherwise it
    /// waits at most `timeout` milliseconds for the lock. If the lock cannot be
    /// acquired, it sets `should_notify` and reports "still running, unchanged".
    ///
    /// With the lock held, the snapshot is refreshed from results the worker has not
    /// yet handed over. A new run is then spawned on the pool if `canceled` is set or
    /// more items have been injected than the worker has seen. The lock guard is moved
    /// into that run, so the worker stays locked until the run returns.
    fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status {
        let mut inner = if canceled {
            self.pattern.reset_status();
            self.canceled.store(true, atomic::Ordering::Relaxed);
            self.worker.lock_arc()
        } else {
            let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else {
                self.should_notify.store(true, Ordering::Release);
                return Status {
                    changed: false,
                    running: true,
                };
            };
            worker
        };

        // NOTE(review): `inner.running` appears to mean "a run finished and its results
        // have not been collected yet" (it is reset below once consumed) — confirm in worker.rs.
        let changed = inner.running;

        // Start a new run if the old one was canceled, or if new items arrived since
        // the worker last counted them.
        let running = canceled || self.items.count() > inner.item_count();
        if inner.running {
            inner.running = false;
            // Discard results of a canceled run, and any results while the state is not
            // yet `Fresh` (e.g. right after a restart).
            if !inner.was_canceled && !self.state.canceled() {
                self.snapshot.update(&inner);
            }
        }
        if running {
            inner.pattern.clone_from(&self.pattern);
            // The run about to be spawned must not observe the cancellation raised above.
            self.canceled.store(false, atomic::Ordering::Relaxed);
            // A canceling pass is immediately followed by a second poll in `tick`, so a
            // notification is only requested for runs started by a non-canceling pass.
            if !canceled {
                self.should_notify.store(true, atomic::Ordering::Release);
            }
            // After a restart, hand the worker the new item list.
            let cleared = self.state.cleared();
            if cleared {
                inner.items = self.items.clone();
            }
            // NOTE(review): `Worker::run` is `unsafe`; its contract lives in the worker
            // module and is not visible here — confirm this call site upholds it.
            self.pool
                .spawn(move || unsafe { inner.run(status, cleared) });
        }
        Status { changed, running }
    }
}
534
535impl<T: Sync + Send> Drop for Nucleo<T> {
536    fn drop(&mut self) {
537        // we ensure the worker quits before dropping items to ensure that
538        // the worker can always assume the items outlive it
539        self.canceled.store(true, atomic::Ordering::Relaxed);
540        let lock = self.worker.try_lock_for(Duration::from_secs(1));
541        if lock.is_none() {
542            unreachable!("thread pool failed to shutdown properly")
543        }
544    }
545}