use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use std::time::Instant;
use tokio::runtime::Runtime as TokioRuntime;
use ringkernel_core::hlc::HlcTimestamp;
use ringkernel_core::k2k::{DeliveryStatus, K2KBuilder, K2KMessage};
use ringkernel_core::message::{MessageEnvelope, MessageHeader};
use ringkernel_core::queue::{MessageQueue, SpscQueue};
use ringkernel_core::runtime::KernelId;
fn make_envelope(payload_size: usize) -> MessageEnvelope {
MessageEnvelope {
header: MessageHeader::new(1, 1, 2, payload_size, HlcTimestamp::now(1)),
payload: vec![0u8; payload_size],
}
}
fn make_k2k_message(payload_size: usize) -> K2KMessage {
K2KMessage::new(
KernelId::new("source_kernel"),
KernelId::new("dest_kernel"),
make_envelope(payload_size),
HlcTimestamp::now(1),
)
}
fn bench_k2k_message_creation(c: &mut Criterion) {
let mut group = c.benchmark_group("k2k/message_creation");
for payload_size in [64, 256, 1024, 4096].iter() {
group.throughput(Throughput::Bytes(*payload_size as u64));
group.bench_with_input(
BenchmarkId::new("create_message", payload_size),
payload_size,
|b, &size| {
b.iter(|| {
let msg = make_k2k_message(size);
black_box(msg);
});
},
);
}
group.finish();
}
fn bench_k2k_broker(c: &mut Criterion) {
let mut group = c.benchmark_group("k2k/broker");
group.sample_size(100);
group.bench_function("create_default", |b| {
b.iter(|| {
let broker = K2KBuilder::new().build();
black_box(broker);
});
});
group.bench_function("create_with_config", |b| {
b.iter(|| {
let broker = K2KBuilder::new()
.max_pending_messages(1024)
.delivery_timeout_ms(5000)
.build();
black_box(broker);
});
});
group.bench_function("register_endpoint", |b| {
let broker = K2KBuilder::new().build();
let mut counter = 0u64;
b.iter(|| {
counter += 1;
let kernel_id = KernelId::new(format!("kernel_{}", counter));
let endpoint = broker.register(kernel_id);
black_box(endpoint);
});
});
group.finish();
}
fn bench_k2k_endpoint(c: &mut Criterion) {
let mut group = c.benchmark_group("k2k/endpoint");
group.sample_size(50);
let rt = TokioRuntime::new().unwrap();
let broker = K2KBuilder::new().max_pending_messages(4096).build();
let endpoint1 = broker.register(KernelId::new("kernel_1"));
let mut endpoint2 = broker.register(KernelId::new("kernel_2"));
for payload_size in [64, 256, 1024].iter() {
group.throughput(Throughput::Bytes(*payload_size as u64));
group.bench_with_input(
BenchmarkId::new("send_async", payload_size),
payload_size,
|b, &size| {
b.iter(|| {
rt.block_on(async {
let envelope = make_envelope(size);
let receipt = endpoint1.send(KernelId::new("kernel_2"), envelope).await;
let _ = black_box(receipt);
});
while endpoint2.try_receive().is_some() {}
});
},
);
}
group.finish();
}
fn bench_k2k_throughput(c: &mut Criterion) {
let mut group = c.benchmark_group("k2k/throughput");
group.sample_size(50);
let rt = TokioRuntime::new().unwrap();
let broker = K2KBuilder::new().max_pending_messages(8192).build();
let sender = broker.register(KernelId::new("sender"));
let mut receiver = broker.register(KernelId::new("receiver"));
for batch_size in [10, 50, 100].iter() {
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(
BenchmarkId::new("batch_send", batch_size),
batch_size,
|b, &size| {
b.iter(|| {
rt.block_on(async {
for _ in 0..size {
let envelope = make_envelope(256);
let receipt = sender.send(KernelId::new("receiver"), envelope).await;
let _ = black_box(receipt);
}
});
while let Some(received) = receiver.try_receive() {
black_box(received);
}
});
},
);
}
group.finish();
}
fn bench_lock_free_queue(c: &mut Criterion) {
let mut group = c.benchmark_group("k2k/lock_free_queue");
group.sample_size(200);
for capacity in [256, 1024, 4096].iter() {
group.bench_with_input(BenchmarkId::new("create", capacity), capacity, |b, &cap| {
b.iter(|| {
let queue = SpscQueue::new(cap);
black_box(queue);
});
});
}
let queue = SpscQueue::new(1024);
let envelope = make_envelope(256);
group.bench_function("single_enqueue_dequeue", |b| {
b.iter(|| {
queue.try_enqueue(envelope.clone()).unwrap();
let msg = queue.try_dequeue().unwrap();
black_box(msg);
});
});
for batch in [10, 50, 100].iter() {
group.bench_with_input(
BenchmarkId::new("batch_operations", batch),
batch,
|b, &size| {
let queue = SpscQueue::new(1024);
let envelope = make_envelope(256);
b.iter(|| {
for _ in 0..size {
queue.try_enqueue(envelope.clone()).unwrap();
}
for _ in 0..size {
let msg = queue.try_dequeue().unwrap();
black_box(msg);
}
});
},
);
}
group.finish();
}
fn bench_multi_sender(c: &mut Criterion) {
let mut group = c.benchmark_group("k2k/multi_sender");
group.sample_size(30);
let rt = TokioRuntime::new().unwrap();
for num_senders in [2, 4, 8].iter() {
group.bench_with_input(
BenchmarkId::new("senders", num_senders),
num_senders,
|b, &n| {
let broker = K2KBuilder::new().max_pending_messages(2048).build();
let mut receiver = broker.register(KernelId::new("receiver"));
let senders: Vec<_> = (0..n)
.map(|i| broker.register(KernelId::new(format!("sender_{}", i))))
.collect();
b.iter(|| {
rt.block_on(async {
for (i, sender) in senders.iter().enumerate() {
let envelope = MessageEnvelope {
header: MessageHeader::new(
1,
i as u64 + 1,
0,
256,
HlcTimestamp::now(i as u64 + 1),
),
payload: vec![0u8; 256],
};
let _ = sender.send(KernelId::new("receiver"), envelope).await;
}
});
let mut count = 0;
while let Some(msg) = receiver.try_receive() {
black_box(msg);
count += 1;
}
black_box(count);
});
},
);
}
group.finish();
}
fn bench_k2k_validation(c: &mut Criterion) {
let mut group = c.benchmark_group("k2k_validation");
group.bench_function("lock_free_guarantee", |b| {
let queue = SpscQueue::new(1024);
let envelope = make_envelope(256);
b.iter_custom(|iters| {
let start = Instant::now();
for _ in 0..iters {
queue.try_enqueue(envelope.clone()).unwrap();
let msg = queue.try_dequeue().unwrap();
black_box(msg);
}
start.elapsed()
});
});
group.bench_function("delivery_guarantee", |b| {
let rt = TokioRuntime::new().unwrap();
let broker = K2KBuilder::new().max_pending_messages(1024).build();
let sender = broker.register(KernelId::new("sender"));
let mut receiver = broker.register(KernelId::new("receiver"));
b.iter(|| {
rt.block_on(async {
let envelope = make_envelope(256);
let receipt = sender
.send(KernelId::new("receiver"), envelope)
.await
.unwrap();
assert!(
matches!(
receipt.status,
DeliveryStatus::Delivered | DeliveryStatus::Pending
),
"Message delivery must succeed"
);
});
if let Some(received) = receiver.try_receive() {
assert_eq!(received.envelope.payload.len(), 256);
black_box(received);
}
});
});
group.finish();
}
criterion_group!(
benches,
bench_k2k_message_creation,
bench_k2k_broker,
bench_k2k_endpoint,
bench_k2k_throughput,
bench_lock_free_queue,
bench_multi_sender,
bench_k2k_validation,
);
criterion_main!(benches);