# `graph/` — GraphDSL, shapes, ports, junctions, fused executor
Datum's Akka/Pekko **GraphDSL** layer: build arbitrary fan-in/fan-out topologies
(diamonds, junctions, bounded feedback cycles) that the linear `Source→Flow→Sink`
DSL can't express, and run them on a fused executor with several monomorphized
fast paths. Mirrors Akka Streams `GraphDSL`. Read the repo-root `CLAUDE.md`
("Architecture" → `graph.rs`, "Execution model") and `roadmap/M1-v0.1.0-foundation.md`
before touching any hot path.
## File map
| File | Contents |
|---|---|
| `mod.rs` | Module glue, re-exports, shared `Arc<str>` stage/port names, `PortId`/`PortKind`, the erased `DatumElement`/`DatumValue` element box and `downcast_datum`. Top-level `#[cfg(test)]` GraphDSL integration tests. |
| `ports.rs` | `Inlet<T>` / `Outlet<T>` (typed), `AnyInlet`/`AnyOutlet` (erased, carry `TypeId` + name), `PortRef` trait. Ports are values, not type-level wiring. |
| `shapes.rs` | `Shape` trait + concrete shapes: `SourceShape`, `SinkShape`, `FlowShape`, `FanInShape`, `FanOutShape`, `FanOutShape2`, `ZipShape`, `MergePreferredShape`, `BidiShape`, and `PortAllocator`. |
| `stage.rs` | `GraphStage` / `GraphStageLogic` (push/pull handler protocol), `InHandler`/`OutHandler`/`TimerHandler`, `StageSpec`, the private `StageKind`, emit/read helpers, `AsyncCallback`, per-stage timers. |
| `junctions.rs` | The built-in stages: `Identity`, `MapStage`, `Buffer`, `TakeWhile`, `Broadcast`, `Balance`, `Merge`, `MergePreferred`, `MergePrioritized`, `Concat`, `OrElse`, `Interleave`, `Zip`, `Unzip`, `UnzipWith`, `MergeSorted`, `MergeSequence`, `MergeLatest`, `Partition`, `AsyncBoundary`. |
| `builder.rs` | `GraphBuilder` (`add`/`connect`/`wire`/`import`), `GraphDsl` (`create`/`try_create`/`partial`), `GraphBlueprint<S>`, `PartialGraph<S>`, `FusedSegment`, `FusedExecutionConfig`. Runtime wiring validation lives here. |
| `wire.rs` | M8 method-based wiring DSL: `WireDsl` (`shape.to(&other)`, `.out(i)`, `.in_(i)`), `WireSpec`, `WirePair`, `OutletCursor`/`InletCursor`. `compile_fail` doctests pin the explicit-only shapes. |
| `executor.rs` | **9.5k lines.** The fused executor's four tiers + all `GraphBlueprint::run_*` methods. The biggest, most performance-sensitive file — see below. |
## Invariants & conventions (preserve these)
- **Blueprint vs materialization.** `GraphDsl::create`/`try_create`/`add`/`connect`/`wire`
build an immutable blueprint and start nothing. Execution begins only at a `run_*`
call. Keep construction side-effect-free.
- **Runtime-checked wiring, not type-level.** `GraphBuilder::validate_connection`
(`builder.rs`) checks port kind, `TypeId` match, and single-use of each port at
`connect`/`wire` time; `finish` checks the result shape matches the open ports.
Deliberate (CLAUDE.md "runtime-checked correctness before compile-time guarantees").
Do not try to push these into the type system.
- **Executor tiers + `ExecutorMode::Auto`.** `executor.rs:24` defines `Auto` (default,
public path), `ErasedOnly`, `TypedOnly` (test/diagnostic). `run_with_input_report_mode`
(`executor.rs:59`) dispatches in a fixed order: typed-linear flow plan → typed
acyclic junction kernels → typed `MergeSequence` → typed `MergeLatest` → typed
cyclic feedback kernel → **erased `FusedExecutor` fallback**. The erased executor is
the correctness oracle; the typed tiers must produce identical output (equivalence
tests in `executor.rs` `mod tests` enforce this).
- **Fresh per-run state.** Typed kernels build a fresh plan per `run_*` call (no cached
mutable state, no `Mutex` on the run path) so blueprint reuse and concurrent runs are
independent. Keep it that way (see the comment at `executor.rs:132` and the `concurrent`/`independence` tests).
- **Cycles are bounded.** An unproductive feedback loop surfaces
`StreamError::EventLimitExceeded { limit }` (from `FusedExecutionConfig::event_limit`,
default 100M) instead of hanging.
- **`#![forbid(unsafe_code)]`**, edition 2024, MSRV 1.88. On the public path, errors
flow through `Result<_, StreamError>` rather than panics. (Exception: built-in stage
constructors `assert!` on obviously invalid arguments, e.g. `Broadcast::new(0)`.)
- **Port IDs are process-global**, from the `NEXT_PORT_ID` atomic (`mod.rs:137`).
Built-in stages allocate contiguous ID blocks via `next_port_id_block`; never assume
IDs start at 0 or are graph-local.
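
The runtime-checked wiring invariant above can be illustrated with a small, self-contained sketch. It is not Datum's `validate_connection`: `AnyPort`, `Wiring`, `WiringError`, and `port` are hypothetical stand-ins, and the only assumption carried over from the text is the three checks (port kind, `TypeId` match, single use of each port).

```rust
use std::any::TypeId;
use std::collections::HashSet;

// Hypothetical stand-ins for illustration; Datum's real types live in
// `ports.rs` and `builder.rs` and will differ in detail.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PortKind {
    Inlet,
    Outlet,
}

#[derive(Clone, Copy, Debug)]
pub struct AnyPort {
    pub id: u64,
    pub kind: PortKind,
    pub element: TypeId,
}

#[derive(Debug, PartialEq, Eq)]
pub enum WiringError {
    WrongKind,
    TypeMismatch,
    AlreadyConnected(u64),
}

#[derive(Default)]
pub struct Wiring {
    used: HashSet<u64>,
    pub edges: Vec<(u64, u64)>,
}

impl Wiring {
    /// Checks kind, element type, and single use before recording the edge.
    pub fn connect(&mut self, from: AnyPort, to: AnyPort) -> Result<(), WiringError> {
        if from.kind != PortKind::Outlet || to.kind != PortKind::Inlet {
            return Err(WiringError::WrongKind);
        }
        if from.element != to.element {
            return Err(WiringError::TypeMismatch);
        }
        for p in [from, to] {
            if self.used.contains(&p.id) {
                return Err(WiringError::AlreadyConnected(p.id));
            }
        }
        // Only mutate state once every check has passed.
        self.used.insert(from.id);
        self.used.insert(to.id);
        self.edges.push((from.id, to.id));
        Ok(())
    }
}

/// Builds an erased port that remembers its element type as a `TypeId`.
pub fn port<T: 'static>(id: u64, kind: PortKind) -> AnyPort {
    AnyPort { id, kind, element: TypeId::of::<T>() }
}
```

In this sketch, connecting a `u32` outlet to a `String` inlet yields `WiringError::TypeMismatch`, and reusing an already-wired outlet yields `WiringError::AlreadyConnected`. The port IDs are arbitrary numbers on purpose, since nothing may assume they start at 0.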
## Performance-sensitive hot paths (do NOT casually refactor)
CLAUDE.md's defining goal is parity-or-better vs warmed Akka on **wall-clock and CPU**.
These paths are benchmark-gated (`cargo bench --bench graph`, table in
`roadmap/benchmarks/graph.md`); an unbenchmarked change can silently regress 10–340× wins.
- **Typed-linear fast path** — `try_typed_flow_plan` / `TypedFlowPlan` / `TypedLinearPlan`
(`executor.rs` ~898–1140, ~705–985). Monomorphized Identity/Map/AsyncBoundary chains
with no per-element `DatumValue` boxing. Auto-selected first.
- **Typed acyclic junction kernels** — `try_build_typed_acyclic_junction_dispatch`
(`executor.rs:~2248`) and its sub-runners: Broadcast→Zip, Balance→Merge,
Partition→Merge, Unzip→Zip, Unzip→MergeSorted (and the `MergeSequence`/`MergeLatest`
plans). These kernels replace the older hand-written erased fast paths.
- **Typed cyclic feedback kernel** — `try_build_typed_cyclic_feedback_dispatch`
(`executor.rs:~1990`) for the `MergePreferred→Broadcast(2)` loop, consuming
`TypedCyclicOp` (`mod.rs:64`) from `Buffer`/`TakeWhile`.
- **Erased `FusedExecutor`** (`executor.rs:~6778`) — the `Box<dyn DatumElement>` event-stack
interpreter; correctness oracle and fallback. `StageState` (`~6802`) holds per-stage
mutable state; `FusedEvent` (`~6929`) is the LIFO event stack used for cyclic graphs.
- **Ractor async-boundary path** — `BoundaryCountExecutor` (`executor.rs:~2721`), used only
by the async-boundary benchmark; bounded handoff queues, `buffer_size > 0` required.
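
As a rough mental model of how these tiers are selected, the sketch below tries typed planners in a fixed order and falls back to the erased path. Every name in it (`Mode`, `Tier`, `Graph`, `select_tier`, and the two toy planners) is hypothetical, only two of the typed tiers are modeled, and skipping the typed planners in `ErasedOnly` mode is an assumption inferred from the mode's name rather than taken from `executor.rs`.

```rust
// Hypothetical model of tiered dispatch; the real logic lives in
// `run_with_input_report_mode` and is considerably more involved.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mode {
    Auto,
    ErasedOnly,
    TypedOnly,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Tier {
    TypedLinear,
    TypedJunction,
    Erased,
}

#[derive(Debug, PartialEq, Eq)]
pub enum DispatchError {
    NoTypedPlan,
}

/// Toy graph description: just the facts each toy planner looks at.
pub struct Graph {
    pub linear: bool,
    pub known_junction: bool,
}

fn try_typed_linear(g: &Graph) -> Option<Tier> {
    g.linear.then_some(Tier::TypedLinear)
}

fn try_typed_junction(g: &Graph) -> Option<Tier> {
    g.known_junction.then_some(Tier::TypedJunction)
}

pub fn select_tier(g: &Graph, mode: Mode) -> Result<Tier, DispatchError> {
    if mode != Mode::ErasedOnly {
        // Planners run in a fixed order; the first one that recognizes
        // the graph wins, and a `None` simply moves on to the next.
        let planners: [fn(&Graph) -> Option<Tier>; 2] =
            [try_typed_linear, try_typed_junction];
        if let Some(tier) = planners.iter().find_map(|plan| plan(g)) {
            return Ok(tier);
        }
    }
    match mode {
        // Diagnostic mode: refuse to fall back silently.
        Mode::TypedOnly => Err(DispatchError::NoTypedPlan),
        Mode::Auto | Mode::ErasedOnly => Ok(Tier::Erased),
    }
}
```

The point of the shape is that a planner returning `None` is never an error in `Auto` mode: an unrecognized topology still runs, just on the slower erased tier.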
## Akka mapping & Rust-native deviations
- Junctions, `GraphStage`/`GraphStageLogic`, the push/pull/`grab`/`offer`/`emit`/`read`
handler protocol, shapes, and partial graphs all mirror Akka 1:1 in spirit.
- **`MergePrioritized` is a deterministic weighted *schedule*, not Akka's randomized
selection.** Each input appears `weight` times in a round-robin cycle
(`typed_prioritized_fan_in_schedule`, `executor.rs:~5984`). Document/benchmark as such.
- Wiring is validated at runtime (see invariants), unlike Akka's compile-checked shapes.
- Reactive Streams publisher/subscriber interop is **out of scope** (CLAUDE.md).
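
To make the `MergePrioritized` deviation concrete, here is one plausible way to build a deterministic schedule in which input `i` appears `weights[i]` times per cycle. The function name `weighted_schedule` and the interleaving within a cycle are assumptions made for illustration; `typed_prioritized_fan_in_schedule` may order the slots differently.

```rust
/// Builds one cycle of input indices in which input `i` appears
/// `weights[i]` times. The executor would then walk this cycle repeatedly.
/// (Illustrative only: the slot ordering is an assumption.)
pub fn weighted_schedule(weights: &[usize]) -> Vec<usize> {
    let mut remaining = weights.to_vec();
    let mut schedule = Vec::with_capacity(weights.iter().sum());
    // Sweep the inputs round-robin, emitting each one while it still
    // has budget left in this cycle.
    while remaining.iter().any(|&r| r > 0) {
        for (input, left) in remaining.iter_mut().enumerate() {
            if *left > 0 {
                schedule.push(input);
                *left -= 1;
            }
        }
    }
    schedule
}
```

With weights `[3, 1, 2]` this sketch produces the cycle `[0, 1, 2, 0, 2, 0]`. Because the cycle is fixed, two runs over the same inputs emit in the same order, which is the property that distinguishes this approach from randomized selection.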
## Gotchas / footguns
- **Typed cyclic feedback requires the output branch on `Broadcast.outlet(0)` and the
feedback branch on `outlet(1)`** (`executor.rs:2070`). The reversed orientation is
still *correct* — it just falls back to the slower erased interpreter, matching the
oracle's LIFO emission order. This is not a bug, but it explains a surprising perf cliff.
- **`builder.wire(spec)` defers wiring errors** to graph creation (collected in
`errors`), while **`try_wire(spec)?` fails fast**. `connect`/`connect_any` also return
`StreamResult` immediately. `wire`/`connect` are both first-class and mix freely.
- **`MergeLatest`/`MergeSequence`/`Partition` typed kernels are gated to a bounded
element-type set** (~17 common `T`); other `T` returns `None` from the dispatch and
falls back to the erased executor (Auto) or errors (TypedOnly). Custom types still run
correctly — just on the erased path.
- **`StageSpec::typed_cyclic`** (and `with_typed_cyclic`) is read **only** by the typed
cyclic planner. The erased executor ignores it and runs the full `GraphStageLogic`. A
built-in stage that wants the typed cyclic kernel must surface `TypedCyclicOp`.
- `GraphBuilder::add` is `#[must_use]` and returns the stage's **shape** (use it to grab
ports), not a handle.
- `MergePreferredShape` and `BidiShape` are **explicit-only** in the wiring DSL — no
`AutoOutletEndpoint`/`AutoInletEndpoint` impls; use `.preferred()`/`.secondary(i)` or
`.out(i)`/`.in_(i)` cursors. `wire.rs` `compile_fail` doctests pin this.
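
The deferred-versus-fail-fast distinction between `wire` and `try_wire` can be sketched with a toy builder. `ToyBuilder`, `WireError`, `finish`, and the self-loop check are all hypothetical; the sketch assumes only what the text states, namely that deferred errors are collected and reported at graph creation while the fail-fast variant returns them immediately.

```rust
#[derive(Debug, PartialEq, Eq)]
pub struct WireError(pub String);

// Hypothetical toy builder; ports are plain indices here.
#[derive(Default)]
pub struct ToyBuilder {
    edges: Vec<(usize, usize)>,
    errors: Vec<WireError>,
}

impl ToyBuilder {
    /// Fail-fast: the caller sees the error immediately and can use `?`.
    pub fn try_wire(&mut self, from: usize, to: usize) -> Result<(), WireError> {
        // Stand-in validation rule for the sketch.
        if from == to {
            return Err(WireError(format!("port {from} wired to itself")));
        }
        self.edges.push((from, to));
        Ok(())
    }

    /// Deferred: record the error and keep building, so calls can chain.
    pub fn wire(&mut self, from: usize, to: usize) -> &mut Self {
        if let Err(e) = self.try_wire(from, to) {
            self.errors.push(e);
        }
        self
    }

    /// Deferred errors surface here, at graph creation.
    pub fn finish(self) -> Result<Vec<(usize, usize)>, Vec<WireError>> {
        if self.errors.is_empty() {
            Ok(self.edges)
        } else {
            Err(self.errors)
        }
    }
}
```

In this model, `b.wire(0, 1).wire(2, 2)` appears to succeed and the failure only shows up in `finish`, whereas `b.try_wire(3, 3)` returns `Err` on the spot and leaves nothing in the deferred error list.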
## Tests & benches
- **Unit/equivalence tests:** inline `#[cfg(test)] mod tests` in `mod.rs:251`
(GraphDSL builds, cycles, junction semantics), `executor.rs:2748` (custom opaque
stages, typed-vs-erased equivalence, async-boundary, concurrent reuse), `wire.rs:416`
(wiring DSL equivalence to `connect`, deferred-vs-fail-fast).
- **Integration tests:** `crates/datum-core/tests/prelude_graph_wiring.rs`,
`crates/datum-core/tests/try_result_aliases.rs`, and the doc snippets under
`crates/datum-core/tests/docs/working_with_graphs.rs`.
- **Benches:** `cargo bench --bench graph`; Datum-vs-Akka comparison via
`crates/datum-core/benches/graph_compare/` + `src/bin/graph_compare.rs` (needs JDK+sbt).
Result table: `roadmap/benchmarks/graph.md`.
- **Docs pages:** `docs/guides/working-with-graphs.md`,
`docs/guides/modularity-composition.md`, `docs/guides/custom-stream-processing.md`.