pub trait Handler<E> {
// Required method
fn run(&mut self, world: &mut World, event: E);
// Provided methods
fn inputs_changed(&self, world: &World) -> bool { ... }
fn name(&self) -> &'static str { ... }
}
Object-safe dispatch trait for event handlers.
Enables Box<dyn Handler<E>> for type-erased heterogeneous dispatch.
Storage and scheduling are the driver’s responsibility — this trait
only defines the dispatch interface.
`run` takes `&mut World` — drivers call it directly in their poll loop.
Required Methods§
fn run(&mut self, world: &mut World, event: E)
Provided Methods§
fn inputs_changed(&self, world: &World) -> bool
Returns true if any input resource was modified during the current sequence.
The default implementation returns true (always run). Callback overrides
this by checking its state via SystemParam::any_changed.
Examples found in repository?
examples/mock_runtime.rs (line 243)
203fn main() {
204 // -- Build ----------------------------------------------------------------
205
206 let mut wb = WorldBuilder::new();
207 wb.install_plugin(TradingPlugin {
208 initial_prices: vec![("BTC", 50_000.0), ("ETH", 3_000.0)],
209 risk_cap: 100,
210 });
211 let mut md = wb.install_driver(MarketDataInstaller);
212 let mut world = wb.build();
213
214 // Standalone handler — demonstrates change detection.
215 fn on_signal(signals: Res<SignalBuffer>, _event: ()) {
216 black_box(signals.signals.len());
217 }
218 let mut signal_handler = on_signal.into_handler(world.registry_mut());
219
220 // -- Correctness check ----------------------------------------------------
221
222 let ticks = [
223 MarketTick {
224 symbol: "BTC",
225 price: 50_100.0,
226 }, // delta=100 > 50 → signal
227 MarketTick {
228 symbol: "ETH",
229 price: 3_010.0,
230 }, // delta=10 < 50 → no signal
231 MarketTick {
232 symbol: "BTC",
233 price: 49_900.0,
234 }, // delta=200 > 50 → signal
235 ];
236
237 md.poll(&mut world, &ticks);
238
239 // 2 signals accepted, risk cap=100 so both go through.
240 assert_eq!(world.resource::<OrderCount>().0, 2);
241
242 // Change detection: SignalBuffer was modified → handler should run.
243 assert!(signal_handler.inputs_changed(&world));
244 signal_handler.run(&mut world, ());
245
246 // Advance sequence, no new writes → handler should skip.
247 world.next_sequence();
248 assert!(!signal_handler.inputs_changed(&world));
249
250 println!("Correctness checks passed.\n");
251
252 // -- Latency measurement --------------------------------------------------
253
254 const WARMUP: usize = 1_000;
255 const ITERATIONS: usize = 1_000;
256
257 // Ticks that exercise the full pipeline path (signal detection + cache write).
258 let bench_ticks = [
259 MarketTick {
260 symbol: "BTC",
261 price: 50_100.0,
262 },
263 MarketTick {
264 symbol: "ETH",
265 price: 3_100.0,
266 },
267 MarketTick {
268 symbol: "BTC",
269 price: 50_200.0,
270 },
271 MarketTick {
272 symbol: "ETH",
273 price: 3_200.0,
274 },
275 ];
276
277 println!(
278 "=== nexus-rt Dispatch Latency (cycles, {} iterations) ===\n",
279 ITERATIONS
280 );
281 println!(
282 "{:<44} {:>8} {:>8} {:>8}",
283 "Operation", "p50", "p99", "p999"
284 );
285 println!("{}", "-".repeat(72));
286
287 // Single tick through dyn pipeline
288 {
289 let tick = bench_ticks[0];
290 for _ in 0..WARMUP {
291 world.next_sequence();
292 md.pipeline.run(&mut world, black_box(tick));
293 }
294 let mut samples = Vec::with_capacity(ITERATIONS);
295 for _ in 0..ITERATIONS {
296 world.next_sequence();
297 let start = rdtsc_start();
298 md.pipeline.run(&mut world, black_box(tick));
299 let end = rdtsc_end();
300 samples.push(end.wrapping_sub(start));
301 }
302 report("single tick (dyn pipeline, 3 stages)", &mut samples);
303 }
304
305 // Standalone handler (1 param, Res<T>)
306 {
307 for _ in 0..WARMUP {
308 black_box(());
309 signal_handler.run(&mut world, ());
310 }
311 let mut samples = Vec::with_capacity(ITERATIONS);
312 for _ in 0..ITERATIONS {
313 let start = rdtsc_start();
314 black_box(());
315 signal_handler.run(&mut world, ());
316 let end = rdtsc_end();
317 samples.push(end.wrapping_sub(start));
318 }
319 report("handler dispatch (1 param, Res<T>)", &mut samples);
320 }
321
322 // 4-tick batch through driver poll
323 {
324 for _ in 0..WARMUP {
325 md.poll(&mut world, &bench_ticks);
326 }
327 let mut samples = Vec::with_capacity(ITERATIONS);
328 for _ in 0..ITERATIONS {
329 let start = rdtsc_start();
330 md.poll(&mut world, black_box(&bench_ticks));
331 let end = rdtsc_end();
332 samples.push(end.wrapping_sub(start));
333 }
334 let mut per_tick: Vec<u64> = samples
335 .iter()
336 .map(|&s| s / bench_ticks.len() as u64)
337 .collect();
338 report("4-tick poll (total)", &mut samples);
339 report("4-tick poll (per tick)", &mut per_tick);
340 }
341
342 // Change detection
343 {
344 world.next_sequence(); // ensure stale
345 for _ in 0..WARMUP {
346 black_box(signal_handler.inputs_changed(&world));
347 }
348 let mut samples = Vec::with_capacity(ITERATIONS);
349 for _ in 0..ITERATIONS {
350 let start = rdtsc_start();
351 black_box(signal_handler.inputs_changed(&world));
352 let end = rdtsc_end();
353 samples.push(end.wrapping_sub(start));
354 }
355 report("inputs_changed (1 param, stale)", &mut samples);
356 }
357
358 println!();
359}
More examples
examples/perf_pipeline.rs (line 544)
236fn main() {
237 let mut wb = WorldBuilder::new();
238 wb.register::<u64>(42);
239 wb.register::<u32>(7);
240 let mut world = wb.build();
241 let r = world.registry_mut();
242
243 // --- Bare 3-stage pipeline (no Option, no World access) ---
244
245 let mut bare = PipelineStart::<u64>::new()
246 .stage(|x: u64| x.wrapping_mul(3), r)
247 .stage(|x: u64| x.wrapping_add(7), r)
248 .stage(|x: u64| x >> 1, r);
249
250 // --- Option 3-stage pipeline ---
251
252 let mut option = PipelineStart::<u64>::new()
253 .stage(
254 |x: u64| -> Option<u64> { if x > 0 { Some(x) } else { None } },
255 r,
256 )
257 .map(|x: u64| x.wrapping_mul(3), r)
258 .filter(|_w, x| *x < 1_000_000);
259
260 // --- World-accessing pipeline (pre-resolved via Res<T>) ---
261
262 let mut world_resolved = PipelineStart::<u64>::new()
263 .stage(add_resource, r)
264 .stage(mul_resource, r);
265
266 // --- World-accessing 3-stage pipeline ---
267
268 let mut stage_3 = PipelineStart::<u64>::new()
269 .stage(add_resource, r)
270 .stage(mul_resource, r)
271 .stage(sub_resource, r);
272
273 // --- Built (boxed) pipeline ---
274
275 let mut boxed = PipelineStart::<u64>::new()
276 .stage(|x: u64| x.wrapping_mul(3), r)
277 .stage(|x: u64| x.wrapping_add(7), r)
278 .stage(|_x: u64| {}, r)
279 .build();
280
281 // --- Batch pipelines (same chains as their linear counterparts) ---
282
283 fn sink(mut acc: ResMut<u64>, x: u64) {
284 *acc = acc.wrapping_add(x);
285 }
286
287 // Bare: 2 compute stages + sink (same chain for both batch and linear)
288 let mut batch_bare = PipelineStart::<u64>::new()
289 .stage(|x: u64| x.wrapping_mul(3), r)
290 .stage(|x: u64| x.wrapping_add(7), r)
291 .stage(sink, r)
292 .build_batch(1024);
293
294 let mut linear_bare = PipelineStart::<u64>::new()
295 .stage(|x: u64| x.wrapping_mul(3), r)
296 .stage(|x: u64| x.wrapping_add(7), r)
297 .stage(sink, r);
298
299 // Res<T>: 3 world-access stages + sink (same chain for both)
300 let mut batch_res = PipelineStart::<u64>::new()
301 .stage(add_resource, r)
302 .stage(mul_resource, r)
303 .stage(sub_resource, r)
304 .stage(sink, r)
305 .build_batch(1024);
306
307 let mut linear_res = PipelineStart::<u64>::new()
308 .stage(add_resource, r)
309 .stage(mul_resource, r)
310 .stage(sub_resource, r)
311 .stage(sink, r);
312
313 // --- Result→catch→map→unwrap_or ---
314
315 let mut catch_pipeline = PipelineStart::<u64>::new()
316 .stage(
317 |x: u64| -> Result<u64, &'static str> { if x > 0 { Ok(x) } else { Err("zero") } },
318 r,
319 )
320 .catch(|_err: &'static str| {}, r)
321 .map(|x: u64| x.wrapping_mul(2), r)
322 .unwrap_or(0);
323
324 // --- Handler dispatch setup ---
325
326 let mut sys_res = handler_res_read.into_handler(world.registry_mut());
327 let mut sys_res_mut = handler_res_mut_write.into_handler(world.registry_mut());
328 let mut sys_two = handler_two_res.into_handler(world.registry_mut());
329 let mut sys_dyn: Box<dyn Handler<u64>> =
330 Box::new(handler_res_read.into_handler(world.registry_mut()));
331
332 // --- Pipeline benchmarks ---
333
334 print_header("Pipeline Dispatch Latency (cycles)");
335
336 let mut input = 1u64;
337
338 bench_batched("baseline (hand-written fn)", || {
339 input = input.wrapping_add(1);
340 baseline_handwritten(&mut world, black_box(input))
341 });
342
343 bench_batched("bare 3-stage pipe", || {
344 input = input.wrapping_add(1);
345 bare_3stage_run(&mut bare, &mut world, black_box(input))
346 });
347
348 bench_batched("option 3-stage (Some path)", || {
349 input = input.wrapping_add(1);
350 option_3stage_run(&mut option, &mut world, black_box(input + 1)).unwrap_or(0)
351 });
352
353 bench_batched("option 3-stage (None path)", || {
354 option_3stage_run(&mut option, &mut world, black_box(0)).unwrap_or(0)
355 });
356
357 bench_batched("world-access 2-stage (Res<T>)", || {
358 input = input.wrapping_add(1);
359 world_access_run(&mut world_resolved, &mut world, black_box(input))
360 });
361
362 bench_batched("boxed Pipeline (dyn dispatch)", || {
363 input = input.wrapping_add(1);
364 boxed_pipeline_run(&mut boxed, &mut world, black_box(input));
365 0
366 });
367
368 bench_batched("result→catch→map→unwrap_or", || {
369 input = input.wrapping_add(1);
370 catch_pipeline.run(&mut world, black_box(input))
371 });
372
373 // --- Handler dispatch benchmarks ---
374
375 println!();
376 print_header("Handler Dispatch Latency (cycles)");
377
378 bench_batched("Handler + Res<u64> (read)", || {
379 input = input.wrapping_add(1);
380 probe_handler_res_read(&mut sys_res, &mut world, black_box(input));
381 0
382 });
383
384 bench_batched("Handler + ResMut<u64> (write+stamp)", || {
385 input = input.wrapping_add(1);
386 probe_handler_res_mut(&mut sys_res_mut, &mut world, black_box(input));
387 0
388 });
389
390 bench_batched("Handler + 2x Res (tuple fetch)", || {
391 input = input.wrapping_add(1);
392 probe_handler_two_res(&mut sys_two, &mut world, black_box(input));
393 0
394 });
395
396 bench_batched("Box<dyn Handler> + Res<u64>", || {
397 input = input.wrapping_add(1);
398 probe_dyn_handler(&mut *sys_dyn, &mut world, black_box(input));
399 0
400 });
401
402 // --- Stage pipeline with Res<T> (3-stage) ---
403
404 println!();
405 print_header("Stage Pipeline with Res<T> (cycles)");
406
407 bench_batched("3-stage pipeline (Res<T>)", || {
408 input = input.wrapping_add(1);
409 stage_3.run(&mut world, black_box(input))
410 });
411
412 // --- Batch vs Linear throughput (total cycles for 100 items) ---
413
414 println!();
415 print_header("Batch vs Linear Throughput (total cycles, 100 items)");
416
417 let items_100: Vec<u64> = (0..100).collect();
418
419 // Batch bare: fill + run
420 {
421 for _ in 0..WARMUP {
422 batch_bare.input_mut().extend_from_slice(&items_100);
423 batch_bare.run(&mut world);
424 }
425 let mut samples = Vec::with_capacity(ITERATIONS);
426 for _ in 0..ITERATIONS {
427 batch_bare.input_mut().extend_from_slice(&items_100);
428 let start = rdtsc_start();
429 batch_bare.run(&mut world);
430 let end = rdtsc_end();
431 samples.push(end.wrapping_sub(start));
432 }
433 samples.sort_unstable();
434 println!(
435 "{:<44} {:>8} {:>8} {:>8}",
436 "batch bare (100 items)",
437 percentile(&samples, 50.0),
438 percentile(&samples, 99.0),
439 percentile(&samples, 99.9),
440 );
441 }
442
443 // Linear bare: 100 individual calls (same chain)
444 {
445 for _ in 0..WARMUP {
446 for i in 0..100u64 {
447 linear_bare.run(&mut world, black_box(i));
448 }
449 }
450 let mut samples = Vec::with_capacity(ITERATIONS);
451 for _ in 0..ITERATIONS {
452 let start = rdtsc_start();
453 for i in 0..100u64 {
454 linear_bare.run(&mut world, black_box(i));
455 }
456 let end = rdtsc_end();
457 samples.push(end.wrapping_sub(start));
458 }
459 samples.sort_unstable();
460 println!(
461 "{:<44} {:>8} {:>8} {:>8}",
462 "linear bare (100 calls)",
463 percentile(&samples, 50.0),
464 percentile(&samples, 99.0),
465 percentile(&samples, 99.9),
466 );
467 }
468
469 // Batch Res<T>: fill + run
470 {
471 for _ in 0..WARMUP {
472 batch_res.input_mut().extend_from_slice(&items_100);
473 batch_res.run(&mut world);
474 }
475 let mut samples = Vec::with_capacity(ITERATIONS);
476 for _ in 0..ITERATIONS {
477 batch_res.input_mut().extend_from_slice(&items_100);
478 let start = rdtsc_start();
479 batch_res.run(&mut world);
480 let end = rdtsc_end();
481 samples.push(end.wrapping_sub(start));
482 }
483 samples.sort_unstable();
484 println!(
485 "{:<44} {:>8} {:>8} {:>8}",
486 "batch Res<T> (100 items)",
487 percentile(&samples, 50.0),
488 percentile(&samples, 99.0),
489 percentile(&samples, 99.9),
490 );
491 }
492
493 // Linear Res<T>: 100 individual calls (same chain)
494 {
495 for _ in 0..WARMUP {
496 for i in 0..100u64 {
497 linear_res.run(&mut world, black_box(i));
498 }
499 }
500 let mut samples = Vec::with_capacity(ITERATIONS);
501 for _ in 0..ITERATIONS {
502 let start = rdtsc_start();
503 for i in 0..100u64 {
504 linear_res.run(&mut world, black_box(i));
505 }
506 let end = rdtsc_end();
507 samples.push(end.wrapping_sub(start));
508 }
509 samples.sort_unstable();
510 println!(
511 "{:<44} {:>8} {:>8} {:>8}",
512 "linear Res<T> (100 calls)",
513 percentile(&samples, 50.0),
514 percentile(&samples, 99.0),
515 percentile(&samples, 99.9),
516 );
517 }
518
519 // --- inputs_changed cost ---
520
521 println!();
522 print_header("inputs_changed Latency (cycles)");
523
524 // Build a world with enough resources for 8-param handlers.
525 let mut ic_wb = WorldBuilder::new();
526 ic_wb.register::<u64>(0);
527 ic_wb.register::<u32>(0);
528 ic_wb.register::<bool>(false);
529 ic_wb.register::<f64>(0.0);
530 ic_wb.register::<i64>(0);
531 ic_wb.register::<i32>(0);
532 ic_wb.register::<u8>(0);
533 ic_wb.register::<u16>(0);
534 let mut ic_world = ic_wb.build();
535 let ic_r = ic_world.registry_mut();
536
537 let ic1 = ic_1p.into_handler(ic_r);
538 let ic2 = ic_2p.into_handler(ic_r);
539 let ic4 = ic_4p.into_handler(ic_r);
540 let ic8 = ic_8p.into_handler(ic_r);
541
542 // Tick 0: all changed (changed_at == current_sequence).
543 bench_batched("inputs_changed 1-param (changed)", || {
544 if ic1.inputs_changed(&ic_world) { 1 } else { 0 }
545 });
546
547 bench_batched("inputs_changed 2-param (changed)", || {
548 if ic2.inputs_changed(&ic_world) { 1 } else { 0 }
549 });
550
551 bench_batched("inputs_changed 4-param (changed)", || {
552 if ic4.inputs_changed(&ic_world) { 1 } else { 0 }
553 });
554
555 bench_batched("inputs_changed 8-param (changed)", || {
556 if ic8.inputs_changed(&ic_world) { 1 } else { 0 }
557 });
558
559 // Advance tick so inputs are stale.
560 ic_world.next_sequence();
561
562 bench_batched("inputs_changed 1-param (stale)", || {
563 if ic1.inputs_changed(&ic_world) { 1 } else { 0 }
564 });
565
566 bench_batched("inputs_changed 2-param (stale)", || {
567 if ic2.inputs_changed(&ic_world) { 1 } else { 0 }
568 });
569
570 bench_batched("inputs_changed 4-param (stale)", || {
571 if ic4.inputs_changed(&ic_world) { 1 } else { 0 }
572 });
573
574 bench_batched("inputs_changed 8-param (stale)", || {
575 if ic8.inputs_changed(&ic_world) { 1 } else { 0 }
576 });
577
578 println!();
579}