use std::io::{self, Write};
use subms::{SubMsPerfHarness, SubMsStageKind, summarize, summary_to_json};
use subms_mpsc_queue::{MpscQueue, PopResult};
const ENTRIES: usize = 50_000;
const SEED: u64 = 0;
fn main() -> io::Result<()> {
let mut h = SubMsPerfHarness::new("mpsc-queue-features", "rust");
h.input("entries", &ENTRIES.to_string());
h.input("seed", &SEED.to_string());
h.add_meta("subms.recipe.slug", "subms-mpsc-queue");
h.add_meta("subms.recipe.category", "concurrency");
{
h.add_meta("subms.workload.feature", "base");
let q: MpscQueue<u64> = MpscQueue::new();
let stage = h
.stage("base_enqueue", ENTRIES)
.with_kind(SubMsStageKind::HotPath);
for i in 0..ENTRIES as u64 {
stage.time(|| q.push(i));
}
let mut q = q;
let stage = h
.stage("base_dequeue", ENTRIES)
.with_kind(SubMsStageKind::HotPath);
for _ in 0..ENTRIES {
stage.time(|| match q.try_pop() {
PopResult::Some(v) => {
let _ = v;
}
PopResult::Empty | PopResult::Inconsistent => {}
});
}
}
#[cfg(feature = "mpmc")]
{
use subms_mpsc_queue::MpmcQueue;
h.add_meta("subms.workload.feature", "mpmc");
let q: MpmcQueue<u64> = MpmcQueue::new(ENTRIES);
let cap = q.capacity();
let stage = h
.stage("mpmc_enqueue", ENTRIES)
.with_kind(SubMsStageKind::HotPath);
for i in 0..ENTRIES as u64 {
if i as usize >= cap {
break;
}
stage.time(|| {
let _ = q.try_enqueue(i);
});
}
let stage = h
.stage("mpmc_dequeue", ENTRIES)
.with_kind(SubMsStageKind::HotPath);
for _ in 0..ENTRIES {
stage.time(|| {
let _ = q.try_dequeue();
});
}
}
#[cfg(feature = "bounded")]
{
use subms_mpsc_queue::BoundedMpscQueue;
h.add_meta("subms.workload.feature", "bounded");
let q: BoundedMpscQueue<u64> = BoundedMpscQueue::new(ENTRIES);
let cap = q.capacity();
let stage = h
.stage("bounded_enqueue", ENTRIES)
.with_kind(SubMsStageKind::HotPath);
for i in 0..ENTRIES as u64 {
if i as usize >= cap {
break;
}
stage.time(|| {
let _ = q.try_enqueue(i);
});
}
let mut q = q;
let stage = h
.stage("bounded_dequeue", ENTRIES)
.with_kind(SubMsStageKind::HotPath);
for _ in 0..ENTRIES {
stage.time(|| {
let _ = q.try_dequeue();
});
}
}
#[cfg(feature = "batch")]
{
use subms_mpsc_queue::BatchMpscQueue;
h.add_meta("subms.workload.feature", "batch");
const BATCH: usize = 256;
let q: BatchMpscQueue<u64> = BatchMpscQueue::new();
for i in 0..ENTRIES as u64 {
q.push(i);
}
let mut q = q;
let mut buf: Vec<Option<u64>> = (0..BATCH).map(|_| None).collect();
let batches = ENTRIES.div_ceil(BATCH);
let stage = h
.stage("batch_drain", batches)
.with_kind(SubMsStageKind::HotPath);
for _ in 0..batches {
stage.time(|| {
let n = q.try_dequeue_batch(&mut buf);
for slot in buf.iter_mut().take(n) {
let _ = slot.take();
}
});
}
}
#[cfg(feature = "metrics")]
{
use subms_mpsc_queue::MetricsMpscQueue;
h.add_meta("subms.workload.feature", "metrics");
let q: MetricsMpscQueue<u64> = MetricsMpscQueue::new();
let stage = h
.stage("metrics_enqueue", ENTRIES)
.with_kind(SubMsStageKind::HotPath);
for i in 0..ENTRIES as u64 {
stage.time(|| q.push(i));
}
let mut q = q;
let stage = h
.stage("metrics_dequeue", ENTRIES)
.with_kind(SubMsStageKind::HotPath);
for _ in 0..ENTRIES {
stage.time(|| match q.try_pop() {
PopResult::Some(v) => {
let _ = v;
}
PopResult::Empty | PopResult::Inconsistent => {}
});
}
}
#[cfg(feature = "affinity")]
{
use subms_mpsc_queue::set_affinity;
h.add_meta("subms.workload.feature", "affinity");
let stage = h
.stage("affinity_set", 1)
.with_kind(SubMsStageKind::OneShot);
stage.time(|| {
let _ = set_affinity(&[0]);
});
}
let summary = summarize(&h);
let mut stdout = io::stdout();
summary_to_json(&summary, &mut stdout)?;
writeln!(stdout)?;
Ok(())
}