reddb_server/wire/redwire/
output_stream.rs1use std::collections::HashMap;
27use std::sync::Arc;
28
29use tokio::sync::{oneshot, Mutex};
30
31use crate::runtime::RedDBRuntime;
32use crate::serde_json::Value as JsonValue;
33use crate::server::output_stream::{
34 self as outs, Clock, OpenStreamError, StreamConfig, SystemClock,
35};
36pub use reddb_wire::redwire::stream::{
37 OpenStreamParseError, OpenStreamRequest, StreamCancelRequest,
38};
39use reddb_wire::redwire::{encode_frame, Frame};
40
41pub fn parse_open_stream(payload: &[u8]) -> Result<OpenStreamRequest, OpenStreamParseError> {
42 reddb_wire::redwire::stream::parse_open_stream(payload)
43}
44
45pub fn parse_stream_cancel(payload: &[u8]) -> StreamCancelRequest {
46 reddb_wire::redwire::stream::parse_stream_cancel(payload)
47}
48
49pub fn build_open_ack_frame(
50 correlation_id: u64,
51 stream_id: u16,
52 lease_id: u64,
53 snapshot_lsn: u64,
54 resumable: bool,
55) -> Result<Frame, reddb_wire::BuildError> {
56 reddb_wire::redwire::stream::build_open_ack_frame(
57 correlation_id,
58 stream_id,
59 lease_id,
60 snapshot_lsn,
61 resumable,
62 )
63}
64
65#[derive(Default)]
70pub struct StreamRegistry {
71 inner: Mutex<HashMap<u16, oneshot::Sender<()>>>,
72}
73
74impl StreamRegistry {
75 pub fn new() -> Self {
76 Self::default()
77 }
78
79 pub async fn register(&self, stream_id: u16) -> Result<oneshot::Receiver<()>, RegisterError> {
83 if stream_id == 0 {
84 return Err(RegisterError::ReservedStreamId);
85 }
86 let mut guard = self.inner.lock().await;
87 if guard.contains_key(&stream_id) {
88 return Err(RegisterError::StreamInUse);
89 }
90 let (tx, rx) = oneshot::channel();
91 guard.insert(stream_id, tx);
92 Ok(rx)
93 }
94
95 pub async fn cancel(&self, stream_id: u16) -> bool {
99 let mut guard = self.inner.lock().await;
100 match guard.remove(&stream_id) {
101 Some(tx) => {
102 let _ = tx.send(());
103 true
104 }
105 None => false,
106 }
107 }
108
109 pub async fn unregister(&self, stream_id: u16) {
112 let mut guard = self.inner.lock().await;
113 guard.remove(&stream_id);
114 }
115
116 pub async fn active_count(&self) -> usize {
117 self.inner.lock().await.len()
118 }
119}
120
/// Reasons [`StreamRegistry::register`] can refuse a stream id.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegisterError {
    /// The caller asked for stream id `0`, which is reserved.
    ReservedStreamId,
    /// The requested stream id already has a registered stream.
    StreamInUse,
}

impl RegisterError {
    /// Short machine-readable identifier for this error.
    pub fn code(&self) -> &'static str {
        match self {
            Self::ReservedStreamId => "open_stream_reserved_id",
            Self::StreamInUse => "open_stream_id_in_use",
        }
    }

    /// Human-readable description of this error.
    pub fn message(&self) -> &'static str {
        match self {
            Self::ReservedStreamId => {
                "OpenStream cannot use stream_id 0 (reserved for unsolicited)"
            }
            Self::StreamInUse => "OpenStream stream_id already has an active stream",
        }
    }
}

/// Renders the same text as [`RegisterError::message`], so the error can be
/// logged or interpolated directly.
impl std::fmt::Display for RegisterError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message())
    }
}

/// Lets the error travel through `?` into `Box<dyn Error>` and similar sinks.
impl std::error::Error for RegisterError {}
143
144pub fn build_stream_error_frame(
148 correlation_id: u64,
149 stream_id: u16,
150 code: &str,
151 message: &str,
152) -> std::io::Result<Frame> {
153 reddb_wire::redwire::stream::build_stream_error_frame(
154 correlation_id,
155 stream_id,
156 None,
157 code,
158 message,
159 )
160 .map_err(|e| std::io::Error::other(format!("build StreamError: {e}")))
161}
162
163pub async fn run_output_stream(
173 runtime: Arc<RedDBRuntime>,
174 correlation_id: u64,
175 stream_id: u16,
176 request: OpenStreamRequest,
177 in_transaction: bool,
178 mut cancel_rx: oneshot::Receiver<()>,
179 send: FrameTx,
180) {
181 let clock = SystemClock;
182 let config = StreamConfig::load(&runtime);
183 let snapshot_lsn = runtime.cdc_current_lsn();
184
185 let lease = match outs::open_stream(config, snapshot_lsn, in_transaction, &clock) {
186 Ok(l) => l,
187 Err(OpenStreamError::TransactionActive) => {
188 let err = OpenStreamError::TransactionActive;
189 let frame = match build_stream_error_frame(
190 correlation_id,
191 stream_id,
192 err.code(),
193 err.message(),
194 ) {
195 Ok(f) => f,
196 Err(_) => return,
197 };
198 send.send_frame(frame);
199 return;
200 }
201 };
202
203 let ack = match reddb_wire::redwire::stream::build_open_ack_frame(
205 correlation_id,
206 stream_id,
207 lease.id,
208 lease.snapshot_lsn,
209 false,
210 ) {
211 Ok(f) => f,
212 Err(_) => return,
213 };
214 send.send_frame(ack);
215
216 let result = runtime.execute_query(&request.sql);
218
219 let mut seq: u64 = 0;
221 let mut row_count: u64 = 0;
222 let mut cancelled = false;
223 let mut had_error: Option<(String, String)> = None;
224
225 match result {
226 Ok(qr) => {
227 let columns = qr.result.columns.clone();
228 let rows: Vec<JsonValue> = qr
229 .result
230 .records
231 .iter()
232 .map(|r| crate::presentation::query_result_json::unified_record_json(r, &columns))
233 .collect();
234
235 for row in rows {
243 if let Ok(()) = cancel_rx.try_recv() {
245 cancelled = true;
246 break;
247 }
248 if lease.snapshot_expired(clock.now_ms()) {
249 had_error = Some((
250 "snapshot_expired".to_string(),
251 "stream snapshot pin TTL elapsed".to_string(),
252 ));
253 break;
254 }
255 let row_bytes = row.to_string_compact().into_bytes();
256 let frame =
257 match reddb_wire::redwire::stream::build_stream_chunk_frame_from_json_bytes(
258 correlation_id,
259 stream_id,
260 seq,
261 vec![row_bytes],
262 false,
263 ) {
264 Ok(f) => f,
265 Err(_) => break,
266 };
267 send.send_frame(frame);
268 seq += 1;
269 row_count += 1;
270 }
271 let _ = config;
275 }
276 Err(err) => {
277 had_error = Some(("query_failed".to_string(), err.to_string()));
278 }
279 }
280
281 if let Some((code, message)) = had_error {
282 if let Ok(frame) = reddb_wire::redwire::stream::build_stream_error_frame(
283 correlation_id,
284 stream_id,
285 Some(seq),
286 &code,
287 &message,
288 ) {
289 send.send_frame(frame);
290 }
291 }
292
293 if let Ok(frame) = reddb_wire::redwire::stream::build_stream_end_frame(
297 correlation_id,
298 stream_id,
299 row_count,
300 lease.id,
301 lease.snapshot_lsn,
302 cancelled,
303 ) {
304 send.send_frame(frame);
305 }
306}
307
308#[derive(Clone)]
314pub struct FrameTx {
315 tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
316}
317
318impl FrameTx {
319 pub fn new(tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>) -> Self {
320 Self { tx }
321 }
322
323 pub fn send_frame(&self, frame: Frame) {
327 let bytes = encode_frame(&frame);
328 let _ = self.tx.send(bytes);
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_wire::redwire::MessageKind;

    // ---- OpenStream payload parsing ------------------------------------

    #[test]
    fn parse_open_stream_accepts_minimal_payload() {
        let parsed = parse_open_stream(br#"{"sql":"SELECT 1"}"#).unwrap();
        assert_eq!(parsed.sql, "SELECT 1");
        assert!(parsed.opts_raw.is_empty());
    }

    #[test]
    fn parse_open_stream_captures_opts_opaque() {
        let payload = br#"{"sql":"SELECT 1","opts":{"resume_after_rid":42}}"#;
        let parsed = parse_open_stream(payload).unwrap();
        assert_eq!(parsed.sql, "SELECT 1");
        assert!(!parsed.opts_raw.is_empty());
    }

    #[test]
    fn parse_open_stream_rejects_non_object() {
        // A bare JSON string is valid JSON but not an object.
        let outcome = parse_open_stream(b"\"sql\"");
        assert!(matches!(outcome, Err(OpenStreamParseError::NotObject)));
    }

    #[test]
    fn parse_open_stream_rejects_missing_sql() {
        let outcome = parse_open_stream(b"{}");
        assert!(matches!(outcome, Err(OpenStreamParseError::MissingSql)));
    }

    #[test]
    fn parse_open_stream_rejects_empty_sql() {
        let outcome = parse_open_stream(br#"{"sql":""}"#);
        assert!(matches!(outcome, Err(OpenStreamParseError::EmptySql)));
    }

    #[test]
    fn parse_open_stream_rejects_invalid_json() {
        let outcome = parse_open_stream(b"{not json");
        assert!(matches!(outcome, Err(OpenStreamParseError::NotJson)));
    }

    // ---- StreamCancel payload parsing ----------------------------------

    #[test]
    fn parse_stream_cancel_with_reason() {
        let parsed = parse_stream_cancel(br#"{"reason":"client-abort"}"#);
        assert_eq!(parsed.reason.as_deref(), Some("client-abort"));
    }

    #[test]
    fn parse_stream_cancel_empty_payload_is_default() {
        // Both a zero-length payload and an empty object yield the default.
        for payload in [&b""[..], &b"{}"[..]] {
            assert_eq!(parse_stream_cancel(payload), StreamCancelRequest::default());
        }
    }

    // ---- StreamRegistry ------------------------------------------------

    #[tokio::test]
    async fn registry_rejects_reserved_id_and_duplicates() {
        let registry = StreamRegistry::new();

        let reserved = registry.register(0).await;
        assert_eq!(reserved.err(), Some(RegisterError::ReservedStreamId));

        let _first_rx = registry.register(1).await.unwrap();
        let duplicate = registry.register(1).await;
        assert_eq!(duplicate.err(), Some(RegisterError::StreamInUse));

        assert_eq!(registry.active_count().await, 1);
    }

    #[tokio::test]
    async fn registry_cancel_signals_named_stream_only() {
        let registry = StreamRegistry::new();
        let first_rx = registry.register(1).await.unwrap();
        let mut second_rx = registry.register(2).await.unwrap();

        assert!(registry.cancel(1).await);
        assert!(first_rx.await.is_ok());

        // Stream 2 must still be waiting: nothing sent, sender still alive.
        let probe = second_rx.try_recv();
        assert!(
            matches!(
                probe,
                Err(tokio::sync::oneshot::error::TryRecvError::Empty)
            ),
            "stream 2 should not be cancelled: {probe:?}"
        );
        assert_eq!(registry.active_count().await, 1);
    }

    #[tokio::test]
    async fn registry_cancel_unknown_returns_false() {
        let registry = StreamRegistry::new();
        let was_registered = registry.cancel(99).await;
        assert!(!was_registered);
    }

    #[tokio::test]
    async fn registry_unregister_is_idempotent() {
        let registry = StreamRegistry::new();
        let _rx = registry.register(1).await.unwrap();

        for _ in 0..2 {
            registry.unregister(1).await;
        }
        assert_eq!(registry.active_count().await, 0);
    }

    // ---- Frame builders ------------------------------------------------

    #[test]
    fn build_stream_error_frame_carries_stream_id_and_correlation() {
        let built = build_stream_error_frame(99, 7, "unknown_stream", "no such stream").unwrap();
        assert_eq!(built.kind, MessageKind::StreamError);
        assert_eq!(built.stream_id, 7);
        assert_eq!(built.correlation_id, 99);
    }
}