Expand description
Output streaming primitives for issue #760 (PRD #759 / ADR 0029) — the “shim slice” of bidirectional streaming.
Scope of this module:
StreamConfig: capture thestream.*namespace fromred_configat lease open and freeze it for the lease’s lifetime (acceptance criterion: KV mutations mid-stream do not retroactively change behaviour).StreamLease: an internal, unforwarded handle bound to a snapshot LSN and a frozen config. No external surface yet (S2 will add quotas + per-principal accounting).open_stream: refuses withstream_in_transaction_unsupportedwhen the caller already has an activeBEGINon the session.ChunkProducer: page-aligned (N × 16 KiB) production buffer that flushes on the first of byte / row / latency cap.Clock: trait-injected time source so TTL expiry is testable.
The HTTP NDJSON wire framing built on top of these primitives lives in
handlers_query::handle_query_ndjson_stream and is dispatched from
routing::try_route_streaming.
Structs§
- Chunk
Producer - Page-aligned chunk producer. The producer accumulates byte-encoded rows in an N × 16 KiB buffer; on the first of byte / row / latency cap it forwards the buffer to the supplied flush closure, which the transport layer turns into a chunked-encoding frame.
- Cursor
Registry - Issue #807 / PRD #750 — process-wide cursor registry for
/query/stream. - Cursor
Resume - Pinned read state returned to the handler when a resume token resolves
cleanly. The handler re-executes
queryagainstsnapshot_lsnto re-stream the same view the original cursor referenced. - Fake
Clock - Lease
Registry - Resume-eligibility ledger. Holds
(snapshot_lsn → opened_at_ms, ttl_ms)so a resume request can be checked against TTL without trusting the wall clock on the client. The shim slice does not implement true MVCC pinning — the registry’s role is to makesnapshot_expireddeterministic and testable. - Prefix
Hasher - Incremental SHA-256 hasher over emitted row lines, hex-encoded on
finalize. The wire contract is that the server hashes the exact
byte sequence of each row line (without trailing newline) in the
order the rows are emitted; the client stores the resulting digest
from the
endenvelope and replays it on resume. - Stream
Capacity Guard - RAII slot returned by
StreamCapacityRegistry::try_acquire. Decrements both counters on drop — acceptance criterion #4 (“releasing a slot on stream end (any reason) decrements both counters atomically”). - Stream
Capacity Registry - Issue #761 / S2 — process-wide stream capacity registry. Holds two
counters: a global concurrent-stream count and a per-principal map.
Both are decremented when the
StreamCapacityGuardhanded back from a successfultry_acquireis dropped, so the release path covers every normal exit (success, mid-stream error, snapshot expiry, client disconnect that drops the writer chain, panic unwind through the stack frame holding the guard). - Stream
Config - Frozen snapshot of the
stream.*namespace at lease open. Acceptance criterion: ared_configmutation while a stream is running does not retroactively change the running stream’s behaviour — that is, the per-lease config is value-typed and not a back-reference. - Stream
Lease - System
Clock
Enums§
- Acquire
Error - Failure modes of
StreamCapacityRegistry::try_acquire. Each variant carries the cap that fired and the live counter value at refusal time so clients can back off intelligently (the HTTP layer surfaces these inside the structured 429 body). - Close
Reason - Issue #767 / S8 — wire-visible reason for
stream.closed. The audit emit site decides the variant once the stream’s exit shape has been observed. - Cursor
Reject - Why a resume token was refused. The handler maps every variant to a
structured wire error.
CursorReject::NotFounddeliberately covers both “never issued” and “owned by a different tenant/principal” so an unauthorized caller cannot tell a foreign cursor from a nonexistent one. - Flush
Reason - Lease
Lookup - Open
Stream Error
Constants§
- CURSOR_
TOKEN_ BYTES - Issue #807 / PRD #750 — opaque cursor token. 192 bits drawn from the
OS CSPRNG, hex-encoded so it survives JSON transport. The token is the
only cursor identifier the wire sees; the pinned snapshot, scope, and
query stay server-side in the
CursorRegistry. Opacity property: a client cannot derive the pinned snapshot LSN or another principal’s token from the value, and tokens are not sequential across opens. - LEASE_
HANDLE_ BYTES - Issue #767 / S8 — wire-visible lease handle, 128 bits drawn from the OS CSPRNG. Hex-encoded so it survives JSON / header transport. Opacity property: a client cannot derive the internal lease id from the handle (the two are independent), and the handle is not sequential across opens.
- PAGE_
SIZE - Engine page size — the production buffer is always a multiple of this.
Matches
storage::engine::PAGE_SIZE.
Traits§
- Clock
- Injectable time source. Production code uses
SystemClock; tests drive TTL expiry withFakeClockso they don’t depend on wall time.
Functions§
- assess_
resumability - Resumability assessment of a query plan. The shim slice runs a
textual classifier over the SQL string: a query is resumable iff
it has a stable total order over a unique key. By default we
promise RID ASC; an explicit
ORDER BY rid(orORDER BY rid ASC) is also resumable. Anything that aggregates / groups / windows or orders on a non-unique column is not. - audit_
stream_ capacity_ refused - Capacity refusal emits a
stream.closedevent with no lease handle (the open never produced one) andreason: capacity_refused. The brief lists capacity_refused alongside the other close reasons; we keep the wire shape identical so downstream audit-log consumers don’t have to special-case it. - audit_
stream_ closed - audit_
stream_ opened - Issue #767 / S8 — audit emission helpers. Each helper builds an
AuditEventshaped to the brief and forwards it to the runtime’s audit log. The helpers are intentionally side-effect-only (no return); audit emission must never terminate a stream that would otherwise succeed. - audit_
token_ expired_ during_ lease - generate_
cursor_ token - Generate an opaque 192-bit cursor token (48-char hex). Falls back to a
high-entropy time/counter mix if the CSPRNG is unavailable so a fresh
open never fails to mint a token (the fallback is not unguessable, but
production paths reach
/dev/urandomsuccessfully). - generate_
lease_ handle - Generate an opaque 128-bit lease handle (32-char hex). Per ADR 0029,
the handle is the only identifier the wire sees; the internal
StreamLease::idstays server-side. - open_
stream - Issue a lease. The caller is responsible for binding
snapshot_lsnto the same MVCC view the underlying executor will read from; in the shim slice this isruntime.cdc_current_lsn()captured beforeexecute_query. - parse_
jwt_ exp_ ms - Issue #767 / S8 — non-validating JWT
expclaim extractor. Returns the expiry time in milliseconds since the epoch whentokenis a JWT carrying an integerexpclaim, otherwiseNone. - system_
clock - Borrow the lease registry’s clock by default — the routing handler uses this, tests inject their own.
- write_
chunk - Emit one HTTP chunk (
<hex-size>\r\n<bytes>\r\n). A zero-length payload is silently dropped — the terminator chunk lives inwrite_chunked_terminator. - write_
chunked_ response_ header - HTTP chunked transfer encoding helpers. We do not pull in
hyperfor the streaming path; the existing SSE handler also hand-rolls its own HTTP framing, and matching that style keeps the diff narrow. - write_
chunked_ terminator - Final
0\r\n\r\nchunk that terminates a chunked body.