use std::{
any::type_name,
collections::VecDeque,
env, mem,
num::NonZero,
sync::{Arc, Barrier, Mutex, mpsc},
thread::JoinHandle,
time::{Duration, Instant},
};
use criterion::{BenchmarkGroup, Criterion, SamplingMode, measurement::WallTime};
use itertools::Itertools;
use many_cpus::ProcessorSet;
use nonempty::{NonEmpty, nonempty};
use rand::{rng, seq::SliceRandom};
use crate::{Payload, WorkDistribution, clean_caches};
extern crate alloc;
/// Runs the benchmark for payload type `P` under each of the given work
/// distributions, grouping all results under the payload's type name.
///
/// Each distribution becomes one Criterion benchmark function; distributions
/// the current hardware topology cannot host are skipped by `execute_run`.
pub fn execute_runs<P: Payload, const BATCH_SIZE: u64>(
    c: &mut Criterion,
    work_distributions: &[WorkDistribution],
) {
    let mut group = c.benchmark_group(type_name::<P>());

    // Long measurement time with flat sampling - each iteration batch spawns
    // real threads, so we want few, large samples rather than many small ones.
    group.measurement_time(Duration::from_secs(30));
    group.sampling_mode(SamplingMode::Flat);

    work_distributions
        .iter()
        .copied()
        .for_each(|distribution| execute_run::<P, BATCH_SIZE>(&mut group, distribution));

    group.finish();
}
/// Returns true when the benchmark binary is being run in an enumeration-only
/// mode (`--test` or `--list` present in the command line), in which case no
/// real measurement happens and diagnostic output should be suppressed.
fn is_fake_run() -> bool {
    env::args().any(|arg| matches!(arg.as_str(), "--test" | "--list"))
}
fn execute_run<P: Payload, const BATCH_SIZE: u64>(
g: &mut BenchmarkGroup<'_, WallTime>,
work_distribution: WorkDistribution,
) {
let Some(sample_processor_selection) = get_processor_set_pairs(work_distribution) else {
if !is_fake_run() {
eprintln!("Skipping {work_distribution} - system hardware topology is not compatible.");
}
return;
};
if !is_fake_run() {
for (processor_set_1, processor_set_2) in sample_processor_selection {
let cpulist1 = cpulist::emit(
&processor_set_1
.processors()
.iter()
.map(|p| p.id())
.collect_vec(),
);
let cpulist2 = cpulist::emit(
&processor_set_2
.processors()
.iter()
.map(|p| p.id())
.collect_vec(),
);
eprintln!("{work_distribution} reference selection: ({cpulist1}) & ({cpulist2})");
}
}
g.bench_function(work_distribution.to_string(), |b| {
b.iter_custom(move |iters| {
let mut total_duration = Duration::ZERO;
let mut iters_remaining = iters;
while iters_remaining > 0 {
let batch_size = iters_remaining.min(BATCH_SIZE);
iters_remaining -= batch_size;
let processor_set_pairs = get_processor_set_pairs(work_distribution)
.expect("we already validated that we have the right topology");
total_duration +=
BenchmarkBatch::new::<P>(&processor_set_pairs, work_distribution, batch_size)
.wait();
}
total_duration
});
});
}
/// Determines how many worker pairs each benchmark run should use: one pair
/// per memory region that contains at least one performance processor.
fn calculate_worker_pair_count() -> NonZero<usize> {
    let memory_region_count = ProcessorSet::builder()
        .performance_processors_only()
        .take_all()
        .expect("must have at least one processor")
        .processors()
        .iter()
        .map(|p| p.memory_region_id())
        .unique()
        .count();

    NonZero::new(memory_region_count).expect("there must be at least one memory region")
}
/// A processor count of exactly one, used when carving out single-processor
/// partner sets. `NonZero::<usize>::MIN` is 1 by definition.
const ONE_PROCESSOR: NonZero<usize> = NonZero::<usize>::MIN;
/// Builds one `(ProcessorSet, ProcessorSet)` tuple per worker pair for the
/// requested work distribution, or returns `None` when the current hardware
/// topology cannot satisfy that distribution (e.g. a distribution that needs
/// multiple memory regions on a single-region machine, or not enough distinct
/// processors for the required partner selection).
///
/// Only performance processors are considered. The number of pairs equals the
/// number of memory regions containing performance processors (see
/// `calculate_worker_pair_count`).
fn get_processor_set_pairs(
    distribution: WorkDistribution,
) -> Option<Vec<(ProcessorSet, ProcessorSet)>> {
    let worker_pair_count = calculate_worker_pair_count();

    // The pool all selections below draw from.
    let candidates = ProcessorSet::builder()
        .performance_processors_only()
        .take_all()
        .expect("there must be at least one performance processor on any system, by definition");

    match distribution {
        WorkDistribution::PinnedMemoryRegionPairs => {
            // Requires at least two memory regions - a pair must span regions.
            if worker_pair_count.get() == 1 {
                return None;
            }

            // One "anchor" processor per memory region.
            let first_processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;

            assert_eq!(first_processors.len(), worker_pair_count.get());

            // For each anchor, pick one distinct partner from the SAME region.
            let mut partners = first_processors
                .processors()
                .iter()
                .filter_map(|p| {
                    candidates
                        .to_builder()
                        .except([p])
                        .filter(|c| c.memory_region_id() == p.memory_region_id())
                        .take(ONE_PROCESSOR)
                })
                .collect::<VecDeque<_>>();

            // A region with only one eligible processor yields no partner;
            // in that case this distribution is not possible.
            if partners.len() != worker_pair_count.get() {
                return None;
            }

            // Rotate the partner list by one position. Each partner lives in
            // the same region as its anchor, so after rotation every anchor is
            // paired with a partner from a DIFFERENT memory region - which is
            // the whole point of this distribution.
            let recycled_processor = partners
                .pop_front()
                .expect("we verified it has the right length");
            partners.push_back(recycled_processor);

            Some(
                first_processors
                    .processors()
                    .iter()
                    .cloned()
                    .zip(partners)
                    .map(|(p1, p2_set)| {
                        // Each anchor becomes a single-processor set; the
                        // partner is already a (single-processor) set.
                        let set1 = ProcessorSet::from_processors(nonempty![p1]);
                        (set1, p2_set)
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::PinnedSameMemoryRegion => {
            // Same anchor+partner selection as above, but WITHOUT the
            // rotation: each pair stays within one memory region.
            let first_processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;

            assert_eq!(first_processors.len(), worker_pair_count.get());

            let partners = first_processors
                .processors()
                .iter()
                .filter_map(|p| {
                    candidates
                        .to_builder()
                        .except([p])
                        .filter(|c| c.memory_region_id() == p.memory_region_id())
                        .take(ONE_PROCESSOR)
                })
                .collect::<VecDeque<_>>();

            if partners.len() != worker_pair_count.get() {
                return None;
            }

            Some(
                first_processors
                    .processors()
                    .iter()
                    .cloned()
                    .zip(partners)
                    .map(|(p1, p2_set)| {
                        let set1 = ProcessorSet::from_processors(nonempty![p1]);
                        (set1, p2_set)
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::PinnedSelf => {
            // Take twice the pair count and chunk into pairs of two distinct
            // processors; no memory-region constraint applies here.
            Some(
                candidates
                    .to_builder()
                    .take(
                        NonZero::new(worker_pair_count.get() * 2)
                            .expect("* 2 cannot make a number zero"),
                    )?
                    .processors()
                    .into_iter()
                    .cloned()
                    .chunks(2)
                    .into_iter()
                    .map(|pair| {
                        let mut pair = pair.collect_vec();
                        assert_eq!(pair.len(), 2);

                        // pop() removes from the back, so p1/p2 are in reverse
                        // chunk order - harmless, both sides are symmetric.
                        let p1 = pair.pop().expect("we asserted there are two");
                        let p2 = pair.pop().expect("we asserted there are two");

                        let set1: ProcessorSet = p1.into();
                        let set2: ProcessorSet = p2.into();
                        (set1, set2)
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::PinnedSameProcessor => {
            // Both members of each pair are pinned to the SAME processor; one
            // processor is picked per memory region.
            let processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;

            assert_eq!(processors.len(), worker_pair_count.get());

            Some(
                processors
                    .processors()
                    .into_iter()
                    .map(|p| {
                        let set1: ProcessorSet = p.clone().into();
                        let set2: ProcessorSet = p.clone().into();
                        (set1, set2)
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::UnpinnedMemoryRegionPairs => {
            // Like PinnedMemoryRegionPairs, but each side of a pair may float
            // over ALL processors of its memory region instead of one pinned
            // processor.
            if worker_pair_count.get() == 1 {
                return None;
            }

            let first_processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;

            assert_eq!(first_processors.len(), worker_pair_count.get());

            // Expand each per-region anchor into the full processor set of its
            // memory region.
            let first_processor_sets = first_processors
                .processors()
                .into_iter()
                .map(|p| {
                    candidates
                        .to_builder()
                        .filter(|c| c.memory_region_id() == p.memory_region_id())
                        .take_all()
                        .expect("must have at least one processor in every active memory region")
                })
                .collect::<VecDeque<_>>();

            assert_eq!(first_processor_sets.len(), worker_pair_count.get());

            // Rotate by one so each region set is paired with a different
            // region's set (same trick as PinnedMemoryRegionPairs).
            let mut partners = first_processor_sets.clone();

            let recycled_processor_set = partners
                .pop_front()
                .expect("we verified it has the right length");
            partners.push_back(recycled_processor_set);

            Some(first_processor_sets.into_iter().zip(partners).collect_vec())
        }
        WorkDistribution::ConstrainedSameMemoryRegion => {
            // Pick a distinct anchor+partner processor per region, then hand
            // each side a share of the region's remaining processors so both
            // sides are constrained to (part of) the same region.
            let first_processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;

            assert_eq!(first_processors.len(), worker_pair_count.get());

            let partners = first_processors
                .processors()
                .iter()
                .filter_map(|p| {
                    candidates
                        .to_builder()
                        .except([p])
                        .filter(|c| c.memory_region_id() == p.memory_region_id())
                        .take(ONE_PROCESSOR)
                })
                .collect::<VecDeque<_>>();

            if partners.len() != worker_pair_count.get() {
                return None;
            }

            let first_processors = first_processors.processors();

            // Each partner set holds exactly one processor; extract it.
            let partner_processors = partners
                .into_iter()
                .map(|set| set.processors().first().clone());

            let pairs_single = first_processors
                .into_iter()
                .cloned()
                .zip(partner_processors)
                .collect_vec();

            Some(
                pairs_single
                    .into_iter()
                    .map(|(p1, p2)| {
                        // Everything else in the same region, excluding the
                        // two already-chosen processors.
                        let remaining = candidates
                            .to_builder()
                            .except([&p1, &p2])
                            .filter(|c| c.memory_region_id() == p1.memory_region_id())
                            .take_all();

                        match remaining {
                            None => {
                                // Region only had the two processors - each
                                // side gets a single-processor set.
                                (
                                    ProcessorSet::from_processors(nonempty![p1]),
                                    ProcessorSet::from_processors(nonempty![p2]),
                                )
                            }
                            Some(remaining) => {
                                // Randomly split the leftovers between the two
                                // sides (randomized so repeated selections do
                                // not favor a fixed processor ordering).
                                let mut remaining_processors =
                                    remaining.processors().into_iter().cloned().collect_vec();
                                remaining_processors.shuffle(&mut rng());

                                let (p1_more, p2_more) =
                                    remaining_processors.split_at(remaining_processors.len() / 2);

                                (
                                    ProcessorSet::from_processors(
                                        NonEmpty::from_vec(
                                            [p1].into_iter()
                                                .chain(p1_more.iter().cloned())
                                                .collect_vec(),
                                        )
                                        .expect(
                                            "we know we have at least one processor for each pair",
                                        ),
                                    ),
                                    ProcessorSet::from_processors(
                                        NonEmpty::from_vec(
                                            [p2].into_iter()
                                                .chain(p2_more.iter().cloned())
                                                .collect_vec(),
                                        )
                                        .expect(
                                            "we know we have at least one processor for each pair",
                                        ),
                                    ),
                                )
                            }
                        }
                    })
                    .collect_vec(),
            )
        }
        WorkDistribution::UnpinnedSelf => {
            // No pinning at all - every pair may use the entire candidate
            // pool on both sides.
            Some(
                (0..worker_pair_count.get())
                    .map(|_| (candidates.clone(), candidates.clone()))
                    .collect(),
            )
        }
        WorkDistribution::UnpinnedPerMemoryRegionSelf => {
            // One pair per memory region; both sides of a pair share the full
            // processor set of that region (no rotation - self-paired).
            let first_processors = candidates
                .to_builder()
                .different_memory_regions()
                .take(worker_pair_count)?;

            assert_eq!(first_processors.len(), worker_pair_count.get());

            let first_processor_sets = first_processors
                .processors()
                .into_iter()
                .map(|p| {
                    candidates
                        .to_builder()
                        .filter(|c| c.memory_region_id() == p.memory_region_id())
                        .take_all()
                        .expect("must have at least one processor in every active memory region")
                })
                .collect::<VecDeque<_>>();

            let partners = first_processor_sets.clone();

            Some(first_processor_sets.into_iter().zip(partners).collect_vec())
        }
    }
}
/// One in-flight batch of benchmark worker threads.
///
/// Construction (`new`) spawns the workers and releases them through a shared
/// barrier; `wait` joins them and averages their reported processing times.
#[derive(Debug)]
struct BenchmarkBatch {
    // One handle per worker thread (two workers per processor-set pair).
    // Each thread returns the total wall time it spent processing payloads.
    join_handles: Box<[JoinHandle<Duration>]>,
}
impl BenchmarkBatch {
    /// Spawns two worker threads per processor-set pair, each processing
    /// `batch_size` payloads, and releases them all simultaneously.
    ///
    /// Depending on the distribution, the two workers of a pair either
    /// exchange their prepared payloads (cross-worker distributions) or keep
    /// their own (self distributions). Returns once all workers have passed
    /// the start barrier, i.e. measurement has begun.
    fn new<P: Payload>(
        processor_set_pairs: &[(ProcessorSet, ProcessorSet)],
        distribution: WorkDistribution,
        batch_size: u64,
    ) -> Self {
        // Barrier participants: this thread + two workers per pair. All wait
        // here so every worker starts measuring at the same moment.
        let ready_signal = Arc::new(Barrier::new(1 + 2 * processor_set_pairs.len()));

        let mut join_handles = Vec::with_capacity(2 * processor_set_pairs.len());

        for processor_set_pair in processor_set_pairs {
            let (processor_set_1, processor_set_2) = processor_set_pair;

            // One payload pair per iteration in the batch.
            let (payloads1, payloads2) = (0..batch_size).map(|_| P::new_pair()).unzip();

            // Per-payload barriers (2 participants: the two workers of this
            // pair) keep both workers in lockstep through the batch.
            let payload_barriers = (0..batch_size)
                .map(|_| Arc::new(Barrier::new(2)))
                .collect_vec();

            let (c1_tx, c1_rx) = mpsc::channel::<Vec<P>>();
            let (c2_tx, c2_rx) = mpsc::channel::<Vec<P>>();

            // Wire the channels either crosswise (each worker receives the
            // payloads the OTHER worker prepared) or straight-through (each
            // worker gets its own payloads back).
            let ((tx1, rx1), (tx2, rx2)) = match distribution {
                WorkDistribution::PinnedMemoryRegionPairs
                | WorkDistribution::PinnedSameMemoryRegion
                | WorkDistribution::PinnedSameProcessor
                | WorkDistribution::UnpinnedMemoryRegionPairs
                | WorkDistribution::ConstrainedSameMemoryRegion => {
                    ((c1_tx, c2_rx), (c2_tx, c1_rx))
                }
                WorkDistribution::PinnedSelf
                | WorkDistribution::UnpinnedSelf
                | WorkDistribution::UnpinnedPerMemoryRegionSelf => {
                    ((c1_tx, c1_rx), (c2_tx, c2_rx))
                }
            };

            // Both workers pop from this shared bag; which worker gets which
            // entry is intentionally not determined here.
            let bag = Arc::new(Mutex::new(vec![
                (tx1, rx1, payloads1, payload_barriers.clone()),
                (tx2, rx2, payloads2, payload_barriers),
            ]));

            join_handles.push(BenchmarkBatch::spawn_worker(
                processor_set_1,
                Arc::clone(&ready_signal),
                Arc::clone(&bag),
            ));

            join_handles.push(BenchmarkBatch::spawn_worker(
                processor_set_2,
                Arc::clone(&ready_signal),
                Arc::clone(&bag),
            ));
        }

        // Release all workers; measurement starts now.
        ready_signal.wait();

        Self {
            join_handles: join_handles.into_boxed_slice(),
        }
    }

    /// Joins all worker threads and returns the mean of their reported
    /// processing durations.
    ///
    /// Drains the handles, so the subsequent `Drop` has nothing to join.
    /// NOTE(review): calling `wait` a second time would divide by a
    /// `thread_count` of zero and panic - callers invoke it exactly once.
    fn wait(&mut self) -> Duration {
        let thread_count = self.join_handles.len();

        let join_handles = mem::replace(&mut self.join_handles, Box::new([]));

        let mut total_elapsed_nanos = 0;

        for thread in join_handles {
            let elapsed = thread.join().unwrap();
            total_elapsed_nanos += elapsed.as_nanos();
        }

        Duration::from_nanos((total_elapsed_nanos / thread_count as u128) as u64)
    }

    /// Spawns one worker on the given processor set.
    ///
    /// The worker pops its (channels, payloads, barriers) tuple from the
    /// shared bag, prepares its payloads, swaps payload vectors through the
    /// channels (a no-op swap for self distributions), then waits on the
    /// start barrier and times only the `process()` call of each payload.
    #[expect(clippy::type_complexity)]
    fn spawn_worker<P: Payload>(
        processor_set: &ProcessorSet,
        ready_signal: Arc<Barrier>,
        payload_bag: Arc<
            Mutex<
                Vec<(
                    mpsc::Sender<Vec<P>>,
                    mpsc::Receiver<Vec<P>>,
                    Vec<P>,
                    Vec<Arc<Barrier>>,
                )>,
            >,
        >,
    ) -> JoinHandle<Duration> {
        processor_set.spawn_thread({
            move |_| {
                let (payloads_tx, payloads_rx, mut payloads, mut payload_barriers) =
                    payload_bag.lock().unwrap().pop().unwrap();

                for payload in &mut payloads {
                    payload.prepare();
                }

                // Hand over the prepared payloads and receive the set this
                // worker will actually process (its own, or its partner's).
                payloads_tx.send(payloads).unwrap();
                let mut payloads = payloads_rx.recv().unwrap();

                // Start from a cold cache so memory placement effects are
                // actually measured.
                clean_caches();

                ready_signal.wait();

                let mut total_duration = Duration::ZERO;

                for payload in &mut payloads {
                    // Sync with the partner worker before each payload.
                    // Barriers are consumed back-to-front; both workers pop in
                    // the same order, so they always meet on the same barrier.
                    payload_barriers
                        .pop()
                        .expect("caller gave us wrong number of barriers?!")
                        .wait();

                    payload.prepare_local();

                    // Only `process()` is timed - preparation and barrier
                    // waits are excluded from the measurement.
                    let start = Instant::now();
                    payload.process();
                    let elapsed = start.elapsed();

                    total_duration += elapsed;
                }

                drop(payloads);

                total_duration
            }
        })
    }
}
impl Drop for BenchmarkBatch {
fn drop(&mut self) {
let join_handles = mem::replace(&mut self.join_handles, Box::new([]));
for handle in join_handles {
handle.join().unwrap();
}
}
}