Skip to main content

Module input_stream

Module input_stream 

Source
Expand description

RedWire input-stream dispatch (issue #764, PRD #759 S5).

Brings the S4 HTTP NDJSON input-stream behaviour ([crate::server::handlers_query::handle_query_ndjson_input_stream]) to the RedWire protocol, reusing the S3 envelope vocabulary:

  • OpenStream (client→server) — carries direction: "in" plus a target table and columns. The output-stream variant (direction: "out", the default) keeps using sql and is handled by super::output_stream; the two never collide because the dispatch loop branches on direction first.
  • OpenAck (server→client) — input stream accepted; carries the lease handle + snapshot LSN, identical to the output ack.
  • StreamChunk(client→server) — one chunk of rows. Each chunk is committed atomically (one multi-row INSERT) before the next frame is read, so rows from chunk K are durable and visible before chunk K+1 arrives (auto-commit per chunk). A chunk with terminal: true closes the input phase.
  • StreamEnd (server→client) — success terminal carrying the committed RID range (snapshot_lsn .. committed_rid) and stats (row_count, chunk_count).
  • StreamError(server→client) — a chunk failed to commit. Rows from earlier chunks remain durable; the error carries recoverable_rid (the CDC LSN at the last good commit) and the failing chunk_seq. No further frames are emitted for the stream_id (AC #3).
  • StreamCancel(client→server) — discard the in-flight (not yet committed) chunk; prior committed chunks stay durable (AC #4).

Input streams are driven inline from the per-connection reader loop (each StreamChunk commits synchronously) and tracked in an InputStreamRegistry keyed by stream_id, kept separate from the spawned-worker super::output_stream::StreamRegistry. Both kinds of stream therefore coexist on one connection, dispatched by stream_id (AC #2).

Structs§

InputChunk
Parsed StreamChunk payload sent by an input-stream client. Shape mirrors the output-stream chunk ({"seq", "rows", "terminal"}) but the rows are JSON objects keyed by column rather than already-shaped output rows.
InputStreamRegistry
Per-connection registry of in-flight input streams. Keyed by stream_id, separate from the output-stream worker registry so an input and an output stream may share one connection without colliding (AC #2).
InputStreamState
Per-stream state for an in-flight input stream. Lives in the session loop’s InputStreamRegistry and is mutated synchronously as each StreamChunk is committed.
OpenInputRequest
Parsed OpenStream {direction:"in"} payload. Shape:

Enums§

ChunkParseError
OpenInputParseError

Functions§

build_input_stream_end_frame
Build the input-stream StreamEnd frame.
build_input_stream_end_payload
Build the success terminal StreamEnd payload for an input stream. Carries the committed RID range (snapshot_lsn .. committed_rid) and ingest stats.
build_input_stream_error_frame
Build an input-stream StreamError frame addressed to stream_id, echoing correlation_id so the client can pair it to the request.
build_input_stream_error_payload
Build the input-stream StreamError payload. Unlike the output variant it carries the recoverable_rid prefix (the CDC LSN of the last good commit) and the failing chunk_seq.
open_input_lease
Open an input-stream lease, reusing the output-stream lease primitive so HTTP, output, and input streams agree on TTL and the in-transaction refusal (AC mirrors S4 #4).
open_stream_is_input
true when an OpenStream payload requests the input direction ({"direction":"in", ...}). Any other value — including a missing field or a malformed payload — is treated as the output direction so the existing S3 path keeps owning the default.
parse_input_chunk
parse_open_input