// mock_runtime/mock_runtime.rs

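//! Market data runtime — plugin, driver, pipeline, latency measurement.
//!
//! Demonstrates the full nexus-rt lifecycle with realistic domain types,
//! then measures dispatch latency in CPU cycles: each operation is warmed up
//! for 1000 iterations and then timed for 1000 iterations.
//!
//! The timing helpers use the x86_64 time-stamp counter, so the example only
//! builds for `x86_64` targets.
//!
//! Run with:
//! ```bash
//! taskset -c 0 cargo run --release -p nexus-rt --example mock_runtime
//! ```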
10
11#![allow(clippy::needless_pass_by_value, clippy::items_after_statements)]
12
13use std::collections::HashMap;
14use std::hint::black_box;
15
16use nexus_rt::{
17    Driver, Handler, IntoHandler, Local, PipelineStart, Plugin, Res, ResMut, World, WorldBuilder,
18};
19
20// ── Timing ──────────────────────────────────────────────────────────────
21
/// Samples the time-stamp counter at the start of a timed region.
///
/// Executes `lfence` followed by `rdtsc`; the fence is placed before the read
/// so the counter is not sampled ahead of earlier instructions. Only compiled
/// for `x86_64` targets.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
fn rdtsc_start() -> u64 {
    use core::arch::x86_64::{_mm_lfence, _rdtsc};

    // SAFETY: both intrinsics take no pointers; `lfence` needs SSE2, which is
    // part of the x86_64 baseline this function is gated on. Assumes `rdtsc`
    // is permitted in user mode on the host.
    unsafe {
        _mm_lfence();
        _rdtsc()
    }
}
30
/// Samples the time-stamp counter at the end of a timed region.
///
/// Executes `rdtscp` followed by `lfence` and returns the counter value; the
/// auxiliary value written by `rdtscp` is discarded. Only compiled for
/// `x86_64` targets.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
fn rdtsc_end() -> u64 {
    use core::arch::x86_64::{__rdtscp, _mm_lfence};

    let mut aux = 0u32;
    // SAFETY: `aux` is a live local, so the pointer passed to `__rdtscp` is
    // valid for its single `u32` write. Assumes the host CPU implements
    // `rdtscp`; `lfence` is part of the x86_64 baseline.
    unsafe {
        let tsc = __rdtscp(&raw mut aux);
        _mm_lfence();
        tsc
    }
}
41
/// Returns the `p`-th percentile of an ascending-sorted slice.
///
/// The index is `len * p / 100` truncated towards zero and clamped to the last
/// element, so `p = 100.0` yields the maximum. The slice must already be
/// sorted; this function does not sort it.
///
/// Returns `0` for an empty slice instead of panicking on the
/// `len - 1` underflow.
fn percentile(sorted: &[u64], p: f64) -> u64 {
    // `checked_sub` doubles as the emptiness test: no last index, no data.
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0;
    };
    // Float-to-int `as` saturates, so negative or NaN `p` maps to index 0.
    let idx = ((sorted.len() as f64) * p / 100.0) as usize;
    sorted[idx.min(last)]
}
46
47fn report(label: &str, samples: &mut [u64]) {
48    samples.sort_unstable();
49    println!(
50        "{:<44} {:>8} {:>8} {:>8}",
51        label,
52        percentile(samples, 50.0),
53        percentile(samples, 99.0),
54        percentile(samples, 99.9),
55    );
56}
57
58// ── Domain types ────────────────────────────────────────────────────────
59
/// One price update for one instrument; the event type fed through the pipeline.
#[derive(Copy, Clone)]
struct MarketTick {
    /// Instrument identifier; used as the key into the price cache.
    symbol: &'static str,
    /// Price carried by this tick.
    price: f64,
}
65
/// Last known price per symbol.
struct PriceCache {
    /// Symbol → most recently stored price.
    prices: HashMap<&'static str, f64>,
}

impl PriceCache {
    /// Creates a cache with no prices in it.
    fn new() -> Self {
        let prices = HashMap::new();
        Self { prices }
    }
}
77
/// Symbols that produced a signal and have not yet been consumed.
struct SignalBuffer {
    /// Pending signals, in the order they were pushed.
    signals: Vec<&'static str>,
}

impl SignalBuffer {
    /// Creates a buffer with no pending signals.
    fn new() -> Self {
        let signals = Vec::new();
        Self { signals }
    }
}
89
/// Running count of accepted orders; registered as `0` and only ever incremented.
struct OrderCount(u64);
91
/// Risk limits resource.
///
/// Originally described as optional ("may or may not be registered"). In this
/// file it is always registered by `TradingPlugin::build` and read through a
/// plain `Res<RiskLimits>` in `count_trades`.
struct RiskLimits {
    /// Cap applied when counting orders.
    ///
    /// NOTE(review): `count_trades` compares this against the cumulative
    /// `OrderCount`, not a per-tick tally, despite the field name — confirm
    /// which is intended.
    max_signals_per_tick: u64,
}
96
97// ── Plugin ──────────────────────────────────────────────────────────────
98
/// Configuration consumed when the trading resources are registered.
struct TradingPlugin {
    /// `(symbol, price)` pairs used to seed the price cache.
    initial_prices: Vec<(&'static str, f64)>,
    /// Value stored in `RiskLimits::max_signals_per_tick`.
    risk_cap: u64,
}
103
104impl Plugin for TradingPlugin {
105    fn build(self, world: &mut WorldBuilder) {
106        let mut cache = PriceCache::new();
107        for (sym, price) in self.initial_prices {
108            cache.prices.insert(sym, price);
109        }
110        world.register(cache);
111        world.register(OrderCount(0));
112        world.register(SignalBuffer::new());
113        world.register(RiskLimits {
114            max_signals_per_tick: self.risk_cap,
115        });
116    }
117}
118
119// ── Pipeline stages ─────────────────────────────────────────────────────
120
121/// Compare tick price against cache. Emit signal if delta > threshold.
122fn check_signals(
123    cache: Res<PriceCache>,
124    mut signals: ResMut<SignalBuffer>,
125    mut tick_count: Local<u64>,
126    tick: MarketTick,
127) -> MarketTick {
128    *tick_count += 1;
129    if let Some(&prev) = cache.prices.get(tick.symbol) {
130        let delta = (tick.price - prev).abs();
131        if delta > 50.0 {
132            signals.signals.push(tick.symbol);
133        }
134    }
135    tick
136}
137
138/// Update the price cache with the latest price.
139fn update_price(mut cache: ResMut<PriceCache>, tick: MarketTick) -> MarketTick {
140    cache.prices.insert(tick.symbol, tick.price);
141    tick
142}
143
144/// Count accepted trades against risk limits.
145fn count_trades(
146    limits: Res<RiskLimits>,
147    mut signals: ResMut<SignalBuffer>,
148    mut orders: ResMut<OrderCount>,
149    _tick: MarketTick,
150) {
151    let cap = limits.max_signals_per_tick;
152    for _symbol in signals.signals.drain(..) {
153        if orders.0 < cap {
154            orders.0 += 1;
155        }
156    }
157}
158
159// ── Driver ──────────────────────────────────────────────────────────────
160
/// Zero-sized installer; consumed by `Driver::install` to produce a `MarketDataHandle`.
struct MarketDataInstaller;
162
163struct MarketDataHandle {
164    pipeline: Box<dyn Handler<MarketTick>>,
165}
166
167impl Driver for MarketDataInstaller {
168    type Handle = MarketDataHandle;
169
170    fn install(self, world: &mut WorldBuilder) -> MarketDataHandle {
171        let r = world.registry_mut();
172        let pipeline = PipelineStart::<MarketTick>::new()
173            .stage(check_signals, r)
174            .stage(update_price, r)
175            .stage(count_trades, r)
176            .build();
177        MarketDataHandle {
178            pipeline: Box::new(pipeline),
179        }
180    }
181}
182
183impl MarketDataHandle {
184    /// Process a batch of market ticks.
185    ///
186    /// Advances sequence once per batch — all ticks share the same sequence
187    /// number. This amortizes the sequence bump but means `inputs_changed()`
188    /// cannot distinguish individual events within a batch. For per-event
189    /// change detection, move `next_sequence()` inside the loop.
190    fn poll(&mut self, world: &mut World, ticks: &[MarketTick]) {
191        if ticks.is_empty() {
192            return;
193        }
194        world.next_sequence();
195        for tick in ticks {
196            self.pipeline.run(world, *tick);
197        }
198    }
199}
200
201// ── main ────────────────────────────────────────────────────────────────
202
203fn main() {
204    // -- Build ----------------------------------------------------------------
205
206    let mut wb = WorldBuilder::new();
207    wb.install_plugin(TradingPlugin {
208        initial_prices: vec![("BTC", 50_000.0), ("ETH", 3_000.0)],
209        risk_cap: 100,
210    });
211    let mut md = wb.install_driver(MarketDataInstaller);
212    let mut world = wb.build();
213
214    // Standalone handler — demonstrates change detection.
215    fn on_signal(signals: Res<SignalBuffer>, _event: ()) {
216        black_box(signals.signals.len());
217    }
218    let mut signal_handler = on_signal.into_handler(world.registry_mut());
219
220    // -- Correctness check ----------------------------------------------------
221
222    let ticks = [
223        MarketTick {
224            symbol: "BTC",
225            price: 50_100.0,
226        }, // delta=100 > 50 → signal
227        MarketTick {
228            symbol: "ETH",
229            price: 3_010.0,
230        }, // delta=10 < 50 → no signal
231        MarketTick {
232            symbol: "BTC",
233            price: 49_900.0,
234        }, // delta=200 > 50 → signal
235    ];
236
237    md.poll(&mut world, &ticks);
238
239    // 2 signals accepted, risk cap=100 so both go through.
240    assert_eq!(world.resource::<OrderCount>().0, 2);
241
242    // Change detection: SignalBuffer was modified → handler should run.
243    assert!(signal_handler.inputs_changed(&world));
244    signal_handler.run(&mut world, ());
245
246    // Advance sequence, no new writes → handler should skip.
247    world.next_sequence();
248    assert!(!signal_handler.inputs_changed(&world));
249
250    println!("Correctness checks passed.\n");
251
252    // -- Latency measurement --------------------------------------------------
253
254    const WARMUP: usize = 1_000;
255    const ITERATIONS: usize = 1_000;
256
257    // Ticks that exercise the full pipeline path (signal detection + cache write).
258    let bench_ticks = [
259        MarketTick {
260            symbol: "BTC",
261            price: 50_100.0,
262        },
263        MarketTick {
264            symbol: "ETH",
265            price: 3_100.0,
266        },
267        MarketTick {
268            symbol: "BTC",
269            price: 50_200.0,
270        },
271        MarketTick {
272            symbol: "ETH",
273            price: 3_200.0,
274        },
275    ];
276
277    println!(
278        "=== nexus-rt Dispatch Latency (cycles, {} iterations) ===\n",
279        ITERATIONS
280    );
281    println!(
282        "{:<44} {:>8} {:>8} {:>8}",
283        "Operation", "p50", "p99", "p999"
284    );
285    println!("{}", "-".repeat(72));
286
287    // Single tick through dyn pipeline
288    {
289        let tick = bench_ticks[0];
290        for _ in 0..WARMUP {
291            world.next_sequence();
292            md.pipeline.run(&mut world, black_box(tick));
293        }
294        let mut samples = Vec::with_capacity(ITERATIONS);
295        for _ in 0..ITERATIONS {
296            world.next_sequence();
297            let start = rdtsc_start();
298            md.pipeline.run(&mut world, black_box(tick));
299            let end = rdtsc_end();
300            samples.push(end.wrapping_sub(start));
301        }
302        report("single tick (dyn pipeline, 3 stages)", &mut samples);
303    }
304
305    // Standalone handler (1 param, Res<T>)
306    {
307        for _ in 0..WARMUP {
308            black_box(());
309            signal_handler.run(&mut world, ());
310        }
311        let mut samples = Vec::with_capacity(ITERATIONS);
312        for _ in 0..ITERATIONS {
313            let start = rdtsc_start();
314            black_box(());
315            signal_handler.run(&mut world, ());
316            let end = rdtsc_end();
317            samples.push(end.wrapping_sub(start));
318        }
319        report("handler dispatch (1 param, Res<T>)", &mut samples);
320    }
321
322    // 4-tick batch through driver poll
323    {
324        for _ in 0..WARMUP {
325            md.poll(&mut world, &bench_ticks);
326        }
327        let mut samples = Vec::with_capacity(ITERATIONS);
328        for _ in 0..ITERATIONS {
329            let start = rdtsc_start();
330            md.poll(&mut world, black_box(&bench_ticks));
331            let end = rdtsc_end();
332            samples.push(end.wrapping_sub(start));
333        }
334        let mut per_tick: Vec<u64> = samples
335            .iter()
336            .map(|&s| s / bench_ticks.len() as u64)
337            .collect();
338        report("4-tick poll (total)", &mut samples);
339        report("4-tick poll (per tick)", &mut per_tick);
340    }
341
342    // Change detection
343    {
344        world.next_sequence(); // ensure stale
345        for _ in 0..WARMUP {
346            black_box(signal_handler.inputs_changed(&world));
347        }
348        let mut samples = Vec::with_capacity(ITERATIONS);
349        for _ in 0..ITERATIONS {
350            let start = rdtsc_start();
351            black_box(signal_handler.inputs_changed(&world));
352            let end = rdtsc_end();
353            samples.push(end.wrapping_sub(start));
354        }
355        report("inputs_changed (1 param, stale)", &mut samples);
356    }
357
358    println!();
359}