Skip to main content

perf_pipeline/
perf_pipeline.rs

//! Pipeline + Handler dispatch codegen inspection + latency benchmark.
//!
//! Run asm inspection (pipelines):
//! ```bash
//! cargo asm -p nexus-rt --example perf_pipeline perf_pipeline::bare_3stage_run
//! cargo asm -p nexus-rt --example perf_pipeline perf_pipeline::option_3stage_run
//! cargo asm -p nexus-rt --example perf_pipeline perf_pipeline::world_access_run
//! cargo asm -p nexus-rt --example perf_pipeline perf_pipeline::boxed_pipeline_run
//! ```
//!
//! Run asm inspection (Handler dispatch):
//! ```bash
//! cargo asm -p nexus-rt --example perf_pipeline perf_pipeline::probe_handler_res_read
//! cargo asm -p nexus-rt --example perf_pipeline perf_pipeline::probe_handler_res_mut
//! cargo asm -p nexus-rt --example perf_pipeline perf_pipeline::probe_handler_two_res
//! cargo asm -p nexus-rt --example perf_pipeline perf_pipeline::probe_dyn_handler
//! ```
//!
//! Run benchmark:
//! ```bash
//! taskset -c 0 cargo run --release -p nexus-rt --example perf_pipeline
//! ```
23
24use std::hint::black_box;
25
26use nexus_rt::{Handler, IntoHandler, PipelineStart, Res, ResMut, WorldBuilder};
27
28// =============================================================================
29// Bench infrastructure
30// =============================================================================
31
/// Number of timed samples collected for each benchmark row.
const ITERATIONS: usize = 100_000;
/// Number of untimed warm-up rounds executed before sampling starts.
const WARMUP: usize = 10_000;
/// Calls issued inside one timed sample of `bench_batched`; the measured
/// cycle delta is divided by this value to obtain a per-call figure.
const BATCH: u64 = 100;
35
/// Samples the time-stamp counter at the beginning of a timed region.
///
/// Issues an `lfence` and then reads `rdtsc`; the fence is presumably there so
/// that earlier instructions have completed before the counter is read.
#[inline(always)]
#[cfg(target_arch = "x86_64")]
fn rdtsc_start() -> u64 {
    use core::arch::x86_64::{_mm_lfence, _rdtsc};

    // SAFETY: both intrinsics take no pointers and touch no Rust-visible
    // memory. `lfence` is part of SSE2, which is baseline on x86_64. Assumes
    // the OS permits user-mode TSC reads.
    unsafe {
        _mm_lfence();
        _rdtsc()
    }
}
44
/// Samples the time-stamp counter at the end of a timed region.
///
/// Reads the counter with `rdtscp` and then issues an `lfence`; the fence is
/// presumably there to keep later instructions from starting before the read.
/// The auxiliary value written by `rdtscp` is discarded.
#[inline(always)]
#[cfg(target_arch = "x86_64")]
fn rdtsc_end() -> u64 {
    use core::arch::x86_64::{__rdtscp, _mm_lfence};

    // Scratch slot for the auxiliary output of `rdtscp`; never read back.
    let mut aux_out = 0u32;

    // SAFETY: the pointer handed to `__rdtscp` refers to a live, writable
    // local `u32`. `lfence` takes no arguments and is baseline on x86_64.
    // Assumes the OS permits user-mode TSC reads.
    unsafe {
        let stamp = __rdtscp(&raw mut aux_out);
        _mm_lfence();
        stamp
    }
}
55
/// Returns the `p`-th percentile (0.0..=100.0) of an ascending-sorted slice.
///
/// The rank is `floor(len * p / 100)`, clamped to the last element, so
/// `p = 100.0` yields the maximum. `sorted` must already be sorted; this
/// function does not check it.
///
/// An empty slice yields `0` instead of underflowing on `len - 1`.
fn percentile(sorted: &[u64], p: f64) -> u64 {
    // `checked_sub` doubles as the emptiness test and gives the clamp bound.
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0;
    };
    // Float-to-int `as` saturates, so negative or NaN `p` maps to index 0.
    let idx = ((sorted.len() as f64) * p / 100.0) as usize;
    sorted[idx.min(last)]
}
60
61fn bench_batched<F: FnMut() -> u64>(name: &str, mut f: F) -> (u64, u64, u64) {
62    for _ in 0..WARMUP {
63        black_box(f());
64    }
65    let mut samples = Vec::with_capacity(ITERATIONS);
66    for _ in 0..ITERATIONS {
67        let start = rdtsc_start();
68        for _ in 0..BATCH {
69            black_box(f());
70        }
71        let end = rdtsc_end();
72        samples.push(end.wrapping_sub(start) / BATCH);
73    }
74    samples.sort_unstable();
75    let p50 = percentile(&samples, 50.0);
76    let p99 = percentile(&samples, 99.0);
77    let p999 = percentile(&samples, 99.9);
78    println!("{:<44} {:>8} {:>8} {:>8}", name, p50, p99, p999);
79    (p50, p99, p999)
80}
81
/// Prints a section banner, the column headings, and a 72-dash rule.
///
/// Column widths match the rows printed by `bench_batched`.
fn print_header(title: &str) {
    println!("=== {title} ===\n");

    let (op, p50, p99, p999) = ("Operation", "p50", "p99", "p999");
    println!("{op:<44} {p50:>8} {p99:>8} {p999:>8}");

    // An empty string padded with '-' to width 72 gives the separator line.
    println!("{:-<72}", "");
}
90
91// =============================================================================
92// Pipeline codegen probes
93// =============================================================================
94
95/// 3-stage bare pipeline: multiply, add, shift.
96#[inline(never)]
97pub fn bare_3stage_run(
98    p: &mut nexus_rt::PipelineBuilder<u64, u64, impl FnMut(&mut nexus_rt::World, u64) -> u64>,
99    world: &mut nexus_rt::World,
100    input: u64,
101) -> u64 {
102    p.run(world, input)
103}
104
105/// 3-stage Option pipeline: Some, map, filter.
106#[inline(never)]
107pub fn option_3stage_run(
108    p: &mut nexus_rt::PipelineBuilder<
109        u64,
110        Option<u64>,
111        impl FnMut(&mut nexus_rt::World, u64) -> Option<u64>,
112    >,
113    world: &mut nexus_rt::World,
114    input: u64,
115) -> Option<u64> {
116    p.run(world, input)
117}
118
119/// Pipeline that reads World via pre-resolved Res<T> stages.
120#[inline(never)]
121pub fn world_access_run(
122    p: &mut nexus_rt::PipelineBuilder<u64, u64, impl FnMut(&mut nexus_rt::World, u64) -> u64>,
123    world: &mut nexus_rt::World,
124    input: u64,
125) -> u64 {
126    p.run(world, input)
127}
128
129/// Built Pipeline through dyn dispatch.
130#[inline(never)]
131pub fn boxed_pipeline_run(
132    p: &mut dyn nexus_rt::Handler<u64>,
133    world: &mut nexus_rt::World,
134    input: u64,
135) {
136    p.run(world, input);
137}
138
139/// Baseline: equivalent hand-written function (no pipeline).
140#[inline(never)]
141pub fn baseline_handwritten(world: &mut nexus_rt::World, input: u64) -> u64 {
142    let x = input.wrapping_mul(3);
143    let x = x.wrapping_add(7);
144    let _ = world;
145    x >> 1
146}
147
148// =============================================================================
149// Stage functions for World-accessing pipeline
150// =============================================================================
151
152fn add_resource(val: Res<u64>, x: u64) -> u64 {
153    x.wrapping_add(*val)
154}
155
156fn mul_resource(val: Res<u64>, x: u64) -> u64 {
157    x.wrapping_mul(*val)
158}
159
160fn sub_resource(val: Res<u32>, x: u64) -> u64 {
161    x.wrapping_sub(*val as u64)
162}
163
164// =============================================================================
165// inputs_changed probe functions at various arities
166// =============================================================================
167
168fn ic_1p(_a: Res<u64>, _: ()) {}
169fn ic_2p(_a: Res<u64>, _b: Res<u32>, _: ()) {}
170fn ic_4p(_a: Res<u64>, _b: Res<u32>, _c: Res<bool>, _d: Res<f64>, _: ()) {}
171
172#[allow(clippy::too_many_arguments)]
173fn ic_8p(
174    _a: Res<u64>,
175    _b: Res<u32>,
176    _c: Res<bool>,
177    _d: Res<f64>,
178    _e: Res<i64>,
179    _f: Res<i32>,
180    _g: Res<u8>,
181    _h: Res<u16>,
182    _: (),
183) {
184}
185
186// =============================================================================
187// Handler dispatch probes — SystemParam fetch hot path
188// =============================================================================
189
190fn handler_res_read(counter: Res<u64>, input: u64) {
191    black_box((*counter).wrapping_add(input));
192}
193
194fn handler_res_mut_write(mut counter: ResMut<u64>, input: u64) {
195    *counter = (*counter).wrapping_add(input);
196}
197
198fn handler_two_res(a: Res<u64>, b: Res<u32>, input: u64) {
199    black_box((*a).wrapping_add(input).wrapping_add(*b as u64));
200}
201
202/// Monomorphized Handler dispatch with Res<u64>.
203/// Full path: Handler::run → SystemParam::fetch → World::get_ptr + changed_at + current_sequence.
204#[inline(never)]
205pub fn probe_handler_res_read(
206    sys: &mut impl Handler<u64>,
207    world: &mut nexus_rt::World,
208    input: u64,
209) {
210    sys.run(world, input);
211}
212
213/// Monomorphized Handler dispatch with ResMut<u64>.
214/// Full path: fetch + DerefMut stamps changed_at on write.
215#[inline(never)]
216pub fn probe_handler_res_mut(sys: &mut impl Handler<u64>, world: &mut nexus_rt::World, input: u64) {
217    sys.run(world, input);
218}
219
220/// Monomorphized Handler dispatch with two Res params (tuple fetch).
221#[inline(never)]
222pub fn probe_handler_two_res(sys: &mut impl Handler<u64>, world: &mut nexus_rt::World, input: u64) {
223    sys.run(world, input);
224}
225
226/// Dyn-dispatched Handler — vtable call + SystemParam fetch.
227#[inline(never)]
228pub fn probe_dyn_handler(sys: &mut dyn Handler<u64>, world: &mut nexus_rt::World, input: u64) {
229    sys.run(world, input);
230}
231
232// =============================================================================
233// Main — benchmark
234// =============================================================================
235
236fn main() {
237    let mut wb = WorldBuilder::new();
238    wb.register::<u64>(42);
239    wb.register::<u32>(7);
240    let mut world = wb.build();
241    let r = world.registry_mut();
242
243    // --- Bare 3-stage pipeline (no Option, no World access) ---
244
245    let mut bare = PipelineStart::<u64>::new()
246        .stage(|x: u64| x.wrapping_mul(3), r)
247        .stage(|x: u64| x.wrapping_add(7), r)
248        .stage(|x: u64| x >> 1, r);
249
250    // --- Option 3-stage pipeline ---
251
252    let mut option = PipelineStart::<u64>::new()
253        .stage(
254            |x: u64| -> Option<u64> { if x > 0 { Some(x) } else { None } },
255            r,
256        )
257        .map(|x: u64| x.wrapping_mul(3), r)
258        .filter(|_w, x| *x < 1_000_000);
259
260    // --- World-accessing pipeline (pre-resolved via Res<T>) ---
261
262    let mut world_resolved = PipelineStart::<u64>::new()
263        .stage(add_resource, r)
264        .stage(mul_resource, r);
265
266    // --- World-accessing 3-stage pipeline ---
267
268    let mut stage_3 = PipelineStart::<u64>::new()
269        .stage(add_resource, r)
270        .stage(mul_resource, r)
271        .stage(sub_resource, r);
272
273    // --- Built (boxed) pipeline ---
274
275    let mut boxed = PipelineStart::<u64>::new()
276        .stage(|x: u64| x.wrapping_mul(3), r)
277        .stage(|x: u64| x.wrapping_add(7), r)
278        .stage(|_x: u64| {}, r)
279        .build();
280
281    // --- Batch pipelines (same chains as their linear counterparts) ---
282
283    fn sink(mut acc: ResMut<u64>, x: u64) {
284        *acc = acc.wrapping_add(x);
285    }
286
287    // Bare: 3 compute stages + sink (same chain for both batch and linear)
288    let mut batch_bare = PipelineStart::<u64>::new()
289        .stage(|x: u64| x.wrapping_mul(3), r)
290        .stage(|x: u64| x.wrapping_add(7), r)
291        .stage(sink, r)
292        .build_batch(1024);
293
294    let mut linear_bare = PipelineStart::<u64>::new()
295        .stage(|x: u64| x.wrapping_mul(3), r)
296        .stage(|x: u64| x.wrapping_add(7), r)
297        .stage(sink, r);
298
299    // Res<T>: 3 world-access stages + sink (same chain for both)
300    let mut batch_res = PipelineStart::<u64>::new()
301        .stage(add_resource, r)
302        .stage(mul_resource, r)
303        .stage(sub_resource, r)
304        .stage(sink, r)
305        .build_batch(1024);
306
307    let mut linear_res = PipelineStart::<u64>::new()
308        .stage(add_resource, r)
309        .stage(mul_resource, r)
310        .stage(sub_resource, r)
311        .stage(sink, r);
312
313    // --- Result→catch→map→unwrap_or ---
314
315    let mut catch_pipeline = PipelineStart::<u64>::new()
316        .stage(
317            |x: u64| -> Result<u64, &'static str> { if x > 0 { Ok(x) } else { Err("zero") } },
318            r,
319        )
320        .catch(|_err: &'static str| {}, r)
321        .map(|x: u64| x.wrapping_mul(2), r)
322        .unwrap_or(0);
323
324    // --- Handler dispatch setup ---
325
326    let mut sys_res = handler_res_read.into_handler(world.registry_mut());
327    let mut sys_res_mut = handler_res_mut_write.into_handler(world.registry_mut());
328    let mut sys_two = handler_two_res.into_handler(world.registry_mut());
329    let mut sys_dyn: Box<dyn Handler<u64>> =
330        Box::new(handler_res_read.into_handler(world.registry_mut()));
331
332    // --- Pipeline benchmarks ---
333
334    print_header("Pipeline Dispatch Latency (cycles)");
335
336    let mut input = 1u64;
337
338    bench_batched("baseline (hand-written fn)", || {
339        input = input.wrapping_add(1);
340        baseline_handwritten(&mut world, black_box(input))
341    });
342
343    bench_batched("bare 3-stage pipe", || {
344        input = input.wrapping_add(1);
345        bare_3stage_run(&mut bare, &mut world, black_box(input))
346    });
347
348    bench_batched("option 3-stage (Some path)", || {
349        input = input.wrapping_add(1);
350        option_3stage_run(&mut option, &mut world, black_box(input + 1)).unwrap_or(0)
351    });
352
353    bench_batched("option 3-stage (None path)", || {
354        option_3stage_run(&mut option, &mut world, black_box(0)).unwrap_or(0)
355    });
356
357    bench_batched("world-access 2-stage (Res<T>)", || {
358        input = input.wrapping_add(1);
359        world_access_run(&mut world_resolved, &mut world, black_box(input))
360    });
361
362    bench_batched("boxed Pipeline (dyn dispatch)", || {
363        input = input.wrapping_add(1);
364        boxed_pipeline_run(&mut boxed, &mut world, black_box(input));
365        0
366    });
367
368    bench_batched("result→catch→map→unwrap_or", || {
369        input = input.wrapping_add(1);
370        catch_pipeline.run(&mut world, black_box(input))
371    });
372
373    // --- Handler dispatch benchmarks ---
374
375    println!();
376    print_header("Handler Dispatch Latency (cycles)");
377
378    bench_batched("Handler + Res<u64> (read)", || {
379        input = input.wrapping_add(1);
380        probe_handler_res_read(&mut sys_res, &mut world, black_box(input));
381        0
382    });
383
384    bench_batched("Handler + ResMut<u64> (write+stamp)", || {
385        input = input.wrapping_add(1);
386        probe_handler_res_mut(&mut sys_res_mut, &mut world, black_box(input));
387        0
388    });
389
390    bench_batched("Handler + 2x Res (tuple fetch)", || {
391        input = input.wrapping_add(1);
392        probe_handler_two_res(&mut sys_two, &mut world, black_box(input));
393        0
394    });
395
396    bench_batched("Box<dyn Handler> + Res<u64>", || {
397        input = input.wrapping_add(1);
398        probe_dyn_handler(&mut *sys_dyn, &mut world, black_box(input));
399        0
400    });
401
402    // --- Stage pipeline with Res<T> (3-stage) ---
403
404    println!();
405    print_header("Stage Pipeline with Res<T> (cycles)");
406
407    bench_batched("3-stage pipeline (Res<T>)", || {
408        input = input.wrapping_add(1);
409        stage_3.run(&mut world, black_box(input))
410    });
411
412    // --- Batch vs Linear throughput (total cycles for 100 items) ---
413
414    println!();
415    print_header("Batch vs Linear Throughput (total cycles, 100 items)");
416
417    let items_100: Vec<u64> = (0..100).collect();
418
419    // Batch bare: fill + run
420    {
421        for _ in 0..WARMUP {
422            batch_bare.input_mut().extend_from_slice(&items_100);
423            batch_bare.run(&mut world);
424        }
425        let mut samples = Vec::with_capacity(ITERATIONS);
426        for _ in 0..ITERATIONS {
427            batch_bare.input_mut().extend_from_slice(&items_100);
428            let start = rdtsc_start();
429            batch_bare.run(&mut world);
430            let end = rdtsc_end();
431            samples.push(end.wrapping_sub(start));
432        }
433        samples.sort_unstable();
434        println!(
435            "{:<44} {:>8} {:>8} {:>8}",
436            "batch bare (100 items)",
437            percentile(&samples, 50.0),
438            percentile(&samples, 99.0),
439            percentile(&samples, 99.9),
440        );
441    }
442
443    // Linear bare: 100 individual calls (same chain)
444    {
445        for _ in 0..WARMUP {
446            for i in 0..100u64 {
447                linear_bare.run(&mut world, black_box(i));
448            }
449        }
450        let mut samples = Vec::with_capacity(ITERATIONS);
451        for _ in 0..ITERATIONS {
452            let start = rdtsc_start();
453            for i in 0..100u64 {
454                linear_bare.run(&mut world, black_box(i));
455            }
456            let end = rdtsc_end();
457            samples.push(end.wrapping_sub(start));
458        }
459        samples.sort_unstable();
460        println!(
461            "{:<44} {:>8} {:>8} {:>8}",
462            "linear bare (100 calls)",
463            percentile(&samples, 50.0),
464            percentile(&samples, 99.0),
465            percentile(&samples, 99.9),
466        );
467    }
468
469    // Batch Res<T>: fill + run
470    {
471        for _ in 0..WARMUP {
472            batch_res.input_mut().extend_from_slice(&items_100);
473            batch_res.run(&mut world);
474        }
475        let mut samples = Vec::with_capacity(ITERATIONS);
476        for _ in 0..ITERATIONS {
477            batch_res.input_mut().extend_from_slice(&items_100);
478            let start = rdtsc_start();
479            batch_res.run(&mut world);
480            let end = rdtsc_end();
481            samples.push(end.wrapping_sub(start));
482        }
483        samples.sort_unstable();
484        println!(
485            "{:<44} {:>8} {:>8} {:>8}",
486            "batch Res<T> (100 items)",
487            percentile(&samples, 50.0),
488            percentile(&samples, 99.0),
489            percentile(&samples, 99.9),
490        );
491    }
492
493    // Linear Res<T>: 100 individual calls (same chain)
494    {
495        for _ in 0..WARMUP {
496            for i in 0..100u64 {
497                linear_res.run(&mut world, black_box(i));
498            }
499        }
500        let mut samples = Vec::with_capacity(ITERATIONS);
501        for _ in 0..ITERATIONS {
502            let start = rdtsc_start();
503            for i in 0..100u64 {
504                linear_res.run(&mut world, black_box(i));
505            }
506            let end = rdtsc_end();
507            samples.push(end.wrapping_sub(start));
508        }
509        samples.sort_unstable();
510        println!(
511            "{:<44} {:>8} {:>8} {:>8}",
512            "linear Res<T> (100 calls)",
513            percentile(&samples, 50.0),
514            percentile(&samples, 99.0),
515            percentile(&samples, 99.9),
516        );
517    }
518
519    // --- inputs_changed cost ---
520
521    println!();
522    print_header("inputs_changed Latency (cycles)");
523
524    // Build a world with enough resources for 8-param handlers.
525    let mut ic_wb = WorldBuilder::new();
526    ic_wb.register::<u64>(0);
527    ic_wb.register::<u32>(0);
528    ic_wb.register::<bool>(false);
529    ic_wb.register::<f64>(0.0);
530    ic_wb.register::<i64>(0);
531    ic_wb.register::<i32>(0);
532    ic_wb.register::<u8>(0);
533    ic_wb.register::<u16>(0);
534    let mut ic_world = ic_wb.build();
535    let ic_r = ic_world.registry_mut();
536
537    let ic1 = ic_1p.into_handler(ic_r);
538    let ic2 = ic_2p.into_handler(ic_r);
539    let ic4 = ic_4p.into_handler(ic_r);
540    let ic8 = ic_8p.into_handler(ic_r);
541
542    // Tick 0: all changed (changed_at == current_sequence).
543    bench_batched("inputs_changed 1-param (changed)", || {
544        if ic1.inputs_changed(&ic_world) { 1 } else { 0 }
545    });
546
547    bench_batched("inputs_changed 2-param (changed)", || {
548        if ic2.inputs_changed(&ic_world) { 1 } else { 0 }
549    });
550
551    bench_batched("inputs_changed 4-param (changed)", || {
552        if ic4.inputs_changed(&ic_world) { 1 } else { 0 }
553    });
554
555    bench_batched("inputs_changed 8-param (changed)", || {
556        if ic8.inputs_changed(&ic_world) { 1 } else { 0 }
557    });
558
559    // Advance tick so inputs are stale.
560    ic_world.next_sequence();
561
562    bench_batched("inputs_changed 1-param (stale)", || {
563        if ic1.inputs_changed(&ic_world) { 1 } else { 0 }
564    });
565
566    bench_batched("inputs_changed 2-param (stale)", || {
567        if ic2.inputs_changed(&ic_world) { 1 } else { 0 }
568    });
569
570    bench_batched("inputs_changed 4-param (stale)", || {
571        if ic4.inputs_changed(&ic_world) { 1 } else { 0 }
572    });
573
574    bench_batched("inputs_changed 8-param (stale)", || {
575        if ic8.inputs_changed(&ic_world) { 1 } else { 0 }
576    });
577
578    println!();
579}