datum-net 0.9.0

Network sources and sinks for Datum streams, built on datum-core
# datum-net — agent orientation

Network carrier satellite for Datum. Wraps sockets/streams as `datum-core`
`Source`/`Flow`/`Sink` blueprints. Read the root [`CLAUDE.md`](../../../CLAUDE.md)
first (architecture, performance + Akka-mirroring conventions, the
"single-owner async carriers / no blocking IO seam" rule) and
[`roadmap/M7-remote-streamrefs-transport.md`](../../../roadmap/M7-remote-streamrefs-transport.md)
(the F1–F8 / U1–U3 work packages this crate implements). This file is the
module-local map; it does not duplicate CLAUDE.md.

`#![forbid(unsafe_code)]`, edition 2024, MSRV 1.88. All `unsafe` lives in deps
(`quinn`, `tokio-rustls`, optional `tokio-uring`).

## Files & key public types

- **`lib.rs`** — crate root + flat re-exports. `async_carrier` is `pub(crate)`
  (only `#[doc(hidden)]` test helpers leak). Edit the `//!` here freely; do NOT
  touch `crates/datum-core/src/lib.rs`.
- **`tls.rs`** — `TokioTls` (alias `Tls`): `bind`/`outgoing_connection`
  (+ `_default` 8 KiB chunk). Types: `TlsConnection`, `TlsBinding`,
  `TlsIncomingConnection` (`into_parts`/`into_flow`), `TlsByteSource`,
  `TlsByteSink`. Re-exports `tokio_rustls::rustls`. **Only carrier wired to the
  sharded engine** (`tls_connection_execution` + `ACTIVE_TLS_CONNECTIONS`).
- **`udp.rs`** — `TokioUdp` (alias `Udp`): `bind`/`send_sink`/`bind_flow`/
  `connect` (+ `_default`). Types: `Datagram`, `UdpBinding`, `UdpConnection`.
  `DEFAULT_MAX_DATAGRAM_SIZE = 65536`, `DEFAULT_RECEIVE_BUFFER = 64`. Lossy by
  design (see invariants).
- **`quic.rs`** — `TokioQuic` (alias `Quic`): `bind`/`connect` +
  `open_bi`/`accept_bi`/`open_bi_stream` with `_default`/`_available` variants.
  Types: `QuicBinding`, `QuicStream`, `QuicConnection`, `QuicIncomingConnection`,
  `QuicBidirectionalStream`, `QuicByteSource`, `QuicByteSink`. Re-exports
  `quinn`, `crypto`, `rustls`. `into_stream_ref_parts()` is `pub(crate)` for
  `stream_ref.rs`.
- **`connection.rs`** — `Connection` (TLS-client lifecycle namespace),
  `ConnectionSettings`, `RetryPolicy`, `ConnectionLifecycleExt`. Timeout/retry/
  exponential-backoff around TLS connect. `RetryPolicy::max_attempts` counts the
  first attempt; values `< 1` clamp to 1. `graceful_shutdown`/`half_close` are
  **no-op markers** — the transport sink already half-closes on upstream finish.
- **`async_carrier.rs`** — shared helpers: `AsyncCommandSender` (bounded command
  channel: `try_send`/`send_or_blocking`/`send_blocking`), `DemandBatcher`
  (refills at half the window), `ShardedTokioCarrierExecution` +
  `sharded_tokio_carrier_execution`/`sharded_tokio_shard_count`. Env knobs:
  `DATUM_NET_SHARDED_TOKIO_{SHARDS,MIN_CONNECTIONS,DISABLE}`.
- **`stream_ref.rs`** — remote StreamRefs carriers. `serve_source_ref_*`/
  `source_ref_*`/`serve_sink_ref_*`/`sink_ref_*` over `_tcp`/`_tcp_stream`/
  `_quic`, each with a `_with_diagnostics` twin. Types: `StreamRefTcpBinding`,
  `StreamRefTcpHandle`, `StreamRefQuicHandle`, `StreamRefProtocolDiagnostics`,
  `StreamRefProtocolMessageCounts`. `datum-core` owns the protobuf protocol +
  state machine; this file is length-prefixed framing + IO pumping only.
- **`bin/net_compare.rs`** — Datum-side runner for the Akka comparison
  (`roadmap/benchmarks/net.md`). Benchmark-shaped; treat as measurement code.
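
The `RetryPolicy` semantics noted for `connection.rs` above (`max_attempts`
counts the first attempt; values `< 1` clamp to 1) can be sketched roughly as
follows. `RetrySketch`, `initial_backoff`, `max_backoff`, the doubling factor,
and the cap are assumptions for illustration only; they are not the crate's
actual fields or backoff schedule.

```rust
use std::time::Duration;

/// Hypothetical stand-in for `RetryPolicy`. Only the `max_attempts`
/// semantics come from this file; everything else is assumed.
#[derive(Debug, Clone, Copy)]
struct RetrySketch {
    /// Total attempts, including the first one.
    max_attempts: u32,
    /// Assumed: delay before the first retry.
    initial_backoff: Duration,
    /// Assumed: upper bound on any single delay.
    max_backoff: Duration,
}

impl RetrySketch {
    /// Values below 1 clamp to 1, so at least one attempt always runs.
    fn effective_attempts(&self) -> u32 {
        self.max_attempts.max(1)
    }

    /// Delays slept before each retry: one entry per retry, so
    /// `effective_attempts() - 1` entries, doubling up to `max_backoff`.
    fn backoff_schedule(&self) -> Vec<Duration> {
        let mut delay = self.initial_backoff.min(self.max_backoff);
        let mut schedule = Vec::new();
        for _ in 1..self.effective_attempts() {
            schedule.push(delay);
            delay = delay.saturating_mul(2).min(self.max_backoff);
        }
        schedule
    }
}
```

Under these assumptions, `max_attempts = 1` (or `0`) yields an empty schedule:
the single attempt runs and no retry delay is ever slept.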

## Invariants & conventions (preserve these)

- **Blueprint vs materialization.** Constructors build immutable blueprints and
  start nothing — no socket bind/connect/handshake until the stream is
  materialized + pulled (via `future_flow`/`lazy_future_source`/
  `unfold_resource`). Keep construction side-effect-free.
- **Single-owner async carriers / no blocking IO seam.** Each byte carrier
  (TLS/UDP/QUIC) owns its socket in ONE Tokio task that drives a `biased`
  `tokio::select!` over a bounded command channel + socket readiness. No per-op
  `block_on`/`blocking_recv` on the Tokio side; OS threads stay flat (~9, not ~1
  per connection). The *synchronous* stream-core side blocks on a
  `std_mpsc::Receiver::recv` — that is the pull-based stream worker, expected and
  fine; the carrier task itself never blocks.
- **StreamRefs carrier execution.** TCP (`TcpEndpointTask::run`) and QUIC
  (`QuicEndpointTask::run`) both use the single-owner async task pattern: one
  Tokio task owns the socket/stream, and the synchronous protocol endpoint wakes
  it through a bounded channel. There is no per-frame `block_on` and no
  carrier-owned OS thread pair per StreamRef endpoint.
- **Demand backpressure.** `DemandBatcher::new(window)` seeds `requested`; the
  carrier only reads while `read_open && requested > 0`. Read overflow past the
  bounded `sync_channel` is a hard error for reliable carriers (TLS/QUIC:
  `*_receive_buffer_overflow`), but UDP **drops** (`QueueOutcome::Dropped`,
  `drain_ready_*`) — lossy by spec.
- **Single-use materialization.** Sources/sinks `.take()` an `Arc<Mutex<Option<_>>>`
  on first materialization and error ("already materialized") on reuse. Preserve
  this guard when touching resource constructors.
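
The demand-backpressure invariant above can be illustrated with a small
model. This is a sketch under assumptions, not the code in
`async_carrier.rs`: the type names, the split into a consumer-side batcher
and a carrier-side gate, and the exact refill threshold (`window / 2`,
minimum 1) are hypothetical. Only "seed `requested` with the window", "refill
at half the window", and "read while `read_open && requested > 0`" come from
this file.

```rust
/// Hypothetical consumer-side batcher: turns per-item consumption into
/// batched demand grants instead of one command per item.
struct DemandBatcherSketch {
    window: usize,
    /// Items consumed since the last grant was issued.
    pending: usize,
}

impl DemandBatcherSketch {
    fn new(window: usize) -> Self {
        Self { window: window.max(1), pending: 0 }
    }

    /// Initial demand that seeds the carrier's `requested` counter.
    fn initial_demand(&self) -> usize {
        self.window
    }

    /// Record one consumed item. Returns `Some(n)` once half the window
    /// has been consumed, meaning "grant the carrier `n` more reads".
    fn on_consumed(&mut self) -> Option<usize> {
        self.pending += 1;
        let threshold = (self.window / 2).max(1);
        if self.pending >= threshold {
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }
}

/// Hypothetical carrier-side state: the task polls the socket for reads
/// only while this gate is open.
struct CarrierStateSketch {
    read_open: bool,
    requested: usize,
}

impl CarrierStateSketch {
    fn may_read(&self) -> bool {
        self.read_open && self.requested > 0
    }

    /// One item was read and handed downstream.
    fn on_item_delivered(&mut self) {
        self.requested = self.requested.saturating_sub(1);
    }

    /// A demand grant arrived on the command channel.
    fn on_refill(&mut self, n: usize) {
        self.requested = self.requested.saturating_add(n);
    }
}
```

In this model the number of outstanding reads plus unconsumed items never
exceeds the window, which is why an overflow of the bounded receive channel
can be treated as a protocol violation on reliable carriers.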

## Hot paths — do NOT casually refactor

- Carrier task loops (`run_tls_carrier_task`, `run_datagram_carrier_task`,
  `run_connected_carrier_task`, `run_quic_carrier_task`) and their
  `handle_*_command` arms. The `biased` select ordering (commands before reads)
  and `requested` accounting are load-bearing for backpressure correctness.
- `DemandBatcher` refill arithmetic and `command_channel` capacity
  (`DEFAULT_COMMAND_BUFFER.max(receive_buffer)`).
- Sharded engine (`ShardedTokioCarrierExecution`, `sharded_tokio_shard_count`):
  activation threshold (≥64 conns, ≥2 physical cores), per-shard `current_thread`
  runtime on a dedicated OS thread, round-robin routing. Sharded TLS is ~10×
  faster in wall-clock time at 1024 streams (CLAUDE.md / net.md).
- `stream_ref.rs` framing: `FrameDecoder`, the compact `SequencedOnNext` batch
  encoder (high-bit `COMPACT_FRAME_FLAG` length prefix), and the per-direction
  IO. These gate the ~10× RT5 win over forced-remote Akka.
- `is_quic_teardown_loss` (the WP-F1 "connection lost" race fix): a read-side
  loss AFTER outbound completed is swallowed as success. Keep it.
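
For orientation, length-prefixed framing with a high-bit flag in the prefix
can look like the sketch below. It is not the `stream_ref.rs` wire format:
the 4-byte big-endian prefix, the 31-bit length limit, the `Frame` type, and
the `FrameDecoderSketch` API are assumptions. Only the idea of a
`COMPACT_FRAME_FLAG` carried in the high bit of the length prefix comes from
this file.

```rust
use std::convert::TryFrom;

/// Assumed layout: high bit of a big-endian u32 prefix marks a compact frame.
const COMPACT_FRAME_FLAG: u32 = 1 << 31;
const LENGTH_MASK: u32 = !COMPACT_FRAME_FLAG;

#[derive(Debug, PartialEq, Eq)]
struct Frame {
    compact: bool,
    payload: Vec<u8>,
}

/// Encode one frame as `prefix || payload`. Fails if the payload length
/// does not fit in the 31 bits left after the flag.
fn encode_frame(compact: bool, payload: &[u8]) -> Result<Vec<u8>, String> {
    let len = u32::try_from(payload.len())
        .ok()
        .filter(|l| l & COMPACT_FRAME_FLAG == 0)
        .ok_or_else(|| "payload too large for a 31-bit length".to_string())?;
    let prefix = if compact { len | COMPACT_FRAME_FLAG } else { len };
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&prefix.to_be_bytes());
    out.extend_from_slice(payload);
    Ok(out)
}

/// Hypothetical incremental decoder: buffers arbitrary read chunks and
/// yields a frame only once its prefix and full payload have arrived.
#[derive(Default)]
struct FrameDecoderSketch {
    buf: Vec<u8>,
}

impl FrameDecoderSketch {
    fn push(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }

    fn next_frame(&mut self) -> Option<Frame> {
        if self.buf.len() < 4 {
            return None; // prefix incomplete
        }
        let prefix =
            u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
        let len = (prefix & LENGTH_MASK) as usize;
        if self.buf.len() < 4 + len {
            return None; // payload incomplete
        }
        let payload = self.buf[4..4 + len].to_vec();
        self.buf.drain(..4 + len);
        Some(Frame { compact: prefix & COMPACT_FRAME_FLAG != 0, payload })
    }
}
```

Carrying the flag inside the prefix avoids a separate type byte per frame,
at the cost of halving the maximum frame length. A production decoder would
also cap `len` before buffering, which this sketch omits.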

## Akka mapping & Rust-native deviations

Mirrors Pekko Connectors `Tcp`/`Udp` and Akka StreamRefs. Deviations: failures
flow through `Result<_, StreamError>` (no panics); `Datagram` is the explicit
boundary-preserving payload+peer pair; half-close is a transport behavior +
explicit marker method, not a mode type; `*_default` constructors stand in for
Scala default args. Reactive-Streams interop is out of scope.

## Tests & benches

- Unit tests: inline `#[cfg(test)]` in `stream_ref.rs` (framing codec).
- Integration: `crates/datum-net/tests/{tls,udp,quic,connection,smoke,integration,
  stream_ref_tcp,stream_ref_quic}.rs`.
- Criterion (`harness = false`): `crates/datum-net/benches/{net_sanity,tls,udp,
  quic,connection}.rs`.
- Akka comparison: `bin/net_compare.rs` +
  `crates/datum-core/benches/net_compare/run.sh` → `roadmap/benchmarks/net.md`
  (needs JDK + sbt).

## Gotchas

- Quinn exposes a freshly opened bidi stream to the peer only after the opener
  writes data or FIN — call `accept_bi*` only after the initiator has started.
  StreamRefs/interactive readers want `*_available` (emit-on-arrival) so small
  frames don't wait to fill `chunk_size`.
- UDP `send_sink` "success" only means handed to the OS — no delivery guarantee;
  a short send is a stream failure.
- `sharded_tokio_shard_count` has a test-only branch with a redundant
  `cores.min(cores)` (`async_carrier.rs`) — harmless, behavior-correct; see
  report before "cleaning" it (touching logic is out of a doc pass's scope).
- `connection.rs` `graceful_shutdown_on_upstream_finish` returns `self`
  unchanged — it's intentional documentation of existing behavior, not a TODO.