use rayon::prelude::*;
use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
use std::sync::Arc;
use super::source_cache::{ParsedSourceLru, SourceHash};
/// Parses every `(content, extra)` pair in `sources` and returns one `Arc<T>` per
/// input, in input order.
///
/// The work is done in three steps:
///
/// 1. A [`SourceHash`] is computed for each pair in parallel.
/// 2. Pairs are de-duplicated by that hash; only the first occurrence of each
///    distinct hash is handed to `cache.get_or_parse`, again in parallel.
/// 3. Every input position is mapped back to the `Arc` obtained for its hash, so
///    inputs with equal hashes share the same allocation.
///
/// Two sources are treated as identical exactly when their `SourceHash` values
/// compare equal. Whether `parse` actually runs for a given unique source is
/// decided by `cache.get_or_parse`; this function only guarantees that it asks
/// the cache at most once per distinct hash.
///
/// `parse` may be called concurrently from several rayon workers, hence the
/// `Sync` bound.
pub fn parse_corpus_parallel<T, F>(
    sources: &[(Vec<u8>, Vec<u8>)],
    cache: &ParsedSourceLru<T>,
    parse: F,
) -> Vec<Arc<T>>
where
    T: Send + Sync,
    F: Fn(&[u8]) -> T + Sync,
{
    // Hashing is independent per source, so fan it out across the pool.
    let keys: Vec<SourceHash> = sources
        .par_iter()
        .map(|(content, extra)| SourceHash::of(content, extra))
        .collect();

    // Keep the index of the first occurrence of each distinct key. This pass is
    // sequential on purpose: it needs a single shared `seen` set and is cheap
    // compared to hashing and parsing.
    let mut seen = HashSet::with_capacity_and_hasher(keys.len(), Default::default());
    let unique_indices: Vec<usize> = keys
        .iter()
        .enumerate()
        .filter(|(_, key)| seen.insert(**key))
        .map(|(idx, _)| idx)
        .collect();

    // Resolve each distinct source through the cache, in parallel.
    let unique_parsed: Vec<(SourceHash, Arc<T>)> = unique_indices
        .into_par_iter()
        .map(|idx| {
            let (content, extra) = &sources[idx];
            let parsed = cache.get_or_parse(content, extra, |s| parse(s));
            (keys[idx], parsed)
        })
        .collect();
    let lookup: HashMap<SourceHash, Arc<T>> = unique_parsed.into_iter().collect();

    // Fan the shared results back out to every input position. The lookup cannot
    // miss: each key's first occurrence was pushed into `unique_indices` above,
    // so an entry for it was inserted into `lookup`.
    keys.iter()
        .map(|key| {
            let parsed = lookup
                .get(key)
                .expect("every key's first occurrence was resolved and stored in `lookup`");
            Arc::clone(parsed)
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Owned `(content, extra)` pairs, in the shape `parse_corpus_parallel` borrows.
    type Corpus = Vec<(Vec<u8>, Vec<u8>)>;

    // Ten sources with distinct content: each result must equal the byte length
    // of the source at the same position.
    #[test]
    fn distinct_corpus_parses_correctly() {
        let cache: ParsedSourceLru<usize> = ParsedSourceLru::with_capacity(16);
        let mut sources: Corpus = Vec::with_capacity(10);
        for i in 0..10 {
            sources.push((format!("source {}", i).into_bytes(), Vec::new()));
        }

        let parsed = parse_corpus_parallel(&sources, &cache, |bytes| bytes.len());

        assert_eq!(parsed.len(), 10);
        for (position, value) in parsed.iter().enumerate() {
            let expected = format!("source {}", position).len();
            assert_eq!(**value, expected);
        }
    }

    // Twenty sources drawn from only three distinct `(content, extra)` pairs:
    // the parse closure must run three times and the cache must hold three entries.
    #[test]
    fn shared_content_dedups_parse_calls() {
        let cache: ParsedSourceLru<usize> = ParsedSourceLru::with_capacity(16);
        let parse_calls = AtomicUsize::new(0);

        let alpha = (b"alpha".to_vec(), b"".to_vec());
        let beta = (b"beta".to_vec(), b"".to_vec());
        let gamma = (b"gamma".to_vec(), b"x".to_vec());

        // alpha, beta, then six rounds of alpha, gamma, beta.
        let mut sources: Corpus = vec![alpha.clone(), beta.clone()];
        for _ in 0..6 {
            sources.extend_from_slice(&[alpha.clone(), gamma.clone(), beta.clone()]);
        }

        let _parsed = parse_corpus_parallel(&sources, &cache, |bytes| {
            parse_calls.fetch_add(1, Ordering::SeqCst);
            bytes.len()
        });

        assert_eq!(parse_calls.load(Ordering::SeqCst), 3);
        assert_eq!(cache.len(), 3);
    }

    // No sources in, no results out.
    #[test]
    fn empty_corpus_returns_empty() {
        let cache: ParsedSourceLru<usize> = ParsedSourceLru::with_capacity(4);
        let sources = Corpus::new();

        let parsed = parse_corpus_parallel(&sources, &cache, |bytes| bytes.len());

        assert!(parsed.is_empty());
    }

    // Tracks how many parse closures are running at once and checks that, when
    // rayon has more than one thread, at least two were observed overlapping.
    #[test]
    fn parallel_parse_overlaps_workers() {
        let cache: ParsedSourceLru<usize> = ParsedSourceLru::with_capacity(32);
        let worker_count = rayon::current_num_threads();
        let n = worker_count.max(2) * 4;
        let sources: Corpus = (0..n)
            .map(|i| (format!("slow{}", i).into_bytes(), Vec::new()))
            .collect();

        let in_flight = AtomicUsize::new(0);
        let peak = AtomicUsize::new(0);

        let parsed = parse_corpus_parallel(&sources, &cache, |bytes| {
            // Record this closure as running and raise the high-water mark if needed.
            let current = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(current, Ordering::SeqCst);

            // Busy work so that concurrently scheduled closures have a chance to overlap.
            let busy = (0..50_000usize).fold(0usize, |acc, i| acc.wrapping_add(i ^ bytes.len()));
            std::hint::black_box(busy);

            in_flight.fetch_sub(1, Ordering::SeqCst);
            bytes.len()
        });

        assert_eq!(parsed.len(), n);
        for (i, value) in parsed.iter().enumerate() {
            assert_eq!(
                **value,
                format!("slow{}", i).len(),
                "result value mismatch at index {}",
                i
            );
        }
        if worker_count > 1 {
            assert!(
                peak.load(Ordering::SeqCst) > 1,
                "parse closures did not overlap across rayon workers"
            );
        }
    }

    // A cache built with capacity 0 must still yield a correct result for every input.
    #[test]
    fn zero_capacity_cache_still_returns_all() {
        let cache: ParsedSourceLru<usize> = ParsedSourceLru::with_capacity(0);
        let sources: Corpus = std::iter::repeat((b"dup".to_vec(), b"".to_vec()))
            .take(5)
            .collect();

        let parsed = parse_corpus_parallel(&sources, &cache, |bytes| bytes.len());

        assert_eq!(parsed.len(), 5);
        for value in parsed.iter() {
            assert_eq!(**value, 3);
        }
    }

    // 10_000 identical sources against a capacity-1 cache: the call must finish,
    // every result must be correct, and the parse closure must run exactly once.
    #[test]
    fn massive_identical_corpus_no_deadlock() {
        let cache: ParsedSourceLru<usize> = ParsedSourceLru::with_capacity(1);
        let n = 10_000;
        let sources: Corpus = std::iter::repeat((b"identical".to_vec(), b"".to_vec()))
            .take(n)
            .collect();
        let parse_calls = AtomicUsize::new(0);

        let parsed = parse_corpus_parallel(&sources, &cache, |bytes| {
            parse_calls.fetch_add(1, Ordering::SeqCst);
            bytes.len()
        });

        assert_eq!(parsed.len(), n);
        for value in parsed.iter() {
            assert_eq!(**value, 9);
        }
        assert_eq!(parse_calls.load(Ordering::SeqCst), 1);
    }
}