use itertools::Itertools;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use super::Matcher;
use crate::sort::radix_sort_matches;
use crate::{Config, Match, Matchable};
/// Matches `needle` against every entry of `haystacks` using up to `threads` worker threads.
///
/// The haystacks are split into fixed-size chunks. Each worker owns a clone of the
/// [`Matcher`] and repeatedly claims the next unprocessed chunk from a shared atomic
/// counter until none are left.
///
/// * An empty `needle` short-circuits: every haystack whose `match_str()` is `Some` is
///   returned with a score of `0` and `exact: false`, without spawning any threads.
/// * When `config.sort` is set, each worker sorts its own matches with
///   `radix_sort_matches` and the per-worker lists are combined with a k-way merge
///   (which relies on `Match`'s own ordering).
/// * Otherwise the per-worker lists are concatenated in worker order. Because chunks
///   are claimed dynamically, the relative order of matches from different chunks is
///   not deterministic in that mode.
///
/// `threads` is clamped to `1..=number_of_chunks`: a value of `0` still performs the
/// search on a single worker instead of silently returning no matches, and no more
/// workers are spawned than there are chunks to process.
///
/// # Panics
///
/// Panics if `haystacks.len() >= u32::MAX`, since match indices are stored as `u32`.
/// A panic raised inside a worker thread is re-raised on the calling thread.
pub fn match_list_parallel<S1: AsRef<str>, S2: Matchable + Sync>(
    needle: S1,
    haystacks: &[S2],
    config: &Config,
    threads: usize,
) -> Vec<Match> {
    /// Number of haystacks a worker claims per trip to the shared counter.
    const CHUNK_SIZE: usize = 512;

    assert!(
        haystacks.len() < (u32::MAX as usize),
        "haystack index overflow"
    );

    let needle = needle.as_ref();

    if needle.is_empty() {
        return haystacks
            .iter()
            .enumerate()
            .filter(|(_, item)| item.match_str().is_some())
            .map(|(index, _)| Match {
                // Lossless: the assertion above bounds every index below `u32::MAX`.
                index: index as u32,
                score: 0,
                exact: false,
                #[cfg(feature = "match_end_col")]
                end_col: 0,
            })
            .collect();
    }

    if haystacks.is_empty() {
        return vec![];
    }

    let num_chunks = haystacks.len().div_ceil(CHUNK_SIZE);
    // `haystacks` is non-empty here, so `num_chunks >= 1` and the clamp bounds are valid.
    // The lower bound keeps `threads == 0` from dropping every match; the upper bound
    // avoids spawning workers that would find no chunk left to claim.
    let worker_count = threads.clamp(1, num_chunks);

    let next_chunk = AtomicUsize::new(0);
    let matcher = Matcher::new(needle, config);

    thread::scope(|s| {
        let handles: Vec<_> = (0..worker_count)
            .map(|_| {
                s.spawn(|| {
                    let mut local_matches = Vec::new();
                    let mut matcher = matcher.clone();

                    loop {
                        // Relaxed is enough: the counter only hands out unique chunk
                        // indices; results are published through `join` below.
                        let chunk_idx = next_chunk.fetch_add(1, Ordering::Relaxed);
                        if chunk_idx >= num_chunks {
                            break;
                        }

                        let start = chunk_idx * CHUNK_SIZE;
                        let end = (start + CHUNK_SIZE).min(haystacks.len());

                        // `start < haystacks.len() < u32::MAX`, so the cast cannot truncate.
                        matcher.match_list_into(
                            &haystacks[start..end],
                            start as u32,
                            &mut local_matches,
                        );
                    }

                    if config.sort {
                        radix_sort_matches(&mut local_matches);
                    }

                    local_matches
                })
            })
            .collect();

        // Re-raise a worker's panic with its original payload rather than hiding it
        // behind a generic `unwrap` message.
        let worker_results = handles
            .into_iter()
            .map(|h| h.join().unwrap_or_else(|payload| std::panic::resume_unwind(payload)));

        if config.sort {
            // Every worker list is already sorted, so a k-way merge yields a sorted whole.
            worker_results.kmerge().collect()
        } else {
            worker_results.flatten().collect()
        }
    })
}