reddb_server/wire/redwire/
input_stream.rs1use std::collections::HashMap;
38
39use crate::runtime::RedDBRuntime;
40use crate::serde_json::{self, Value as JsonValue};
41pub use reddb_wire::redwire::stream::{ChunkParseError, OpenInputParseError, OpenInputRequest};
42use reddb_wire::redwire::Frame;
43
44use super::output_stream::RegisterError;
45use crate::server::output_stream::{Clock, OpenStreamError, StreamConfig, StreamLease};
46
47pub fn open_stream_is_input(payload: &[u8]) -> bool {
52 reddb_wire::redwire::stream::open_stream_is_input(payload)
53}
54
55pub fn parse_open_input(payload: &[u8]) -> Result<OpenInputRequest, OpenInputParseError> {
56 reddb_wire::redwire::stream::parse_open_input(payload)
57}
58
59#[derive(Debug, Clone, PartialEq)]
64pub(super) struct RuntimeInputChunk {
65 pub seq: u64,
66 pub rows: Vec<JsonValue>,
67 pub terminal: bool,
68}
69
70pub(super) fn parse_input_chunk(payload: &[u8]) -> Result<RuntimeInputChunk, ChunkParseError> {
71 let chunk = reddb_wire::redwire::stream::parse_input_chunk_json(payload)?;
72 let rows = chunk
73 .rows_json
74 .iter()
75 .map(|row| serde_json::from_slice(row).unwrap_or(JsonValue::Null))
76 .collect();
77 Ok(RuntimeInputChunk {
78 seq: chunk.seq,
79 rows,
80 terminal: chunk.terminal,
81 })
82}
83
84#[derive(Debug)]
88pub struct InputStreamState {
89 pub lease: StreamLease,
90 pub target: String,
91 pub columns: Vec<String>,
92 pub committed_rid: u64,
95 pub row_count: u64,
96 pub chunk_count: u64,
97 pub snapshot_lsn: u64,
98}
99
100impl InputStreamState {
101 pub fn new(lease: StreamLease, target: String, columns: Vec<String>) -> Self {
102 let snapshot_lsn = lease.snapshot_lsn;
103 Self {
104 lease,
105 target,
106 columns,
107 committed_rid: snapshot_lsn,
108 row_count: 0,
109 chunk_count: 0,
110 snapshot_lsn,
111 }
112 }
113
114 pub fn commit_chunk(
121 &mut self,
122 runtime: &RedDBRuntime,
123 rows: &[JsonValue],
124 ) -> Result<(), (String, String)> {
125 if rows.is_empty() {
126 return Ok(());
127 }
128 let mut positional: Vec<Vec<JsonValue>> = Vec::with_capacity(rows.len());
131 for row in rows {
132 let obj = row.as_object().ok_or_else(|| {
133 (
134 "invalid_row".to_string(),
135 "row must be a JSON object".to_string(),
136 )
137 })?;
138 let mut values = Vec::with_capacity(self.columns.len());
139 for col in &self.columns {
140 values.push(obj.get(col).cloned().unwrap_or(JsonValue::Null));
141 }
142 positional.push(values);
143 }
144 let sql = crate::server::handlers_query::build_insert_sql(
145 &self.target,
146 &self.columns,
147 &positional,
148 )
149 .map_err(|message| ("invalid_row".to_string(), message))?;
150 match runtime.execute_query(&sql) {
151 Ok(_) => {
152 self.row_count += rows.len() as u64;
153 self.committed_rid = runtime.cdc_current_lsn();
154 self.chunk_count += 1;
155 Ok(())
156 }
157 Err(err) => Err(("chunk_commit_failed".to_string(), err.to_string())),
158 }
159 }
160}
161
162#[derive(Default)]
167pub struct InputStreamRegistry {
168 inner: HashMap<u16, InputStreamState>,
169}
170
171impl InputStreamRegistry {
172 pub fn new() -> Self {
173 Self::default()
174 }
175
176 pub fn register(
180 &mut self,
181 stream_id: u16,
182 state: InputStreamState,
183 ) -> Result<(), RegisterError> {
184 if stream_id == 0 {
185 return Err(RegisterError::ReservedStreamId);
186 }
187 if self.inner.contains_key(&stream_id) {
188 return Err(RegisterError::StreamInUse);
189 }
190 self.inner.insert(stream_id, state);
191 Ok(())
192 }
193
194 pub fn get_mut(&mut self, stream_id: u16) -> Option<&mut InputStreamState> {
195 self.inner.get_mut(&stream_id)
196 }
197
198 pub fn contains(&self, stream_id: u16) -> bool {
199 self.inner.contains_key(&stream_id)
200 }
201
202 pub fn remove(&mut self, stream_id: u16) -> Option<InputStreamState> {
206 self.inner.remove(&stream_id)
207 }
208
209 pub fn active_count(&self) -> usize {
210 self.inner.len()
211 }
212}
213
214pub fn build_input_stream_error_frame(
217 correlation_id: u64,
218 stream_id: u16,
219 code: &str,
220 message: &str,
221 chunk_seq: u64,
222 recoverable_rid: u64,
223) -> std::io::Result<Frame> {
224 reddb_wire::redwire::stream::build_input_stream_error_frame(
225 correlation_id,
226 stream_id,
227 code,
228 message,
229 chunk_seq,
230 recoverable_rid,
231 )
232 .map_err(|e| std::io::Error::other(format!("build input StreamError: {e}")))
233}
234
235pub fn build_input_stream_end_frame(
237 correlation_id: u64,
238 stream_id: u16,
239 row_count: u64,
240 chunk_count: u64,
241 committed_rid: u64,
242 snapshot_lsn: u64,
243 cancelled: bool,
244) -> std::io::Result<Frame> {
245 reddb_wire::redwire::stream::build_input_stream_end_frame(
246 correlation_id,
247 stream_id,
248 row_count,
249 chunk_count,
250 committed_rid,
251 snapshot_lsn,
252 cancelled,
253 )
254 .map_err(|e| std::io::Error::other(format!("build input StreamEnd: {e}")))
255}
256
257pub fn open_input_lease(
261 config: StreamConfig,
262 snapshot_lsn: u64,
263 in_transaction: bool,
264 clock: &dyn Clock,
265) -> Result<StreamLease, OpenStreamError> {
266 crate::server::output_stream::open_stream(config, snapshot_lsn, in_transaction, clock)
267}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// `direction` of `in` (either case tested) is input; everything else is not.
    #[test]
    fn detects_input_direction() {
        let accepted: [&[u8]; 2] = [
            br#"{"direction":"in","target":"t","columns":["a"]}"#,
            br#"{"direction":"IN"}"#,
        ];
        for payload in accepted {
            assert!(open_stream_is_input(payload));
        }

        let rejected: [&[u8]; 3] = [
            br#"{"sql":"SELECT 1"}"#,
            br#"{"direction":"out"}"#,
            b"not json",
        ];
        for payload in rejected {
            assert!(!open_stream_is_input(payload));
        }
    }

    /// A well-formed open request yields its target and column list.
    #[test]
    fn parse_open_input_accepts_target_and_columns() {
        let payload = br#"{"direction":"in","target":"events","columns":["id","name"]}"#;
        let request = parse_open_input(payload).unwrap();
        assert_eq!(request.target, "events");
        assert_eq!(
            request.columns,
            vec!["id".to_string(), "name".to_string()]
        );
    }

    /// Omitting `target` is reported as `MissingTarget`.
    #[test]
    fn parse_open_input_rejects_missing_target() {
        let outcome = parse_open_input(br#"{"direction":"in","columns":["a"]}"#);
        assert!(matches!(outcome, Err(OpenInputParseError::MissingTarget)));
    }

    /// A target containing `;` is reported as `UnsafeTarget`.
    #[test]
    fn parse_open_input_rejects_unsafe_target() {
        let outcome =
            parse_open_input(br#"{"direction":"in","target":"t;DROP","columns":["a"]}"#);
        assert!(matches!(outcome, Err(OpenInputParseError::UnsafeTarget)));
    }

    /// An empty column list and an absent one map to distinct errors.
    #[test]
    fn parse_open_input_rejects_empty_or_missing_columns() {
        let empty = parse_open_input(br#"{"direction":"in","target":"t","columns":[]}"#);
        assert!(matches!(empty, Err(OpenInputParseError::EmptyColumns)));

        let missing = parse_open_input(br#"{"direction":"in","target":"t"}"#);
        assert!(matches!(missing, Err(OpenInputParseError::MissingColumns)));
    }

    /// A column name containing a space is reported as `UnsafeColumn`.
    #[test]
    fn parse_open_input_rejects_unsafe_column() {
        let outcome =
            parse_open_input(br#"{"direction":"in","target":"t","columns":["ok","b ad"]}"#);
        assert!(matches!(outcome, Err(OpenInputParseError::UnsafeColumn)));
    }

    /// `seq`, `rows` and `terminal` are all carried through.
    #[test]
    fn parse_chunk_extracts_rows_seq_terminal() {
        let payload = br#"{"seq":3,"rows":[{"id":1},{"id":2}],"terminal":true}"#;
        let parsed = parse_input_chunk(payload).unwrap();
        assert_eq!(parsed.seq, 3);
        assert_eq!(parsed.rows.len(), 2);
        assert!(parsed.terminal);
    }

    /// A payload with only `terminal` parses to an empty chunk with `seq` 0.
    #[test]
    fn parse_chunk_allows_bare_terminal() {
        let parsed = parse_input_chunk(br#"{"terminal":true}"#).unwrap();
        assert!(parsed.rows.is_empty());
        assert!(parsed.terminal);
        assert_eq!(parsed.seq, 0);
    }

    /// `rows` must be an array.
    #[test]
    fn parse_chunk_rejects_non_array_rows() {
        let outcome = parse_input_chunk(br#"{"rows":5}"#);
        assert!(matches!(outcome, Err(ChunkParseError::RowsNotArray)));
    }

    /// Stream id 0 is reserved and an id can only be registered once.
    #[test]
    fn registry_register_rejects_reserved_and_duplicate() {
        // Every state in this test targets table "t" with the single column
        // "a" and a lease at snapshot LSN 10; only the lease id and handle
        // differ between them.
        let state_for = |id, handle: &str| {
            InputStreamState::new(
                StreamLease {
                    id,
                    lease_handle: handle.to_string(),
                    snapshot_lsn: 10,
                    opened_at_ms: 0,
                    config: StreamConfig::default(),
                },
                "t".to_string(),
                vec!["a".to_string()],
            )
        };

        let mut registry = InputStreamRegistry::new();

        let reserved = registry.register(0, state_for(2, "h2"));
        assert!(matches!(reserved, Err(RegisterError::ReservedStreamId)));

        registry.register(5, state_for(1, "h")).unwrap();
        assert!(registry.contains(5));

        let duplicate = registry.register(5, state_for(3, "h3"));
        assert!(matches!(duplicate, Err(RegisterError::StreamInUse)));

        assert_eq!(registry.active_count(), 1);
        assert!(registry.remove(5).is_some());
        assert!(registry.remove(5).is_none());
    }
}