use std::{
any::type_name,
collections::VecDeque,
env, mem,
num::NonZero,
sync::{Arc, Barrier, Mutex, mpsc},
thread::JoinHandle,
time::Duration,
};
use criterion::{BatchSize, BenchmarkGroup, Criterion, SamplingMode, measurement::WallTime};
use itertools::Itertools;
use many_cpus::ProcessorSet;
use nonempty::{NonEmpty, nonempty};
use rand::{rng, seq::SliceRandom};
use crate::{Payload, WorkDistribution, clean_caches};
extern crate alloc;
/// Runs the benchmark group for payload type `P` across all requested work
/// distributions, with `PAYLOAD_MULTIPLIER` payload instances per worker.
pub fn execute_runs<P: Payload, const PAYLOAD_MULTIPLIER: usize>(
    c: &mut Criterion,
    work_distributions: &[WorkDistribution],
) {
    // One Criterion group per payload type, named after the type itself.
    let mut group = c.benchmark_group(type_name::<P>());

    // Long measurement window with flat sampling - per-iteration setup is
    // expensive, so flat sampling keeps timings comparable across runs.
    group.measurement_time(Duration::from_secs(60));
    group.sampling_mode(SamplingMode::Flat);

    work_distributions
        .iter()
        .copied()
        .for_each(|distribution| execute_run::<P, PAYLOAD_MULTIPLIER>(&mut group, distribution));

    group.finish();
}
/// Detects whether Criterion was invoked in listing/test mode (`--test` or
/// `--list`), in which case diagnostic output should be suppressed.
fn is_fake_run() -> bool {
    env::args().any(|arg| matches!(arg.as_str(), "--test" | "--list"))
}
/// Benchmarks one work distribution for payload type `P`, with each worker
/// processing `PAYLOAD_MULTIPLIER` payload instances per iteration.
///
/// Skips (with a diagnostic, unless in `--test`/`--list` mode) when the
/// current machine's hardware topology cannot satisfy the distribution.
fn execute_run<P: Payload, const PAYLOAD_MULTIPLIER: usize>(
    g: &mut BenchmarkGroup<'_, WallTime>,
    work_distribution: WorkDistribution,
) {
    // Probe the topology once up front so incompatible distributions are
    // skipped before any benchmark is registered. This sample is used only
    // for the diagnostic output below; every benchmark iteration re-selects
    // fresh processor sets in its setup closure.
    let Some(sample_processor_selection) = get_processor_set_pairs(work_distribution) else {
        if !is_fake_run() {
            eprintln!("Skipping {work_distribution} - system hardware topology is not compatible.");
        }
        return;
    };
    // Log the reference selection so benchmark output records which
    // processors were considered for each pair.
    if !is_fake_run() {
        for (processor_set_1, processor_set_2) in sample_processor_selection {
            let cpulist1 = cpulist::emit(
                &processor_set_1
                    .processors()
                    .iter()
                    .map(|p| p.id())
                    .collect_vec(),
            );
            let cpulist2 = cpulist::emit(
                &processor_set_2
                    .processors()
                    .iter()
                    .map(|p| p.id())
                    .collect_vec(),
            );
            eprintln!("{work_distribution} reference selection: ({cpulist1}) & ({cpulist2})");
        }
    }
    // Only mention the multiplier in the benchmark name when it differs
    // from the default of 1.
    let name = if PAYLOAD_MULTIPLIER == 1 {
        format!("{work_distribution}")
    } else {
        format!("{work_distribution}(x{PAYLOAD_MULTIPLIER})")
    };
    g.bench_function(name, |b| {
        b.iter_batched(
            || {
                // Fresh processor selection per iteration, so no single
                // (lucky or unlucky) placement dominates the results.
                let processor_set_pairs = get_processor_set_pairs(work_distribution)
                    .expect("we already validated that we have the right topology");
                BenchmarkRun::new::<P, PAYLOAD_MULTIPLIER>(&processor_set_pairs, work_distribution)
            },
            |run| {
                // The measured portion: block until every worker reports
                // completion via the shared barrier.
                run.wait();
                // Return the run so thread teardown (join-on-drop) happens
                // in Criterion's batch cleanup, outside the timed section.
                run
            },
            // Setup spawns threads and queries hardware topology; batching
            // per iteration keeps that cost out of the measurements.
            BatchSize::PerIteration,
        );
    });
}
/// Determines how many worker pairs to use: one pair per memory region that
/// contains performance processors.
fn calculate_worker_pair_count() -> NonZero<usize> {
    let performance_processors = ProcessorSet::builder()
        .performance_processors_only()
        .take_all()
        .expect("must have at least one processor");

    // Count distinct memory regions among the performance processors.
    let memory_region_count = performance_processors
        .processors()
        .iter()
        .map(|p| p.memory_region_id())
        .unique()
        .count();

    NonZero::new(memory_region_count).expect("there must be at least one memory region")
}
/// Convenience constant for `take()` calls that need exactly one processor.
const ONE_PROCESSOR: NonZero<usize> = NonZero::new(1).unwrap();
/// Selects the processor set pair for each worker pair according to the
/// placement rules of the given work distribution.
///
/// Returns `None` when the current hardware topology cannot satisfy the
/// distribution (e.g. a cross-region distribution on a single-memory-region
/// machine, or a region with too few processors to form a pair).
fn get_processor_set_pairs(
    distribution: WorkDistribution,
) -> Option<Vec<(ProcessorSet, ProcessorSet)>> {
    let worker_pair_count = calculate_worker_pair_count();
    // Every selection below draws only from performance processors.
    let candidates = ProcessorSet::builder()
        .performance_processors_only()
        .take_all()
        .expect("there must be at least one performance processor on any system, by definition");
    match distribution {
        WorkDistribution::PinnedMemoryRegionPairs => {
            // Pairing across memory regions requires at least two regions.
            if worker_pair_count.get() == 1 {
                return None;
            }
            // One anchor processor per memory region.
            let first_processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;
            assert_eq!(first_processors.len(), worker_pair_count.get());
            // For each anchor, pick one distinct partner from the same
            // memory region as the anchor.
            let mut partners = first_processors
                .processors()
                .iter()
                .filter_map(|p| {
                    candidates
                        .to_builder()
                        .except([p])
                        .filter(|c| c.memory_region_id() == p.memory_region_id())
                        .take(ONE_PROCESSOR)
                })
                .collect::<VecDeque<_>>();
            // A region with only one eligible processor yields no partner,
            // in which case the required pairs cannot be formed.
            if partners.len() != worker_pair_count.get() {
                return None;
            }
            // Rotate the partner list by one position so each anchor ends
            // up paired with a partner from a *different* memory region.
            let recycled_processor = partners
                .pop_front()
                .expect("we verified it has the right length");
            partners.push_back(recycled_processor);
            Some(
                first_processors
                    .processors()
                    .iter()
                    .cloned()
                    .zip(partners)
                    .map(|(p1, p2_set)| {
                        let set1 = ProcessorSet::from_processors(nonempty![p1]);
                        (set1, p2_set)
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::PinnedSameMemoryRegion => {
            // One anchor processor per memory region.
            let first_processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;
            assert_eq!(first_processors.len(), worker_pair_count.get());
            // Same construction as PinnedMemoryRegionPairs, but WITHOUT the
            // rotation: each anchor keeps a partner from its own region.
            let partners = first_processors
                .processors()
                .iter()
                .filter_map(|p| {
                    candidates
                        .to_builder()
                        .except([p])
                        .filter(|c| c.memory_region_id() == p.memory_region_id())
                        .take(ONE_PROCESSOR)
                })
                .collect::<VecDeque<_>>();
            // Every anchor must have found a distinct same-region partner.
            if partners.len() != worker_pair_count.get() {
                return None;
            }
            Some(
                first_processors
                    .processors()
                    .iter()
                    .cloned()
                    .zip(partners)
                    .map(|(p1, p2_set)| {
                        let set1 = ProcessorSet::from_processors(nonempty![p1]);
                        (set1, p2_set)
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::PinnedSelf => {
            // Take twice the pair count and chunk into pairs: each worker
            // in a pair is pinned to its own distinct processor.
            Some(
                candidates
                    .to_builder()
                    .take(
                        NonZero::new(worker_pair_count.get() * 2)
                            .expect("* 2 cannot make a number zero"),
                    )?
                    .processors()
                    .into_iter()
                    .cloned()
                    .chunks(2)
                    .into_iter()
                    .map(|pair| {
                        let mut pair = pair.collect_vec();
                        assert_eq!(pair.len(), 2);
                        let p1 = pair.pop().expect("we asserted there are two");
                        let p2 = pair.pop().expect("we asserted there are two");
                        let set1: ProcessorSet = p1.into();
                        let set2: ProcessorSet = p2.into();
                        (set1, set2)
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::PinnedSameProcessor => {
            // Both workers of a pair are pinned to the very same processor.
            Some(
                candidates
                    .to_builder()
                    .take(worker_pair_count)?
                    .processors()
                    .into_iter()
                    .map(|p| {
                        let set1: ProcessorSet = p.clone().into();
                        let set2: ProcessorSet = p.clone().into();
                        (set1, set2)
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::UnpinnedMemoryRegionPairs => {
            // Pairing across memory regions requires at least two regions.
            if worker_pair_count.get() == 1 {
                return None;
            }
            // One representative processor per memory region, used only to
            // identify the regions themselves.
            let first_processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;
            assert_eq!(first_processors.len(), worker_pair_count.get());
            // Expand each representative into the full candidate set of its
            // memory region (workers roam within the region, unpinned).
            let first_processor_sets = first_processors
                .processors()
                .into_iter()
                .map(|p| {
                    candidates
                        .to_builder()
                        .filter(|c| c.memory_region_id() == p.memory_region_id())
                        .take_all()
                        .expect("must have at least one processor in every active memory region")
                })
                .collect::<VecDeque<_>>();
            assert_eq!(first_processor_sets.len(), worker_pair_count.get());
            // Rotate a copy of the region sets by one so each pair combines
            // two sets from different memory regions.
            let mut partners = first_processor_sets.clone();
            let recycled_processor_set = partners
                .pop_front()
                .expect("we verified it has the right length");
            partners.push_back(recycled_processor_set);
            Some(first_processor_sets.into_iter().zip(partners).collect_vec())
        }
        WorkDistribution::UnpinnedSameMemoryRegion => {
            // One anchor processor per memory region.
            let first_processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;
            assert_eq!(first_processors.len(), worker_pair_count.get());
            // One distinct same-region partner per anchor.
            let partners = first_processors
                .processors()
                .iter()
                .filter_map(|p| {
                    candidates
                        .to_builder()
                        .except([p])
                        .filter(|c| c.memory_region_id() == p.memory_region_id())
                        .take(ONE_PROCESSOR)
                })
                .collect::<VecDeque<_>>();
            // Every anchor must have found a distinct same-region partner.
            if partners.len() != worker_pair_count.get() {
                return None;
            }
            let first_processors = first_processors.processors();
            // Each partner set holds exactly one processor; extract it.
            let partner_processors = partners
                .into_iter()
                .map(|set| set.processors().first().clone());
            // (anchor, partner) single-processor pairs per memory region.
            let pairs_single = first_processors
                .into_iter()
                .cloned()
                .zip(partner_processors)
                .collect_vec();
            Some(
                pairs_single
                    .into_iter()
                    .map(|(p1, p2)| {
                        // Any remaining same-region processors are split
                        // between the two sets so both workers can roam.
                        let remaining = candidates
                            .to_builder()
                            .except([&p1, &p2])
                            .filter(|c| c.memory_region_id() == p1.memory_region_id())
                            .take_all();
                        match remaining {
                            None => {
                                // Only two processors in this region: each
                                // worker gets a single-processor set.
                                (
                                    ProcessorSet::from_processors(nonempty![p1]),
                                    ProcessorSet::from_processors(nonempty![p2]),
                                )
                            }
                            Some(remaining) => {
                                // Shuffle before splitting so the extra
                                // processors are divided at random.
                                let mut remaining_processors =
                                    remaining.processors().into_iter().cloned().collect_vec();
                                remaining_processors.shuffle(&mut rng());
                                let (p1_more, p2_more) =
                                    remaining_processors.split_at(remaining_processors.len() / 2);
                                (
                                    ProcessorSet::from_processors(
                                        NonEmpty::from_vec(
                                            [p1].into_iter()
                                                .chain(p1_more.iter().cloned())
                                                .collect_vec(),
                                        )
                                        .expect(
                                            "we know we have at least one processor for each pair",
                                        ),
                                    ),
                                    ProcessorSet::from_processors(
                                        NonEmpty::from_vec(
                                            [p2].into_iter()
                                                .chain(p2_more.iter().cloned())
                                                .collect_vec(),
                                        )
                                        .expect(
                                            "we know we have at least one processor for each pair",
                                        ),
                                    ),
                                )
                            }
                        }
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::UnpinnedSelf => {
            // No placement constraints: every pair may use any of the
            // candidate processors.
            Some(
                (0..worker_pair_count.get())
                    .map(|_| (candidates.clone(), candidates.clone()))
                    .collect(),
            )
        }
    }
}
/// One in-flight benchmark iteration: a set of paired worker threads that
/// have finished their preparation phase and been released for processing.
#[derive(Debug)]
struct BenchmarkRun {
    // Barrier the main thread waits on in `wait()`; released once every
    // worker has finished processing its payloads.
    completed: Arc<Barrier>,
    // Worker thread handles; joined (and panics propagated) on drop.
    join_handles: Box<[JoinHandle<()>]>,
}
impl BenchmarkRun {
    /// Spawns all worker threads for one benchmark iteration and blocks
    /// until every worker has completed its preparation phase.
    ///
    /// Two workers are spawned per processor set pair. Each pair shares a
    /// "bag" holding two (sender, receiver, payloads) entries; each worker
    /// pops one entry, so which worker gets which entry is arbitrary.
    fn new<P: Payload, const PAYLOAD_MULTIPLIER: usize>(
        processor_set_pairs: &[(ProcessorSet, ProcessorSet)],
        distribution: WorkDistribution,
    ) -> Self {
        // +1 because the main thread also participates in both barriers.
        let ready_signal = Arc::new(Barrier::new(1 + 2 * processor_set_pairs.len()));
        let completed_signal = Arc::new(Barrier::new(1 + 2 * processor_set_pairs.len()));
        let mut join_handles = Vec::with_capacity(2 * processor_set_pairs.len());
        for processor_set_pair in processor_set_pairs {
            let (processor_set_1, processor_set_2) = processor_set_pair;
            // PAYLOAD_MULTIPLIER payload pairs, split into one Vec<P> for
            // each of the two workers.
            let (payloads1, payloads2) = (0..PAYLOAD_MULTIPLIER).map(|_| P::new_pair()).unzip();
            let (c1_tx, c1_rx) = mpsc::channel::<Vec<P>>();
            let (c2_tx, c2_rx) = mpsc::channel::<Vec<P>>();
            // The channel wiring decides who processes whose payloads:
            // cross-wired distributions exchange payloads between the two
            // workers, while the *Self distributions loop each worker's
            // payloads straight back to itself.
            let ((tx1, rx1), (tx2, rx2)) = match distribution {
                WorkDistribution::PinnedMemoryRegionPairs
                | WorkDistribution::PinnedSameMemoryRegion
                | WorkDistribution::PinnedSameProcessor
                | WorkDistribution::UnpinnedMemoryRegionPairs
                | WorkDistribution::UnpinnedSameMemoryRegion => {
                    ((c1_tx, c2_rx), (c2_tx, c1_rx))
                }
                WorkDistribution::PinnedSelf | WorkDistribution::UnpinnedSelf => {
                    ((c1_tx, c1_rx), (c2_tx, c2_rx))
                }
            };
            // Shared bag from which each of the pair's two workers pops
            // one (sender, receiver, payloads) entry.
            let bag = Arc::new(Mutex::new(vec![
                (tx1, rx1, payloads1),
                (tx2, rx2, payloads2),
            ]));
            join_handles.push(BenchmarkRun::spawn_worker(
                processor_set_1,
                Arc::clone(&ready_signal),
                Arc::clone(&completed_signal),
                Arc::clone(&bag),
            ));
            join_handles.push(BenchmarkRun::spawn_worker(
                processor_set_2,
                Arc::clone(&ready_signal),
                Arc::clone(&completed_signal),
                Arc::clone(&bag),
            ));
        }
        // Block until every worker has prepared its payloads and flushed
        // its caches, so the timed section starts from a common point.
        ready_signal.wait();
        Self {
            completed: completed_signal,
            join_handles: join_handles.into_boxed_slice(),
        }
    }
    /// Blocks the calling thread until every worker has finished processing
    /// its payloads.
    fn wait(&self) {
        self.completed.wait();
    }
    /// Spawns one worker thread constrained to the given processor set.
    ///
    /// The worker pops an entry from the shared bag, prepares its payloads,
    /// exchanges them over the channels, flushes processor caches, waits at
    /// the ready barrier, processes the payloads it received, then signals
    /// completion.
    #[expect(clippy::type_complexity)]
    fn spawn_worker<P: Payload>(
        processor_set: &ProcessorSet,
        ready_signal: Arc<Barrier>,
        completed_signal: Arc<Barrier>,
        payload_bag: Arc<Mutex<Vec<(mpsc::Sender<Vec<P>>, mpsc::Receiver<Vec<P>>, Vec<P>)>>>,
    ) -> JoinHandle<()> {
        processor_set.spawn_thread({
            move |_| {
                // Claim one of the pair's two bag entries; each worker
                // takes exactly one, order is arbitrary.
                let (payloads_tx, payloads_rx, mut payloads) =
                    payload_bag.lock().unwrap().pop().unwrap();
                for payload in &mut payloads {
                    payload.prepare();
                }
                // Hand our prepared payloads to whichever worker the
                // channel wiring designates, and receive ours in turn.
                payloads_tx.send(payloads).unwrap();
                let mut payloads = payloads_rx.recv().unwrap();
                // Flush caches so the measured section does not benefit
                // from data cached during preparation.
                clean_caches();
                ready_signal.wait();
                for payload in &mut payloads {
                    payload.process();
                }
                completed_signal.wait();
                // Drop after the completion barrier so deallocation cost
                // stays outside the measured section.
                drop(payloads);
            }
        })
    }
}
impl Drop for BenchmarkRun {
fn drop(&mut self) {
let join_handles = mem::replace(&mut self.join_handles, Box::new([]));
for handle in join_handles {
handle.join().unwrap();
}
}
}