# `io/` — streaming byte I/O
Context for a future agent working in `crates/datum-core/src/io/`. Read the root
[`CLAUDE.md`](../../../../CLAUDE.md) first (build/test/lint, execution model, the
no-latent-blocking-I/O principle) and `roadmap/M1-v0.1.0-foundation.md` (conventions). This file
covers only what is specific to this module.
## Purpose
Byte-stream I/O for the common sources/sinks: file (sync + Tokio + optional io_uring), TCP, byte
framing, compression, and `std::io::Read`/`Write` bridges. Everything here composes with the linear
`Source`/`Flow`/`Sink` DSL and produces/consumes `Vec<u8>` chunks (Akka uses `ByteString`).
## Files & key public types
- **`mod.rs`** — the module surface; re-exports everything below. Re-exported flat at the crate root
(`datum::Framing`, …). **Note: io types are NOT in `datum::prelude`** — docs/examples must use
explicit `use datum::{Framing, …}`.
- **`framing.rs`** — `Framing` (namespace) → `delimiter`, `length_field`, `json`; `FramingByteOrder`.
Each returns `Flow<Vec<u8>, Vec<u8>>`. Internals are pulled `Iterator` adapters over `BoxStream`
built via `Flow::from_transform` (`DelimiterFramingStream`, `LengthFieldFramingStream`,
`JsonFramingStream`).
- **`compression.rs`** — `Compression` → `gzip`/`gunzip`/`deflate`/`inflate`, all
`Flow<Vec<u8>, Vec<u8>>`, in-process via `flate2`. Same pulled-iterator-adapter shape.
- **`adapters.rs`** — synchronous side. `StreamConverters` (`from_reader`, `to_writer`,
`as_input_stream`, `as_output_stream`), the blocking handles `InputStreamHandle` (`impl Read`) and
`OutputStreamHandle` (`impl Write` + `close()`), and `FileIO` (sync `from_path`/`to_path`, backed
by `StreamConverters` on the Datum thread pool with a bounded reader queue).
- **`tokio_io.rs`** — async side. `TokioFileIO`, `TokioTcp`, the byte aliases `TokioByteSource` /
`TokioByteSink` (`Source`/`Sink<Vec<u8>, StreamCompletion<IoResult>>`), `IoResult`,
`TcpBinding`/`TcpConnection`/`TcpIncomingConnection`, and feature-gated `UringFileIO`.
## Invariants & conventions (preserve these)
- **`#![forbid(unsafe_code)]`** (crate-wide). No exceptions here.
- **Blueprint vs. materialization.** Constructors only build blueprints. All side effects (open file,
connect socket, spawn task, call the user factory) happen at materialization, inside
`Source::from_materialized_factory` / `Sink::from_runner` / `from_reader`. Never open a resource in
a constructor.
- **Bounded read-ahead, always.** No path hides an unbounded receive buffer (CLAUDE.md
no-latent-blocking principle). Constants: `READER_QUEUE_CAPACITY=8` (sync), `FILE_READ_AHEAD_CHUNKS=8`,
`TCP_READ_AHEAD_CHUNKS=1` (capacity-1: one chunk waits, then the task stops reading),
`INPUT/OUTPUT_STREAM_BUFFER_CAPACITY=16`.
- **Sticky terminal.** Framing/compression iterators store `Option<Terminal>` and, once set, return
it forever (`sticky_terminal`). A completed/failed stage must not resume.
- **Async carriers are single-owner Tokio tasks** on `crate::stream::stream_tokio_runtime()`, with a
bounded `mpsc` for items/commands and a `watch` channel for cancellation. No per-op `block_on` /
`blocking_recv`. The consumer side reads with spin/park and checks `current_stream_cancelled()`
(thread-local) for cancellation.
- **Lost-wakeup-safe Drop.** `ReaderSourceStream`/`InputStreamHandle`/`OutputStreamSourceStream`
`Drop` set `cancelled`, then **re-take and drop the state lock before** `notify_all` so a producer
that already read `cancelled==false` is guaranteed parked before the wake fires. Do not remove the
lock re-acquire. Drop uses `unwrap_or_else(into_inner)` (never `expect`) — panicking in Drop during
unwind aborts.
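
The lost-wakeup-safe `Drop` ordering above can be sketched with plain `std` primitives. This is a minimal illustration under stated assumptions, not the module's code: `Shared`, `ConsumerHandle`, `producer_acquire_slot`, and `CAPACITY` are hypothetical names, and the real streams carry more state than a counter.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex, PoisonError};

/// Hypothetical bound, standing in for a constant such as `READER_QUEUE_CAPACITY`.
const CAPACITY: usize = 1;

/// Hypothetical state shared between a producer thread and a consumer handle.
struct Shared {
    cancelled: AtomicBool,
    queued: Mutex<usize>,
    space_available: Condvar,
}

/// Hypothetical consumer-side handle whose `Drop` cancels the producer.
struct ConsumerHandle {
    shared: Arc<Shared>,
}

impl Drop for ConsumerHandle {
    fn drop(&mut self) {
        // 1. Publish cancellation.
        self.shared.cancelled.store(true, Ordering::SeqCst);
        // 2. Re-take and release the state lock. A producer that already read
        //    `cancelled == false` holds the lock until it parks in `wait`, so
        //    acquiring the lock here proves it is parked (or has not checked yet).
        //    `into_inner` recovers a poisoned lock instead of panicking in Drop.
        drop(self.shared.queued.lock().unwrap_or_else(PoisonError::into_inner));
        // 3. Only now wake the producer; the notification cannot be missed.
        self.shared.space_available.notify_all();
    }
}

/// Producer side: blocks until a slot is free. Returns `false` once cancelled.
fn producer_acquire_slot(shared: &Shared) -> bool {
    let mut queued = shared.queued.lock().unwrap_or_else(PoisonError::into_inner);
    loop {
        if shared.cancelled.load(Ordering::SeqCst) {
            return false;
        }
        if *queued < CAPACITY {
            *queued += 1;
            return true;
        }
        // Atomically releases the lock and parks until notified.
        queued = shared
            .space_available
            .wait(queued)
            .unwrap_or_else(PoisonError::into_inner);
    }
}
```

Without step 2, `notify_all` could fire in the window between the producer's `cancelled` check and its `wait`, leaving the producer parked with nothing left to wake it.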
## Performance-sensitive hot paths (don't casually refactor)
- **Spin-then-park constants** in `tokio_io.rs`: `READ_READY_SPINS=256`, `BACKPRESSURE_READY_SPINS=64`,
`BACKPRESSURE_PARK=10µs`, `PARK_INTERVAL=1ms`. Load-bearing, benchmark-chosen (see CLAUDE.md
"spin-then-park"; the park budget is deadline-independent on purpose).
- **`ConsumerWaker` wake-on-send.** The producer task `unpark()`s the consumer thread after each
successful channel send so `park_timeout` returns immediately instead of waiting a full tick. The
consumer captures its `Thread` on first `next()`. Keep both sides' cancellation checks intact.
- **JSON framing fast-skip tables** (`OUTER_CHARS`, `INNER_INTERESTING`, `STRING_INTERESTING`) plus
the lazy `compact()` (drains the consumed prefix only when the next chunk would otherwise realloc).
This is the measured win; don't replace with a naive byte-by-byte match.
- **Chunk-boundary preservation** in `send_read_chunks` (Tokio file/uring): a 256 KiB internal read is
re-sliced into exact `chunk_size` emissions with a carried `pending_tail`; only the final tail is
short. Tests assert this (`*_preserves_requested_chunk_boundaries`).
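
The chunk-boundary rule in the last bullet can be sketched as a small synchronous re-slicer. This is an illustration only: `ChunkReslicer` and its methods are hypothetical names, and the real `send_read_chunks` is async and works on 256 KiB reads; only the `pending_tail` idea is taken from the text above.

```rust
/// Hypothetical helper: re-slices arbitrarily sized reads into exact
/// `chunk_size` emissions, carrying the remainder between reads.
struct ChunkReslicer {
    chunk_size: usize,
    pending_tail: Vec<u8>,
}

impl ChunkReslicer {
    fn new(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self { chunk_size, pending_tail: Vec::new() }
    }

    /// Feeds one internal read and returns every full chunk now available.
    fn push(&mut self, read: &[u8]) -> Vec<Vec<u8>> {
        self.pending_tail.extend_from_slice(read);
        // Largest multiple of `chunk_size` that fits in the buffered bytes.
        let full = self.pending_tail.len() / self.chunk_size * self.chunk_size;
        let chunks: Vec<Vec<u8>> = self.pending_tail[..full]
            .chunks(self.chunk_size)
            .map(|chunk| chunk.to_vec())
            .collect();
        // Keep only the bytes that did not fill a whole chunk.
        self.pending_tail.drain(..full);
        chunks
    }

    /// At end of input, the carried tail is the only chunk allowed to be short.
    fn finish(self) -> Option<Vec<u8>> {
        if self.pending_tail.is_empty() {
            None
        } else {
            Some(self.pending_tail)
        }
    }
}
```

With `chunk_size = 4`, pushing a 6-byte read and then a 3-byte read yields two full 4-byte chunks, and `finish` returns the single leftover byte as the short final chunk.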
## Akka mapping (and Rust-native deviations)
| Datum | Akka |
|---|---|
| `Framing::delimiter`/`length_field` | `akka.stream.scaladsl.Framing.{delimiter,lengthField}` |
| `Framing::json` | `JsonFraming.objectScanner` |
| `Compression::{gzip,gunzip,deflate,inflate}` | `akka.stream.scaladsl.Compression.*` |
| `FileIO` / `TokioFileIO` | `akka.stream.scaladsl.FileIO.{fromPath,toPath}` |
| `TokioTcp::{bind,outgoing_connection}` | `akka.stream.scaladsl.Tcp` |
| `StreamConverters::{from_reader,to_writer,as_input_stream,as_output_stream}` | `StreamConverters.{fromInputStream,fromOutputStream,asInputStream,asOutputStream}` |
| `IoResult` | `akka.stream.IOResult` |

Deviations: chunks are `Vec<u8>` not `ByteString`; failures flow as `Result<_, StreamError>`, not
exceptions; names are Rust-native (`from_reader`/`to_writer` for `fromInputStream`/`fromOutputStream`); a sync `FileIO`
is offered alongside async `TokioFileIO` (Akka has only the async one). Delimiter framing mirrors
Akka's trailing-partial-delimiter rule (`Framing.scala`): bytes in a trailing partial match don't
count toward the max-length failure (see `trailing_delimiter_prefix_len`).
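
The trailing-partial-delimiter rule can be sketched as follows. The function name `trailing_delimiter_prefix_len` comes from the text above, but this body is a guess at its behavior, not the module's implementation, and `exceeds_max_frame_length` is a hypothetical helper added for illustration.

```rust
/// Sketch (assumed behavior): length of the longest suffix of `buffer` that is
/// a proper prefix of `delimiter`, i.e. a delimiter that may still complete
/// once the next chunk arrives.
fn trailing_delimiter_prefix_len(buffer: &[u8], delimiter: &[u8]) -> usize {
    // A full delimiter is a match, not a partial one, so stop at len - 1.
    let max = delimiter.len().saturating_sub(1).min(buffer.len());
    (1..=max)
        .rev()
        .find(|&len| buffer.ends_with(&delimiter[..len]))
        .unwrap_or(0)
}

/// Hypothetical check: bytes in a trailing partial match are not counted
/// toward the maximum frame length.
fn exceeds_max_frame_length(buffer: &[u8], delimiter: &[u8], max_len: usize) -> bool {
    buffer.len() - trailing_delimiter_prefix_len(buffer, delimiter) > max_len
}
```

With delimiter `\r\n` and a maximum frame length of 3, the buffer `abc\r` is not a failure under this sketch: the trailing `\r` may be the start of the delimiter, so only `abc` counts.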
## Gotchas / footguns
- **TCP half-close.** `tcp_flow_from_stream` uses `from_sink_and_source` (NOT `_coupled`) on purpose:
the write half completing (FIN) must not tear down the read half before the peer's echo arrives.
`TcpIncomingConnection::into_flow()` *is* coupled — different surface, intentional.
- **Single-use TCP halves.** `single_use_async_read_source`/`single_use_async_write_sink` take their
reader/writer out of a `Mutex<Option<_>>`; a second materialization errors "already materialized".
- **`OutputStreamHandle`:** `flush()` is a no-op; you must `close()` (or drop) to signal EOF; writes
after close return `BrokenPipe`. `OutputStreamTerminal::Error` is `#[allow(dead_code)]` — only
`Complete` is ever set (the source has no failure path); don't assume an error path exists.
- **Length field is signed `i32`.** A 4-byte header with the high bit set parses negative → error
(matches Akka's signed behavior); 1–3 byte headers are unsigned.
- **`UringFileIO`** (feature `io-uring-file`, Linux only) runs a separate `tokio-uring` current-thread
runtime on its own thread (`datum-uring-file`). If that runtime can't start, or the target isn't
Linux, it transparently falls back to `TokioFileIO` with identical semantics — keep the fallback.
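
The signed length-field gotcha can be illustrated with a small parser. This is a sketch under assumptions: `ByteOrder` and `parse_length_field` are hypothetical names (the module's own type is `FramingByteOrder`, whose variants are not shown here), and the error type is a plain `String` for brevity.

```rust
/// Hypothetical stand-in for the module's byte-order type.
#[derive(Clone, Copy)]
enum ByteOrder {
    BigEndian,
    LittleEndian,
}

/// Sketch: 1 to 3 byte headers are read as unsigned; a 4-byte header is read
/// as a signed `i32`, so a set high bit yields a negative length and an error.
fn parse_length_field(header: &[u8], order: ByteOrder) -> Result<usize, String> {
    match header.len() {
        1..=3 => {
            let mut value: usize = 0;
            match order {
                ByteOrder::BigEndian => {
                    for &byte in header {
                        value = (value << 8) | byte as usize;
                    }
                }
                ByteOrder::LittleEndian => {
                    for &byte in header.iter().rev() {
                        value = (value << 8) | byte as usize;
                    }
                }
            }
            Ok(value)
        }
        4 => {
            let bytes = [header[0], header[1], header[2], header[3]];
            let value = match order {
                ByteOrder::BigEndian => i32::from_be_bytes(bytes),
                ByteOrder::LittleEndian => i32::from_le_bytes(bytes),
            };
            if value < 0 {
                Err(format!("negative length field: {}", value))
            } else {
                Ok(value as usize)
            }
        }
        width => Err(format!("unsupported length field width: {}", width)),
    }
}
```

Under this sketch a big-endian header of `80 00 00 00` is rejected, while the 3-byte header `FF FF FF` parses as 16777215.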
## Tests & benches
- Inline `#[cfg(test)]` modules in each file (the dominant coverage; framing/adapters/tokio_io are
thorough — round-trips, max-length, truncation, cancellation, timeouts, chunk boundaries).
- Doc-backed integration tests: `crates/datum-core/tests/docs/streaming_io.rs` (the `#region`s wired
into `docs/guides/streaming-io.md` via `<<< @/…`).
- Bench: `crates/datum-core/benches/streaming_io.rs`; Akka compare harness under
`benches/streaming_io_compare/` (needs JDK + sbt). Result table:
`roadmap/benchmarks/streaming-io.md` (source of truth for perf claims).