Skip to main content

Handler

Trait Handler 

Source
pub trait Handler<E> {
    // Required method
    fn run(&mut self, world: &mut World, event: E);

    // Provided methods
    fn inputs_changed(&self, world: &World) -> bool { ... }
    fn name(&self) -> &'static str { ... }
}
Expand description

Object-safe dispatch trait for event handlers.

Enables Box<dyn Handler<E>> for type-erased heterogeneous dispatch. Storage and scheduling are the driver’s responsibility — this trait only defines the dispatch interface.

Takes &mut World — drivers call this directly in their poll loop.

Required Methods§

Source

fn run(&mut self, world: &mut World, event: E)

Run this handler with the given event.

Provided Methods§

Source

fn inputs_changed(&self, world: &World) -> bool

Returns true if any input resource was modified this sequence.

Default returns true (always run). Callback overrides by checking its state via SystemParam::any_changed.

Examples found in repository?
examples/mock_runtime.rs (line 243)
203fn main() {
204    // -- Build ----------------------------------------------------------------
205
206    let mut wb = WorldBuilder::new();
207    wb.install_plugin(TradingPlugin {
208        initial_prices: vec![("BTC", 50_000.0), ("ETH", 3_000.0)],
209        risk_cap: 100,
210    });
211    let mut md = wb.install_driver(MarketDataInstaller);
212    let mut world = wb.build();
213
214    // Standalone handler — demonstrates change detection.
215    fn on_signal(signals: Res<SignalBuffer>, _event: ()) {
216        black_box(signals.signals.len());
217    }
218    let mut signal_handler = on_signal.into_handler(world.registry_mut());
219
220    // -- Correctness check ----------------------------------------------------
221
222    let ticks = [
223        MarketTick {
224            symbol: "BTC",
225            price: 50_100.0,
226        }, // delta=100 > 50 → signal
227        MarketTick {
228            symbol: "ETH",
229            price: 3_010.0,
230        }, // delta=10 < 50 → no signal
231        MarketTick {
232            symbol: "BTC",
233            price: 49_900.0,
234        }, // delta=200 > 50 → signal
235    ];
236
237    md.poll(&mut world, &ticks);
238
239    // 2 signals accepted, risk cap=100 so both go through.
240    assert_eq!(world.resource::<OrderCount>().0, 2);
241
242    // Change detection: SignalBuffer was modified → handler should run.
243    assert!(signal_handler.inputs_changed(&world));
244    signal_handler.run(&mut world, ());
245
246    // Advance sequence, no new writes → handler should skip.
247    world.next_sequence();
248    assert!(!signal_handler.inputs_changed(&world));
249
250    println!("Correctness checks passed.\n");
251
252    // -- Latency measurement --------------------------------------------------
253
254    const WARMUP: usize = 1_000;
255    const ITERATIONS: usize = 1_000;
256
257    // Ticks that exercise the full pipeline path (signal detection + cache write).
258    let bench_ticks = [
259        MarketTick {
260            symbol: "BTC",
261            price: 50_100.0,
262        },
263        MarketTick {
264            symbol: "ETH",
265            price: 3_100.0,
266        },
267        MarketTick {
268            symbol: "BTC",
269            price: 50_200.0,
270        },
271        MarketTick {
272            symbol: "ETH",
273            price: 3_200.0,
274        },
275    ];
276
277    println!(
278        "=== nexus-rt Dispatch Latency (cycles, {} iterations) ===\n",
279        ITERATIONS
280    );
281    println!(
282        "{:<44} {:>8} {:>8} {:>8}",
283        "Operation", "p50", "p99", "p999"
284    );
285    println!("{}", "-".repeat(72));
286
287    // Single tick through dyn pipeline
288    {
289        let tick = bench_ticks[0];
290        for _ in 0..WARMUP {
291            world.next_sequence();
292            md.pipeline.run(&mut world, black_box(tick));
293        }
294        let mut samples = Vec::with_capacity(ITERATIONS);
295        for _ in 0..ITERATIONS {
296            world.next_sequence();
297            let start = rdtsc_start();
298            md.pipeline.run(&mut world, black_box(tick));
299            let end = rdtsc_end();
300            samples.push(end.wrapping_sub(start));
301        }
302        report("single tick (dyn pipeline, 3 stages)", &mut samples);
303    }
304
305    // Standalone handler (1 param, Res<T>)
306    {
307        for _ in 0..WARMUP {
308            black_box(());
309            signal_handler.run(&mut world, ());
310        }
311        let mut samples = Vec::with_capacity(ITERATIONS);
312        for _ in 0..ITERATIONS {
313            let start = rdtsc_start();
314            black_box(());
315            signal_handler.run(&mut world, ());
316            let end = rdtsc_end();
317            samples.push(end.wrapping_sub(start));
318        }
319        report("handler dispatch (1 param, Res<T>)", &mut samples);
320    }
321
322    // 4-tick batch through driver poll
323    {
324        for _ in 0..WARMUP {
325            md.poll(&mut world, &bench_ticks);
326        }
327        let mut samples = Vec::with_capacity(ITERATIONS);
328        for _ in 0..ITERATIONS {
329            let start = rdtsc_start();
330            md.poll(&mut world, black_box(&bench_ticks));
331            let end = rdtsc_end();
332            samples.push(end.wrapping_sub(start));
333        }
334        let mut per_tick: Vec<u64> = samples
335            .iter()
336            .map(|&s| s / bench_ticks.len() as u64)
337            .collect();
338        report("4-tick poll (total)", &mut samples);
339        report("4-tick poll (per tick)", &mut per_tick);
340    }
341
342    // Change detection
343    {
344        world.next_sequence(); // ensure stale
345        for _ in 0..WARMUP {
346            black_box(signal_handler.inputs_changed(&world));
347        }
348        let mut samples = Vec::with_capacity(ITERATIONS);
349        for _ in 0..ITERATIONS {
350            let start = rdtsc_start();
351            black_box(signal_handler.inputs_changed(&world));
352            let end = rdtsc_end();
353            samples.push(end.wrapping_sub(start));
354        }
355        report("inputs_changed (1 param, stale)", &mut samples);
356    }
357
358    println!();
359}
More examples
Hide additional examples
examples/perf_pipeline.rs (line 544)
236fn main() {
237    let mut wb = WorldBuilder::new();
238    wb.register::<u64>(42);
239    wb.register::<u32>(7);
240    let mut world = wb.build();
241    let r = world.registry_mut();
242
243    // --- Bare 3-stage pipeline (no Option, no World access) ---
244
245    let mut bare = PipelineStart::<u64>::new()
246        .stage(|x: u64| x.wrapping_mul(3), r)
247        .stage(|x: u64| x.wrapping_add(7), r)
248        .stage(|x: u64| x >> 1, r);
249
250    // --- Option 3-stage pipeline ---
251
252    let mut option = PipelineStart::<u64>::new()
253        .stage(
254            |x: u64| -> Option<u64> { if x > 0 { Some(x) } else { None } },
255            r,
256        )
257        .map(|x: u64| x.wrapping_mul(3), r)
258        .filter(|_w, x| *x < 1_000_000);
259
260    // --- World-accessing pipeline (pre-resolved via Res<T>) ---
261
262    let mut world_resolved = PipelineStart::<u64>::new()
263        .stage(add_resource, r)
264        .stage(mul_resource, r);
265
266    // --- World-accessing 3-stage pipeline ---
267
268    let mut stage_3 = PipelineStart::<u64>::new()
269        .stage(add_resource, r)
270        .stage(mul_resource, r)
271        .stage(sub_resource, r);
272
273    // --- Built (boxed) pipeline ---
274
275    let mut boxed = PipelineStart::<u64>::new()
276        .stage(|x: u64| x.wrapping_mul(3), r)
277        .stage(|x: u64| x.wrapping_add(7), r)
278        .stage(|_x: u64| {}, r)
279        .build();
280
281    // --- Batch pipelines (same chains as their linear counterparts) ---
282
283    fn sink(mut acc: ResMut<u64>, x: u64) {
284        *acc = acc.wrapping_add(x);
285    }
286
287    // Bare: 3 compute stages + sink (same chain for both batch and linear)
288    let mut batch_bare = PipelineStart::<u64>::new()
289        .stage(|x: u64| x.wrapping_mul(3), r)
290        .stage(|x: u64| x.wrapping_add(7), r)
291        .stage(sink, r)
292        .build_batch(1024);
293
294    let mut linear_bare = PipelineStart::<u64>::new()
295        .stage(|x: u64| x.wrapping_mul(3), r)
296        .stage(|x: u64| x.wrapping_add(7), r)
297        .stage(sink, r);
298
299    // Res<T>: 3 world-access stages + sink (same chain for both)
300    let mut batch_res = PipelineStart::<u64>::new()
301        .stage(add_resource, r)
302        .stage(mul_resource, r)
303        .stage(sub_resource, r)
304        .stage(sink, r)
305        .build_batch(1024);
306
307    let mut linear_res = PipelineStart::<u64>::new()
308        .stage(add_resource, r)
309        .stage(mul_resource, r)
310        .stage(sub_resource, r)
311        .stage(sink, r);
312
313    // --- Result→catch→map→unwrap_or ---
314
315    let mut catch_pipeline = PipelineStart::<u64>::new()
316        .stage(
317            |x: u64| -> Result<u64, &'static str> { if x > 0 { Ok(x) } else { Err("zero") } },
318            r,
319        )
320        .catch(|_err: &'static str| {}, r)
321        .map(|x: u64| x.wrapping_mul(2), r)
322        .unwrap_or(0);
323
324    // --- Handler dispatch setup ---
325
326    let mut sys_res = handler_res_read.into_handler(world.registry_mut());
327    let mut sys_res_mut = handler_res_mut_write.into_handler(world.registry_mut());
328    let mut sys_two = handler_two_res.into_handler(world.registry_mut());
329    let mut sys_dyn: Box<dyn Handler<u64>> =
330        Box::new(handler_res_read.into_handler(world.registry_mut()));
331
332    // --- Pipeline benchmarks ---
333
334    print_header("Pipeline Dispatch Latency (cycles)");
335
336    let mut input = 1u64;
337
338    bench_batched("baseline (hand-written fn)", || {
339        input = input.wrapping_add(1);
340        baseline_handwritten(&mut world, black_box(input))
341    });
342
343    bench_batched("bare 3-stage pipe", || {
344        input = input.wrapping_add(1);
345        bare_3stage_run(&mut bare, &mut world, black_box(input))
346    });
347
348    bench_batched("option 3-stage (Some path)", || {
349        input = input.wrapping_add(1);
350        option_3stage_run(&mut option, &mut world, black_box(input + 1)).unwrap_or(0)
351    });
352
353    bench_batched("option 3-stage (None path)", || {
354        option_3stage_run(&mut option, &mut world, black_box(0)).unwrap_or(0)
355    });
356
357    bench_batched("world-access 2-stage (Res<T>)", || {
358        input = input.wrapping_add(1);
359        world_access_run(&mut world_resolved, &mut world, black_box(input))
360    });
361
362    bench_batched("boxed Pipeline (dyn dispatch)", || {
363        input = input.wrapping_add(1);
364        boxed_pipeline_run(&mut boxed, &mut world, black_box(input));
365        0
366    });
367
368    bench_batched("result→catch→map→unwrap_or", || {
369        input = input.wrapping_add(1);
370        catch_pipeline.run(&mut world, black_box(input))
371    });
372
373    // --- Handler dispatch benchmarks ---
374
375    println!();
376    print_header("Handler Dispatch Latency (cycles)");
377
378    bench_batched("Handler + Res<u64> (read)", || {
379        input = input.wrapping_add(1);
380        probe_handler_res_read(&mut sys_res, &mut world, black_box(input));
381        0
382    });
383
384    bench_batched("Handler + ResMut<u64> (write+stamp)", || {
385        input = input.wrapping_add(1);
386        probe_handler_res_mut(&mut sys_res_mut, &mut world, black_box(input));
387        0
388    });
389
390    bench_batched("Handler + 2x Res (tuple fetch)", || {
391        input = input.wrapping_add(1);
392        probe_handler_two_res(&mut sys_two, &mut world, black_box(input));
393        0
394    });
395
396    bench_batched("Box<dyn Handler> + Res<u64>", || {
397        input = input.wrapping_add(1);
398        probe_dyn_handler(&mut *sys_dyn, &mut world, black_box(input));
399        0
400    });
401
402    // --- Stage pipeline with Res<T> (3-stage) ---
403
404    println!();
405    print_header("Stage Pipeline with Res<T> (cycles)");
406
407    bench_batched("3-stage pipeline (Res<T>)", || {
408        input = input.wrapping_add(1);
409        stage_3.run(&mut world, black_box(input))
410    });
411
412    // --- Batch vs Linear throughput (total cycles for 100 items) ---
413
414    println!();
415    print_header("Batch vs Linear Throughput (total cycles, 100 items)");
416
417    let items_100: Vec<u64> = (0..100).collect();
418
419    // Batch bare: fill + run
420    {
421        for _ in 0..WARMUP {
422            batch_bare.input_mut().extend_from_slice(&items_100);
423            batch_bare.run(&mut world);
424        }
425        let mut samples = Vec::with_capacity(ITERATIONS);
426        for _ in 0..ITERATIONS {
427            batch_bare.input_mut().extend_from_slice(&items_100);
428            let start = rdtsc_start();
429            batch_bare.run(&mut world);
430            let end = rdtsc_end();
431            samples.push(end.wrapping_sub(start));
432        }
433        samples.sort_unstable();
434        println!(
435            "{:<44} {:>8} {:>8} {:>8}",
436            "batch bare (100 items)",
437            percentile(&samples, 50.0),
438            percentile(&samples, 99.0),
439            percentile(&samples, 99.9),
440        );
441    }
442
443    // Linear bare: 100 individual calls (same chain)
444    {
445        for _ in 0..WARMUP {
446            for i in 0..100u64 {
447                linear_bare.run(&mut world, black_box(i));
448            }
449        }
450        let mut samples = Vec::with_capacity(ITERATIONS);
451        for _ in 0..ITERATIONS {
452            let start = rdtsc_start();
453            for i in 0..100u64 {
454                linear_bare.run(&mut world, black_box(i));
455            }
456            let end = rdtsc_end();
457            samples.push(end.wrapping_sub(start));
458        }
459        samples.sort_unstable();
460        println!(
461            "{:<44} {:>8} {:>8} {:>8}",
462            "linear bare (100 calls)",
463            percentile(&samples, 50.0),
464            percentile(&samples, 99.0),
465            percentile(&samples, 99.9),
466        );
467    }
468
469    // Batch Res<T>: fill + run
470    {
471        for _ in 0..WARMUP {
472            batch_res.input_mut().extend_from_slice(&items_100);
473            batch_res.run(&mut world);
474        }
475        let mut samples = Vec::with_capacity(ITERATIONS);
476        for _ in 0..ITERATIONS {
477            batch_res.input_mut().extend_from_slice(&items_100);
478            let start = rdtsc_start();
479            batch_res.run(&mut world);
480            let end = rdtsc_end();
481            samples.push(end.wrapping_sub(start));
482        }
483        samples.sort_unstable();
484        println!(
485            "{:<44} {:>8} {:>8} {:>8}",
486            "batch Res<T> (100 items)",
487            percentile(&samples, 50.0),
488            percentile(&samples, 99.0),
489            percentile(&samples, 99.9),
490        );
491    }
492
493    // Linear Res<T>: 100 individual calls (same chain)
494    {
495        for _ in 0..WARMUP {
496            for i in 0..100u64 {
497                linear_res.run(&mut world, black_box(i));
498            }
499        }
500        let mut samples = Vec::with_capacity(ITERATIONS);
501        for _ in 0..ITERATIONS {
502            let start = rdtsc_start();
503            for i in 0..100u64 {
504                linear_res.run(&mut world, black_box(i));
505            }
506            let end = rdtsc_end();
507            samples.push(end.wrapping_sub(start));
508        }
509        samples.sort_unstable();
510        println!(
511            "{:<44} {:>8} {:>8} {:>8}",
512            "linear Res<T> (100 calls)",
513            percentile(&samples, 50.0),
514            percentile(&samples, 99.0),
515            percentile(&samples, 99.9),
516        );
517    }
518
519    // --- inputs_changed cost ---
520
521    println!();
522    print_header("inputs_changed Latency (cycles)");
523
524    // Build a world with enough resources for 8-param handlers.
525    let mut ic_wb = WorldBuilder::new();
526    ic_wb.register::<u64>(0);
527    ic_wb.register::<u32>(0);
528    ic_wb.register::<bool>(false);
529    ic_wb.register::<f64>(0.0);
530    ic_wb.register::<i64>(0);
531    ic_wb.register::<i32>(0);
532    ic_wb.register::<u8>(0);
533    ic_wb.register::<u16>(0);
534    let mut ic_world = ic_wb.build();
535    let ic_r = ic_world.registry_mut();
536
537    let ic1 = ic_1p.into_handler(ic_r);
538    let ic2 = ic_2p.into_handler(ic_r);
539    let ic4 = ic_4p.into_handler(ic_r);
540    let ic8 = ic_8p.into_handler(ic_r);
541
542    // Tick 0: all changed (changed_at == current_sequence).
543    bench_batched("inputs_changed 1-param (changed)", || {
544        if ic1.inputs_changed(&ic_world) { 1 } else { 0 }
545    });
546
547    bench_batched("inputs_changed 2-param (changed)", || {
548        if ic2.inputs_changed(&ic_world) { 1 } else { 0 }
549    });
550
551    bench_batched("inputs_changed 4-param (changed)", || {
552        if ic4.inputs_changed(&ic_world) { 1 } else { 0 }
553    });
554
555    bench_batched("inputs_changed 8-param (changed)", || {
556        if ic8.inputs_changed(&ic_world) { 1 } else { 0 }
557    });
558
559    // Advance tick so inputs are stale.
560    ic_world.next_sequence();
561
562    bench_batched("inputs_changed 1-param (stale)", || {
563        if ic1.inputs_changed(&ic_world) { 1 } else { 0 }
564    });
565
566    bench_batched("inputs_changed 2-param (stale)", || {
567        if ic2.inputs_changed(&ic_world) { 1 } else { 0 }
568    });
569
570    bench_batched("inputs_changed 4-param (stale)", || {
571        if ic4.inputs_changed(&ic_world) { 1 } else { 0 }
572    });
573
574    bench_batched("inputs_changed 8-param (stale)", || {
575        if ic8.inputs_changed(&ic_world) { 1 } else { 0 }
576    });
577
578    println!();
579}
Source

fn name(&self) -> &'static str

Returns the handler’s name.

Default returns "<unnamed>". Callback captures the function’s type_name at construction time.

Implementors§

Source§

impl<C: 'static, E, F: 'static, P0: SystemParam + 'static> Handler<E> for Callback<C, F, (P0,)>
where for<'a> &'a mut F: FnMut(&mut C, P0, E) + FnMut(&mut C, P0::Item<'a>, E),

Source§

impl<C: 'static, E, F: 'static, P0: SystemParam + 'static, P1: SystemParam + 'static> Handler<E> for Callback<C, F, (P0, P1)>
where for<'a> &'a mut F: FnMut(&mut C, P0, P1, E) + FnMut(&mut C, P0::Item<'a>, P1::Item<'a>, E),

Source§

impl<C: 'static, E, F: 'static, P0: SystemParam + 'static, P1: SystemParam + 'static, P2: SystemParam + 'static> Handler<E> for Callback<C, F, (P0, P1, P2)>
where for<'a> &'a mut F: FnMut(&mut C, P0, P1, P2, E) + FnMut(&mut C, P0::Item<'a>, P1::Item<'a>, P2::Item<'a>, E),

Source§

impl<C: 'static, E, F: 'static, P0: SystemParam + 'static, P1: SystemParam + 'static, P2: SystemParam + 'static, P3: SystemParam + 'static> Handler<E> for Callback<C, F, (P0, P1, P2, P3)>
where for<'a> &'a mut F: FnMut(&mut C, P0, P1, P2, P3, E) + FnMut(&mut C, P0::Item<'a>, P1::Item<'a>, P2::Item<'a>, P3::Item<'a>, E),

Source§

impl<C: 'static, E, F: 'static, P0: SystemParam + 'static, P1: SystemParam + 'static, P2: SystemParam + 'static, P3: SystemParam + 'static, P4: SystemParam + 'static> Handler<E> for Callback<C, F, (P0, P1, P2, P3, P4)>
where for<'a> &'a mut F: FnMut(&mut C, P0, P1, P2, P3, P4, E) + FnMut(&mut C, P0::Item<'a>, P1::Item<'a>, P2::Item<'a>, P3::Item<'a>, P4::Item<'a>, E),

Source§

impl<C: 'static, E, F: 'static, P0: SystemParam + 'static, P1: SystemParam + 'static, P2: SystemParam + 'static, P3: SystemParam + 'static, P4: SystemParam + 'static, P5: SystemParam + 'static> Handler<E> for Callback<C, F, (P0, P1, P2, P3, P4, P5)>
where for<'a> &'a mut F: FnMut(&mut C, P0, P1, P2, P3, P4, P5, E) + FnMut(&mut C, P0::Item<'a>, P1::Item<'a>, P2::Item<'a>, P3::Item<'a>, P4::Item<'a>, P5::Item<'a>, E),

Source§

impl<C: 'static, E, F: 'static, P0: SystemParam + 'static, P1: SystemParam + 'static, P2: SystemParam + 'static, P3: SystemParam + 'static, P4: SystemParam + 'static, P5: SystemParam + 'static, P6: SystemParam + 'static> Handler<E> for Callback<C, F, (P0, P1, P2, P3, P4, P5, P6)>
where for<'a> &'a mut F: FnMut(&mut C, P0, P1, P2, P3, P4, P5, P6, E) + FnMut(&mut C, P0::Item<'a>, P1::Item<'a>, P2::Item<'a>, P3::Item<'a>, P4::Item<'a>, P5::Item<'a>, P6::Item<'a>, E),

Source§

impl<C: 'static, E, F: 'static, P0: SystemParam + 'static, P1: SystemParam + 'static, P2: SystemParam + 'static, P3: SystemParam + 'static, P4: SystemParam + 'static, P5: SystemParam + 'static, P6: SystemParam + 'static, P7: SystemParam + 'static> Handler<E> for Callback<C, F, (P0, P1, P2, P3, P4, P5, P6, P7)>
where for<'a> &'a mut F: FnMut(&mut C, P0, P1, P2, P3, P4, P5, P6, P7, E) + FnMut(&mut C, P0::Item<'a>, P1::Item<'a>, P2::Item<'a>, P3::Item<'a>, P4::Item<'a>, P5::Item<'a>, P6::Item<'a>, P7::Item<'a>, E),

Source§

impl<C: 'static, E, F: FnMut(&mut C, E) + 'static> Handler<E> for Callback<C, F, ()>

Source§

impl<In: 'static, F: FnMut(&mut World, In) + 'static> Handler<In> for Pipeline<In, F>