datum-core 0.9.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
# `dynamic/` — kill switches & dynamic hubs

Akka-style **dynamic stream controls** and **runtime attachment points**. This module has no
runtime of its own: every type is built on the `stream` DSL (`Source`/`Flow`/`Sink`, the
`Materializer`, and `spawn_stream`). Read `../../CLAUDE.md` first for crate-wide conventions, and
`roadmap/benchmarks/dynamic-streams.md` for the authoritative perf record.

## Layout

- **`mod.rs`**: module docs + re-exports only.
- **`kill_switch.rs`**: `KillSwitches` (factory), `UniqueKillSwitch`, `SharedKillSwitch`.
- **`hub.rs`**: `MergeHub` (fan-in), `BroadcastHub` / `PartitionHub` (fan-out), their consumer
  sources, `MergeHubDrainingControl`, and `PartitionConsumerInfo`. ~1.7k lines; the bulk is the
  shared `FanOutHubShared` machinery behind broadcast/partition.

## Public surface (re-exported from the crate root)

| Type | File | Role |
| --- | --- | --- |
| `KillSwitches` | kill_switch.rs | Factory: `single::<T>() -> Flow<T,T,UniqueKillSwitch>`, `shared(name) -> SharedKillSwitch` |
| `UniqueKillSwitch` | kill_switch.rs | One-stream `shutdown()`/`abort(err)` handle (per materialization) |
| `SharedKillSwitch` | kill_switch.rs | `Clone` handle; `flow::<T>()` wires it into many streams sharing one on/off state |
| `MergeHub` | hub.rs | `source(buf)` / `source_with_draining(buf)` → `Source<T, Sink<T,NotUsed>>` many producers attach to |
| `MergeHubDrainingControl` | hub.rs | `drain_and_complete()`: stop accepting producers, complete when current ones finish |
| `BroadcastHub` | hub.rs | `sink(buf)` / `sink_starting_after(n,buf)` → every consumer gets every element |
| `PartitionHub` | hub.rs | `sink(partitioner, start_after, buf)` → each element routed to one consumer (or dropped) |
| `BroadcastHubConsumerSource<T>` / `PartitionHubConsumerSource<T>` | hub.rs | `Clone`; `.source()` returns a fresh `Source<T,NotUsed>` blueprint per attached consumer |
| `PartitionConsumerInfo` | hub.rs | Snapshot passed to the partitioner: `size()`, `consumer_ids()`, `consumer_id_by_idx()`, `queue_size(id)` |

## Invariants & conventions (preserve these)

- **Blueprint vs. materialization.** Factory methods build immutable blueprints — nothing runs until
  materialization. Exception worth knowing: capacity asserts (`per_producer_buffer_size > 0`,
  `buffer_size > 0`) fire at **blueprint-construction time** (they sit before the factory closure),
  so a zero capacity panics when you *call* the constructor, not at `run`. `KillSwitches::shared()`
  also eagerly allocates the shared state (the handle is a control object, not a running stream).
- **Pull-based terminal observation.** Kill-switch `shutdown()`/`abort()` only take effect on the
  **next downstream pull** — the `KillSwitchStream` checks the gate at the top of each `next()`. The
  gate is an `AtomicU8` (`OPEN`/`SHUTDOWN`/`ABORTED`) fronting a `Mutex<KillSwitchStatus>` that holds
  the error payload; the first signal wins (idempotent: double-shutdown, or abort-after-shutdown, is
  a no-op). Tests must `request(1)` before `expect_complete()`/`expect_error()`.
- **User closures never run under a hub lock.** The `PartitionHub` partitioner runs against an
  `ArcSwap` snapshot + a producer-local `PartitionTopologyCache` keyed by `topology_epoch`, *outside*
  the registry mutex; the recognized terminal `fold`/`collect` closures run after the drain hook
  returns. Keep it that way.
- **Hub backpressure = slowest active consumer.** Broadcast/partition producers block until every
  active lane has room (per-lane `buffer_size`). **Datum deviation:** `BroadcastHub` blocks upstream
  immediately at zero consumers; Akka pre-buffers up to `buffer_size` first.
- **Draining is one-way.** After `drain_and_complete()` (or source close), `register_producer` /
  `register_direct_producer` return `Err("merge hub is draining or closed to new producers")`.
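
The pull-based kill-switch rule above can be illustrated with a std-only sketch. This is not the crate's `KillSwitchStream`: `Gate` and `Gated` are hypothetical names, a plain `Iterator` stands in for the stream, and a `String` stands in for the error payload. It only shows the shape described above: an `AtomicU8` gate checked at the top of each `next()`, with the first signal winning.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, Mutex};

const OPEN: u8 = 0;
const SHUTDOWN: u8 = 1;
const ABORTED: u8 = 2;

/// Hypothetical gate: an atomic state byte fronting the abort payload.
struct Gate {
    state: AtomicU8,
    error: Mutex<Option<String>>,
}

impl Gate {
    fn new() -> Arc<Self> {
        Arc::new(Gate { state: AtomicU8::new(OPEN), error: Mutex::new(None) })
    }

    /// First signal wins: the swap only succeeds while the gate is still OPEN.
    fn shutdown(&self) {
        let _ = self
            .state
            .compare_exchange(OPEN, SHUTDOWN, Ordering::AcqRel, Ordering::Acquire);
    }

    fn abort(&self, err: &str) {
        // Hold the payload lock across the transition so that a reader which
        // observes ABORTED always finds the error already stored.
        let mut slot = self.error.lock().unwrap();
        if self
            .state
            .compare_exchange(OPEN, ABORTED, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            *slot = Some(err.to_string());
        }
    }
}

/// Hypothetical gated stream: the gate is consulted only when downstream pulls.
struct Gated<I> {
    inner: I,
    gate: Arc<Gate>,
    done: bool,
}

impl<I: Iterator> Gated<I> {
    fn new(inner: I, gate: Arc<Gate>) -> Self {
        Gated { inner, gate, done: false }
    }
}

impl<I: Iterator> Iterator for Gated<I> {
    type Item = Result<I::Item, String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        // Check the gate before touching upstream.
        match self.gate.state.load(Ordering::Acquire) {
            SHUTDOWN => {
                self.done = true;
                None
            }
            ABORTED => {
                self.done = true;
                let err = self.gate.error.lock().unwrap().clone().unwrap_or_default();
                Some(Err(err))
            }
            _ => match self.inner.next() {
                Some(item) => Some(Ok(item)),
                None => {
                    self.done = true;
                    None
                }
            },
        }
    }
}
```

In this sketch, calling `shutdown()` has no visible effect until the consumer calls `next()` again, which is why a probe-driven test has to issue a pull before it can observe completion or the error.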

## Hot paths — do NOT casually refactor

These were chosen against benchmarks (see CLAUDE.md "Dynamic hubs" + the roadmap file):

- **Batched, targeted wakeups, not `notify_all`/element.** Per-producer (MergeHub) and per-lane
  (FanOut) condvars; `notify_one` where possible. The `SourceRuntimeHints.inline_micro_max_success_items`
  hint selects the **batched producer** path vs. the one-element fallback for unknown/probe/blocking
  sources — both must stay correct.
- **MergeHub terminal-drain fast path.** `from_terminal_direct_materialized_factory` +
  `MergeHubTerminalHook` + `register_direct_producer` let recognized terminals (`fold`/`collect`/
  `ignore`) pull producers directly without the boxed `MergeHubSourceStream`. `prefer_direct`
  alternates direct-lane vs. queued drain so neither producer class starves. This path moved the p1
  result from 0.35× to 0.46×; the residual p1 floor is now **core materialization cost, not hub
  coordination** (roadmap 2026-06-16 finding), so don't chase it inside this module.
- **`prune_finished_producer_completions` is off the per-element path** (it lives in register/
  store/deregister). Keep it there — putting it back per-`push_item` was the original p16 CPU blowup.
- **FanOut steady state avoids the global lock:** `ArcSwap<FanOutSnapshot>` for topology, `AtomicU64`
  producer/topology epochs, `SmallVec<[_; 16]>` inline lane storage. Batch-limit constants
  (`*_BATCH_LIMIT`, scaled by active-consumer count) and `fan_out_wait_timeout()` (1 ms park) are
  load-bearing.
- **Panic safety:** `FanOutProducer::run` installs a drop guard that `fail()`s the hub if the
  producer thread panics; MergeHub closes the source on terminal panic/error. Preserve on edits.
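
As a rough illustration of the panic-safety bullet, a drop guard of this kind can be sketched with the standard library alone. `HubState`, `FailOnPanic`, and `run_producer` are hypothetical stand-ins, not the crate's `FanOutProducer`; the sketch assumes the default unwinding panic strategy.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the hub's shared failure flag.
#[derive(Default)]
struct HubState {
    failed: AtomicBool,
}

impl HubState {
    fn fail(&self) {
        self.failed.store(true, Ordering::Release);
    }

    fn is_failed(&self) -> bool {
        self.failed.load(Ordering::Acquire)
    }
}

/// Guard that marks the hub as failed if it is dropped during a panic unwind.
struct FailOnPanic {
    hub: Arc<HubState>,
}

impl Drop for FailOnPanic {
    fn drop(&mut self) {
        // `thread::panicking()` is true while this thread is unwinding.
        if thread::panicking() {
            self.hub.fail();
        }
    }
}

/// Runs a toy producer on its own thread and reports whether the hub was failed.
fn run_producer(hub: Arc<HubState>, should_panic: bool) -> bool {
    let worker_hub = hub.clone();
    let handle = thread::spawn(move || {
        // Installed first, so it is dropped on every exit path, including unwinding.
        let _guard = FailOnPanic { hub: worker_hub };
        if should_panic {
            panic!("producer blew up");
        }
    });
    // A panicked thread yields Err here; the guard has already run by then.
    let _ = handle.join();
    hub.is_failed()
}
```

The point of the pattern is that consumers waiting on the hub learn about the failure even though the producer never reached its normal completion code. An edit that moves work ahead of the guard's installation would reopen that gap.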

## Akka mapping & Rust-native deviations

Mirrors `akka.stream.{KillSwitches, scaladsl.{MergeHub, BroadcastHub, PartitionHub}}`. Deviations:

- Errors flow through `Result<_, StreamError>` (no exceptions).
- `SharedKillSwitch::flow::<T>()` is generic per element type.
- The partitioner returns `isize` (a consumer id, or `-1` to drop), matching Akka's
  `partitioner: (size, elem) => idx`.
- `BroadcastHub` blocks at zero consumers (see above).
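
To make the partitioner contract concrete, here is a minimal sketch of a least-loaded routing function that returns a consumer id or `-1` to drop. `ConsumerInfo` is a hypothetical stand-in for `PartitionConsumerInfo`: the method names follow the table above, but the signatures and the id type are assumptions, and the real snapshot may differ.

```rust
/// Hypothetical stand-in for `PartitionConsumerInfo`; signatures are assumed.
struct ConsumerInfo {
    /// (consumer id, number of queued elements) per attached consumer.
    lanes: Vec<(isize, usize)>,
}

impl ConsumerInfo {
    fn size(&self) -> usize {
        self.lanes.len()
    }

    fn consumer_ids(&self) -> Vec<isize> {
        self.lanes.iter().map(|&(id, _)| id).collect()
    }

    fn queue_size(&self, id: isize) -> usize {
        self.lanes
            .iter()
            .find(|&&(lane_id, _)| lane_id == id)
            .map(|&(_, queued)| queued)
            .unwrap_or(0)
    }
}

/// Routes each element to the consumer with the shortest queue.
/// Returns -1 (drop the element) when no consumer is attached.
fn least_loaded(info: &ConsumerInfo, _elem: &str) -> isize {
    if info.size() == 0 {
        return -1;
    }
    info.consumer_ids()
        .into_iter()
        // On ties, `min_by_key` keeps the first minimum it encounters.
        .min_by_key(|&id| info.queue_size(id))
        .unwrap_or(-1)
}
```

A function like this only reads the snapshot it is handed, which fits the rule that user closures run outside the registry mutex.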

## Tests & benches

- Unit tests: inline `#[cfg(test)]` in `kill_switch.rs` and `hub.rs` (probe-driven via `testkit`).
- Doc snippets: `crates/datum-core/tests/docs/dynamic_streams.rs` (anchors consumed by
  `docs/guides/dynamic-streams.md`).
- Benches: `cargo bench --bench dynamic_streams`. For the full Akka comparison, run
  `crates/datum-core/benches/dynamic_streams_compare/run.sh` (needs JDK + sbt) via
  `src/bin/dynamic_streams_compare.rs`. Record before/after results, including the CPU column, in
  `roadmap/benchmarks/dynamic-streams.md`.