Skip to main content

nemo_flow/codec/
streaming.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Streaming response codecs for the managed LLM execution pipeline.
5//!
6//! [`LlmResponseCodec`] (in [`crate::codec::traits`]) decodes a complete provider response into a
7//! normalized [`AnnotatedLlmResponse`]. For streaming providers, the analogous job is to:
8//!
9//! 1. consume per-chunk events as they arrive on a streaming HTTP response, and
10//! 2. assemble a single non-streaming-shape JSON payload at end of stream.
11//!
12//! Once assembled, the payload can be fed back through the matching [`LlmResponseCodec`] to produce
13//! an [`AnnotatedLlmResponse`] — meaning streaming and non-streaming requests converge on the same
14//! observability output without per-route shape duplication.
15//!
16//! [`StreamingCodec`] is the trait that bundles the two functions
17//! ([`LlmCollectorFn`](crate::api::runtime::LlmCollectorFn),
18//! [`LlmFinalizerFn`](crate::api::runtime::LlmFinalizerFn)) used by
19//! [`crate::api::llm::llm_stream_call_execute`]. Each provider supplies one impl whose internal
20//! state holds whatever incremental information is needed to materialize the final payload.
21//!
22//! [`AnnotatedLlmResponse`]: crate::codec::response::AnnotatedLlmResponse
23
24use crate::api::runtime::{LlmCollectorFn, LlmFinalizerFn};
25use crate::error::{FlowError, Result};
26use crate::json::Json;
27
/// Per-provider streaming codec used with [`crate::api::llm::llm_stream_call_execute`].
///
/// `collector()` and `finalizer()` produce owned closures that share the codec's internal
/// accumulation state. Implementations typically wrap that state in `Arc<Mutex<...>>` so each
/// `&self`-produced closure captures a clone of the handle.
///
/// [`LlmFinalizerFn`] is `FnOnce`, so a [`StreamingCodec`] instance is single-use: callers
/// construct a fresh instance per managed-lifecycle call and discard it after the stream
/// completes.
///
/// The `Send + Sync` supertrait bound requires every implementation to be both `Send` and
/// `Sync`.
pub trait StreamingCodec: Send + Sync {
    /// Returns a closure that consumes one decoded provider event per call.
    ///
    /// The closure is expected to record each event into the state shared with
    /// [`StreamingCodec::finalizer`].
    fn collector(&self) -> LlmCollectorFn;

    /// Returns a closure that, when called once at end of stream, produces the assembled response
    /// payload in the shape the matching [`crate::codec::traits::LlmResponseCodec`] can decode.
    ///
    /// The payload is built from whatever the closure returned by [`StreamingCodec::collector`]
    /// accumulated for this codec instance.
    fn finalizer(&self) -> LlmFinalizerFn;
}
45
46/// Incremental decoder for `text/event-stream` byte streams that yields one JSON object per
47/// complete `data:` payload.
48///
49/// SSE frames are separated by blank lines (`\n\n`); each frame may contain `event:` and `data:`
50/// lines. Anthropic Messages, OpenAI Responses, and OpenAI Chat Completions all emit one JSON
51/// object per `data:` line, so the decoder buffers received bytes, splits on frame boundaries,
52/// parses the JSON payload, and tags it with the frame's event name when present.
53///
54/// The decoder is byte-stream-friendly: it accumulates partial frames across chunks and emits
55/// completed frames only when their terminating blank line arrives. Bytes after the last
56/// terminator are retained for the next call.
57#[derive(Default)]
58pub struct SseEventDecoder {
59    buffer: String,
60}
61
62/// One decoded SSE frame, paired with the parsed `data:` payload.
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub struct SseEvent {
65    /// Value of the `event:` line if present.
66    pub event: Option<String>,
67    /// Parsed JSON payload from the `data:` line(s).
68    pub data: Json,
69}
70
71impl SseEventDecoder {
72    /// Creates a new decoder with an empty buffer.
73    pub fn new() -> Self {
74        Self::default()
75    }
76
77    /// Appends `bytes` to the internal buffer and returns every now-complete SSE event.
78    ///
79    /// Bytes are interpreted as UTF-8 with replacement characters for invalid sequences; provider
80    /// SSE streams are well-formed UTF-8 in practice, but lossy decoding keeps the decoder honest
81    /// rather than failing on a single corrupt chunk.
82    ///
83    /// Returns `Ok(events)` containing zero or more events whose `data:` payloads parsed
84    /// successfully. Frames whose `data:` line is non-empty but does not parse as JSON are
85    /// surfaced as [`FlowError::Internal`] so the caller can decide whether to abort the stream
86    /// or skip the frame; frames with no `data:` line at all (e.g. SSE heartbeats) are silently
87    /// dropped.
88    pub fn push_bytes(&mut self, bytes: &[u8]) -> Result<Vec<SseEvent>> {
89        // Normalize CRLF to LF on append so the framing search only needs to find `\n\n`. Some
90        // providers emit mixed line endings on the wire; normalizing once here keeps the inner
91        // loop cheap.
92        let chunk = String::from_utf8_lossy(bytes).replace("\r\n", "\n");
93        self.buffer.push_str(&chunk);
94        let mut events = Vec::new();
95        while let Some(cut) = self.buffer.find("\n\n") {
96            let frame: String = self.buffer.drain(..cut).collect();
97            // Drop the `\n\n` terminator itself.
98            self.buffer.drain(..2);
99            if let Some(event) = parse_sse_frame(&frame)? {
100                events.push(event);
101            }
102        }
103        Ok(events)
104    }
105
106    /// Drains any remaining buffered frame at end of stream.
107    ///
108    /// Most well-formed SSE streams end with a terminating blank line, in which case this returns
109    /// `Ok(None)`. Stops with no terminator are surfaced as a final partial frame so observability
110    /// captures the last bytes the upstream sent before disconnect.
111    pub fn finish(mut self) -> Result<Option<SseEvent>> {
112        let trailing = std::mem::take(&mut self.buffer);
113        if trailing.trim().is_empty() {
114            Ok(None)
115        } else {
116            parse_sse_frame(&trailing)
117        }
118    }
119}
120
121// Parses a single SSE frame. Returns `None` for frames without a `data:` line, `Some(event)` for
122// frames whose `data:` JSON parsed successfully.
123fn parse_sse_frame(frame: &str) -> Result<Option<SseEvent>> {
124    let mut event_name: Option<String> = None;
125    let mut data_parts: Vec<&str> = Vec::new();
126    for line in frame.split('\n') {
127        if let Some(rest) = line.strip_prefix("event:") {
128            event_name = Some(rest.trim().to_string());
129        } else if let Some(rest) = line.strip_prefix("data:") {
130            // SSE allows a single space after the colon by convention; strip it lazily.
131            data_parts.push(rest.strip_prefix(' ').unwrap_or(rest));
132        }
133        // Other lines (`id:`, `retry:`, comments starting with `:`) are ignored.
134    }
135    if data_parts.is_empty() {
136        return Ok(None);
137    }
138    let payload = data_parts.join("\n");
139    let trimmed = payload.trim();
140    // OpenAI Chat Completions emits a `data: [DONE]` terminator as a wire-level end-of-stream
141    // sentinel. It's not a JSON payload — drop it like a heartbeat. Other providers (Anthropic,
142    // OpenAI Responses) have proper terminal events instead, so this only fires for OpenAI Chat.
143    if trimmed == "[DONE]" {
144        return Ok(None);
145    }
146    let data: Json = serde_json::from_str(trimmed).map_err(|error| {
147        FlowError::Internal(format!(
148            "streaming codec failed to parse SSE data payload: {error}: {payload}"
149        ))
150    })?;
151    Ok(Some(SseEvent {
152        event: event_name,
153        data,
154    }))
155}
156
// Unit tests for this module are kept in a separate file; `#[path]` points the `tests` module at
// it, and `#[cfg(test)]` compiles it only for test builds.
#[cfg(test)]
#[path = "../../tests/unit/codec/streaming_tests.rs"]
mod tests;