use crate::api::runtime::{LlmCollectorFn, LlmFinalizerFn};
use crate::error::{FlowError, Result};
use crate::json::Json;
pub trait StreamingCodec: Send + Sync {
fn collector(&self) -> LlmCollectorFn;
fn finalizer(&self) -> LlmFinalizerFn;
}
#[derive(Default)]
pub struct SseEventDecoder {
buffer: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SseEvent {
pub event: Option<String>,
pub data: Json,
}
impl SseEventDecoder {
pub fn new() -> Self {
Self::default()
}
pub fn push_bytes(&mut self, bytes: &[u8]) -> Result<Vec<SseEvent>> {
let chunk = String::from_utf8_lossy(bytes).replace("\r\n", "\n");
self.buffer.push_str(&chunk);
let mut events = Vec::new();
while let Some(cut) = self.buffer.find("\n\n") {
let frame: String = self.buffer.drain(..cut).collect();
self.buffer.drain(..2);
if let Some(event) = parse_sse_frame(&frame)? {
events.push(event);
}
}
Ok(events)
}
pub fn finish(mut self) -> Result<Option<SseEvent>> {
let trailing = std::mem::take(&mut self.buffer);
if trailing.trim().is_empty() {
Ok(None)
} else {
parse_sse_frame(&trailing)
}
}
}
fn parse_sse_frame(frame: &str) -> Result<Option<SseEvent>> {
let mut event_name: Option<String> = None;
let mut data_parts: Vec<&str> = Vec::new();
for line in frame.split('\n') {
if let Some(rest) = line.strip_prefix("event:") {
event_name = Some(rest.trim().to_string());
} else if let Some(rest) = line.strip_prefix("data:") {
data_parts.push(rest.strip_prefix(' ').unwrap_or(rest));
}
}
if data_parts.is_empty() {
return Ok(None);
}
let payload = data_parts.join("\n");
let trimmed = payload.trim();
if trimmed == "[DONE]" {
return Ok(None);
}
let data: Json = serde_json::from_str(trimmed).map_err(|error| {
FlowError::Internal(format!(
"streaming codec failed to parse SSE data payload: {error}: {payload}"
))
})?;
Ok(Some(SseEvent {
event: event_name,
data,
}))
}
#[cfg(test)]
#[path = "../../tests/unit/codec/streaming_tests.rs"]
mod tests;