use std::ops::{Bound, RangeBounds};
use std::sync::atomic::{self, AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;
use rayon::ThreadPool;
use crate::pattern::MultiPattern;
use crate::worker::Worker;
pub use nucleo_matcher::{chars, Config, Matcher, Utf32Str, Utf32String};
mod boxcar;
mod par_sort;
pub mod pattern;
mod worker;
#[cfg(test)]
mod tests;
/// A borrowed view of a single stored item: the user-supplied value together
/// with the column texts stored alongside it.
pub struct Item<'a, T> {
/// The value that was pushed into the item store.
pub data: &'a T,
/// The column texts belonging to `data`.
/// NOTE(review): presumably these are the strings the matcher is run
/// against (one entry per column) — confirm in the worker.
pub matcher_columns: &'a [Utf32String],
}
/// A handle for adding items to the item store it was created for.
///
/// Both fields are `Arc`s, so the handle shares (rather than owns) the store
/// and the callback.
pub struct Injector<T> {
/// The shared item store that `push` appends to.
items: Arc<boxcar::Vec<T>>,
/// Callback invoked after every `push`.
notify: Arc<(dyn Fn() + Sync + Send)>,
}
impl<T> Clone for Injector<T> {
fn clone(&self) -> Self {
Injector {
items: self.items.clone(),
notify: self.notify.clone(),
}
}
}
impl<T> Injector<T> {
    /// Appends `value` to the shared item store and then invokes the notify
    /// callback.
    ///
    /// `fill_columns` is forwarded unchanged to the store's `push`
    /// (presumably it is called there to fill in the item's column texts —
    /// confirm in `boxcar`).
    ///
    /// Returns the `u32` index the store reported for the new item.
    pub fn push(&self, value: T, fill_columns: impl FnOnce(&T, &mut [Utf32String])) -> u32 {
        let callback = &*self.notify;
        let index = self.items.push(value, fill_columns);
        callback();
        index
    }

    /// Returns the item store's current `count()`.
    pub fn injected_items(&self) -> u32 {
        self.items.count()
    }

    /// Looks up the item at `index` without any checks.
    ///
    /// # Safety
    ///
    /// The caller must uphold the contract of the underlying store's
    /// `get_unchecked`. NOTE(review): presumably `index` must refer to an
    /// item that has already been fully pushed — confirm in `boxcar`.
    pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> {
        self.items.get_unchecked(index)
    }

    /// Looks up the item at `index`, returning `None` when the store has no
    /// item to hand out for it.
    pub fn get(&self, index: u32) -> Option<Item<'_, T>> {
        self.items.get(index)
    }
}
/// One entry of the match list: an item index together with its score.
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub struct Match {
/// Score assigned to this match.
/// NOTE(review): scale and ordering are decided by the worker/matcher and
/// are not visible in this file.
pub score: u32,
/// Index of the matched item in the item store (used to look the item up).
pub idx: u32,
}
/// Result reported by [`Nucleo::tick`].
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub struct Status {
/// Copied from the worker's `running` flag as observed when the tick
/// acquired the worker; `false` when the worker could not be acquired
/// within the timeout.
pub changed: bool,
/// `true` when the tick spawned a new worker run, or when the worker lock
/// could not be acquired within the timeout.
pub running: bool,
}
/// A copy of the worker's results as of the last time they were taken over,
/// readable without touching the worker.
pub struct Snapshot<T: Sync + Send + 'static> {
/// Value of the worker's `item_count()` at the last update (0 after a clear).
item_count: u32,
/// Copy of the worker's match list at the last update.
matches: Vec<Match>,
/// Copy of the worker's pattern at the last update.
pattern: MultiPattern,
/// The item store that the indices in `matches` are looked up in.
items: Arc<boxcar::Vec<T>>,
}
impl<T: Sync + Send + 'static> Snapshot<T> {
    /// Empties the snapshot and points it at `new_items`.
    ///
    /// The stored pattern is left untouched.
    fn clear(&mut self, new_items: Arc<boxcar::Vec<T>>) {
        self.items = new_items;
        self.matches.clear();
        self.item_count = 0;
    }

    /// Copies the worker's item count, pattern and match list into this
    /// snapshot, and switches to the worker's item store if it differs from
    /// the one currently referenced.
    fn update(&mut self, worker: &Worker<T>) {
        // Only take a new reference when the worker is on a different store.
        if !Arc::ptr_eq(&self.items, &worker.items) {
            self.items = Arc::clone(&worker.items);
        }
        // `clone_from` lets the existing allocations be reused where possible.
        self.matches.clone_from(&worker.matches);
        self.pattern.clone_from(&worker.pattern);
        self.item_count = worker.item_count();
    }

    /// Returns the item count recorded at the last update.
    pub fn item_count(&self) -> u32 {
        self.item_count
    }

    /// Returns the pattern recorded at the last update.
    pub fn pattern(&self) -> &MultiPattern {
        &self.pattern
    }

    /// Returns the number of entries in the recorded match list.
    pub fn matched_item_count(&self) -> u32 {
        self.matches.len() as u32
    }

    /// Returns an iterator over the items of the matches whose positions in
    /// the match list fall inside `range`.
    ///
    /// # Panics
    ///
    /// Panics if `range` does not describe a valid sub-slice of the match
    /// list (start greater than end, or end past the number of matches).
    pub fn matched_items(
        &self,
        range: impl RangeBounds<u32>,
    ) -> impl ExactSizeIterator<Item = Item<'_, T>> + DoubleEndedIterator + '_ {
        // Translate the generic bounds into a half-open `usize` range.
        let first = match range.start_bound() {
            Bound::Unbounded => 0,
            Bound::Included(&lo) => lo as usize,
            Bound::Excluded(&lo) => lo as usize + 1,
        };
        let past_last = match range.end_bound() {
            Bound::Unbounded => self.matches.len(),
            Bound::Excluded(&hi) => hi as usize,
            Bound::Included(&hi) => hi as usize + 1,
        };
        let window = &self.matches[first..past_last];
        // SAFETY: NOTE(review) — relies on every index in the match list
        // referring to a valid item of `self.items`; that invariant is
        // maintained outside this file (presumably by the worker).
        window
            .iter()
            .map(|m| unsafe { self.items.get_unchecked(m.idx) })
    }

    /// Looks up the item at `index` in the snapshot's item store without any
    /// checks.
    ///
    /// # Safety
    ///
    /// The caller must uphold the contract of the underlying store's
    /// `get_unchecked`. NOTE(review): presumably `index` must refer to an
    /// item that has already been fully pushed — confirm in `boxcar`.
    #[inline]
    pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> {
        self.items.get_unchecked(index)
    }

    /// Looks up the item at `index` in the snapshot's item store.
    #[inline]
    pub fn get_item(&self, index: u32) -> Option<Item<'_, T>> {
        self.items.get(index)
    }

    /// Returns the item of the `n`-th entry of the match list, or `None` when
    /// there is no such entry or the item lookup fails.
    #[inline]
    pub fn get_matched_item(&self, n: u32) -> Option<Item<'_, T>> {
        let entry = self.matches.get(n as usize)?;
        self.get_item(entry.idx)
    }
}
/// Tracks where the owning handle stands relative to its worker.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq)]
enum State {
    /// Initial state after construction.
    Init,
    /// A restart installed a new item store that the worker has not been
    /// handed yet.
    Cleared,
    /// The worker has been (re)started against the current item store.
    Fresh,
}

impl State {
    /// Number of references to the current item store that are held
    /// internally (not by injectors), excluding the snapshot's reference.
    fn matcher_item_refs(self) -> usize {
        match self {
            Self::Init | Self::Fresh => 2,
            Self::Cleared => 1,
        }
    }

    /// Whether the next tick must cancel and restart the worker; true for
    /// every state except `Fresh`.
    fn canceled(self) -> bool {
        !matches!(self, Self::Fresh)
    }

    /// Whether the worker still has to be handed the current item store;
    /// true for every state except `Fresh`.
    fn cleared(self) -> bool {
        !matches!(self, Self::Fresh)
    }
}
/// Front-end handle that owns the worker, the thread pool the worker runs
/// on, and the most recent snapshot of the worker's results.
pub struct Nucleo<T: Sync + Send + 'static> {
/// Cancel flag shared with the worker (cloned from the worker in `new`).
canceled: Arc<AtomicBool>,
/// Flag shared with the worker (cloned from the worker in `new`).
/// NOTE(review): presumably tells the worker whether to call `notify`
/// when a run finishes — confirm in the worker.
should_notify: Arc<AtomicBool>,
/// The worker behind a mutex; the lock guard is moved into the pool task
/// for the duration of a run.
worker: Arc<Mutex<Worker<T>>>,
/// Thread pool on which worker runs are spawned.
pool: ThreadPool,
/// Lifecycle state; decides whether the next tick restarts the worker.
state: State,
/// The current item store; shared with every injector created from this
/// handle and replaced by `restart`.
items: Arc<boxcar::Vec<T>>,
/// Callback handed to injectors and, at construction, to the worker.
notify: Arc<(dyn Fn() + Sync + Send)>,
/// Last results taken over from the worker.
snapshot: Snapshot<T>,
/// The pattern to match against. `tick` reads its status and copies it
/// into the worker before starting a run.
pub pattern: MultiPattern,
}
impl<T: Sync + Send + 'static> Nucleo<T> {
/// Creates a new instance together with its worker and thread pool.
///
/// * `config` - matcher configuration, forwarded to the worker.
/// * `notify` - callback stored for injectors and also handed to the worker.
/// * `num_threads` - forwarded to the worker's constructor.
/// * `columns` - number of columns; used for the worker and to size both
///   the public pattern and the snapshot's pattern.
///
/// The instance starts in `State::Init` with an empty snapshot that
/// references the worker's item store.
pub fn new(
    config: Config,
    notify: Arc<(dyn Fn() + Sync + Send)>,
    num_threads: Option<usize>,
    columns: u32,
) -> Self {
    let column_count = columns as usize;
    let (pool, worker) = Worker::new(num_threads, config, notify.clone(), columns);

    // Take shared handles before the worker is moved behind the mutex.
    let canceled = worker.canceled.clone();
    let should_notify = worker.should_notify.clone();
    let items = worker.items.clone();

    let snapshot = Snapshot {
        item_count: 0,
        matches: Vec::with_capacity(2 * 1024),
        pattern: MultiPattern::new(column_count),
        items: items.clone(),
    };

    Self {
        canceled,
        should_notify,
        worker: Arc::new(Mutex::new(worker)),
        pool,
        state: State::Init,
        items,
        notify,
        snapshot,
        pattern: MultiPattern::new(column_count),
    }
}
/// Returns how many references to the current item store are held outside
/// this handle's own bookkeeping, i.e. by injectors.
///
/// Computed as the store's strong count minus the references accounted for
/// by the current state and, if it points at the same store, the snapshot.
pub fn active_injectors(&self) -> usize {
    let snapshot_shares_store = Arc::ptr_eq(&self.snapshot.items, &self.items);
    let internal_refs = self.state.matcher_item_refs() + usize::from(snapshot_shares_store);
    Arc::strong_count(&self.items) - internal_refs
}
/// Returns the results most recently taken over from the worker.
///
/// The snapshot only changes inside `tick` (and `restart` when asked to
/// clear it), so it can be read without waiting for the worker.
pub fn snapshot(&self) -> &Snapshot<T> {
&self.snapshot
}
pub fn injector(&self) -> Injector<T> {
Injector {
items: self.items.clone(),
notify: self.notify.clone(),
}
}
/// Replaces the item store with a new, empty one and marks the handle as
/// cleared so the next `tick` restarts the worker on it.
///
/// The shared cancel flag is raised first. When `clear_snapshot` is true
/// the snapshot is emptied and pointed at the new store; otherwise it keeps
/// its current contents.
pub fn restart(&mut self, clear_snapshot: bool) {
    self.canceled.store(true, Ordering::Relaxed);

    // The new store keeps the column count of the one it replaces.
    let columns = self.items.columns();
    let fresh_items = Arc::new(boxcar::Vec::with_capacity(1024, columns));

    self.items = Arc::clone(&fresh_items);
    self.state = State::Cleared;
    if clear_snapshot {
        self.snapshot.clear(fresh_items);
    }
}
/// Forwards `config` to the worker.
///
/// Takes the worker lock without a timeout, so this blocks for as long as
/// another holder keeps the lock.
pub fn update_config(&mut self, config: Config) {
self.worker.lock().update_config(config)
}
pub fn tick(&mut self, timeout: u64) -> Status {
self.should_notify.store(false, atomic::Ordering::Relaxed);
let status = self.pattern.status();
let canceled = status != pattern::Status::Unchanged || self.state.canceled();
let mut res = self.tick_inner(timeout, canceled, status);
if !canceled {
return res;
}
self.state = State::Fresh;
let status2 = self.tick_inner(timeout, false, pattern::Status::Unchanged);
res.changed |= status2.changed;
res.running = status2.running;
res
}
/// Performs a single synchronization pass with the worker.
///
/// * `timeout` - milliseconds to wait for the worker lock when `canceled`
///   is false.
/// * `canceled` - when true, the shared cancel flag is raised, the worker
///   lock is waited for without a timeout, and a new worker run is always
///   spawned.
/// * `status` - pattern status forwarded to the worker's `run`.
///
/// Returns `changed` as the worker's `running` flag found on acquiring the
/// lock, and `running` as whether a new run was spawned. If the lock cannot
/// be acquired in time, returns `changed: false, running: true`.
fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status {
let mut inner = if canceled {
// Restart pass: reset the pattern's status, raise the shared cancel
// flag, then block until the worker lock is available.
self.pattern.reset_status();
self.canceled.store(true, atomic::Ordering::Relaxed);
self.worker.lock_arc()
} else {
// Regular pass: give up after `timeout` ms. The lock guard is moved
// into the pool task below, so a timeout here means some holder (e.g. a
// spawned run) still has the worker.
let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else {
self.should_notify.store(true, Ordering::Release);
return Status {
changed: false,
running: true,
};
};
worker
};
// Read before the flag is reset below.
let changed = inner.running;
// A new run is needed on a restart pass, or when the store holds more
// items than the worker reports via `item_count()`.
let running = canceled || self.items.count() > inner.item_count();
if inner.running {
inner.running = false;
// Take over the results only if the worker's run was not canceled and
// this handle is not itself waiting for a restart.
if !inner.was_canceled && !self.state.canceled() {
self.snapshot.update(&inner)
}
}
if running {
// Hand the current pattern to the worker and lower the cancel flag
// before the run starts.
inner.pattern.clone_from(&self.pattern);
self.canceled.store(false, atomic::Ordering::Relaxed);
if !canceled {
self.should_notify.store(true, atomic::Ordering::Release);
}
// After a restart (or before the first run) the worker must switch to
// the current item store.
let cleared = self.state.cleared();
if cleared {
inner.items = self.items.clone();
}
// The lock guard moves into the task, so the worker stays locked until
// the closure finishes.
// NOTE(review): `run` is called in an `unsafe` block; its safety
// contract is defined in the worker module and is not visible here.
self.pool
.spawn(move || unsafe { inner.run(status, cleared) })
}
Status { changed, running }
}
}
impl<T: Sync + Send> Drop for Nucleo<T> {
    /// Raises the shared cancel flag and waits up to one second for the
    /// worker lock before the fields are dropped.
    ///
    /// # Panics
    ///
    /// Panics if the worker lock cannot be acquired within that second.
    fn drop(&mut self) {
        self.canceled.store(true, atomic::Ordering::Relaxed);
        // The guard is held until the end of `drop`.
        let Some(_worker_guard) = self.worker.try_lock_for(Duration::from_secs(1)) else {
            unreachable!("thread pool failed to shutdown properly")
        };
    }
}