1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
//! Streaming decoders (Bytes -> JSON Value)
//!
//! This module intentionally keeps provider logic out of code: it decodes *formats*
//! (SSE, NDJSON, etc.) based on manifest configuration.
use crate::pipeline::{Decoder, PipelineError};
use crate::protocol::DecoderConfig;
use crate::{BoxStream, PipeResult};
use bytes::Bytes;
use futures::{stream, StreamExt};
use serde_json::Value;
/// A minimal, manifest-driven SSE (server-sent events) decoder.
///
/// - splits the byte stream into frames on `delimiter` (default `"\n\n"`)
/// - strips `prefix` (default `"data: "`) before parsing the payload as JSON
/// - ends the stream on `done_signal` (default `"[DONE]"`)
#[derive(Debug, Clone)]
pub struct SseDecoder {
    /// Separator between frames.
    delimiter: String,
    /// Prefix removed before the payload is parsed as JSON.
    prefix: String,
    /// Payload that terminates the stream.
    done_signal: String,
}
impl SseDecoder {
    /// Builds an SSE decoder, substituting a default for every `None` argument.
    ///
    /// - `delimiter`: frame separator, default `"\n\n"`. An empty string is also replaced
    ///   by the default, because an empty separator cannot split a stream into frames.
    /// - `prefix`: prefix stripped before JSON parsing, default `"data: "`.
    /// - `done_signal`: payload that ends the stream, default `"[DONE]"`.
    pub fn new(
        delimiter: Option<String>,
        prefix: Option<String>,
        done_signal: Option<String>,
    ) -> Self {
        Self {
            delimiter: delimiter
                .filter(|d| !d.is_empty())
                .unwrap_or_else(|| "\n\n".to_string()),
            prefix: prefix.unwrap_or_else(|| "data: ".to_string()),
            done_signal: done_signal.unwrap_or_else(|| "[DONE]".to_string()),
        }
    }

    /// Builds an SSE decoder from `cfg.delimiter`, `cfg.prefix` and `cfg.done_signal`;
    /// unset fields fall back to the defaults applied by [`SseDecoder::new`].
    ///
    /// # Errors
    /// Always returns `Ok` at present.
    pub fn from_config(cfg: &DecoderConfig) -> Result<Self, PipelineError> {
        Ok(Self::new(
            cfg.delimiter.clone(),
            cfg.prefix.clone(),
            cfg.done_signal.clone(),
        ))
    }

    // NOTE: frame parsing is deliberately not implemented as `&self` methods, so the stream
    // built by `decode_stream()` owns its configuration and does not borrow `self` (required
    // to return `'static` streams for higher-level retry/fallback).
}
/// Result of interpreting a single SSE frame.
enum SseFrameOutcome {
    /// The frame carried the configured done signal: the stream should end.
    Done,
    /// The frame carried a JSON payload.
    Data(Value),
    /// Nothing usable: blank, comment-only, metadata-only or non-JSON frame.
    Skip,
}

/// Returns the offset of the first occurrence of `needle` in `haystack`.
///
/// An empty `needle` never matches (it would otherwise match at offset 0 forever).
fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    haystack
        .windows(needle.len())
        .position(|window| window == needle)
}

/// Interprets one SSE frame (the text between two delimiters).
///
/// Standard SSE framing is tried first:
/// - Every line that starts with `prefix` or a bare `data:` contributes its value.
/// - Several values are joined with `'\n'`.
/// - All other lines (`event:`, `id:`, `retry:`, `:` comments) are ignored.
///
/// If that yields no JSON, the whole frame is parsed as one payload after stripping a
/// single leading `prefix`/`data:`. This covers raw JSON frames and JSON bodies spread
/// over several lines.
///
/// The done signal is recognised in three forms: as the whole frame, as
/// `data:<signal>` (optionally with spaces), or as the joined data payload. An empty
/// `done_signal` never matches, so blank frames cannot end the stream.
fn parse_sse_frame(frame: &str, prefix: &str, done_signal: &str) -> SseFrameOutcome {
    let is_done = |text: &str| {
        let text = text.trim();
        !done_signal.is_empty()
            && (text == done_signal
                || text.strip_prefix("data:").map(str::trim_start) == Some(done_signal))
    };

    let trimmed = frame.trim();
    if trimmed.is_empty() {
        return SseFrameOutcome::Skip;
    }
    if is_done(trimmed) {
        return SseFrameOutcome::Done;
    }

    let data_lines: Vec<&str> = trimmed
        .lines()
        .map(str::trim)
        .filter_map(|line| match line.strip_prefix(prefix) {
            // An empty prefix would match every line; only honour a real one here.
            Some(value) if !prefix.is_empty() => Some(value),
            _ => line.strip_prefix("data:").map(str::trim_start),
        })
        .collect();
    if !data_lines.is_empty() {
        let payload = data_lines.join("\n");
        if is_done(&payload) {
            return SseFrameOutcome::Done;
        }
        if let Ok(value) = serde_json::from_str::<Value>(&payload) {
            return SseFrameOutcome::Data(value);
        }
    }

    // Fallback: treat the whole frame as one payload with at most one prefix.
    let payload = trimmed
        .strip_prefix(prefix)
        .or_else(|| trimmed.strip_prefix("data:").map(str::trim_start))
        .unwrap_or(trimmed);
    serde_json::from_str::<Value>(payload).map_or(SseFrameOutcome::Skip, SseFrameOutcome::Data)
}

#[async_trait::async_trait]
impl Decoder for SseDecoder {
    /// Decodes an SSE byte stream into one JSON `Value` per data-bearing frame.
    ///
    /// Buffering and UTF-8:
    /// - Incoming bytes are buffered raw and split on the configured delimiter.
    /// - An empty delimiter falls back to `"\n\n"`.
    /// - A frame is converted to text only once it is complete, so a multi-byte UTF-8
    ///   character split across two chunks is decoded intact.
    ///
    /// Frame handling:
    /// - Frames without JSON data are skipped.
    /// - A frame carrying the done signal ends the stream.
    /// - Errors from `input` are forwarded, and decoding continues afterwards.
    /// - At end of input, a trailing frame without a delimiter is still interpreted.
    async fn decode_stream(
        &self,
        input: BoxStream<'static, Bytes>,
    ) -> PipeResult<BoxStream<'static, Value>> {
        // An empty delimiter cannot split anything; fall back to the SSE default.
        let delimiter: Vec<u8> = if self.delimiter.is_empty() {
            b"\n\n".to_vec()
        } else {
            self.delimiter.as_bytes().to_vec()
        };
        // Configuration travels in the unfold state so the stream owns it and never
        // borrows `self` (the returned stream must be `'static`). The input is fused so
        // it is never polled again after it has ended.
        let state = (
            input.fuse(),
            Vec::<u8>::new(),
            delimiter,
            self.prefix.clone(),
            self.done_signal.clone(),
        );
        let stream = stream::unfold(
            state,
            |(mut input, mut buf, delimiter, prefix, done_signal)| async move {
                loop {
                    // Emit the next complete frame already sitting in the buffer.
                    if let Some(idx) = find_subslice(&buf, &delimiter) {
                        let frame_bytes: Vec<u8> = buf.drain(..idx + delimiter.len()).collect();
                        // Splitting on the (valid UTF-8) delimiter never cuts a character
                        // in half, so decoding the complete frame here is safe.
                        let frame = String::from_utf8_lossy(&frame_bytes[..idx]);
                        let outcome = parse_sse_frame(&frame, &prefix, &done_signal);
                        match outcome {
                            SseFrameOutcome::Done => return None,
                            SseFrameOutcome::Data(value) => {
                                return Some((
                                    Ok(value),
                                    (input, buf, delimiter, prefix, done_signal),
                                ));
                            }
                            SseFrameOutcome::Skip => continue,
                        }
                    }
                    // Need more data. Keep raw bytes: a chunk may end mid-character.
                    match input.next().await {
                        Some(Ok(bytes)) => buf.extend_from_slice(&bytes),
                        Some(Err(e)) => {
                            return Some((Err(e), (input, buf, delimiter, prefix, done_signal)));
                        }
                        None => {
                            // EOF: interpret the trailing frame that lacked a delimiter.
                            let rest = std::mem::take(&mut buf);
                            let outcome = parse_sse_frame(
                                &String::from_utf8_lossy(&rest),
                                &prefix,
                                &done_signal,
                            );
                            return match outcome {
                                SseFrameOutcome::Data(value) => Some((
                                    Ok(value),
                                    (input, buf, delimiter, prefix, done_signal),
                                )),
                                SseFrameOutcome::Done | SseFrameOutcome::Skip => None,
                            };
                        }
                    }
                }
            },
        );
        Ok(Box::pin(stream))
    }
}
/// NDJSON / JSONL decoder (one JSON value per line).
///
/// The decoder itself holds no state; all buffering lives in the stream returned by
/// `decode_stream`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NdjsonDecoder;
#[async_trait::async_trait]
impl Decoder for NdjsonDecoder {
async fn decode_stream(
&self,
input: BoxStream<'static, Bytes>,
) -> PipeResult<BoxStream<'static, Value>> {
let stream = stream::unfold(
(input, String::new()),
move |(mut input, mut buf)| async move {
loop {
if let Some(idx) = buf.find('\n') {
let line = buf[..idx].trim().to_string();
buf = buf[idx + 1..].to_string();
if line.is_empty() {
continue;
}
match serde_json::from_str::<Value>(&line) {
Ok(v) => return Some((Ok(v), (input, buf))),
Err(e) => {
return Some((Err(crate::Error::Serialization(e)), (input, buf)))
}
}
}
match input.next().await {
Some(Ok(bytes)) => {
let s = String::from_utf8_lossy(&bytes);
buf.push_str(&s);
continue;
}
Some(Err(e)) => return Some((Err(e), (input, buf))),
None => {
let line = buf.trim();
if line.is_empty() {
return None;
}
match serde_json::from_str::<Value>(line) {
Ok(v) => return Some((Ok(v), (input, String::new()))),
Err(_) => return None,
}
}
}
}
},
);
Ok(Box::pin(stream))
}
}
/// Builds the stream decoder named by `cfg.format`.
///
/// Recognised formats:
/// - `"sse"` and `"anthropic_sse"`: [`SseDecoder`] configured from `cfg`.
/// - `"ndjson"` and `"jsonl"`: [`NdjsonDecoder`].
///
/// # Errors
/// Returns `PipelineError::Configuration` for any other format, and propagates any
/// error from `SseDecoder::from_config`.
pub fn create_decoder(cfg: &DecoderConfig) -> Result<Box<dyn Decoder>, PipelineError> {
    match cfg.format.as_str() {
        // Many providers (e.g. Anthropic) still speak SSE but differ in event semantics.
        // We keep this manifest-driven and treat it as standard SSE framing.
        "sse" | "anthropic_sse" => Ok(Box::new(SseDecoder::from_config(cfg)?)),
        "ndjson" | "jsonl" => Ok(Box::new(NdjsonDecoder)),
        other => Err(PipelineError::Configuration(format!(
            "Unsupported decoder format: {}. Supported formats: sse, anthropic_sse, jsonl, ndjson",
            other
        ))),
    }
}