use futures_util::StreamExt;
use serde_json::Value;
use super::super::super::canonical_response::{
CanonicalEvent, CanonicalStopReason, CanonicalUsage, ContentBlockKind,
};
/// Adapts a raw SSE byte stream into a boxed stream of canonical events.
///
/// Every incoming chunk is fed through [`drain_buffer`], which may yield zero
/// or more events; the per-chunk batches are then flattened into one stream.
///
/// * `stream` - chunks of the upstream HTTP body.
/// * `fallback_model` - model name used until a `response.created` event
///   supplies one.
///
/// A transport error is forwarded as a single `Err` item holding the error's
/// string form; the stream is not terminated by it. Bytes still buffered when
/// the input ends (a frame without its terminating blank line) are not emitted.
pub(super) fn sse_to_canonical_events<S>(
    stream: S,
    fallback_model: String,
) -> futures_util::stream::BoxStream<'static, Result<CanonicalEvent, String>>
where
    S: futures_util::Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
{
    let seed = ResponsesStreamState {
        buf: Vec::new(),
        model: fallback_model,
        response_id: String::new(),
        started: false,
        items: Vec::new(),
    };
    stream
        .scan(seed, |state, chunk| {
            // Always `Some`: neither a chunk nor an error ends the scan early.
            let batch = match chunk {
                Ok(bytes) => drain_buffer(state, &bytes),
                Err(err) => vec![Err(err.to_string())],
            };
            futures_util::future::ready(Some(batch))
        })
        .flat_map(futures_util::stream::iter)
        .boxed()
}
/// Appends `bytes` to the pending buffer and converts every complete SSE
/// frame found in it into canonical events.
///
/// A frame ends at the first `"\n\n"` located by `find_double_newline`; bytes
/// after the last terminator stay in `state.buf` for the next call. Within a
/// frame only `data` fields are used: their values are joined with `'\n'` and
/// parsed as one JSON document, which is handed to `handle_responses_event`.
///
/// Per the SSE field syntax the single space after `data:` is optional, so
/// both `data: {...}` and `data:{...}` are accepted; exactly one leading space
/// is removed from the value when present.
///
/// Frames with no data, or whose data is not valid JSON, are skipped silently.
///
/// Returns the events produced by the frames completed by this chunk, in
/// order.
fn drain_buffer(
    state: &mut ResponsesStreamState,
    bytes: &bytes::Bytes,
) -> Vec<Result<CanonicalEvent, String>> {
    state.buf.extend_from_slice(bytes);
    let mut events: Vec<Result<CanonicalEvent, String>> = Vec::new();
    while let Some(pos) = find_double_newline(&state.buf) {
        // Remove the frame together with its two-byte terminator.
        let frame: Vec<u8> = state.buf.drain(..pos + 2).collect();
        // Decoding happens per complete frame, so a multi-byte character split
        // across network chunks has been reassembled in `buf` by now.
        let frame_str = String::from_utf8_lossy(&frame);
        let payload = frame_str
            .lines()
            .filter_map(|line| line.strip_prefix("data:"))
            // The space after the colon is optional; drop at most one.
            .map(|rest| rest.strip_prefix(' ').unwrap_or(rest))
            .collect::<Vec<&str>>()
            .join("\n");
        if payload.trim().is_empty() {
            continue;
        }
        // Best effort: payloads that are not JSON are ignored rather than
        // reported as errors.
        if let Ok(value) = serde_json::from_str::<Value>(&payload) {
            handle_responses_event(state, &value, &mut events);
        }
    }
    events
}
/// Returns the index of the first `'\n'` that is immediately followed by
/// another `'\n'`, or `None` when `buf` contains no such pair.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    // Pair every byte with its successor and look for two line feeds in a row.
    buf.iter()
        .zip(buf.iter().skip(1))
        .position(|(&first, &second)| first == b'\n' && second == b'\n')
}
/// Mutable state carried across chunks while translating one SSE stream.
struct ResponsesStreamState {
/// Bytes received so far that have not yet been consumed as a complete frame.
buf: Vec<u8>,
/// Model name: starts as the caller's fallback and is overwritten by
/// `handle_created`.
model: String,
/// Response id recorded by `handle_created`; empty until then. Used by
/// `handle_completed` when the completion event carries no id.
response_id: String,
/// Set to `true` by `handle_created`; nothing in the visible part of this
/// file reads it.
started: bool,
/// One slot per recognized output item, pushed by `handle_item_added`. A
/// slot's `canonical_index` is the length of this vector at push time.
items: Vec<ItemSlot>,
}
use super::slot::{ItemSlot, SlotKind, SlotKindMatch, lookup_canonical};
/// Dispatches one decoded SSE payload to the handler for its `type` field.
///
/// Payloads without a string `type`, and types not listed here, are ignored.
/// Handlers append whatever they produce to `events`.
fn handle_responses_event(
    state: &mut ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    let Some(kind) = value.get("type").and_then(Value::as_str) else {
        return;
    };

    // The three delta events differ only in the slot kind they target and the
    // canonical delta variant they produce, so resolve that pair first.
    let delta_route = match kind {
        "response.output_text.delta" => Some((SlotKindMatch::Message, DeltaShape::Text)),
        "response.function_call_arguments.delta" => {
            Some((SlotKindMatch::Function, DeltaShape::ToolUse))
        },
        "response.reasoning_summary_text.delta" => {
            Some((SlotKindMatch::Reasoning, DeltaShape::Thinking))
        },
        _ => None,
    };
    if let Some((want, shape)) = delta_route {
        emit_delta(state, value, want, events, shape);
        return;
    }

    // Lifecycle events.
    match kind {
        "response.created" => handle_created(state, value, events),
        "response.output_item.added" => handle_item_added(state, value, events),
        "response.output_item.done" => handle_item_done(state, value, events),
        "response.completed" => handle_completed(state, value, events),
        "response.failed" | "error" => handle_error(value, events),
        _ => {},
    }
}
/// Handles `response.created`: records the response id and model in `state`,
/// marks the stream as started, and emits `MessageStart`.
///
/// The id falls back to `"resp_unknown"` and the model to the value currently
/// held in `state.model` when the payload's `response` object lacks a string
/// for the respective field. Usage in the emitted event is the default value.
fn handle_created(
    state: &mut ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    let response = value.get("response");
    let id = response
        .and_then(|r| r.get("id"))
        .and_then(Value::as_str)
        .map_or_else(|| "resp_unknown".to_string(), ToString::to_string);
    let model = match response
        .and_then(|r| r.get("model"))
        .and_then(Value::as_str)
    {
        Some(name) => name.to_string(),
        None => state.model.clone(),
    };

    state.started = true;
    state.response_id.clone_from(&id);
    state.model.clone_from(&model);

    let start = CanonicalEvent::MessageStart {
        id,
        model,
        usage: CanonicalUsage::default(),
    };
    events.push(Ok(start));
}
/// Handles `response.output_item.added`: registers a slot for the new output
/// item and emits the matching `ContentBlockStart`.
///
/// Recognized item types are `message`, `function_call` and `reasoning`; any
/// other type is ignored and gets no slot. For function calls the block id is
/// `call_id` when that is a string, otherwise `id`, otherwise empty; a missing
/// `name` also becomes an empty string. A missing `output_index` is stored as
/// `-1`.
fn handle_item_added(
    state: &mut ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    let item = value.get("item").unwrap_or(&Value::Null);

    // Classify the item first; unknown types bail out before any state change.
    let (kind, block) = match item.get("type").and_then(Value::as_str) {
        Some("message") => (SlotKind::Message, ContentBlockKind::Text),
        Some("reasoning") => (
            SlotKind::Reasoning,
            ContentBlockKind::Thinking { signature: None },
        ),
        Some("function_call") => {
            let call_id = match item.get("call_id").and_then(Value::as_str) {
                Some(found) => Some(found),
                None => item.get("id").and_then(Value::as_str),
            };
            let tool_name = item.get("name").and_then(Value::as_str);
            let tool = ContentBlockKind::ToolUse {
                id: call_id.unwrap_or("").to_string(),
                name: tool_name.unwrap_or("").to_string(),
            };
            (SlotKind::Function, tool)
        },
        _ => return,
    };

    let output_index = value
        .get("output_index")
        .and_then(Value::as_i64)
        .unwrap_or(-1);
    // Canonical indices are dense: the next one is the number of slots so far.
    let canonical_index = state.items.len() as u32;

    state.items.push(ItemSlot {
        output_index,
        canonical_index,
        kind,
    });
    events.push(Ok(CanonicalEvent::ContentBlockStart {
        index: canonical_index,
        block,
    }));
}
/// Selects which `CanonicalEvent` delta variant `emit_delta` builds.
#[derive(Clone, Copy)]
enum DeltaShape {
/// Produces `CanonicalEvent::TextDelta`.
Text,
/// Produces `CanonicalEvent::ToolUseDelta`, carrying the delta as partial JSON.
ToolUse,
/// Produces `CanonicalEvent::ThinkingDelta`.
Thinking,
}
/// Converts one upstream delta event into the canonical delta chosen by
/// `shape` and appends it to `events`.
///
/// The target block index is whatever `lookup_canonical` resolves for the
/// event's `output_index` (or `-1` when absent) and `want`. Nothing is emitted
/// when that lookup fails, or when `delta` is missing, not a string, or empty.
fn emit_delta(
    state: &ResponsesStreamState,
    value: &Value,
    want: SlotKindMatch,
    events: &mut Vec<Result<CanonicalEvent, String>>,
    shape: DeltaShape,
) {
    let output_index = value
        .get("output_index")
        .and_then(Value::as_i64)
        .unwrap_or(-1);
    let Some(idx) = lookup_canonical(&state.items, output_index, want) else {
        return;
    };

    let fragment = match value.get("delta").and_then(Value::as_str) {
        Some(text) if !text.is_empty() => text.to_string(),
        _ => return,
    };

    events.push(Ok(match shape {
        DeltaShape::Text => CanonicalEvent::TextDelta {
            index: idx,
            text: fragment,
        },
        DeltaShape::ToolUse => CanonicalEvent::ToolUseDelta {
            index: idx,
            partial_json: fragment,
        },
        DeltaShape::Thinking => CanonicalEvent::ThinkingDelta {
            index: idx,
            text: fragment,
        },
    }));
}
/// Handles `response.output_item.done`: emits `ContentBlockStop` for the first
/// registered slot whose `output_index` equals the event's (`-1` when the
/// event has none). Events that match no slot are ignored.
fn handle_item_done(
    state: &ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    let output_index = value
        .get("output_index")
        .and_then(Value::as_i64)
        .unwrap_or(-1);

    let finished = state
        .items
        .iter()
        .find(|slot| slot.output_index == output_index);
    let Some(slot) = finished else {
        return;
    };

    let index = slot.canonical_index;
    events.push(Ok(CanonicalEvent::ContentBlockStop { index }));
}
/// Handles `response.completed`: emits an optional `UsageDelta` followed by
/// `MessageStop`.
///
/// The stop event's id is the payload's non-empty `response.id`, otherwise
/// the id stored in `state`. A `UsageDelta` is emitted only when
/// `response.usage` is present; token counts that are missing or not unsigned
/// integers count as 0.
///
/// NOTE(review): the stop reason is always `EndTurn`, regardless of what the
/// response contained - confirm that is intended for responses that ended in
/// a function call.
fn handle_completed(
    state: &ResponsesStreamState,
    value: &Value,
    events: &mut Vec<Result<CanonicalEvent, String>>,
) {
    /// Reads `usage[key]` as a token count, defaulting to 0.
    fn token_count(usage: &Value, key: &str) -> u32 {
        // The narrowing cast keeps only the low 32 bits of larger values.
        usage.get(key).and_then(Value::as_u64).unwrap_or(0) as u32
    }

    let response = value.get("response");

    let id = match response.and_then(|r| r.get("id")).and_then(Value::as_str) {
        Some(found) if !found.is_empty() => found.to_string(),
        _ => state.response_id.clone(),
    };

    if let Some(usage) = response.and_then(|r| r.get("usage")) {
        let totals = CanonicalUsage {
            input_tokens: token_count(usage, "input_tokens"),
            output_tokens: token_count(usage, "output_tokens"),
        };
        events.push(Ok(CanonicalEvent::UsageDelta(totals)));
    }

    events.push(Ok(CanonicalEvent::MessageStop {
        id,
        stop_reason: Some(CanonicalStopReason::EndTurn),
    }));
}
/// Handles `response.failed` and `error` events by emitting
/// `CanonicalEvent::Error` with the upstream message.
///
/// The message is taken from the first of these locations that holds a
/// string:
///
/// 1. `error.message` - error object nested at the top level;
/// 2. `response.error.message` - where a `response.failed` payload carries it;
/// 3. `message` - where a bare `error` event carries it.
///
/// When none is present the text `"upstream error"` is used.
fn handle_error(value: &Value, events: &mut Vec<Result<CanonicalEvent, String>>) {
    let msg = value
        .get("error")
        .and_then(|e| e.get("message"))
        .and_then(Value::as_str)
        .or_else(|| {
            value
                .get("response")
                .and_then(|r| r.get("error"))
                .and_then(|e| e.get("message"))
                .and_then(Value::as_str)
        })
        .or_else(|| value.get("message").and_then(Value::as_str))
        .unwrap_or("upstream error")
        .to_string();
    // Upstream failures are surfaced as an in-band canonical event rather
    // than as an `Err` item.
    events.push(Ok(CanonicalEvent::Error(msg)));
}