nexus-rt 2.2.0

Single-threaded, event-driven runtime primitives with pre-resolved dispatch
Documentation
# DAGs — Data-Flow Graphs with Fan-Out and Merge

A `Dag` (directed acyclic graph) is a pipeline that can fan out into
multiple branches, process each branch independently, and merge the
results back. Use a DAG when a single input produces multiple outputs
that need to be computed and combined.

**Use Pipeline when:** the flow is linear — stage 1 → stage 2 → stage 3.

**Use DAG when:** one input feeds multiple branches that must all complete
before a downstream step runs.

---

## Mental Model

```
         ┌── arm_a: parse_trade → update_trade_stats ──┐
event ───┤                                             ├── join → output
         └── arm_b: parse_quote → update_book ────────┘
```

Each arm is a sub-pipeline. The arms run in parallel (conceptually — they
still run on one thread). The `join` step receives references to each arm's
output and produces a single combined value.

---

## Minimal DAG

```rust
use nexus_rt::{DagBuilder, Res, ResMut, WorldBuilder, Handler};

#[derive(nexus_rt::Resource, Default)]
struct Stats {
    count: u64,
    sum: i64,
}

// Root step: takes the event by value, returns the value to propagate
fn root(event: i64) -> i64 {
    event
}

// Arm A: compute double
fn double(x: &i64) -> i64 {
    x * 2
}

// Arm B: compute square
fn square(x: &i64) -> i64 {
    x * x
}

// Merge: takes references to each arm's output, returns combined
fn merge(mut stats: ResMut<Stats>, doubled: &i64, squared: &i64) -> () {
    stats.count += 1;
    stats.sum += *doubled + *squared;
}

fn main() {
    let mut wb = WorldBuilder::new();
    wb.register(Stats::default());
    let mut world = wb.build();

    let mut dag = DagBuilder::<i64>::new()
        .root(root, world.registry())
        .fork()
            .arm(|seed| seed.then(double, world.registry()))
            .arm(|seed| seed.then(square, world.registry()))
        .join(merge, world.registry())
        .build();

    dag.run(&mut world, 5);
    // Stats: count=1, sum = 10 (double) + 25 (square) = 35
}
```

---

## When to Use DAG

### Fan-out with aggregation

```
market_data_update
    ├── update_order_book (arm)
    ├── update_vwap        (arm)
    └── update_spread      (arm)
    publish_analytics(book, vwap, spread)
```

One input (market data tick), three independent computations, one
downstream step that sees all three results.

### Parallel validation

```
order
    ├── check_risk_limits   (arm)
    ├── check_credit        (arm)
    └── check_compliance    (arm)
    accept_or_reject(risk_ok, credit_ok, compliance_ok)
```

All three checks run. The merge step decides based on all results.

### Dependent computations that share input

A pipeline would force you to pick an order: risk THEN credit THEN
compliance. A DAG expresses that they're independent — any order works,
and conceptually they happen "together."

---

## Root vs Then

The first step in a DAG is special:

```rust
.root(f, registry)   // f takes the event BY VALUE
.then(f, registry)   // f takes &previous_output
```

Root consumes the event. Subsequent `.then` steps take references to the
previous output. This mirrors how data flows: the root owns the event,
everything downstream borrows from the chain.

---

## Arms

An arm is a branch that forks from the main chain:

```rust
.fork()
    .arm(|seed| {
        seed
            .then(parse, reg)
            .then(validate, reg)
            .then(enrich, reg)
    })
    .arm(|seed| seed.then(shortcut, reg))
.join(merge, reg)
```

The `seed` is a starting point that takes `&In` (the fork's input type).
Inside the closure, you build the arm with the same combinators as a
Pipeline: `.then`, `.guard`, `.tap`, `.scan`, etc.

Each arm produces its own output type. The merge function sees all arm
outputs by reference.

---

## Merge Function Signature

```rust
fn merge(
    // Params first
    config: Res<Config>,
    mut state: ResMut<State>,
    // Then references to each arm's output
    arm0: &ArmAOutput,
    arm1: &ArmBOutput,
    arm2: &ArmCOutput,
) -> FinalOutput {
    // Compute from all arms
}
```

Params come first, arm outputs last (by reference). The return type is
the DAG's new output — subsequent `.then` steps see this.

---

## Nested Forks

Arms can fork further:

```rust
.root(root, reg)
.fork()
    .arm(|seed| {
        seed
            .then(parse, reg)
            .fork()
                .arm(|s| s.then(sub_a, reg))
                .arm(|s| s.then(sub_b, reg))
            .join(merge_inner, reg)
    })
    .arm(|seed| seed.then(other, reg))
.join(merge_outer, reg)
```

The nested fork produces a single output (from `merge_inner`) that feeds
back into the outer arm's chain. The outer merge sees the outer arm
outputs.

---

## Complete Example: Market Data Analytics

```rust
use nexus_rt::{DagBuilder, Res, ResMut, WorldBuilder, Handler};

#[derive(Clone, Copy)]
struct MarketTick {
    symbol_id: u32,
    bid: f64,
    ask: f64,
    bid_size: u32,
    ask_size: u32,
}

#[derive(nexus_rt::Resource, Default)]
struct BookState {
    last_mid: f64,
}

#[derive(nexus_rt::Resource, Default)]
struct SpreadStats {
    ewma: f64,
}

#[derive(nexus_rt::Resource, Default)]
struct VwapAccumulator {
    pv: f64,
    volume: u64,
}

// Root: consume the tick, pass it through
fn receive_tick(tick: MarketTick) -> MarketTick {
    tick
}

// Arm 1: compute mid price
fn compute_mid(tick: &MarketTick) -> f64 {
    (tick.bid + tick.ask) / 2.0
}

// Arm 2: compute spread
fn compute_spread(tick: &MarketTick) -> f64 {
    tick.ask - tick.bid
}

// Arm 3: compute size-weighted mid (vwap contribution)
fn compute_vwap_contrib(tick: &MarketTick) -> (f64, u64) {
    let size = (tick.bid_size + tick.ask_size) as u64;
    let pv = (tick.bid + tick.ask) / 2.0 * size as f64;
    (pv, size)
}

// Merge: update all stats
fn update_analytics(
    mut book: ResMut<BookState>,
    mut spread: ResMut<SpreadStats>,
    mut vwap: ResMut<VwapAccumulator>,
    mid: &f64,
    sprd: &f64,
    vwap_contrib: &(f64, u64),
) {
    book.last_mid = *mid;
    spread.ewma = 0.9 * spread.ewma + 0.1 * sprd;
    vwap.pv += vwap_contrib.0;
    vwap.volume += vwap_contrib.1;
}

fn build_analytics_dag(world: &WorldBuilder) -> impl Handler<MarketTick> {
    DagBuilder::<MarketTick>::new()
        .root(receive_tick, world.registry())
        .fork()
            .arm(|seed| seed.then(compute_mid, world.registry()))
            .arm(|seed| seed.then(compute_spread, world.registry()))
            .arm(|seed| seed.then(compute_vwap_contrib, world.registry()))
        .join(update_analytics, world.registry())
        .build()
}
```

---

## Batch Processing

Like Pipeline, DAG supports batch mode with pre-allocated input buffers:

```rust
let mut dag = DagBuilder::<MarketTick>::new()
    .root(receive_tick, reg)
    .fork()
        // ... arms
    .join(merge, reg)
    .build_batch(1024);  // pre-allocate for 1024 ticks per batch
```

Use `build_batch` when you process ticks in bursts (e.g., draining a
network buffer with many messages).

---

## Performance

A DAG compiles to a single monomorphized function — just like Pipeline.
Every arm, every combinator, every merge is inlined. No vtable, no Box,
no dynamic dispatch. The `use<>` capture rules from Rust 2024 apply: if
you return `impl Handler<E>` from a factory, add `+ use<Params, Chain>`
to exclude the registry borrow.

---

## DAG vs Pipeline: Decision Guide

| Question | Use |
|----------|-----|
| Is the flow strictly linear? | Pipeline |
| Do multiple computations need the same input? | DAG |
| Does a downstream step need results from multiple branches? | DAG |
| Are branches independent (no shared intermediate state)? | DAG |
| Is one branch just a side effect (log, metric)? | Pipeline with `.tap()` |
| Do you need to route based on a condition (if/else)? | Pipeline with `.route()` |
| Do you need fan-out to multiple distinct handlers? | Pipeline with `.dispatch()` or DAG |

---

## Need Per-Instance State?

This document covers `DagBuilder` — DAGs composed from `Handler`-style
functions where state lives in the World.

If each arm needs access to per-instance context (per-instrument state,
per-session counters, per-connection metadata), use `CtxDagBuilder`
instead. It's the parallel API for callbacks, threading `&mut C` through
every arm and the merge function. See
[callbacks.md — Callback DAGs](callbacks.md#callback-dags-ctxdag) for the
full guide.

Same fork/arm/join structure, same builder pattern, same monomorphization
— just with a context parameter threaded through every step.

## See Also

- [pipelines.md]pipelines.md — Linear processing chains
- [handlers.md]handlers.md — Writing the functions used as arms and merges
- [callbacks.md]callbacks.md — When you need per-instance state, including
  CtxPipeline and CtxDag (the callback parallels of Pipeline and Dag)