Skip to main content

reddb_server/runtime/ai/
sse_frame_encoder.rs

//! SSE frame encoder — a pure Server-Sent Events frame serializer for
//! `ASK '...' STREAM` over HTTP.
//!
//! Issue #405 (PRD #391): the streaming variant of ASK emits three
//! kinds of SSE frames in a fixed order: a single `sources` frame, a
//! run of `answer_token` frames, and exactly one terminal frame
//! (`validation` on success, `error` on mid-stream abort). This
//! module pins the wire format: the event name, the JSON payload
//! shape, and the SSE-specific details (multi-line `data:` fields,
//! the blank-line terminator) that hand-written encoders often get wrong.
11//!
12//! Deep module: no I/O, no transport. The HTTP handler owns the
13//! `hyper::Body`/`axum::Sse` plumbing, the LLM streaming receiver,
14//! and the cost-guard mid-stream check (#401). This module owns
15//! "given one frame, what bytes go on the wire".
16//!
17//! ## Frame order (pinned by the spec, enforced by the handler)
18//!
19//! 1. exactly one [`Frame::Sources`] — `sources_flat` with URNs.
20//! 2. zero or more [`Frame::AnswerToken`] — incremental answer text.
21//! 3. exactly one terminal frame: [`Frame::Validation`] on the happy
22//!    path, [`Frame::Error`] when a cost guard, timeout, or provider
23//!    failure aborts mid-stream.
24//!
25//! This module does NOT enforce that order — the encoder is
26//! per-frame and the caller is responsible for sequencing. A future
27//! `SseStreamBuilder` slice can pin the sequence once the wiring
28//! lands; for now the unit tests pin the byte layout of each frame
29//! kind independently so the wiring slice can rely on it.
30//!
31//! ## SSE wire format
32//!
33//! Per the WHATWG spec (and what every reasonable client expects):
34//!
35//! ```text
36//! event: <name>\n
37//! data: <line 1>\n
38//! data: <line 2>\n
39//! ...
40//! \n
41//! ```
42//!
43//! Two rules everyone gets wrong:
44//!
//! - A literal `\n` inside the JSON payload MUST be split across
//!   multiple `data:` lines. The client concatenates them with a
//!   single `\n` between, so the receiver gets the original bytes
//!   back. The compact serializer used here (`to_string_compact`)
//!   does not emit a raw `\n` for our payloads — a newline inside a
//!   string comes out as the two-character escape `\\n` — so for our
//!   payloads this is theoretical. The encoder still handles it
//!   because a future caller might emit pretty-printed JSON, and a raw
//!   newline inside a single `data:` line would silently corrupt the
//!   event boundary.
53//! - The trailing blank line is the frame terminator. Without it,
54//!   the client buffers indefinitely. Two newlines, every time.
55//!
56//! ## Answer-token frame is text, not JSON
57//!
58//! Token frames carry raw answer text. We still wrap them in JSON
59//! (`{"text":"..."}`) so the receiver has a single parse path across
60//! all frame kinds — otherwise the client has to switch on event
61//! name before deciding whether to JSON-parse, which is a footgun.
//! The encoder builds a small JSON object (`crate::serde_json::Value`)
//! and serializes it with `to_string_compact`. That serializer handles
//! escaping (quotes, backslashes, newlines) the same way the rest of
//! the JSON wire does.
65
66use crate::serde_json::{Map, Value};
67
/// One source row in the `sources` frame.
///
/// The caller produces these after `RrfFuser` (#398) and column-policy
/// redaction. Equality is structural, so rows can be compared directly
/// in tests and by the wiring layer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceRow {
    /// Engine entity URN of the row.
    pub urn: String,
    /// The redacted JSON the LLM also saw. It is carried as a string
    /// to keep the SSE JSON flat; clients re-parse it if they want
    /// structure.
    pub payload: String,
}
78
/// Warning emitted on the terminal `validation` frame.
///
/// Mirrors the non-streaming response shape so HTTP clients can share
/// parsing code across the two transports.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidationWarning {
    /// Machine-readable warning category (e.g. `out_of_range`).
    pub kind: String,
    /// Human-readable explanation of the warning.
    pub detail: String,
}
87
/// Compact audit summary attached to the terminal `validation` frame.
///
/// The full audit row goes to `red_ask_audit` (#402); this is the
/// subset clients are allowed to see.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuditSummary {
    /// LLM provider name (e.g. `openai`).
    pub provider: String,
    /// Model identifier used for the answer.
    pub model: String,
    /// Tokens consumed by the prompt.
    pub prompt_tokens: u32,
    /// Tokens produced in the completion.
    pub completion_tokens: u32,
    /// Whether the answer was served from cache.
    pub cache_hit: bool,
}
99
100fn obj(entries: &[(&str, Value)]) -> Value {
101    let mut map = Map::new();
102    for (k, v) in entries {
103        map.insert((*k).to_string(), v.clone());
104    }
105    Value::Object(map)
106}
107
108fn source_row_value(row: &SourceRow) -> Value {
109    obj(&[
110        ("payload", Value::String(row.payload.clone())),
111        ("urn", Value::String(row.urn.clone())),
112    ])
113}
114
115fn warning_value(w: &ValidationWarning) -> Value {
116    obj(&[
117        ("detail", Value::String(w.detail.clone())),
118        ("kind", Value::String(w.kind.clone())),
119    ])
120}
121
122fn audit_value(a: &AuditSummary) -> Value {
123    obj(&[
124        ("cache_hit", Value::Bool(a.cache_hit)),
125        ("completion_tokens", Value::Number(a.completion_tokens as f64)),
126        ("model", Value::String(a.model.clone())),
127        ("prompt_tokens", Value::Number(a.prompt_tokens as f64)),
128        ("provider", Value::String(a.provider.clone())),
129    ])
130}
131
132/// One SSE frame. Event-name strings are pinned by tests so the
133/// transport contract can't drift unnoticed.
134#[derive(Debug, Clone)]
135pub enum Frame {
136    /// First frame. Full retrieved sources.
137    Sources {
138        sources_flat: Vec<SourceRow>,
139    },
140    /// Incremental answer text. Many of these per stream.
141    AnswerToken {
142        text: String,
143    },
144    /// Terminal happy-path frame.
145    Validation {
146        ok: bool,
147        warnings: Vec<ValidationWarning>,
148        audit: AuditSummary,
149    },
150    /// Terminal abort frame — cost guard, timeout, provider error.
151    /// `code` mirrors the HTTP status the non-streaming path would
152    /// have returned (413, 504, 422, 500) so clients can branch
153    /// identically.
154    Error {
155        code: u16,
156        message: String,
157    },
158}
159
/// SSE event names, one per [`Frame`] variant.
///
/// Public so the HTTP wiring layer and the tests can name an event
/// without retyping its string literal.
pub mod event {
    /// Event name of the opening sources frame.
    pub const SOURCES: &str = "sources";
    /// Event name of an incremental answer-text frame.
    pub const ANSWER_TOKEN: &str = "answer_token";
    /// Event name of the happy-path terminal frame.
    pub const VALIDATION: &str = "validation";
    /// Event name of the abort terminal frame.
    pub const ERROR: &str = "error";
}
168
169impl Frame {
170    fn event_name(&self) -> &'static str {
171        match self {
172            Frame::Sources { .. } => event::SOURCES,
173            Frame::AnswerToken { .. } => event::ANSWER_TOKEN,
174            Frame::Validation { .. } => event::VALIDATION,
175            Frame::Error { .. } => event::ERROR,
176        }
177    }
178
179    fn payload_json(&self) -> String {
180        let value = match self {
181            Frame::Sources { sources_flat } => obj(&[(
182                "sources_flat",
183                Value::Array(sources_flat.iter().map(source_row_value).collect()),
184            )]),
185            Frame::AnswerToken { text } => obj(&[("text", Value::String(text.clone()))]),
186            Frame::Validation {
187                ok,
188                warnings,
189                audit,
190            } => obj(&[
191                ("audit", audit_value(audit)),
192                ("ok", Value::Bool(*ok)),
193                (
194                    "warnings",
195                    Value::Array(warnings.iter().map(warning_value).collect()),
196                ),
197            ]),
198            Frame::Error { code, message } => obj(&[
199                ("code", Value::Number(*code as f64)),
200                ("message", Value::String(message.clone())),
201            ]),
202        };
203        value.to_string_compact()
204    }
205}
206
207/// Encode a single frame to its SSE on-wire bytes (as a `String`,
208/// which is always valid UTF-8 here because JSON is UTF-8 and the
209/// event name is ASCII).
210///
211/// Output always ends in `\n\n` — the SSE frame terminator. Callers
212/// MUST NOT add their own trailing newline; doing so would emit an
213/// empty frame after this one.
214pub fn encode(frame: &Frame) -> String {
215    let event = frame.event_name();
216    let payload = frame.payload_json();
217
218    // Pre-size: event line + per-data-line prefix + payload + terminator.
219    // Cheap upper bound, avoids most reallocations on long answer tokens.
220    let mut out = String::with_capacity(event.len() + payload.len() + 16);
221    out.push_str("event: ");
222    out.push_str(event);
223    out.push('\n');
224
225    // Split payload on '\n' so a multi-line JSON (e.g. pretty-printed)
226    // still serializes to a valid SSE frame. For our compact JSON this
227    // loop runs once.
228    for line in payload.split('\n') {
229        out.push_str("data: ");
230        out.push_str(line);
231        out.push('\n');
232    }
233
234    out.push('\n');
235    out
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    fn audit_fixture() -> AuditSummary {
        AuditSummary {
            provider: "openai".to_string(),
            model: "gpt-4o-mini".to_string(),
            prompt_tokens: 123,
            completion_tokens: 45,
            cache_hit: false,
        }
    }

    /// Parse one encoded frame the way an SSE client would, restricted
    /// to the two fields this encoder emits.
    ///
    /// Returns the `event` value and the `data` values joined with `\n`
    /// (the client-side rule for multi-line `data:`). Panics if the
    /// blank-line terminator is missing or any other field appears.
    fn parse_frame(wire: &str) -> (String, String) {
        let body = wire
            .strip_suffix("\n\n")
            .expect("frame must end with a blank line");
        let mut event_name = String::new();
        let mut data_lines: Vec<&str> = Vec::new();
        for line in body.split('\n') {
            let (field, value) = line.split_once(':').unwrap_or((line, ""));
            // Clients strip exactly one leading space from a field value.
            let value = value.strip_prefix(' ').unwrap_or(value);
            match field {
                "event" => event_name = value.to_string(),
                "data" => data_lines.push(value),
                other => panic!("unexpected field {:?} in frame {:?}", other, wire),
            }
        }
        (event_name, data_lines.join("\n"))
    }

    #[test]
    fn event_names_pinned() {
        assert_eq!(event::SOURCES, "sources");
        assert_eq!(event::ANSWER_TOKEN, "answer_token");
        assert_eq!(event::VALIDATION, "validation");
        assert_eq!(event::ERROR, "error");
    }

    #[test]
    fn encodes_sources_frame_with_event_and_terminator() {
        let frame = Frame::Sources {
            sources_flat: vec![SourceRow {
                urn: "urn:reddb:row:1".to_string(),
                payload: "{\"k\":\"v\"}".to_string(),
            }],
        };
        let out = encode(&frame);
        assert!(out.starts_with("event: sources\n"));
        assert!(out.ends_with("\n\n"));
        assert!(out.contains("data: {"));
        assert!(out.contains("\"urn\":\"urn:reddb:row:1\""));
    }

    #[test]
    fn encodes_answer_token_frame_with_text_field() {
        let frame = Frame::AnswerToken {
            text: "hello".to_string(),
        };
        let out = encode(&frame);
        assert_eq!(out, "event: answer_token\ndata: {\"text\":\"hello\"}\n\n");
    }

    #[test]
    fn answer_token_escapes_quotes_and_backslashes() {
        let frame = Frame::AnswerToken {
            text: "a\"b\\c".to_string(),
        };
        let out = encode(&frame);
        // JSON escape: " → \" and \ → \\
        assert!(out.contains(r#"\"b\\c"#));
        assert!(out.ends_with("\n\n"));
    }

    #[test]
    fn encodes_validation_frame_with_full_shape() {
        let frame = Frame::Validation {
            ok: true,
            warnings: vec![],
            audit: audit_fixture(),
        };
        let out = encode(&frame);
        assert!(out.starts_with("event: validation\n"));
        assert!(out.contains("\"ok\":true"));
        assert!(out.contains("\"prompt_tokens\":123"));
        assert!(out.contains("\"cache_hit\":false"));
        assert!(out.ends_with("\n\n"));
    }

    #[test]
    fn validation_carries_warnings_array() {
        let frame = Frame::Validation {
            ok: false,
            warnings: vec![
                ValidationWarning {
                    kind: "out_of_range".to_string(),
                    detail: "[^9] but only 3 sources".to_string(),
                },
                ValidationWarning {
                    kind: "mode_fallback".to_string(),
                    detail: "ollama".to_string(),
                },
            ],
            audit: audit_fixture(),
        };
        let out = encode(&frame);
        assert!(out.contains("\"kind\":\"out_of_range\""));
        assert!(out.contains("\"kind\":\"mode_fallback\""));
        // ok=false visible to clients so they don't surface a "valid"
        // answer when validation actually failed.
        assert!(out.contains("\"ok\":false"));
    }

    #[test]
    fn encodes_error_frame_with_code() {
        let frame = Frame::Error {
            code: 413,
            message: "max_prompt_tokens exceeded".to_string(),
        };
        let out = encode(&frame);
        assert_eq!(
            out,
            "event: error\ndata: {\"code\":413,\"message\":\"max_prompt_tokens exceeded\"}\n\n"
        );
    }

    #[test]
    fn error_frame_handles_504_timeout() {
        // Pins the cost-guard / timeout mapping the wiring slice will
        // depend on (#401).
        let frame = Frame::Error {
            code: 504,
            message: "timeout_ms exceeded".to_string(),
        };
        let out = encode(&frame);
        assert!(out.contains("\"code\":504"));
    }

    #[test]
    fn newline_in_token_text_is_escaped_not_split() {
        // A newline inside token text must never reach the wire raw:
        // the JSON serializer escapes '\n' to "\\n", so the payload
        // stays on a single data line. Pinned so a future swap to a
        // pretty-printer doesn't silently change the SSE framing.
        let frame = Frame::AnswerToken {
            text: "line1\nline2".to_string(),
        };
        let out = encode(&frame);
        assert_eq!(
            out,
            "event: answer_token\ndata: {\"text\":\"line1\\nline2\"}\n\n"
        );
    }

    #[test]
    fn every_frame_kind_round_trips_through_an_sse_parser() {
        // Drives the real `encode` output through a client-style parser
        // and checks that the event name and payload come back intact,
        // so the framing is verified end-to-end for every frame kind.
        let frames = [
            Frame::Sources {
                sources_flat: vec![SourceRow {
                    urn: "urn:reddb:row:1".to_string(),
                    payload: "{\"k\":\"v\"}".to_string(),
                }],
            },
            Frame::AnswerToken {
                text: "line1\nline2 \"quoted\" back\\slash".to_string(),
            },
            Frame::Validation {
                ok: false,
                warnings: vec![ValidationWarning {
                    kind: "k".to_string(),
                    detail: "d".to_string(),
                }],
                audit: audit_fixture(),
            },
            Frame::Error {
                code: 422,
                message: "provider: bad request".to_string(),
            },
        ];
        for frame in &frames {
            let (event_name, data) = parse_frame(&encode(frame));
            assert_eq!(event_name, frame.event_name());
            assert_eq!(data, frame.payload_json());
        }
    }

    #[test]
    fn frame_terminator_is_double_newline() {
        // The single most common SSE bug: forgetting the blank line.
        // Pinned independently for every frame kind.
        for frame in [
            Frame::Sources { sources_flat: vec![] },
            Frame::AnswerToken { text: String::new() },
            Frame::Validation {
                ok: true,
                warnings: vec![],
                audit: audit_fixture(),
            },
            Frame::Error {
                code: 500,
                message: String::new(),
            },
        ] {
            let out = encode(&frame);
            assert!(
                out.ends_with("\n\n"),
                "frame missing terminator: {:?}",
                out
            );
            // And NOT a triple newline: the encoder must emit exactly
            // one blank line per frame.
            assert!(!out.ends_with("\n\n\n"));
        }
    }

    #[test]
    fn sources_frame_with_empty_list_is_well_formed() {
        let frame = Frame::Sources {
            sources_flat: vec![],
        };
        let out = encode(&frame);
        assert_eq!(out, "event: sources\ndata: {\"sources_flat\":[]}\n\n");
    }

    #[test]
    fn answer_token_with_empty_text_is_well_formed() {
        // An empty-text frame should never be emitted in practice, but
        // the encoder must not crash on it — the caller might forward
        // an empty SSE chunk from a poorly-behaved provider.
        let frame = Frame::AnswerToken {
            text: String::new(),
        };
        let out = encode(&frame);
        assert_eq!(out, "event: answer_token\ndata: {\"text\":\"\"}\n\n");
    }

    #[test]
    fn encoding_is_deterministic_across_calls() {
        let frame = Frame::Validation {
            ok: true,
            warnings: vec![ValidationWarning {
                kind: "k".to_string(),
                detail: "d".to_string(),
            }],
            audit: audit_fixture(),
        };
        let a = encode(&frame);
        let b = encode(&frame);
        assert_eq!(a, b);
    }

    #[test]
    fn event_name_matches_pinned_constants() {
        assert_eq!(
            Frame::Sources { sources_flat: vec![] }.event_name(),
            event::SOURCES
        );
        assert_eq!(
            Frame::AnswerToken { text: String::new() }.event_name(),
            event::ANSWER_TOKEN
        );
        assert_eq!(
            Frame::Validation {
                ok: true,
                warnings: vec![],
                audit: audit_fixture(),
            }
            .event_name(),
            event::VALIDATION
        );
        assert_eq!(
            Frame::Error {
                code: 0,
                message: String::new(),
            }
            .event_name(),
            event::ERROR
        );
    }

    #[test]
    fn unicode_in_token_text_passes_through() {
        let frame = Frame::AnswerToken {
            text: "olá 🌍".to_string(),
        };
        let out = encode(&frame);
        // The compact serializer passes non-ASCII through unescaped.
        assert!(out.contains("olá 🌍"));
        assert!(out.ends_with("\n\n"));
    }
}