Skip to main content

nucleo/
lib.rs

/*!
`nucleo` is a high level crate that provides a high level matcher API that
provides a highly effective (parallel) matcher worker. It's designed to allow
quickly plugging a fully featured (and faster) fzf/skim like fuzzy matcher into
your TUI application.

It's designed to run matching on a background threadpool while providing a
snapshot of the last complete match. That means the matcher can update the
results live while the user is typing while never blocking the main UI thread
(beyond a user provided timeout). Nucleo also supports fully concurrent lock-free
(and wait-free) streaming of input items.

The [`Nucleo`] struct serves as the main API entrypoint for this crate.

# Status

Nucleo is used in the helix-editor and therefore has a large user base with lots
of real world testing. The core matcher implementation is considered complete
and is unlikely to see major changes. The `nucleo-matcher` crate is finished and
ready for widespread use, breaking changes should be very rare (a 1.0 release
should not be far away).

While the high level `nucleo` crate also works well (and is also used in helix),
there are still additional features that will be added in the future. The high
level crate also needs better documentation and will likely see a few minor API
changes in the future.

*/
29use std::cmp::Ordering as CmpOrdering;
30use std::ops::{Bound, RangeBounds};
31use std::sync::atomic::{self, AtomicBool, Ordering};
32use std::sync::Arc;
33use std::time::Duration;
34
35use parking_lot::Mutex;
36use rayon::ThreadPool;
37
38use crate::pattern::MultiPattern;
39use crate::worker::Worker;
40pub use nucleo_matcher::{chars, Config, Matcher, Utf32Str, Utf32String};
41
42mod boxcar;
43mod par_sort;
44pub mod pattern;
45mod worker;
46
47#[cfg(test)]
48mod tests;
49
50/// Comparison function for custom sorting of match results.
51pub type SortFn<T> =
52    Box<dyn Fn(&Match, Item<'_, T>, &Match, Item<'_, T>) -> CmpOrdering + Send + Sync>;
53
54/// Strategy for sorting match results.
55#[derive(Default)]
56pub enum SortStrategy<T: Sync + Send + 'static> {
57    /// Sort items by index.
58    Index,
59    /// Sort by score (desc), then length (asc), then index.
60    #[default]
61    Score,
62    /// Custom comparison function.
63    Custom(SortFn<T>),
64}
65
66impl<T: Sync + Send + 'static> std::fmt::Debug for SortStrategy<T> {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        match self {
69            SortStrategy::Index => write!(f, "SortStrategy::None"),
70            SortStrategy::Score => write!(f, "SortStrategy::Score"),
71            SortStrategy::Custom(_) => write!(f, "SortStrategy::Custom(...)"),
72        }
73    }
74}
75
76/// A match candidate stored in a [`Nucleo`] worker.
77pub struct Item<'a, T> {
78    pub data: &'a T,
79    pub matcher_columns: &'a [Utf32String],
80}
81
82/// A handle that allows adding new items to a [`Nucleo`] worker.
83///
84/// It's internally reference counted and can be cheaply cloned
85/// and sent across threads.
86pub struct Injector<T> {
87    items: Arc<boxcar::Vec<T>>,
88    notify: Arc<dyn Fn() + Sync + Send>,
89}
90
91impl<T> Clone for Injector<T> {
92    fn clone(&self) -> Self {
93        Injector {
94            items: self.items.clone(),
95            notify: self.notify.clone(),
96        }
97    }
98}
99
100impl<T> Injector<T> {
101    /// Appends an element to the list of matched items.
102    /// This function is lock-free and wait-free.
103    pub fn push(&self, value: T, fill_columns: impl FnOnce(&T, &mut [Utf32String])) -> u32 {
104        let idx = self.items.push(value, fill_columns);
105        (self.notify)();
106        idx
107    }
108
109    /// Appends multiple elements to the list of matched items.
110    /// This function is lock-free and wait-free.
111    ///
112    /// You should favor this function over `push` if at least one of the following is true:
113    /// - the number of items you're adding can be computed beforehand and is typically larger
114    ///     than 1k
115    /// - you're able to batch incoming items
116    /// - you're adding items from multiple threads concurrently (this function results in less
117    ///     contention)
118    pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
119    where
120        I: IntoIterator<Item = T> + ExactSizeIterator,
121    {
122        self.items.extend(values, fill_columns);
123        (self.notify)();
124    }
125
126    /// Returns the total number of items injected in the matcher. This might
127    /// not match the number of items in the match snapshot (if the matcher
128    /// is still running)
129    pub fn injected_items(&self) -> u32 {
130        self.items.count()
131    }
132
133    /// Returns a reference to the item at the given index.
134    ///
135    /// # Safety
136    ///
137    /// Item at `index` must be initialized. That means you must have observed
138    /// `push` returning this value or `get` returning `Some` for this value.
139    /// Just because a later index is initialized doesn't mean that this index
140    /// is initialized
141    pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
142        self.items.get_unchecked(index)
143    }
144
145    /// Returns a reference to the element at the given index.
146    pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
147        self.items.get(index)
148    }
149}
150
/// An [item](crate::Item) that was successfully matched by a [`Nucleo`] worker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Match {
    /// Score assigned to this match.
    pub score: u32,
    /// Index of the matched item in the item storage.
    pub idx: u32,
}
157
/// The status of a [`Nucleo`] worker after a match.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Status {
    /// Whether the current snapshot has changed.
    pub changed: bool,
    /// Whether the matcher is still processing in the background.
    pub running: bool,
}
166
167/// A snapshot represent the results of a [`Nucleo`] worker after
168/// finishing a [`tick`](Nucleo::tick).
169pub struct Snapshot<T: Sync + Send + 'static> {
170    item_count: u32,
171    matches: Vec<Match>,
172    pattern: MultiPattern,
173    items: Arc<boxcar::Vec<T>>,
174}
175
176impl<T: Sync + Send + 'static> Snapshot<T> {
177    fn clear(&mut self, new_items: Arc<boxcar::Vec<T>>) {
178        self.item_count = 0;
179        self.matches.clear();
180        self.items = new_items
181    }
182
183    fn update(&mut self, worker: &Worker<T>) {
184        self.item_count = worker.item_count();
185        self.pattern.clone_from(&worker.pattern);
186        self.matches.clone_from(&worker.matches);
187        if !Arc::ptr_eq(&worker.items, &self.items) {
188            self.items = worker.items.clone()
189        }
190    }
191
192    /// Returns that total number of items
193    pub fn item_count(&self) -> u32 {
194        self.item_count
195    }
196
197    /// Returns the pattern which items were matched against
198    pub fn pattern(&self) -> &MultiPattern {
199        &self.pattern
200    }
201
202    /// Returns that number of items that matched the pattern
203    pub fn matched_item_count(&self) -> u32 {
204        self.matches.len() as u32
205    }
206
207    /// Returns an iterator over the items that correspond to a subrange of
208    /// all the matches in this snapshot.
209    ///
210    /// # Panics
211    /// Panics if `range` has a range bound that is larger than
212    /// the matched item count
213    pub fn matched_items(
214        &self,
215        range: impl RangeBounds<u32>,
216    ) -> impl ExactSizeIterator<Item = Item<'_, T>> + DoubleEndedIterator + '_ {
217        // TODO: use TAIT
218        let start = match range.start_bound() {
219            Bound::Included(&start) => start as usize,
220            Bound::Excluded(&start) => start as usize + 1,
221            Bound::Unbounded => 0,
222        };
223        let end = match range.end_bound() {
224            Bound::Included(&end) => end as usize + 1,
225            Bound::Excluded(&end) => end as usize,
226            Bound::Unbounded => self.matches.len(),
227        };
228        self.matches[start..end]
229            .iter()
230            .map(|&m| unsafe { self.items.get_unchecked(m.idx) })
231    }
232
233    /// Returns a reference to the item at the given index.
234    ///
235    /// # Safety
236    ///
237    /// Item at `index` must be initialized. That means you must have observed a
238    /// match with the corresponding index in this exact snapshot. Observing
239    /// a higher index is not enough as item indices can be non-contigously
240    /// initialized
241    #[inline]
242    pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> {
243        self.items.get_unchecked(index)
244    }
245
246    /// Returns a reference to the item at the given index.
247    ///
248    /// Returns `None` if the given `index` is not initialized. This function
249    /// is only guarteed to return `Some` for item indices that can be found in
250    /// the `matches` of this struct. Both smaller and larger indices may return
251    /// `None`.
252    #[inline]
253    pub fn get_item(&self, index: u32) -> Option<Item<'_, T>> {
254        self.items.get(index)
255    }
256
257    /// Return the matches corresponding to this snapshot.
258    #[inline]
259    pub fn matches(&self) -> &[Match] {
260        &self.matches
261    }
262
263    /// A convenience function to return the [`Item`] corresponding to the
264    /// `n`th match.
265    ///
266    /// Returns `None` if `n` is greater than or equal to the match count.
267    #[inline]
268    pub fn get_matched_item(&self, n: u32) -> Option<Item<'_, T>> {
269        // SAFETY: A match index is guaranteed to corresponding to a valid global index in this
270        // snapshot.
271        unsafe { Some(self.get_item_unchecked(self.matches.get(n as usize)?.idx)) }
272    }
273}
274
/// Tracks whether the item list held by a `Nucleo` is in sync with its worker.
#[repr(u8)]
#[derive(PartialEq, Eq, Clone, Copy)]
enum State {
    /// The state a `Nucleo` starts in.
    Init,
    /// Items have been cleared, but the snapshot and the worker's items are
    /// still outdated.
    Cleared,
    /// Items are fresh.
    Fresh,
}
284
285impl State {
286    fn matcher_item_refs(self) -> usize {
287        match self {
288            State::Cleared => 1,
289            State::Init | State::Fresh => 2,
290        }
291    }
292
293    fn canceled(self) -> bool {
294        self != State::Fresh
295    }
296
297    fn cleared(self) -> bool {
298        self != State::Fresh
299    }
300}
301
302/// A high level matcher worker that quickly computes matches in a background
303/// threadpool.
304pub struct Nucleo<T: Sync + Send + 'static> {
305    // the way the API is build we totally don't actually need these to be Arcs
306    // but this lets us avoid some unsafe
307    canceled: Arc<AtomicBool>,
308    should_notify: Arc<AtomicBool>,
309    worker: Arc<Mutex<Worker<T>>>,
310    pool: ThreadPool,
311    state: State,
312    items: Arc<boxcar::Vec<T>>,
313    notify: Arc<dyn Fn() + Sync + Send>,
314    snapshot: Snapshot<T>,
315    /// The pattern matched by this matcher. To update the match pattern
316    /// [`MultiPattern::reparse`](`pattern::MultiPattern::reparse`) should be used.
317    /// Note that the matcher worker will only become aware of the new pattern
318    /// after a call to [`tick`](Nucleo::tick).
319    pub pattern: MultiPattern,
320}
321
322impl<T: Sync + Send + 'static> Nucleo<T> {
323    /// Constructs a new `nucleo` worker threadpool with the provided `config`.
324    ///
325    /// `notify` is called everytime new information is available and
326    /// [`tick`](Nucleo::tick) should be called. Note that `notify` is not
327    /// debounced, that should be handled by the downstream crate (for example
328    /// debouncing to only redraw at most every 1/60 seconds).
329    ///
330    /// If `None` is passed for the number of worker threads, nucleo will use
331    /// one thread per hardware thread.
332    ///
333    /// Nucleo can match items with multiple orthogonal properties. `columns`
334    /// indicates how many matching columns each item (and the pattern) has. The
335    /// number of columns cannot be changed after construction.
336    pub fn new(
337        config: Config,
338        notify: Arc<dyn Fn() + Sync + Send>,
339        num_threads: Option<usize>,
340        columns: u32,
341    ) -> Self {
342        let (pool, worker) = Worker::new(num_threads, config, notify.clone(), columns);
343        Self {
344            canceled: worker.canceled.clone(),
345            should_notify: worker.should_notify.clone(),
346            items: worker.items.clone(),
347            pool,
348            pattern: MultiPattern::new(columns as usize),
349            snapshot: Snapshot {
350                matches: Vec::with_capacity(2 * 1024),
351                pattern: MultiPattern::new(columns as usize),
352                item_count: 0,
353                items: worker.items.clone(),
354            },
355            worker: Arc::new(Mutex::new(worker)),
356            state: State::Init,
357            notify,
358        }
359    }
360
361    /// Returns the total number of active injectors
362    pub fn active_injectors(&self) -> usize {
363        Arc::strong_count(&self.items)
364            - self.state.matcher_item_refs()
365            - (Arc::ptr_eq(&self.snapshot.items, &self.items)) as usize
366    }
367
368    /// Returns a snapshot of the current matcher state.
369    pub fn snapshot(&self) -> &Snapshot<T> {
370        &self.snapshot
371    }
372
373    /// Returns an injector that can be used for adding candidates to the matcher.
374    pub fn injector(&self) -> Injector<T> {
375        Injector {
376            items: self.items.clone(),
377            notify: self.notify.clone(),
378        }
379    }
380
381    /// Restart the the item stream. Removes all items and disconnects all
382    /// previously created injectors from this instance. If `clear_snapshot`
383    /// is `true` then all items and matched are removed from the [`Snapshot`]
384    /// immediately. Otherwise the snapshot will keep the current matches until
385    /// the matcher has run again.
386    ///
387    /// # Note
388    ///
389    /// The injectors will continue to function but they will not affect this
390    /// instance anymore. The old items will only be dropped when all injectors
391    /// were dropped.
392    pub fn restart(&mut self, clear_snapshot: bool) {
393        self.canceled.store(true, Ordering::Relaxed);
394        self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns()));
395        self.state = State::Cleared;
396        if clear_snapshot {
397            self.snapshot.clear(self.items.clone());
398        }
399    }
400
401    /// Update the internal configuration.
402    pub fn update_config(&mut self, config: Config) {
403        self.worker.lock().update_config(config)
404    }
405
406    /// Set whether to sort results by score. Defaults to true.
407    pub fn sort_results(&mut self, sort_results: bool) {
408        self.worker.lock().sort_results(sort_results)
409    }
410
411    /// Set the strategy for sorting match results.
412    pub fn set_sort_strategy(&mut self, strategy: SortStrategy<T>) {
413        self.worker.lock().set_sort_strategy(strategy)
414    }
415
416    /// Set whether to reverse the input order. Defaults to false.
417    pub fn reverse_items(&mut self, reverse_items: bool) {
418        self.worker.lock().reverse_items(reverse_items)
419    }
420
421    /// The main way to interact with the matcher, this should be called
422    /// regularly (for example each time a frame is rendered). To avoid
423    /// excessive redraws this method will wait `timeout` milliseconds for the
424    /// worker therad to finish. It is recommend to set the timeout to 10ms.
425    pub fn tick(&mut self, timeout: u64) -> Status {
426        self.should_notify.store(false, atomic::Ordering::Relaxed);
427        let status = self.pattern.status();
428        let canceled = status != pattern::Status::Unchanged || self.state.canceled();
429        let mut res = self.tick_inner(timeout, canceled, status);
430        if !canceled {
431            return res;
432        }
433        self.state = State::Fresh;
434        let status2 = self.tick_inner(timeout, false, pattern::Status::Unchanged);
435        res.changed |= status2.changed;
436        res.running = status2.running;
437        res
438    }
439
440    fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status {
441        let mut inner = if canceled {
442            self.pattern.reset_status();
443            self.canceled.store(true, atomic::Ordering::Relaxed);
444            self.worker.lock_arc()
445        } else {
446            let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else {
447                self.should_notify.store(true, Ordering::Release);
448                return Status {
449                    changed: false,
450                    running: true,
451                };
452            };
453            worker
454        };
455
456        let changed = inner.running;
457
458        let running = canceled || self.items.count() > inner.item_count();
459        if inner.running {
460            inner.running = false;
461            if !inner.was_canceled && !self.state.canceled() {
462                self.snapshot.update(&inner)
463            }
464        }
465        if running {
466            inner.pattern.clone_from(&self.pattern);
467            self.canceled.store(false, atomic::Ordering::Relaxed);
468            if !canceled {
469                self.should_notify.store(true, atomic::Ordering::Release);
470            }
471            let cleared = self.state.cleared();
472            if cleared {
473                inner.items = self.items.clone();
474            }
475            self.pool
476                .spawn(move || unsafe { inner.run(status, cleared) })
477        }
478        Status { changed, running }
479    }
480}
481
482impl<T: Sync + Send> Drop for Nucleo<T> {
483    fn drop(&mut self) {
484        // we ensure the worker quits before dropping items to ensure that
485        // the worker can always assume the items outlive it
486        self.canceled.store(true, atomic::Ordering::Relaxed);
487        let lock = self.worker.try_lock_for(Duration::from_secs(1));
488        if lock.is_none() {
489            unreachable!("thread pool failed to shutdown properly")
490        }
491    }
492}