Skip to main content

reddb_server/wire/redwire/
output_stream.rs

1//! RedWire output-stream dispatch (issue #762, PRD #759 S3).
2//!
3//! Carries the wire-side lifecycle envelopes for an output stream:
4//!   - `OpenStream`  (client→server)  — request to start streaming
5//!     a `SELECT`'s rows back over a multiplexed `stream_id`.
6//!   - `OpenAck`     (server→client)  — server accepted; carries
7//!     the lease handle + the snapshot LSN the stream pinned.
8//!   - `StreamChunk` (server→client)  — one or more rows as JSON.
9//!     Multiple chunks per stream; `terminal: true` may be set on
10//!     the last one when the producer wishes to coalesce with
11//!     `StreamEnd`. The standalone `StreamEnd` envelope is the
12//!     canonical close-of-stream marker.
13//!   - `StreamError` (server→client)  — protocol violation or
14//!     execution error for a specific `stream_id`. Non-fatal at
15//!     the connection level (AC #6: server must not crash).
16//!   - `StreamEnd`   (server→client)  — close-of-stream marker
17//!     carrying summary stats (row_count, lease_id, snapshot_lsn).
18//!   - `StreamCancel`(client→server)  — client asks to terminate
19//!     a specific stream; other streams on the connection are
20//!     unaffected (AC #3).
21//!
22//! Reuses [`crate::server::output_stream`] for the lease + config
23//! primitives (S1 / issue #760) so HTTP and RedWire agree on TTL
24//! and chunk semantics.
25
26use std::collections::HashMap;
27use std::sync::Arc;
28
29use tokio::sync::{oneshot, Mutex};
30
31use crate::runtime::RedDBRuntime;
32use crate::serde_json::Value as JsonValue;
33use crate::server::output_stream::{
34    self as outs, Clock, OpenStreamError, StreamConfig, SystemClock,
35};
36pub use reddb_wire::redwire::stream::{
37    OpenStreamParseError, OpenStreamRequest, StreamCancelRequest,
38};
39use reddb_wire::redwire::{encode_frame, Frame};
40
41pub fn parse_open_stream(payload: &[u8]) -> Result<OpenStreamRequest, OpenStreamParseError> {
42    reddb_wire::redwire::stream::parse_open_stream(payload)
43}
44
45pub fn parse_stream_cancel(payload: &[u8]) -> StreamCancelRequest {
46    reddb_wire::redwire::stream::parse_stream_cancel(payload)
47}
48
49pub fn build_open_ack_frame(
50    correlation_id: u64,
51    stream_id: u16,
52    lease_id: u64,
53    snapshot_lsn: u64,
54    resumable: bool,
55) -> Result<Frame, reddb_wire::BuildError> {
56    reddb_wire::redwire::stream::build_open_ack_frame(
57        correlation_id,
58        stream_id,
59        lease_id,
60        snapshot_lsn,
61        resumable,
62    )
63}
64
65/// Per-connection registry of in-flight output streams. Keyed by
66/// `stream_id` — the wire-spec multiplex tag — so a `StreamCancel`
67/// can target one stream without disturbing the rest of the
68/// connection (AC #3).
69#[derive(Default)]
70pub struct StreamRegistry {
71    inner: Mutex<HashMap<u16, oneshot::Sender<()>>>,
72}
73
74impl StreamRegistry {
75    pub fn new() -> Self {
76        Self::default()
77    }
78
79    /// Register a new stream. Returns the receiver half the worker
80    /// task selects on for cancellation, or `Err(InUse)` if the
81    /// `stream_id` is already active on this connection.
82    pub async fn register(&self, stream_id: u16) -> Result<oneshot::Receiver<()>, RegisterError> {
83        if stream_id == 0 {
84            return Err(RegisterError::ReservedStreamId);
85        }
86        let mut guard = self.inner.lock().await;
87        if guard.contains_key(&stream_id) {
88            return Err(RegisterError::StreamInUse);
89        }
90        let (tx, rx) = oneshot::channel();
91        guard.insert(stream_id, tx);
92        Ok(rx)
93    }
94
95    /// Signal the named stream to cancel. Returns `false` if the
96    /// `stream_id` is unknown — caller should emit `StreamError`
97    /// with `unknown_stream`.
98    pub async fn cancel(&self, stream_id: u16) -> bool {
99        let mut guard = self.inner.lock().await;
100        match guard.remove(&stream_id) {
101            Some(tx) => {
102                let _ = tx.send(());
103                true
104            }
105            None => false,
106        }
107    }
108
109    /// Remove the stream from the registry once the worker task
110    /// has finished (normally or via cancel). Idempotent.
111    pub async fn unregister(&self, stream_id: u16) {
112        let mut guard = self.inner.lock().await;
113        guard.remove(&stream_id);
114    }
115
116    pub async fn active_count(&self) -> usize {
117        self.inner.lock().await.len()
118    }
119}
120
/// Reasons a `stream_id` can be refused when registering an output stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegisterError {
    /// `stream_id` 0 is reserved for unsolicited frames.
    ReservedStreamId,
    /// The `stream_id` already has an active stream on this connection.
    StreamInUse,
}

impl RegisterError {
    /// Stable machine-readable error code, suitable for the `code`
    /// field of a `StreamError` frame.
    pub fn code(&self) -> &'static str {
        match self {
            Self::ReservedStreamId => "open_stream_reserved_id",
            Self::StreamInUse => "open_stream_id_in_use",
        }
    }

    /// Human-readable description paired with [`RegisterError::code`].
    pub fn message(&self) -> &'static str {
        match self {
            Self::ReservedStreamId => {
                "OpenStream cannot use stream_id 0 (reserved for unsolicited)"
            }
            Self::StreamInUse => "OpenStream stream_id already has an active stream",
        }
    }
}

// `Display` renders the human-readable message, so the error composes
// with `?`, `Box<dyn Error>` and logging like any other std error.
impl std::fmt::Display for RegisterError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message())
    }
}

impl std::error::Error for RegisterError {}
143
144/// Build a stand-alone `StreamError` frame addressed to a given
145/// `stream_id`. The correlation id echoes the request frame so a
146/// client can pair the error with the offending request.
147pub fn build_stream_error_frame(
148    correlation_id: u64,
149    stream_id: u16,
150    code: &str,
151    message: &str,
152) -> std::io::Result<Frame> {
153    reddb_wire::redwire::stream::build_stream_error_frame(
154        correlation_id,
155        stream_id,
156        None,
157        code,
158        message,
159    )
160    .map_err(|e| std::io::Error::other(format!("build StreamError: {e}")))
161}
162
163/// Run an output stream end-to-end. Emits OpenAck → StreamChunk*
164/// → StreamEnd via the supplied `send` closure, observing the
165/// `cancel_rx` between rows to honour `StreamCancel` (AC #3).
166///
167/// The function materialises `execute_query`'s result first
168/// (matching the S1 HTTP behaviour — pull-based scan executors
169/// are PRD #759 phase 3) and then dribbles rows out as
170/// `StreamChunk` envelopes via the same byte/row/latency
171/// page-aligned producer the HTTP path uses.
172pub async fn run_output_stream(
173    runtime: Arc<RedDBRuntime>,
174    correlation_id: u64,
175    stream_id: u16,
176    request: OpenStreamRequest,
177    in_transaction: bool,
178    mut cancel_rx: oneshot::Receiver<()>,
179    send: FrameTx,
180) {
181    let clock = SystemClock;
182    let config = StreamConfig::load(&runtime);
183    let snapshot_lsn = runtime.cdc_current_lsn();
184
185    let lease = match outs::open_stream(config, snapshot_lsn, in_transaction, &clock) {
186        Ok(l) => l,
187        Err(OpenStreamError::TransactionActive) => {
188            let err = OpenStreamError::TransactionActive;
189            let frame = match build_stream_error_frame(
190                correlation_id,
191                stream_id,
192                err.code(),
193                err.message(),
194            ) {
195                Ok(f) => f,
196                Err(_) => return,
197            };
198            send.send_frame(frame);
199            return;
200        }
201    };
202
203    // OpenAck — always first.
204    let ack = match reddb_wire::redwire::stream::build_open_ack_frame(
205        correlation_id,
206        stream_id,
207        lease.id,
208        lease.snapshot_lsn,
209        false,
210    ) {
211        Ok(f) => f,
212        Err(_) => return,
213    };
214    send.send_frame(ack);
215
216    // Materialise.
217    let result = runtime.execute_query(&request.sql);
218
219    // Stream rows out as StreamChunk envelopes.
220    let mut seq: u64 = 0;
221    let mut row_count: u64 = 0;
222    let mut cancelled = false;
223    let mut had_error: Option<(String, String)> = None;
224
225    match result {
226        Ok(qr) => {
227            let columns = qr.result.columns.clone();
228            let rows: Vec<JsonValue> = qr
229                .result
230                .records
231                .iter()
232                .map(|r| crate::presentation::query_result_json::unified_record_json(r, &columns))
233                .collect();
234
235            // One `StreamChunk` envelope per row. The page-aligned
236            // batcher used by the HTTP NDJSON path (S1) is byte-
237            // oriented; on the wire path each row already ships as
238            // its own framed envelope, so TCP / framing already
239            // handles the batching for us. Keeping one row per
240            // envelope keeps `StreamCancel` latency bounded to
241            // "between two adjacent rows".
242            for row in rows {
243                // Check cancel between rows (AC #3).
244                if let Ok(()) = cancel_rx.try_recv() {
245                    cancelled = true;
246                    break;
247                }
248                if lease.snapshot_expired(clock.now_ms()) {
249                    had_error = Some((
250                        "snapshot_expired".to_string(),
251                        "stream snapshot pin TTL elapsed".to_string(),
252                    ));
253                    break;
254                }
255                let row_bytes = row.to_string_compact().into_bytes();
256                let frame =
257                    match reddb_wire::redwire::stream::build_stream_chunk_frame_from_json_bytes(
258                        correlation_id,
259                        stream_id,
260                        seq,
261                        vec![row_bytes],
262                        false,
263                    ) {
264                        Ok(f) => f,
265                        Err(_) => break,
266                    };
267                send.send_frame(frame);
268                seq += 1;
269                row_count += 1;
270            }
271            // `config` is kept observed even when the batcher is
272            // bypassed so the frozen-config invariant from S1 still
273            // applies (no mid-stream KV-driven behaviour change).
274            let _ = config;
275        }
276        Err(err) => {
277            had_error = Some(("query_failed".to_string(), err.to_string()));
278        }
279    }
280
281    if let Some((code, message)) = had_error {
282        if let Ok(frame) = reddb_wire::redwire::stream::build_stream_error_frame(
283            correlation_id,
284            stream_id,
285            Some(seq),
286            &code,
287            &message,
288        ) {
289            send.send_frame(frame);
290        }
291    }
292
293    // StreamEnd is always emitted — including after error or
294    // cancel — so the client can drop bookkeeping on `StreamEnd`
295    // rather than on connection EOF.
296    if let Ok(frame) = reddb_wire::redwire::stream::build_stream_end_frame(
297        correlation_id,
298        stream_id,
299        row_count,
300        lease.id,
301        lease.snapshot_lsn,
302        cancelled,
303    ) {
304        send.send_frame(frame);
305    }
306}
307
308/// Encoded-frame transmit handle handed to stream workers. The
309/// session loop owns the matching receiver and drains it into the
310/// socket's write half — so multiple concurrent workers can
311/// interleave their output without contending on a writer mutex
312/// (AC #2 — interleaved chunks for two streams on one connection).
313#[derive(Clone)]
314pub struct FrameTx {
315    tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
316}
317
318impl FrameTx {
319    pub fn new(tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>) -> Self {
320        Self { tx }
321    }
322
323    /// Encode + enqueue. Drops silently if the receiver has been
324    /// dropped (connection torn down); the worker's next iteration
325    /// will hit cancellation / EOF and exit naturally.
326    pub fn send_frame(&self, frame: Frame) {
327        let bytes = encode_frame(&frame);
328        let _ = self.tx.send(bytes);
329    }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_wire::redwire::MessageKind;

    #[test]
    fn parse_open_stream_accepts_minimal_payload() {
        let req = parse_open_stream(br#"{"sql":"SELECT 1"}"#).unwrap();
        assert_eq!(req.sql, "SELECT 1");
        assert!(req.opts_raw.is_empty());
    }

    #[test]
    fn parse_open_stream_captures_opts_opaque() {
        let req =
            parse_open_stream(br#"{"sql":"SELECT 1","opts":{"resume_after_rid":42}}"#).unwrap();
        assert_eq!(req.sql, "SELECT 1");
        assert!(!req.opts_raw.is_empty());
    }

    #[test]
    fn parse_open_stream_rejects_non_object() {
        assert!(matches!(
            parse_open_stream(b"\"sql\""),
            Err(OpenStreamParseError::NotObject)
        ));
    }

    #[test]
    fn parse_open_stream_rejects_missing_sql() {
        assert!(matches!(
            parse_open_stream(b"{}"),
            Err(OpenStreamParseError::MissingSql)
        ));
    }

    #[test]
    fn parse_open_stream_rejects_empty_sql() {
        assert!(matches!(
            parse_open_stream(br#"{"sql":""}"#),
            Err(OpenStreamParseError::EmptySql)
        ));
    }

    #[test]
    fn parse_open_stream_rejects_invalid_json() {
        assert!(matches!(
            parse_open_stream(b"{not json"),
            Err(OpenStreamParseError::NotJson)
        ));
    }

    #[test]
    fn parse_stream_cancel_with_reason() {
        let r = parse_stream_cancel(br#"{"reason":"client-abort"}"#);
        assert_eq!(r.reason.as_deref(), Some("client-abort"));
    }

    #[test]
    fn parse_stream_cancel_empty_payload_is_default() {
        assert_eq!(parse_stream_cancel(b""), StreamCancelRequest::default());
        assert_eq!(parse_stream_cancel(b"{}"), StreamCancelRequest::default());
    }

    #[tokio::test]
    async fn registry_rejects_reserved_id_and_duplicates() {
        let r = StreamRegistry::new();
        assert!(matches!(
            r.register(0).await,
            Err(RegisterError::ReservedStreamId)
        ));
        let _rx = r.register(1).await.unwrap();
        assert!(matches!(
            r.register(1).await,
            Err(RegisterError::StreamInUse)
        ));
        assert_eq!(r.active_count().await, 1);
    }

    #[tokio::test]
    async fn registry_cancel_signals_named_stream_only() {
        // AC #3 — cancelling stream X must not disturb stream Y.
        let r = StreamRegistry::new();
        let rx1 = r.register(1).await.unwrap();
        let mut rx2 = r.register(2).await.unwrap();
        assert!(r.cancel(1).await);
        // Stream 1's rx fires.
        assert!(rx1.await.is_ok());
        // Stream 2's rx remains pending (try_recv would yield Empty).
        match rx2.try_recv() {
            Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {}
            other => panic!("stream 2 should not be cancelled: {other:?}"),
        }
        assert_eq!(r.active_count().await, 1);
    }

    #[tokio::test]
    async fn registry_cancel_unknown_returns_false() {
        let r = StreamRegistry::new();
        assert!(!r.cancel(99).await);
    }

    #[tokio::test]
    async fn registry_cancel_frees_stream_id_for_reuse() {
        // A cancelled stream's id must be immediately reusable.
        let r = StreamRegistry::new();
        let _rx = r.register(5).await.unwrap();
        assert!(r.cancel(5).await);
        assert_eq!(r.active_count().await, 0);
        assert!(r.register(5).await.is_ok());
        assert_eq!(r.active_count().await, 1);
    }

    #[tokio::test]
    async fn registry_cancel_after_receiver_dropped_still_reports_known() {
        // The worker may already have exited; cancel must still succeed
        // once and then report the id as unknown.
        let r = StreamRegistry::new();
        drop(r.register(3).await.unwrap());
        assert!(r.cancel(3).await);
        assert!(!r.cancel(3).await);
    }

    #[tokio::test]
    async fn registry_unregister_is_idempotent() {
        let r = StreamRegistry::new();
        let _rx = r.register(1).await.unwrap();
        r.unregister(1).await;
        r.unregister(1).await;
        assert_eq!(r.active_count().await, 0);
    }

    #[test]
    fn register_error_codes_are_distinct_and_messages_nonempty() {
        assert_ne!(
            RegisterError::ReservedStreamId.code(),
            RegisterError::StreamInUse.code()
        );
        for err in [RegisterError::ReservedStreamId, RegisterError::StreamInUse] {
            assert!(!err.code().is_empty());
            assert!(!err.message().is_empty());
        }
    }

    #[test]
    fn build_stream_error_frame_carries_stream_id_and_correlation() {
        let frame = build_stream_error_frame(99, 7, "unknown_stream", "no such stream").unwrap();
        assert_eq!(frame.kind, MessageKind::StreamError);
        assert_eq!(frame.stream_id, 7);
        assert_eq!(frame.correlation_id, 99);
    }
}