use super::tool_schema::sanitize_openai_function_parameters_schema;
use crate::llm::provider::ResponsesRequestOptions;
use crate::llm::provider::Result;
use crate::llm::types::LLMChunk;
use bamboo_domain::MessagePart;
use bamboo_domain::ReasoningEffort;
use bamboo_domain::ToolSchema;
use bamboo_domain::{Message, MessagePhase, Role};
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
/// Convert chat-style `Message`s into the Responses API `input` array.
///
/// Assistant turns become `message` items (plus one `function_call` item per
/// tool call), tool results become `function_call_output` items (or a plain
/// user message when no call id is available), and system/user turns become
/// `message` items. If any message in the batch carries an image part, every
/// message's content is emitted in multi-part form.
pub fn messages_to_responses_input_json(messages: &[Message]) -> Vec<Value> {
    // Multi-part content is only used when at least one message has an image.
    let has_images = messages.iter().any(|message| {
        message.content_parts.as_ref().is_some_and(|parts| {
            parts
                .iter()
                .any(|part| matches!(part, MessagePart::ImageUrl { .. }))
        })
    });
    let mut items: Vec<Value> = Vec::with_capacity(messages.len());
    for message in messages {
        match message.role {
            Role::Assistant => {
                let has_text = !message.content.trim().is_empty();
                // Emit the text item unless this turn is tool-calls-only.
                if has_text || message.tool_calls.is_none() {
                    let mut item = json!({
                        "type": "message",
                        "role": "assistant",
                        "content": build_content_value(message, has_images, None, "output_text"),
                    });
                    if let Some(phase) = assistant_phase_for_responses_input(message) {
                        item["phase"] = json!(phase);
                    }
                    items.push(item);
                }
                // Each tool call becomes its own top-level function_call item.
                for call in message.tool_calls.iter().flatten() {
                    items.push(json!({
                        "type": "function_call",
                        "call_id": call.id,
                        "name": call.function.name,
                        "arguments": call.function.arguments,
                    }));
                }
            }
            Role::Tool => match message.tool_call_id.as_deref() {
                Some(call_id) if !call_id.is_empty() => {
                    items.push(json!({
                        "type": "function_call_output",
                        "call_id": call_id,
                        "output": message.content,
                    }));
                }
                // Without a call id we cannot emit a function_call_output;
                // degrade to an inline user message carrying the result.
                _ => {
                    items.push(json!({
                        "type": "message",
                        "role": "user",
                        "content": json!(format!("[tool_result]\n{}", message.content)),
                    }));
                }
            },
            Role::System | Role::User => {
                let role = match message.role {
                    Role::System => "system",
                    _ => "user",
                };
                items.push(json!({
                    "type": "message",
                    "role": role,
                    "content": build_content_value(message, has_images, None, "input_text"),
                }));
            }
        }
    }
    items
}
/// Choose the `phase` tag to attach to an assistant input message, if any.
///
/// Precedence: an explicit phase on the message wins; otherwise a turn with
/// tool calls is tagged commentary, a turn with non-blank text is tagged as a
/// final answer, and anything else carries no phase.
fn assistant_phase_for_responses_input(message: &Message) -> Option<&'static str> {
    if !matches!(message.role, Role::Assistant) {
        return None;
    }
    if let Some(phase) = message.phase.as_ref() {
        return Some(phase.as_str());
    }
    let has_tool_calls = message
        .tool_calls
        .as_ref()
        .map(|calls| !calls.is_empty())
        .unwrap_or(false);
    if has_tool_calls {
        Some(MessagePhase::Commentary.as_str())
    } else if message.content.trim().is_empty() {
        None
    } else {
        Some(MessagePhase::FinalAnswer.as_str())
    }
}
/// Build the JSON `content` value for a message item.
///
/// Without images anywhere in the batch, content is a bare string. With
/// images, content is an array of typed parts: `text_part_type` for text and
/// `input_image` for image URLs. `text_override`, when present, replaces
/// `m.content` in the plain-text paths (it is not consulted when structured
/// `content_parts` exist).
fn build_content_value(
    m: &Message,
    has_images: bool,
    text_override: Option<&str>,
    text_part_type: &str,
) -> Value {
    if !has_images {
        // Simple case: a single string value.
        return json!(text_override.unwrap_or(&m.content));
    }
    let mut parts: Vec<Value> = Vec::new();
    match m.content_parts.as_ref() {
        Some(content_parts) => {
            for part in content_parts {
                let value = match part {
                    MessagePart::Text { text } => {
                        json!({"type": text_part_type, "text": text})
                    }
                    MessagePart::ImageUrl { image_url } => {
                        json!({"type": "input_image", "image_url": image_url.url})
                    }
                };
                parts.push(value);
            }
        }
        None => {
            // No structured parts: wrap the plain text as a single part so the
            // whole request stays in multi-part form.
            parts.push(json!({
                "type": text_part_type,
                "text": text_override.unwrap_or(&m.content),
            }));
        }
    }
    json!(parts)
}
/// Flatten tool schemas into the Responses API tool format.
///
/// Unlike the Chat Completions shape, the function fields are hoisted to the
/// top level; each parameter schema is sanitized for OpenAI compatibility.
pub fn tools_to_responses_json(tools: &[ToolSchema]) -> Vec<Value> {
    let mut out = Vec::with_capacity(tools.len());
    for tool in tools {
        out.push(json!({
            "type": tool.schema_type,
            "name": tool.function.name,
            "description": tool.function.description,
            "parameters": sanitize_openai_function_parameters_schema(&tool.function.parameters),
        }));
    }
    out
}
/// Assemble the JSON request body for a streaming Responses API call.
///
/// Always sets `model`, `input`, and `stream: true`. Every other field is
/// added only when the caller supplies a meaningful value; optional string
/// options are trimmed and dropped entirely when blank. `store` is always
/// emitted and defaults to `false`.
///
/// The signature and emitted body are unchanged from the original; the
/// repeated trim-and-filter option handling is factored into one helper.
pub fn build_responses_body(
    model: &str,
    messages: &[Message],
    tools: &[ToolSchema],
    max_output_tokens: Option<u32>,
    reasoning_effort: Option<ReasoningEffort>,
    responses_options: Option<&ResponsesRequestOptions>,
    parallel_tool_calls: Option<bool>,
) -> Value {
    // Pull an optional string field out of the options, trim it, and treat a
    // blank value the same as an absent one.
    fn trimmed<'a>(
        options: Option<&'a ResponsesRequestOptions>,
        field: impl FnOnce(&'a ResponsesRequestOptions) -> Option<&'a str>,
    ) -> Option<&'a str> {
        options
            .and_then(field)
            .map(str::trim)
            .filter(|value| !value.is_empty())
    }
    let mut body = json!({
        "model": model,
        "input": messages_to_responses_input_json(messages),
        "stream": true,
    });
    if let Some(instructions) = trimmed(responses_options, |opts| opts.instructions.as_deref()) {
        body["instructions"] = json!(instructions);
    }
    if let Some(previous_response_id) =
        trimmed(responses_options, |opts| opts.previous_response_id.as_deref())
    {
        body["previous_response_id"] = json!(previous_response_id);
    }
    if !tools.is_empty() {
        body["tools"] = json!(tools_to_responses_json(tools));
        body["tool_choice"] = json!("auto");
    }
    if let Some(parallel_tool_calls) = parallel_tool_calls {
        body["parallel_tool_calls"] = json!(parallel_tool_calls);
    }
    if let Some(max_tokens) = max_output_tokens {
        body["max_output_tokens"] = json!(max_tokens);
    }
    // The `reasoning` object is emitted only when at least one sub-field is
    // present (building the map unconditionally and checking emptiness is
    // equivalent to the previous double guard).
    let mut reasoning = serde_json::Map::new();
    if let Some(effort) = reasoning_effort {
        // Effort values may be rewritten per-model on the wire.
        reasoning.insert("effort".to_string(), json!(effort.to_wire_format(model)));
    }
    if let Some(summary) = trimmed(responses_options, |opts| opts.reasoning_summary.as_deref()) {
        reasoning.insert("summary".to_string(), json!(summary));
    }
    if !reasoning.is_empty() {
        body["reasoning"] = Value::Object(reasoning);
    }
    if let Some(include) = responses_options
        .and_then(|opts| opts.include.as_ref())
        .filter(|values| !values.is_empty())
    {
        body["include"] = json!(include);
    }
    if let Some(truncation) = trimmed(responses_options, |opts| opts.truncation.as_deref()) {
        body["truncation"] = json!(truncation);
    }
    if let Some(text_verbosity) = trimmed(responses_options, |opts| opts.text_verbosity.as_deref()) {
        body["text"] = json!({ "verbosity": text_verbosity });
    }
    // `store` is explicit in every request; default to non-persistent.
    let store = responses_options
        .and_then(|opts| opts.store)
        .unwrap_or(false);
    body["store"] = json!(store);
    body
}
/// Accumulator for one streamed `function_call` output item: the call id and
/// name can arrive on the item-added event while the argument text streams in
/// through separate delta events.
#[derive(Debug, Default)]
struct AccFnCall {
    // Provider-assigned call id (taken from `call_id`, falling back to the
    // item `id`).
    call_id: Option<String>,
    // Function name, filled in once observed.
    name: Option<String>,
    // JSON argument text accumulated across delta events.
    arguments: String,
}
/// Incremental parser for Responses API server-sent events.
///
/// Providers may deliver the same text both as streamed deltas and again in
/// terminal `done`/item events; most of this state exists to deduplicate
/// those, to accumulate function-call arguments, and to collect reasoning
/// telemetry for a one-shot summary log.
pub struct ResponsesSseParser {
    // In-flight function calls, keyed by output item id.
    fn_calls: HashMap<String, AccFnCall>,
    // Item ids whose text already streamed via deltas (suppresses duplicates
    // from later done events keyed by the same item id).
    streamed_text_item_ids: HashSet<String>,
    // True once a text delta arrived without any item id to key on.
    saw_unkeyed_text_delta: bool,
    // Stream keys ("text:<output_index>:<content_index>") that produced deltas.
    text_delta_stream_keys: HashSet<String>,
    // Stream keys whose done-text has already been emitted.
    text_done_stream_keys: HashSet<String>,
    // Output indexes that produced streamed text (suppresses duplicate
    // whole-item text from `response.output_item.done`).
    streamed_text_output_indexes: HashSet<i64>,
    // Reasoning analogue of `streamed_text_item_ids`.
    streamed_reasoning_item_ids: HashSet<String>,
    // Reasoning analogue of `saw_unkeyed_text_delta`.
    saw_unkeyed_reasoning_delta: bool,
    // Accumulated reasoning text per key, used to strip re-sent prefixes.
    reasoning_item_content: HashMap<String, String>,
    // Reasoning stream keys ("summary:…" / "item:…") seen so far.
    streamed_reasoning_stream_keys: HashSet<String>,
    // True once any reasoning text streamed; whole-item reasoning is then
    // suppressed on item done.
    saw_reasoning_text_stream: bool,
    // Everything emitted as reasoning so far, used for block-separator logic.
    emitted_reasoning_text: String,
    // Label used in the one-shot summary log line.
    provider_label: String,
    // Model name for logging (may be empty).
    model: String,
    // Effort requested by the caller, if any (logged in the summary).
    requested_reasoning_effort: Option<ReasoningEffort>,
    // True when the request asked for reasoning (effort was set).
    request_reasoning_enabled: bool,
    // True once any reasoning-related event was observed in the stream.
    observed_reasoning_signal: bool,
    // Count of reasoning events seen (telemetry).
    reasoning_event_count: usize,
    // Total reasoning text bytes seen (telemetry; counts `str::len`).
    reasoning_text_chars: usize,
    // Guards the summary log so it fires at most once per stream.
    logged_summary: bool,
    // Last response id surfaced to the caller, to avoid duplicate chunks.
    emitted_response_id: Option<String>,
}
impl Default for ResponsesSseParser {
    /// Delegates to [`ResponsesSseParser::new`], i.e. a parser with the
    /// generic "Responses" provider label and no model/effort context.
    fn default() -> Self {
        Self::new()
    }
}
impl ResponsesSseParser {
    /// Context-free constructor; kept for tests/back-compat callers.
    #[allow(dead_code)] pub fn new() -> Self {
        Self::new_with_context("Responses", "", None)
    }
    /// Create a parser carrying provider/model/effort context, used only for
    /// the one-shot reasoning summary log.
    pub fn new_with_context(
        provider_label: &str,
        model: &str,
        requested_reasoning_effort: Option<ReasoningEffort>,
    ) -> Self {
        Self {
            fn_calls: HashMap::new(),
            streamed_text_item_ids: HashSet::new(),
            saw_unkeyed_text_delta: false,
            text_delta_stream_keys: HashSet::new(),
            text_done_stream_keys: HashSet::new(),
            streamed_text_output_indexes: HashSet::new(),
            streamed_reasoning_item_ids: HashSet::new(),
            saw_unkeyed_reasoning_delta: false,
            reasoning_item_content: HashMap::new(),
            streamed_reasoning_stream_keys: HashSet::new(),
            saw_reasoning_text_stream: false,
            emitted_reasoning_text: String::new(),
            provider_label: provider_label.to_string(),
            model: model.to_string(),
            requested_reasoning_effort,
            // Reasoning is "enabled" from the request's perspective iff an
            // effort level was asked for.
            request_reasoning_enabled: requested_reasoning_effort.is_some(),
            observed_reasoning_signal: false,
            reasoning_event_count: 0,
            reasoning_text_chars: 0,
            logged_summary: false,
            emitted_response_id: None,
        }
    }
    /// Resolve the event type: the payload's `type` field wins, falling back
    /// to the SSE `event:` name.
    fn event_type<'a>(&self, event: &'a str, v: &'a Value) -> &'a str {
        v.get("type").and_then(|t| t.as_str()).unwrap_or(event)
    }
    /// Get-or-create the accumulator for a function-call item id.
    fn ensure_fn_call(&mut self, item_id: &str) -> &mut AccFnCall {
        self.fn_calls.entry(item_id.to_string()).or_default()
    }
    /// Seed an accumulator from a freshly-added function_call item. Existing
    /// call id / name are never overwritten; arguments are taken only if the
    /// accumulator has none yet (deltas are expected to follow).
    fn parse_fn_call_item(&mut self, item_id: &str, item: &Value) {
        let entry = self.ensure_fn_call(item_id);
        if entry.call_id.is_none() {
            entry.call_id = item
                .get("call_id")
                .or_else(|| item.get("id"))
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
        }
        if entry.name.is_none() {
            entry.name = item
                .get("name")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
        }
        if let Some(args) = item.get("arguments").and_then(|v| v.as_str()) {
            if !args.is_empty() && entry.arguments.is_empty() {
                entry.arguments = args.to_string();
            }
        }
    }
    /// Merge the terminal function_call item into the accumulator. Unlike
    /// `parse_fn_call_item`, non-empty final arguments REPLACE anything
    /// accumulated from deltas (the done item is authoritative).
    fn apply_done_fn_call_item(&mut self, item_id: &str, item: &Value) {
        let entry = self.ensure_fn_call(item_id);
        if entry.call_id.is_none() {
            entry.call_id = item
                .get("call_id")
                .or_else(|| item.get("id"))
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
        }
        if entry.name.is_none() {
            entry.name = item
                .get("name")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
        }
        if let Some(args) = item.get("arguments").and_then(|v| v.as_str()) {
            if !args.is_empty() {
                entry.arguments = args.to_string();
            }
        }
    }
    /// Remove the accumulator and turn it into a domain `ToolCall`.
    /// Returns `None` when the item id was never tracked or the call id or
    /// name never arrived.
    fn finalize_tool_call(&mut self, item_id: &str) -> Option<bamboo_domain::ToolCall> {
        let acc = self.fn_calls.remove(item_id)?;
        Some(bamboo_domain::ToolCall {
            id: acc.call_id?,
            tool_type: "function".to_string(),
            function: bamboo_domain::FunctionCall {
                name: acc.name?,
                arguments: acc.arguments,
            },
        })
    }
    /// Extract a non-empty item id from either `item_id` or `item.id`.
    fn text_item_id(v: &Value) -> Option<String> {
        v.get("item_id")
            .and_then(|id| id.as_str())
            .or_else(|| {
                v.get("item")
                    .and_then(|item| item.get("id"))
                    .and_then(|id| id.as_str())
            })
            .map(|id| id.to_string())
            .filter(|id| !id.is_empty())
    }
    /// Concatenate all `output_text` parts of a message item's content array.
    fn message_item_output_text(item: &Value) -> String {
        let Some(content) = item.get("content").and_then(|value| value.as_array()) else {
            return String::new();
        };
        let mut out = String::new();
        for part in content {
            let part_type = part
                .get("type")
                .and_then(|value| value.as_str())
                .unwrap_or("");
            if part_type != "output_text" {
                continue;
            }
            if let Some(text) = part.get("text").and_then(|value| value.as_str()) {
                out.push_str(text);
            }
        }
        out
    }
    /// Extract text from a content-part event (`part` or `content_part`).
    /// With `prefer_delta` only the `delta` field is used; otherwise `text`
    /// is preferred with `delta` as fallback. Non-`output_text` parts yield "".
    fn content_part_output_text(v: &Value, prefer_delta: bool) -> String {
        let part = v
            .get("part")
            .or_else(|| v.get("content_part"))
            .unwrap_or(&Value::Null);
        let part_type = part
            .get("type")
            .and_then(|value| value.as_str())
            .unwrap_or("");
        if part_type != "output_text" {
            return String::new();
        }
        if prefer_delta {
            return part
                .get("delta")
                .and_then(|value| value.as_str())
                .unwrap_or("")
                .to_string();
        }
        part.get("text")
            .and_then(|value| value.as_str())
            .or_else(|| part.get("delta").and_then(|value| value.as_str()))
            .unwrap_or("")
            .to_string()
    }
    /// The event's `output_index`, when present and integral.
    fn text_output_index(v: &Value) -> Option<i64> {
        v.get("output_index").and_then(|value| value.as_i64())
    }
    /// Build the "text:<output_index>:<content_index>" dedup key for a text
    /// event. `_` stands in for an unknown content index; returns `None`
    /// when even the output index is missing.
    fn text_stream_key(v: &Value) -> Option<String> {
        let output_index = Self::text_output_index(v)?;
        let content_index = v
            .get("content_index")
            .and_then(|value| value.as_i64())
            .or_else(|| {
                v.get("part")
                    .and_then(|part| part.get("index"))
                    .and_then(|value| value.as_i64())
            });
        match content_index {
            Some(content_index) => Some(format!("text:{output_index}:{content_index}")),
            None => Some(format!("text:{output_index}:_")),
        }
    }
    /// Record a delta for this stream key and emit it as a token.
    fn emit_text_delta_with_key(&mut self, stream_key: &str, text: &str) -> Option<LLMChunk> {
        if text.is_empty() {
            return None;
        }
        self.text_delta_stream_keys.insert(stream_key.to_string());
        Some(LLMChunk::Token(text.to_string()))
    }
    /// Emit done-text for a stream key, but only when nothing for that key
    /// (or an overlapping wildcard/specific variant of it) streamed already.
    fn emit_text_done_with_key(&mut self, stream_key: &str, text: &str) -> Option<LLMChunk> {
        if text.is_empty() {
            return None;
        }
        if self.text_delta_stream_keys.contains(stream_key)
            || self.text_done_stream_keys.contains(stream_key)
        {
            return None;
        }
        if let Some((prefix, _)) = stream_key.rsplit_once(':') {
            // A specific key is shadowed by its wildcard sibling…
            let wildcard_key = format!("{prefix}:_");
            if self.text_delta_stream_keys.contains(wildcard_key.as_str())
                || self.text_done_stream_keys.contains(wildcard_key.as_str())
            {
                return None;
            }
            // …and a wildcard key is shadowed by any specific sibling.
            if stream_key.ends_with(":_") {
                let output_prefix = format!("{prefix}:");
                if self
                    .text_delta_stream_keys
                    .iter()
                    .any(|key| key.starts_with(output_prefix.as_str()))
                    || self
                        .text_done_stream_keys
                        .iter()
                        .any(|key| key.starts_with(output_prefix.as_str()))
                {
                    return None;
                }
            }
        }
        self.text_done_stream_keys.insert(stream_key.to_string());
        Some(LLMChunk::Token(text.to_string()))
    }
    /// Pull summary text from a reasoning item: `summary` may be a plain
    /// string or an array of parts (joined with blank lines); falls back to
    /// a top-level `text` field when no summary text was found.
    fn reasoning_item_summary_text(item: &Value) -> String {
        let mut out = String::new();
        if let Some(summary) = item.get("summary") {
            if let Some(text) = summary.as_str() {
                out.push_str(text);
            } else if let Some(parts) = summary.as_array() {
                for part in parts {
                    let maybe_text = part
                        .get("text")
                        .and_then(|value| value.as_str())
                        .or_else(|| part.as_str());
                    let Some(text) = maybe_text else {
                        continue;
                    };
                    if text.is_empty() {
                        continue;
                    }
                    if !out.is_empty() {
                        out.push_str("\n\n");
                    }
                    out.push_str(text);
                }
            }
        }
        if out.is_empty() {
            if let Some(text) = item.get("text").and_then(|value| value.as_str()) {
                out.push_str(text);
            }
        }
        out
    }
    /// Text of a `summary_text` part from a reasoning-summary part event;
    /// "" for any other part type.
    fn reasoning_summary_part_text(v: &Value) -> String {
        let part = v
            .get("part")
            .or_else(|| v.get("summary_part"))
            .unwrap_or(&Value::Null);
        let part_type = part
            .get("type")
            .and_then(|value| value.as_str())
            .unwrap_or("");
        if part_type != "summary_text" {
            return String::new();
        }
        part.get("text")
            .and_then(|value| value.as_str())
            .or_else(|| part.get("delta").and_then(|value| value.as_str()))
            .unwrap_or("")
            .to_string()
    }
    /// Dedup key "summary:<output_index>:<summary_index>" for reasoning
    /// summary events, with `_` for whichever index is missing; `None` when
    /// both are absent.
    fn reasoning_summary_stream_key(v: &Value) -> Option<String> {
        let output_index = v.get("output_index").and_then(|value| value.as_i64());
        let summary_index = v.get("summary_index").and_then(|value| value.as_i64());
        match (output_index, summary_index) {
            (Some(output_index), Some(summary_index)) => {
                Some(format!("summary:{output_index}:{summary_index}"))
            }
            (Some(output_index), None) => Some(format!("summary:{output_index}:_")),
            (None, Some(summary_index)) => Some(format!("summary:_:{}", summary_index)),
            (None, None) => None,
        }
    }
    /// Dedup key for any reasoning event: summary events prefer the
    /// index-based key, everything falls back to "item:<item_id>".
    fn reasoning_event_stream_key(event_type: &str, v: &Value) -> Option<String> {
        if event_type.starts_with("response.reasoning_summary_") {
            return Self::reasoning_summary_stream_key(v)
                .or_else(|| Self::text_item_id(v).map(|item_id| format!("item:{item_id}")));
        }
        Self::text_item_id(v).map(|item_id| format!("item:{item_id}"))
    }
    /// Whether text under this key starts a new visual reasoning block
    /// (and so may need a blank-line separator before it).
    fn reasoning_summary_stream_starts_new_block(stream_key: &str) -> bool {
        stream_key.starts_with("summary:") || stream_key.starts_with("item:")
    }
    /// Prefix `chunk` with enough newlines to guarantee a blank line between
    /// the previously emitted reasoning text and a new block. Counts trailing
    /// newlines already emitted plus leading newlines in the chunk, and tops
    /// the total up to two.
    fn with_reasoning_block_separator_if_needed(&self, stream_key: &str, chunk: &str) -> String {
        if chunk.is_empty()
            || self.emitted_reasoning_text.is_empty()
            || !Self::reasoning_summary_stream_starts_new_block(stream_key)
        {
            return chunk.to_string();
        }
        let trimmed_output = self
            .emitted_reasoning_text
            .trim_end_matches([' ', '\t', '\r']);
        let trailing_newlines = trimmed_output
            .chars()
            .rev()
            .take_while(|&c| c == '\n')
            .count();
        let leading_newlines = chunk
            .chars()
            .take_while(|&c| c == '\n' || c == '\r')
            .filter(|&c| c == '\n')
            .count();
        let missing_newlines = 2usize.saturating_sub(trailing_newlines + leading_newlines);
        if missing_newlines == 0 {
            return chunk.to_string();
        }
        format!("{}{}", "\n".repeat(missing_newlines), chunk)
    }
    /// Append emitted reasoning text to the running record.
    fn track_emitted_reasoning_text(&mut self, chunk: &str) {
        if !chunk.is_empty() {
            self.emitted_reasoning_text.push_str(chunk);
        }
    }
    /// Emit a key-scoped reasoning delta, normalizing re-sent prefixes and
    /// inserting a block separator when this key's stream starts fresh.
    fn emit_reasoning_delta_with_key(&mut self, stream_key: &str, text: &str) -> Option<LLMChunk> {
        if text.is_empty() {
            return None;
        }
        self.saw_reasoning_text_stream = true;
        let is_new_stream = !self.streamed_reasoning_stream_keys.contains(stream_key);
        self.streamed_reasoning_stream_keys
            .insert(stream_key.to_string());
        let emitted = self.normalize_reasoning_item_chunk(stream_key, text)?;
        let emitted = if is_new_stream {
            self.with_reasoning_block_separator_if_needed(stream_key, emitted.as_str())
        } else {
            emitted
        };
        self.track_emitted_reasoning_text(emitted.as_str());
        Some(LLMChunk::ReasoningToken(emitted))
    }
    /// Emit key-scoped reasoning done-text, but only if nothing streamed
    /// under this key already (deltas take precedence over done events).
    fn emit_reasoning_done_with_key(&mut self, stream_key: &str, text: &str) -> Option<LLMChunk> {
        if text.is_empty() {
            return None;
        }
        self.saw_reasoning_text_stream = true;
        let is_new_stream = !self.streamed_reasoning_stream_keys.contains(stream_key);
        if !is_new_stream {
            return None;
        }
        self.streamed_reasoning_stream_keys
            .insert(stream_key.to_string());
        let emitted = self.normalize_reasoning_item_chunk(stream_key, text)?;
        let emitted = self.with_reasoning_block_separator_if_needed(stream_key, emitted.as_str());
        self.track_emitted_reasoning_text(emitted.as_str());
        Some(LLMChunk::ReasoningToken(emitted))
    }
    /// Whole-item reasoning text is treated like a delta (it may be followed
    /// by nothing, so no done-suppression applies).
    fn emit_reasoning_item_text(&mut self, item_id: Option<&str>, text: &str) -> Option<LLMChunk> {
        self.emit_reasoning_delta_text(item_id, text)
    }
    /// Item-id-scoped reasoning delta; unkeyed deltas are passed through and
    /// remembered so later unkeyed done-text can be suppressed.
    fn emit_reasoning_delta_text(&mut self, item_id: Option<&str>, text: &str) -> Option<LLMChunk> {
        if text.is_empty() {
            return None;
        }
        self.saw_reasoning_text_stream = true;
        if let Some(item_id) = item_id {
            self.streamed_reasoning_item_ids.insert(item_id.to_string());
            let emitted = self.normalize_reasoning_item_chunk(item_id, text)?;
            self.track_emitted_reasoning_text(emitted.as_str());
            return Some(LLMChunk::ReasoningToken(emitted));
        }
        self.saw_unkeyed_reasoning_delta = true;
        self.track_emitted_reasoning_text(text);
        Some(LLMChunk::ReasoningToken(text.to_string()))
    }
    /// Item-id-scoped reasoning done-text; unkeyed done-text is dropped when
    /// unkeyed deltas already streamed.
    fn emit_reasoning_done_text(&mut self, item_id: Option<&str>, text: &str) -> Option<LLMChunk> {
        if text.is_empty() {
            return None;
        }
        self.saw_reasoning_text_stream = true;
        if let Some(item_id) = item_id {
            self.streamed_reasoning_item_ids.insert(item_id.to_string());
            let emitted = self.normalize_reasoning_item_chunk(item_id, text)?;
            self.track_emitted_reasoning_text(emitted.as_str());
            return Some(LLMChunk::ReasoningToken(emitted));
        }
        if self.saw_unkeyed_reasoning_delta {
            return None;
        }
        self.track_emitted_reasoning_text(text);
        Some(LLMChunk::ReasoningToken(text.to_string()))
    }
    /// Emit plain done-text once per item id; unkeyed done-text is dropped
    /// when unkeyed deltas already streamed.
    fn emit_done_text(&mut self, item_id: Option<&str>, text: &str) -> Option<LLMChunk> {
        if text.is_empty() {
            return None;
        }
        if let Some(item_id) = item_id {
            if self.streamed_text_item_ids.contains(item_id) {
                return None;
            }
            self.streamed_text_item_ids.insert(item_id.to_string());
            return Some(LLMChunk::Token(text.to_string()));
        }
        if self.saw_unkeyed_text_delta {
            return None;
        }
        Some(LLMChunk::Token(text.to_string()))
    }
    /// Deduplicate reasoning chunks per key against the accumulated content:
    /// exact repeats and already-emitted suffixes yield `None`; a chunk that
    /// restates the accumulated text plus more yields only the new suffix;
    /// anything else is appended and emitted verbatim.
    fn normalize_reasoning_item_chunk(&mut self, item_id: &str, chunk: &str) -> Option<String> {
        let entry = self
            .reasoning_item_content
            .entry(item_id.to_string())
            .or_default();
        if entry.is_empty() {
            entry.push_str(chunk);
            return Some(chunk.to_string());
        }
        if chunk == entry.as_str() {
            return None;
        }
        if chunk.starts_with(entry.as_str()) {
            // Cumulative resend: keep only the unseen tail.
            let suffix = chunk[entry.len()..].to_string();
            *entry = chunk.to_string();
            if suffix.is_empty() {
                return None;
            }
            return Some(suffix);
        }
        if entry.ends_with(chunk) {
            // The chunk duplicates what we just emitted.
            return None;
        }
        entry.push_str(chunk);
        Some(chunk.to_string())
    }
    /// Emit the one-shot reasoning telemetry log line, if reasoning was
    /// requested or observed. `usage` is the response usage object, checked
    /// for `output_tokens_details.reasoning_tokens` then `reasoning_tokens`.
    fn log_reasoning_summary_if_needed(&mut self, usage: Option<&Value>) {
        if self.logged_summary {
            return;
        }
        if !(self.request_reasoning_enabled || self.observed_reasoning_signal) {
            return;
        }
        let reasoning_tokens = usage
            .and_then(|value| value.get("output_tokens_details"))
            .and_then(|details| details.get("reasoning_tokens"))
            .and_then(|tokens| tokens.as_u64())
            .or_else(|| {
                usage
                    .and_then(|value| value.get("reasoning_tokens"))
                    .and_then(|tokens| tokens.as_u64())
            });
        tracing::info!(
            "{} responses reasoning summary: model='{}' requested_effort={} request_reasoning_enabled={} observed_reasoning_signal={} reasoning_event_count={} reasoning_text_chars={} reasoning_tokens={}",
            self.provider_label,
            if self.model.is_empty() { "<unknown>" } else { self.model.as_str() },
            self.requested_reasoning_effort
                .map(ReasoningEffort::as_str)
                .unwrap_or("none"),
            self.request_reasoning_enabled,
            self.observed_reasoning_signal,
            self.reasoning_event_count,
            self.reasoning_text_chars,
            reasoning_tokens
                .map(|tokens| tokens.to_string())
                .unwrap_or_else(|| "none".to_string())
        );
        self.logged_summary = true;
    }
    /// Extract the response id from `response.id` or a top-level
    /// `response_id` field.
    fn response_id_from_value<'a>(&self, value: &'a Value) -> Option<&'a str> {
        value
            .get("response")
            .and_then(|response| response.get("id"))
            .and_then(|id| id.as_str())
            .or_else(|| value.get("response_id").and_then(|id| id.as_str()))
    }
    /// Surface the response id once per distinct id, on lifecycle events only.
    fn maybe_emit_response_id(&mut self, event_type: &str, value: &Value) -> Option<LLMChunk> {
        if !matches!(
            event_type,
            "response.created" | "response.in_progress" | "response.completed"
        ) {
            return None;
        }
        let response_id = self.response_id_from_value(value)?;
        if self.emitted_response_id.as_deref() == Some(response_id) {
            return None;
        }
        self.emitted_response_id = Some(response_id.to_string());
        Some(LLMChunk::ResponseId(response_id.to_string()))
    }
    /// Process one SSE event and return at most one chunk for the caller.
    ///
    /// Unparseable payloads and unknown event types yield `Ok(None)`.
    /// Reasoning events are handled first (the first `match`), then text,
    /// function-call, and lifecycle events (the second `match`).
    /// `response.completed` triggers the telemetry log and yields `Done`.
    pub fn handle_event(&mut self, event: &str, data: &str) -> Result<Option<LLMChunk>> {
        let Ok(v) = serde_json::from_str::<Value>(data) else {
            return Ok(None);
        };
        let event_type = self.event_type(event, &v);
        // Response-id emission takes precedence; note this means the id chunk
        // for `response.completed` is emitted on that event and `Done` is
        // deferred until the id has already been seen on an earlier event.
        if let Some(chunk) = self.maybe_emit_response_id(event_type, &v) {
            return Ok(Some(chunk));
        }
        match event_type {
            "response.reasoning_summary_part.added" => {
                // Counted for telemetry only; text arrives via deltas/done.
                self.observed_reasoning_signal = true;
                self.reasoning_event_count = self.reasoning_event_count.saturating_add(1);
                return Ok(None);
            }
            "response.reasoning_summary_text.delta" => {
                self.observed_reasoning_signal = true;
                self.reasoning_event_count = self.reasoning_event_count.saturating_add(1);
                let reasoning_chunk = v
                    .get("delta")
                    .and_then(|value| value.as_str())
                    .or_else(|| v.get("text").and_then(|value| value.as_str()))
                    .or_else(|| v.get("summary").and_then(|value| value.as_str()))
                    .unwrap_or("");
                self.reasoning_text_chars = self
                    .reasoning_text_chars
                    .saturating_add(reasoning_chunk.len());
                if reasoning_chunk.is_empty() {
                    return Ok(None);
                }
                if let Some(stream_key) = Self::reasoning_event_stream_key(event_type, &v) {
                    return Ok(
                        self.emit_reasoning_delta_with_key(stream_key.as_str(), reasoning_chunk)
                    );
                }
                let item_id = Self::text_item_id(&v);
                return Ok(self.emit_reasoning_delta_text(item_id.as_deref(), reasoning_chunk));
            }
            "response.reasoning_summary_text.done" => {
                self.observed_reasoning_signal = true;
                self.reasoning_event_count = self.reasoning_event_count.saturating_add(1);
                let mut reasoning_chunk = v
                    .get("text")
                    .and_then(|value| value.as_str())
                    .or_else(|| v.get("delta").and_then(|value| value.as_str()))
                    .or_else(|| v.get("summary").and_then(|value| value.as_str()))
                    .unwrap_or("")
                    .to_string();
                if reasoning_chunk.is_empty() {
                    reasoning_chunk = Self::reasoning_item_summary_text(&v);
                }
                self.reasoning_text_chars = self
                    .reasoning_text_chars
                    .saturating_add(reasoning_chunk.len());
                if reasoning_chunk.is_empty() {
                    return Ok(None);
                }
                if let Some(stream_key) = Self::reasoning_event_stream_key(event_type, &v) {
                    return Ok(self.emit_reasoning_done_with_key(
                        stream_key.as_str(),
                        reasoning_chunk.as_str(),
                    ));
                }
                let item_id = Self::text_item_id(&v);
                return Ok(
                    self.emit_reasoning_done_text(item_id.as_deref(), reasoning_chunk.as_str())
                );
            }
            "response.reasoning_summary_part.done" => {
                self.observed_reasoning_signal = true;
                self.reasoning_event_count = self.reasoning_event_count.saturating_add(1);
                let reasoning_chunk = Self::reasoning_summary_part_text(&v);
                self.reasoning_text_chars = self
                    .reasoning_text_chars
                    .saturating_add(reasoning_chunk.len());
                if reasoning_chunk.is_empty() {
                    return Ok(None);
                }
                if let Some(stream_key) = Self::reasoning_event_stream_key(event_type, &v) {
                    return Ok(self.emit_reasoning_done_with_key(
                        stream_key.as_str(),
                        reasoning_chunk.as_str(),
                    ));
                }
                let item_id = Self::text_item_id(&v);
                return Ok(
                    self.emit_reasoning_done_text(item_id.as_deref(), reasoning_chunk.as_str())
                );
            }
            "response.reasoning.delta" | "response.reasoning_text.delta" => {
                self.observed_reasoning_signal = true;
                self.reasoning_event_count = self.reasoning_event_count.saturating_add(1);
                let mut reasoning_chunk = v
                    .get("delta")
                    .and_then(|value| value.as_str())
                    .or_else(|| v.get("text").and_then(|value| value.as_str()))
                    .or_else(|| v.get("summary").and_then(|value| value.as_str()))
                    .unwrap_or("")
                    .to_string();
                if reasoning_chunk.is_empty() {
                    reasoning_chunk = Self::reasoning_item_summary_text(&v);
                }
                self.reasoning_text_chars = self
                    .reasoning_text_chars
                    .saturating_add(reasoning_chunk.len());
                if reasoning_chunk.is_empty() {
                    return Ok(None);
                }
                let item_id = Self::text_item_id(&v);
                return Ok(
                    self.emit_reasoning_delta_text(item_id.as_deref(), reasoning_chunk.as_str())
                );
            }
            "response.reasoning.done" | "response.reasoning_text.done" => {
                self.observed_reasoning_signal = true;
                self.reasoning_event_count = self.reasoning_event_count.saturating_add(1);
                let mut reasoning_chunk = v
                    .get("text")
                    .and_then(|value| value.as_str())
                    .or_else(|| v.get("delta").and_then(|value| value.as_str()))
                    .or_else(|| v.get("summary").and_then(|value| value.as_str()))
                    .unwrap_or("")
                    .to_string();
                if reasoning_chunk.is_empty() {
                    reasoning_chunk = Self::reasoning_item_summary_text(&v);
                }
                self.reasoning_text_chars = self
                    .reasoning_text_chars
                    .saturating_add(reasoning_chunk.len());
                if reasoning_chunk.is_empty() {
                    return Ok(None);
                }
                let item_id = Self::text_item_id(&v);
                return Ok(
                    self.emit_reasoning_done_text(item_id.as_deref(), reasoning_chunk.as_str())
                );
            }
            _ => {}
        }
        match event_type {
            "response.output_text.delta" => {
                let delta = v.get("delta").and_then(|d| d.as_str()).unwrap_or("");
                if delta.is_empty() {
                    return Ok(None);
                }
                if let Some(output_index) = Self::text_output_index(&v) {
                    self.streamed_text_output_indexes.insert(output_index);
                }
                if let Some(stream_key) = Self::text_stream_key(&v) {
                    return Ok(self.emit_text_delta_with_key(stream_key.as_str(), delta));
                }
                // No index-based key: fall back to item-id / unkeyed tracking.
                if let Some(item_id) = Self::text_item_id(&v) {
                    self.streamed_text_item_ids.insert(item_id);
                } else {
                    self.saw_unkeyed_text_delta = true;
                }
                Ok(Some(LLMChunk::Token(delta.to_string())))
            }
            "response.output_text.done" => {
                let text = v
                    .get("text")
                    .and_then(|value| value.as_str())
                    .or_else(|| v.get("delta").and_then(|value| value.as_str()))
                    .unwrap_or("");
                if let Some(output_index) = Self::text_output_index(&v) {
                    self.streamed_text_output_indexes.insert(output_index);
                }
                if let Some(stream_key) = Self::text_stream_key(&v) {
                    return Ok(self.emit_text_done_with_key(stream_key.as_str(), text));
                }
                let item_id = Self::text_item_id(&v);
                Ok(self.emit_done_text(item_id.as_deref(), text))
            }
            "response.content_part.added" => {
                // `added` can carry an initial text delta; treat it as one.
                let text = Self::content_part_output_text(&v, true);
                if text.is_empty() {
                    return Ok(None);
                }
                if let Some(output_index) = Self::text_output_index(&v) {
                    self.streamed_text_output_indexes.insert(output_index);
                }
                if let Some(stream_key) = Self::text_stream_key(&v) {
                    return Ok(self.emit_text_delta_with_key(stream_key.as_str(), text.as_str()));
                }
                if let Some(item_id) = Self::text_item_id(&v) {
                    self.streamed_text_item_ids.insert(item_id);
                } else {
                    self.saw_unkeyed_text_delta = true;
                }
                Ok(Some(LLMChunk::Token(text)))
            }
            "response.content_part.done" => {
                let text = Self::content_part_output_text(&v, false);
                if let Some(output_index) = Self::text_output_index(&v) {
                    self.streamed_text_output_indexes.insert(output_index);
                }
                if let Some(stream_key) = Self::text_stream_key(&v) {
                    return Ok(self.emit_text_done_with_key(stream_key.as_str(), text.as_str()));
                }
                let item_id = Self::text_item_id(&v);
                Ok(self.emit_done_text(item_id.as_deref(), text.as_str()))
            }
            "response.output_item.added" => {
                // Seed the function-call accumulator as early as possible.
                if let Some(item) = v.get("item") {
                    let item_type = item.get("type").and_then(|t| t.as_str()).unwrap_or("");
                    if item_type == "function_call" {
                        let item_id = item
                            .get("id")
                            .and_then(|id| id.as_str())
                            .or_else(|| v.get("item_id").and_then(|id| id.as_str()))
                            .unwrap_or("");
                        if !item_id.is_empty() {
                            self.parse_fn_call_item(item_id, item);
                        }
                    }
                }
                Ok(None)
            }
            "response.function_call_arguments.delta" => {
                let item_id = v.get("item_id").and_then(|id| id.as_str()).unwrap_or("");
                let delta = v.get("delta").and_then(|d| d.as_str()).unwrap_or("");
                if item_id.is_empty() || delta.is_empty() {
                    return Ok(None);
                }
                let entry = self.ensure_fn_call(item_id);
                entry.arguments.push_str(delta);
                Ok(None)
            }
            "response.output_item.done" => {
                let item_id = Self::text_item_id(&v).unwrap_or_default();
                if let Some(item) = v.get("item") {
                    let item_type = item.get("type").and_then(|t| t.as_str()).unwrap_or("");
                    if item_type == "reasoning" {
                        // Whole-item reasoning only when nothing streamed.
                        if self.saw_reasoning_text_stream {
                            return Ok(None);
                        }
                        let text = Self::reasoning_item_summary_text(item);
                        return Ok(self.emit_reasoning_item_text(
                            if item_id.is_empty() {
                                None
                            } else {
                                Some(item_id.as_str())
                            },
                            text.as_str(),
                        ));
                    }
                    if item_type == "message" {
                        // Skip whole-item text when this output index streamed.
                        let output_index = Self::text_output_index(&v);
                        if output_index
                            .is_some_and(|index| self.streamed_text_output_indexes.contains(&index))
                        {
                            return Ok(None);
                        }
                        let text = Self::message_item_output_text(item);
                        let out = self.emit_done_text(
                            if item_id.is_empty() {
                                None
                            } else {
                                Some(item_id.as_str())
                            },
                            text.as_str(),
                        );
                        if out.is_some() {
                            if let Some(output_index) = output_index {
                                self.streamed_text_output_indexes.insert(output_index);
                            }
                        }
                        return Ok(out);
                    }
                    if item_type == "function_call" && !item_id.is_empty() {
                        // The done item's fields are authoritative.
                        self.apply_done_fn_call_item(&item_id, item);
                    }
                }
                if item_id.is_empty() {
                    return Ok(None);
                }
                // Only tracked function-call ids finalize into a ToolCall.
                let Some(call) = self.finalize_tool_call(&item_id) else {
                    return Ok(None);
                };
                Ok(Some(LLMChunk::ToolCalls(vec![call])))
            }
            "response.completed" => {
                let usage = v
                    .get("response")
                    .and_then(|response| response.get("usage"))
                    .or_else(|| v.get("usage"));
                self.log_reasoning_summary_if_needed(usage);
                Ok(Some(LLMChunk::Done))
            }
            _ => Ok(None),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::llm::models::{ContentPart, ImageUrl};
use bamboo_domain::MessagePart;
use bamboo_domain::MessagePhase;
use bamboo_domain::{FunctionCall, ToolCall};
use bamboo_domain::{FunctionSchema, ToolSchema};
#[test]
fn build_responses_body_includes_input_and_stream() {
let body = build_responses_body("gpt-5.3-codex", &[], &[], Some(123), None, None, None);
assert_eq!(body["model"], "gpt-5.3-codex");
assert_eq!(body["stream"], true);
assert_eq!(body["max_output_tokens"], 123);
assert_eq!(body["store"], false);
assert!(body.get("input").is_some());
}
#[test]
fn build_responses_body_applies_responses_options() {
let body = build_responses_body(
"gpt-5.4",
&[],
&[],
None,
Some(ReasoningEffort::High),
Some(&ResponsesRequestOptions {
instructions: Some("You are helpful".to_string()),
reasoning_summary: Some("detailed".to_string()),
include: Some(vec!["reasoning.encrypted_content".to_string()]),
store: Some(true),
previous_response_id: Some("resp_123".to_string()),
truncation: Some("auto".to_string()),
text_verbosity: Some("high".to_string()),
}),
None,
);
assert_eq!(body["reasoning"]["effort"], "high");
assert_eq!(body["reasoning"]["summary"], "detailed");
assert_eq!(body["instructions"], "You are helpful");
assert_eq!(
body["include"],
serde_json::json!(["reasoning.encrypted_content"])
);
assert_eq!(body["store"], true);
assert_eq!(body["previous_response_id"], "resp_123");
assert_eq!(body["truncation"], "auto");
assert_eq!(body["text"]["verbosity"], "high");
}
#[test]
fn build_responses_body_with_parallel_tool_calls() {
let body = build_responses_body("gpt-5.4", &[], &[], None, None, None, Some(false));
assert_eq!(body["parallel_tool_calls"], false);
}
#[test]
fn tools_to_responses_json_flattens_function_schema() {
let tools = vec![ToolSchema {
schema_type: "function".to_string(),
function: FunctionSchema {
name: "search".to_string(),
description: "Search things".to_string(),
parameters: json!({
"type": "object",
"properties": { "q": { "type": "string" } },
"required": ["q"]
}),
},
}];
let out = tools_to_responses_json(&tools);
assert_eq!(out.len(), 1);
assert_eq!(out[0]["type"], "function");
assert_eq!(out[0]["name"], "search");
assert_eq!(out[0]["description"], "Search things");
assert!(out[0].get("function").is_none());
assert!(out[0].get("parameters").is_some());
}
#[test]
fn tools_to_responses_json_sanitizes_top_level_combinators() {
let tools = vec![ToolSchema {
schema_type: "function".to_string(),
function: FunctionSchema {
name: "edit".to_string(),
description: "Edit file".to_string(),
parameters: json!({
"type": "object",
"properties": {
"file_path": { "type": "string" },
"patch": { "type": "string" }
},
"oneOf": [
{ "required": ["patch"] },
{ "required": ["old_string", "new_string"] }
]
}),
},
}];
let out = tools_to_responses_json(&tools);
assert!(out[0]["parameters"]["oneOf"].is_null());
assert_eq!(out[0]["parameters"]["type"], "object");
}
#[test]
fn messages_to_responses_input_json_serializes_tool_calls_structurally() {
    // An assistant turn with text AND a tool call expands into three items:
    // the commentary message, the function_call, then its output.
    let call = ToolCall {
        id: "call_1".to_string(),
        tool_type: "function".to_string(),
        function: FunctionCall {
            name: "search".to_string(),
            arguments: r#"{"q":"x"}"#.to_string(),
        },
    };
    let messages = vec![
        Message::assistant("Calling a tool...", Some(vec![call])),
        Message::tool_result("call_1", "result payload"),
    ];
    let out = messages_to_responses_input_json(&messages);
    assert_eq!(out.len(), 3);
    assert_eq!(out[0]["type"], "message");
    assert_eq!(out[0]["role"], "assistant");
    assert_eq!(out[0]["content"], "Calling a tool...");
    assert_eq!(out[0]["phase"], "commentary");
    assert_eq!(out[1]["type"], "function_call");
    assert_eq!(out[1]["call_id"], "call_1");
    assert_eq!(out[1]["name"], "search");
    assert_eq!(out[1]["arguments"], r#"{"q":"x"}"#);
    assert_eq!(out[2]["type"], "function_call_output");
    assert_eq!(out[2]["call_id"], "call_1");
    assert_eq!(out[2]["output"], "result payload");
}
#[test]
fn messages_to_responses_input_json_assistant_with_only_tool_calls_no_text() {
    // An assistant turn with empty content produces no message item at all:
    // just the function_call / function_call_output pair.
    let call = ToolCall {
        id: "call_1".to_string(),
        tool_type: "function".to_string(),
        function: FunctionCall {
            name: "read_file".to_string(),
            arguments: r#"{"path":"/tmp/test"}"#.to_string(),
        },
    };
    let messages = vec![
        Message::assistant("", Some(vec![call])),
        Message::tool_result("call_1", "file contents"),
    ];
    let out = messages_to_responses_input_json(&messages);
    assert_eq!(out.len(), 2);
    assert_eq!(out[0]["type"], "function_call");
    assert_eq!(out[0]["call_id"], "call_1");
    assert_eq!(out[0]["name"], "read_file");
    assert_eq!(out[1]["type"], "function_call_output");
    assert_eq!(out[1]["call_id"], "call_1");
    assert_eq!(out[1]["output"], "file contents");
}
#[test]
fn messages_to_responses_input_json_multiple_tool_calls_in_one_round() {
    // Two parallel tool calls in one assistant turn expand into two
    // function_call items followed by their two outputs, order preserved.
    let make_call = |id: &str, name: &str, args: &str| ToolCall {
        id: id.to_string(),
        tool_type: "function".to_string(),
        function: FunctionCall {
            name: name.to_string(),
            arguments: args.to_string(),
        },
    };
    let messages = vec![
        Message::user("Search and read"),
        Message::assistant(
            "",
            Some(vec![
                make_call("call_1", "search", r#"{"q":"test"}"#),
                make_call("call_2", "read_file", r#"{"path":"/tmp"}"#),
            ]),
        ),
        Message::tool_result("call_1", "search results"),
        Message::tool_result("call_2", "file contents"),
    ];
    let out = messages_to_responses_input_json(&messages);
    assert_eq!(out.len(), 5);
    assert_eq!(out[0]["type"], "message");
    assert_eq!(out[0]["role"], "user");
    assert_eq!(out[1]["type"], "function_call");
    assert_eq!(out[1]["call_id"], "call_1");
    assert_eq!(out[1]["name"], "search");
    assert_eq!(out[2]["type"], "function_call");
    assert_eq!(out[2]["call_id"], "call_2");
    assert_eq!(out[2]["name"], "read_file");
    assert_eq!(out[3]["type"], "function_call_output");
    assert_eq!(out[3]["call_id"], "call_1");
    assert_eq!(out[4]["type"], "function_call_output");
    assert_eq!(out[4]["call_id"], "call_2");
}
#[test]
fn messages_to_responses_input_json_tool_result_without_call_id_falls_back() {
    // An orphan tool result (empty call_id) degrades to a plain user message
    // whose text carries the "[tool_result]" marker.
    let out = messages_to_responses_input_json(&[Message::tool_result("", "orphan result")]);
    assert_eq!(out.len(), 1);
    assert_eq!(out[0]["type"], "message");
    assert_eq!(out[0]["role"], "user");
    let text = out[0]["content"].as_str().unwrap_or("");
    assert!(text.contains("[tool_result]"));
}
#[test]
fn messages_to_responses_input_json_system_and_user_messages() {
    // System and user roles pass straight through as plain message items.
    let out = messages_to_responses_input_json(&[
        Message::system("You are helpful"),
        Message::user("Hello"),
    ]);
    assert_eq!(out.len(), 2);
    let (system, user) = (&out[0], &out[1]);
    assert_eq!(system["type"], "message");
    assert_eq!(system["role"], "system");
    assert_eq!(system["content"], "You are helpful");
    assert_eq!(user["type"], "message");
    assert_eq!(user["role"], "user");
    assert_eq!(user["content"], "Hello");
}
#[test]
fn messages_to_responses_input_json_assistant_without_tool_calls() {
    // A plain assistant reply is one message item tagged as the final answer.
    let out = messages_to_responses_input_json(&[Message::assistant("Just a text reply", None)]);
    assert_eq!(out.len(), 1);
    let item = &out[0];
    assert_eq!(item["type"], "message");
    assert_eq!(item["role"], "assistant");
    assert_eq!(item["content"], "Just a text reply");
    assert_eq!(item["phase"], "final_answer");
}
#[test]
fn messages_to_responses_input_json_uses_output_text_for_assistant_in_typed_content_mode() {
    // Once any message in the batch carries an image, content switches to
    // typed parts, and assistant text must use the `output_text` part type.
    let parts: Vec<_> = vec![ContentPart::ImageUrl {
        image_url: ImageUrl {
            url: "https://example.com/image.png".to_string(),
            detail: None,
        },
    }]
    .into_iter()
    .map(Into::into)
    .collect();
    let messages = vec![
        Message::user_with_parts("Describe this image", parts),
        Message::assistant("It is a screenshot.", None),
    ];
    let out = messages_to_responses_input_json(&messages);
    assert_eq!(out.len(), 2);
    let assistant = &out[1];
    assert_eq!(assistant["type"], "message");
    assert_eq!(assistant["role"], "assistant");
    assert_eq!(assistant["content"][0]["type"], "output_text");
    assert_eq!(assistant["content"][0]["text"], "It is a screenshot.");
}
#[test]
fn messages_to_responses_input_json_honors_explicit_assistant_phase() {
    // An explicitly-set phase on the message wins over the inferred default.
    let mut narration = Message::assistant("intermediate narration", None);
    narration.phase = Some(MessagePhase::Commentary);
    let out = messages_to_responses_input_json(&[narration]);
    assert_eq!(out.len(), 1);
    let item = &out[0];
    assert_eq!(item["type"], "message");
    assert_eq!(item["role"], "assistant");
    assert_eq!(item["phase"], "commentary");
}
#[test]
fn parser_emits_token_on_output_text_delta() {
    // A plain text delta is forwarded verbatim as a Token chunk.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.output_text.delta",
            r#"{"type":"response.output_text.delta","delta":"hi"}"#,
        )
        .unwrap();
    if let Some(LLMChunk::Token(text)) = &out {
        assert_eq!(text, "hi");
    } else {
        panic!("expected token, got {out:?}");
    }
}
#[test]
fn parser_emits_token_on_output_text_done_without_prior_delta() {
    // If no delta was ever streamed, the done event must supply the text.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.output_text.done",
            r#"{"type":"response.output_text.done","item_id":"msg_1","text":"hello"}"#,
        )
        .unwrap();
    if let Some(LLMChunk::Token(text)) = &out {
        assert_eq!(text, "hello");
    } else {
        panic!("expected token, got {out:?}");
    }
}
#[test]
fn parser_skips_output_text_done_after_streaming_delta_for_same_item() {
    // Once deltas have streamed for an item, the trailing done event is a
    // duplicate and must be swallowed.
    let mut parser = ResponsesSseParser::new();
    let _streamed = parser
        .handle_event(
            "response.output_text.delta",
            r#"{"type":"response.output_text.delta","item_id":"msg_1","delta":"hel"}"#,
        )
        .unwrap();
    let out = parser
        .handle_event(
            "response.output_text.done",
            r#"{"type":"response.output_text.done","item_id":"msg_1","text":"hello"}"#,
        )
        .unwrap();
    assert!(out.is_none());
}
#[test]
fn parser_ignores_snapshot_text_on_content_part_added() {
    // content_part.added carries a snapshot, not a delta: no chunk emitted.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.content_part.added",
            r#"{"type":"response.content_part.added","item_id":"msg_2","part":{"type":"output_text","text":"hello from part added"}}"#,
        )
        .unwrap();
    assert!(out.is_none());
}
#[test]
fn parser_emits_token_on_content_part_done_without_prior_delta() {
    // With no prior streaming, content_part.done is the only source of the
    // text and must emit it as a token.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.content_part.done",
            r#"{"type":"response.content_part.done","item_id":"msg_3","part":{"type":"output_text","text":"hello from part done"}}"#,
        )
        .unwrap();
    if let Some(LLMChunk::Token(text)) = &out {
        assert_eq!(text, "hello from part done");
    } else {
        panic!("expected token, got {out:?}");
    }
}
#[test]
fn parser_skips_content_part_done_after_text_stream_for_same_item() {
    // A content_part.done echoing already-streamed text must be suppressed.
    let mut parser = ResponsesSseParser::new();
    let _streamed = parser
        .handle_event(
            "response.output_text.delta",
            r#"{"type":"response.output_text.delta","item_id":"msg_4","delta":"hello"}"#,
        )
        .unwrap();
    let out = parser
        .handle_event(
            "response.content_part.done",
            r#"{"type":"response.content_part.done","item_id":"msg_4","part":{"type":"output_text","text":"hello"}}"#,
        )
        .unwrap();
    assert!(out.is_none());
}
#[test]
fn parser_emits_response_id_on_created_event() {
    // response.created surfaces the server-assigned response id.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.created",
            r#"{"type":"response.created","response":{"id":"resp_123","status":"in_progress"}}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ResponseId(response_id)) = &out {
        assert_eq!(response_id, "resp_123");
    } else {
        panic!("expected response id, got {out:?}");
    }
}
#[test]
fn parser_does_not_confuse_item_id_with_response_id() {
    // The id inside an output item is an item id, not a response id, so
    // no ResponseId (or any other) chunk may be emitted.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.output_item.added",
            r#"{"type":"response.output_item.added","item":{"id":"item_1","type":"function_call","call_id":"call_1","name":"search","arguments":"{}"}}"#,
        )
        .unwrap();
    assert!(out.is_none());
}
#[test]
fn parser_ignores_message_output_item_added_snapshot() {
    // output_item.added for a message carries a snapshot only: no chunk.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.output_item.added",
            r#"{"type":"response.output_item.added","item":{"id":"msg_added_1","type":"message","content":[{"type":"output_text","text":"thinking before tool"}]}}"#,
        )
        .unwrap();
    assert!(out.is_none());
}
#[test]
fn parser_emits_message_output_item_done_after_added_snapshot() {
    // The added-event snapshot is suppressed, so the done event is the first
    // (and only) place the text is emitted.
    let mut parser = ResponsesSseParser::new();
    let _snapshot = parser
        .handle_event(
            "response.output_item.added",
            r#"{"type":"response.output_item.added","item":{"id":"msg_added_2","type":"message","content":[{"type":"output_text","text":"thinking"}]}}"#,
        )
        .unwrap();
    let out = parser
        .handle_event(
            "response.output_item.done",
            r#"{"type":"response.output_item.done","item":{"id":"msg_added_2","type":"message","content":[{"type":"output_text","text":"thinking"}]}}"#,
        )
        .unwrap();
    if let Some(LLMChunk::Token(text)) = &out {
        assert_eq!(text, "thinking");
    } else {
        panic!("expected token, got {out:?}");
    }
}
#[test]
fn parser_emits_reasoning_token_on_reasoning_delta() {
    // Reasoning deltas map onto the dedicated ReasoningToken chunk kind.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.reasoning.delta",
            r#"{"type":"response.reasoning.delta","delta":"think"}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &out {
        assert_eq!(text, "think");
    } else {
        panic!("expected reasoning token, got {out:?}");
    }
}
#[test]
fn parser_emits_reasoning_token_from_reasoning_output_item_done() {
    // A reasoning item completed without prior deltas emits its summary text.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.output_item.done",
            r#"{"type":"response.output_item.done","item":{"id":"rs_1","type":"reasoning","summary":[{"type":"summary_text","text":"I will inspect the repo first."}]}}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &out {
        assert_eq!(text, "I will inspect the repo first.");
    } else {
        panic!("expected reasoning token, got {out:?}");
    }
}
#[test]
fn parser_joins_multiple_reasoning_summary_parts_with_blank_line() {
    // Multiple summary parts in one reasoning item are joined with "\n\n".
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.output_item.done",
            r#"{"type":"response.output_item.done","item":{"id":"rs_multi_1","type":"reasoning","summary":[{"type":"summary_text","text":"Evaluating local changes"},{"type":"summary_text","text":"Committing release version"}]}}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &out {
        assert_eq!(text, "Evaluating local changes\n\nCommitting release version");
    } else {
        panic!("expected reasoning token, got {out:?}");
    }
}
#[test]
fn parser_skips_duplicate_reasoning_done_after_reasoning_delta_for_same_item() {
    // The summary-text done event repeats already-streamed reasoning text
    // and must be suppressed.
    let mut parser = ResponsesSseParser::new();
    let _streamed = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_2","delta":"Planning now."}"#,
        )
        .unwrap();
    let out = parser
        .handle_event(
            "response.reasoning_summary_text.done",
            r#"{"type":"response.reasoning_summary_text.done","item_id":"rs_2","text":"Planning now."}"#,
        )
        .unwrap();
    assert!(out.is_none());
}
#[test]
fn parser_skips_duplicate_reasoning_output_item_done_after_reasoning_delta() {
    // The reasoning item's final snapshot duplicates streamed deltas: skip it.
    let mut parser = ResponsesSseParser::new();
    let _streamed = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_3","delta":"Planning now."}"#,
        )
        .unwrap();
    let out = parser
        .handle_event(
            "response.output_item.done",
            r#"{"type":"response.output_item.done","item":{"id":"rs_3","type":"reasoning","summary":[{"type":"summary_text","text":"Planning now."}]}}"#,
        )
        .unwrap();
    assert!(out.is_none());
}
#[test]
fn parser_emits_tool_call_on_output_item_done() {
    // Arguments accumulate across the added + delta events; the done event
    // flushes the fully assembled tool call.
    let mut parser = ResponsesSseParser::new();
    let _added = parser
        .handle_event(
            "response.output_item.added",
            r#"{"type":"response.output_item.added","item":{"id":"item_1","type":"function_call","call_id":"call_1","name":"search","arguments":"{\"q\":\""}}"#,
        )
        .unwrap();
    let _delta = parser
        .handle_event(
            "response.function_call_arguments.delta",
            r#"{"type":"response.function_call_arguments.delta","item_id":"item_1","delta":"test\"}"}"#,
        )
        .unwrap();
    let out = parser
        .handle_event(
            "response.output_item.done",
            r#"{"type":"response.output_item.done","item_id":"item_1","item":{"id":"item_1","type":"function_call"}}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ToolCalls(calls)) = &out {
        assert_eq!(calls.len(), 1);
        let call = &calls[0];
        assert_eq!(call.id, "call_1");
        assert_eq!(call.function.name, "search");
        assert_eq!(call.function.arguments, r#"{"q":"test"}"#);
    } else {
        panic!("expected tool_calls, got {out:?}");
    }
}
#[test]
fn parser_prefers_done_arguments_snapshot_over_accumulated_partials() {
    // When the done event carries a complete arguments snapshot, it wins
    // over whatever partials were accumulated from deltas.
    let mut parser = ResponsesSseParser::new();
    let _added = parser
        .handle_event(
            "response.output_item.added",
            r#"{"type":"response.output_item.added","item":{"id":"item_2","type":"function_call","call_id":"call_2","name":"search","arguments":"{\"q\":\""}}"#,
        )
        .unwrap();
    let _delta = parser
        .handle_event(
            "response.function_call_arguments.delta",
            r#"{"type":"response.function_call_arguments.delta","item_id":"item_2","delta":"test\"}"}"#,
        )
        .unwrap();
    let out = parser
        .handle_event(
            "response.output_item.done",
            r#"{"type":"response.output_item.done","item":{"id":"item_2","type":"function_call","call_id":"call_2","name":"search","arguments":"{\"q\":\"test\"}"}}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ToolCalls(calls)) = &out {
        assert_eq!(calls.len(), 1);
        let call = &calls[0];
        assert_eq!(call.id, "call_2");
        assert_eq!(call.function.name, "search");
        assert_eq!(call.function.arguments, r#"{"q":"test"}"#);
    } else {
        panic!("expected tool_calls, got {out:?}");
    }
}
#[test]
fn parser_emits_token_from_message_output_item_done_when_no_deltas_seen() {
    // A message item completed without any prior streaming yields its text.
    let mut parser = ResponsesSseParser::new();
    let out = parser
        .handle_event(
            "response.output_item.done",
            r#"{"type":"response.output_item.done","item":{"id":"msg_1","type":"message","content":[{"type":"output_text","text":"hello from item"}]}}"#,
        )
        .unwrap();
    if let Some(LLMChunk::Token(text)) = &out {
        assert_eq!(text, "hello from item");
    } else {
        panic!("expected token, got {out:?}");
    }
}
#[test]
fn parser_skips_message_output_item_done_when_text_delta_already_streamed() {
    // The final message snapshot repeats text already streamed via deltas.
    let mut parser = ResponsesSseParser::new();
    let _streamed = parser
        .handle_event(
            "response.output_text.delta",
            r#"{"type":"response.output_text.delta","item_id":"msg_1","delta":"hello"}"#,
        )
        .unwrap();
    let out = parser
        .handle_event(
            "response.output_item.done",
            r#"{"type":"response.output_item.done","item":{"id":"msg_1","type":"message","content":[{"type":"output_text","text":"hello"}]}}"#,
        )
        .unwrap();
    assert!(out.is_none());
}
#[test]
fn parser_avoids_duplicate_text_when_snapshot_and_delta_channels_overlap() {
    // Replay the same "hello" across every channel that can carry it;
    // exactly one Token chunk must come out of the whole sequence.
    let events = [
        (
            "response.output_item.added",
            r#"{"type":"response.output_item.added","item":{"id":"msg_overlap","type":"message","content":[{"type":"output_text","text":"hello"}]}}"#,
        ),
        (
            "response.content_part.added",
            r#"{"type":"response.content_part.added","item_id":"msg_overlap","part":{"type":"output_text","text":"hello"}}"#,
        ),
        (
            "response.output_text.delta",
            r#"{"type":"response.output_text.delta","item_id":"msg_overlap","delta":"hello"}"#,
        ),
        (
            "response.output_text.done",
            r#"{"type":"response.output_text.done","item_id":"msg_overlap","text":"hello"}"#,
        ),
        (
            "response.output_item.done",
            r#"{"type":"response.output_item.done","item":{"id":"msg_overlap","type":"message","content":[{"type":"output_text","text":"hello"}]}}"#,
        ),
    ];
    let mut parser = ResponsesSseParser::new();
    let mut tokens: Vec<String> = Vec::new();
    for (event, payload) in &events {
        if let Some(LLMChunk::Token(text)) = parser.handle_event(event, payload).unwrap() {
            tokens.push(text);
        }
    }
    assert_eq!(tokens, vec!["hello".to_string()]);
}
#[test]
fn parser_emits_single_token_when_done_channels_repeat_same_text_with_different_item_ids() {
    // Three done-style channels repeat identical text under different item
    // ids; the parser must still emit the text exactly once.
    let events = [
        (
            "response.output_text.done",
            r#"{"type":"response.output_text.done","item_id":"msg_a","output_index":0,"content_index":0,"text":"Hi! What can I help you with?"}"#,
        ),
        (
            "response.content_part.done",
            r#"{"type":"response.content_part.done","item_id":"msg_b","output_index":0,"part":{"type":"output_text","text":"Hi! What can I help you with?"}}"#,
        ),
        (
            "response.output_item.done",
            r#"{"type":"response.output_item.done","output_index":0,"item":{"id":"msg_c","type":"message","content":[{"type":"output_text","text":"Hi! What can I help you with?"}]}}"#,
        ),
    ];
    let mut parser = ResponsesSseParser::new();
    let mut tokens: Vec<String> = Vec::new();
    for (event, payload) in &events {
        if let Some(LLMChunk::Token(text)) = parser.handle_event(event, payload).unwrap() {
            tokens.push(text);
        }
    }
    assert_eq!(tokens, vec!["Hi! What can I help you with?".to_string()]);
}
#[test]
fn parser_handles_cumulative_reasoning_delta_for_same_item_as_suffix() {
    // Some servers resend the full summary text in every delta; the parser
    // must emit only the newly appended suffix, then swallow the done echo.
    let mut parser = ResponsesSseParser::new();
    let first = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_snap","delta":"Analyzing project structure"}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &first {
        assert_eq!(text, "Analyzing project structure");
    } else {
        panic!("expected reasoning token, got {first:?}");
    }
    let second = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_snap","delta":"Analyzing project structure and listing candidate files"}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &second {
        assert_eq!(text, " and listing candidate files");
    } else {
        panic!("expected reasoning suffix token, got {second:?}");
    }
    let done = parser
        .handle_event(
            "response.reasoning_summary_text.done",
            r#"{"type":"response.reasoning_summary_text.done","item_id":"rs_snap","text":"Analyzing project structure and listing candidate files"}"#,
        )
        .unwrap();
    assert!(done.is_none());
}
#[test]
fn parser_skips_reasoning_summary_done_for_same_summary_stream_when_item_ids_differ() {
    // Dedup keys on (output_index, summary_index), so a done event with a
    // different item id but the same stream coordinates is still skipped.
    let mut parser = ResponsesSseParser::new();
    let _streamed = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_a","output_index":0,"summary_index":7,"delta":"Planning now."}"#,
        )
        .unwrap();
    let out = parser
        .handle_event(
            "response.reasoning_summary_text.done",
            r#"{"type":"response.reasoning_summary_text.done","item_id":"rs_b","output_index":0,"summary_index":7,"text":"Planning now."}"#,
        )
        .unwrap();
    assert!(out.is_none());
}
#[test]
fn parser_normalizes_cumulative_reasoning_delta_by_summary_stream_key() {
    // Cumulative deltas are recognized by stream key (output_index,
    // summary_index) even when the item ids differ between events.
    let mut parser = ResponsesSseParser::new();
    let first = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_key_1","output_index":0,"summary_index":3,"delta":"Analyzing project"}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &first {
        assert_eq!(text, "Analyzing project");
    } else {
        panic!("expected reasoning token, got {first:?}");
    }
    let second = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_key_2","output_index":0,"summary_index":3,"delta":"Analyzing project structure"}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &second {
        assert_eq!(text, " structure");
    } else {
        panic!("expected reasoning suffix token, got {second:?}");
    }
}
#[test]
fn parser_skips_reasoning_summary_part_done_after_summary_text_done_for_same_stream() {
    // summary_part.done echoing a summary already emitted via
    // summary_text.done (same stream coordinates) must be suppressed.
    let mut parser = ResponsesSseParser::new();
    let first = parser
        .handle_event(
            "response.reasoning_summary_text.done",
            r#"{"type":"response.reasoning_summary_text.done","item_id":"sum_1","output_index":0,"summary_index":9,"text":"Final summary"}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &first {
        assert_eq!(text, "Final summary");
    } else {
        panic!("expected reasoning token, got {first:?}");
    }
    let second = parser
        .handle_event(
            "response.reasoning_summary_part.done",
            r#"{"type":"response.reasoning_summary_part.done","item_id":"sum_2","output_index":0,"summary_index":9,"part":{"type":"summary_text","text":"Final summary"}}"#,
        )
        .unwrap();
    assert!(second.is_none());
}
#[test]
fn parser_inserts_blank_line_before_new_reasoning_summary_stream_delta() {
    // Moving to a new summary_index starts a new reasoning block, so the
    // parser prefixes it with a blank-line separator.
    let mut parser = ResponsesSseParser::new();
    let first = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_block_1","output_index":0,"summary_index":0,"delta":"I've noticed some strong evidence to analyze."}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &first {
        assert_eq!(text, "I've noticed some strong evidence to analyze.");
    } else {
        panic!("expected reasoning token, got {first:?}");
    }
    let second = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_block_2","output_index":0,"summary_index":1,"delta":"Analyzing dream view functionality"}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &second {
        assert_eq!(text, "\n\nAnalyzing dream view functionality");
    } else {
        panic!("expected reasoning token with separator, got {second:?}");
    }
}
#[test]
fn parser_does_not_duplicate_blank_line_when_new_reasoning_stream_already_has_separator() {
    // If the new stream's delta already begins with "\n\n", the parser must
    // not prepend a second separator.
    let mut parser = ResponsesSseParser::new();
    let _first_block = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_block_3","output_index":0,"summary_index":0,"delta":"First block."}"#,
        )
        .unwrap();
    let second = parser
        .handle_event(
            "response.reasoning_summary_text.delta",
            r#"{"type":"response.reasoning_summary_text.delta","item_id":"rs_block_4","output_index":0,"summary_index":1,"delta":"\n\nSecond block."}"#,
        )
        .unwrap();
    if let Some(LLMChunk::ReasoningToken(text)) = &second {
        assert_eq!(text, "\n\nSecond block.");
    } else {
        panic!("expected reasoning token, got {second:?}");
    }
}
}