reddb-io-server 1.1.2

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
//! `SseFrameEncoder` — pure Server-Sent Events frame serializer for
//! `ASK '...' STREAM` over HTTP.
//!
//! Issue #405 (PRD #391): the streaming variant of ASK emits three
//! kinds of SSE frames in fixed order — a single `sources` frame, a
//! run of `answer_token` frames, and exactly one terminal frame
//! (`validation` on success, `error` on mid-stream abort). This
//! module pins the wire format: event name, JSON payload shape, and
//! the SSE-specific quirks (multi-line `data:`, blank-line
//! terminator) that callers always get wrong.
//!
//! Deep module: no I/O, no transport. The HTTP handler owns the
//! `hyper::Body`/`axum::Sse` plumbing, the LLM streaming receiver,
//! and the cost-guard mid-stream check (#401). This module owns
//! "given one frame, what bytes go on the wire".
//!
//! ## Frame order (pinned by the spec, enforced by the handler)
//!
//! 1. exactly one [`Frame::Sources`] — `sources_flat` with URNs.
//! 2. zero or more [`Frame::AnswerToken`] — incremental answer text.
//! 3. exactly one terminal frame: [`Frame::Validation`] on the happy
//!    path, [`Frame::Error`] when a cost guard, timeout, or provider
//!    failure aborts mid-stream.
//!
//! This module does NOT enforce that order — the encoder is
//! per-frame and the caller is responsible for sequencing. A future
//! `SseStreamBuilder` slice can pin the sequence once the wiring
//! lands; for now the unit tests pin the byte layout of each frame
//! kind independently so the wiring slice can rely on it.
//!
//! ## SSE wire format
//!
//! Per the WHATWG spec (and what every reasonable client expects):
//!
//! ```text
//! event: <name>\n
//! data: <line 1>\n
//! data: <line 2>\n
//! ...
//! \n
//! ```
//!
//! Two rules everyone gets wrong:
//!
//! - A literal `\n` inside the JSON payload MUST be split across
//!   multiple `data:` lines. The client joins consecutive `data:`
//!   lines with a single `\n`, so the receiver gets the original
//!   bytes back. The compact serializer used here
//!   (`Value::to_string_compact`) does not emit a raw `\n` — a
//!   newline inside a string is escaped as `\\n` — so for our
//!   payloads this is theoretical. The encoder still handles it
//!   because a future caller might emit pretty-printed JSON, and
//!   breaking on that would silently corrupt the event boundary.
//! - The trailing blank line is the frame terminator. Without it,
//!   the client buffers indefinitely. Two newlines, every time.
//!
//! ## Answer-token frame: raw text, wrapped in JSON
//!
//! Token frames carry raw answer text. We still wrap them in JSON
//! (`{"text":"..."}`) so the receiver has a single parse path across
//! all frame kinds — otherwise the client has to switch on event
//! name before deciding whether to JSON-parse, which is a footgun.
//! The encoder builds a one-field JSON object and serializes it with
//! `Value::to_string_compact`, which handles escaping (quotes,
//! backslashes, control bytes) the same way the rest of the JSON
//! wire does.

use crate::serde_json::{Map, Value};

/// One source row in the `sources` frame. Caller produces these
/// after `RrfFuser` (#398) + column-policy redaction. `urn` is the
/// engine entity URN; `payload` is the redacted JSON the LLM also
/// saw, serialized as a string to keep the SSE JSON flat (the
/// client re-parses if it wants structure).
///
/// On the wire each row becomes a JSON object with exactly two
/// string fields, `payload` and `urn`.
#[derive(Debug, Clone)]
pub struct SourceRow {
    /// Entity URN; emitted as the `urn` string field of the row.
    pub urn: String,
    /// Emitted as the `payload` *string* field (it is not embedded
    /// as a nested JSON object). This module neither parses nor
    /// validates the text.
    pub payload: String,
}

/// Warning emitted on the terminal `validation` frame. Mirrors the
/// non-streaming response shape so HTTP clients can share parsing
/// code across the two transports.
///
/// On the wire each warning becomes a JSON object with two string
/// fields, `detail` and `kind`.
#[derive(Debug, Clone)]
pub struct ValidationWarning {
    /// Warning category, emitted as the `kind` string field. The
    /// unit tests in this module use values such as `out_of_range`
    /// and `mode_fallback`; the full set is defined by the caller.
    pub kind: String,
    /// Free-form text emitted as the `detail` string field.
    pub detail: String,
}

/// Compact audit summary attached to the terminal `validation`
/// frame. The full audit row goes to `red_ask_audit` (#402); this is
/// the subset clients are allowed to see.
///
/// Serialized as the `audit` object of the `validation` payload,
/// with one JSON field per struct field (same names).
#[derive(Debug, Clone)]
pub struct AuditSummary {
    /// Emitted as the `provider` string field (the tests use
    /// `"openai"`).
    pub provider: String,
    /// Emitted as the `model` string field (the tests use
    /// `"gpt-4o-mini"`).
    pub model: String,
    /// Emitted as the `prompt_tokens` JSON number.
    /// NOTE(review): presumably the token count of the prompt sent
    /// to the provider — confirm against the audit writer.
    pub prompt_tokens: u32,
    /// Emitted as the `completion_tokens` JSON number.
    /// NOTE(review): presumably the token count of the generated
    /// answer — confirm against the audit writer.
    pub completion_tokens: u32,
    /// Emitted as the `cache_hit` JSON boolean.
    /// NOTE(review): which cache this refers to is not visible from
    /// this module — confirm with the caller.
    pub cache_hit: bool,
}

/// Builds a JSON object from a slice of `(key, value)` pairs.
///
/// Pairs are inserted in slice order; every key is copied into an
/// owned `String` and every value is cloned, so the input slice is
/// left untouched.
fn obj(entries: &[(&str, Value)]) -> Value {
    let mut fields = Map::new();
    entries.iter().for_each(|(key, value)| {
        fields.insert(String::from(*key), value.clone());
    });
    Value::Object(fields)
}

fn source_row_value(row: &SourceRow) -> Value {
    obj(&[
        ("payload", Value::String(row.payload.clone())),
        ("urn", Value::String(row.urn.clone())),
    ])
}

fn warning_value(w: &ValidationWarning) -> Value {
    obj(&[
        ("detail", Value::String(w.detail.clone())),
        ("kind", Value::String(w.kind.clone())),
    ])
}

fn audit_value(a: &AuditSummary) -> Value {
    obj(&[
        ("cache_hit", Value::Bool(a.cache_hit)),
        (
            "completion_tokens",
            Value::Number(a.completion_tokens as f64),
        ),
        ("model", Value::String(a.model.clone())),
        ("prompt_tokens", Value::Number(a.prompt_tokens as f64)),
        ("provider", Value::String(a.provider.clone())),
    ])
}

/// One SSE frame. Event-name strings are pinned by tests so the
/// transport contract can't drift unnoticed.
///
/// Each variant maps to one SSE event name (see the `event` module)
/// and one JSON payload shape; the variant fields become the
/// payload's top-level JSON fields under the same names.
#[derive(Debug, Clone)]
pub enum Frame {
    /// First frame. Full retrieved sources.
    ///
    /// Payload: `{"sources_flat":[{"payload":..,"urn":..}, ...]}`;
    /// an empty list encodes as `{"sources_flat":[]}`.
    Sources { sources_flat: Vec<SourceRow> },
    /// Incremental answer text. Many of these per stream.
    ///
    /// Payload: `{"text":"..."}` with the text JSON-escaped.
    AnswerToken { text: String },
    /// Terminal happy-path frame.
    ///
    /// Payload: an object with `audit`, `ok` and `warnings` fields.
    /// `ok` is serialized as given, so a `false` value is visible to
    /// the client.
    Validation {
        ok: bool,
        warnings: Vec<ValidationWarning>,
        audit: AuditSummary,
    },
    /// Terminal abort frame — cost guard, timeout, provider error.
    /// `code` mirrors the HTTP status the non-streaming path would
    /// have returned (413, 504, 422, 500) so clients can branch
    /// identically.
    ///
    /// Payload: `{"code":<number>,"message":"..."}`. The encoder
    /// does not restrict `code` to the statuses listed above.
    Error { code: u16, message: String },
}

/// SSE event name pinned per variant. Exposed so the wiring layer
/// and tests can refer to them without re-typing the literals.
///
/// Each constant is the exact text written after `event: ` on the
/// first line of the encoded frame.
pub mod event {
    /// Event name for `Frame::Sources`.
    pub const SOURCES: &str = "sources";
    /// Event name for `Frame::AnswerToken`.
    pub const ANSWER_TOKEN: &str = "answer_token";
    /// Event name for `Frame::Validation`.
    pub const VALIDATION: &str = "validation";
    /// Event name for `Frame::Error`.
    pub const ERROR: &str = "error";
}

impl Frame {
    fn event_name(&self) -> &'static str {
        match self {
            Frame::Sources { .. } => event::SOURCES,
            Frame::AnswerToken { .. } => event::ANSWER_TOKEN,
            Frame::Validation { .. } => event::VALIDATION,
            Frame::Error { .. } => event::ERROR,
        }
    }

    fn payload_json(&self) -> String {
        let value = match self {
            Frame::Sources { sources_flat } => obj(&[(
                "sources_flat",
                Value::Array(sources_flat.iter().map(source_row_value).collect()),
            )]),
            Frame::AnswerToken { text } => obj(&[("text", Value::String(text.clone()))]),
            Frame::Validation {
                ok,
                warnings,
                audit,
            } => obj(&[
                ("audit", audit_value(audit)),
                ("ok", Value::Bool(*ok)),
                (
                    "warnings",
                    Value::Array(warnings.iter().map(warning_value).collect()),
                ),
            ]),
            Frame::Error { code, message } => obj(&[
                ("code", Value::Number(*code as f64)),
                ("message", Value::String(message.clone())),
            ]),
        };
        value.to_string_compact()
    }
}

/// Encode a single frame to its SSE on-wire bytes (as a `String`,
/// which is always valid UTF-8 here because JSON is UTF-8 and the
/// event name is ASCII).
///
/// Layout: one `event: <name>` line, one `data: <line>` line per
/// payload line, then a blank line.
///
/// Output always ends in `\n\n` — the SSE frame terminator. Callers
/// MUST NOT add their own trailing newline; doing so would emit an
/// empty frame after this one.
pub fn encode(frame: &Frame) -> String {
    let event = frame.event_name();
    let payload = frame.payload_json();

    // Pre-size for the common single-data-line case. The framing is
    // "event: " (7) + '\n' (1) + "data: " (6) + '\n' (1) + '\n' (1)
    // = 16 bytes on top of the event name and payload, which avoids
    // reallocations on long answer tokens.
    let mut out = String::with_capacity(event.len() + payload.len() + 16);
    out.push_str("event: ");
    out.push_str(event);
    out.push('\n');

    push_data_lines(&mut out, &payload);

    out.push('\n');
    out
}

/// Appends `payload` to `out` as one `data: <line>\n` row per payload
/// line.
///
/// An SSE parser ends a line on `\n`, `\r\n` or a bare `\r`, so the
/// payload is split on all three. Splitting on `\n` alone would let a
/// raw `\r` terminate the `data:` line early and the remainder would
/// be parsed as a separate (bogus) field. For the compact JSON this
/// module produces the loop body never runs and exactly one `data:`
/// line is written; an empty payload still yields one empty `data: `
/// line.
fn push_data_lines(out: &mut String, payload: &str) {
    let mut rest = payload;
    while let Some(idx) = rest.find(&['\r', '\n'][..]) {
        out.push_str("data: ");
        out.push_str(&rest[..idx]);
        out.push('\n');

        // Consume the terminator: two bytes for CRLF, otherwise one
        // (both '\r' and '\n' are single-byte in UTF-8).
        let tail = &rest[idx..];
        rest = tail.strip_prefix("\r\n").unwrap_or(&tail[1..]);
    }
    out.push_str("data: ");
    out.push_str(rest);
    out.push('\n');
}

#[cfg(test)]
mod tests {
    use super::*;

    fn audit_fixture() -> AuditSummary {
        AuditSummary {
            provider: "openai".to_string(),
            model: "gpt-4o-mini".to_string(),
            prompt_tokens: 123,
            completion_tokens: 45,
            cache_hit: false,
        }
    }

    #[test]
    fn event_names_pinned() {
        assert_eq!(event::SOURCES, "sources");
        assert_eq!(event::ANSWER_TOKEN, "answer_token");
        assert_eq!(event::VALIDATION, "validation");
        assert_eq!(event::ERROR, "error");
    }

    #[test]
    fn encodes_sources_frame_with_event_and_terminator() {
        let frame = Frame::Sources {
            sources_flat: vec![SourceRow {
                urn: "urn:reddb:row:1".to_string(),
                payload: "{\"k\":\"v\"}".to_string(),
            }],
        };
        let out = encode(&frame);
        assert!(out.starts_with("event: sources\n"));
        assert!(out.ends_with("\n\n"));
        assert!(out.contains("data: {"));
        assert!(out.contains("\"urn\":\"urn:reddb:row:1\""));
    }

    #[test]
    fn encodes_answer_token_frame_with_text_field() {
        let frame = Frame::AnswerToken {
            text: "hello".to_string(),
        };
        let out = encode(&frame);
        assert_eq!(out, "event: answer_token\ndata: {\"text\":\"hello\"}\n\n");
    }

    #[test]
    fn answer_token_escapes_quotes_and_backslashes() {
        let frame = Frame::AnswerToken {
            text: "a\"b\\c".to_string(),
        };
        let out = encode(&frame);
        // JSON escape: " → \" and \ → \\
        assert!(out.contains(r#"\"b\\c"#));
        assert!(out.ends_with("\n\n"));
    }

    #[test]
    fn encodes_validation_frame_with_full_shape() {
        let frame = Frame::Validation {
            ok: true,
            warnings: vec![],
            audit: audit_fixture(),
        };
        let out = encode(&frame);
        assert!(out.starts_with("event: validation\n"));
        assert!(out.contains("\"ok\":true"));
        assert!(out.contains("\"prompt_tokens\":123"));
        assert!(out.contains("\"cache_hit\":false"));
        assert!(out.ends_with("\n\n"));
    }

    #[test]
    fn validation_carries_warnings_array() {
        let frame = Frame::Validation {
            ok: false,
            warnings: vec![
                ValidationWarning {
                    kind: "out_of_range".to_string(),
                    detail: "[^9] but only 3 sources".to_string(),
                },
                ValidationWarning {
                    kind: "mode_fallback".to_string(),
                    detail: "ollama".to_string(),
                },
            ],
            audit: audit_fixture(),
        };
        let out = encode(&frame);
        assert!(out.contains("\"kind\":\"out_of_range\""));
        assert!(out.contains("\"kind\":\"mode_fallback\""));
        // ok=false visible to clients so they don't surface a "valid"
        // answer when validation actually failed.
        assert!(out.contains("\"ok\":false"));
    }

    #[test]
    fn encodes_error_frame_with_code() {
        let frame = Frame::Error {
            code: 413,
            message: "max_prompt_tokens exceeded".to_string(),
        };
        let out = encode(&frame);
        assert_eq!(
            out,
            "event: error\ndata: {\"code\":413,\"message\":\"max_prompt_tokens exceeded\"}\n\n"
        );
    }

    #[test]
    fn error_frame_handles_504_timeout() {
        // Pins the cost-guard / timeout mapping the wiring slice will
        // depend on (#401).
        let frame = Frame::Error {
            code: 504,
            message: "timeout_ms exceeded".to_string(),
        };
        let out = encode(&frame);
        assert!(out.contains("\"code\":504"));
    }

    #[test]
    fn multiline_payload_splits_across_data_lines() {
        // Despite the historical name, nothing is split here: a
        // newline inside token text never reaches the wire raw. The
        // JSON serializer escapes it to "\\n", so the frame keeps a
        // single `data:` line. Pinned so a future swap to a
        // pretty-printer doesn't silently change the SSE framing.
        let frame = Frame::AnswerToken {
            text: "line1\nline2".to_string(),
        };
        let out = encode(&frame);
        assert_eq!(
            out,
            "event: answer_token\ndata: {\"text\":\"line1\\nline2\"}\n\n"
        );
    }

    #[test]
    fn encoder_splits_on_literal_newlines_in_payload() {
        // The compact serializer never emits a raw newline for our
        // shapes, so the split branch cannot be reached through
        // `encode`. Drive the line-splitting helper directly with a
        // hand-crafted payload instead.
        let mut out = String::from("event: x\n");
        push_data_lines(&mut out, "a\nb\nc");
        out.push('\n');
        assert_eq!(out, "event: x\ndata: a\ndata: b\ndata: c\n\n");
    }

    #[test]
    fn encoder_splits_on_cr_and_crlf_in_payload() {
        // SSE parsers also end a line on "\r\n" and on a bare '\r';
        // each must start a fresh `data:` row, and CRLF must count as
        // one terminator, not two.
        let mut out = String::new();
        push_data_lines(&mut out, "a\r\nb\rc");
        assert_eq!(out, "data: a\ndata: b\ndata: c\n");

        // "\n\r" is two separate terminators, so it yields an empty
        // line in between.
        let mut out = String::new();
        push_data_lines(&mut out, "a\n\rb");
        assert_eq!(out, "data: a\ndata: \ndata: b\n");
    }

    #[test]
    fn data_line_helper_handles_empty_and_trailing_newline() {
        // Empty payload still produces one (empty) data line.
        let mut out = String::new();
        push_data_lines(&mut out, "");
        assert_eq!(out, "data: \n");

        // A trailing newline produces a final empty data line, which
        // the client turns back into the trailing '\n'.
        let mut out = String::new();
        push_data_lines(&mut out, "a\n");
        assert_eq!(out, "data: a\ndata: \n");
    }

    #[test]
    fn frame_terminator_is_double_newline() {
        // The single most common SSE bug: forgetting the blank line.
        // Pinned independently for every frame kind.
        for frame in [
            Frame::Sources {
                sources_flat: vec![],
            },
            Frame::AnswerToken {
                text: String::new(),
            },
            Frame::Validation {
                ok: true,
                warnings: vec![],
                audit: audit_fixture(),
            },
            Frame::Error {
                code: 500,
                message: String::new(),
            },
        ] {
            let out = encode(&frame);
            assert!(out.ends_with("\n\n"), "frame missing terminator: {:?}", out);
            // And NOT a triple newline — that would split into an
            // extra empty frame on the client.
            assert!(!out.ends_with("\n\n\n"));
        }
    }

    #[test]
    fn sources_frame_with_empty_list_is_well_formed() {
        let frame = Frame::Sources {
            sources_flat: vec![],
        };
        let out = encode(&frame);
        assert_eq!(out, "event: sources\ndata: {\"sources_flat\":[]}\n\n");
    }

    #[test]
    fn answer_token_with_empty_text_is_well_formed() {
        // An empty-text frame should never be emitted in practice, but
        // the encoder must not crash on it — the caller might forward
        // an empty SSE chunk from a poorly-behaved provider.
        let frame = Frame::AnswerToken {
            text: String::new(),
        };
        let out = encode(&frame);
        assert_eq!(out, "event: answer_token\ndata: {\"text\":\"\"}\n\n");
    }

    #[test]
    fn encoding_is_deterministic_across_calls() {
        let frame = Frame::Validation {
            ok: true,
            warnings: vec![ValidationWarning {
                kind: "k".to_string(),
                detail: "d".to_string(),
            }],
            audit: audit_fixture(),
        };
        let a = encode(&frame);
        let b = encode(&frame);
        assert_eq!(a, b);
    }

    #[test]
    fn event_name_matches_pinned_constants() {
        assert_eq!(
            Frame::Sources {
                sources_flat: vec![]
            }
            .event_name(),
            event::SOURCES
        );
        assert_eq!(
            Frame::AnswerToken {
                text: String::new()
            }
            .event_name(),
            event::ANSWER_TOKEN
        );
        assert_eq!(
            Frame::Validation {
                ok: true,
                warnings: vec![],
                audit: audit_fixture(),
            }
            .event_name(),
            event::VALIDATION
        );
        assert_eq!(
            Frame::Error {
                code: 0,
                message: String::new(),
            }
            .event_name(),
            event::ERROR
        );
    }

    #[test]
    fn unicode_in_token_text_passes_through() {
        let frame = Frame::AnswerToken {
            text: "olá 🌍".to_string(),
        };
        let out = encode(&frame);
        // The serializer leaves non-ASCII text unescaped.
        assert!(out.contains("olá 🌍"));
        assert!(out.ends_with("\n\n"));
    }
}