Module output_stream

Output streaming primitives for issue #760 (PRD #759 / ADR 0029) — the “shim slice” of bidirectional streaming.

Scope of this module:

  • StreamConfig: capture the stream.* namespace from red_config at lease open and freeze it for the lease’s lifetime (acceptance criterion: KV mutations mid-stream do not retroactively change behaviour).
  • StreamLease: an internal, unforwarded handle bound to a snapshot LSN and a frozen config. No external surface yet (S2 will add quotas + per-principal accounting).
  • open_stream: refuses with stream_in_transaction_unsupported when the caller already has an active BEGIN on the session.
  • ChunkProducer: page-aligned (N × 16 KiB) production buffer that flushes on the first of byte / row / latency cap.
  • Clock: trait-injected time source so TTL expiry is testable.
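The freeze-at-open behaviour described for StreamConfig can be illustrated with a value-typed snapshot. This is a minimal sketch, not the module's code: the `StreamConfigSketch` type, its fields, the `stream.*` key names, and the defaults are all hypothetical, and a plain `HashMap` stands in for red_config.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a frozen per-lease config.
/// It owns plain values and holds no reference back to the KV store.
#[derive(Debug, Clone, PartialEq, Eq)]
struct StreamConfigSketch {
    chunk_pages: usize,
    max_rows_per_chunk: usize,
    max_latency_ms: u64,
}

impl StreamConfigSketch {
    /// Copy the (assumed) stream.* keys out of the store once, at lease open.
    /// Missing or unparsable values fall back to illustrative defaults.
    fn capture(kv: &HashMap<String, String>) -> Self {
        let get = |key: &str, default: u64| {
            kv.get(key)
                .and_then(|v| v.parse::<u64>().ok())
                .unwrap_or(default)
        };
        Self {
            chunk_pages: get("stream.chunk_pages", 4) as usize,
            max_rows_per_chunk: get("stream.max_rows_per_chunk", 1024) as usize,
            max_latency_ms: get("stream.max_latency_ms", 50),
        }
    }
}
```

Because the snapshot is built from copied values, a later write to the store cannot reach a lease that is already open; only a lease opened after the write would observe it.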

The HTTP NDJSON wire framing built on top of these primitives lives in handlers_query::handle_query_ndjson_stream and is dispatched from routing::try_route_streaming.
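Before rows reach that wire framing they pass through the chunk producer. The "flush on the first of byte / row / latency cap" rule can be sketched roughly as follows. The `SketchProducer` type, its constructor parameters, the `FlushReason` variants, and the decision to pass the current time in explicitly are assumptions made for illustration; only the N × 16 KiB sizing and the three caps come from the description above.

```rust
use std::time::{Duration, Instant};

/// 16 KiB, the page size stated for this module.
const PAGE_SIZE: usize = 16 * 1024;

/// Which cap fired. Hypothetical variants; the real FlushReason may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlushReason {
    Bytes,
    Rows,
    Latency,
    Final,
}

struct SketchProducer<F: FnMut(&[u8], FlushReason)> {
    buf: Vec<u8>,
    byte_cap: usize,
    row_cap: usize,
    max_latency: Duration,
    rows: usize,
    first_row_at: Option<Instant>,
    flush: F,
}

impl<F: FnMut(&[u8], FlushReason)> SketchProducer<F> {
    fn new(pages: usize, row_cap: usize, max_latency: Duration, flush: F) -> Self {
        // The buffer is always a whole number of pages (at least one).
        let byte_cap = pages.max(1) * PAGE_SIZE;
        Self {
            buf: Vec::with_capacity(byte_cap),
            byte_cap,
            row_cap,
            max_latency,
            rows: 0,
            first_row_at: None,
            flush,
        }
    }

    /// Append one encoded row, then flush if any cap has been reached.
    fn push_row(&mut self, row: &[u8], now: Instant) {
        // Flush first if this row would overflow a non-empty buffer.
        if !self.buf.is_empty() && self.buf.len() + row.len() > self.byte_cap {
            self.emit(FlushReason::Bytes);
        }
        self.buf.extend_from_slice(row);
        self.rows += 1;
        let started = *self.first_row_at.get_or_insert(now);
        if self.buf.len() >= self.byte_cap {
            self.emit(FlushReason::Bytes);
        } else if self.rows >= self.row_cap {
            self.emit(FlushReason::Rows);
        } else if now.duration_since(started) >= self.max_latency {
            self.emit(FlushReason::Latency);
        }
    }

    /// Forward whatever is still buffered when the stream ends.
    fn finish(&mut self) {
        if !self.buf.is_empty() {
            self.emit(FlushReason::Final);
        }
    }

    fn emit(&mut self, reason: FlushReason) {
        (self.flush)(&self.buf, reason);
        self.buf.clear();
        self.rows = 0;
        self.first_row_at = None;
    }
}
```

In this sketch the latency cap is only checked when a row arrives; a producer that must flush during idle periods would also need a timer or a poll from the transport loop.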

Structs

ChunkProducer
Page-aligned chunk producer. The producer accumulates byte-encoded rows in an N × 16 KiB buffer; on the first of byte / row / latency cap it forwards the buffer to the supplied flush closure, which the transport layer turns into a chunked-encoding frame.
CursorRegistry
Issue #807 / PRD #750 — process-wide cursor registry for /query/stream.
CursorResume
Pinned read state returned to the handler when a resume token resolves cleanly. The handler re-executes query against snapshot_lsn to re-stream the same view the original cursor referenced.
FakeClock
LeaseRegistry
Resume-eligibility ledger. Holds (snapshot_lsn → opened_at_ms, ttl_ms) so a resume request can be checked against TTL without trusting the wall clock on the client. The shim slice does not implement true MVCC pinning — the registry’s role is to make snapshot_expired deterministic and testable.
PrefixHasher
Incremental SHA-256 hasher over emitted row lines, hex-encoded on finalize. The wire contract is that the server hashes the exact byte sequence of each row line (without trailing newline) in the order the rows are emitted; the client stores the resulting digest from the end envelope and replays it on resume.
StreamCapacityGuard
RAII slot returned by StreamCapacityRegistry::try_acquire. Decrements both counters on drop — acceptance criterion #4 (“releasing a slot on stream end (any reason) decrements both counters atomically”).
StreamCapacityRegistry
Issue #761 / S2 — process-wide stream capacity registry. Holds two counters: a global concurrent-stream count and a per-principal map. Both are decremented when the StreamCapacityGuard handed back from a successful try_acquire is dropped, so the release path covers every normal exit (success, mid-stream error, snapshot expiry, client disconnect that drops the writer chain, panic unwind through the stack frame holding the guard).
StreamConfig
Frozen snapshot of the stream.* namespace at lease open. Acceptance criterion: a red_config mutation while a stream is running does not retroactively change the running stream’s behaviour — that is, the per-lease config is value-typed and not a back-reference.
StreamLease
SystemClock
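The release-on-drop contract described for StreamCapacityRegistry and StreamCapacityGuard can be sketched with a mutex-protected pair of counters. Everything here is illustrative: the type names, the `AcquireError` variant names and fields, the constructor, and the `live` helper are assumptions, and the real registry may use a different locking or atomic scheme.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical refusal shape: the cap that fired plus the live count.
#[derive(Debug, PartialEq, Eq)]
enum AcquireError {
    GlobalCap { cap: usize, live: usize },
    PrincipalCap { cap: usize, live: usize },
}

#[derive(Default)]
struct Counters {
    global: usize,
    per_principal: HashMap<String, usize>,
}

struct CapacityRegistry {
    inner: Arc<Mutex<Counters>>,
    global_cap: usize,
    principal_cap: usize,
}

/// RAII slot: dropping it releases both counters under one lock.
struct CapacityGuard {
    inner: Arc<Mutex<Counters>>,
    principal: String,
}

impl CapacityRegistry {
    fn new(global_cap: usize, principal_cap: usize) -> Self {
        Self { inner: Arc::new(Mutex::new(Counters::default())), global_cap, principal_cap }
    }

    fn try_acquire(&self, principal: &str) -> Result<CapacityGuard, AcquireError> {
        // Recover the data even if a previous holder panicked.
        let mut c = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        if c.global >= self.global_cap {
            return Err(AcquireError::GlobalCap { cap: self.global_cap, live: c.global });
        }
        let live = c.per_principal.get(principal).copied().unwrap_or(0);
        if live >= self.principal_cap {
            return Err(AcquireError::PrincipalCap { cap: self.principal_cap, live });
        }
        c.global += 1;
        *c.per_principal.entry(principal.to_string()).or_insert(0) += 1;
        Ok(CapacityGuard { inner: Arc::clone(&self.inner), principal: principal.to_string() })
    }

    /// (global live count, live count for `principal`)
    fn live(&self, principal: &str) -> (usize, usize) {
        let c = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        (c.global, c.per_principal.get(principal).copied().unwrap_or(0))
    }
}

impl Drop for CapacityGuard {
    fn drop(&mut self) {
        let mut c = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        c.global = c.global.saturating_sub(1);
        let remaining = match c.per_principal.get_mut(&self.principal) {
            Some(n) => {
                *n = n.saturating_sub(1);
                *n
            }
            None => return,
        };
        if remaining == 0 {
            // Keep the map from growing with idle principals.
            c.per_principal.remove(&self.principal);
        }
    }
}
```

Tying the release to `Drop` is what lets every exit path, including an early return or a panic unwinding through the frame that holds the guard, give the slot back without explicit cleanup code.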

Enums

AcquireError
Failure modes of StreamCapacityRegistry::try_acquire. Each variant carries the cap that fired and the live counter value at refusal time so clients can back off intelligently (the HTTP layer surfaces these inside the structured 429 body).
CloseReason
Issue #767 / S8 — wire-visible reason for stream.closed. The audit emit site decides the variant once the stream’s exit shape has been observed.
CursorReject
Why a resume token was refused. The handler maps every variant to a structured wire error. CursorReject::NotFound deliberately covers both “never issued” and “owned by a different tenant/principal” so an unauthorized caller cannot tell a foreign cursor from a nonexistent one.
FlushReason
LeaseLookup
OpenStreamError

Constants

CURSOR_TOKEN_BYTES
Issue #807 / PRD #750 — opaque cursor token. 192 bits drawn from the OS CSPRNG, hex-encoded so it survives JSON transport. The token is the only cursor identifier the wire sees; the pinned snapshot, scope, and query stay server-side in the CursorRegistry. Opacity property: a client cannot derive the pinned snapshot LSN or another principal’s token from the value, and tokens are not sequential across opens.
LEASE_HANDLE_BYTES
Issue #767 / S8 — wire-visible lease handle, 128 bits drawn from the OS CSPRNG. Hex-encoded so it survives JSON / header transport. Opacity property: a client cannot derive the internal lease id from the handle (the two are independent), and the handle is not sequential across opens.
PAGE_SIZE
Engine page size — the production buffer is always a multiple of this. Matches storage::engine::PAGE_SIZE.
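The sizes quoted for the two identifiers follow from hex encoding, which emits two characters per byte: 192 bits is 24 bytes and 48 characters, 128 bits is 16 bytes and 32 characters. The sketch below makes that arithmetic concrete. The constant values are inferred from the bit widths stated above rather than read from the source, and `hex_encode` (including its lowercase alphabet) is a hypothetical helper.

```rust
/// Inferred from "192 bits"; the actual constant is not shown on this page.
const CURSOR_TOKEN_BYTES: usize = 192 / 8;
/// Inferred from "128 bits"; the actual constant is not shown on this page.
const LEASE_HANDLE_BYTES: usize = 128 / 8;

/// Hypothetical helper: lowercase hex, two characters per input byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &b in bytes {
        out.push(DIGITS[(b >> 4) as usize] as char);
        out.push(DIGITS[(b & 0x0f) as usize] as char);
    }
    out
}
```

A real token would be filled from the OS CSPRNG before encoding; the random source is left out here so the sketch stays dependency-free.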

Traits

Clock
Injectable time source. Production code uses SystemClock; tests drive TTL expiry with FakeClock so they don’t depend on wall time.
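A minimal sketch of how an injected clock makes TTL expiry deterministic in tests. The method name `now_ms`, the `Sketch`-suffixed types, the `advance` helper, and the `lease_expired` function are all assumptions; the page does not show the real trait's signature, and whether expiry triggers at exactly `ttl_ms` or just after it is also a guess.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical shape of an injectable time source.
trait ClockSketch: Send + Sync {
    /// Milliseconds since the Unix epoch.
    fn now_ms(&self) -> u64;
}

/// Production-style clock backed by the system wall clock.
struct SystemClockSketch;

impl ClockSketch for SystemClockSketch {
    fn now_ms(&self) -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0)
    }
}

/// Test clock: time only moves when the test says so.
struct FakeClockSketch(AtomicU64);

impl FakeClockSketch {
    fn new(start_ms: u64) -> Self {
        Self(AtomicU64::new(start_ms))
    }

    fn advance(&self, ms: u64) {
        self.0.fetch_add(ms, Ordering::SeqCst);
    }
}

impl ClockSketch for FakeClockSketch {
    fn now_ms(&self) -> u64 {
        self.0.load(Ordering::SeqCst)
    }
}

/// A lease counts as expired once at least `ttl_ms` has elapsed since open.
fn lease_expired(clock: &dyn ClockSketch, opened_at_ms: u64, ttl_ms: u64) -> bool {
    clock.now_ms().saturating_sub(opened_at_ms) >= ttl_ms
}
```

A test can then step the fake clock to one millisecond before and exactly at the TTL boundary and assert both outcomes without sleeping.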

Functions

assess_resumability
Resumability assessment of a query plan. The shim slice runs a textual classifier over the SQL string: a query is resumable iff it has a stable total order over a unique key. By default we promise RID ASC; an explicit ORDER BY rid (or ORDER BY rid ASC) is also resumable. Anything that aggregates / groups / windows or orders on a non-unique column is not.
audit_stream_capacity_refused
Capacity refusal emits a stream.closed event with no lease handle (the open never produced one) and reason: capacity_refused. The brief lists capacity_refused alongside the other close reasons; we keep the wire shape identical so downstream audit-log consumers don’t have to special-case it.
audit_stream_closed
audit_stream_opened
Issue #767 / S8 — audit emission helpers. Each helper builds an AuditEvent shaped to the brief and forwards it to the runtime’s audit log. The helpers are intentionally side-effect-only (no return); audit emission must never terminate a stream that would otherwise succeed.
audit_token_expired_during_lease
generate_cursor_token
Generate an opaque 192-bit cursor token (48-char hex). Falls back to a high-entropy time/counter mix if the CSPRNG is unavailable so a fresh open never fails to mint a token (the fallback is not unguessable, but production paths reach /dev/urandom successfully).
generate_lease_handle
Generate an opaque 128-bit lease handle (32-char hex). Per ADR 0029, the handle is the only identifier the wire sees; the internal StreamLease::id stays server-side.
open_stream
Issue a lease. The caller is responsible for binding snapshot_lsn to the same MVCC view the underlying executor will read from; in the shim slice this is runtime.cdc_current_lsn() captured before execute_query.
parse_jwt_exp_ms
Issue #767 / S8 — non-validating JWT exp claim extractor. Returns the expiry time in milliseconds since the epoch when token is a JWT carrying an integer exp claim, otherwise None.
system_clock
Borrow the lease registry’s clock by default — the routing handler uses this, tests inject their own.
write_chunk
Emit one HTTP chunk (<hex-size>\r\n<bytes>\r\n). A zero-length payload is silently dropped — the terminator chunk lives in write_chunked_terminator.
write_chunked_response_header
HTTP chunked transfer encoding helpers. We do not pull in hyper for the streaming path; the existing SSE handler also hand-rolls its own HTTP framing, and matching that style keeps the diff narrow.
write_chunked_terminator
Final 0\r\n\r\n chunk that terminates a chunked body.
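The chunk and terminator framing described for write_chunk and write_chunked_terminator can be sketched over any `std::io::Write`. The function names and signatures below are assumptions chosen for illustration (the page does not show the real ones); the byte layout follows the descriptions above.

```rust
use std::io::{self, Write};

/// Emit one chunk as `<hex-size>\r\n<bytes>\r\n`.
/// An empty payload is skipped, because a zero-size chunk would
/// be read by the peer as the end of the body.
fn write_chunk_sketch<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.is_empty() {
        return Ok(());
    }
    write!(w, "{:x}\r\n", payload.len())?;
    w.write_all(payload)?;
    w.write_all(b"\r\n")
}

/// Emit the final `0\r\n\r\n` chunk that ends a chunked body.
fn write_terminator_sketch<W: Write>(w: &mut W) -> io::Result<()> {
    w.write_all(b"0\r\n\r\n")
}
```

Writing one 10-byte NDJSON line followed by the terminator produces `a\r\n`, the line, `\r\n`, then `0\r\n\r\n`; the size prefix is hexadecimal, so 10 bytes is announced as `a`.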