ncp_engine/
lib.rs

1//! # Nucleo-picker-engine
2//!
3//! The `ncp-engine` crate is a fork of the [nucleo](https://docs.rs/nucleo) crate.
4//! It is not recommended for general use. This fork mainly exists to meet the specific
5//! requirements of [`nucleo-picker`](https://docs.rs/nucleo-picker).
6//!
7//! `ncp-engine` implements a high level matcher API that provides a performant
8//! parallel matcher worker threadpool. It is designed to allow integrating a `fzf`-like
9//! fuzzy matcher into a TUI application.
10//!
//! For a fully-featured TUI implementation, see [nucleo-picker](https://docs.rs/nucleo-picker).
12//!
13//! Matching runs in a background threadpool while providing a snapshot of the last
14//! complete match on request. That means the matcher can update the results live while
15//! the user is typing, while never blocking the main UI thread (beyond a user provided
16//! timeout). Nucleo also supports fully concurrent lock-free (and wait-free) streaming
17//! of input items.
18//!
19//! The [`Nucleo`] struct serves as the main API entrypoint for this crate.
20
21#![deny(missing_docs)]
22
23use std::ops::{Bound, RangeBounds};
24use std::sync::Arc;
25use std::sync::atomic::{self, AtomicBool, Ordering};
26use std::time::Duration;
27
28use parking_lot::Mutex;
29use rayon::ThreadPool;
30
31use crate::pattern::MultiPattern;
32use crate::worker::Worker;
33pub use ncp_matcher::{Config, Matcher, Utf32Str, Utf32String, chars};
34
35mod boxcar;
36mod par_sort;
37pub mod pattern;
38mod worker;
39
40#[cfg(test)]
41mod tests;
42
/// A match candidate stored in a [`Nucleo`] worker.
///
/// An `Item` only borrows its contents: both fields are references with the lifetime `'a`.
pub struct Item<'a, T> {
    /// A reference to the underlying item provided to the matcher.
    pub data: &'a T,
    /// The representation of the data within the matcher.
    pub matcher_columns: &'a [Utf32String],
}
50
/// A handle that allows adding new items to a [`Nucleo`] worker.
///
/// An `Injector` is internally reference counted and can be cheaply
/// cloned and sent across threads.
pub struct Injector<T> {
    /// Shared item storage that pushed values are appended to.
    items: Arc<boxcar::Vec<T>>,
    /// Callback invoked every time values have been added through this handle.
    notify: Arc<dyn Fn() + Sync + Send>,
}
59
60impl<T> Clone for Injector<T> {
61    fn clone(&self) -> Self {
62        Self {
63            items: self.items.clone(),
64            notify: self.notify.clone(),
65        }
66    }
67}
68
69impl<T> Injector<T> {
70    /// Appends an element to the list of match candidates.
71    ///
72    /// This function is lock-free and wait-free. The returned `u32` is the internal index which
73    /// has been assigned to the provided value and is guaranteed to be valid unless
74    /// [`Nucleo::restart`] has been called.
75    ///
76    /// The `fill_columns` closure is called to generate the representation of the pushed value
77    /// within the matcher engine. The first argument is a reference to the provided value, and the
78    /// second argument is a slice where each entry corresponds to a column within the [`Nucleo`]
79    /// instance from which this `Injector` was created.
80    ///
81    /// ## Example
82    /// If the matcher has exactly one column and the item type `T` is a `String`, an appropriate
83    /// `fill_columns` closure might look like
84    /// ```
85    /// # use ncp_engine::Utf32String;
86    /// let fill_columns = |s: &String, cols: &mut [Utf32String]| {
87    ///      cols[0] = (&**s).into();
88    /// };
89    /// ```
90    pub fn push(&self, value: T, fill_columns: impl FnOnce(&T, &mut [Utf32String])) -> u32 {
91        let idx = self.items.push(value, fill_columns);
92        (self.notify)();
93        idx
94    }
95
96    /// Appends multiple elements to the list of matched items.
97    /// This function is lock-free and wait-free.
98    ///
99    /// You should favor this function over `push` if at least one of the following is true:
100    /// - the number of items you're adding can be computed beforehand and is typically larger
101    ///   than 1k
102    /// - you're able to batch incoming items
103    /// - you're adding items from multiple threads concurrently (this function results in less
104    ///   contention)
105    pub fn extend_exact<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
106    where
107        I: IntoIterator<Item = T> + ExactSizeIterator,
108    {
109        self.items.extend_exact(values, fill_columns);
110        (self.notify)();
111    }
112
113    /// Returns the total number of items injected in the matcher.
114    ///
115    /// This may not match the number of items in the match snapshot if the matcher
116    /// is still running.
117    pub fn injected_items(&self) -> u32 {
118        self.items.count()
119    }
120
121    /// Returns a reference to the item at the given index.
122    ///
123    /// # Safety
124    ///
125    /// Item at `index` must be initialized. That means you must have observed
126    /// `push` returning this value or `get` returning `Some` for this value.
127    /// Just because a later index is initialized doesn't mean that this index
128    /// is initialized
129    pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
130        unsafe { self.items.get_unchecked(index) }
131    }
132
133    /// Returns a reference to the element at the given index.
134    pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
135        self.items.get(index)
136    }
137}
138
/// A successful match computed by a [`Nucleo`] matcher.
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub struct Match {
    /// The score of the match.
    pub score: u32,
    /// The index of the match.
    ///
    /// The index is guaranteed to correspond to a valid item within the matcher and within the
    /// same snapshot. Note that indices are invalidated if the matcher engine has been
    /// [restarted](Nucleo::restart).
    pub idx: u32,
}
151
/// The status of a [`Nucleo`] worker after a match.
///
/// This is the value returned by [`Nucleo::tick`].
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub struct Status {
    /// Whether the current snapshot has changed.
    pub changed: bool,
    /// Whether the matcher is still processing in the background.
    pub running: bool,
}
160
/// A representation of the results of a [`Nucleo`] worker after finishing a
/// [`tick`](Nucleo::tick).
pub struct Snapshot<T: Sync + Send + 'static> {
    /// The item count reported by the worker when this snapshot was last updated.
    item_count: u32,
    /// A copy of the worker's matches from the last update.
    matches: Vec<Match>,
    /// A copy of the pattern the worker matched against during the last update.
    pattern: MultiPattern,
    /// The item storage that the indices in `matches` are looked up in.
    items: Arc<boxcar::Vec<T>>,
}
169
170impl<T: Sync + Send + 'static> Snapshot<T> {
171    fn clear(&mut self, new_items: Arc<boxcar::Vec<T>>) {
172        self.item_count = 0;
173        self.matches.clear();
174        self.items = new_items;
175    }
176
177    fn update(&mut self, worker: &Worker<T>) {
178        self.item_count = worker.item_count();
179        self.pattern.clone_from(&worker.pattern);
180        self.matches.clone_from(&worker.matches);
181        if !Arc::ptr_eq(&worker.items, &self.items) {
182            self.items = worker.items.clone();
183        }
184    }
185
186    /// Returns that total number of items
187    pub fn item_count(&self) -> u32 {
188        self.item_count
189    }
190
191    /// Returns the pattern which items were matched against
192    pub fn pattern(&self) -> &MultiPattern {
193        &self.pattern
194    }
195
196    /// Returns that number of items that matched the pattern
197    pub fn matched_item_count(&self) -> u32 {
198        self.matches.len() as u32
199    }
200
201    /// Returns an iterator over the items that correspond to a subrange of
202    /// all the matches in this snapshot.
203    ///
204    /// # Panics
205    /// Panics if `range` has a range bound that is larger than
206    /// the matched item count
207    pub fn matched_items(
208        &self,
209        range: impl RangeBounds<u32>,
210    ) -> impl ExactSizeIterator<Item = Item<'_, T>> + DoubleEndedIterator + '_ {
211        // TODO: use TAIT
212        let start = match range.start_bound() {
213            Bound::Included(&start) => start as usize,
214            Bound::Excluded(&start) => start as usize + 1,
215            Bound::Unbounded => 0,
216        };
217        let end = match range.end_bound() {
218            Bound::Included(&end) => end as usize + 1,
219            Bound::Excluded(&end) => end as usize,
220            Bound::Unbounded => self.matches.len(),
221        };
222        self.matches[start..end]
223            .iter()
224            .map(|&m| unsafe { self.items.get_unchecked(m.idx) })
225    }
226
227    /// Returns a reference to the item at the given index.
228    ///
229    /// # Safety
230    ///
231    /// Item at `index` must be initialized. That means you must have observed a
232    /// match with the corresponding index in this exact snapshot. Observing
233    /// a higher index is not enough as item indices can be non-contigously
234    /// initialized
235    #[inline]
236    pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> {
237        unsafe { self.items.get_unchecked(index) }
238    }
239
240    /// Returns a reference to the item at the given index.
241    ///
242    /// Returns `None` if the given `index` is not initialized. This function
243    /// is only guarteed to return `Some` for item indices that can be found in
244    /// the `matches` of this struct. Both smaller and larger indices may return
245    /// `None`.
246    #[inline]
247    pub fn get_item(&self, index: u32) -> Option<Item<'_, T>> {
248        self.items.get(index)
249    }
250
251    /// Returns the matches corresponding to this snapshot.
252    #[inline]
253    pub fn matches(&self) -> &[Match] {
254        &self.matches
255    }
256
257    /// A convenience function to return the [`Item`] corresponding to the
258    /// `n`th match.
259    ///
260    /// Returns `None` if `n` is greater than or equal to the match count.
261    #[inline]
262    pub fn get_matched_item(&self, n: u32) -> Option<Item<'_, T>> {
263        // SAFETY: A match index is guaranteed to corresponding to a valid global index in this
264        // snapshot.
265        unsafe { Some(self.get_item_unchecked(self.matches.get(n as usize)?.idx)) }
266    }
267}
268
/// Tracks whether the item storage has been replaced since the worker last picked it up.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq)]
enum State {
    /// The state a matcher starts in.
    Init,
    /// Items have been cleared but the snapshot and the worker's items are still outdated.
    Cleared,
    /// Items are fresh.
    Fresh,
}

impl State {
    /// Returns the number of item-storage references to subtract for this state when
    /// counting active injectors.
    fn matcher_item_refs(self) -> usize {
        match self {
            Self::Init | Self::Fresh => 2,
            Self::Cleared => 1,
        }
    }

    /// Returns `true` in every state except [`State::Fresh`].
    fn canceled(self) -> bool {
        match self {
            Self::Fresh => false,
            Self::Init | Self::Cleared => true,
        }
    }

    /// Returns `true` in every state except [`State::Fresh`].
    fn cleared(self) -> bool {
        match self {
            Self::Fresh => false,
            Self::Init | Self::Cleared => true,
        }
    }
}
295
/// A high level matcher worker that quickly computes matches in a background
/// threadpool.
///
/// ## Example
/// ```
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::Arc;
/// use std::thread;
///
/// use ncp_engine::{Config, Nucleo};
///
/// static NEEDS_UPDATE: AtomicBool = AtomicBool::new(false);
///
/// // initialize a new matcher with default configuration and one column
/// let matcher = Nucleo::new(
///     Config::DEFAULT,
///     Arc::new(|| NEEDS_UPDATE.store(true, Ordering::Relaxed)),
///     None,
///     1
/// );
///
/// // get a handle to add items to the matcher
/// let injector = matcher.injector();
///
/// // add items to the matcher
/// thread::spawn(move || {
///     injector.push("Hello, world!".to_string(), |s, cols| {
///         cols[0] = (&**s).into();
///     });
/// });
/// ```
pub struct Nucleo<T: Sync + Send + 'static> {
    // The way the API is built we don't actually need these to be Arcs,
    // but this lets us avoid some unsafe.
    //
    // Both flags are clones of the worker's own `canceled` / `should_notify` handles.
    // `canceled` is set to `true` before a run is replaced and when the matcher is dropped.
    canceled: Arc<AtomicBool>,
    // Cleared at the start of every `tick` and set again when a lock attempt times out or a
    // non-canceled run is spawned. NOTE(review): presumably read by the worker to decide
    // whether to call `notify`; confirm in `worker.rs`.
    should_notify: Arc<AtomicBool>,
    // The worker state. Its lock guard is moved into the job spawned on `pool`, so the mutex
    // stays locked for as long as that job holds the guard.
    worker: Arc<Mutex<Worker<T>>>,
    // Thread pool on which matcher runs are spawned.
    pool: ThreadPool,
    // Whether the item storage was replaced since the worker last received it.
    state: State,
    // The current item storage; handed out to new injectors.
    items: Arc<boxcar::Vec<T>>,
    // Notification callback; handed out to new injectors.
    notify: Arc<dyn Fn() + Sync + Send>,
    // The snapshot returned by `snapshot()`; only modified by `tick` and `restart`.
    snapshot: Snapshot<T>,
    /// The pattern matched by this matcher.
    ///
    /// To update the match pattern, use [`MultiPattern::reparse`]. Note that
    /// the matcher worker will only become aware of the new pattern after a
    /// call to [`tick`](Nucleo::tick).
    pub pattern: MultiPattern,
}
345
impl<T: Sync + Send + 'static> Nucleo<T> {
    /// Constructs a new `nucleo` worker threadpool with the provided `config`.
    ///
    /// `notify` is called whenever new information is available and
    /// [`tick`](Nucleo::tick) should be called. Note that `notify` is not
    /// debounced; that should be handled by the downstream crate (for example,
    /// debouncing to only redraw at most every 1/60 seconds).
    ///
    /// If `None` is passed for the number of worker threads, nucleo will use
    /// one thread per hardware thread.
    ///
    /// Nucleo can match items with multiple orthogonal properties. `columns`
    /// indicates how many matching columns each item (and the pattern) has. The
    /// number of columns cannot be changed after construction.
    pub fn new(
        config: Config,
        notify: Arc<dyn Fn() + Sync + Send>,
        num_threads: Option<usize>,
        columns: u32,
    ) -> Self {
        let (pool, worker) = Worker::new(num_threads, config, notify.clone(), columns);
        Self {
            // Share the worker's flags and item storage with this handle.
            canceled: worker.canceled.clone(),
            should_notify: worker.should_notify.clone(),
            items: worker.items.clone(),
            pool,
            pattern: MultiPattern::new(columns as usize),
            // The initial snapshot is empty and refers to the same item storage as the worker.
            snapshot: Snapshot {
                matches: Vec::with_capacity(2 * 1024),
                pattern: MultiPattern::new(columns as usize),
                item_count: 0,
                items: worker.items.clone(),
            },
            worker: Arc::new(Mutex::new(worker)),
            state: State::Init,
            notify,
        }
    }

    /// Returns the total number of active injectors.
    ///
    /// This is computed from the reference count of the current item storage, minus the
    /// references accounted to the matcher itself for the current state, minus one more if
    /// the snapshot refers to the same storage.
    pub fn active_injectors(&self) -> usize {
        Arc::strong_count(&self.items)
            - self.state.matcher_item_refs()
            - (Arc::ptr_eq(&self.snapshot.items, &self.items)) as usize
    }

    /// Returns a snapshot of the current matcher state.
    ///
    /// This method is very cheap and can be called every time a snapshot is required. The
    /// snapshot will not change unless [`tick`](Nucleo::tick) is called.
    pub fn snapshot(&self) -> &Snapshot<T> {
        &self.snapshot
    }

    /// Returns an injector that can be used for adding candidates to the matcher.
    pub fn injector(&self) -> Injector<T> {
        Injector {
            items: self.items.clone(),
            notify: self.notify.clone(),
        }
    }

    /// Restart the item stream. Removes all items and disconnects all
    /// previously created injectors from this instance. If `clear_snapshot`
    /// is `true` then all items and matches are removed from the [`Snapshot`]
    /// immediately. Otherwise the snapshot will keep the current matches until
    /// the matcher has run again.
    ///
    /// # Note
    ///
    /// The injectors will continue to function but they will not affect this
    /// instance anymore. The old items will only be dropped when all injectors
    /// were dropped.
    pub fn restart(&mut self, clear_snapshot: bool) {
        self.canceled.store(true, Ordering::Relaxed);
        // Replace the storage with an empty one that has the same number of columns.
        self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns()));
        // The worker receives the new storage on the next `tick`.
        self.state = State::Cleared;
        if clear_snapshot {
            self.snapshot.clear(self.items.clone());
        }
    }

    /// Update the internal configuration.
    ///
    /// This locks the worker, so it waits for the worker lock to become available.
    pub fn update_config(&mut self, config: Config) {
        self.worker.lock().update_config(config);
    }

    /// Set whether the matcher should sort search results by score after
    /// matching. Defaults to true.
    ///
    /// This locks the worker, so it waits for the worker lock to become available.
    pub fn sort_results(&mut self, sort_results: bool) {
        self.worker.lock().sort_results(sort_results);
    }

    /// Set whether the matcher should reverse the order of the input.
    /// Defaults to false.
    ///
    /// This locks the worker, so it waits for the worker lock to become available.
    pub fn reverse_items(&mut self, reverse_items: bool) {
        self.worker.lock().reverse_items(reverse_items);
    }

    /// Update the internal state to reflect any changes from the background worker
    /// threads.
    ///
    /// This is the main way to interact with the matcher, and should be called
    /// regularly (for example each time a frame is rendered). To avoid excessive
    /// redraws this method will wait up to `timeout` milliseconds for the
    /// worker thread to finish. It is recommended to set the timeout to 10ms.
    ///
    /// If the pattern has changed or the matcher has not yet been started since
    /// construction or the last [`restart`](Nucleo::restart), the worker is canceled
    /// and this method waits for the worker lock without a timeout before starting
    /// a new run.
    ///
    /// The returned [`Status`] reports whether the snapshot may have changed and whether
    /// the matcher is still running in the background.
    pub fn tick(&mut self, timeout: u64) -> Status {
        self.should_notify.store(false, atomic::Ordering::Relaxed);
        let status = self.pattern.status();
        // A changed pattern or a non-fresh state both require canceling the worker.
        let canceled = status != pattern::Status::Unchanged || self.state.canceled();
        let mut res = self.tick_inner(timeout, canceled, status);
        if !canceled {
            return res;
        }
        // The first pass spawned a fresh run; do a second, regular pass that waits up to
        // `timeout` for that run and merge both results.
        self.state = State::Fresh;
        let status2 = self.tick_inner(timeout, false, pattern::Status::Unchanged);
        res.changed |= status2.changed;
        res.running = status2.running;
        res
    }

    /// Performs one synchronization pass with the worker.
    ///
    /// When `canceled` is `true` the cancel flag is raised and the worker lock is awaited
    /// without a timeout; otherwise the lock is awaited for at most `timeout` milliseconds.
    /// `status` is forwarded to the worker if a new run is spawned.
    fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status {
        let mut inner = if canceled {
            self.pattern.reset_status();
            self.canceled.store(true, atomic::Ordering::Relaxed);
            self.worker.lock_arc()
        } else {
            let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else {
                // The lock could not be acquired in time: report that the matcher is still
                // running and that nothing changed.
                self.should_notify.store(true, Ordering::Release);
                return Status {
                    changed: false,
                    running: true,
                };
            };
            worker
        };

        // NOTE(review): `inner.running` is presumably set by `Worker::run`; it is reset below
        // once observed here.
        let changed = inner.running;

        // A new run is needed after a cancel, or when more items were injected than the
        // worker reports.
        let running = canceled || self.items.count() > inner.item_count();
        if inner.running {
            inner.running = false;
            // Only take over the results if the run was not canceled and the state is fresh.
            if !inner.was_canceled && !self.state.canceled() {
                self.snapshot.update(&inner);
            }
        }
        if running {
            inner.pattern.clone_from(&self.pattern);
            self.canceled.store(false, atomic::Ordering::Relaxed);
            if !canceled {
                self.should_notify.store(true, atomic::Ordering::Release);
            }
            let cleared = self.state.cleared();
            if cleared {
                // Hand the current item storage to the worker.
                inner.items = self.items.clone();
            }
            // The lock guard is moved into the spawned job, so the worker stays locked for
            // as long as the job holds it.
            // NOTE(review): the safety contract of `Worker::run` is not visible in this file.
            self.pool
                .spawn(move || unsafe { inner.run(status, cleared) });
        }
        Status { changed, running }
    }
}
508
509impl<T: Sync + Send> Drop for Nucleo<T> {
510    fn drop(&mut self) {
511        // we ensure the worker quits before dropping items to ensure that
512        // the worker can always assume the items outlive it
513        self.canceled.store(true, atomic::Ordering::Relaxed);
514        let lock = self.worker.try_lock_for(Duration::from_secs(1));
515        if lock.is_none() {
516            unreachable!("thread pool failed to shutdown properly")
517        }
518    }
519}