nucleo/lib.rs
1/*!
`nucleo` is a high-level crate that provides a high-level matcher API built
around a highly effective (parallel) matcher worker. It's designed to allow
quickly plugging a fully featured (and faster) fzf/skim-like fuzzy matcher into
your TUI application.
6
7It's designed to run matching on a background threadpool while providing a
8snapshot of the last complete match. That means the matcher can update the
9results live while the user is typing while never blocking the main UI thread
10(beyond a user provided timeout). Nucleo also supports fully concurrent lock-free
11(and wait-free) streaming of input items.
12
13The [`Nucleo`] struct serves as the main API entrypoint for this crate.
14
15# Status
16
Nucleo is used in the helix-editor and therefore has a large user base with lots
of real-world testing. The core matcher implementation is considered complete
and is unlikely to see major changes. The `nucleo-matcher` crate is finished and
ready for widespread use; breaking changes should be very rare (a 1.0 release
should not be far away).
22
While the high-level `nucleo` crate also works well (and is also used in helix),
there are still additional features that will be added in the future. The
high-level crate also needs better documentation and will likely see a few minor
API changes in the future.
27
28*/
29use std::cmp::Ordering as CmpOrdering;
30use std::ops::{Bound, RangeBounds};
31use std::sync::atomic::{self, AtomicBool, Ordering};
32use std::sync::Arc;
33use std::time::Duration;
34
35use parking_lot::Mutex;
36use rayon::ThreadPool;
37
38use crate::pattern::MultiPattern;
39use crate::worker::Worker;
40pub use nucleo_matcher::{chars, Config, Matcher, Utf32Str, Utf32String};
41
42mod boxcar;
43mod par_sort;
44pub mod pattern;
45mod worker;
46
47#[cfg(test)]
48mod tests;
49
50/// Comparison function for custom sorting of match results.
51pub type SortFn<T> =
52 Box<dyn Fn(&Match, Item<'_, T>, &Match, Item<'_, T>) -> CmpOrdering + Send + Sync>;
53
54/// Strategy for sorting match results.
55#[derive(Default)]
56pub enum SortStrategy<T: Sync + Send + 'static> {
57 /// Sort items by index.
58 Index,
59 /// Sort by score (desc), then length (asc), then index.
60 #[default]
61 Score,
62 /// Custom comparison function.
63 Custom(SortFn<T>),
64}
65
66impl<T: Sync + Send + 'static> std::fmt::Debug for SortStrategy<T> {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 SortStrategy::Index => write!(f, "SortStrategy::None"),
70 SortStrategy::Score => write!(f, "SortStrategy::Score"),
71 SortStrategy::Custom(_) => write!(f, "SortStrategy::Custom(...)"),
72 }
73 }
74}
75
76/// A match candidate stored in a [`Nucleo`] worker.
77pub struct Item<'a, T> {
78 pub data: &'a T,
79 pub matcher_columns: &'a [Utf32String],
80}
81
82/// A handle that allows adding new items to a [`Nucleo`] worker.
83///
84/// It's internally reference counted and can be cheaply cloned
85/// and sent across threads.
86pub struct Injector<T> {
87 items: Arc<boxcar::Vec<T>>,
88 notify: Arc<dyn Fn() + Sync + Send>,
89}
90
91impl<T> Clone for Injector<T> {
92 fn clone(&self) -> Self {
93 Injector {
94 items: self.items.clone(),
95 notify: self.notify.clone(),
96 }
97 }
98}
99
100impl<T> Injector<T> {
101 /// Appends an element to the list of matched items.
102 /// This function is lock-free and wait-free.
103 pub fn push(&self, value: T, fill_columns: impl FnOnce(&T, &mut [Utf32String])) -> u32 {
104 let idx = self.items.push(value, fill_columns);
105 (self.notify)();
106 idx
107 }
108
109 /// Appends multiple elements to the list of matched items.
110 /// This function is lock-free and wait-free.
111 ///
112 /// You should favor this function over `push` if at least one of the following is true:
113 /// - the number of items you're adding can be computed beforehand and is typically larger
114 /// than 1k
115 /// - you're able to batch incoming items
116 /// - you're adding items from multiple threads concurrently (this function results in less
117 /// contention)
118 pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
119 where
120 I: IntoIterator<Item = T> + ExactSizeIterator,
121 {
122 self.items.extend(values, fill_columns);
123 (self.notify)();
124 }
125
126 /// Returns the total number of items injected in the matcher. This might
127 /// not match the number of items in the match snapshot (if the matcher
128 /// is still running)
129 pub fn injected_items(&self) -> u32 {
130 self.items.count()
131 }
132
133 /// Returns a reference to the item at the given index.
134 ///
135 /// # Safety
136 ///
137 /// Item at `index` must be initialized. That means you must have observed
138 /// `push` returning this value or `get` returning `Some` for this value.
139 /// Just because a later index is initialized doesn't mean that this index
140 /// is initialized
141 pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
142 self.items.get_unchecked(index)
143 }
144
145 /// Returns a reference to the element at the given index.
146 pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
147 self.items.get(index)
148 }
149}
150
/// An [item](crate::Item) that was successfully matched by a [`Nucleo`] worker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Match {
    /// Score assigned to the item by the matcher.
    pub score: u32,
    /// Index of the matched item in the item storage.
    pub idx: u32,
}
157
/// The status of a [`Nucleo`] worker after a match.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Status {
    /// `true` if the current snapshot has changed.
    pub changed: bool,
    /// `true` if the matcher is still processing in the background.
    pub running: bool,
}
166
167/// A snapshot represent the results of a [`Nucleo`] worker after
168/// finishing a [`tick`](Nucleo::tick).
169pub struct Snapshot<T: Sync + Send + 'static> {
170 item_count: u32,
171 matches: Vec<Match>,
172 pattern: MultiPattern,
173 items: Arc<boxcar::Vec<T>>,
174}
175
176impl<T: Sync + Send + 'static> Snapshot<T> {
177 fn clear(&mut self, new_items: Arc<boxcar::Vec<T>>) {
178 self.item_count = 0;
179 self.matches.clear();
180 self.items = new_items
181 }
182
183 fn update(&mut self, worker: &Worker<T>) {
184 self.item_count = worker.item_count();
185 self.pattern.clone_from(&worker.pattern);
186 self.matches.clone_from(&worker.matches);
187 if !Arc::ptr_eq(&worker.items, &self.items) {
188 self.items = worker.items.clone()
189 }
190 }
191
192 /// Returns that total number of items
193 pub fn item_count(&self) -> u32 {
194 self.item_count
195 }
196
197 /// Returns the pattern which items were matched against
198 pub fn pattern(&self) -> &MultiPattern {
199 &self.pattern
200 }
201
202 /// Returns that number of items that matched the pattern
203 pub fn matched_item_count(&self) -> u32 {
204 self.matches.len() as u32
205 }
206
207 /// Returns an iterator over the items that correspond to a subrange of
208 /// all the matches in this snapshot.
209 ///
210 /// # Panics
211 /// Panics if `range` has a range bound that is larger than
212 /// the matched item count
213 pub fn matched_items(
214 &self,
215 range: impl RangeBounds<u32>,
216 ) -> impl ExactSizeIterator<Item = Item<'_, T>> + DoubleEndedIterator + '_ {
217 // TODO: use TAIT
218 let start = match range.start_bound() {
219 Bound::Included(&start) => start as usize,
220 Bound::Excluded(&start) => start as usize + 1,
221 Bound::Unbounded => 0,
222 };
223 let end = match range.end_bound() {
224 Bound::Included(&end) => end as usize + 1,
225 Bound::Excluded(&end) => end as usize,
226 Bound::Unbounded => self.matches.len(),
227 };
228 self.matches[start..end]
229 .iter()
230 .map(|&m| unsafe { self.items.get_unchecked(m.idx) })
231 }
232
233 /// Returns a reference to the item at the given index.
234 ///
235 /// # Safety
236 ///
237 /// Item at `index` must be initialized. That means you must have observed a
238 /// match with the corresponding index in this exact snapshot. Observing
239 /// a higher index is not enough as item indices can be non-contigously
240 /// initialized
241 #[inline]
242 pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> {
243 self.items.get_unchecked(index)
244 }
245
246 /// Returns a reference to the item at the given index.
247 ///
248 /// Returns `None` if the given `index` is not initialized. This function
249 /// is only guarteed to return `Some` for item indices that can be found in
250 /// the `matches` of this struct. Both smaller and larger indices may return
251 /// `None`.
252 #[inline]
253 pub fn get_item(&self, index: u32) -> Option<Item<'_, T>> {
254 self.items.get(index)
255 }
256
257 /// Return the matches corresponding to this snapshot.
258 #[inline]
259 pub fn matches(&self) -> &[Match] {
260 &self.matches
261 }
262
263 /// A convenience function to return the [`Item`] corresponding to the
264 /// `n`th match.
265 ///
266 /// Returns `None` if `n` is greater than or equal to the match count.
267 #[inline]
268 pub fn get_matched_item(&self, n: u32) -> Option<Item<'_, T>> {
269 // SAFETY: A match index is guaranteed to corresponding to a valid global index in this
270 // snapshot.
271 unsafe { Some(self.get_item_unchecked(self.matches.get(n as usize)?.idx)) }
272 }
273}
274
/// Tracks whether the item storage handed to the worker is up to date.
#[repr(u8)]
#[derive(PartialEq, Eq, Clone, Copy)]
enum State {
    /// State assigned when a `Nucleo` instance is constructed.
    Init,
    /// Items have been cleared but snapshot and items are still outdated.
    Cleared,
    /// Items are fresh.
    Fresh,
}
284
285impl State {
286 fn matcher_item_refs(self) -> usize {
287 match self {
288 State::Cleared => 1,
289 State::Init | State::Fresh => 2,
290 }
291 }
292
293 fn canceled(self) -> bool {
294 self != State::Fresh
295 }
296
297 fn cleared(self) -> bool {
298 self != State::Fresh
299 }
300}
301
302/// A high level matcher worker that quickly computes matches in a background
303/// threadpool.
304pub struct Nucleo<T: Sync + Send + 'static> {
305 // the way the API is build we totally don't actually need these to be Arcs
306 // but this lets us avoid some unsafe
307 canceled: Arc<AtomicBool>,
308 should_notify: Arc<AtomicBool>,
309 worker: Arc<Mutex<Worker<T>>>,
310 pool: ThreadPool,
311 state: State,
312 items: Arc<boxcar::Vec<T>>,
313 notify: Arc<dyn Fn() + Sync + Send>,
314 snapshot: Snapshot<T>,
315 /// The pattern matched by this matcher. To update the match pattern
316 /// [`MultiPattern::reparse`](`pattern::MultiPattern::reparse`) should be used.
317 /// Note that the matcher worker will only become aware of the new pattern
318 /// after a call to [`tick`](Nucleo::tick).
319 pub pattern: MultiPattern,
320}
321
322impl<T: Sync + Send + 'static> Nucleo<T> {
323 /// Constructs a new `nucleo` worker threadpool with the provided `config`.
324 ///
325 /// `notify` is called everytime new information is available and
326 /// [`tick`](Nucleo::tick) should be called. Note that `notify` is not
327 /// debounced, that should be handled by the downstream crate (for example
328 /// debouncing to only redraw at most every 1/60 seconds).
329 ///
330 /// If `None` is passed for the number of worker threads, nucleo will use
331 /// one thread per hardware thread.
332 ///
333 /// Nucleo can match items with multiple orthogonal properties. `columns`
334 /// indicates how many matching columns each item (and the pattern) has. The
335 /// number of columns cannot be changed after construction.
336 pub fn new(
337 config: Config,
338 notify: Arc<dyn Fn() + Sync + Send>,
339 num_threads: Option<usize>,
340 columns: u32,
341 ) -> Self {
342 let (pool, worker) = Worker::new(num_threads, config, notify.clone(), columns);
343 Self {
344 canceled: worker.canceled.clone(),
345 should_notify: worker.should_notify.clone(),
346 items: worker.items.clone(),
347 pool,
348 pattern: MultiPattern::new(columns as usize),
349 snapshot: Snapshot {
350 matches: Vec::with_capacity(2 * 1024),
351 pattern: MultiPattern::new(columns as usize),
352 item_count: 0,
353 items: worker.items.clone(),
354 },
355 worker: Arc::new(Mutex::new(worker)),
356 state: State::Init,
357 notify,
358 }
359 }
360
361 /// Returns the total number of active injectors
362 pub fn active_injectors(&self) -> usize {
363 Arc::strong_count(&self.items)
364 - self.state.matcher_item_refs()
365 - (Arc::ptr_eq(&self.snapshot.items, &self.items)) as usize
366 }
367
368 /// Returns a snapshot of the current matcher state.
369 pub fn snapshot(&self) -> &Snapshot<T> {
370 &self.snapshot
371 }
372
373 /// Returns an injector that can be used for adding candidates to the matcher.
374 pub fn injector(&self) -> Injector<T> {
375 Injector {
376 items: self.items.clone(),
377 notify: self.notify.clone(),
378 }
379 }
380
381 /// Restart the the item stream. Removes all items and disconnects all
382 /// previously created injectors from this instance. If `clear_snapshot`
383 /// is `true` then all items and matched are removed from the [`Snapshot`]
384 /// immediately. Otherwise the snapshot will keep the current matches until
385 /// the matcher has run again.
386 ///
387 /// # Note
388 ///
389 /// The injectors will continue to function but they will not affect this
390 /// instance anymore. The old items will only be dropped when all injectors
391 /// were dropped.
392 pub fn restart(&mut self, clear_snapshot: bool) {
393 self.canceled.store(true, Ordering::Relaxed);
394 self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns()));
395 self.state = State::Cleared;
396 if clear_snapshot {
397 self.snapshot.clear(self.items.clone());
398 }
399 }
400
401 /// Update the internal configuration.
402 pub fn update_config(&mut self, config: Config) {
403 self.worker.lock().update_config(config)
404 }
405
406 /// Set whether to sort results by score. Defaults to true.
407 pub fn sort_results(&mut self, sort_results: bool) {
408 self.worker.lock().sort_results(sort_results)
409 }
410
411 /// Set the strategy for sorting match results.
412 pub fn set_sort_strategy(&mut self, strategy: SortStrategy<T>) {
413 self.worker.lock().set_sort_strategy(strategy)
414 }
415
416 /// Set whether to reverse the input order. Defaults to false.
417 pub fn reverse_items(&mut self, reverse_items: bool) {
418 self.worker.lock().reverse_items(reverse_items)
419 }
420
421 /// The main way to interact with the matcher, this should be called
422 /// regularly (for example each time a frame is rendered). To avoid
423 /// excessive redraws this method will wait `timeout` milliseconds for the
424 /// worker therad to finish. It is recommend to set the timeout to 10ms.
425 pub fn tick(&mut self, timeout: u64) -> Status {
426 self.should_notify.store(false, atomic::Ordering::Relaxed);
427 let status = self.pattern.status();
428 let canceled = status != pattern::Status::Unchanged || self.state.canceled();
429 let mut res = self.tick_inner(timeout, canceled, status);
430 if !canceled {
431 return res;
432 }
433 self.state = State::Fresh;
434 let status2 = self.tick_inner(timeout, false, pattern::Status::Unchanged);
435 res.changed |= status2.changed;
436 res.running = status2.running;
437 res
438 }
439
440 fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status {
441 let mut inner = if canceled {
442 self.pattern.reset_status();
443 self.canceled.store(true, atomic::Ordering::Relaxed);
444 self.worker.lock_arc()
445 } else {
446 let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else {
447 self.should_notify.store(true, Ordering::Release);
448 return Status {
449 changed: false,
450 running: true,
451 };
452 };
453 worker
454 };
455
456 let changed = inner.running;
457
458 let running = canceled || self.items.count() > inner.item_count();
459 if inner.running {
460 inner.running = false;
461 if !inner.was_canceled && !self.state.canceled() {
462 self.snapshot.update(&inner)
463 }
464 }
465 if running {
466 inner.pattern.clone_from(&self.pattern);
467 self.canceled.store(false, atomic::Ordering::Relaxed);
468 if !canceled {
469 self.should_notify.store(true, atomic::Ordering::Release);
470 }
471 let cleared = self.state.cleared();
472 if cleared {
473 inner.items = self.items.clone();
474 }
475 self.pool
476 .spawn(move || unsafe { inner.run(status, cleared) })
477 }
478 Status { changed, running }
479 }
480}
481
482impl<T: Sync + Send> Drop for Nucleo<T> {
483 fn drop(&mut self) {
484 // we ensure the worker quits before dropping items to ensure that
485 // the worker can always assume the items outlive it
486 self.canceled.store(true, atomic::Ordering::Relaxed);
487 let lock = self.worker.try_lock_for(Duration::from_secs(1));
488 if lock.is_none() {
489 unreachable!("thread pool failed to shutdown properly")
490 }
491 }
492}