1#![allow(clippy::needless_pass_by_value, clippy::items_after_statements)]
12
13use std::collections::HashMap;
14use std::hint::black_box;
15
16use nexus_rt::{
17 Driver, Handler, IntoHandler, Local, PipelineStart, Plugin, Res, ResMut, World, WorldBuilder,
18};
19
/// Reads the CPU time-stamp counter to mark the start of a timed region.
///
/// An `lfence` is issued before `rdtsc`; presumably this keeps earlier
/// instructions from overlapping the timed region (see the Intel SDM for the
/// exact ordering guarantees).
#[inline(always)]
#[cfg(target_arch = "x86_64")]
fn rdtsc_start() -> u64 {
    use core::arch::x86_64::{_mm_lfence, _rdtsc};

    // SAFETY: neither intrinsic takes a pointer or touches program memory.
    // `lfence` requires SSE2, which is part of the x86_64 baseline. Assumes
    // the OS permits user-mode `rdtsc`.
    unsafe {
        _mm_lfence();
        _rdtsc()
    }
}
30
/// Reads the CPU time-stamp counter to mark the end of a timed region.
///
/// Uses `rdtscp` followed by an `lfence`. The auxiliary value that `rdtscp`
/// writes out is discarded; only the counter is returned.
#[inline(always)]
#[cfg(target_arch = "x86_64")]
fn rdtsc_end() -> u64 {
    use core::arch::x86_64::{__rdtscp, _mm_lfence};

    // Scratch slot for the auxiliary value `rdtscp` stores; never read back.
    let mut aux = 0u32;

    // SAFETY: `aux` is a live, properly aligned local, so the pointer handed
    // to `__rdtscp` is valid for a single `u32` write. `lfence` requires SSE2,
    // which is part of the x86_64 baseline.
    unsafe {
        let tsc = __rdtscp(&raw mut aux);
        _mm_lfence();
        tsc
    }
}
41
/// Returns the sample at percentile `p` of an ascending-sorted slice.
///
/// The index is `floor(len * p / 100)`, clamped to the last element, so
/// `p >= 100.0` yields the maximum. A negative or NaN `p` saturates to index 0
/// (the minimum) because of the float-to-`usize` cast.
///
/// Returns `0` for an empty slice instead of panicking on `len - 1`.
fn percentile(sorted: &[u64], p: f64) -> u64 {
    // Empty input has no percentile; bail out before any index arithmetic.
    let Some(&last) = sorted.last() else {
        return 0;
    };
    let idx = ((sorted.len() as f64) * p / 100.0) as usize;
    // Out-of-range indices fall back to the last (largest) sample.
    sorted.get(idx).copied().unwrap_or(last)
}
46
47fn report(label: &str, samples: &mut [u64]) {
48 samples.sort_unstable();
49 println!(
50 "{:<44} {:>8} {:>8} {:>8}",
51 label,
52 percentile(samples, 50.0),
53 percentile(samples, 99.0),
54 percentile(samples, 99.9),
55 );
56}
57
/// A single market-data event: one price observation for one symbol.
///
/// `Debug` and `PartialEq` are derived so ticks can be printed and compared
/// in assertions and diagnostics.
#[derive(Debug, Clone, Copy, PartialEq)]
struct MarketTick {
    /// Instrument identifier, e.g. `"BTC"`.
    symbol: &'static str,
    /// Observed price for `symbol`.
    price: f64,
}
65
/// Last known price per symbol.
#[derive(Debug, Default)]
struct PriceCache {
    /// Most recent price recorded for each symbol.
    prices: HashMap<&'static str, f64>,
}

impl PriceCache {
    /// Creates an empty cache; equivalent to `PriceCache::default()`.
    fn new() -> Self {
        Self::default()
    }
}
77
/// Symbols that have been flagged by `check_signals` and not yet consumed by
/// `count_trades`.
#[derive(Debug, Default)]
struct SignalBuffer {
    /// Pending signals, in the order they were pushed.
    signals: Vec<&'static str>,
}

impl SignalBuffer {
    /// Creates an empty buffer; equivalent to `SignalBuffer::default()`.
    fn new() -> Self {
        Self::default()
    }
}
89
/// Running count of orders; incremented by `count_trades` for each drained
/// signal while the count is below the risk cap.
#[derive(Debug)]
struct OrderCount(u64);
91
/// Risk configuration read by `count_trades`.
#[derive(Debug)]
struct RiskLimits {
    /// Cap that `count_trades` compares against the running `OrderCount`.
    ///
    /// NOTE(review): despite the name, nothing in the visible code resets the
    /// order count between ticks, so this acts as a cumulative cap. Confirm
    /// whether that is intended.
    max_signals_per_tick: u64,
}
96
/// Configuration consumed when the trading resources are registered.
#[derive(Debug)]
struct TradingPlugin {
    /// `(symbol, price)` pairs used to seed the price cache.
    initial_prices: Vec<(&'static str, f64)>,
    /// Value stored as `RiskLimits::max_signals_per_tick`.
    risk_cap: u64,
}
103
104impl Plugin for TradingPlugin {
105 fn build(self, world: &mut WorldBuilder) {
106 let mut cache = PriceCache::new();
107 for (sym, price) in self.initial_prices {
108 cache.prices.insert(sym, price);
109 }
110 world.register(cache);
111 world.register(OrderCount(0));
112 world.register(SignalBuffer::new());
113 world.register(RiskLimits {
114 max_signals_per_tick: self.risk_cap,
115 });
116 }
117}
118
119fn check_signals(
123 cache: Res<PriceCache>,
124 mut signals: ResMut<SignalBuffer>,
125 mut tick_count: Local<u64>,
126 tick: MarketTick,
127) -> MarketTick {
128 *tick_count += 1;
129 if let Some(&prev) = cache.prices.get(tick.symbol) {
130 let delta = (tick.price - prev).abs();
131 if delta > 50.0 {
132 signals.signals.push(tick.symbol);
133 }
134 }
135 tick
136}
137
138fn update_price(mut cache: ResMut<PriceCache>, tick: MarketTick) -> MarketTick {
140 cache.prices.insert(tick.symbol, tick.price);
141 tick
142}
143
144fn count_trades(
146 limits: Res<RiskLimits>,
147 mut signals: ResMut<SignalBuffer>,
148 mut orders: ResMut<OrderCount>,
149 _tick: MarketTick,
150) {
151 let cap = limits.max_signals_per_tick;
152 for _symbol in signals.signals.drain(..) {
153 if orders.0 < cap {
154 orders.0 += 1;
155 }
156 }
157}
158
/// Zero-sized installer whose `Driver` implementation builds the market-data
/// pipeline and returns a `MarketDataHandle`.
#[derive(Debug)]
struct MarketDataInstaller;
162
163struct MarketDataHandle {
164 pipeline: Box<dyn Handler<MarketTick>>,
165}
166
167impl Driver for MarketDataInstaller {
168 type Handle = MarketDataHandle;
169
170 fn install(self, world: &mut WorldBuilder) -> MarketDataHandle {
171 let r = world.registry_mut();
172 let pipeline = PipelineStart::<MarketTick>::new()
173 .stage(check_signals, r)
174 .stage(update_price, r)
175 .stage(count_trades, r)
176 .build();
177 MarketDataHandle {
178 pipeline: Box::new(pipeline),
179 }
180 }
181}
182
183impl MarketDataHandle {
184 fn poll(&mut self, world: &mut World, ticks: &[MarketTick]) {
191 if ticks.is_empty() {
192 return;
193 }
194 world.next_sequence();
195 for tick in ticks {
196 self.pipeline.run(world, *tick);
197 }
198 }
199}
200
201fn main() {
204 let mut wb = WorldBuilder::new();
207 wb.install_plugin(TradingPlugin {
208 initial_prices: vec![("BTC", 50_000.0), ("ETH", 3_000.0)],
209 risk_cap: 100,
210 });
211 let mut md = wb.install_driver(MarketDataInstaller);
212 let mut world = wb.build();
213
214 fn on_signal(signals: Res<SignalBuffer>, _event: ()) {
216 black_box(signals.signals.len());
217 }
218 let mut signal_handler = on_signal.into_handler(world.registry_mut());
219
220 let ticks = [
223 MarketTick {
224 symbol: "BTC",
225 price: 50_100.0,
226 }, MarketTick {
228 symbol: "ETH",
229 price: 3_010.0,
230 }, MarketTick {
232 symbol: "BTC",
233 price: 49_900.0,
234 }, ];
236
237 md.poll(&mut world, &ticks);
238
239 assert_eq!(world.resource::<OrderCount>().0, 2);
241
242 assert!(signal_handler.inputs_changed(&world));
244 signal_handler.run(&mut world, ());
245
246 world.next_sequence();
248 assert!(!signal_handler.inputs_changed(&world));
249
250 println!("Correctness checks passed.\n");
251
252 const WARMUP: usize = 1_000;
255 const ITERATIONS: usize = 1_000;
256
257 let bench_ticks = [
259 MarketTick {
260 symbol: "BTC",
261 price: 50_100.0,
262 },
263 MarketTick {
264 symbol: "ETH",
265 price: 3_100.0,
266 },
267 MarketTick {
268 symbol: "BTC",
269 price: 50_200.0,
270 },
271 MarketTick {
272 symbol: "ETH",
273 price: 3_200.0,
274 },
275 ];
276
277 println!(
278 "=== nexus-rt Dispatch Latency (cycles, {} iterations) ===\n",
279 ITERATIONS
280 );
281 println!(
282 "{:<44} {:>8} {:>8} {:>8}",
283 "Operation", "p50", "p99", "p999"
284 );
285 println!("{}", "-".repeat(72));
286
287 {
289 let tick = bench_ticks[0];
290 for _ in 0..WARMUP {
291 world.next_sequence();
292 md.pipeline.run(&mut world, black_box(tick));
293 }
294 let mut samples = Vec::with_capacity(ITERATIONS);
295 for _ in 0..ITERATIONS {
296 world.next_sequence();
297 let start = rdtsc_start();
298 md.pipeline.run(&mut world, black_box(tick));
299 let end = rdtsc_end();
300 samples.push(end.wrapping_sub(start));
301 }
302 report("single tick (dyn pipeline, 3 stages)", &mut samples);
303 }
304
305 {
307 for _ in 0..WARMUP {
308 black_box(());
309 signal_handler.run(&mut world, ());
310 }
311 let mut samples = Vec::with_capacity(ITERATIONS);
312 for _ in 0..ITERATIONS {
313 let start = rdtsc_start();
314 black_box(());
315 signal_handler.run(&mut world, ());
316 let end = rdtsc_end();
317 samples.push(end.wrapping_sub(start));
318 }
319 report("handler dispatch (1 param, Res<T>)", &mut samples);
320 }
321
322 {
324 for _ in 0..WARMUP {
325 md.poll(&mut world, &bench_ticks);
326 }
327 let mut samples = Vec::with_capacity(ITERATIONS);
328 for _ in 0..ITERATIONS {
329 let start = rdtsc_start();
330 md.poll(&mut world, black_box(&bench_ticks));
331 let end = rdtsc_end();
332 samples.push(end.wrapping_sub(start));
333 }
334 let mut per_tick: Vec<u64> = samples
335 .iter()
336 .map(|&s| s / bench_ticks.len() as u64)
337 .collect();
338 report("4-tick poll (total)", &mut samples);
339 report("4-tick poll (per tick)", &mut per_tick);
340 }
341
342 {
344 world.next_sequence(); for _ in 0..WARMUP {
346 black_box(signal_handler.inputs_changed(&world));
347 }
348 let mut samples = Vec::with_capacity(ITERATIONS);
349 for _ in 0..ITERATIONS {
350 let start = rdtsc_start();
351 black_box(signal_handler.inputs_changed(&world));
352 let end = rdtsc_end();
353 samples.push(end.wrapping_sub(start));
354 }
355 report("inputs_changed (1 param, stale)", &mut samples);
356 }
357
358 println!();
359}