use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use ringkernel_core::hlc::HlcTimestamp;
use ringkernel_core::message::{MessageEnvelope, MessageHeader};
use ringkernel_core::queue::{MessageQueue, SpscQueue};
const QUEUE_CAPACITY: usize = 4096;
const PAYLOAD_BYTES: usize = 256;
fn run(duration: Duration) -> (u64, u64) {
let queue = Arc::new(SpscQueue::new(QUEUE_CAPACITY));
let stop = Arc::new(AtomicBool::new(false));
let envelope_template = MessageEnvelope {
header: MessageHeader::new(1, 0, 1, 0, HlcTimestamp::now(1)),
payload: Vec::new(),
..Default::default()
};
let producer_queue = Arc::clone(&queue);
let producer_stop = Arc::clone(&stop);
let producer_env = envelope_template.clone();
let producer = thread::spawn(move || {
let mut ops: u64 = 0;
while !producer_stop.load(Ordering::Relaxed) {
if producer_queue.try_enqueue(producer_env.clone()).is_ok() {
ops += 1;
} else {
std::hint::spin_loop();
}
}
ops
});
let consumer_queue = Arc::clone(&queue);
let consumer_stop = Arc::clone(&stop);
let consumer = thread::spawn(move || {
let mut ops: u64 = 0;
while !consumer_stop.load(Ordering::Relaxed) || !consumer_queue.is_empty() {
if consumer_queue.try_dequeue().is_ok() {
ops += 1;
} else {
std::hint::spin_loop();
}
}
ops
});
thread::sleep(duration);
stop.store(true, Ordering::Relaxed);
let p_ops = producer.join().expect("producer joined");
let c_ops = consumer.join().expect("consumer joined");
(p_ops, c_ops)
}
#[test]
#[ignore] fn spsc_two_thread_10s() {
let duration = Duration::from_secs(10);
println!();
println!("══════════════════════════════════════════════════════════════════");
println!(
" SPSC Two-Thread Throughput ({}-second run)",
duration.as_secs()
);
println!(
" Queue capacity: {}, payload: {} B",
QUEUE_CAPACITY, PAYLOAD_BYTES
);
println!("══════════════════════════════════════════════════════════════════");
let _ = run(Duration::from_secs(1));
let t0 = Instant::now();
let (produced, consumed) = run(duration);
let elapsed = t0.elapsed().as_secs_f64();
let p_rate = produced as f64 / elapsed;
let c_rate = consumed as f64 / elapsed;
println!();
println!(" Produced: {produced} ({:.2} Mmsg/s)", p_rate / 1e6);
println!(" Consumed: {consumed} ({:.2} Mmsg/s)", c_rate / 1e6);
println!(" Elapsed: {elapsed:.2} s");
println!();
assert!(produced > 1_000, "producer should make progress");
assert!(consumed > 1_000, "consumer should make progress");
assert!(
produced.saturating_sub(consumed) <= QUEUE_CAPACITY as u64,
"producer/consumer delta should be ≤ queue capacity"
);
}