use crate::pipeline::{Decoder, PipelineError};
use crate::protocol::DecoderConfig;
use crate::{BoxStream, PipeResult};
use bytes::Bytes;
use futures::{stream, StreamExt};
use serde_json::Value;
/// Decoder for delimiter-framed, prefix-tagged event streams (Server-Sent Events style).
///
/// The three fields describe the wire framing: how frames are separated, which
/// prefix tags a payload line, and which sentinel marks the end of the stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SseDecoder {
    /// Byte sequence that separates one frame from the next.
    delimiter: String,
    /// Prefix stripped from a frame before its payload is parsed as JSON.
    prefix: String,
    /// Sentinel payload that terminates the decoded stream.
    done_signal: String,
}
impl SseDecoder {
    /// Builds a decoder, substituting defaults for any setting that is `None`.
    ///
    /// Defaults: delimiter `"\n\n"`, prefix `"data: "`, done signal `"[DONE]"`.
    ///
    /// An empty delimiter is treated the same as `None`: an empty pattern
    /// matches at offset 0 of every buffer without consuming any input, so the
    /// decode loop could never make progress with it.
    pub fn new(
        delimiter: Option<String>,
        prefix: Option<String>,
        done_signal: Option<String>,
    ) -> Self {
        Self {
            delimiter: delimiter
                .filter(|d| !d.is_empty())
                .unwrap_or_else(|| "\n\n".to_string()),
            prefix: prefix.unwrap_or_else(|| "data: ".to_string()),
            done_signal: done_signal.unwrap_or_else(|| "[DONE]".to_string()),
        }
    }

    /// Builds a decoder from the `delimiter`, `prefix` and `done_signal`
    /// fields of `cfg`, applying the same defaults as [`SseDecoder::new`].
    ///
    /// # Errors
    ///
    /// The current implementation never returns `Err`; the `Result` is part of
    /// the existing constructor signature.
    pub fn from_config(cfg: &DecoderConfig) -> Result<Self, PipelineError> {
        Ok(Self::new(
            cfg.delimiter.clone(),
            cfg.prefix.clone(),
            cfg.done_signal.clone(),
        ))
    }
}
#[async_trait::async_trait]
impl Decoder for SseDecoder {
    /// Splits `input` into delimiter-separated frames and yields the JSON
    /// payload of each frame.
    ///
    /// Per frame (after trimming surrounding whitespace):
    /// * a frame equal to the done signal — bare, behind the configured
    ///   prefix, or behind `data: ` / `data:` — ends the output stream;
    /// * empty frames and frames starting with `:` are skipped;
    /// * otherwise the configured prefix (or, failing that, `data:`) is
    ///   stripped and the rest is parsed as JSON. Frames that do not parse are
    ///   skipped silently.
    ///
    /// Errors produced by `input` are forwarded unchanged. When `input` ends,
    /// whatever is left in the buffer is treated as one final frame.
    ///
    /// Bytes are buffered raw and decoded (lossily) one complete frame at a
    /// time, so a multi-byte UTF-8 character split across two chunks is
    /// reassembled instead of being replaced with U+FFFD.
    async fn decode_stream(
        &self,
        input: BoxStream<'static, Bytes>,
    ) -> PipeResult<BoxStream<'static, Value>> {
        // Framing settings, moved through the unfold state so they are not
        // re-cloned for every yielded item.
        struct SseFraming {
            delimiter: Vec<u8>,
            prefix: String,
            done_signal: String,
        }

        // Byte offset of the first occurrence of `needle` in `haystack`.
        fn find_delimiter(haystack: &[u8], needle: &[u8]) -> Option<usize> {
            if needle.is_empty() || haystack.len() < needle.len() {
                return None;
            }
            haystack.windows(needle.len()).position(|w| w == needle)
        }

        // True when `frame` is the done signal, bare or behind a known prefix.
        fn is_done(frame: &str, cfg: &SseFraming) -> bool {
            let t = frame.trim();
            let done = cfg.done_signal.as_str();
            t == done
                || [cfg.prefix.as_str(), "data: ", "data:"]
                    .iter()
                    .any(|p| t.strip_prefix(*p) == Some(done))
        }

        // Extracts and parses the JSON payload of one frame, if it has one.
        // NOTE(review): a frame holding several field lines (e.g. `event:`
        // followed by `data:`) is not split here; it fails JSON parsing and is
        // skipped — confirm that is intended for the configured delimiter.
        fn parse_payload(raw: &str, cfg: &SseFraming) -> Option<Value> {
            let trimmed = raw.trim();
            if trimmed.is_empty() || trimmed.starts_with(':') || is_done(trimmed, cfg) {
                return None;
            }
            let payload = if let Some(rest) = trimmed.strip_prefix(cfg.prefix.as_str()) {
                rest
            } else if let Some(rest) = trimmed.strip_prefix("data:") {
                rest.trim_start()
            } else {
                trimmed
            };
            serde_json::from_str(payload).ok()
        }

        let framing = SseFraming {
            // An empty delimiter would match without consuming input and stall
            // the loop below; fall back to the default blank-line separator.
            delimiter: if self.delimiter.is_empty() {
                b"\n\n".to_vec()
            } else {
                self.delimiter.as_bytes().to_vec()
            },
            prefix: self.prefix.clone(),
            done_signal: self.done_signal.clone(),
        };

        // `fuse` makes it safe to poll `input` again after it has ended, which
        // happens when the consumer polls once more after the trailing frame.
        let state = (input.fuse(), Vec::<u8>::new(), framing);

        let stream = stream::unfold(state, |(mut input, mut buf, cfg)| async move {
            loop {
                // Drain every complete frame already buffered before reading more.
                if let Some(idx) = find_delimiter(&buf, &cfg.delimiter) {
                    let frame = String::from_utf8_lossy(&buf[..idx]).into_owned();
                    let consumed = idx + cfg.delimiter.len();
                    buf.drain(..consumed);

                    if is_done(&frame, &cfg) {
                        return None;
                    }
                    if let Some(v) = parse_payload(&frame, &cfg) {
                        return Some((Ok(v), (input, buf, cfg)));
                    }
                    continue;
                }

                match input.next().await {
                    Some(Ok(bytes)) => buf.extend_from_slice(&bytes),
                    Some(Err(e)) => return Some((Err(e), (input, buf, cfg))),
                    None => {
                        // End of input: the remainder is one last, unterminated frame.
                        let tail = String::from_utf8_lossy(&buf).into_owned();
                        if is_done(&tail, &cfg) {
                            return None;
                        }
                        return match parse_payload(&tail, &cfg) {
                            Some(v) => Some((Ok(v), (input, Vec::new(), cfg))),
                            None => None,
                        };
                    }
                }
            }
        });
        Ok(Box::pin(stream))
    }
}
/// Decoder for newline-delimited JSON (NDJSON / JSON Lines) streams.
///
/// The type carries no state; every stream is decoded with the same rules.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct NdjsonDecoder;
#[async_trait::async_trait]
impl Decoder for NdjsonDecoder {
async fn decode_stream(
&self,
input: BoxStream<'static, Bytes>,
) -> PipeResult<BoxStream<'static, Value>> {
let stream = stream::unfold(
(input, String::new()),
move |(mut input, mut buf)| async move {
loop {
if let Some(idx) = buf.find('\n') {
let line = buf[..idx].trim().to_string();
buf = buf[idx + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<Value>(&line) {
Ok(v) => return Some((Ok(v), (input, buf))),
Err(e) => {
return Some((Err(crate::Error::Serialization(e)), (input, buf)))
}
}
}
match input.next().await {
Some(Ok(bytes)) => {
let s = String::from_utf8_lossy(&bytes);
buf.push_str(&s);
continue;
}
Some(Err(e)) => return Some((Err(e), (input, buf))),
None => {
let line = buf.trim();
if line.is_empty() {
return None;
}
match serde_json::from_str::<Value>(line) {
Ok(v) => return Some((Ok(v), (input, String::new()))),
Err(_) => return None,
}
}
}
}
},
);
Ok(Box::pin(stream))
}
}
/// Builds the decoder selected by `cfg.format`.
///
/// Supported format names (matched exactly, case-sensitive):
/// * `"sse"` and `"anthropic_sse"` — an [`SseDecoder`] configured from `cfg`;
/// * `"ndjson"` and `"jsonl"` — an [`NdjsonDecoder`].
///
/// # Errors
///
/// Returns `PipelineError::Configuration` for any other format name, and
/// propagates any error from [`SseDecoder::from_config`].
pub fn create_decoder(cfg: &DecoderConfig) -> Result<Box<dyn Decoder>, PipelineError> {
    match cfg.format.as_str() {
        "sse" | "anthropic_sse" => Ok(Box::new(SseDecoder::from_config(cfg)?)),
        "ndjson" | "jsonl" => Ok(Box::new(NdjsonDecoder)),
        // Keep this list in sync with the arms above.
        other => Err(PipelineError::Configuration(format!(
            "Unsupported decoder format: {}. Supported formats: sse, anthropic_sse, jsonl, ndjson",
            other
        ))),
    }
}