# actor module — AGENTS.md
Context for an AI agent working in the Ractor-interop layer. Read the repo
[`CLAUDE.md`](../../../../CLAUDE.md) (architecture + performance conventions) and
`roadmap/M1-v0.1.0-foundation.md` first; this file only covers what is specific to
`crates/datum-core/src/actor.rs` + `crates/datum-core/src/actor/`.
## Purpose
Bridges Datum streams to [Ractor](https://crates.io/crates/ractor) actors and to a
transport-agnostic remote StreamRefs protocol. Everything is local/Ractor-backed by default;
the protobuf seam (`stream_ref_proto`) is what `datum-net` drives over TCP/QUIC. The whole crate
is `#![forbid(unsafe_code)]`.
## Key public types & where they live
- **`actor.rs`** (module root) — `ActorFlow` + `ActorFlow::ask`; `ReplyPort<T>` / `ReplySendError`;
`ActorResult<T>`; re-exports `ractor::{Actor, ActorProcessingErr, ActorRef, Message}`. Internally:
`ReplyState<T>` (the pooled one-shot reply gate), the ordered ask state machine
(`ask_ractor_ordered`), the spin-then-park wait loop (`wait_for_ready_ask`), and
`ractor_runtime()` / `block_on_ractor_runtime()` (the shared multi-thread Tokio runtime Ractor
actors run on).
- **`actor/interop.rs`** — the rest of `ActorFlow` (`ask_with_status`, `ask_with_context`,
`ask_with_status_and_context`, `watch`); `ActorSource` (`actor_ref`/`typed`/
`actor_ref_with_backpressure`); `ActorSink` (`actor_ref`/`typed`/`*_with_backpressure`);
`ActorPubSub` (`source`/`sink` over Ractor `pg` groups). Protocol enums: `ActorStatus<T>`,
`ActorSourceMessage<T>`, `ActorSinkMessage<T>`, `ActorSinkBackpressureMessage<T, Ack>`,
`WatchEvent`. Holds a second ask state machine (`*_with_pending`) — see Gotchas.
- **`actor/stream_ref.rs`** — `StreamRefs` factory (`source_ref`/`sink_ref` [+`_with_settings`]),
`SourceRef<T>` / `SinkRef<T>` (one-shot handles), `StreamRefSettings`. Local default path is a
**direct in-memory splice** (`SourceRefDirectState`/`SinkRefDirectState`, no actors, no
serialization). The `#[cfg(feature = "cluster")]` path adds Ractor `BytesConvertable` +
`pg`-resolved remote endpoints + producer actors.
- **`actor/stream_ref_proto.rs`** + **`stream_ref_proto.proto`** — transport-agnostic protobuf
StreamRefs seam (no actor transport). `StreamRefPayload` trait (+ codecs for numbers/`bool`/
`String`/`Vec<u8>`); `StreamRefId`; protocol types `StreamRefMessage`/`StreamRefFrame`/
`StreamRefPayloadBytes`/`StreamRefPayloadBatch`/`StreamRefOutbound`; the `StreamRefProtoEndpoint`
trait (+ `#[doc(hidden)]` `StreamRefProtoEndpointWake` for async carriers); and the drivers
`StreamRefProtoProducer<T>` (`from_source`/`from_source_ref`/`new_lazy`/`sink`) and
`StreamRefProtoConsumer<T>` (`new`/`source`). This is the seam `datum-net` uses.
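
`StreamRefPayload` is Datum's own codec trait rather than Ractor `BytesConvertable`, which is what lets the proto seam run without the `cluster` feature. Below is a minimal, std-only sketch of that kind of trait. The trait name `Payload`, its `encode`/`decode` methods, the little-endian layout, and the `round_trip` helper are all hypothetical and are not the real `StreamRefPayload` signature. The sketch also assumes the surrounding frame carries each payload's length, so there is no length prefix. Check `stream_ref_proto.rs` before relying on any detail.

```rust
/// Hypothetical codec trait standing in for `StreamRefPayload`; the real
/// trait's name, methods, and error handling may differ.
trait Payload: Sized {
    /// Append this value's wire bytes to `out`.
    fn encode(&self, out: &mut Vec<u8>);
    /// Rebuild a value from exactly the bytes `encode` produced, or `None` if malformed.
    fn decode(bytes: &[u8]) -> Option<Self>;
}

impl Payload for u64 {
    fn encode(&self, out: &mut Vec<u8>) {
        out.extend_from_slice(&self.to_le_bytes()); // fixed 8-byte little-endian
    }
    fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() != 8 {
            return None;
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(bytes);
        Some(u64::from_le_bytes(raw))
    }
}

impl Payload for bool {
    fn encode(&self, out: &mut Vec<u8>) {
        out.push(u8::from(*self)); // false = 0, true = 1
    }
    fn decode(bytes: &[u8]) -> Option<Self> {
        match bytes {
            [0] => Some(false),
            [1] => Some(true),
            _ => None,
        }
    }
}

impl Payload for String {
    fn encode(&self, out: &mut Vec<u8>) {
        out.extend_from_slice(self.as_bytes()); // raw UTF-8, no length prefix
    }
    fn decode(bytes: &[u8]) -> Option<Self> {
        String::from_utf8(bytes.to_vec()).ok() // reject invalid UTF-8
    }
}

impl Payload for Vec<u8> {
    fn encode(&self, out: &mut Vec<u8>) {
        out.extend_from_slice(self);
    }
    fn decode(bytes: &[u8]) -> Option<Self> {
        Some(bytes.to_vec())
    }
}

/// Encode then decode, as a carrier would do on either end of the wire.
fn round_trip<T: Payload>(value: &T) -> Option<T> {
    let mut buf = Vec::new();
    value.encode(&mut buf);
    T::decode(&buf)
}

fn main() {
    assert_eq!(round_trip(&42u64), Some(42));
    assert_eq!(round_trip(&true), Some(true));
    assert_eq!(round_trip(&String::from("hi")), Some(String::from("hi")));
    assert_eq!(round_trip(&vec![1u8, 2, 3]), Some(vec![1u8, 2, 3]));
    assert_eq!(String::decode(&[0xff]), None); // invalid UTF-8
}
```

Nothing in the trait mentions Ractor, so a carrier such as `datum-net` can depend on it with the `cluster` feature off.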
## Invariants & conventions (preserve these)
- **Blueprint vs materialization.** Constructors build immutable blueprints and start nothing;
actors spawn / streams run only at materialization. Keep this for any new API.
- **`ReplyPort` is one-shot.** `send` consumes `self`; dropping without sending calls
`drop_sender` → consumer sees `ReplyPoll::Dropped` → `StreamError::ActorAskResponseDropped`.
`ReplyState` is a three-state slot (`Pending`/`Ready`/`Dropped`) + an `AtomicU8` gate
(`REPLY_PENDING`/`READY`/`DROPPED`). `poll()` reads the gate lock-free and only takes the slot
mutex once something is delivered — do not move work in front of that fast path.
- **Mutex poison is recovered, not propagated** (`lock_slot` / `.unwrap_or_else(into_inner)`
everywhere). Deliberate: slots have no cross-field invariant, so recovery is sound and avoids
escalating one user-`Drop` panic into a stream-wide crash. Don't "fix" these to `.unwrap()`.
- **Ask preserves input order** regardless of reply order: `index`/`next_to_emit` + a `completed`
reorder buffer, with up to `parallelism` asks in flight. `ReplyState`s are recycled via
`reply_pool` only when `Arc::strong_count == 1`.
- **`ask_flow` uses `Flow::from_transform`, NOT `from_preserving_transform`** (actor.rs:327). An ask
must not inherit the inline head-terminal hint, or a bounded eager source feeding `Sink::head`
would run the blocking cross-thread reply wait inline on the caller and can deadlock. Mirror this
in any new ask variant.
- **`block_on_ractor_runtime`** spawns a std thread when already inside a Tokio context (avoids the
nested-`block_on` panic); otherwise blocks directly.
- **ActorSource/ActorSink default mailboxes are unbounded** (documented mailbox risk). The
`*_with_backpressure` variants gate each element on an actor ack; an `Element` arriving before the
ack is a protocol violation that fails the stream.
- **StreamRefs are one-shot.** A second `SourceRef::source()` / `SinkRef::sink()` materialization
fails (`subscribed` `AtomicBool` swap). Local = direct splice (never serializes); cluster/proto =
serialized. Proto protocol: cumulative demand, receive window = `buffer_capacity` (default 32),
refill at half (16), seq numbers from 0; reliable carriers (TCP/QUIC) do **not** redeliver demand,
unlike Akka's lossy-remoting redelivery.
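
The ordered-ask bookkeeping from the invariants above (`index`/`next_to_emit`, a `completed` reorder buffer, and at most `parallelism` asks in flight) can be sketched in plain std Rust. `OrderedAsk`, `try_issue`, and `complete` are hypothetical names. The sketch assumes that buffered-but-unemitted replies count against `parallelism`. It also leaves out timeouts, dropped replies, and `ReplyState` pooling, all of which the real `ask_ractor_ordered` handles.

```rust
use std::collections::BTreeMap;

/// Hypothetical order-preserving ask bookkeeping (not Datum's real type).
struct OrderedAsk<T> {
    parallelism: usize,
    next_index: u64,             // index handed to the next ask we issue
    next_to_emit: u64,           // lowest index not yet emitted downstream
    completed: BTreeMap<u64, T>, // replies that arrived ahead of their turn
}

impl<T> OrderedAsk<T> {
    fn new(parallelism: usize) -> Self {
        assert!(parallelism > 0, "parallelism must be at least 1");
        OrderedAsk { parallelism, next_index: 0, next_to_emit: 0, completed: BTreeMap::new() }
    }

    /// Asks issued but not yet emitted, including replies parked in `completed`.
    fn in_flight(&self) -> usize {
        (self.next_index - self.next_to_emit) as usize
    }

    /// Index for a new ask, or `None` while `parallelism` asks are outstanding.
    fn try_issue(&mut self) -> Option<u64> {
        if self.in_flight() >= self.parallelism {
            return None;
        }
        self.next_index += 1;
        Some(self.next_index - 1)
    }

    /// Records a reply (in any order) and pushes every now-emittable reply, in input order.
    fn complete(&mut self, index: u64, reply: T, out: &mut Vec<T>) {
        self.completed.insert(index, reply);
        while let Some(ready) = self.completed.remove(&self.next_to_emit) {
            out.push(ready);
            self.next_to_emit += 1;
        }
    }
}

fn main() {
    let mut ask = OrderedAsk::new(3);
    let ids: Vec<u64> = (0..3).filter_map(|_| ask.try_issue()).collect();
    assert_eq!(ids, vec![0, 1, 2]);
    assert_eq!(ask.try_issue(), None); // window full

    let mut out = Vec::new();
    ask.complete(2, "c", &mut out); // early reply: buffered, nothing emitted
    assert!(out.is_empty());
    ask.complete(0, "a", &mut out); // emits "a"; "c" still waits for index 1
    assert_eq!(out, vec!["a"]);
    ask.complete(1, "b", &mut out); // unblocks "b", then "c"
    assert_eq!(out, vec!["a", "b", "c"]);
    assert_eq!(ask.try_issue(), Some(3)); // window has room again
}
```

Replies leave `completed` only in index order, so a slow reply at the head holds back everything behind it. That head-of-line blocking is the cost of order preservation, as with Akka's `mapAsync`.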
## Performance-sensitive hot paths — do NOT casually refactor
- **The spin-then-park ask loop** (`wait_for_ready_ask` in actor.rs, and its `*_with_pending` twin in
interop.rs) and the constants `ASK_READY_SPINS`/`ASK_IDLE_YIELDS`/`ASK_MAX_PARK`/
`ASK_TIME_REFRESH_ITERS`. The park budget is deliberately **deadline-independent**; CLAUDE.md
records that changing it regressed `timeout_*` CPU ~4×. `Instant::now()` is refreshed only every
`ASK_TIME_REFRESH_ITERS` spins to keep the spin cheap.
- **The `ReplyState` lock-free gate** (`poll`/`send`/`drop_sender`): a release store on the gate
after the value write, paired with an acquire load in `poll`, is what publishes the value. The
`ordered_sum` ask path intentionally burns ~2× as much CPU time as wall-clock time to reach sub-µs
latency (forcing earlier parking measured ~18× worse wall-clock time). Report the CPU cost as a
known trade-off; don't "optimize" by parking sooner.
- **The StreamRefs direct splice** is the ~6.6–7.8× win over warmed Akka
(`roadmap/benchmarks/actor-ask.md`, M5). Remote proto numbers (~10× faster wall-clock time than
forced-remote Akka) are in `roadmap/benchmarks/net.md`.
- **Ractor per-message box** (~848 B/elem) dominates ask allocation and is upstream-pinned — not
fixable through the public `ActorRef<Msg>` API (CLAUDE.md). Don't chase it Datum-side.
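
To make the gate and the wait loop concrete, here is a std-only sketch of a one-shot reply slot. It puts an `AtomicU8` gate (pending/ready/dropped) in front of a mutex-guarded value: `send` does a Release store after writing the value, and the `poll` fast path does an Acquire load. The waiter spins, then yields, then parks with a fixed budget. Everything here is hypothetical, including `ReplySlot`, `SlotPoll`, `wait_for_reply`, and the constant values. The real `ReplyState`/`wait_for_ready_ask` also track deadlines, refresh `Instant::now()` only every `ASK_TIME_REFRESH_ITERS` spins, and were tuned by benchmark, so do not copy these numbers.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, Mutex, PoisonError};
use std::thread::{self, Thread};
use std::time::Duration;

// Hypothetical tuning values; the real ASK_* constants live in actor.rs.
const READY_SPINS: u32 = 64;
const IDLE_YIELDS: u32 = 16;
const MAX_PARK: Duration = Duration::from_millis(1);
const PENDING: u8 = 0;
const READY: u8 = 1;
const DROPPED: u8 = 2;

#[derive(Debug, PartialEq)]
enum SlotPoll<T> { Pending, Ready(T), Dropped }

/// Hypothetical one-shot reply slot: an atomic gate in front of a mutex-guarded value.
struct ReplySlot<T> {
    gate: AtomicU8,
    value: Mutex<Option<T>>,
    waiter: Thread, // the consumer thread to unpark on delivery
}

impl<T> ReplySlot<T> {
    /// Must be created on the thread that will wait, since it captures `thread::current()`.
    fn new() -> Arc<Self> {
        Arc::new(ReplySlot { gate: AtomicU8::new(PENDING), value: Mutex::new(None), waiter: thread::current() })
    }

    fn send(&self, v: T) {
        // Write the value first, then publish it with a Release store on the gate.
        *self.value.lock().unwrap_or_else(PoisonError::into_inner) = Some(v);
        self.gate.store(READY, Ordering::Release);
        self.waiter.unpark();
    }

    fn drop_sender(&self) {
        // Only a still-pending slot can become Dropped; a delivered value wins.
        let _ = self.gate.compare_exchange(PENDING, DROPPED, Ordering::Release, Ordering::Relaxed);
        self.waiter.unpark();
    }

    fn poll(&self) -> SlotPoll<T> {
        // Fast path: one Acquire load; the mutex is touched only after delivery.
        match self.gate.load(Ordering::Acquire) {
            PENDING => SlotPoll::Pending,
            DROPPED => SlotPoll::Dropped,
            _ => match self.value.lock().unwrap_or_else(PoisonError::into_inner).take() {
                Some(v) => SlotPoll::Ready(v),
                None => SlotPoll::Dropped, // value already taken by an earlier poll
            },
        }
    }
}

/// Spin, then yield, then park with a fixed budget until the slot resolves.
fn wait_for_reply<T>(slot: &ReplySlot<T>) -> SlotPoll<T> {
    let mut iter: u32 = 0;
    loop {
        match slot.poll() {
            SlotPoll::Pending => {}
            resolved => return resolved,
        }
        if iter < READY_SPINS {
            std::hint::spin_loop(); // cheap busy-wait for very fast replies
        } else if iter < READY_SPINS + IDLE_YIELDS {
            thread::yield_now(); // let other runnable threads in
        } else {
            // Fixed budget, not derived from any deadline; send() unparks us early.
            thread::park_timeout(MAX_PARK);
        }
        iter = iter.saturating_add(1);
    }
}

fn main() {
    let slot: Arc<ReplySlot<u32>> = ReplySlot::new();
    let sender = Arc::clone(&slot);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(5));
        sender.send(42);
    });
    assert_eq!(wait_for_reply(&slot), SlotPoll::Ready(42));
    handle.join().unwrap();

    let dropped: Arc<ReplySlot<u32>> = ReplySlot::new();
    dropped.drop_sender();
    assert_eq!(wait_for_reply(&dropped), SlotPoll::Dropped);
}
```

A wakeup that lands before `park_timeout` is not lost. `unpark` leaves a token that makes the next park return immediately, and the loop re-polls the gate regardless.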
## Akka mapping (and Rust-native deviations)
- `ActorFlow::ask` ↔ Akka `ActorFlow.ask`; `ActorSource` ↔ `ActorSource`/`Source.actorRef`;
`ActorSink` ↔ `ActorSink`/`Sink.actorRef`; `ActorFlow::watch` ↔ actor-termination watch;
`StreamRefs`/`SourceRef`/`SinkRef` ↔ Akka StreamRefs; the proto messages
(`OnSubscribeHandshake`/`CumulativeDemand`/`SequencedOnNext`/`RemoteStreamCompleted`/
`RemoteStreamFailure`/`Ack`) mirror Akka's StreamRefs protocol.
- Deviations: replies go through `ReplyPort<T>` (hides Ractor's RPC port type) instead of an
`ActorRef<reply>`; `ActorStatus<T>` mirrors Akka `StatusReply`; failures are `Result`/
`StreamError`, never panics/exceptions; `StreamRefPayload` is Datum-owned (not Ractor
`BytesConvertable`) so the proto seam works without the `cluster` feature.
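
The demand side of that protocol can be sketched using the parameters pinned in the invariants above: cumulative demand, receive window = `buffer_capacity`, refill at half, and sequence numbers from 0. `DemandWindow`, `ProducerGate`, and `OutOfOrder` are hypothetical. The sketch assumes each element is consumed as soon as it arrives. It also assumes "refill at half" means sending a new cumulative demand of `next_seq + capacity` once outstanding demand drops to half the window. Check `StreamRefProtoConsumer` for the exact trigger.

```rust
/// Hypothetical consumer-side demand tracker (not `StreamRefProtoConsumer`).
struct DemandWindow {
    capacity: u64, // receive window, e.g. buffer_capacity = 32
    next_seq: u64, // next expected sequence number; the first element is 0
    demanded: u64, // cumulative demand signalled to the producer so far
}

/// A sequence number arrived out of order: a protocol violation on a reliable carrier.
#[derive(Debug, PartialEq)]
struct OutOfOrder {
    expected: u64,
    got: u64,
}

impl DemandWindow {
    /// Returns the tracker plus the initial cumulative demand (one full window).
    fn new(capacity: u64) -> (Self, u64) {
        assert!(capacity > 0, "window must be non-empty");
        (DemandWindow { capacity, next_seq: 0, demanded: capacity }, capacity)
    }

    /// Accepts element `seq`; returns `Some(total)` when a new cumulative demand should be sent.
    fn on_element(&mut self, seq: u64) -> Result<Option<u64>, OutOfOrder> {
        if seq != self.next_seq {
            return Err(OutOfOrder { expected: self.next_seq, got: seq });
        }
        self.next_seq += 1;
        // Refill once outstanding demand has fallen to half the window.
        if self.demanded - self.next_seq <= self.capacity / 2 {
            self.demanded = self.next_seq + self.capacity;
            return Ok(Some(self.demanded));
        }
        Ok(None)
    }
}

/// Hypothetical producer-side gate: may emit only while `next_seq < demanded`.
struct ProducerGate {
    demanded: u64,
    next_seq: u64,
}

impl ProducerGate {
    fn new() -> Self {
        ProducerGate { demanded: 0, next_seq: 0 }
    }
    /// Cumulative demand is idempotent: stale or duplicate totals change nothing.
    fn on_demand(&mut self, cumulative: u64) {
        self.demanded = self.demanded.max(cumulative);
    }
    /// Next sequence number to send, or `None` if demand is exhausted.
    fn try_next_seq(&mut self) -> Option<u64> {
        if self.next_seq >= self.demanded {
            return None;
        }
        self.next_seq += 1;
        Some(self.next_seq - 1)
    }
}

fn main() {
    let (mut consumer, initial) = DemandWindow::new(32);
    let mut producer = ProducerGate::new();
    producer.on_demand(initial);
    let mut refills = 0;
    for _ in 0..100 {
        let seq = producer.try_next_seq().expect("demand should never run dry here");
        if let Some(total) = consumer.on_element(seq).expect("in-order delivery") {
            producer.on_demand(total);
            refills += 1;
        }
    }
    // Refills fire after elements 15, 31, 47, 63, 79 and 95: one per 16 elements.
    assert_eq!(refills, 6);
    assert_eq!(producer.demanded, 128);
}
```

Because demand is cumulative, a stale or duplicated `CumulativeDemand` is a no-op (`max`). That is why Akka can safely redeliver it over lossy remoting, and why reliable carriers can skip redelivery.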
## Gotchas / footguns
- **Two near-identical ask state machines.** Any fix to ordering/timeout/wait must be mirrored in
both `ask_ractor_ordered` (actor.rs) and `ask_ractor_ordered_with_pending` (interop.rs). The
`_with_pending` variant threads a `Pending` value (context / status `finish`) alongside the reply;
it backs `ask_with_status`/`ask_with_context`/`ask_with_status_and_context`.
- **`send_and_wait_ack` (interop.rs) reuses `InFlightAsk` + `wait_for_ready_ask` from actor.rs** for
single-element backpressure acks — those types are `pub(super)`-shared, not just ask-internal.
- **`ReplyPort::timeout()` always returns `Some`** (the `Option` is a forward-compat shape, never
`None` today).
- **`cluster` feature changes the `Message` bound.** Without it, any `Send + 'static` type
auto-implements Ractor `Message`; with it you need an explicit `impl Message for YourEnum {}` (see the
`#[cfg(feature = "cluster")]` impls in the test modules).
- **`StreamRefProtoProducer` has eager vs lazy modes.** `from_source*` attaches input immediately;
`new_lazy` + `sink()` attaches input later (SinkRef sender side) and waits on a condvar so an idle
producer does not busy-loop before the remote subscribes.
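
The lazy mode's "block until there is work" behaviour can be sketched with a `Mutex` + `Condvar` pair. The producer thread blocks in `Condvar::wait_while` instead of polling, and the sink side wakes it exactly once. `LazyInput`, `attach`, and `wait_for_input` are hypothetical, and input is modelled as a plain `Vec<T>`. The exact condition the real `StreamRefProtoProducer` waits on (input attached, remote subscribed, or both) is an assumption here; see `stream_ref_proto.rs`. Poison is recovered with `PoisonError::into_inner`, matching the convention above.

```rust
use std::sync::{Arc, Condvar, Mutex, PoisonError};
use std::thread;
use std::time::Duration;

/// Hypothetical input slot for a lazily attached producer.
struct LazyInput<T> {
    slot: Mutex<Option<Vec<T>>>,
    attached: Condvar,
}

impl<T> LazyInput<T> {
    fn new() -> Self {
        LazyInput { slot: Mutex::new(None), attached: Condvar::new() }
    }

    /// Sink side (cf. `new_lazy` + `sink()`): attach input and wake the producer.
    fn attach(&self, input: Vec<T>) {
        *self.slot.lock().unwrap_or_else(PoisonError::into_inner) = Some(input);
        self.attached.notify_one();
    }

    /// Producer side: sleep on the condvar (no spinning) until input is attached.
    fn wait_for_input(&self) -> Vec<T> {
        let guard = self.slot.lock().unwrap_or_else(PoisonError::into_inner);
        // wait_while re-checks the predicate, so spurious wakeups are harmless.
        let mut guard = self
            .attached
            .wait_while(guard, |slot| slot.is_none())
            .unwrap_or_else(PoisonError::into_inner);
        guard.take().expect("wait_while only returns once input is attached")
    }
}

fn main() {
    let input = Arc::new(LazyInput::<u32>::new());
    let producer = {
        let input = Arc::clone(&input);
        thread::spawn(move || input.wait_for_input().into_iter().sum::<u32>())
    };
    thread::sleep(Duration::from_millis(10)); // producer is blocked on the condvar meanwhile
    input.attach(vec![1, 2, 3]);
    assert_eq!(producer.join().unwrap(), 6);
}
```

If `attach` runs before the producer starts waiting, `wait_while` sees the slot already filled and returns at once, so ordering between the two sides does not matter.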
## Tests & benches
- **Inline unit tests** (`#[cfg(test)] mod tests`) in every file: `actor.rs` (ordering, parallelism,
timeout, dropped reply, builder panic, ask→head, long-deadline park), `interop.rs`
(status/context ask, actor source/sink + backpressure, pubsub, watch), `stream_ref.rs` (direct
splice, one-shot, cancellation, backpressure), `stream_ref_proto.rs` (protocol round-trips).
- **Verified doc snippets** (`#region` anchors): `crates/datum-core/tests/docs/actors_interop.rs`,
`streamrefs.rs` — the source for the `<<< @/…` blocks in `docs/guides/`.
- **Benches:** `crates/datum-core/benches/actor_ask.rs` (Criterion) and `…/actor_ask_compare/`
(Datum-vs-Akka). Result tables: `roadmap/benchmarks/actor-ask.md` (ask + local StreamRefs splice),
`roadmap/benchmarks/net.md` (remote StreamRefs over TCP/QUIC).
- **Remote-transport integration** lives in `datum-net`: `crates/datum-net/tests/stream_ref_tcp.rs`,
`stream_ref_quic.rs`, and the `streamref_quic_node` example.