datum-core 0.9.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
# `stream/` — linear Source/Flow/Sink DSL + Runtime

> Context for a future agent working in this module. Read the root `CLAUDE.md`
> (architecture, performance + Akka-mirroring conventions) and
> `roadmap/M1-v0.1.0-foundation.md` (execution model) first — this file does not
> duplicate them, it tells you where things live and what you must not break.

## Purpose

This is Datum's **primary public surface**: the linear push-stream DSL
(`Source` → `Flow` → `Sink`) and the runtime that materializes it. It mirrors
**Akka/Pekko Streams Typed**. The `graph` module is the separate Akka-GraphDSL
layer; `actor`, `io`, `dynamic`, `queue` build on the types defined here.
Everything is re-exported flat from `crate::stream` (see `mod.rs` bottom) and
then from `lib.rs` / `lib.rs::prelude`.

## Key public types & where they live

- `mod.rs` — module root. Owns the re-export block, the `BoxStream<T>` /
  `PureTransform` / `RuntimeTransform` type aliases, the `SourceHints`/`FlowHints`
  optimization-hint system, fan-in stream combinators (`merge_*`/`zip_*`), the
  spin-then-park constants, and the large `#[cfg(test)] mod tests` (≈L914–end).
- `source.rs` — `Source<Out, Mat = NotUsed>`, plus `NotUsed`, `Keep`, `Demand`,
  `PushOutlet`, `MaybeHandle`, `IntoSource`, `SourceCombineStrategy`. All the
  constructors (`empty`/`single`/`repeat`/`from_iter`/`from_iterable`/`failed`/
  `maybe`/`future`/`future_source`/`unfold`/`cycle`/`tick`/`lazy_*`) and the
  terminal runners (`run_with`, `run_fold`/`run_foreach`/`run_for_each`/
  `run_reduce`/`run_collect`, `to`/`to_mat`).
- `sink.rs` — `Sink<In, Mat>`, `RunnableGraph<Mat>`, `SinkCombineStrategy`. Sink
  constructors (`ignore`/`head`/`head_option`/`last`/`fold`/`reduce`/`foreach`/
  `collect`/`cancelled`/`never`/…). `RunnableGraph::run()` is the execution
  boundary.
- `flow.rs` — `Flow<In, Out, Mat = NotUsed>`, `BidiFlow`, and **the operator set**
  (~9k lines): element ops, async ops, substreams (`flat_map_*`, `group_by`,
  `split_*`, `prefix_and_tail`), fan-in (`concat`/`merge_*`/`zip_*`/`interleave`),
  side-effects (`also_to`/`divert_to`/`wire_tap`/`monitor`/`watch_termination`),
  and error ops (`recover`/`recover_with`/`map_error`/`on_error_complete`).
- `rate.rs` — `OverflowStrategy`, `AggregateTimer`; `buffer`/`conflate`/`batch`/
  `expand`/`detach`/`aggregate_with_boundary` on both `Flow` and `Source`.
- `time.rs` — `ThrottleMode`, `DelayOverflowStrategy`; `throttle`/`delay`/
  `*_within`/`*_timeout`/`keep_alive`/`Source::tick`.
- `restart.rs` — `RestartSettings`, `RestartSource`/`RestartFlow`/`RestartSink`,
  `RetryFlow` (backoff supervision wrappers).
- `error.rs` — `StreamError`, `StreamResult<T>`, `Supervision`,
  `SupervisionDecider`, `SupervisionDirective`.
- `completion.rs` — `StreamCompletion<T>` (the materialized handle) and
  `Cancellable`.
- `runtime.rs` — `Runtime` (alias `Materializer`), the `StreamExecutor` thread
  pool, and `runtime_checked_stream` (per-element shutdown/cancel wrapper).
- `timer.rs` — `TimerDriver`: single-thread min-heap timer (private; reached via
  `Runtime::schedule_*`).
- `poller.rs` — `BlockingPoller`: spin-then-park driver for blocking `wait()`.
- `async_boundary.rs` — Ractor-actor-backed linear `async_boundary()` handoff.

## Invariants you must preserve

- **Blueprint vs. materialization (hard rule).** Building a `Source`/`Flow`/`Sink`/
  `RunnableGraph` must have **zero side effects** — it only assembles factory
  closures. Execution starts *only* at `run`/`run_with`/`materialize`/
  `RunnableGraph::run`. `Source::lazy_*`, `future_*`, and `unfold` look eager but
  defer all work into the factory; keep new constructors side-effect-free.
- **`StreamCompletion<T>` is `#[must_use]` and cancels on drop.** Dropping a
  spawned completion cancels its running stream (`completion.rs` `Drop`). Never
  silently drop one in library code that should keep running.
- **Failures are values, not panics.** Everything fallible returns
  `StreamResult<T>`. User-closure panics are caught (`catch_unwind`) and surface
  as `StreamError::Failed`/`AbruptTermination`; a panicking `SupervisionDecider`
  is treated as `Stop` (`error.rs`).
- **`RunnableGraph::run()` builds a fresh `Materializer::new()` per call** — so
  reusing one blueprint yields independent runtimes/thread-pools. Use
  `run_with_materializer` to share a `Runtime`.
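
The blueprint rule above can be illustrated with a minimal, std-only sketch. `SketchBlueprint` and `blueprint_demo` are hypothetical names invented for this illustration; they are not Datum types and make no claim about how `Source` stores its factory. The point is only that construction captures a closure, and the closure (with any side effect inside it) runs once per materialization.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a blueprint: it owns a factory closure and nothing else.
pub struct SketchBlueprint<T> {
    factory: Arc<dyn Fn() -> Box<dyn Iterator<Item = T> + Send> + Send + Sync>,
}

impl<T: 'static> SketchBlueprint<T> {
    /// Building captures the closure but never calls it.
    pub fn from_factory<F, I>(make: F) -> Self
    where
        F: Fn() -> I + Send + Sync + 'static,
        I: Iterator<Item = T> + Send + 'static,
    {
        SketchBlueprint {
            factory: Arc::new(move || Box::new(make()) as Box<dyn Iterator<Item = T> + Send>),
        }
    }

    /// Materialization: the factory runs here, once per call.
    pub fn run_collect(&self) -> Vec<T> {
        (self.factory)().collect()
    }
}

/// Returns (factory calls after build, first run, second run, factory calls after both runs).
pub fn blueprint_demo() -> (usize, Vec<i32>, Vec<i32>, usize) {
    let calls = Arc::new(AtomicUsize::new(0));
    let seen = Arc::clone(&calls);
    let blueprint = SketchBlueprint::from_factory(move || {
        seen.fetch_add(1, Ordering::SeqCst); // the side effect lives inside the factory
        1..=3
    });
    let after_build = calls.load(Ordering::SeqCst);
    let first = blueprint.run_collect();
    let second = blueprint.run_collect();
    (after_build, first, second, calls.load(Ordering::SeqCst))
}
```

In this sketch the counter is still zero after the blueprint is built and increments once per `run_collect`, which is the property a new constructor should keep.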

## Performance-sensitive hot paths — do NOT casually refactor

- **The fused linear path is pull-based.** A synchronous operator chain compiles
  to a single `Box<dyn Iterator<Item = StreamResult<T>>>` (`BoxStream`) with no
  per-element boxing or inter-stage queues. The push vocabulary (`PushOutlet`,
  `Demand`) is the *logical* protocol; the iterator is the *implementation*. Keep
  new sync operators on the `Flow::from_transform`/`PureTransform` path so they
  stay fusible.
- **`SourceHints`/`FlowHints` + the terminal/inline fast paths** (`mod.rs`,
  `source.rs::run_with`, `sink.rs` `fold_fp`/`try_register*`). These let proven
  finite synchronous micro-sources skip the spawn + per-element checked wrapper
  and drive a terminal fold/collect inline. They are correctness-gated (any
  non-trivial flow clears `inline_micro`). Changing the hint propagation or the
  `try_register_*` dispatch can silently regress materialization benchmarks —
  re-bench `materialization` + `source_flow`.
- **Spin-then-park constants** (`STREAM_READY_SPINS`, `STREAM_SPIN_BACKOFF`,
  `STREAM_MAX_PARK` in `mod.rs`, used by `poller.rs`). Load-bearing and chosen
  against benchmarks; the park budget is deliberately deadline-independent. See
  CLAUDE.md "Spin-then-park". Do not retune without before/after wall-clock **and**
  CPU measurements.
- **`StreamExecutor` (`runtime.rs`) grows on demand and must stay that way.** A
  fixed-size pool would deadlock once `num_cpus` long-lived streams (e.g.
  `Sink::never`, infinite `map_async`) are live. Workers park and are reaped after
  10s idle; a spawn failure falls back to running inline.
- **`TimerDriver` (`timer.rs`)** is one min-heap thread for the whole `Runtime`
  (replaced the old thread-per-timer). Firing dispatches onto the `StreamExecutor`.
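
As a rough illustration of the fused pull-based path described above, the sketch below composes two synchronous stages into one boxed iterator of results. `SketchStream`, `SketchTransform`, `map_stage`, `filter_stage`, and `then` are hypothetical stand-ins written for this note; the real `BoxStream`/`PureTransform` aliases in `mod.rs` may be defined differently, and the `String` error type is a placeholder. Each stage is boxed once when the pipeline is assembled, never per element, and no queue sits between stages.

```rust
/// Hypothetical stand-in for a fused stream: one boxed iterator of results.
type SketchStream<T> = Box<dyn Iterator<Item = Result<T, String>> + Send>;

/// Hypothetical stand-in for a pure transform: stream in, stream out.
type SketchTransform<In, Out> = Box<dyn FnOnce(SketchStream<In>) -> SketchStream<Out> + Send>;

/// A synchronous map stage: wraps the upstream iterator, touching only `Ok` items.
fn map_stage<In, Out, F>(f: F) -> SketchTransform<In, Out>
where
    In: 'static,
    Out: 'static,
    F: FnMut(In) -> Out + Send + 'static,
{
    Box::new(move |upstream: SketchStream<In>| {
        let mut f = f;
        Box::new(upstream.map(move |item| item.map(&mut f))) as SketchStream<Out>
    })
}

/// A synchronous filter stage: errors pass through, `Ok` values are tested.
fn filter_stage<T, P>(p: P) -> SketchTransform<T, T>
where
    T: 'static,
    P: FnMut(&T) -> bool + Send + 'static,
{
    Box::new(move |upstream: SketchStream<T>| {
        let mut p = p;
        Box::new(upstream.filter(move |item| match item {
            Ok(value) => p(value),
            Err(_) => true,
        })) as SketchStream<T>
    })
}

/// Composing transforms nests iterators; nothing runs until the result is pulled.
fn then<A, B, C>(first: SketchTransform<A, B>, second: SketchTransform<B, C>) -> SketchTransform<A, C>
where
    A: 'static,
    B: 'static,
    C: 'static,
{
    Box::new(move |upstream: SketchStream<A>| second(first(upstream)))
}

/// Doubles 1..=5, then drops multiples of three, pulling through one fused chain.
pub fn fused_demo() -> Vec<Result<i32, String>> {
    let pipeline = then(map_stage(|x: i32| x * 2), filter_stage(|x: &i32| x % 3 != 0));
    let source: SketchStream<i32> = Box::new((1..=5).map(Ok::<i32, String>));
    pipeline(source).collect()
}
```

The terminal `collect` is the only driver here: each `next()` call pulls one element through every stage on the calling thread, which is why adding an operator outside this shape would break fusion.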

## Akka mapping & Rust-native deviations

- `Source`/`Flow`/`Sink`/`RunnableGraph` ↔ Akka's same names; `Materializer` is an
  alias for `Runtime`. `Keep::{left,right,both,none}` ↔ Akka `Keep`; `via`/`to`
  keep left, `run_with` keeps the opposite (right) side.
- Rust-native renames: Akka `collect` → `filter_map`; `filterNot` → `filter_not`.
  Failures use `Result`, not the JVM exception model. Reactive Streams interop is
  **out of scope** (don't add `Publisher`/`Subscriber`).
- **M8 ergonomics (already on `main`):** `try_map`/`try_filter`/`try_filter_map`/
  `try_map_concat`/`try_stateful_map`/`try_stateful_map_concat`/`try_scan`/
  `try_fold`/`try_reduce` are the current fallible operators; the old `*_result`
  names are `#[deprecated]` aliases (flow.rs). `*_result_with_supervision`
  variants are **unchanged** (not deprecated). `From<Vec>`/`From<[T;N]>`/
  `FromIterator`/`IntoSource::into_source` and `run_fold`/`run_foreach`/
  `run_for_each`/`run_reduce` are the source sugar.
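
To make the "failures are values" model behind the `try_*` family concrete, here is a std-only sketch of a fallible map over an iterator of results. `try_map_sketch` and `SketchError` are hypothetical names for this note, not Datum's signatures. The sketch assumes that the first error ends the stream and that a panicking closure is converted into an error value via `catch_unwind`; the actual operators may differ, for example under a supervision decider.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Hypothetical error type standing in for a stream error.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    Failed(String),
    Panicked,
}

/// Applies a fallible closure to each `Ok` item; stops after the first error.
pub fn try_map_sketch<I, T, U, F>(
    mut upstream: I,
    mut f: F,
) -> impl Iterator<Item = Result<U, SketchError>>
where
    I: Iterator<Item = Result<T, SketchError>>,
    F: FnMut(T) -> Result<U, SketchError>,
{
    let mut failed = false;
    std::iter::from_fn(move || {
        if failed {
            return None; // assumption: nothing is emitted after a failure
        }
        let out = match upstream.next()? {
            Err(error) => Err(error), // upstream errors pass through unchanged
            Ok(value) => match catch_unwind(AssertUnwindSafe(|| f(value))) {
                Ok(result) => result,
                Err(_) => Err(SketchError::Panicked), // panic becomes a value
            },
        };
        failed = out.is_err();
        Some(out)
    })
}

/// Runs the sketch once with a closure that returns `Err` and once with one that panics.
pub fn try_map_demo() -> (Vec<Result<i32, SketchError>>, Vec<Result<i32, SketchError>>) {
    let input = || vec![Ok::<i32, SketchError>(1), Ok(2), Ok(3)].into_iter();
    let rejected = try_map_sketch(input(), |x: i32| {
        if x == 2 {
            Err(SketchError::Failed("two".into()))
        } else {
            Ok(x * 10)
        }
    })
    .collect();
    let panicked = try_map_sketch(input(), |x: i32| -> Result<i32, SketchError> {
        if x == 2 {
            panic!("boom");
        }
        Ok(x * 10)
    })
    .collect();
    (rejected, panicked)
}
```

Both runs yield one successful element followed by a single error value, and the caller never observes an unwinding panic. Note that `catch_unwind` only helps when the crate is built with unwinding panics.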

## Gotchas / footguns

- Rate/time operators are **rate-dependent**: in a fully fused synchronous chain
  where producer == consumer speed, `conflate`/`batch`/`expand` pass elements
  through one at a time — coalescing only happens across a real speed gap (async
  boundary, slow downstream). Tests must introduce that gap.
- `recover(|_| None)` **re-emits the original error** (stream fails);
  `on_error_complete()` swallows it and completes. They are not equivalent.
- `async_boundary.rs` deliberately uses `block_on` + `blocking_recv` (on a scoped
  thread when already inside a Tokio runtime) and lazily starts a *multi-thread*
  Tokio runtime. This is the in-process boundary, **not** a network carrier — the
  "no latent blocking IO" rule in CLAUDE.md applies to `datum-net` carriers, not
  here. Don't "fix" it into the carrier model.
- Closures on most operators are `Send + Sync + 'static` because the fused chain
  may run on a different worker thread than the caller.
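
The `recover` versus `on_error_complete` gotcha above can be modelled with plain vectors. `recover_sketch` and `on_error_complete_sketch` are hypothetical helpers that only encode the behavior stated in this file (a `None` from the recovery closure re-emits the error, while `on_error_complete` swallows it); the assumption that `Some(value)` emits one final element and then completes follows Akka's `recover` and is not confirmed here for Datum.

```rust
/// Hypothetical model of `recover`: on the first error, ask `f` for a replacement.
pub fn recover_sketch<T, E, F>(upstream: Vec<Result<T, E>>, mut f: F) -> Vec<Result<T, E>>
where
    F: FnMut(&E) -> Option<T>,
{
    let mut out = Vec::new();
    for item in upstream {
        match item {
            Ok(value) => out.push(Ok(value)),
            Err(error) => {
                // `Some` replaces the error with a final element; `None` re-emits it.
                out.push(match f(&error) {
                    Some(replacement) => Ok(replacement),
                    None => Err(error),
                });
                break; // either way the stream ends here
            }
        }
    }
    out
}

/// Hypothetical model of `on_error_complete`: drop the error and end normally.
pub fn on_error_complete_sketch<T, E>(upstream: Vec<Result<T, E>>) -> Vec<Result<T, E>> {
    upstream.into_iter().take_while(|item| item.is_ok()).collect()
}

/// Same failing input through all three variants.
pub fn recover_demo() -> [Vec<Result<i32, String>>; 3] {
    let input = || -> Vec<Result<i32, String>> { vec![Ok(1), Err("boom".to_string()), Ok(3)] };
    [
        recover_sketch(input(), |_| None),
        recover_sketch(input(), |_| Some(0)),
        on_error_complete_sketch(input()),
    ]
}
```

Only the first variant still ends in an `Err`, which is why a test that expects a clean completion must not use `recover(|_| None)`.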

## Tests & benches

- **Inline unit tests:** `mod.rs` (large, ≈L914+), `sink.rs` (L1387), `rate.rs`
  (L1162), `time.rs` (L1997), `restart.rs` (L863); `flow.rs` has focused modules
  `flat_map_merge_ready_ring_tests`, `inline_micro_source_tests`,
  `split_sink_fast_path_tests` (L8027+). `source.rs` has only a test helper.
- **Doc snippets** for the pages this module owns live in
  `crates/datum-core/tests/docs/*.rs` (compiled via `tests/docs/main.rs`); the
  VitePress `<<< @/…#anchor` includes point at them, so they are compile-checked.
- **Benches** (Criterion, `harness = false`): `source_flow`, `materialization`,
  `queues`, `substreams`, `push_baseline` are the ones that exercise this module.
  Result tables: `roadmap/benchmarks/{source-flow,materialization,queues,substreams}.md`.
- Run: `cargo test <substring>`; `cargo bench --bench source_flow`.