use photon_ring::{channel, TryRecvError};
use std::thread;
const NUM_ORDERS: u32 = 20_000;
const LARGE_ORDER_THRESHOLD: u32 = 500;
#[derive(Clone, Copy, Debug)]
#[repr(C)]
struct Order {
id: u32,
price: f64,
quantity: u32,
}
unsafe impl photon_ring::Pod for Order {}
#[derive(Clone, Copy, Debug)]
#[repr(C)]
#[allow(dead_code)]
struct TaggedOrder {
id: u32,
price: f64,
quantity: u32,
tag: u32,
}
unsafe impl photon_ring::Pod for TaggedOrder {}
const TAG_LARGE: u32 = 0;
const TAG_SMALL: u32 = 1;
fn main() {
let (mut publisher, source_subs) = channel::<Order>(2048);
let (mut pub_a, subs_a) = channel::<TaggedOrder>(1024);
let (mut pub_b, subs_b) = channel::<TaggedOrder>(1024);
let mut sub_a = source_subs.subscribe();
let filter_a = thread::spawn(move || loop {
match sub_a.try_recv() {
Ok(order) => {
if order.quantity >= LARGE_ORDER_THRESHOLD {
pub_a.publish(TaggedOrder {
id: order.id,
price: order.price,
quantity: order.quantity,
tag: TAG_LARGE,
});
}
}
Err(TryRecvError::Empty) => core::hint::spin_loop(),
Err(TryRecvError::Lagged { .. }) => {}
}
});
let mut sub_b = source_subs.subscribe();
let filter_b = thread::spawn(move || loop {
match sub_b.try_recv() {
Ok(order) => {
if order.quantity < LARGE_ORDER_THRESHOLD {
pub_b.publish(TaggedOrder {
id: order.id,
price: order.price,
quantity: order.quantity,
tag: TAG_SMALL,
});
}
}
Err(TryRecvError::Empty) => core::hint::spin_loop(),
Err(TryRecvError::Lagged { .. }) => {}
}
});
let mut agg_sub_a = subs_a.subscribe();
let mut agg_sub_b = subs_b.subscribe();
let aggregator = thread::spawn(move || {
let mut large_count = 0u64;
let mut small_count = 0u64;
let mut large_notional = 0.0f64;
let mut small_notional = 0.0f64;
let target = NUM_ORDERS as u64;
loop {
match agg_sub_a.try_recv() {
Ok(order) => {
large_count += 1;
large_notional += order.price * order.quantity as f64;
if large_count <= 3 {
println!(
" [aggregator] large order #{}: qty={}, price={:.2}",
order.id, order.quantity, order.price
);
}
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Lagged { .. }) => {}
}
match agg_sub_b.try_recv() {
Ok(order) => {
small_count += 1;
small_notional += order.price * order.quantity as f64;
if small_count <= 3 {
println!(
" [aggregator] small order #{}: qty={}, price={:.2}",
order.id, order.quantity, order.price
);
}
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Lagged { .. }) => {}
}
if large_count + small_count >= target {
break;
}
if large_count + small_count > 0 {
} else {
core::hint::spin_loop();
}
}
(large_count, small_count, large_notional, small_notional)
});
println!(
"Diamond topology: {NUM_ORDERS} orders -> {{large_filter, small_filter}} -> aggregator\n"
);
for i in 0..NUM_ORDERS {
publisher.publish(Order {
id: i,
price: 50.0 + (i as f64 * 0.005),
quantity: 100 + (i % 1000), });
}
let (large, small, large_val, small_val) = aggregator.join().unwrap();
println!();
println!("--- Results ---");
println!(" Total published: {NUM_ORDERS}");
println!(" Large orders: {large} (notional: ${large_val:.2})");
println!(" Small orders: {small} (notional: ${small_val:.2})");
println!(" Accounted for: {} / {NUM_ORDERS}", large + small);
drop(filter_a);
drop(filter_b);
}