RedWire input-stream dispatch (issue #764, PRD #759 S5).
Brings the S4 HTTP NDJSON input-stream behaviour
([crate::server::handlers_query::handle_query_ndjson_input_stream])
to the RedWire protocol, reusing the S3 envelope vocabulary:
- OpenStream (client→server): carries direction: "in" plus a target table and columns. The output-stream variant (direction: "out", the default) keeps using sql and is handled by super::output_stream; the two never collide because the dispatch loop branches on direction first.
- OpenAck (server→client): input stream accepted; carries the lease handle + snapshot LSN, identical to the output ack.
- StreamChunk (client→server): one chunk of rows. Each chunk is committed atomically (one multi-row INSERT) before the next frame is read, so rows from chunk K are durable and visible before chunk K+1 arrives (auto-commit per chunk). A chunk with terminal: true closes the input phase.
- StreamEnd (server→client): success terminal carrying the committed RID range (snapshot_lsn..committed_rid) and stats (row_count, chunk_count).
- StreamError (server→client): a chunk failed to commit. Rows from earlier chunks remain durable; the error carries recoverable_rid (the CDC LSN at the last good commit) and the failing chunk_seq. No further frames are emitted for the stream_id (AC #3).
- StreamCancel (client→server): discard the in-flight (not yet committed) chunk; prior committed chunks stay durable (AC #4).
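The per-chunk auto-commit rule can be sketched with a toy model. Everything here is an assumption made for illustration: ToyStore, Chunk, Reply, and drive_input_stream are hypothetical names, rows are plain strings, and the LSN is a counter bumped once per committed chunk. It is not the crate's implementation; it only shows how a failing chunk leaves earlier chunks durable and how the terminal replies get their fields.

```rust
/// Hypothetical server replies for an input stream. Field names follow the
/// envelope vocabulary above; the layout itself is an assumption.
#[derive(Debug, PartialEq)]
pub enum Reply {
    /// Terminal chunk committed: RID range plus ingest stats.
    StreamEnd { snapshot_lsn: u64, committed_rid: u64, row_count: u64, chunk_count: u64 },
    /// A chunk failed to commit; earlier chunks stay durable.
    StreamError { recoverable_rid: u64, chunk_seq: u64 },
}

/// One client chunk (hypothetical shape): sequence number, rows, terminal flag.
pub struct Chunk {
    pub seq: u64,
    pub rows: Vec<String>,
    pub terminal: bool,
}

/// Toy stand-in for the storage engine: committed rows plus an LSN counter.
pub struct ToyStore {
    pub committed: Vec<String>,
    pub lsn: u64,
}

impl ToyStore {
    /// Commit every row of a chunk or none of them.
    /// A row equal to "bad" simulates a commit failure.
    fn commit_chunk(&mut self, rows: &[String]) -> Result<u64, String> {
        if rows.iter().any(|r| r == "bad") {
            return Err("simulated commit failure".to_string());
        }
        self.committed.extend(rows.iter().cloned());
        self.lsn += 1;
        Ok(self.lsn)
    }
}

/// Commit chunks one at a time, in arrival order. Returns None while the
/// stream is still open (no terminal chunk seen and no failure).
pub fn drive_input_stream(store: &mut ToyStore, chunks: Vec<Chunk>) -> Option<Reply> {
    let snapshot_lsn = store.lsn;
    let mut last_good = snapshot_lsn;
    let (mut row_count, mut chunk_count) = (0u64, 0u64);
    for chunk in chunks {
        match store.commit_chunk(&chunk.rows) {
            Ok(lsn) => {
                last_good = lsn;
                row_count += chunk.rows.len() as u64;
                chunk_count += 1;
            }
            // Stop at the first failure: nothing after this chunk is applied.
            Err(_) => {
                return Some(Reply::StreamError { recoverable_rid: last_good, chunk_seq: chunk.seq });
            }
        }
        if chunk.terminal {
            return Some(Reply::StreamEnd {
                snapshot_lsn,
                committed_rid: last_good,
                row_count,
                chunk_count,
            });
        }
    }
    None
}
```

Feeding two good chunks and then a terminal chunk yields a StreamEnd whose chunk_count is 3; feeding a chunk containing "bad" yields a StreamError whose recoverable_rid points at the last chunk that did commit.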
Input streams are driven inline from the per-connection reader
loop (each StreamChunk commits synchronously) and tracked in an
InputStreamRegistry keyed by stream_id, kept separate from
the spawned-worker super::output_stream::StreamRegistry. Both
kinds of stream therefore coexist on one connection, dispatched by
stream_id (AC #2).
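A minimal sketch of the two-registry arrangement, assuming stream_id is a u64 and using hypothetical Connection, InputState, and Route types; the real InputStreamRegistry and StreamRegistry may be shaped quite differently. The point is only that a frame is routed by looking its stream_id up in each map, so an input and an output stream can be live on the same connection.

```rust
use std::collections::HashMap;

/// Hypothetical per-stream input state; the field is an assumption.
#[derive(Debug, Default)]
pub struct InputState {
    pub chunks_committed: u64,
}

/// Where a frame for a given stream_id should be sent.
#[derive(Debug, PartialEq)]
pub enum Route {
    Input,
    Output,
    Unknown,
}

/// Hypothetical per-connection bookkeeping with two separate registries.
#[derive(Default)]
pub struct Connection {
    /// Input streams, mutated synchronously by the reader loop.
    pub inputs: HashMap<u64, InputState>,
    /// Output streams; the value stands in for a spawned-worker handle.
    pub outputs: HashMap<u64, &'static str>,
}

impl Connection {
    /// Decide which registry owns a stream_id.
    pub fn route(&self, stream_id: u64) -> Route {
        if self.inputs.contains_key(&stream_id) {
            Route::Input
        } else if self.outputs.contains_key(&stream_id) {
            Route::Output
        } else {
            Route::Unknown
        }
    }
}
```

Keeping the maps separate means the synchronous input path never has to lock or inspect worker handles, at the cost of two lookups for an output frame in this sketch.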
Structs§
- InputChunk - Parsed StreamChunk payload sent by an input-stream client. Shape mirrors the output-stream chunk ({"seq", "rows", "terminal"}) but the rows are JSON objects keyed by column rather than already-shaped output rows.
- InputStreamRegistry - Per-connection registry of in-flight input streams. Keyed by stream_id, separate from the output-stream worker registry so an input and an output stream may share one connection without colliding (AC #2).
- InputStreamState - Per-stream state for an in-flight input stream. Lives in the session loop's InputStreamRegistry and is mutated synchronously as each StreamChunk is committed.
- OpenInputRequest - Parsed OpenStream {direction:"in"} payload. Shape:
Enums§
Functions§
- build_input_stream_end_frame - Build the input-stream StreamEnd frame.
- build_input_stream_end_payload - Build the success terminal StreamEnd payload for an input stream. Carries the committed RID range (snapshot_lsn..committed_rid) and ingest stats.
- build_input_stream_error_frame - Build an input-stream StreamError frame addressed to stream_id, echoing correlation_id so the client can pair it to the request.
- build_input_stream_error_payload - Build the input-stream StreamError payload. Unlike the output variant it carries the recoverable_rid prefix (the CDC LSN of the last good commit) and the failing chunk_seq.
- open_input_lease - Open an input-stream lease, reusing the output-stream lease primitive so HTTP, output, and input streams agree on TTL and the in-transaction refusal (AC mirrors S4 #4).
- open_stream_is_input - true when an OpenStream payload requests the input direction ({"direction":"in", ...}). Any other value, including a missing field or a malformed payload, is treated as the output direction so the existing S3 path keeps owning the default.
- parse_input_chunk
- parse_open_input