#![allow(
missing_docs,
clippy::let_underscore_must_use,
reason = "No need for API documentation in example code"
)]
use std::hint::black_box;
use std::sync::mpsc;
use criterion::{Criterion, criterion_group, criterion_main};
use many_cpus_benchmarking::{Payload, WorkDistribution, execute_runs};
criterion_group!(benches, entrypoint);
criterion_main!(benches);
fn entrypoint(c: &mut Criterion) {
execute_runs::<ProducerConsumerChannels, 500>(
c,
WorkDistribution::all_with_unique_processors(),
);
}
#[derive(Debug)]
struct ProducerConsumerChannels {
rx: mpsc::Receiver<u64>,
tx: mpsc::Sender<u64>,
is_primary_producer: bool,
}
impl Payload for ProducerConsumerChannels {
fn new_pair() -> (Self, Self) {
let (producer_tx, consumer_rx) = mpsc::channel::<u64>();
let (consumer_tx, producer_rx) = mpsc::channel::<u64>();
let worker1 = Self {
rx: producer_rx,
tx: producer_tx,
is_primary_producer: true,
};
let worker2 = Self {
rx: consumer_rx,
tx: consumer_tx,
is_primary_producer: false,
};
(worker1, worker2)
}
fn prepare(&mut self) {
const INITIAL_MESSAGE_COUNT: usize = 5000;
for i in 0..INITIAL_MESSAGE_COUNT {
self.tx.send(i as u64).unwrap();
}
}
fn process(&mut self) {
const OPERATION_COUNT: usize = 25000;
if self.is_primary_producer {
for i in 0..OPERATION_COUNT {
_ = self.tx.send(i as u64);
if i % 10 == 0
&& let Ok(received) = self.rx.try_recv()
{
black_box(received);
}
}
} else {
for i in 0..OPERATION_COUNT {
let received = self
.rx
.recv()
.expect("Channel should not be closed during benchmark");
let processed_value = received.wrapping_mul(2);
black_box(processed_value);
if i % 5 == 0 {
_ = self.tx.send(processed_value);
}
}
}
}
}