Skip to main content

reddb_server/wire/redwire/
input_stream.rs

1//! RedWire input-stream dispatch (issue #764, PRD #759 S5).
2//!
3//! Brings the S4 HTTP NDJSON input-stream behaviour
4//! ([`crate::server::handlers_query::handle_query_ndjson_input_stream`])
5//! to the RedWire protocol, reusing the S3 envelope vocabulary:
6//!
7//!   - `OpenStream` (client→server) — carries `direction: "in"` plus a
8//!     `target` table and `columns`. The output-stream variant
9//!     (`direction: "out"`, the default) keeps using `sql` and is
10//!     handled by [`super::output_stream`]; the two never collide
11//!     because the dispatch loop branches on `direction` first.
12//!   - `OpenAck`    (server→client) — input stream accepted; carries
13//!     the lease handle + snapshot LSN, identical to the output ack.
14//!   - `StreamChunk`(client→server) — one chunk of rows. Each chunk
15//!     is committed atomically (one multi-row `INSERT`) before the
16//!     next frame is read, so rows from chunk K are durable and
17//!     visible before chunk K+1 arrives (auto-commit per chunk). A
18//!     chunk with `terminal: true` closes the input phase.
19//!   - `StreamEnd`  (server→client) — success terminal carrying the
20//!     committed RID range (`snapshot_lsn` .. `committed_rid`) and
21//!     stats (`row_count`, `chunk_count`).
22//!   - `StreamError`(server→client) — a chunk failed to commit. Rows
23//!     from earlier chunks remain durable; the error carries
24//!     `recoverable_rid` (the CDC LSN at the last good commit) and
25//!     the failing `chunk_seq`. No further frames are emitted for the
26//!     `stream_id` (AC #3).
27//!   - `StreamCancel`(client→server) — discard the in-flight (not yet
28//!     committed) chunk; prior committed chunks stay durable (AC #4).
29//!
30//! Input streams are driven *inline* from the per-connection reader
31//! loop (each `StreamChunk` commits synchronously) and tracked in an
32//! [`InputStreamRegistry`] keyed by `stream_id`, kept separate from
33//! the spawned-worker [`super::output_stream::StreamRegistry`]. Both
34//! kinds of stream therefore coexist on one connection, dispatched by
35//! `stream_id` (AC #2).
36
37use std::collections::HashMap;
38
39use crate::runtime::RedDBRuntime;
40use crate::serde_json::{self, Value as JsonValue};
41pub use reddb_wire::redwire::stream::{ChunkParseError, OpenInputParseError, OpenInputRequest};
42use reddb_wire::redwire::Frame;
43
44use super::output_stream::RegisterError;
45use crate::server::output_stream::{Clock, OpenStreamError, StreamConfig, StreamLease};
46
47/// `true` when an `OpenStream` payload requests the input direction
48/// (`{"direction":"in", ...}`). Any other value — including a missing
49/// field or a malformed payload — is treated as the output direction
50/// so the existing S3 path keeps owning the default.
51pub fn open_stream_is_input(payload: &[u8]) -> bool {
52    reddb_wire::redwire::stream::open_stream_is_input(payload)
53}
54
55pub fn parse_open_input(payload: &[u8]) -> Result<OpenInputRequest, OpenInputParseError> {
56    reddb_wire::redwire::stream::parse_open_input(payload)
57}
58
59/// Runtime-local projection of the `reddb-wire` RedWire `StreamChunk`
60/// payload into the server's internal JSON value type.
61// No `Eq`: `serde_json::Value` rows may carry floats, which are only
62// `PartialEq`.
63#[derive(Debug, Clone, PartialEq)]
64pub(super) struct RuntimeInputChunk {
65    pub seq: u64,
66    pub rows: Vec<JsonValue>,
67    pub terminal: bool,
68}
69
70pub(super) fn parse_input_chunk(payload: &[u8]) -> Result<RuntimeInputChunk, ChunkParseError> {
71    let chunk = reddb_wire::redwire::stream::parse_input_chunk_json(payload)?;
72    let rows = chunk
73        .rows_json
74        .iter()
75        .map(|row| serde_json::from_slice(row).unwrap_or(JsonValue::Null))
76        .collect();
77    Ok(RuntimeInputChunk {
78        seq: chunk.seq,
79        rows,
80        terminal: chunk.terminal,
81    })
82}
83
84/// Per-stream state for an in-flight input stream. Lives in the
85/// session loop's [`InputStreamRegistry`] and is mutated synchronously
86/// as each `StreamChunk` is committed.
87#[derive(Debug)]
88pub struct InputStreamState {
89    pub lease: StreamLease,
90    pub target: String,
91    pub columns: Vec<String>,
92    /// CDC LSN at the last successful per-chunk commit; the start of
93    /// the committed RID range is the lease's `snapshot_lsn`.
94    pub committed_rid: u64,
95    pub row_count: u64,
96    pub chunk_count: u64,
97    pub snapshot_lsn: u64,
98}
99
100impl InputStreamState {
101    pub fn new(lease: StreamLease, target: String, columns: Vec<String>) -> Self {
102        let snapshot_lsn = lease.snapshot_lsn;
103        Self {
104            lease,
105            target,
106            columns,
107            committed_rid: snapshot_lsn,
108            row_count: 0,
109            chunk_count: 0,
110            snapshot_lsn,
111        }
112    }
113
114    /// Commit one chunk of rows as a single atomic multi-row `INSERT`.
115    /// On success the rows are durable and `committed_rid` advances to
116    /// the post-commit CDC LSN. On failure nothing in this chunk
117    /// commits — `committed_rid` (and therefore `recoverable_rid`)
118    /// stays at the last good commit, so chunks `1..N-1` remain
119    /// durable (AC #3).
120    pub fn commit_chunk(
121        &mut self,
122        runtime: &RedDBRuntime,
123        rows: &[JsonValue],
124    ) -> Result<(), (String, String)> {
125        if rows.is_empty() {
126            return Ok(());
127        }
128        // Project each row object onto the declared columns (missing
129        // keys become NULL), matching the S4 `parse_row_frame` shape.
130        let mut positional: Vec<Vec<JsonValue>> = Vec::with_capacity(rows.len());
131        for row in rows {
132            let obj = row.as_object().ok_or_else(|| {
133                (
134                    "invalid_row".to_string(),
135                    "row must be a JSON object".to_string(),
136                )
137            })?;
138            let mut values = Vec::with_capacity(self.columns.len());
139            for col in &self.columns {
140                values.push(obj.get(col).cloned().unwrap_or(JsonValue::Null));
141            }
142            positional.push(values);
143        }
144        let sql = crate::server::handlers_query::build_insert_sql(
145            &self.target,
146            &self.columns,
147            &positional,
148        )
149        .map_err(|message| ("invalid_row".to_string(), message))?;
150        match runtime.execute_query(&sql) {
151            Ok(_) => {
152                self.row_count += rows.len() as u64;
153                self.committed_rid = runtime.cdc_current_lsn();
154                self.chunk_count += 1;
155                Ok(())
156            }
157            Err(err) => Err(("chunk_commit_failed".to_string(), err.to_string())),
158        }
159    }
160}
161
162/// Per-connection registry of in-flight input streams. Keyed by
163/// `stream_id`, separate from the output-stream worker registry so an
164/// input and an output stream may share one connection without
165/// colliding (AC #2).
166#[derive(Default)]
167pub struct InputStreamRegistry {
168    inner: HashMap<u16, InputStreamState>,
169}
170
171impl InputStreamRegistry {
172    pub fn new() -> Self {
173        Self::default()
174    }
175
176    /// Register a freshly-opened input stream. Mirrors the output
177    /// registry's reserved-id / duplicate guards and reuses its
178    /// [`RegisterError`] codes so clients see one taxonomy.
179    pub fn register(
180        &mut self,
181        stream_id: u16,
182        state: InputStreamState,
183    ) -> Result<(), RegisterError> {
184        if stream_id == 0 {
185            return Err(RegisterError::ReservedStreamId);
186        }
187        if self.inner.contains_key(&stream_id) {
188            return Err(RegisterError::StreamInUse);
189        }
190        self.inner.insert(stream_id, state);
191        Ok(())
192    }
193
194    pub fn get_mut(&mut self, stream_id: u16) -> Option<&mut InputStreamState> {
195        self.inner.get_mut(&stream_id)
196    }
197
198    pub fn contains(&self, stream_id: u16) -> bool {
199        self.inner.contains_key(&stream_id)
200    }
201
202    /// Drop the stream from the registry, returning its state so the
203    /// caller can read final stats for a terminal frame. Idempotent —
204    /// a second remove returns `None`.
205    pub fn remove(&mut self, stream_id: u16) -> Option<InputStreamState> {
206        self.inner.remove(&stream_id)
207    }
208
209    pub fn active_count(&self) -> usize {
210        self.inner.len()
211    }
212}
213
214/// Build an input-stream `StreamError` frame addressed to `stream_id`,
215/// echoing `correlation_id` so the client can pair it to the request.
216pub fn build_input_stream_error_frame(
217    correlation_id: u64,
218    stream_id: u16,
219    code: &str,
220    message: &str,
221    chunk_seq: u64,
222    recoverable_rid: u64,
223) -> std::io::Result<Frame> {
224    reddb_wire::redwire::stream::build_input_stream_error_frame(
225        correlation_id,
226        stream_id,
227        code,
228        message,
229        chunk_seq,
230        recoverable_rid,
231    )
232    .map_err(|e| std::io::Error::other(format!("build input StreamError: {e}")))
233}
234
235/// Build the input-stream `StreamEnd` frame.
236pub fn build_input_stream_end_frame(
237    correlation_id: u64,
238    stream_id: u16,
239    row_count: u64,
240    chunk_count: u64,
241    committed_rid: u64,
242    snapshot_lsn: u64,
243    cancelled: bool,
244) -> std::io::Result<Frame> {
245    reddb_wire::redwire::stream::build_input_stream_end_frame(
246        correlation_id,
247        stream_id,
248        row_count,
249        chunk_count,
250        committed_rid,
251        snapshot_lsn,
252        cancelled,
253    )
254    .map_err(|e| std::io::Error::other(format!("build input StreamEnd: {e}")))
255}
256
257/// Open an input-stream lease, reusing the output-stream lease
258/// primitive so HTTP, output, and input streams agree on TTL and the
259/// in-transaction refusal (AC mirrors S4 #4).
260pub fn open_input_lease(
261    config: StreamConfig,
262    snapshot_lsn: u64,
263    in_transaction: bool,
264    clock: &dyn Clock,
265) -> Result<StreamLease, OpenStreamError> {
266    crate::server::output_stream::open_stream(config, snapshot_lsn, in_transaction, clock)
267}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// `direction` decides the path; anything that is not an explicit
    /// "in" (in either case) falls back to the output direction.
    #[test]
    fn detects_input_direction() {
        let input_payloads: [&[u8]; 2] = [
            br#"{"direction":"in","target":"t","columns":["a"]}"#,
            br#"{"direction":"IN"}"#,
        ];
        for payload in input_payloads {
            assert!(open_stream_is_input(payload));
        }

        // Default / output direction.
        let output_payloads: [&[u8]; 3] = [
            br#"{"sql":"SELECT 1"}"#,
            br#"{"direction":"out"}"#,
            b"not json",
        ];
        for payload in output_payloads {
            assert!(!open_stream_is_input(payload));
        }
    }

    #[test]
    fn parse_open_input_accepts_target_and_columns() {
        let parsed =
            parse_open_input(br#"{"direction":"in","target":"events","columns":["id","name"]}"#);
        let req = parsed.unwrap();
        assert_eq!(req.target, "events");
        assert_eq!(req.columns, vec!["id".to_string(), "name".to_string()]);
    }

    #[test]
    fn parse_open_input_rejects_missing_target() {
        let outcome = parse_open_input(br#"{"direction":"in","columns":["a"]}"#);
        assert!(matches!(outcome, Err(OpenInputParseError::MissingTarget)));
    }

    #[test]
    fn parse_open_input_rejects_unsafe_target() {
        let outcome = parse_open_input(br#"{"direction":"in","target":"t;DROP","columns":["a"]}"#);
        assert!(matches!(outcome, Err(OpenInputParseError::UnsafeTarget)));
    }

    #[test]
    fn parse_open_input_rejects_empty_or_missing_columns() {
        let empty = parse_open_input(br#"{"direction":"in","target":"t","columns":[]}"#);
        assert!(matches!(empty, Err(OpenInputParseError::EmptyColumns)));

        let missing = parse_open_input(br#"{"direction":"in","target":"t"}"#);
        assert!(matches!(missing, Err(OpenInputParseError::MissingColumns)));
    }

    #[test]
    fn parse_open_input_rejects_unsafe_column() {
        let outcome =
            parse_open_input(br#"{"direction":"in","target":"t","columns":["ok","b ad"]}"#);
        assert!(matches!(outcome, Err(OpenInputParseError::UnsafeColumn)));
    }

    #[test]
    fn parse_chunk_extracts_rows_seq_terminal() {
        let RuntimeInputChunk {
            seq,
            rows,
            terminal,
        } = parse_input_chunk(br#"{"seq":3,"rows":[{"id":1},{"id":2}],"terminal":true}"#).unwrap();
        assert_eq!(seq, 3);
        assert_eq!(rows.len(), 2);
        assert!(terminal);
    }

    #[test]
    fn parse_chunk_allows_bare_terminal() {
        let RuntimeInputChunk {
            seq,
            rows,
            terminal,
        } = parse_input_chunk(br#"{"terminal":true}"#).unwrap();
        assert!(rows.is_empty());
        assert!(terminal);
        assert_eq!(seq, 0);
    }

    #[test]
    fn parse_chunk_rejects_non_array_rows() {
        let outcome = parse_input_chunk(br#"{"rows":5}"#);
        assert!(matches!(outcome, Err(ChunkParseError::RowsNotArray)));
    }

    #[test]
    fn registry_register_rejects_reserved_and_duplicate() {
        // Fixtures: every lease shares the same snapshot LSN and config;
        // only the id and handle vary between registrations.
        let make_lease = |id, handle: &str| StreamLease {
            id,
            lease_handle: handle.to_string(),
            snapshot_lsn: 10,
            opened_at_ms: 0,
            config: StreamConfig::default(),
        };
        let make_state =
            |lease: StreamLease| InputStreamState::new(lease, "t".to_string(), vec!["a".to_string()]);

        let mut reg = InputStreamRegistry::new();

        // Stream id 0 is reserved.
        let reserved = reg.register(0, make_state(make_lease(2, "h2")));
        assert!(matches!(reserved, Err(RegisterError::ReservedStreamId)));

        // First registration under id 5 succeeds.
        reg.register(5, make_state(make_lease(1, "h"))).unwrap();
        assert!(reg.contains(5));

        // A second registration under the same id is refused.
        let duplicate = reg.register(5, make_state(make_lease(3, "h3")));
        assert!(matches!(duplicate, Err(RegisterError::StreamInUse)));
        assert_eq!(reg.active_count(), 1);

        // Removal is idempotent.
        assert!(reg.remove(5).is_some());
        assert!(reg.remove(5).is_none());
    }
}