datum-core 0.9.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
# `io/` — streaming byte I/O

Context for a future agent working in `crates/datum-core/src/io/`. Read the root
[`CLAUDE.md`](../../../../CLAUDE.md) first (build/test/lint, execution model, the
no-latent-blocking-I/O principle) and `roadmap/M1-v0.1.0-foundation.md` (conventions). This file
covers only what is specific to this module.

## Purpose

Byte-stream I/O for the common sources/sinks: file (sync + Tokio + optional io_uring), TCP, byte
framing, compression, and `std::io::Read`/`Write` bridges. Everything here composes with the linear
`Source`/`Flow`/`Sink` DSL and produces/consumes `Vec<u8>` chunks (Akka uses `ByteString`).

## Files & key public types

- **`mod.rs`** — the module surface; re-exports everything below. Re-exported flat at the crate root
  (`datum::Framing`, …). **Note: io types are NOT in `datum::prelude`** — docs/examples must use
  explicit `use datum::{Framing, …}`.
- **`framing.rs`** — `Framing` (namespace) → `delimiter`, `length_field`, `json`; `FramingByteOrder`.
  Each returns `Flow<Vec<u8>, Vec<u8>>`. Internals are pulled `Iterator` adapters over `BoxStream`
  built via `Flow::from_transform` (`DelimiterFramingStream`, `LengthFieldFramingStream`,
  `JsonFramingStream`).
- **`compression.rs`** — `Compression` → `gzip`/`gunzip`/`deflate`/`inflate`, all
  `Flow<Vec<u8>, Vec<u8>>`, in-process via `flate2`. Same pulled-iterator-adapter shape.
- **`adapters.rs`** — synchronous side. `StreamConverters` (`from_reader`, `to_writer`,
  `as_input_stream`, `as_output_stream`), the blocking handles `InputStreamHandle` (`impl Read`) and
  `OutputStreamHandle` (`impl Write` + `close()`), and `FileIO` (sync `from_path`/`to_path`, backed
  by `StreamConverters` on the Datum thread pool with a bounded reader queue).
- **`tokio_io.rs`** — async side. `TokioFileIO`, `TokioTcp`, the byte aliases `TokioByteSource` /
  `TokioByteSink` (`Source`/`Sink<Vec<u8>, StreamCompletion<IoResult>>`), `IoResult`,
  `TcpBinding`/`TcpConnection`/`TcpIncomingConnection`, and feature-gated `UringFileIO`.
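The framing and compression stages above are described as pulled `Iterator` adapters. A minimal sketch of that shape for delimiter framing follows, assuming upstream yields plain `Vec<u8>` chunks and that a leftover partial frame at end of stream is an error (mirroring Akka's `allowTruncation = false`). `DelimiterFraming` and `Terminal` here are hypothetical stand-ins, not the real `DelimiterFramingStream` API, and the max-length check is simplified.

```rust
/// Hypothetical terminal state for the sketch.
#[derive(Clone, Debug, PartialEq)]
enum Terminal {
    Complete,
    Failed(String),
}

/// Illustrative pulled adapter: pulls one upstream chunk at a time and
/// emits frames with the delimiter stripped.
struct DelimiterFraming<I> {
    upstream: I,
    delimiter: Vec<u8>,
    max_frame_len: usize,
    buffer: Vec<u8>,
    terminal: Option<Terminal>,
}

impl<I: Iterator<Item = Vec<u8>>> DelimiterFraming<I> {
    fn new(upstream: I, delimiter: &[u8], max_frame_len: usize) -> Self {
        assert!(!delimiter.is_empty(), "delimiter must be non-empty");
        Self {
            upstream,
            delimiter: delimiter.to_vec(),
            max_frame_len,
            buffer: Vec::new(),
            terminal: None,
        }
    }

    /// Records a failure; every later `next()` repeats it (sticky terminal).
    fn fail(&mut self, msg: &str) -> Option<Result<Vec<u8>, String>> {
        self.terminal = Some(Terminal::Failed(msg.to_string()));
        Some(Err(msg.to_string()))
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for DelimiterFraming<I> {
    type Item = Result<Vec<u8>, String>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Sticky terminal: a completed/failed stage never resumes.
            match &self.terminal {
                Some(Terminal::Complete) => return None,
                Some(Terminal::Failed(e)) => return Some(Err(e.clone())),
                None => {}
            }
            // Emit a frame if a complete delimiter is already buffered.
            let found = {
                let delim = self.delimiter.as_slice();
                self.buffer.windows(delim.len()).position(|w| w == delim)
            };
            if let Some(pos) = found {
                if pos > self.max_frame_len {
                    return self.fail("frame exceeds max length");
                }
                let frame = self.buffer[..pos].to_vec();
                self.buffer.drain(..pos + self.delimiter.len());
                return Some(Ok(frame));
            }
            // No delimiter yet: refuse to buffer without bound (simplified check).
            if self.buffer.len() > self.max_frame_len {
                return self.fail("frame exceeds max length");
            }
            // Pull exactly one more chunk from upstream.
            match self.upstream.next() {
                Some(chunk) => self.buffer.extend_from_slice(&chunk),
                None if self.buffer.is_empty() => self.terminal = Some(Terminal::Complete),
                None => return self.fail("truncated frame at end of stream"),
            }
        }
    }
}
```

Because the adapter only pulls when asked, backpressure falls out of the `Iterator` contract: no chunk is read until the downstream calls `next()`.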

## Invariants & conventions (preserve these)

- **`#![forbid(unsafe_code)]`** (crate-wide). No exceptions here.
- **Blueprint vs. materialization.** Constructors only build blueprints. All side effects (open file,
  connect socket, spawn task, call the user factory) happen at materialization, inside
  `Source::from_materialized_factory` / `Sink::from_runner` / `from_reader`. Never open a resource in
  a constructor.
- **Bounded read-ahead, always.** No path hides an unbounded receive buffer (CLAUDE.md
  no-latent-blocking principle). Constants: `READER_QUEUE_CAPACITY=8` (sync), `FILE_READ_AHEAD_CHUNKS=8`,
  `TCP_READ_AHEAD_CHUNKS=1` (capacity-1: one chunk waits, then the task stops reading),
  `INPUT/OUTPUT_STREAM_BUFFER_CAPACITY=16`.
- **Sticky terminal.** Framing/compression iterators store `Option<Terminal>` and, once set, return
  it forever (`sticky_terminal`). A completed/failed stage must not resume.
- **Async carriers are single-owner Tokio tasks** on `crate::stream::stream_tokio_runtime()`, with a
  bounded `mpsc` for items/commands and a `watch` channel for cancellation. No per-op `block_on` /
  `blocking_recv`. Consumer side reads via `current_stream_cancelled()` (thread-local) + spin/park.
- **Lost-wakeup-safe Drop.** `ReaderSourceStream`/`InputStreamHandle`/`OutputStreamSourceStream`
  `Drop` set `cancelled`, then **re-take and drop the state lock before** `notify_all` so a producer
  that already read `cancelled==false` is guaranteed parked before the wake fires. Do not remove the
  lock re-acquire. Drop uses `unwrap_or_else(into_inner)` (never `expect`) — panicking in Drop during
  unwind aborts.
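The lost-wakeup-safe `Drop` ordering above can be sketched with `std::sync::{Mutex, Condvar}` and a bounded queue. This is an illustrative model only: `Shared`, `produce`, and `Handle` are hypothetical names, and it assumes the producer reads `cancelled` while holding the state lock, as the invariant implies.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex, PoisonError};

/// State shared by a blocking producer and the consumer's handle (illustrative).
struct Shared {
    queue: Mutex<VecDeque<Vec<u8>>>,
    ready: Condvar,
    cancelled: AtomicBool,
    capacity: usize,
}

/// Producer: parks while the bounded queue is full; stops once cancelled.
/// Returns how many chunks it enqueued.
fn produce(shared: &Shared, chunks: impl Iterator<Item = Vec<u8>>) -> usize {
    let mut sent = 0;
    for chunk in chunks {
        let mut q = shared.queue.lock().unwrap_or_else(PoisonError::into_inner);
        // `cancelled` is read while holding the lock; `wait` releases it atomically.
        while q.len() >= shared.capacity && !shared.cancelled.load(Ordering::SeqCst) {
            q = shared.ready.wait(q).unwrap_or_else(PoisonError::into_inner);
        }
        if shared.cancelled.load(Ordering::SeqCst) {
            break;
        }
        q.push_back(chunk);
        sent += 1;
        shared.ready.notify_all(); // wake a consumer waiting for data
    }
    sent
}

/// Consumer-side handle; dropping it cancels the producer.
struct Handle {
    shared: Arc<Shared>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        // 1. Publish cancellation (without holding the lock).
        self.shared.cancelled.store(true, Ordering::SeqCst);
        // 2. Re-take and immediately drop the lock. A producer that read
        //    `cancelled == false` holds the lock until it is parked in `wait`,
        //    so acquiring it here orders this thread after that park.
        drop(self.shared.queue.lock().unwrap_or_else(PoisonError::into_inner));
        // 3. Wake now; the parked producer cannot miss this notification.
        //    Poisoning is recovered, never `expect`ed: a panic in Drop during unwind aborts.
        self.shared.ready.notify_all();
    }
}
```

Skipping step 2 reintroduces the race: the producer can read `false`, the handle can store `true` and notify with no waiter present, and the producer then parks forever on a full queue.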

## Performance-sensitive hot paths (don't casually refactor)

- **Spin-then-park constants** in `tokio_io.rs`: `READ_READY_SPINS=256`, `BACKPRESSURE_READY_SPINS=64`,
  `BACKPRESSURE_PARK=10µs`, `PARK_INTERVAL=1ms`. Load-bearing, benchmark-chosen (see CLAUDE.md
  "spin-then-park"; the park budget is deadline-independent on purpose).
- **`ConsumerWaker` wake-on-send.** The producer task `unpark()`s the consumer thread after each
  successful channel send so `park_timeout` returns immediately instead of waiting a full tick. The
  consumer captures its `Thread` on first `next()`. Keep both sides' cancellation checks intact.
- **JSON framing fast-skip tables** (`OUTER_CHARS`, `INNER_INTERESTING`, `STRING_INTERESTING`) plus
  the lazy `compact()` (drains the consumed prefix only when the next chunk would otherwise realloc).
  This is the measured win; don't replace with a naive byte-by-byte match.
- **Chunk-boundary preservation** in `send_read_chunks` (Tokio file/uring): a 256 KiB internal read is
  re-sliced into exact `chunk_size` emissions with a carried `pending_tail`; only the final tail is
  short. Tests assert this (`*_preserves_requested_chunk_boundaries`).
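The chunk-boundary rule for `send_read_chunks` (large internal reads re-sliced into exact `chunk_size` emissions, with a carried `pending_tail`) can be modeled synchronously. `ChunkResizer` is a hypothetical helper written for illustration; the real code works on 256 KiB Tokio/uring reads and sends into a bounded channel instead of returning a `Vec`.

```rust
/// Hypothetical helper: re-slices arbitrary-sized reads into exact
/// `chunk_size` chunks, carrying the remainder between reads.
struct ChunkResizer {
    chunk_size: usize,
    pending_tail: Vec<u8>,
}

impl ChunkResizer {
    fn new(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be positive");
        Self { chunk_size, pending_tail: Vec::new() }
    }

    /// Feeds one internal read; returns only full `chunk_size` chunks.
    fn push(&mut self, read: &[u8]) -> Vec<Vec<u8>> {
        self.pending_tail.extend_from_slice(read);
        // Largest multiple of chunk_size currently buffered.
        let full = self.pending_tail.len() / self.chunk_size * self.chunk_size;
        let out: Vec<Vec<u8>> = self.pending_tail[..full]
            .chunks(self.chunk_size)
            .map(|c| c.to_vec())
            .collect();
        // Keep the short remainder for the next read.
        self.pending_tail.drain(..full);
        out
    }

    /// At EOF, the leftover tail is the only chunk allowed to be short.
    fn finish(self) -> Option<Vec<u8>> {
        if self.pending_tail.is_empty() {
            None
        } else {
            Some(self.pending_tail)
        }
    }
}
```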

## Akka mapping (and Rust-native deviations)

| Datum | Akka |
| --- | --- |
| `Framing::delimiter`/`length_field` | `akka.stream.scaladsl.Framing.{delimiter,lengthField}` |
| `Framing::json` | `JsonFraming.objectScanner` |
| `Compression::{gzip,gunzip,deflate,inflate}` | `akka.stream.scaladsl.Compression.*` |
| `FileIO` / `TokioFileIO` | `akka.stream.scaladsl.FileIO.{fromPath,toPath}` |
| `TokioTcp::{bind,outgoing_connection}` | `akka.stream.scaladsl.Tcp` |
| `StreamConverters::{from_reader,to_writer,as_input_stream,as_output_stream}` | `StreamConverters.{fromInputStream,fromOutputStream,asInputStream,asOutputStream}` |
| `IoResult` | `akka.stream.IOResult` |

Deviations: chunks are `Vec<u8>` not `ByteString`; failures flow as `Result<_, StreamError>`, not
exceptions; Rust-native names (`from_reader`/`to_writer` for `fromInputStream`/`fromOutputStream`); a sync `FileIO`
is offered alongside async `TokioFileIO` (Akka has only the async one). Delimiter framing mirrors
Akka's trailing-partial-delimiter rule (`Framing.scala`): bytes in a trailing partial match don't
count toward the max-length failure (see `trailing_delimiter_prefix_len`).
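A sketch of the trailing-partial-delimiter rule as described above, not a port of `Framing.scala`. `partial_delimiter_suffix_len` is a hypothetical function in the spirit of the module's `trailing_delimiter_prefix_len`, whose real signature may differ. It computes the longest suffix of the buffer that is a proper prefix of the delimiter, and the max-length check then excludes those bytes.

```rust
/// Hypothetical: length of the longest suffix of `buf` that is a proper
/// prefix of `delimiter` (a delimiter that may complete in the next chunk).
fn partial_delimiter_suffix_len(buf: &[u8], delimiter: &[u8]) -> usize {
    let max = delimiter.len().saturating_sub(1).min(buf.len());
    (1..=max)
        .rev()
        .find(|&n| buf[buf.len() - n..] == delimiter[..n])
        .unwrap_or(0)
}

/// Max-length check that does not count a trailing partial delimiter match.
fn exceeds_max_length(buf: &[u8], delimiter: &[u8], max_frame_len: usize) -> bool {
    buf.len() - partial_delimiter_suffix_len(buf, delimiter) > max_frame_len
}
```

With delimiter `\r\n` and a 3-byte limit, a buffer `abc\r` is still within bounds (the `\r` may start the delimiter), while `abcd` is not.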

## Gotchas / footguns

- **TCP half-close.** `tcp_flow_from_stream` uses `from_sink_and_source` (NOT `_coupled`) on purpose:
  the write half completing (FIN) must not tear down the read half before the peer's echo arrives.
  `TcpIncomingConnection::into_flow()` *is* coupled — different surface, intentional.
- **Single-use TCP halves.** `single_use_async_read_source`/`single_use_async_write_sink` take their
  reader/writer out of a `Mutex<Option<_>>`; a second materialization errors "already materialized".
- **`OutputStreamHandle`:** `flush()` is a no-op; you must `close()` (or drop) to signal EOF; writes
  after close return `BrokenPipe`. `OutputStreamTerminal::Error` is `#[allow(dead_code)]` — only
  `Complete` is ever set (the source has no failure path); don't assume an error path exists.
- **Length field is signed `i32`.** A 4-byte header with the high bit set parses negative → error
  (matches Akka's signed behavior); 1–3 byte headers are unsigned.
- **`UringFileIO`** (feature `io-uring-file`, Linux only) runs a separate `tokio-uring` current-thread
  runtime on its own thread (`datum-uring-file`). If that runtime can't start, or the target isn't
  Linux, it transparently falls back to `TokioFileIO` with identical semantics — keep the fallback.
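The length-field signedness rule listed above can be illustrated with a small decoder. `decode_length` and `ByteOrder` are hypothetical names (the module's own enum is `FramingByteOrder`, whose variants are not shown here); the sketch assumes headers of 1 to 4 bytes, read unsigned for 1 to 3 bytes and as a signed `i32` for 4 bytes.

```rust
/// Hypothetical byte-order selector for the sketch.
#[derive(Clone, Copy)]
enum ByteOrder {
    Big,
    Little,
}

/// Decodes an N-byte length header (N in 1..=4). 1-3 byte headers are
/// unsigned; a 4-byte header is reinterpreted as i32, so a set high bit
/// yields a negative length, which is rejected.
fn decode_length(header: &[u8], order: ByteOrder) -> Result<usize, String> {
    if header.is_empty() || header.len() > 4 {
        return Err(format!("unsupported header width {}", header.len()));
    }
    // Accumulate the bytes into a u32 in the requested order.
    let raw = match order {
        ByteOrder::Big => header.iter().fold(0u32, |acc, &b| (acc << 8) | u32::from(b)),
        ByteOrder::Little => header.iter().rev().fold(0u32, |acc, &b| (acc << 8) | u32::from(b)),
    };
    if header.len() == 4 {
        // Signed interpretation for the 4-byte case.
        let signed = raw as i32;
        if signed < 0 {
            return Err(format!("negative frame length {signed}"));
        }
        Ok(signed as usize)
    } else {
        Ok(raw as usize)
    }
}
```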

## Tests & benches

- Inline `#[cfg(test)]` modules in each file (the dominant coverage; framing/adapters/tokio_io are
  thorough — round-trips, max-length, truncation, cancellation, timeouts, chunk boundaries).
- Doc-backed integration tests: `crates/datum-core/tests/docs/streaming_io.rs` (the `#region`s wired
  into `docs/guides/streaming-io.md` via `<<< @/…`).
- Bench: `crates/datum-core/benches/streaming_io.rs`; Akka compare harness under
  `benches/streaming_io_compare/` (needs JDK + sbt). Result table:
  `roadmap/benchmarks/streaming-io.md` (source of truth for perf claims).