datum-core 0.3.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## What Datum is

Datum is a Rust stream-processing library that mirrors the **Akka/Pekko Streams Typed** API shape
and behavior, built on [Ractor](https://crates.io/crates/ractor) actors and a push-based stream
vocabulary. The whole crate is `#![forbid(unsafe_code)]` (Rust edition 2024). The Akka source is
vendored as a submodule (`third_party/akka`) and used only as a behavior/API reference and as the
basis for the benchmark comparison harness — not as a runtime dependency.

The project's defining activity is **closing the performance gap with warmed Akka/Pekko on every
benchmarked path** (wall-clock *and* CPU, not just throughput) while keeping API/behavior fidelity.
Performance work is structured and documented; read `roadmap/M1-v0.1.0-foundation.md` (current state
+ optimization status) and `roadmap/M2-v0.2.0-akka-streams-parity.md` (forward plan) before touching
any hot path or adding an API.

## Commands

```sh
cargo test                       # run all tests (unit tests are inline #[cfg(test)] modules + README doctest)
cargo test <substring>           # run a single test by name substring, e.g. cargo test ordered_sum
cargo test --doc                 # run the README doctest only
cargo fmt --all -- --check       # formatting gate (CI-enforced)
cargo clippy --locked --workspace --all-targets --all-features -- -D warnings  # lint gate (CI-enforced; warnings are errors)
cargo check --locked --workspace --benches --all-features                       # benches must compile (CI-enforced)
cargo build --features cluster   # optional ractor_cluster-backed build
```

The CI gate (`.github/workflows/rust.yml`) runs two jobs. The `ci` job runs, in order,
`fmt --check` → `clippy -D warnings` → `cargo test` → `cargo check --benches` →
`RUSTDOCFLAGS="-D warnings" cargo doc --no-deps --all-features --locked`, passing
`--locked --all-features` to each step that accepts them. The `docs` job runs
`npm --prefix docs run docs:build` (a VitePress build that also fails on dead internal links).
Run both locally and confirm they pass before calling a change done.

### Benchmarks

Criterion micro-benchmarks (Datum-only, `harness = false`):

```sh
cargo bench --bench push_baseline    # harness sanity baseline
cargo bench --bench source_flow      # Source/Flow operators
cargo bench --bench materialization  # graph build + materialized values + sink terminals
cargo bench --bench graph            # GraphDSL build, fused execution, junctions
cargo bench --bench actor_ask        # ActorFlow::ask throughput, latency, timeout
cargo bench --bench dynamic_streams  # KillSwitch, MergeHub, BroadcastHub, PartitionHub
cargo bench --bench streaming_io     # file IO, compression, framing, TCP
cargo bench --bench queues           # SourceQueue, BoundedSourceQueue, SinkQueue
cargo bench --bench substreams       # flat_map_concat/merge, split_when/after, group_by
```

### Crate, docs site, and release

The crate publishes to crates.io as **`datum-core`** (import stays `use datum::…` via `[lib] name = "datum"`).
Docs site commands (Node ≥18; VitePress; run from repo root):

```sh
make docs-serve   # live-reload dev server at http://localhost:5173
make docs-build   # production build → docs/.vitepress/dist/
```

Tag-triggered workflows (push a `v*` tag to fire both):
- `deploy-docs.yml` — triggers the Cloudflare Pages deploy hook
- `publish-crate.yml` — runs `cargo publish` for `datum-core`

Both require repository secrets (`CARGO_REGISTRY_TOKEN`, `CLOUDFLARE_DEPLOY_HOOK_URL`).
The maintainer cuts the release tag; do not push `v*` tags from automated work.

### Datum-vs-Akka comparison tables

Datum-vs-Akka comparison tables (`benches/{source_flow,materialization,graph,actor_ask}_compare/run.sh`)
build a Datum release runner (`src/bin/<area>_compare.rs`) and the matching Akka JMH benchmark
(`benches/akka-jmh/`, classes `SourceFlowBenchmark` / `MaterializationBenchmark` / `GraphBenchmark`
/ `ActorAskBenchmark`), then render a shared table.
**These require a JDK + `sbt`** in addition to cargo. Akka runs under fair JMH warmup
(`-wi 5 -i 5 -w 1s -r 1s`); the Datum side adds a `Datum CPU us/op` column from `/proc/self/stat`
(process CPU time, distinct from wall-clock — see "Performance conventions" below).
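
To make the CPU-versus-wall-clock distinction concrete, here is a minimal sketch of deriving a CPU-time-per-operation figure from `/proc/self/stat`. It is not Datum's runner code: `parse_cpu_ticks` and `cpu_us_per_op` are hypothetical names, and the 100 ticks-per-second value in the usage note is an assumption (the common Linux `USER_HZ`), not something the harness is known to hard-code.

```rust
use std::fs;

/// Sum of user + system CPU time in clock ticks, parsed from the text of
/// `/proc/<pid>/stat`. Counting from 1, utime is field 14 and stime is field 15.
fn parse_cpu_ticks(stat: &str) -> Option<u64> {
    // Field 2 (comm) is wrapped in parentheses and may contain spaces or ')',
    // so resume parsing after the LAST ')' instead of splitting blindly.
    let rest = &stat[stat.rfind(')')? + 1..];
    let mut fields = rest.split_whitespace();
    // `rest` begins at field 3 (state), so utime (field 14) is index 11 here.
    let utime: u64 = fields.nth(11)?.parse().ok()?;
    let stime: u64 = fields.next()?.parse().ok()?;
    Some(utime + stime)
}

/// Convert a tick delta into microseconds of CPU time per operation.
/// `ticks_per_sec` is supplied by the caller (assumed, not queried from the OS).
fn cpu_us_per_op(ticks_before: u64, ticks_after: u64, ops: u64, ticks_per_sec: u64) -> f64 {
    let ticks = ticks_after.saturating_sub(ticks_before) as f64;
    ticks * 1_000_000.0 / ticks_per_sec as f64 / ops as f64
}

fn main() {
    // Linux only: other platforms have no /proc/self/stat.
    match fs::read_to_string("/proc/self/stat") {
        Ok(stat) => match parse_cpu_ticks(&stat) {
            Some(ticks) => println!("process CPU so far: {ticks} ticks"),
            None => eprintln!("unexpected /proc/self/stat layout"),
        },
        Err(err) => eprintln!("could not read /proc/self/stat: {err}"),
    }
}
```

Sampling the tick count before and after a benchmark loop and calling `cpu_us_per_op(before, after, ops, 100)` gives a figure comparable in spirit to the `Datum CPU us/op` column. Note that tick granularity is coarse, so short runs will round to zero.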

## Architecture

Three modules under `src/`, re-exported flat from `src/lib.rs`:

- **`stream.rs`** — the linear Source/Flow/Sink DSL and the runtime. `Source<Out, Mat>`,
  `Flow<In, Out, Mat>`, `Sink<In, Mat>`, `RunnableGraph<Mat>`, plus the `Runtime`/`Materializer`
  (they are the same type; `Materializer` is an alias). This is the primary public surface.
- **`graph.rs`** — the Akka-style GraphDSL layer: typed ports (`Inlet`/`Outlet`), `Shape`s,
  junctions (`Broadcast`, `Balance`, `Merge`, `MergePreferred`, `MergePrioritized`, `Zip`),
  `GraphStage`/`GraphStageLogic` with the push/pull handler protocol
  (`pull`/`grab`/`push`/`offer`/`complete`/`fail`), `GraphBuilder`/`GraphDsl`, and the fused
  executor. Wiring is **validated at runtime**, not encoded in the type system (deliberate — see
  conventions).
- **`actor.rs`** — Ractor interop. `ActorFlow::ask` turns each stream element into a request/reply
  actor message; `ReplyPort`/`ReplyState` is Datum's pooled one-shot reply handle that hides
  Ractor's RPC port type.
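
The "validated at runtime" choice for graph wiring can be illustrated with a small sketch. Everything here is hypothetical (`OutletId`, `InletId`, `Wiring`, `WiringError` are illustration names, not Datum's `Inlet`/`Outlet`/`GraphBuilder` types): ports are plain IDs, and illegal wiring surfaces as an `Err` value at connect or validate time rather than as a compile error.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct OutletId(usize);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct InletId(usize);

#[derive(Debug, PartialEq, Eq)]
enum WiringError {
    OutletAlreadyConnected(OutletId),
    InletAlreadyConnected(InletId),
    UnconnectedOutlet(OutletId),
    UnconnectedInlet(InletId),
}

/// Hypothetical builder: records ports and edges, checks them at runtime.
#[derive(Default)]
struct Wiring {
    outlets: usize,
    inlets: usize,
    edges: HashMap<OutletId, InletId>,
}

impl Wiring {
    fn add_outlet(&mut self) -> OutletId {
        self.outlets += 1;
        OutletId(self.outlets - 1)
    }

    fn add_inlet(&mut self) -> InletId {
        self.inlets += 1;
        InletId(self.inlets - 1)
    }

    /// Each port may take part in at most one edge.
    fn connect(&mut self, from: OutletId, to: InletId) -> Result<(), WiringError> {
        if self.edges.contains_key(&from) {
            return Err(WiringError::OutletAlreadyConnected(from));
        }
        if self.edges.values().any(|i| *i == to) {
            return Err(WiringError::InletAlreadyConnected(to));
        }
        self.edges.insert(from, to);
        Ok(())
    }

    /// A graph is runnable only when every declared port is connected.
    fn validate(&self) -> Result<(), WiringError> {
        for id in 0..self.outlets {
            if !self.edges.contains_key(&OutletId(id)) {
                return Err(WiringError::UnconnectedOutlet(OutletId(id)));
            }
        }
        for id in 0..self.inlets {
            if !self.edges.values().any(|i| *i == InletId(id)) {
                return Err(WiringError::UnconnectedInlet(InletId(id)));
            }
        }
        Ok(())
    }
}

fn main() {
    let mut g = Wiring::default();
    let out = g.add_outlet();
    let spare = g.add_outlet();
    let inp = g.add_inlet();
    g.connect(out, inp).unwrap();
    // A second edge into the same inlet is rejected at runtime.
    assert_eq!(g.connect(spare, inp), Err(WiringError::InletAlreadyConnected(inp)));
    // The dangling outlet is caught before the graph could run.
    assert_eq!(g.validate(), Err(WiringError::UnconnectedOutlet(spare)));
}
```

The trade-off is that mistakes show up when the graph is built or run rather than when it is compiled, in exchange for a much simpler type surface.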

### Blueprint vs. materialization (a hard rule)

Building a `Source`/`Flow`/`Sink`/`Graph` constructs an immutable **blueprint** and starts nothing.
Execution begins only at materialization (`run`, `run_with`, `materialize`, `GraphDsl::run_*`),
which returns materialized handles (e.g. `StreamCompletion<T>`) immediately. Preserve this
distinction in any new API — construction must have no side effects.
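
As a rough illustration of the rule, the sketch below separates a side-effect-free description from its execution. `Blueprint`, `Completion`, and `wait` are hypothetical stand-ins built on `std::thread`; they are not Datum's `Source`/`RunnableGraph`/`StreamCompletion<T>` and do not use its `Runtime`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical blueprint: describes a computation and starts nothing.
struct Blueprint<T> {
    make: Arc<dyn Fn() -> T + Send + Sync>,
}

/// Hypothetical materialized handle, returned as soon as execution starts.
struct Completion<T>(JoinHandle<T>);

impl<T: Send + 'static> Blueprint<T> {
    /// Construction only stores the description; no work happens here.
    fn new(f: impl Fn() -> T + Send + Sync + 'static) -> Self {
        Blueprint { make: Arc::new(f) }
    }

    /// Materialization: each call starts one independent execution.
    fn run(&self) -> Completion<T> {
        let make = Arc::clone(&self.make);
        Completion(thread::spawn(move || make()))
    }
}

impl<T> Completion<T> {
    /// Block until this execution finishes and return its result.
    fn wait(self) -> T {
        self.0.join().expect("worker panicked")
    }
}

fn main() {
    let runs = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&runs);
    let blueprint = Blueprint::new(move || {
        counter.fetch_add(1, Ordering::SeqCst);
        (1..=10).sum::<u32>()
    });
    // Building the blueprint ran nothing.
    assert_eq!(runs.load(Ordering::SeqCst), 0);

    // The same blueprint can be materialized more than once.
    let first = blueprint.run();
    let second = blueprint.run();
    assert_eq!(first.wait(), 55);
    assert_eq!(second.wait(), 55);
    assert_eq!(runs.load(Ordering::SeqCst), 2);
}
```

Because the blueprint is immutable and reusable, a test for a new API can assert that nothing observable happens until a `run`-style call.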

### Execution model (where performance lives)

- The `Runtime` owns a small **thread pool** (`StreamExecutor` in `stream.rs`): workers are spawned
  on demand, parked when idle, reaped after a 10s idle timeout, and a spawn failure falls back to
  running the job inline. It also owns shutdown and the timer wheel
  (`schedule_once`/`schedule_with_fixed_delay`/`schedule_at_fixed_rate`).
- **Linear fused path:** a chain of synchronous operators compiles to a pulled
  `Box<dyn Iterator<Item = StreamResult<T>>>` (`BoxStream`). Internally pull-based even though the
  public vocabulary (`PushOutlet`, `Demand`) is push-shaped.
- **Graph fused executor:** two tiers. The **typed-linear fast path** (`typed_linear_plan` in
  `graph.rs`) is monomorphized and avoids boxing; it benchmarks at 16–46x Akka's speed. Graphs it
  can't specialize fall back to the **erased executor** over `Box<dyn DatumElement>` (one allocation
  per element), which is the main known slow path (~0.5–0.7x Akka's speed). Widening typed-path
  applicability is the top open optimization (see the optimization tiers in
  `roadmap/M1-v0.1.0-foundation.md`).
- **Spin-then-park:** the actor ask path (`actor.rs`) and the stream poller (`stream.rs`
  `BlockingPoller`) spin briefly to catch fast completions, then `park_timeout` and are woken by the
  reply gate / `FuturesUnordered` completion. Tuning constants (`STREAM_READY_SPINS`,
  `ASK_IDLE_YIELDS`, `ASK_MAX_PARK`, etc.) are load-bearing and were chosen against benchmarks — the
  park budget is deliberately deadline-*independent* (changing it has regressed `timeout_*` CPU by
  ~4x in the past).
- **Tokio-first async:** Tokio is the async foundation (Ractor runs on it; `tokio` is a hard
  dependency). New async work — futures interop and real `map_async` parallelism — dispatches onto
  Tokio rather than a `std`/`block_on` pool (`roadmap/M2-v0.2.0-akka-streams-parity.md` WP-6);
  `std`-future support is an optional portability surface. The pure-synchronous fused path uses no
  async runtime.
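
The spin-then-park pattern from the list above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the code in `actor.rs` or `BlockingPoller`: `READY_SPINS`, `MAX_PARK`, and `wait_ready` are hypothetical names with placeholder values, and an `AtomicBool` stands in for the reply gate. The park budget is a fixed constant, mirroring the deadline-independent choice described above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical tuning constants; real values would be chosen against benchmarks.
const READY_SPINS: u32 = 64;
const MAX_PARK: Duration = Duration::from_millis(1);

/// Wait until `ready` is set. Returns `true` when the flag was observed during
/// the spin phase and `false` when the thread had to fall back to parking.
fn wait_ready(ready: &AtomicBool) -> bool {
    // Phase 1: spin briefly to catch fast completions without a syscall.
    for _ in 0..READY_SPINS {
        if ready.load(Ordering::Acquire) {
            return true;
        }
        std::hint::spin_loop();
    }
    // Phase 2: park with a fixed budget. Re-check in a loop because
    // `park_timeout` can return on timeout or spuriously.
    while !ready.load(Ordering::Acquire) {
        thread::park_timeout(MAX_PARK);
    }
    false
}

fn main() {
    let ready = Arc::new(AtomicBool::new(false));
    let waiter = thread::current();

    let completer = {
        let ready = Arc::clone(&ready);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            // Publish the result first, then wake the waiter.
            ready.store(true, Ordering::Release);
            waiter.unpark();
        })
    };

    let caught_spinning = wait_ready(&ready);
    completer.join().unwrap();
    println!("completed; caught while spinning: {caught_spinning}");
}
```

Raising the spin count trades CPU for latency on fast completions, which is the cost the CPU column in the comparison tables is meant to expose.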

## Akka-mirroring conventions

These come from `roadmap/M1-v0.1.0-foundation.md` and govern API decisions:

- **Every mirrored API area must land with a benchmark before it counts as ported** — covering
  construction, materialization, steady-state throughput, backpressure, and failure/cancellation
  where the area exposes them.
- **Runtime-checked correctness before compile-time guarantees.** Prefer a runtime-validated state
  machine that matches Akka behavior over encoding every illegal graph in the type system; add
  stronger type-level guarantees later, once behavior is stable.
- **Rust-native names where Scala doesn't translate** (e.g. `filter_map` for Akka's `collect`).
  Failures flow through `Result<T, StreamError>`, not panics.
- Reactive Streams publisher/subscriber interop is **out of scope** — don't mirror Akka's RS bridge
  APIs.
- Benchmarks and the roadmap are organized into four **areas** — Source/Flow, materialization,
  graph/junctions, and actor ask — each with a `*_compare` runner and a result table under
  `roadmap/benchmarks/` (`source-flow.md`, `materialization.md`, `graph.md`, `actor-ask.md`).
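
To show how the naming and error conventions combine, here is a minimal sketch using the standard library's `Iterator::filter_map` rather than Datum's operator, whose exact signature is not shown here. `StreamError` and `StreamResult` below are local stand-ins that only borrow the names, and `parse_even` and `collect_evens` are hypothetical helpers.

```rust
/// Local stand-in for an error type carried as a value through the stream.
#[derive(Debug, PartialEq)]
struct StreamError(String);

type StreamResult<T> = Result<T, StreamError>;

/// Akka's `collect` applies a partial function: elements outside its domain
/// are dropped. The Rust-native analogue returns `Option`, with `None`
/// meaning "drop". A malformed element becomes an `Err`, not a panic.
fn parse_even(raw: &str) -> StreamResult<Option<u32>> {
    let n: u32 = raw
        .parse()
        .map_err(|_| StreamError(format!("not a number: {raw}")))?;
    Ok((n % 2 == 0).then_some(n))
}

/// Keep the even numbers; stop at the first failure and return it as a value.
fn collect_evens<'a>(input: impl Iterator<Item = &'a str>) -> StreamResult<Vec<u32>> {
    input
        .map(parse_even)
        // `transpose` turns Ok(None) into None (dropped) and keeps errors.
        .filter_map(|r| r.transpose())
        .collect()
}

fn main() {
    let ok = collect_evens(["1", "2", "3", "4"].iter().copied());
    assert_eq!(ok, Ok(vec![2, 4]));

    let failed = collect_evens(["2", "x", "4"].iter().copied());
    assert_eq!(failed, Err(StreamError("not a number: x".to_string())));
}
```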

## Performance conventions

- The bar is parity-or-better with warmed Akka on **both wall-clock and CPU**. The `Datum CPU us/op`
  column matters: a path can win wall-clock by busy-spinning while Akka parks, and that CPU burn is
  a real cost. Report it; don't hide it. (The `ordered_sum` ask path inherently spends ~2x CPU time
  relative to wall-clock time to hit a sub-µs latency target; forcing earlier parking measured ~18x
  worse wall-clock.)
- When recording a benchmark result, capture **before/after** numbers in the relevant
  `roadmap/benchmarks/<area>.md`, include the CPU column, and confirm no previously-winning row
  regressed. Don't overstate a cost as "inherent" — name the lever that could still move it.
- Some allocation is pinned upstream: Ractor 0.15.13 boxes `process_message` once per delivered
  message (~832 B/elem), which dominates `ordered_sum` allocation and can't be fixed cleanly through
  the public `ActorRef<Msg>` API (item #8 — blocked on an upstream Ractor change).

## Reference docs

All planning and the benchmark record live in [`roadmap/`](roadmap/README.md). `docs/` holds the
published documentation site: a VitePress project built with `make docs-build` and deployed to
Cloudflare Pages on a `v*` tag.

- `roadmap/README.md` — milestone index (M1 v0.1.0 · M2 v0.2.0 · M3 v0.3.0).
- `roadmap/M1-v0.1.0-foundation.md` — what exists today: delivered API surface, architecture,
  optimization status, and the Akka coverage matrix + apples-to-apples caveats. **Read before
  optimizing.**
- `roadmap/M2-v0.2.0-akka-streams-parity.md` — the forward roadmap: every Akka Streams topic and
  operator mapped to a status/plan/rationale, grouped into benchmark-gated work packages. **Read
  before adding an API.**
- `roadmap/M3-v0.3.0-docs-and-site.md` — the documentation-site plan.
- `roadmap/benchmarks/{source-flow,materialization,graph,actor-ask}.md` — per-area result tables.
- `benches/README.md` — what each benchmark covers.