Skip to main content

reddb_server/runtime/ai/
sse_frame_encoder.rs

1//! `SseFrameEncoder` — pure Server-Sent Events frame serializer for
2//! `ASK '...' STREAM` over HTTP.
3//!
4//! Issue #405 (PRD #391): the streaming variant of ASK emits three
5//! kinds of SSE frames in fixed order — a single `sources` frame, a
6//! run of `answer_token` frames, and exactly one terminal frame
7//! (`validation` on success, `error` on mid-stream abort). This
8//! module pins the wire format: event name, JSON payload shape, and
9//! the SSE-specific quirks (multi-line `data:`, blank-line
10//! terminator) that callers always get wrong.
11//!
12//! Deep module: no I/O, no transport. The HTTP handler owns the
13//! `hyper::Body`/`axum::Sse` plumbing, the LLM streaming receiver,
14//! and the cost-guard mid-stream check (#401). This module owns
15//! "given one frame, what bytes go on the wire".
16//!
17//! ## Frame order (pinned by the spec, enforced by the handler)
18//!
19//! 1. exactly one [`Frame::Sources`] — `sources_flat` with URNs.
20//! 2. zero or more [`Frame::AnswerToken`] — incremental answer text.
21//! 3. exactly one terminal frame: [`Frame::Validation`] on the happy
22//!    path, [`Frame::Error`] when a cost guard, timeout, or provider
23//!    failure aborts mid-stream.
24//!
25//! This module does NOT enforce that order — the encoder is
26//! per-frame and the caller is responsible for sequencing. A future
27//! `SseStreamBuilder` slice can pin the sequence once the wiring
28//! lands; for now the unit tests pin the byte layout of each frame
29//! kind independently so the wiring slice can rely on it.
30//!
31//! ## SSE wire format
32//!
33//! Per the WHATWG spec (and what every reasonable client expects):
34//!
35//! ```text
36//! event: <name>\n
37//! data: <line 1>\n
38//! data: <line 2>\n
39//! ...
40//! \n
41//! ```
42//!
43//! Two rules everyone gets wrong:
44//!
45//! - A literal `\n` inside the JSON payload MUST be split across
46//!   multiple `data:` lines. The browser concatenates them with a
47//!   single `\n` between, so the receiver gets the original bytes
//!   back. The compact serializer this module uses
//!   (`Value::to_string_compact`) does not emit a raw newline for our
//!   payloads — a newline inside a string is written as the escape
//!   `\n`, as the unit tests pin — so for now this is theoretical. The
//!   encoder still handles it because a future caller might emit
//!   pretty-printed JSON, and breaking on that would silently corrupt
//!   the event boundary.
53//! - The trailing blank line is the frame terminator. Without it,
54//!   the client buffers indefinitely. Two newlines, every time.
55//!
56//! ## Answer-token frame is text, not JSON
57//!
58//! Token frames carry raw answer text. We still wrap them in JSON
59//! (`{"text":"..."}`) so the receiver has a single parse path across
60//! all frame kinds — otherwise the client has to switch on event
61//! name before deciding whether to JSON-parse, which is a footgun.
//! The encoder builds a small JSON object (`{"text": ...}`) and
//! serializes it with the same compact `Value` serializer used for
//! every other frame, so escaping (quotes, backslashes, control bytes)
//! is handled the same way the rest of the JSON wire does.
65
66use crate::serde_json::{Map, Value};
67
/// One source row in the `sources` frame. Caller produces these
/// after `RrfFuser` (#398) + column-policy redaction. `urn` is the
/// engine entity URN; `payload` is the redacted JSON the LLM also
/// saw, serialized as a string to keep the SSE JSON flat (the
/// client re-parses if it wants structure).
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare rows by
/// value instead of field-by-field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceRow {
    /// Engine entity URN; emitted under the `urn` key.
    pub urn: String,
    /// Redacted row JSON, emitted as a JSON *string* under `payload`.
    pub payload: String,
}
78
/// Warning emitted on the terminal `validation` frame. Mirrors the
/// non-streaming response shape so HTTP clients can share parsing
/// code across the two transports.
///
/// Derives `PartialEq`/`Eq` so warnings can be compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidationWarning {
    /// Machine-readable warning category; emitted under `kind`.
    pub kind: String,
    /// Human-readable detail; emitted under `detail`.
    pub detail: String,
}
87
/// Compact audit summary attached to the terminal `validation`
/// frame. The full audit row goes to `red_ask_audit` (#402); this is
/// the subset clients are allowed to see.
///
/// Derives `PartialEq`/`Eq` so summaries can be compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuditSummary {
    /// Emitted under `provider`.
    pub provider: String,
    /// Emitted under `model`.
    pub model: String,
    /// Emitted as a JSON number under `prompt_tokens`.
    pub prompt_tokens: u32,
    /// Emitted as a JSON number under `completion_tokens`.
    pub completion_tokens: u32,
    /// Emitted as a JSON boolean under `cache_hit`.
    pub cache_hit: bool,
}
99
100fn obj(entries: &[(&str, Value)]) -> Value {
101    let mut map = Map::new();
102    for (k, v) in entries {
103        map.insert((*k).to_string(), v.clone());
104    }
105    Value::Object(map)
106}
107
108fn source_row_value(row: &SourceRow) -> Value {
109    obj(&[
110        ("payload", Value::String(row.payload.clone())),
111        ("urn", Value::String(row.urn.clone())),
112    ])
113}
114
115fn warning_value(w: &ValidationWarning) -> Value {
116    obj(&[
117        ("detail", Value::String(w.detail.clone())),
118        ("kind", Value::String(w.kind.clone())),
119    ])
120}
121
122fn audit_value(a: &AuditSummary) -> Value {
123    obj(&[
124        ("cache_hit", Value::Bool(a.cache_hit)),
125        (
126            "completion_tokens",
127            Value::Number(a.completion_tokens as f64),
128        ),
129        ("model", Value::String(a.model.clone())),
130        ("prompt_tokens", Value::Number(a.prompt_tokens as f64)),
131        ("provider", Value::String(a.provider.clone())),
132    ])
133}
134
135/// One SSE frame. Event-name strings are pinned by tests so the
136/// transport contract can't drift unnoticed.
137#[derive(Debug, Clone)]
138pub enum Frame {
139    /// First frame. Full retrieved sources.
140    Sources { sources_flat: Vec<SourceRow> },
141    /// Incremental answer text. Many of these per stream.
142    AnswerToken { text: String },
143    /// Terminal happy-path frame.
144    Validation {
145        ok: bool,
146        warnings: Vec<ValidationWarning>,
147        audit: AuditSummary,
148    },
149    /// Terminal abort frame — cost guard, timeout, provider error.
150    /// `code` mirrors the HTTP status the non-streaming path would
151    /// have returned (413, 504, 422, 500) so clients can branch
152    /// identically.
153    Error { code: u16, message: String },
154}
155
/// Pinned SSE `event:` names, one per `Frame` variant. Public so the
/// wiring layer and tests can reference them instead of re-typing the
/// string literals.
pub mod event {
    /// Event name of the opening sources frame.
    pub const SOURCES: &'static str = "sources";
    /// Event name of each incremental answer-text frame.
    pub const ANSWER_TOKEN: &'static str = "answer_token";
    /// Event name of the terminal happy-path frame.
    pub const VALIDATION: &'static str = "validation";
    /// Event name of the terminal abort frame.
    pub const ERROR: &'static str = "error";
}
164
165impl Frame {
166    fn event_name(&self) -> &'static str {
167        match self {
168            Frame::Sources { .. } => event::SOURCES,
169            Frame::AnswerToken { .. } => event::ANSWER_TOKEN,
170            Frame::Validation { .. } => event::VALIDATION,
171            Frame::Error { .. } => event::ERROR,
172        }
173    }
174
175    fn payload_json(&self) -> String {
176        let value = match self {
177            Frame::Sources { sources_flat } => obj(&[(
178                "sources_flat",
179                Value::Array(sources_flat.iter().map(source_row_value).collect()),
180            )]),
181            Frame::AnswerToken { text } => obj(&[("text", Value::String(text.clone()))]),
182            Frame::Validation {
183                ok,
184                warnings,
185                audit,
186            } => obj(&[
187                ("audit", audit_value(audit)),
188                ("ok", Value::Bool(*ok)),
189                (
190                    "warnings",
191                    Value::Array(warnings.iter().map(warning_value).collect()),
192                ),
193            ]),
194            Frame::Error { code, message } => obj(&[
195                ("code", Value::Number(*code as f64)),
196                ("message", Value::String(message.clone())),
197            ]),
198        };
199        value.to_string_compact()
200    }
201}
202
203/// Encode a single frame to its SSE on-wire bytes (as a `String`,
204/// which is always valid UTF-8 here because JSON is UTF-8 and the
205/// event name is ASCII).
206///
207/// Output always ends in `\n\n` — the SSE frame terminator. Callers
208/// MUST NOT add their own trailing newline; doing so would emit an
209/// empty frame after this one.
210pub fn encode(frame: &Frame) -> String {
211    let event = frame.event_name();
212    let payload = frame.payload_json();
213
214    // Pre-size: event line + per-data-line prefix + payload + terminator.
215    // Cheap upper bound, avoids most reallocations on long answer tokens.
216    let mut out = String::with_capacity(event.len() + payload.len() + 16);
217    out.push_str("event: ");
218    out.push_str(event);
219    out.push('\n');
220
221    // Split payload on '\n' so a multi-line JSON (e.g. pretty-printed)
222    // still serializes to a valid SSE frame. For our compact JSON this
223    // loop runs once.
224    for line in payload.split('\n') {
225        out.push_str("data: ");
226        out.push_str(line);
227        out.push('\n');
228    }
229
230    out.push('\n');
231    out
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    fn audit_fixture() -> AuditSummary {
        AuditSummary {
            provider: "openai".to_string(),
            model: "gpt-4o-mini".to_string(),
            prompt_tokens: 123,
            completion_tokens: 45,
            cache_hit: false,
        }
    }

    #[test]
    fn event_names_pinned() {
        assert_eq!(event::SOURCES, "sources");
        assert_eq!(event::ANSWER_TOKEN, "answer_token");
        assert_eq!(event::VALIDATION, "validation");
        assert_eq!(event::ERROR, "error");
    }

    #[test]
    fn encodes_sources_frame_with_event_and_terminator() {
        let frame = Frame::Sources {
            sources_flat: vec![SourceRow {
                urn: "urn:reddb:row:1".to_string(),
                payload: "{\"k\":\"v\"}".to_string(),
            }],
        };
        let out = encode(&frame);
        assert!(out.starts_with("event: sources\n"));
        assert!(out.ends_with("\n\n"));
        assert!(out.contains("data: {"));
        assert!(out.contains("\"urn\":\"urn:reddb:row:1\""));
    }

    #[test]
    fn encodes_answer_token_frame_with_text_field() {
        let frame = Frame::AnswerToken {
            text: "hello".to_string(),
        };
        let out = encode(&frame);
        assert_eq!(out, "event: answer_token\ndata: {\"text\":\"hello\"}\n\n");
    }

    #[test]
    fn answer_token_escapes_quotes_and_backslashes() {
        let frame = Frame::AnswerToken {
            text: "a\"b\\c".to_string(),
        };
        let out = encode(&frame);
        // JSON escape: " → \" and \ → \\
        assert!(out.contains(r#"\"b\\c"#));
        assert!(out.ends_with("\n\n"));
    }

    #[test]
    fn encodes_validation_frame_with_full_shape() {
        let frame = Frame::Validation {
            ok: true,
            warnings: vec![],
            audit: audit_fixture(),
        };
        let out = encode(&frame);
        assert!(out.starts_with("event: validation\n"));
        assert!(out.contains("\"ok\":true"));
        assert!(out.contains("\"prompt_tokens\":123"));
        assert!(out.contains("\"cache_hit\":false"));
        assert!(out.ends_with("\n\n"));
    }

    #[test]
    fn validation_carries_warnings_array() {
        let frame = Frame::Validation {
            ok: false,
            warnings: vec![
                ValidationWarning {
                    kind: "out_of_range".to_string(),
                    detail: "[^9] but only 3 sources".to_string(),
                },
                ValidationWarning {
                    kind: "mode_fallback".to_string(),
                    detail: "ollama".to_string(),
                },
            ],
            audit: audit_fixture(),
        };
        let out = encode(&frame);
        assert!(out.contains("\"kind\":\"out_of_range\""));
        assert!(out.contains("\"kind\":\"mode_fallback\""));
        // ok=false visible to clients so they don't surface a "valid"
        // answer when validation actually failed.
        assert!(out.contains("\"ok\":false"));
    }

    #[test]
    fn encodes_error_frame_with_code() {
        let frame = Frame::Error {
            code: 413,
            message: "max_prompt_tokens exceeded".to_string(),
        };
        let out = encode(&frame);
        assert_eq!(
            out,
            "event: error\ndata: {\"code\":413,\"message\":\"max_prompt_tokens exceeded\"}\n\n"
        );
    }

    #[test]
    fn error_frame_handles_504_timeout() {
        // Pins the cost-guard / timeout mapping the wiring slice will
        // depend on (#401).
        let frame = Frame::Error {
            code: 504,
            message: "timeout_ms exceeded".to_string(),
        };
        let out = encode(&frame);
        assert!(out.contains("\"code\":504"));
    }

    #[test]
    fn newline_in_token_text_is_escaped_not_split() {
        // The JSON serializer escapes a newline in token text as `\n`
        // inside the string literal, so the payload stays on a single
        // `data:` line. Pinned so a future swap to a pretty-printer
        // doesn't silently change the SSE framing.
        let frame = Frame::AnswerToken {
            text: "line1\nline2".to_string(),
        };
        let out = encode(&frame);
        assert_eq!(
            out,
            "event: answer_token\ndata: {\"text\":\"line1\\nline2\"}\n\n"
        );
    }

    #[test]
    fn embedded_newlines_never_break_framing() {
        // Drives `encode` itself, not a hand-rolled copy of its loop.
        // Newlines in every user-controlled string field must stay
        // escaped inside one `data:` line. Each frame is therefore
        // exactly: `event:` line, `data:` line, blank terminator.
        let frames = [
            Frame::Sources {
                sources_flat: vec![SourceRow {
                    urn: "urn:\n1".to_string(),
                    payload: "{\n}".to_string(),
                }],
            },
            Frame::AnswerToken {
                text: "a\nb".to_string(),
            },
            Frame::Validation {
                ok: true,
                warnings: vec![ValidationWarning {
                    kind: "k\n".to_string(),
                    detail: "\nd".to_string(),
                }],
                audit: audit_fixture(),
            },
            Frame::Error {
                code: 500,
                message: "boom\nagain".to_string(),
            },
        ];
        for frame in frames {
            let out = encode(&frame);
            let lines: Vec<&str> = out.split('\n').collect();
            // Expected parts: "event: …", "data: …", "", "". The two
            // empties are the blank-line terminator and the remainder
            // after it.
            assert_eq!(lines.len(), 4, "unexpected frame layout: {:?}", out);
            assert!(lines[0].starts_with("event: "));
            assert!(lines[1].starts_with("data: "));
            assert_eq!(lines[2], "");
            assert_eq!(lines[3], "");
        }
    }

    #[test]
    fn frame_terminator_is_double_newline() {
        // The single most common SSE bug: forgetting the blank line.
        // Pinned independently for every frame kind.
        for frame in [
            Frame::Sources {
                sources_flat: vec![],
            },
            Frame::AnswerToken {
                text: String::new(),
            },
            Frame::Validation {
                ok: true,
                warnings: vec![],
                audit: audit_fixture(),
            },
            Frame::Error {
                code: 500,
                message: String::new(),
            },
        ] {
            let out = encode(&frame);
            assert!(out.ends_with("\n\n"), "frame missing terminator: {:?}", out);
            // And NOT a triple newline. An extra blank line is dead
            // weight that naive `\n\n` splitters misread as an empty
            // frame.
            assert!(!out.ends_with("\n\n\n"));
        }
    }

    #[test]
    fn sources_frame_with_empty_list_is_well_formed() {
        let frame = Frame::Sources {
            sources_flat: vec![],
        };
        let out = encode(&frame);
        assert_eq!(out, "event: sources\ndata: {\"sources_flat\":[]}\n\n");
    }

    #[test]
    fn answer_token_with_empty_text_is_well_formed() {
        // An empty-text frame should never be emitted in practice, but
        // the encoder must not crash on it — the caller might forward
        // an empty SSE chunk from a poorly-behaved provider.
        let frame = Frame::AnswerToken {
            text: String::new(),
        };
        let out = encode(&frame);
        assert_eq!(out, "event: answer_token\ndata: {\"text\":\"\"}\n\n");
    }

    #[test]
    fn encoding_is_deterministic_across_calls() {
        let frame = Frame::Validation {
            ok: true,
            warnings: vec![ValidationWarning {
                kind: "k".to_string(),
                detail: "d".to_string(),
            }],
            audit: audit_fixture(),
        };
        let a = encode(&frame);
        let b = encode(&frame);
        assert_eq!(a, b);
    }

    #[test]
    fn event_name_matches_pinned_constants() {
        assert_eq!(
            Frame::Sources {
                sources_flat: vec![]
            }
            .event_name(),
            event::SOURCES
        );
        assert_eq!(
            Frame::AnswerToken {
                text: String::new()
            }
            .event_name(),
            event::ANSWER_TOKEN
        );
        assert_eq!(
            Frame::Validation {
                ok: true,
                warnings: vec![],
                audit: audit_fixture(),
            }
            .event_name(),
            event::VALIDATION
        );
        assert_eq!(
            Frame::Error {
                code: 0,
                message: String::new(),
            }
            .event_name(),
            event::ERROR
        );
    }

    #[test]
    fn unicode_in_token_text_passes_through() {
        let frame = Frame::AnswerToken {
            text: "olá 🌍".to_string(),
        };
        let out = encode(&frame);
        // The serializer preserves non-ASCII rather than \u-escaping it.
        assert!(out.contains("olá 🌍"));
        assert!(out.ends_with("\n\n"));
    }
}