# `stream/` — linear Source/Flow/Sink DSL + Runtime
> Context for a future agent working in this module. Read the root `CLAUDE.md`
> (architecture, performance + Akka-mirroring conventions) and
> `roadmap/M1-v0.1.0-foundation.md` (execution model) first — this file does not
> duplicate them, it tells you where things live and what you must not break.
## Purpose
This is Datum's **primary public surface**: the linear push-stream DSL
(`Source` → `Flow` → `Sink`) and the runtime that materializes it. It mirrors
**Akka/Pekko Streams Typed**. The `graph` module is the separate Akka-GraphDSL
layer; `actor`, `io`, `dynamic`, `queue` build on the types defined here.
Everything is re-exported flat from `crate::stream` (see the bottom of `mod.rs`) and
then from `lib.rs` / `lib.rs::prelude`.
## Key public types & where they live
- `mod.rs` — module root. Owns the re-export block, the `BoxStream<T>` /
`PureTransform` / `RuntimeTransform` type aliases, the `SourceHints`/`FlowHints`
optimization-hint system, fan-in stream combinators (`merge_*`/`zip_*`), the
spin-then-park constants, and the large `#[cfg(test)] mod tests` (≈L914–end).
- `source.rs` — `Source<Out, Mat = NotUsed>`, plus `NotUsed`, `Keep`, `Demand`,
`PushOutlet`, `MaybeHandle`, `IntoSource`, `SourceCombineStrategy`. All the
constructors (`empty`/`single`/`repeat`/`from_iter`/`from_iterable`/`failed`/
`maybe`/`future`/`future_source`/`unfold`/`cycle`/`tick`/`lazy_*`) and the
terminal runners (`run_with`, `run_fold`/`run_foreach`/`run_for_each`/
`run_reduce`/`run_collect`, `to`/`to_mat`).
- `sink.rs` — `Sink<In, Mat>`, `RunnableGraph<Mat>`, `SinkCombineStrategy`. Sink
constructors (`ignore`/`head`/`head_option`/`last`/`fold`/`reduce`/`foreach`/
`collect`/`cancelled`/`never`/…). `RunnableGraph::run()` is the execution
boundary.
- `flow.rs` — `Flow<In, Out, Mat = NotUsed>`, `BidiFlow`, and **the operator set**
(~9k lines): element ops, async ops, substreams (`flat_map_*`, `group_by`,
`split_*`, `prefix_and_tail`), fan-in (`concat`/`merge_*`/`zip_*`/`interleave`),
side-effects (`also_to`/`divert_to`/`wire_tap`/`monitor`/`watch_termination`),
and error ops (`recover`/`recover_with`/`map_error`/`on_error_complete`).
- `rate.rs` — `OverflowStrategy`, `AggregateTimer`; `buffer`/`conflate`/`batch`/
`expand`/`detach`/`aggregate_with_boundary` on both `Flow` and `Source`.
- `time.rs` — `ThrottleMode`, `DelayOverflowStrategy`; `throttle`/`delay`/
`*_within`/`*_timeout`/`keep_alive`/`Source::tick`.
- `restart.rs` — `RestartSettings`, `RestartSource`/`RestartFlow`/`RestartSink`,
`RetryFlow` (backoff supervision wrappers).
- `error.rs` — `StreamError`, `StreamResult<T>`, `Supervision`,
`SupervisionDecider`, `SupervisionDirective`.
- `completion.rs` — `StreamCompletion<T>` (the materialized handle) and
`Cancellable`.
- `runtime.rs` — `Runtime` (alias `Materializer`), the `StreamExecutor` thread
pool, and `runtime_checked_stream` (per-element shutdown/cancel wrapper).
- `timer.rs` — `TimerDriver`: single-thread min-heap timer (private; reached via
`Runtime::schedule_*`).
- `poller.rs` — `BlockingPoller`: spin-then-park driver for blocking `wait()`.
- `async_boundary.rs` — Ractor-actor-backed linear `async_boundary()` handoff.
## Invariants you must preserve
- **Blueprint vs. materialization (hard rule).** Building a `Source`/`Flow`/`Sink`/
`RunnableGraph` must have **zero side effects** — it only assembles factory
closures. Execution starts *only* at `run`/`run_with`/`materialize`/
`RunnableGraph::run`. `Source::lazy_*`, `future_*`, and `unfold` look eager but
defer all work into the factory; keep new constructors side-effect-free.
- **`StreamCompletion<T>` is `#[must_use]` and cancels on drop.** Dropping a
spawned completion cancels its running stream (`completion.rs` `Drop`). Never
silently drop one in library code that should keep running.
- **Failures are values, not panics.** Everything fallible returns
`StreamResult<T>`. User-closure panics are caught (`catch_unwind`) and surface
as `StreamError::Failed`/`AbruptTermination`; a panicking `SupervisionDecider`
is treated as `Stop` (`error.rs`).
- **`RunnableGraph::run()` builds a fresh `Materializer::new()` per call**, so
running one blueprint twice yields two independent runtimes and thread pools. Use
`run_with_materializer` to share a `Runtime`.
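A minimal std-only sketch of the blueprint/materialization split, assuming a blueprint is nothing more than a stored factory closure. `Blueprint`, `new`, `run`, and `counting_blueprint` are hypothetical names, not Datum's API. The point is that constructing the value runs no user code, and every `run` invokes the factory afresh, which is the same reason re-running one blueprint gives independent state.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a Source blueprint: it only stores a factory.
/// Nothing the factory does happens until `run` is called.
struct Blueprint<T> {
    factory: Box<dyn Fn() -> Box<dyn Iterator<Item = T>>>,
}

impl<T: 'static> Blueprint<T> {
    /// Building is side-effect-free: the closure is boxed, never called.
    fn new<F, I>(f: F) -> Self
    where
        F: Fn() -> I + 'static,
        I: Iterator<Item = T> + 'static,
    {
        Blueprint {
            factory: Box::new(move || -> Box<dyn Iterator<Item = T>> { Box::new(f()) }),
        }
    }

    /// Materialization boundary: each call runs the factory again, so two
    /// runs of the same blueprint share no per-run state.
    fn run(&self) -> Vec<T> {
        (self.factory)().collect()
    }
}

/// Hypothetical helper: a blueprint whose factory counts its materializations.
fn counting_blueprint(runs: Arc<AtomicUsize>) -> Blueprint<u32> {
    Blueprint::new(move || {
        runs.fetch_add(1, Ordering::SeqCst);
        1..=3
    })
}

fn main() {
    let runs = Arc::new(AtomicUsize::new(0));
    let bp = counting_blueprint(runs.clone());
    assert_eq!(runs.load(Ordering::SeqCst), 0); // building ran nothing
    assert_eq!(bp.run(), vec![1, 2, 3]);
    assert_eq!(bp.run(), vec![1, 2, 3]); // fresh iterator per run
    assert_eq!(runs.load(Ordering::SeqCst), 2);
}
```

A new constructor that opened a file or spawned a thread inside `new` instead of inside the factory would break this property.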
## Performance-sensitive hot paths — do NOT casually refactor
- **The fused linear path is pull-based.** A synchronous operator chain compiles
to a single `Box<dyn Iterator<Item = StreamResult<T>>>` (`BoxStream`) with no
per-element boxing or inter-stage queues. The push vocabulary (`PushOutlet`,
`Demand`) is the *logical* protocol; the iterator is the *implementation*. Keep
new sync operators on the `Flow::from_transform`/`PureTransform` path so they
stay fusible.
- **`SourceHints`/`FlowHints` + the terminal/inline fast paths** (`mod.rs`,
`source.rs::run_with`, `sink.rs` `fold_fp`/`try_register*`). These let proven
finite synchronous micro-sources skip the spawn + per-element checked wrapper
and drive a terminal fold/collect inline. They are correctness-gated (any
non-trivial flow clears `inline_micro`). Changing the hint propagation or the
`try_register_*` dispatch can silently regress materialization benchmarks —
re-bench `materialization` + `source_flow`.
- **Spin-then-park constants** (`STREAM_READY_SPINS`, `STREAM_SPIN_BACKOFF`,
`STREAM_MAX_PARK` in `mod.rs`, used by `poller.rs`). Load-bearing and chosen
against benchmarks; the park budget is deliberately deadline-independent. See
CLAUDE.md "Spin-then-park". Do not retune without before/after wall **and** CPU.
- **`StreamExecutor` (`runtime.rs`) grows on demand and must stay that way.** A
fixed-size pool would deadlock once `num_cpus` long-lived streams (e.g.
`Sink::never`, infinite `map_async`) occupy every worker. Idle workers park and
are reaped after 10 s; if spawning a new worker fails, the task runs inline instead.
- **`TimerDriver` (`timer.rs`)** is one min-heap thread for the whole `Runtime`
(replaced the old thread-per-timer). Firing dispatches onto the `StreamExecutor`.
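The fused pull-based path can be pictured with plain iterators. This is a simplified sketch, not Datum's code: the local `StreamError`, `BoxStream`, `source_from`, `map_stage`, `filter_stage`, and `run_collect` are hypothetical stand-ins (the real `BoxStream`/`PureTransform` definitions live in `mod.rs`). Each stage wraps its upstream once at assembly time; at run time the terminal pulls one `Result` at a time through every stage, with no per-element allocation and no queue between stages.

```rust
/// Hypothetical error type standing in for `StreamError`.
#[derive(Debug, PartialEq)]
enum StreamError {
    Failed(String),
}

/// Hypothetical mirror of the `BoxStream` shape: one boxed iterator of results.
type BoxStream<T> = Box<dyn Iterator<Item = Result<T, StreamError>> + Send>;

/// Source: lifts plain items into `Ok` values.
fn source_from<T, I>(items: I) -> BoxStream<T>
where
    T: 'static,
    I: IntoIterator<Item = T>,
    I::IntoIter: Send + 'static,
{
    Box::new(items.into_iter().map(Ok::<T, StreamError>))
}

/// Fusible `map` stage: `Result::map` leaves errors untouched (failures are values).
fn map_stage<T, U, F>(up: BoxStream<T>, f: F) -> BoxStream<U>
where
    T: 'static,
    U: 'static,
    F: Fn(T) -> U + Send + 'static,
{
    Box::new(up.map(move |r| r.map(&f)))
}

/// Fusible `filter` stage: always forwards errors, filters only `Ok` values.
fn filter_stage<T, P>(up: BoxStream<T>, pred: P) -> BoxStream<T>
where
    T: 'static,
    P: Fn(&T) -> bool + Send + 'static,
{
    Box::new(up.filter(move |r| match r {
        Ok(v) => pred(v),
        Err(_) => true,
    }))
}

/// Terminal: pulls until exhaustion or the first error.
fn run_collect<T>(stream: BoxStream<T>) -> Result<Vec<T>, StreamError> {
    stream.collect()
}

fn main() {
    // Assemble source -> filter -> map; nothing is pulled until `run_collect`.
    let s = source_from(1..=10);
    let s = filter_stage(s, |x: &i32| x % 2 == 0);
    let s = map_stage(s, |x: i32| x * 10);
    assert_eq!(run_collect(s), Ok(vec![20, 40, 60, 80, 100]));

    // A failure is just an `Err` item; the terminal stops at the first one.
    let failing: BoxStream<i32> = Box::new(
        vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter(),
    );
    let s = map_stage(failing, |x: i32| x + 1);
    assert_eq!(run_collect(s), Err(StreamError::Failed("boom".into())));
}
```

Inserting a channel or an `async_boundary()` between two of these stages is what breaks fusion, which is why new sync operators should stay on the transform path.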
## Akka mapping & Rust-native deviations
- `Source`/`Flow`/`Sink`/`RunnableGraph` ↔ Akka's same names; `Materializer` is an
alias for `Runtime`. `Keep::{left,right,both,none}` ↔ Akka `Keep`; `via`/`to`
keep the left (upstream) materialized value, while `run_with` keeps the right one
(the sink's).
- Rust-native renames: Akka `collect` → `filter_map`; `filterNot` →
`filter_not`. Failures use `Result`, not the JVM exception model. Reactive
Streams interop is **out of scope** (don't add `Publisher`/`Subscriber`).
- **M8 ergonomics (already on `main`):** `try_map`/`try_filter`/`try_filter_map`/
`try_map_concat`/`try_stateful_map`/`try_stateful_map_concat`/`try_scan`/
`try_fold`/`try_reduce` are the current fallible operators; the old `*_result`
names are `#[deprecated]` aliases (flow.rs). `*_result_with_supervision`
variants are **unchanged** (not deprecated). `From<Vec>`/`From<[T;N]>`/
`FromIterator`/`IntoSource::into_source` and `run_fold`/`run_foreach`/
`run_for_each`/`run_reduce` are the source sugar.
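The materialized-value rule can be sketched with plain functions. This assumes Datum's `Keep::{left,right,both,none}` combine two materialized values the way Akka's `Keep` does; the `keep` module, `Stage`, `connect`, `connect_mat`, and `run_with` below are hypothetical illustrations, not Datum's signatures. `connect_mat` plays the role of `to_mat`, `connect` the role of `to`.

```rust
/// Hypothetical combiners mirroring Akka's `Keep`.
mod keep {
    pub fn left<L, R>(l: L, _r: R) -> L { l }
    pub fn right<L, R>(_l: L, r: R) -> R { r }
    pub fn both<L, R>(l: L, r: R) -> (L, R) { (l, r) }
    pub fn none<L, R>(_l: L, _r: R) {}
}

/// A stage reduced to just its materialized value, for illustration only.
struct Stage<M> {
    mat: M,
}

impl<M> Stage<M> {
    /// Like `to_mat`: the caller chooses how the two values combine.
    fn connect_mat<M2, M3>(self, other: Stage<M2>, combine: impl FnOnce(M, M2) -> M3) -> Stage<M3> {
        Stage { mat: combine(self.mat, other.mat) }
    }

    /// Like `to`/`via`: keeps the left (upstream) value.
    fn connect<M2>(self, other: Stage<M2>) -> Stage<M> {
        self.connect_mat(other, keep::left)
    }
}

/// Like `run_with`: keeps the right (sink) value.
fn run_with<M, M2>(source: Stage<M>, sink: Stage<M2>) -> M2 {
    source.connect_mat(sink, keep::right).mat
}

fn main() {
    // `to`: the source's value survives.
    let g = Stage { mat: "source-mat" }.connect(Stage { mat: 7_u32 });
    assert_eq!(g.mat, "source-mat");

    // `to_mat(.., Keep::both)`: both values, as a tuple.
    let g = Stage { mat: "source-mat" }.connect_mat(Stage { mat: 7_u32 }, keep::both);
    assert_eq!(g.mat, ("source-mat", 7));

    // `Keep::none`: nothing useful is kept.
    let _: () = Stage { mat: 1 }.connect_mat(Stage { mat: 2 }, keep::none).mat;

    // `run_with`: the sink's value comes back.
    assert_eq!(run_with(Stage { mat: "source-mat" }, Stage { mat: 7_u32 }), 7);
}
```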
## Gotchas / footguns
- Rate/time operators are **rate-dependent**: in a fully fused synchronous chain
where producer == consumer speed, `conflate`/`batch`/`expand` pass elements
through one at a time — coalescing only happens across a real speed gap (async
boundary, slow downstream). Tests must introduce that gap.
- `recover(|_| None)` **re-emits the original error** (stream fails);
`on_error_complete()` swallows it and completes. They are not equivalent.
- `async_boundary.rs` deliberately uses `block_on` + `blocking_recv` (on a scoped
thread when already inside a Tokio runtime) and lazily starts a *multi-thread*
Tokio runtime. This is the in-process boundary, **not** a network carrier — the
"no latent blocking IO" rule in CLAUDE.md applies to `datum-net` carriers, not
here. Don't "fix" it into the carrier model.
- Closures on most operators must be `Send + Sync + 'static` because the fused chain
may run on a different worker thread than the caller.
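The `recover(|_| None)` vs. `on_error_complete()` distinction is easiest to see on a plain `Vec` of results. This eager sketch assumes Akka-style `recover` semantics (a `Some` replacement is emitted and the stream then completes); `recover`, `on_error_complete`, and `Outcome` here are hypothetical helpers, not Datum's operators.

```rust
/// What a downstream terminal would observe.
#[derive(Debug, PartialEq)]
enum Outcome<T, E> {
    Completed(Vec<T>),
    Failed(Vec<T>, E),
}

/// Sketch of `recover`: on the first error, `Some(v)` emits `v` and completes;
/// `None` re-emits the original error, so the stream still fails.
fn recover<T, E>(items: Vec<Result<T, E>>, f: impl Fn(&E) -> Option<T>) -> Outcome<T, E> {
    let mut out = Vec::new();
    for item in items {
        match item {
            Ok(v) => out.push(v),
            Err(e) => {
                return match f(&e) {
                    Some(v) => {
                        out.push(v);
                        Outcome::Completed(out)
                    }
                    None => Outcome::Failed(out, e),
                };
            }
        }
    }
    Outcome::Completed(out)
}

/// Sketch of `on_error_complete`: the first error is swallowed and the stream completes.
fn on_error_complete<T, E>(items: Vec<Result<T, E>>) -> Outcome<T, E> {
    let out = items.into_iter().map_while(Result::ok).collect();
    Outcome::Completed(out)
}

fn main() {
    let input = || vec![Ok(1), Ok(2), Err("boom"), Ok(4)];
    // `recover(|_| None)` still fails with the original error.
    assert_eq!(recover(input(), |_| None), Outcome::Failed(vec![1, 2], "boom"));
    // `on_error_complete` completes normally with what came before the error.
    assert_eq!(on_error_complete(input()), Outcome::Completed(vec![1, 2]));
    // A `Some` replacement is emitted, then the stream completes.
    assert_eq!(recover(input(), |_| Some(0)), Outcome::Completed(vec![1, 2, 0]));
}
```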
## Tests & benches
- **Inline unit tests:** `mod.rs` (large, ≈L914+), `sink.rs` (L1387), `rate.rs`
(L1162), `time.rs` (L1997), `restart.rs` (L863); `flow.rs` has focused modules
`flat_map_merge_ready_ring_tests`, `inline_micro_source_tests`,
`split_sink_fast_path_tests` (L8027+). `source.rs` has only a test helper.
- **Doc snippets** for the pages this module owns live in
`crates/datum-core/tests/docs/*.rs` (compiled via `tests/docs/main.rs`); the
VitePress `<<< @/…#anchor` includes point at them, so they are compile-checked.
- **Benches** (Criterion, `harness = false`): `source_flow`, `materialization`,
`queues`, `substreams`, `push_baseline` are the ones that exercise this module.
Result tables: `roadmap/benchmarks/{source-flow,materialization,queues,substreams}.md`.
- Run: `cargo test <substring>`; `cargo bench --bench source_flow`.