use photon_ring::{channel_bounded, PublishError, TryRecvError};
use std::thread;
use std::time::Instant;
const NUM_FILLS: u64 = 10_000;
const RING_SIZE: usize = 256;
#[derive(Debug, Clone, Copy)]
#[repr(C)]
#[allow(dead_code)]
struct Fill {
order_id: u64,
price: f64,
quantity: u32,
side: u32,
}
unsafe impl photon_ring::Pod for Fill {}
fn main() {
println!("Backpressure demo: reliable order fill pipeline");
println!(" Ring size: {RING_SIZE} slots");
println!(" Fills: {NUM_FILLS}");
println!(" Watermark: 0 (block as soon as ring is full)");
println!();
let (mut publisher, subscribable) = channel_bounded::<Fill>(RING_SIZE, 0);
let mut subscriber = subscribable.subscribe();
let start = Instant::now();
let consumer = thread::spawn(move || {
let mut received: u64 = 0;
let mut total_value = 0.0f64;
loop {
match subscriber.try_recv() {
Ok(fill) => {
assert_eq!(
fill.order_id, received,
"out-of-order fill: expected {received}, got {}",
fill.order_id
);
total_value += fill.price * fill.quantity as f64;
received += 1;
busy_wait_ns(1_000);
if received == NUM_FILLS {
break;
}
}
Err(TryRecvError::Empty) => {
core::hint::spin_loop();
}
Err(TryRecvError::Lagged { skipped }) => {
panic!("unexpected lag: {skipped} fills lost — bounded channel violated");
}
}
}
(received, total_value)
});
let mut published: u64 = 0;
let mut backpressure_count: u64 = 0;
while published < NUM_FILLS {
let fill = Fill {
order_id: published,
price: 100.0 + (published as f64) * 0.01,
quantity: 100 + (published % 500) as u32,
side: if published.is_multiple_of(2) { 0 } else { 1 },
};
match publisher.try_publish(fill) {
Ok(()) => {
published += 1;
}
Err(PublishError::Full(_)) => {
backpressure_count += 1;
core::hint::spin_loop();
}
}
}
let (received, total_value) = consumer.join().expect("consumer thread panicked");
let elapsed = start.elapsed();
assert_eq!(
published, received,
"MISMATCH: published {published} but received {received}"
);
println!("--- Results ---");
println!(" Published: {published}");
println!(" Received: {received}");
println!(" Dropped: 0 (guaranteed by backpressure)");
println!(" Backpressure stalls:{backpressure_count:>8}");
println!(" Total notional: ${total_value:>14.2}");
println!(" Elapsed: {elapsed:.2?}");
println!(
" Throughput: {:.1}K fills/s",
published as f64 / elapsed.as_secs_f64() / 1_000.0
);
println!();
println!("All {published} fills delivered. Zero drops.");
}
#[inline(never)]
fn busy_wait_ns(ns: u64) {
let start = Instant::now();
let target = std::time::Duration::from_nanos(ns);
while start.elapsed() < target {
core::hint::spin_loop();
}
}