datum-core 0.9.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
# actor module — AGENTS.md

Context for an AI agent working in the Ractor-interop layer. Read the repo
[`CLAUDE.md`](../../../../CLAUDE.md) (architecture + performance conventions) and
`roadmap/M1-v0.1.0-foundation.md` first; this file only covers what is specific to
`crates/datum-core/src/actor.rs` + `crates/datum-core/src/actor/`.

## Purpose

Bridges Datum streams to [Ractor](https://crates.io/crates/ractor) actors and to a
transport-agnostic remote StreamRefs protocol. Everything is local/Ractor-backed by default;
the protobuf seam (`stream_ref_proto`) is what `datum-net` drives over TCP/QUIC. The whole crate
is `#![forbid(unsafe_code)]`.

## Key public types & where they live

- **`actor.rs`** (module root) — `ActorFlow` + `ActorFlow::ask`; `ReplyPort<T>` / `ReplySendError`;
  `ActorResult<T>`; re-exports `ractor::{Actor, ActorProcessingErr, ActorRef, Message}`. Internally:
  `ReplyState<T>` (the pooled one-shot reply gate), the ordered ask state machine
  (`ask_ractor_ordered`), the spin-then-park wait loop (`wait_for_ready_ask`), and
  `ractor_runtime()` / `block_on_ractor_runtime()` (the shared multi-thread Tokio runtime Ractor
  actors run on).
- **`actor/interop.rs`** — the rest of `ActorFlow` (`ask_with_status`, `ask_with_context`,
  `ask_with_status_and_context`, `watch`); `ActorSource` (`actor_ref`/`typed`/
  `actor_ref_with_backpressure`); `ActorSink` (`actor_ref`/`typed`/`*_with_backpressure`);
  `ActorPubSub` (`source`/`sink` over Ractor `pg` groups). Protocol enums: `ActorStatus<T>`,
  `ActorSourceMessage<T>`, `ActorSinkMessage<T>`, `ActorSinkBackpressureMessage<T, Ack>`,
  `WatchEvent`. Holds a second ask state machine (`*_with_pending`) — see Gotchas.
- **`actor/stream_ref.rs`** — `StreamRefs` factory (`source_ref`/`sink_ref` [+`_with_settings`]),
  `SourceRef<T>` / `SinkRef<T>` (one-shot handles), `StreamRefSettings`. Local default path is a
  **direct in-memory splice** (`SourceRefDirectState`/`SinkRefDirectState`, no actors, no
  serialization). The `#[cfg(feature = "cluster")]` path adds Ractor `BytesConvertable` +
  `pg`-resolved remote endpoints + producer actors.
- **`actor/stream_ref_proto.rs`** + **`stream_ref_proto.proto`** — transport-agnostic protobuf
  StreamRefs seam (no actor transport). `StreamRefPayload` trait (+ codecs for numbers/`bool`/
  `String`/`Vec<u8>`); `StreamRefId`; protocol types `StreamRefMessage`/`StreamRefFrame`/
  `StreamRefPayloadBytes`/`StreamRefPayloadBatch`/`StreamRefOutbound`; the `StreamRefProtoEndpoint`
  trait (+ `#[doc(hidden)]` `StreamRefProtoEndpointWake` for async carriers); and the drivers
  `StreamRefProtoProducer<T>` (`from_source`/`from_source_ref`/`new_lazy`/`sink`) and
  `StreamRefProtoConsumer<T>` (`new`/`source`). This is the seam `datum-net` uses.

## Invariants & conventions (preserve these)

- **Blueprint vs materialization.** Constructors build immutable blueprints and start nothing;
  actors spawn / streams run only at materialization. Keep this for any new API.
- **`ReplyPort` is one-shot.** `send` consumes `self`; dropping without sending calls
  `drop_sender` → consumer sees `ReplyPoll::Dropped` → `StreamError::ActorAskResponseDropped`.
  `ReplyState` is a three-state slot (`Pending`/`Ready`/`Dropped`) + an `AtomicU8` gate
  (`REPLY_PENDING`/`READY`/`DROPPED`). `poll()` reads the gate lock-free and only takes the slot
  mutex once something is delivered — do not move work in front of that fast path.
- **Mutex poison is recovered, not propagated** (`lock_slot` / `.unwrap_or_else(into_inner)`
  everywhere). Deliberate: slots have no cross-field invariant, so recovery is sound and avoids
  escalating one user-`Drop` panic into a stream-wide crash. Don't "fix" these to `.unwrap()`.
- **Ask preserves input order** regardless of reply order: `index`/`next_to_emit` + a `completed`
  reorder buffer, with up to `parallelism` asks in flight. `ReplyState`s are recycled via
  `reply_pool` only when `Arc::strong_count == 1`.
- **`ask_flow` uses `Flow::from_transform`, NOT `from_preserving_transform`** (actor.rs:327). An ask
  must not inherit the inline head-terminal hint, or a bounded eager source feeding `Sink::head`
  would run the blocking cross-thread reply wait inline on the caller and can deadlock. Mirror this
  in any new ask variant.
- **`block_on_ractor_runtime`** spawns a std thread when already inside a Tokio context (avoids the
  nested-`block_on` panic); otherwise blocks directly.
- **ActorSource/ActorSink default mailboxes are unbounded** (documented mailbox risk). The
  `*_with_backpressure` variants gate each element on an actor ack; an `Element` arriving before the
  ack is a protocol violation that fails the stream.
- **StreamRefs are one-shot.** A second `SourceRef::source()` / `SinkRef::sink()` materialization
  fails (`subscribed` `AtomicBool` swap). Local = direct splice (never serializes); cluster/proto =
  serialized. Proto protocol: cumulative demand, receive window = `buffer_capacity` (default 32),
  refill at half (16), seq numbers from 0; reliable carriers (TCP/QUIC) do **not** redeliver demand,
  unlike Akka's lossy-remoting redelivery.
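
To make the `ReplyPort` and poison-recovery invariants above concrete, here is a minimal std-only sketch of a one-shot reply gate. It is not the code in `actor.rs`: `ReplyGate` is a hypothetical stand-in for `ReplyState<T>`, the numeric gate values and the `Pending`/`Ready` variants of `ReplyPoll` are assumptions (the text above only names `ReplyPoll::Dropped`), and pooling is left out entirely.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Mutex, MutexGuard};

// Gate values. The names follow the REPLY_* constants above; the numbers are assumed.
const REPLY_PENDING: u8 = 0;
const REPLY_READY: u8 = 1;
const REPLY_DROPPED: u8 = 2;

/// Outcome of one poll (assumed shape).
#[derive(Debug, PartialEq)]
pub enum ReplyPoll<T> {
    Pending,
    Ready(T),
    Dropped,
}

/// Hypothetical, simplified stand-in for `ReplyState<T>`.
pub struct ReplyGate<T> {
    gate: AtomicU8,
    slot: Mutex<Option<T>>,
}

impl<T> ReplyGate<T> {
    pub fn new() -> Self {
        Self { gate: AtomicU8::new(REPLY_PENDING), slot: Mutex::new(None) }
    }

    /// Poison is recovered, not propagated: the slot holds a single value,
    /// so there is no cross-field invariant a panicking thread could break.
    fn lock_slot(&self) -> MutexGuard<'_, Option<T>> {
        self.slot.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Write the value first, then publish it with a Release store on the gate.
    pub fn send(&self, value: T) {
        *self.lock_slot() = Some(value);
        self.gate.store(REPLY_READY, Ordering::Release);
    }

    /// Mark the gate dropped, but only if no reply was ever sent.
    pub fn drop_sender(&self) {
        let _ = self.gate.compare_exchange(
            REPLY_PENDING,
            REPLY_DROPPED,
            Ordering::Release,
            Ordering::Relaxed,
        );
    }

    /// Fast path: a pending reply costs one Acquire load and never touches the mutex.
    pub fn poll(&self) -> ReplyPoll<T> {
        match self.gate.load(Ordering::Acquire) {
            REPLY_PENDING => ReplyPoll::Pending,
            REPLY_READY => match self.lock_slot().take() {
                Some(value) => ReplyPoll::Ready(value),
                // Sketch-only choice: a second poll after the value was taken reports Dropped.
                None => ReplyPoll::Dropped,
            },
            _ => ReplyPoll::Dropped,
        }
    }
}
```

The Acquire load in `poll` pairs with the Release store in `send`, so a consumer that observes `REPLY_READY` also observes the slot write. Anything added ahead of the `REPLY_PENDING` arm would run on every spin iteration, which is why the fast path should stay first.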

## Performance-sensitive hot paths — do NOT casually refactor

- **The spin-then-park ask loop** (`wait_for_ready_ask` in actor.rs, and its `*_with_pending` twin in
  interop.rs) and the constants `ASK_READY_SPINS`/`ASK_IDLE_YIELDS`/`ASK_MAX_PARK`/
  `ASK_TIME_REFRESH_ITERS`. The park budget is deliberately **deadline-independent**; CLAUDE.md
  records that changing it regressed `timeout_*` CPU ~4×. `Instant::now()` is refreshed only every
  `ASK_TIME_REFRESH_ITERS` spins to keep the spin cheap.
- **The `ReplyState` lock-free gate** (`poll`/`send`/`drop_sender`): release/acquire ordering pairs
  the value write with the gate store. The `ordered_sum` ask path intentionally spends ~2× CPU vs
  wall to hit sub-µs latency (forcing earlier parking measured ~18× worse wall) — report CPU, don't
  "optimize" by parking sooner.
- **The StreamRefs direct splice** is the ~6.6–7.8× win over warmed Akka
  (`roadmap/benchmarks/actor-ask.md`, M5). Remote proto numbers (~10× faster wall vs forced-remote
  Akka) are in `roadmap/benchmarks/net.md`.
- **Ractor per-message box** (~848 B/elem) dominates ask allocation and is upstream-pinned — not
  fixable through the public `ActorRef<Msg>` API (CLAUDE.md). Don't chase it Datum-side.
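
As a rough illustration of the spin-then-park shape described in the first bullet, the sketch below waits on a plain `AtomicBool`. The constant names come from this file, but their values here are placeholders, the three-phase split (spin, yield, park) is inferred from the constant names, and `wait_until_ready` is a hypothetical function, not `wait_for_ready_ask`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

// Placeholder values: the real constants live in actor.rs and are benchmark-tuned.
const ASK_READY_SPINS: u32 = 256;
const ASK_IDLE_YIELDS: u32 = 64;
const ASK_MAX_PARK: Duration = Duration::from_micros(50);
const ASK_TIME_REFRESH_ITERS: u32 = 32;

/// Returns `true` if `ready` was observed set before `deadline`, `false` on timeout.
pub fn wait_until_ready(ready: &AtomicBool, deadline: Instant) -> bool {
    let mut iters: u32 = 0;
    loop {
        if ready.load(Ordering::Acquire) {
            return true;
        }
        if iters < ASK_READY_SPINS {
            // Spin phase: cheap busy-wait, and read the clock only on every
            // ASK_TIME_REFRESH_ITERS-th iteration.
            std::hint::spin_loop();
            if iters % ASK_TIME_REFRESH_ITERS != ASK_TIME_REFRESH_ITERS - 1 {
                iters += 1;
                continue;
            }
        }
        if Instant::now() >= deadline {
            // One last look at the flag before reporting a timeout.
            return ready.load(Ordering::Acquire);
        }
        if iters >= ASK_READY_SPINS + ASK_IDLE_YIELDS {
            // Park phase: a fixed budget that does not depend on the time remaining.
            thread::park_timeout(ASK_MAX_PARK);
        } else if iters >= ASK_READY_SPINS {
            // Yield phase: let other threads run before committing to a park.
            thread::yield_now();
        }
        iters = iters.saturating_add(1);
    }
}
```

Because the park budget is a constant, a long deadline does not translate into long sleeps, and a timeout can overshoot the deadline by at most one park interval in this sketch. A real waiter would also be unparked by the replying side; that wake-up is omitted here.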

## Akka mapping (and Rust-native deviations)

- `ActorFlow::ask` ↔ Akka `ActorFlow.ask`; `ActorSource` ↔ `ActorSource`/`Source.actorRef`;
  `ActorSink` ↔ `ActorSink`/`Sink.actorRef`; `ActorFlow::watch` ↔ actor-termination watch;
  `StreamRefs`/`SourceRef`/`SinkRef` ↔ Akka StreamRefs; the proto messages
  (`OnSubscribeHandshake`/`CumulativeDemand`/`SequencedOnNext`/`RemoteStreamCompleted`/
  `RemoteStreamFailure`/`Ack`) mirror Akka's StreamRefs protocol.
- Deviations: replies go through `ReplyPort<T>` (hides Ractor's RPC port type) instead of an
  `ActorRef<reply>`; `ActorStatus<T>` mirrors Akka `StatusReply`; failures are `Result`/
  `StreamError`, never panics/exceptions; `StreamRefPayload` is Datum-owned (not Ractor
  `BytesConvertable`) so the proto seam works without the `cluster` feature.
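
The `CumulativeDemand`/`SequencedOnNext` exchange can be pictured with the receiver-side arithmetic below, using the numbers stated under the invariants (window of 32, refill at half, sequence numbers from 0). This is a sketch under assumptions: `ReceiveWindow` and its methods are hypothetical names, the consumer is assumed to drain every element as soon as it arrives, and "refill at half" is read as "re-grant once outstanding demand falls to half the window".

```rust
/// Hypothetical receiver-side bookkeeping for a cumulative-demand protocol.
pub struct ReceiveWindow {
    capacity: u64, // receive window, i.e. `buffer_capacity`
    granted: u64,  // cumulative demand announced so far (total elements ever allowed)
    next_seq: u64, // next expected sequence number, equal to the elements received
}

impl ReceiveWindow {
    pub fn new(capacity: u64) -> Self {
        Self { capacity, granted: 0, next_seq: 0 }
    }

    /// Demand announced right after the subscribe handshake.
    pub fn initial_demand(&mut self) -> u64 {
        self.granted = self.capacity;
        self.granted
    }

    /// Accept one sequenced element. Returns `Ok(Some(total))` when a new
    /// cumulative demand value should be sent to the producer.
    pub fn on_next(&mut self, seq: u64) -> Result<Option<u64>, &'static str> {
        if seq != self.next_seq {
            return Err("out-of-order sequence number");
        }
        if seq >= self.granted {
            return Err("element exceeds granted demand");
        }
        self.next_seq += 1;
        let outstanding = self.granted - self.next_seq;
        if outstanding <= self.capacity / 2 {
            // Demand is cumulative: announce the new running total, not a delta.
            self.granted = self.next_seq + self.capacity;
            Ok(Some(self.granted))
        } else {
            Ok(None)
        }
    }
}
```

With a window of 32, the first grant is 32 and the second, sent after element 15 arrives, is 48. Since each message carries the running total, a later demand message supersedes any earlier one.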

## Gotchas / footguns

- **Two near-identical ask state machines.** Any fix to ordering/timeout/wait must be mirrored in
  both `ask_ractor_ordered` (actor.rs) and `ask_ractor_ordered_with_pending` (interop.rs). The
  `_with_pending` variant threads a `Pending` value (context / status `finish`) alongside the reply;
  it backs `ask_with_status`/`ask_with_context`/`ask_with_status_and_context`.
- **`send_and_wait_ack` (interop.rs) reuses `InFlightAsk` + `wait_for_ready_ask` from actor.rs** for
  single-element backpressure acks — those types are `pub(super)`-shared, not just ask-internal.
- **`ReplyPort::timeout()` always returns `Some`** (the `Option` is a forward-compat shape, never
  `None` today).
- **`cluster` feature changes the `Message` bound.** Without it, any `Send + 'static` auto-implements
  Ractor `Message`; with it you need an explicit `impl Message for YourEnum {}` (see the
  `#[cfg(feature = "cluster")]` impls in the test modules).
- **`StreamRefProtoProducer` has eager vs lazy modes.** `from_source*` attaches input immediately;
  `new_lazy` + `sink()` attaches input later (SinkRef sender side) and waits on a condvar so an idle
  producer does not busy-loop before the remote subscribes.
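
The ordering contract that both ask state machines must keep in sync (first gotcha above) boils down to an index counter plus a reorder buffer. The sketch below is a hypothetical, single-threaded model of that bookkeeping, not the code in `actor.rs` or `interop.rs`: `OrderedEmitter` is an invented name, and capping issued-but-not-yet-emitted asks at `parallelism` is an assumption about how the limit is applied.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of ordered emission with bounded concurrency.
/// `R` is whatever travels with a reply, e.g. `(Pending, Reply)` for a
/// `_with_pending`-style variant.
pub struct OrderedEmitter<R> {
    index: u64,                 // index handed to the next ask
    next_to_emit: u64,          // lowest index not yet emitted downstream
    completed: BTreeMap<u64, R>, // replies that arrived ahead of their turn
    parallelism: u64,
}

impl<R> OrderedEmitter<R> {
    pub fn new(parallelism: u64) -> Self {
        Self { index: 0, next_to_emit: 0, completed: BTreeMap::new(), parallelism }
    }

    /// Reserve an index for a new ask, or `None` while the window is full.
    pub fn try_start(&mut self) -> Option<u64> {
        if self.index - self.next_to_emit >= self.parallelism {
            return None;
        }
        let assigned = self.index;
        self.index += 1;
        Some(assigned)
    }

    /// Record a reply and return everything that is now emittable, in input order.
    pub fn complete(&mut self, index: u64, reply: R) -> Vec<R> {
        self.completed.insert(index, reply);
        let mut ready = Vec::new();
        while let Some(next) = self.completed.remove(&self.next_to_emit) {
            ready.push(next);
            self.next_to_emit += 1;
        }
        ready
    }
}
```

With `parallelism = 2`, completing index 1 before index 0 emits nothing; completing index 0 then releases both replies in input order and frees the window for the next ask. Carrying the context inside `R` is only a way to show both variants with one model, not a suggestion to merge the two state machines.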

## Tests & benches

- **Inline unit tests** (`#[cfg(test)] mod tests`) in every file: `actor.rs` (ordering, parallelism,
  timeout, dropped reply, builder panic, ask→head, long-deadline park), `interop.rs`
  (status/context ask, actor source/sink + backpressure, pubsub, watch), `stream_ref.rs` (direct
  splice, one-shot, cancellation, backpressure), `stream_ref_proto.rs` (protocol round-trips).
- **Verified doc snippets** (`#region` anchors): `crates/datum-core/tests/docs/actors_interop.rs`,
  `streamrefs.rs` — the source for the `<<< @/…` blocks in `docs/guides/`.
- **Benches:** `crates/datum-core/benches/actor_ask.rs` (Criterion) and `…/actor_ask_compare/`
  (Datum-vs-Akka). Result tables: `roadmap/benchmarks/actor-ask.md` (ask + local StreamRefs splice),
  `roadmap/benchmarks/net.md` (remote StreamRefs over TCP/QUIC).
- **Remote-transport integration** lives in `datum-net`: `crates/datum-net/tests/stream_ref_tcp.rs`,
  `stream_ref_quic.rs`, and the `streamref_quic_node` example.