use std::sync::Arc;
use super::match_list;
use crate::one_shot::matcher::match_list_impl;
use crate::{Match, Options};
mod thread_slice;
mod threaded_vec;
use thread_slice::ThreadSlice;
use threaded_vec::ThreadedVec;
/// Matches `needle` against every entry of `haystacks`, splitting the work across scoped
/// worker threads when the input is large enough.
///
/// The thread count comes from `choose_thread_count` (based on `haystacks.len()` and
/// `opts.max_typos`). It is then limited to at most `max_threads` and at least one thread, so a
/// `max_threads` of `0` behaves like `1`. When only one thread would be used, this falls back
/// to [`match_list`].
///
/// The multi-threaded path dispatches on `opts.max_typos`:
/// - `None` uses a path that preallocates one output slot per haystack.
/// - `Some(_)` uses a path whose output grows as matches are found.
///
/// When `opts.sort` is set, the combined matches are sorted before returning. The sort runs via
/// rayon when the `parallel_sort` feature is enabled.
pub fn match_list_parallel<S1: AsRef<str>, S2: AsRef<str> + Sync + Send>(
    needle: S1,
    haystacks: &[S2],
    opts: Options,
    max_threads: usize,
) -> Vec<Match> {
    // `Ord::clamp(min, max)` panics when `min > max`, so bound the count explicitly: never more
    // than `max_threads`, never fewer than one (this also covers `max_threads == 0`, which would
    // otherwise reach `div_ceil(0)` in the workers' chunking).
    let thread_count = choose_thread_count(haystacks.len(), opts.max_typos)
        .min(max_threads)
        .max(1);
    if thread_count == 1 {
        return match_list(needle, haystacks, opts);
    }

    let mut matches = match opts.max_typos {
        None => match_list_parallel_fixed(needle, haystacks, opts, thread_count),
        _ => match_list_parallel_expandable(needle, haystacks, opts, thread_count),
    };

    if opts.sort {
        #[cfg(feature = "parallel_sort")]
        {
            use rayon::prelude::*;
            matches.par_sort();
        }
        #[cfg(not(feature = "parallel_sort"))]
        matches.sort_unstable();
    }
    matches
}
/// Parallel matching path for `opts.max_typos == None`, producing exactly one output slot per
/// haystack.
///
/// `haystacks` is split into at most `thread_count` contiguous chunks of
/// `haystacks.len().div_ceil(thread_count)` items. Each scoped worker receives a disjoint
/// sub-slice of a preallocated output buffer, wrapped in a `ThreadSlice`, and hands it to
/// `match_list_impl`. The worker also passes the chunk's starting offset within `haystacks`,
/// presumably so the reported indices refer to the full input; confirm against
/// `match_list_impl`.
///
/// Returns an empty vector for empty `haystacks`.
///
/// # Panics
///
/// - If `opts.max_typos` is `Some`.
/// - If `thread_count` is `0` on non-empty input.
/// - If a chunk's starting offset does not fit in a `u32`.
fn match_list_parallel_fixed<S1: AsRef<str>, S2: AsRef<str> + Sync + Send>(
    needle: S1,
    haystacks: &[S2],
    opts: Options,
    thread_count: usize,
) -> Vec<Match> {
    assert!(opts.max_typos.is_none(), "max_typos must be None");
    // `items_per_thread` would be 0 here and `chunks(0)` panics; no haystacks means no matches.
    if haystacks.is_empty() {
        return Vec::new();
    }

    let mut matches = Vec::with_capacity(haystacks.len());
    // SAFETY: `set_len` requires the new slots to be initialized before they are read. This code
    // relies on the workers below (via `ThreadSlice` / `match_list_impl`) writing every slot of
    // their sub-slice. The chunks cover `0..haystacks.len()` exactly, because `chunks` and
    // `split_at_mut` walk the buffer front-to-back in the same chunk lengths.
    // NOTE(review): `Vec::set_len` documents that the elements must already be initialized.
    // Also, if a worker panics, `std::thread::scope` re-raises the panic and this `Vec` is
    // dropped with unwritten slots. Filling `spare_capacity_mut()` (`MaybeUninit<Match>`) would
    // avoid both, but requires `ThreadSlice` to accept that type. Confirm against its contract.
    #[allow(clippy::uninit_vec)] // intentional: slots are filled by the scoped workers below
    unsafe {
        matches.set_len(haystacks.len())
    };

    let mut matches_remaining_slice = matches.as_mut_slice();
    let items_per_thread = haystacks.len().div_ceil(thread_count);
    std::thread::scope(|s| {
        for (thread_idx, haystacks) in haystacks.chunks(items_per_thread).enumerate() {
            assert!(thread_idx < thread_count, "thread index out of bounds");
            // Peel this chunk's output slots off the front; the slices never overlap, so each
            // worker has exclusive access to its own region.
            let (matches_slice, remaining_slice) =
                matches_remaining_slice.split_at_mut(haystacks.len());
            matches_remaining_slice = remaining_slice;

            // Checked instead of `as u32`, which would silently wrap on huge inputs.
            let offset = u32::try_from(thread_idx * items_per_thread)
                .expect("haystack offset does not fit in u32");
            let needle = needle.as_ref().to_owned();
            let mut thread_slice = ThreadSlice::new(matches_slice);
            s.spawn(move || match_list_impl(needle, haystacks, offset, opts, &mut thread_slice));
        }
    });
    matches
}
/// Parallel matching path for `opts.max_typos == Some(_)`, where the number of matches is not
/// known up front.
///
/// All workers share one `ThreadedVec`, created with a batch size of `1024` and `thread_count`,
/// through an `Arc`. `haystacks` is split into at most `thread_count` contiguous chunks. Each
/// scoped worker runs `match_list_impl` on its chunk, passing the chunk's starting offset within
/// `haystacks`. After every worker has finished, the shared collection is unwrapped and
/// converted with `ThreadedVec::into_vec`. NOTE(review): output order is whatever `into_vec`
/// yields; it is not established here.
///
/// Returns an empty vector for empty `haystacks`.
///
/// # Panics
///
/// - If `opts.max_typos` is `None`.
/// - If `thread_count` is `0` on non-empty input.
/// - If a chunk's starting offset does not fit in a `u32`.
fn match_list_parallel_expandable<S1: AsRef<str>, S2: AsRef<str> + Sync + Send>(
    needle: S1,
    haystacks: &[S2],
    opts: Options,
    thread_count: usize,
) -> Vec<Match> {
    assert!(opts.max_typos.is_some(), "max_typos must be Some");
    // `items_per_thread` would be 0 here and `chunks(0)` panics; no haystacks means no matches.
    if haystacks.is_empty() {
        return Vec::new();
    }

    let batch_size = 1024;
    let matches = Arc::new(ThreadedVec::new(batch_size, thread_count));
    let items_per_thread = haystacks.len().div_ceil(thread_count);
    std::thread::scope(|s| {
        for (thread_idx, haystacks) in haystacks.chunks(items_per_thread).enumerate() {
            assert!(thread_idx < thread_count, "thread index out of bounds");
            // Checked instead of `as u32`, which would silently wrap on huge inputs.
            let offset = u32::try_from(thread_idx * items_per_thread)
                .expect("haystack offset does not fit in u32");
            let needle = needle.as_ref().to_owned();
            let mut matches = Arc::clone(&matches);
            s.spawn(move || match_list_impl(needle, haystacks, offset, opts, &mut matches));
        }
    });

    // Every clone was moved into a worker closure, and the scope has joined them all, so this is
    // the last strong reference.
    Arc::into_inner(matches)
        .expect("all worker references are dropped once the thread scope ends")
        .into_vec()
}
/// Returns how many worker threads `haystacks_len` items justify.
///
/// Each thread must have at least a minimum number of haystacks, which depends on the typo
/// budget:
/// - 5000 for `Some(0)`
/// - 3000 for `Some(1)`
/// - 2000 for any larger budget
/// - 2500 when `max_typos` is `None`
///
/// The result is the truncating quotient, so it is `0` whenever `haystacks_len` is below that
/// minimum.
fn choose_thread_count(haystacks_len: usize, max_typos: Option<u16>) -> usize {
    let per_thread_floor = if let Some(typos) = max_typos {
        match typos {
            0 => 5000,
            1 => 3000,
            _ => 2000,
        }
    } else {
        2500
    };
    haystacks_len / per_thread_floor
}