ncp_engine/lib.rs
1//! # Nucleo-picker-engine
2//!
3//! The `ncp-engine` crate is a fork of the [nucleo](https://docs.rs/nucleo) crate.
4//! It is not recommended for general use. This fork mainly exists to meet the specific
5//! requirements of [`nucleo-picker`](https://docs.rs/nucleo-picker).
6//!
7//! `ncp-engine` implements a high level matcher API that provides a performant
8//! parallel matcher worker threadpool. It is designed to allow integrating a `fzf`-like
9//! fuzzy matcher into a TUI application.
10//!
//! For a fully-featured TUI implementation, see [nucleo-picker](https://docs.rs/nucleo-picker).
12//!
13//! Matching runs in a background threadpool while providing a snapshot of the last
14//! complete match on request. That means the matcher can update the results live while
15//! the user is typing, while never blocking the main UI thread (beyond a user provided
16//! timeout). Nucleo also supports fully concurrent lock-free (and wait-free) streaming
17//! of input items.
18//!
19//! The [`Nucleo`] struct serves as the main API entrypoint for this crate.
20
21#![deny(missing_docs)]
22
23use std::ops::{Bound, RangeBounds};
24use std::sync::Arc;
25use std::sync::atomic::{self, AtomicBool, Ordering};
26use std::time::Duration;
27
28use parking_lot::Mutex;
29use rayon::ThreadPool;
30
31use crate::pattern::MultiPattern;
32use crate::worker::Worker;
33pub use ncp_matcher::{Config, Matcher, Utf32Str, Utf32String, chars};
34
35mod boxcar;
36mod par_sort;
37pub mod pattern;
38mod worker;
39
40#[cfg(test)]
41mod tests;
42
43/// A match candidate stored in a [`Nucleo`] worker.
44pub struct Item<'a, T> {
45 /// A reference to the underlying item provided to the matcher.
46 pub data: &'a T,
47 /// The representation of the data within the matcher.
48 pub matcher_columns: &'a [Utf32String],
49}
50
51/// A handle that allows adding new items to a [`Nucleo`] worker.
52///
53/// An `Injector` is internally reference counted and can be cheaply
54/// cloned and sent across threads.
55pub struct Injector<T> {
56 items: Arc<boxcar::Vec<T>>,
57 notify: Arc<dyn Fn() + Sync + Send>,
58}
59
60impl<T> Clone for Injector<T> {
61 fn clone(&self) -> Self {
62 Self {
63 items: self.items.clone(),
64 notify: self.notify.clone(),
65 }
66 }
67}
68
69impl<T> Injector<T> {
70 /// Appends an element to the list of match candidates.
71 ///
72 /// This function is lock-free and wait-free. The returned `u32` is the internal index which
73 /// has been assigned to the provided value and is guaranteed to be valid unless
74 /// [`Nucleo::restart`] has been called.
75 ///
76 /// The `fill_columns` closure is called to generate the representation of the pushed value
77 /// within the matcher engine. The first argument is a reference to the provided value, and the
78 /// second argument is a slice where each entry corresponds to a column within the [`Nucleo`]
79 /// instance from which this `Injector` was created.
80 ///
81 /// ## Example
82 /// If the matcher has exactly one column and the item type `T` is a `String`, an appropriate
83 /// `fill_columns` closure might look like
84 /// ```
85 /// # use ncp_engine::Utf32String;
86 /// let fill_columns = |s: &String, cols: &mut [Utf32String]| {
87 /// cols[0] = (&**s).into();
88 /// };
89 /// ```
90 pub fn push(&self, value: T, fill_columns: impl FnOnce(&T, &mut [Utf32String])) -> u32 {
91 let idx = self.items.push(value, fill_columns);
92 (self.notify)();
93 idx
94 }
95
96 /// Appends multiple elements to the list of matched items.
97 /// This function is lock-free and wait-free.
98 ///
99 /// You should favor this function over `push` if at least one of the following is true:
100 /// - the number of items you're adding can be computed beforehand and is typically larger
101 /// than 1k
102 /// - you're able to batch incoming items
103 /// - you're adding items from multiple threads concurrently (this function results in less
104 /// contention)
105 pub fn extend_exact<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
106 where
107 I: IntoIterator<Item = T> + ExactSizeIterator,
108 {
109 self.items.extend_exact(values, fill_columns);
110 (self.notify)();
111 }
112
113 /// Returns the total number of items injected in the matcher.
114 ///
115 /// This may not match the number of items in the match snapshot if the matcher
116 /// is still running.
117 pub fn injected_items(&self) -> u32 {
118 self.items.count()
119 }
120
121 /// Returns a reference to the item at the given index.
122 ///
123 /// # Safety
124 ///
125 /// Item at `index` must be initialized. That means you must have observed
126 /// `push` returning this value or `get` returning `Some` for this value.
127 /// Just because a later index is initialized doesn't mean that this index
128 /// is initialized
129 pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
130 unsafe { self.items.get_unchecked(index) }
131 }
132
133 /// Returns a reference to the element at the given index.
134 pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
135 self.items.get(index)
136 }
137}
138
/// A successful match computed by the [`Nucleo`] matcher.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Match {
    /// The score assigned to this match.
    pub score: u32,
    /// The index of the matched item.
    ///
    /// The index is guaranteed to refer to a valid item within the matcher and within the
    /// same snapshot. Note that indices are invalidated if the matcher engine has been
    /// [restarted](Nucleo::restart).
    pub idx: u32,
}
151
/// What a [`Nucleo`] worker reports after a match.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Status {
    /// `true` if the current snapshot has changed.
    pub changed: bool,
    /// `true` if the matcher is still processing in the background.
    pub running: bool,
}
160
161/// A representation of the results of a [`Nucleo`] worker after finishing a
162/// [`tick`](Nucleo::tick).
163pub struct Snapshot<T: Sync + Send + 'static> {
164 item_count: u32,
165 matches: Vec<Match>,
166 pattern: MultiPattern,
167 items: Arc<boxcar::Vec<T>>,
168}
169
170impl<T: Sync + Send + 'static> Snapshot<T> {
171 fn clear(&mut self, new_items: Arc<boxcar::Vec<T>>) {
172 self.item_count = 0;
173 self.matches.clear();
174 self.items = new_items;
175 }
176
177 fn update(&mut self, worker: &Worker<T>) {
178 self.item_count = worker.item_count();
179 self.pattern.clone_from(&worker.pattern);
180 self.matches.clone_from(&worker.matches);
181 if !Arc::ptr_eq(&worker.items, &self.items) {
182 self.items = worker.items.clone();
183 }
184 }
185
186 /// Returns that total number of items
187 pub fn item_count(&self) -> u32 {
188 self.item_count
189 }
190
191 /// Returns the pattern which items were matched against
192 pub fn pattern(&self) -> &MultiPattern {
193 &self.pattern
194 }
195
196 /// Returns that number of items that matched the pattern
197 pub fn matched_item_count(&self) -> u32 {
198 self.matches.len() as u32
199 }
200
201 /// Returns an iterator over the items that correspond to a subrange of
202 /// all the matches in this snapshot.
203 ///
204 /// # Panics
205 /// Panics if `range` has a range bound that is larger than
206 /// the matched item count
207 pub fn matched_items(
208 &self,
209 range: impl RangeBounds<u32>,
210 ) -> impl ExactSizeIterator<Item = Item<'_, T>> + DoubleEndedIterator + '_ {
211 // TODO: use TAIT
212 let start = match range.start_bound() {
213 Bound::Included(&start) => start as usize,
214 Bound::Excluded(&start) => start as usize + 1,
215 Bound::Unbounded => 0,
216 };
217 let end = match range.end_bound() {
218 Bound::Included(&end) => end as usize + 1,
219 Bound::Excluded(&end) => end as usize,
220 Bound::Unbounded => self.matches.len(),
221 };
222 self.matches[start..end]
223 .iter()
224 .map(|&m| unsafe { self.items.get_unchecked(m.idx) })
225 }
226
227 /// Returns a reference to the item at the given index.
228 ///
229 /// # Safety
230 ///
231 /// Item at `index` must be initialized. That means you must have observed a
232 /// match with the corresponding index in this exact snapshot. Observing
233 /// a higher index is not enough as item indices can be non-contigously
234 /// initialized
235 #[inline]
236 pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> {
237 unsafe { self.items.get_unchecked(index) }
238 }
239
240 /// Returns a reference to the item at the given index.
241 ///
242 /// Returns `None` if the given `index` is not initialized. This function
243 /// is only guarteed to return `Some` for item indices that can be found in
244 /// the `matches` of this struct. Both smaller and larger indices may return
245 /// `None`.
246 #[inline]
247 pub fn get_item(&self, index: u32) -> Option<Item<'_, T>> {
248 self.items.get(index)
249 }
250
251 /// Returns the matches corresponding to this snapshot.
252 #[inline]
253 pub fn matches(&self) -> &[Match] {
254 &self.matches
255 }
256
257 /// A convenience function to return the [`Item`] corresponding to the
258 /// `n`th match.
259 ///
260 /// Returns `None` if `n` is greater than or equal to the match count.
261 #[inline]
262 pub fn get_matched_item(&self, n: u32) -> Option<Item<'_, T>> {
263 // SAFETY: A match index is guaranteed to corresponding to a valid global index in this
264 // snapshot.
265 unsafe { Some(self.get_item_unchecked(self.matches.get(n as usize)?.idx)) }
266 }
267}
268
/// Freshness of the item list, as tracked by `Nucleo`.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq)]
enum State {
    /// The state a newly constructed `Nucleo` starts in.
    Init,
    /// The items have been cleared, but the snapshot and items are still outdated.
    Cleared,
    /// The items are fresh.
    Fresh,
}

impl State {
    /// Returns the reference count that `Nucleo::active_injectors` subtracts
    /// for this state: 1 when `Cleared`, 2 otherwise.
    fn matcher_item_refs(self) -> usize {
        if matches!(self, Self::Cleared) { 1 } else { 2 }
    }

    /// Returns `true` for every state except `Fresh`.
    fn canceled(self) -> bool {
        !matches!(self, Self::Fresh)
    }

    /// Returns `true` for every state except `Fresh`.
    fn cleared(self) -> bool {
        !matches!(self, Self::Fresh)
    }
}
295
296/// A high level matcher worker that quickly computes matches in a background
297/// threadpool.
298///
299/// ## Example
300/// ```
301/// use std::sync::atomic::{AtomicBool, Ordering};
302/// use std::sync::Arc;
303/// use std::thread;
304///
305/// use ncp_engine::{Config, Nucleo};
306///
307/// static NEEDS_UPDATE: AtomicBool = AtomicBool::new(false);
308///
309/// // initialize a new matcher with default configuration and one column
310/// let matcher = Nucleo::new(
311/// Config::DEFAULT,
312/// Arc::new(|| NEEDS_UPDATE.store(true, Ordering::Relaxed)),
313/// None,
314/// 1
315/// );
316///
317/// // get a handle to add items to the matcher
318/// let injector = matcher.injector();
319///
320/// // add items to the matcher
321/// thread::spawn(move || {
322/// injector.push("Hello, world!".to_string(), |s, cols| {
323/// cols[0] = (&**s).into();
324/// });
325/// });
326/// ```
327pub struct Nucleo<T: Sync + Send + 'static> {
328 // the way the API is build we totally don't actually need these to be Arcs
329 // but this lets us avoid some unsafe
330 canceled: Arc<AtomicBool>,
331 should_notify: Arc<AtomicBool>,
332 worker: Arc<Mutex<Worker<T>>>,
333 pool: ThreadPool,
334 state: State,
335 items: Arc<boxcar::Vec<T>>,
336 notify: Arc<dyn Fn() + Sync + Send>,
337 snapshot: Snapshot<T>,
338 /// The pattern matched by this matcher.
339 ///
340 /// To update the match pattern, use [`MultiPattern::reparse`]. Note that
341 /// the matcher worker will only become aware of the new pattern after a
342 /// call to [`tick`](Nucleo::tick).
343 pub pattern: MultiPattern,
344}
345
346impl<T: Sync + Send + 'static> Nucleo<T> {
347 /// Constructs a new `nucleo` worker threadpool with the provided `config`.
348 ///
349 /// `notify` is called whenever new information is available and
350 /// [`tick`](Nucleo::tick) should be called. Note that `notify` is not
351 /// debounced; that should be handled by the downstream crate (for example,
352 /// debouncing to only redraw at most every 1/60 seconds).
353 ///
354 /// If `None` is passed for the number of worker threads, nucleo will use
355 /// one thread per hardware thread.
356 ///
357 /// Nucleo can match items with multiple orthogonal properties. `columns`
358 /// indicates how many matching columns each item (and the pattern) has. The
359 /// number of columns cannot be changed after construction.
360 pub fn new(
361 config: Config,
362 notify: Arc<dyn Fn() + Sync + Send>,
363 num_threads: Option<usize>,
364 columns: u32,
365 ) -> Self {
366 let (pool, worker) = Worker::new(num_threads, config, notify.clone(), columns);
367 Self {
368 canceled: worker.canceled.clone(),
369 should_notify: worker.should_notify.clone(),
370 items: worker.items.clone(),
371 pool,
372 pattern: MultiPattern::new(columns as usize),
373 snapshot: Snapshot {
374 matches: Vec::with_capacity(2 * 1024),
375 pattern: MultiPattern::new(columns as usize),
376 item_count: 0,
377 items: worker.items.clone(),
378 },
379 worker: Arc::new(Mutex::new(worker)),
380 state: State::Init,
381 notify,
382 }
383 }
384
385 /// Returns the total number of active injectors.
386 pub fn active_injectors(&self) -> usize {
387 Arc::strong_count(&self.items)
388 - self.state.matcher_item_refs()
389 - (Arc::ptr_eq(&self.snapshot.items, &self.items)) as usize
390 }
391
392 /// Returns a snapshot of the current matcher state.
393 ///
394 /// This method is very cheap and can be called every time a snapshot is required. The
395 /// snapshot will not change unless [`tick`](Nucleo::tick) is called.
396 pub fn snapshot(&self) -> &Snapshot<T> {
397 &self.snapshot
398 }
399
400 /// Returns an injector that can be used for adding candidates to the matcher.
401 pub fn injector(&self) -> Injector<T> {
402 Injector {
403 items: self.items.clone(),
404 notify: self.notify.clone(),
405 }
406 }
407
408 /// Restart the the item stream. Removes all items and disconnects all
409 /// previously created injectors from this instance. If `clear_snapshot`
410 /// is `true` then all items and matched are removed from the [`Snapshot`]
411 /// immediately. Otherwise the snapshot will keep the current matches until
412 /// the matcher has run again.
413 ///
414 /// # Note
415 ///
416 /// The injectors will continue to function but they will not affect this
417 /// instance anymore. The old items will only be dropped when all injectors
418 /// were dropped.
419 pub fn restart(&mut self, clear_snapshot: bool) {
420 self.canceled.store(true, Ordering::Relaxed);
421 self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns()));
422 self.state = State::Cleared;
423 if clear_snapshot {
424 self.snapshot.clear(self.items.clone());
425 }
426 }
427
428 /// Update the internal configuration.
429 pub fn update_config(&mut self, config: Config) {
430 self.worker.lock().update_config(config);
431 }
432
433 /// Set whether the matcher should sort search results by score after
434 /// matching. Defaults to true.
435 pub fn sort_results(&mut self, sort_results: bool) {
436 self.worker.lock().sort_results(sort_results);
437 }
438
439 /// Set whether the matcher should reverse the order of the input.
440 /// Defaults to false.
441 pub fn reverse_items(&mut self, reverse_items: bool) {
442 self.worker.lock().reverse_items(reverse_items);
443 }
444
445 /// Update the internal state to reflect any changes from the background worker
446 /// threads.
447 ///
448 /// This is the main way to interact with the matcher, and should be called
449 /// regularly (for example each time a frame is rendered). To avoid excessive
450 /// redraws this method will wait `timeout` milliseconds for the
451 /// worker thread to finish. It is recommend to set the timeout to 10ms.
452 pub fn tick(&mut self, timeout: u64) -> Status {
453 self.should_notify.store(false, atomic::Ordering::Relaxed);
454 let status = self.pattern.status();
455 let canceled = status != pattern::Status::Unchanged || self.state.canceled();
456 let mut res = self.tick_inner(timeout, canceled, status);
457 if !canceled {
458 return res;
459 }
460 self.state = State::Fresh;
461 let status2 = self.tick_inner(timeout, false, pattern::Status::Unchanged);
462 res.changed |= status2.changed;
463 res.running = status2.running;
464 res
465 }
466
467 fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status {
468 let mut inner = if canceled {
469 self.pattern.reset_status();
470 self.canceled.store(true, atomic::Ordering::Relaxed);
471 self.worker.lock_arc()
472 } else {
473 let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else {
474 self.should_notify.store(true, Ordering::Release);
475 return Status {
476 changed: false,
477 running: true,
478 };
479 };
480 worker
481 };
482
483 let changed = inner.running;
484
485 let running = canceled || self.items.count() > inner.item_count();
486 if inner.running {
487 inner.running = false;
488 if !inner.was_canceled && !self.state.canceled() {
489 self.snapshot.update(&inner);
490 }
491 }
492 if running {
493 inner.pattern.clone_from(&self.pattern);
494 self.canceled.store(false, atomic::Ordering::Relaxed);
495 if !canceled {
496 self.should_notify.store(true, atomic::Ordering::Release);
497 }
498 let cleared = self.state.cleared();
499 if cleared {
500 inner.items = self.items.clone();
501 }
502 self.pool
503 .spawn(move || unsafe { inner.run(status, cleared) });
504 }
505 Status { changed, running }
506 }
507}
508
509impl<T: Sync + Send> Drop for Nucleo<T> {
510 fn drop(&mut self) {
511 // we ensure the worker quits before dropping items to ensure that
512 // the worker can always assume the items outlive it
513 self.canceled.store(true, atomic::Ordering::Relaxed);
514 let lock = self.worker.try_lock_for(Duration::from_secs(1));
515 if lock.is_none() {
516 unreachable!("thread pool failed to shutdown properly")
517 }
518 }
519}