#![allow(clippy::cast_possible_truncation)]
use std::collections::HashSet;
use bytes::Bytes;
use futures::StreamExt;
use serde_json::{Map, Value, json};
use crate::codecs::codec::{
BoxByteStream, BoxDeltaStream, Codec, EncodedRequest, extract_openai_rate_limit,
service_tier_str,
};
use crate::error::{Error, Result};
use crate::ir::{
Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
ModelWarning, OutputStrategy, ReasoningEffort, RefusalReason, ResponseFormat, Role, StopReason,
ToolChoice, ToolKind, ToolResultContent, Usage,
};
use crate::rate_limit::RateLimitSnapshot;
use crate::stream::StreamDelta;
const DEFAULT_MAX_CONTEXT_TOKENS: u32 = 128_000;
#[derive(Clone, Copy, Debug, Default)]
pub struct OpenAiChatCodec;
impl OpenAiChatCodec {
pub const fn new() -> Self {
Self
}
}
impl Codec for OpenAiChatCodec {
fn name(&self) -> &'static str {
"openai-chat"
}
fn capabilities(&self, _model: &str) -> Capabilities {
Capabilities {
streaming: true,
tools: true,
multimodal_image: true,
multimodal_audio: true,
multimodal_video: false,
multimodal_document: true,
system_prompt: true,
structured_output: true,
prompt_caching: true,
thinking: false,
citations: true,
web_search: false,
computer_use: false,
max_context_tokens: DEFAULT_MAX_CONTEXT_TOKENS,
}
}
fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
let (body, warnings) = build_body(request, false)?;
finalize_request(&body, warnings)
}
fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
let (body, warnings) = build_body(request, true)?;
let mut encoded = finalize_request(&body, warnings)?;
encoded.headers.insert(
http::header::ACCEPT,
http::HeaderValue::from_static("text/event-stream"),
);
Ok(encoded.into_streaming())
}
fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
let raw: Value = super::codec::parse_response_body(body, "OpenAI Chat")?;
let mut warnings = warnings_in;
let id = str_field(&raw, "id").to_owned();
let model = str_field(&raw, "model").to_owned();
let usage = decode_usage(raw.get("usage"));
let (content, stop_reason) = decode_choice(&raw, &mut warnings);
Ok(ModelResponse {
id,
model,
stop_reason,
content,
usage,
rate_limit: None,
warnings,
provider_echoes: Vec::new(),
})
}
fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
extract_openai_rate_limit(headers)
}
fn decode_stream<'a>(
&'a self,
bytes: BoxByteStream<'a>,
warnings_in: Vec<ModelWarning>,
) -> BoxDeltaStream<'a> {
Box::pin(stream_openai_chat(bytes, warnings_in))
}
}
fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
if request.messages.is_empty() && request.system.is_empty() {
return Err(Error::invalid_request(
"OpenAI Chat requires at least one message",
));
}
let mut warnings = Vec::with_capacity(1);
let messages = encode_messages(request, &mut warnings);
let mut body = Map::new();
body.insert("model".into(), Value::String(request.model.clone()));
body.insert("messages".into(), Value::Array(messages));
if let Some(n) = request.max_tokens {
body.insert("max_tokens".into(), json!(n));
}
if let Some(t) = request.temperature {
body.insert("temperature".into(), json!(t));
}
if let Some(p) = request.top_p {
body.insert("top_p".into(), json!(p));
}
if request.top_k.is_some() {
warnings.push(ModelWarning::LossyEncode {
field: "top_k".into(),
detail: "OpenAI Chat Completions has no top_k parameter — setting dropped".into(),
});
}
if !request.stop_sequences.is_empty() {
body.insert("stop".into(), json!(request.stop_sequences));
}
if !request.tools.is_empty() {
body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
body.insert(
"tool_choice".into(),
encode_tool_choice(&request.tool_choice),
);
}
if let Some(format) = &request.response_format {
encode_openai_chat_structured_output(format, &mut body, &mut warnings)?;
}
if streaming {
body.insert("stream".into(), Value::Bool(true));
body.insert("stream_options".into(), json!({ "include_usage": true }));
}
apply_provider_extensions(request, &mut body, &mut warnings);
Ok((Value::Object(body), warnings))
}
fn apply_provider_extensions(
request: &ModelRequest,
body: &mut Map<String, Value>,
warnings: &mut Vec<ModelWarning>,
) {
let ext = &request.provider_extensions;
if let Some(parallel) = request.parallel_tool_calls {
body.insert("parallel_tool_calls".into(), json!(parallel));
}
if let Some(seed) = request.seed {
body.insert("seed".into(), json!(seed));
}
if let Some(user) = &request.end_user_id {
body.insert("user".into(), Value::String(user.clone()));
}
if let Some(openai_chat) = &ext.openai_chat {
if let Some(key) = &openai_chat.cache_key {
body.insert("prompt_cache_key".into(), Value::String(key.clone()));
}
if let Some(tier) = openai_chat.service_tier {
body.insert(
"service_tier".into(),
Value::String(service_tier_str(tier).into()),
);
}
}
if let Some(effort) = &request.reasoning_effort {
if is_openai_reasoning_model(&request.model) {
let effort_str = encode_chat_reasoning_effort(effort, warnings);
body.insert("reasoning_effort".into(), Value::String(effort_str));
} else {
warnings.push(ModelWarning::LossyEncode {
field: "reasoning_effort".into(),
detail: "OpenAI Chat Completions accepts `reasoning_effort` only on reasoning \
models (o1 / o3 / o4 / gpt-5); current model is non-reasoning — \
field dropped. Use OpenAiResponsesCodec for the full reasoning surface."
.into(),
});
}
}
if ext.anthropic.is_some() {
warnings.push(ModelWarning::ProviderExtensionIgnored {
vendor: "anthropic".into(),
});
}
if ext.openai_responses.is_some() {
warnings.push(ModelWarning::ProviderExtensionIgnored {
vendor: "openai_responses".into(),
});
}
if ext.gemini.is_some() {
warnings.push(ModelWarning::ProviderExtensionIgnored {
vendor: "gemini".into(),
});
}
if ext.bedrock.is_some() {
warnings.push(ModelWarning::ProviderExtensionIgnored {
vendor: "bedrock".into(),
});
}
}
fn is_openai_reasoning_model(model: &str) -> bool {
model.starts_with("o1")
|| model.starts_with("o3")
|| model.starts_with("o4")
|| model.starts_with("gpt-5")
}
fn encode_chat_reasoning_effort(
effort: &ReasoningEffort,
warnings: &mut Vec<ModelWarning>,
) -> String {
match effort {
ReasoningEffort::Off => "none".to_owned(),
ReasoningEffort::Minimal => "minimal".to_owned(),
ReasoningEffort::Low => "low".to_owned(),
ReasoningEffort::Medium => "medium".to_owned(),
ReasoningEffort::High => "high".to_owned(),
ReasoningEffort::Auto => {
warnings.push(ModelWarning::LossyEncode {
field: "reasoning_effort".into(),
detail: "OpenAI Chat has no `Auto` bucket — snapped to `medium`".into(),
});
"medium".to_owned()
}
ReasoningEffort::VendorSpecific(literal) => literal.clone(),
}
}
fn encode_openai_chat_structured_output(
format: &ResponseFormat,
body: &mut Map<String, Value>,
warnings: &mut Vec<ModelWarning>,
) -> Result<()> {
let strategy = match format.strategy {
OutputStrategy::Auto | OutputStrategy::Native => OutputStrategy::Native,
explicit => explicit,
};
match strategy {
OutputStrategy::Native => {
if let Err(err) = format.strict_preflight() {
warnings.push(ModelWarning::LossyEncode {
field: "response_format.json_schema".into(),
detail: err.to_string(),
});
}
body.insert(
"response_format".into(),
json!({
"type": "json_schema",
"json_schema": {
"name": format.json_schema.name,
"schema": format.json_schema.schema,
"strict": format.strict,
}
}),
);
}
OutputStrategy::Tool => {
let tool_name = format.json_schema.name.clone();
let synthetic_tool = json!({
"type": "function",
"function": {
"name": tool_name,
"description": format!(
"Emit the response as a JSON object matching the {tool_name} schema."
),
"parameters": format.json_schema.schema.clone(),
"strict": format.strict,
}
});
let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
if let Value::Array(arr) = tools {
arr.insert(0, synthetic_tool);
}
body.insert(
"tool_choice".into(),
json!({
"type": "function",
"function": { "name": format.json_schema.name },
}),
);
}
OutputStrategy::Prompted => {
return Err(Error::invalid_request(
"OutputStrategy::Prompted is deferred to entelix 1.1; use \
OutputStrategy::Native or OutputStrategy::Tool",
));
}
OutputStrategy::Auto => unreachable!("Auto resolved above"),
}
Ok(())
}
fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
let bytes = serde_json::to_vec(body)?;
let mut encoded = EncodedRequest::post_json("/v1/chat/completions", Bytes::from(bytes));
encoded.warnings = warnings;
Ok(encoded)
}
fn encode_messages(request: &ModelRequest, warnings: &mut Vec<ModelWarning>) -> Vec<Value> {
let mut out: Vec<Value> = Vec::new();
if !request.system.is_empty() {
if request.system.any_cached() {
warnings.push(ModelWarning::LossyEncode {
field: "system.cache_control".into(),
detail: "OpenAI Chat has no native prompt-cache control; \
block text is concatenated and the cache directive \
is dropped"
.into(),
});
}
out.push(json!({
"role": "system",
"content": request.system.concat_text(),
}));
}
for (idx, msg) in request.messages.iter().enumerate() {
match msg.role {
Role::System => {
let text = collect_text(&msg.content, warnings, idx);
out.push(json!({ "role": "system", "content": text }));
}
Role::User => {
out.push(json!({
"role": "user",
"content": encode_user_content(&msg.content, warnings, idx),
}));
}
Role::Assistant => {
out.push(encode_assistant_message(&msg.content, warnings, idx));
}
Role::Tool => {
for (part_idx, part) in msg.content.iter().enumerate() {
if let ContentPart::ToolResult {
tool_use_id,
content,
is_error,
..
} = part
{
let body_str = match content {
ToolResultContent::Text(t) => t.clone(),
ToolResultContent::Json(v) => v.to_string(),
};
let mut entry = Map::new();
entry.insert("role".into(), Value::String("tool".into()));
entry.insert("tool_call_id".into(), Value::String(tool_use_id.clone()));
entry.insert("content".into(), Value::String(body_str));
if *is_error {
warnings.push(ModelWarning::LossyEncode {
field: format!("messages[{idx}].content[{part_idx}].is_error"),
detail: "OpenAI Chat has no tool_result is_error flag — \
carrying via content text"
.into(),
});
}
out.push(Value::Object(entry));
} else {
warnings.push(ModelWarning::LossyEncode {
field: format!("messages[{idx}].content[{part_idx}]"),
detail: "non-tool_result part on Role::Tool dropped".into(),
});
}
}
}
}
}
out
}
fn encode_user_content(
parts: &[ContentPart],
warnings: &mut Vec<ModelWarning>,
msg_idx: usize,
) -> Value {
if parts.iter().all(|p| matches!(p, ContentPart::Text { .. })) {
let mut text = String::new();
for part in parts {
if let ContentPart::Text { text: t, .. } = part {
text.push_str(t);
}
}
return Value::String(text);
}
let mut arr = Vec::new();
for (part_idx, part) in parts.iter().enumerate() {
let path = || format!("messages[{msg_idx}].content[{part_idx}]");
match part {
ContentPart::Text { text, .. } => {
arr.push(json!({ "type": "text", "text": text }));
}
ContentPart::Image { source, .. } => {
arr.push(json!({
"type": "image_url",
"image_url": { "url": media_to_url_chat(source) },
}));
}
ContentPart::Audio { source, .. } => {
if let MediaSource::Base64 { media_type, data } = source {
let format = audio_format_from_mime(media_type);
arr.push(json!({
"type": "input_audio",
"input_audio": { "data": data, "format": format },
}));
} else {
warnings.push(ModelWarning::LossyEncode {
field: path(),
detail: "OpenAI Chat input_audio requires base64 source; URL/FileId \
audio dropped"
.into(),
});
}
}
ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
field: path(),
detail: "OpenAI Chat does not accept video inputs; block dropped".into(),
}),
ContentPart::Document { source, name, .. } => {
if let MediaSource::FileId { id, .. } = source {
let mut o = Map::new();
o.insert("type".into(), Value::String("file".into()));
let mut file_obj = Map::new();
file_obj.insert("file_id".into(), Value::String(id.clone()));
if let Some(n) = name {
file_obj.insert("filename".into(), Value::String(n.clone()));
}
o.insert("file".into(), Value::Object(file_obj));
arr.push(Value::Object(o));
} else {
warnings.push(ModelWarning::LossyEncode {
field: path(),
detail: "OpenAI Chat document input requires Files-API FileId source; \
inline document dropped"
.into(),
});
}
}
ContentPart::Thinking { .. } => warnings.push(ModelWarning::LossyEncode {
field: path(),
detail: "OpenAI Chat does not accept thinking blocks on input; block dropped"
.into(),
}),
ContentPart::Citation { .. } => warnings.push(ModelWarning::LossyEncode {
field: path(),
detail: "OpenAI Chat does not echo citations on input; block dropped".into(),
}),
ContentPart::ToolUse { .. } | ContentPart::ToolResult { .. } => {
warnings.push(ModelWarning::LossyEncode {
field: path(),
detail: "tool_use / tool_result not allowed on user role for OpenAI Chat; \
move to assistant or tool role"
.into(),
});
}
ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
warnings.push(ModelWarning::LossyEncode {
field: path(),
detail: "OpenAI Chat does not accept assistant-produced image / audio output \
as input — block dropped"
.into(),
});
}
ContentPart::RedactedThinking { .. } => {
warnings.push(ModelWarning::LossyEncode {
field: path(),
detail: "OpenAI Chat does not accept redacted_thinking blocks; block dropped"
.into(),
});
}
}
}
Value::Array(arr)
}
fn media_to_url_chat(source: &MediaSource) -> String {
match source {
MediaSource::Url { url, .. } => url.clone(),
MediaSource::Base64 { media_type, data } => format!("data:{media_type};base64,{data}"),
MediaSource::FileId { id, .. } => id.clone(),
}
}
fn audio_format_from_mime(mime: &str) -> &'static str {
match mime {
"audio/mp3" | "audio/mpeg" => "mp3",
"audio/aac" => "aac",
"audio/flac" => "flac",
"audio/ogg" | "audio/opus" => "opus",
_ => "wav",
}
}
fn encode_assistant_message(
parts: &[ContentPart],
warnings: &mut Vec<ModelWarning>,
msg_idx: usize,
) -> Value {
let mut text_buf = String::new();
let mut tool_calls = Vec::new();
for (part_idx, part) in parts.iter().enumerate() {
match part {
ContentPart::Text { text, .. } => text_buf.push_str(text),
ContentPart::ToolUse {
id, name, input, ..
} => {
tool_calls.push(json!({
"id": id,
"type": "function",
"function": {
"name": name,
"arguments": input.to_string(),
},
}));
}
ContentPart::Citation { snippet, .. } => text_buf.push_str(snippet),
other => {
warnings.push(ModelWarning::LossyEncode {
field: format!("messages[{msg_idx}].content[{part_idx}]"),
detail: format!(
"{} not supported on assistant role for OpenAI Chat — dropped",
debug_part_kind(other)
),
});
}
}
}
let mut entry = Map::new();
entry.insert("role".into(), Value::String("assistant".into()));
entry.insert(
"content".into(),
if text_buf.is_empty() {
Value::Null
} else {
Value::String(text_buf)
},
);
if !tool_calls.is_empty() {
entry.insert("tool_calls".into(), Value::Array(tool_calls));
}
Value::Object(entry)
}
fn collect_text(parts: &[ContentPart], warnings: &mut Vec<ModelWarning>, msg_idx: usize) -> String {
let mut text = String::new();
let mut lossy = false;
for part in parts {
match part {
ContentPart::Text { text: t, .. } => text.push_str(t),
_ => lossy = true,
}
}
if lossy {
warnings.push(ModelWarning::LossyEncode {
field: format!("messages[{msg_idx}].content"),
detail: "non-text parts dropped from system message".into(),
});
}
text
}
const fn debug_part_kind(part: &ContentPart) -> &'static str {
match part {
ContentPart::Text { .. } => "text",
ContentPart::Image { .. } => "image",
ContentPart::Audio { .. } => "audio",
ContentPart::Video { .. } => "video",
ContentPart::Document { .. } => "document",
ContentPart::Thinking { .. } => "thinking",
ContentPart::Citation { .. } => "citation",
ContentPart::ToolUse { .. } => "tool_use",
ContentPart::ToolResult { .. } => "tool_result",
ContentPart::ImageOutput { .. } => "image_output",
ContentPart::AudioOutput { .. } => "audio_output",
ContentPart::RedactedThinking { .. } => "redacted_thinking",
}
}
fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
let mut arr = Vec::with_capacity(tools.len());
for (idx, t) in tools.iter().enumerate() {
match &t.kind {
ToolKind::Function { input_schema } => arr.push(json!({
"type": "function",
"function": {
"name": t.name,
"description": t.description,
"parameters": input_schema,
},
})),
ToolKind::WebSearch { .. }
| ToolKind::Computer { .. }
| ToolKind::TextEditor
| ToolKind::Bash
| ToolKind::CodeExecution
| ToolKind::FileSearch { .. }
| ToolKind::CodeInterpreter
| ToolKind::ImageGeneration
| ToolKind::McpConnector { .. }
| ToolKind::Memory => warnings.push(ModelWarning::LossyEncode {
field: format!("tools[{idx}]"),
detail: "OpenAI Chat Completions advertises only function tools — \
vendor built-ins (web_search, computer, file_search, …) \
live on the Responses API; tool dropped"
.into(),
}),
}
}
Value::Array(arr)
}
fn encode_tool_choice(choice: &ToolChoice) -> Value {
match choice {
ToolChoice::Auto => Value::String("auto".into()),
ToolChoice::Required => Value::String("required".into()),
ToolChoice::None => Value::String("none".into()),
ToolChoice::Specific { name } => json!({
"type": "function",
"function": { "name": name },
}),
}
}
fn decode_choice(raw: &Value, warnings: &mut Vec<ModelWarning>) -> (Vec<ContentPart>, StopReason) {
let choice = raw
.get("choices")
.and_then(Value::as_array)
.and_then(|a| a.first())
.cloned()
.unwrap_or(Value::Null); let message = choice.get("message").unwrap_or(&Value::Null); let content = decode_assistant_message(message, warnings);
let stop_reason = decode_finish_reason(
choice.get("finish_reason").and_then(Value::as_str),
warnings,
);
(content, stop_reason)
}
fn decode_assistant_message(message: &Value, warnings: &mut Vec<ModelWarning>) -> Vec<ContentPart> {
let mut parts: Vec<ContentPart> = Vec::new();
let text = message
.get("content")
.and_then(Value::as_str)
.unwrap_or_default(); if let Some(annotations) = message.get("annotations").and_then(Value::as_array) {
for ann in annotations {
if ann.get("type").and_then(Value::as_str) == Some("url_citation")
&& let Some(uc) = ann.get("url_citation")
{
parts.push(ContentPart::Citation {
snippet: text.to_owned(),
source: CitationSource::Url {
url: str_field(uc, "url").to_owned(),
title: uc.get("title").and_then(Value::as_str).map(str::to_owned),
},
cache_control: None,
provider_echoes: Vec::new(),
});
}
}
}
if !text.is_empty() {
parts.push(ContentPart::text(text));
}
if let Some(tool_calls) = message.get("tool_calls").and_then(Value::as_array) {
for (idx, call) in tool_calls.iter().enumerate() {
let id = str_field(call, "id").to_owned();
let function = call.get("function").unwrap_or(&Value::Null); let name = str_field(function, "name").to_owned();
let arguments = function
.get("arguments")
.and_then(Value::as_str)
.unwrap_or("{}"); let input = if let Ok(v) = serde_json::from_str::<Value>(arguments) {
v
} else {
warnings.push(ModelWarning::LossyEncode {
field: format!("choices[0].message.tool_calls[{idx}].function.arguments"),
detail: "tool arguments not valid JSON; preserved as raw string".into(),
});
Value::String(arguments.to_owned())
};
parts.push(ContentPart::ToolUse {
id,
name,
input,
provider_echoes: Vec::new(),
});
}
}
parts
}
fn decode_finish_reason(reason: Option<&str>, warnings: &mut Vec<ModelWarning>) -> StopReason {
match reason {
Some("stop") => StopReason::EndTurn,
Some("length") => StopReason::MaxTokens,
Some("tool_calls" | "function_call") => StopReason::ToolUse,
Some("content_filter") => StopReason::Refusal {
reason: RefusalReason::Safety,
},
Some(other) => {
warnings.push(ModelWarning::UnknownStopReason {
raw: other.to_owned(),
});
StopReason::Other {
raw: other.to_owned(),
}
}
None => {
warnings.push(ModelWarning::LossyEncode {
field: "finish_reason".into(),
detail: "OpenAI Chat response carried no finish_reason — \
IR records `Other{raw:\"missing\"}`"
.into(),
});
StopReason::Other {
raw: "missing".to_owned(),
}
}
}
}
fn decode_usage(usage: Option<&Value>) -> Usage {
Usage {
input_tokens: u_field(usage, "prompt_tokens"),
output_tokens: u_field(usage, "completion_tokens"),
cached_input_tokens: u_field_nested(usage, &["prompt_tokens_details", "cached_tokens"]),
cache_creation_input_tokens: 0,
reasoning_tokens: u_field_nested(usage, &["completion_tokens_details", "reasoning_tokens"]),
safety_ratings: Vec::new(),
}
}
fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
v.get(key).and_then(Value::as_str).unwrap_or("") }
fn u_field(v: Option<&Value>, key: &str) -> u32 {
v.and_then(|inner| inner.get(key))
.and_then(Value::as_u64)
.map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
fn u_field_nested(v: Option<&Value>, path: &[&str]) -> u32 {
let Some(mut cursor) = v else {
return 0;
};
for segment in path {
let Some(next) = cursor.get(*segment) else {
return 0;
};
cursor = next;
}
cursor
.as_u64()
.map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
#[allow(tail_expr_drop_order, clippy::too_many_lines)]
fn stream_openai_chat(
bytes: BoxByteStream<'_>,
warnings_in: Vec<ModelWarning>,
) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
async_stream::stream! {
let mut bytes = bytes;
let mut buf: Vec<u8> = Vec::new();
let mut tool_indices_open: HashSet<u64> = HashSet::new();
let mut current_tool_index: Option<u64> = None;
let mut started = false;
let mut last_stop = StopReason::EndTurn;
let mut warnings_emitted = false;
while let Some(chunk) = bytes.next().await {
match chunk {
Ok(b) => buf.extend_from_slice(&b),
Err(e) => {
yield Err(e);
return;
}
}
if !warnings_emitted {
warnings_emitted = true;
for w in &warnings_in {
yield Ok(StreamDelta::Warning(w.clone()));
}
}
while let Some(pos) = find_double_newline(&buf) {
let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
let Ok(frame_str) = std::str::from_utf8(&frame) else {
continue;
};
let Some(payload) = parse_sse_data(frame_str) else {
continue;
};
if payload.trim() == "[DONE]" {
if current_tool_index.take().is_some() {
yield Ok(StreamDelta::ToolUseStop);
}
yield Ok(StreamDelta::Stop {
stop_reason: last_stop.clone(),
});
return;
}
let Ok(event) = serde_json::from_str::<Value>(&payload) else {
yield Err(Error::invalid_request(format!(
"OpenAI Chat stream: malformed chunk: {payload}"
)));
return;
};
if !started {
started = true;
let id = str_field(&event, "id").to_owned();
let model = str_field(&event, "model").to_owned();
yield Ok(StreamDelta::Start {
id,
model,
provider_echoes: Vec::new(),
});
}
if let Some(usage) = event.get("usage").filter(|v| !v.is_null()) {
yield Ok(StreamDelta::Usage(decode_usage(Some(usage))));
}
let Some(choice) = event
.get("choices")
.and_then(Value::as_array)
.and_then(|a| a.first())
else {
continue;
};
if let Some(reason) = choice.get("finish_reason").and_then(Value::as_str) {
last_stop = decode_finish_reason(Some(reason), &mut Vec::new());
}
let Some(delta) = choice.get("delta") else {
continue;
};
if let Some(text) = delta.get("content").and_then(Value::as_str)
&& !text.is_empty()
{
if current_tool_index.take().is_some() {
yield Ok(StreamDelta::ToolUseStop);
}
yield Ok(StreamDelta::TextDelta {
text: text.to_owned(),
provider_echoes: Vec::new(),
});
}
if let Some(tool_calls) = delta.get("tool_calls").and_then(Value::as_array) {
for call in tool_calls {
let idx = if let Some(n) = call.get("index").and_then(Value::as_u64) {
n
} else {
yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
field: "stream.delta.tool_calls[].index".into(),
detail: "OpenAI Chat stream tool_call missing spec-mandated 'index' field; falling back to slot 0 (mirrors anthropic streaming idx handling)".into(),
}));
0
};
let function = call.get("function");
let name = function
.and_then(|f| f.get("name"))
.and_then(Value::as_str)
.unwrap_or(""); let arguments = function
.and_then(|f| f.get("arguments"))
.and_then(Value::as_str)
.unwrap_or(""); let id = call
.get("id")
.and_then(Value::as_str)
.unwrap_or("") .to_owned();
if tool_indices_open.insert(idx) {
if let Some(prev) = current_tool_index.take()
&& prev != idx
{
yield Ok(StreamDelta::ToolUseStop);
}
yield Ok(StreamDelta::ToolUseStart {
id,
name: name.to_owned(),
provider_echoes: Vec::new(),
});
current_tool_index = Some(idx);
}
if !arguments.is_empty() {
yield Ok(StreamDelta::ToolUseInputDelta {
partial_json: arguments.to_owned(),
});
}
}
}
}
}
}
}
fn find_double_newline(buf: &[u8]) -> Option<usize> {
let lf = buf.windows(2).position(|w| w == b"\n\n");
let crlf = buf.windows(4).position(|w| w == b"\r\n\r\n");
match (lf, crlf) {
(Some(a), Some(b)) => Some(a.min(b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
}
}
fn parse_sse_data(frame: &str) -> Option<String> {
let mut out: Option<String> = None;
for line in frame.lines() {
if let Some(rest) = line.strip_prefix("data:") {
let trimmed = rest.strip_prefix(' ').unwrap_or(rest); match &mut out {
Some(existing) => {
existing.push('\n');
existing.push_str(trimmed);
}
None => out = Some(trimmed.to_owned()),
}
}
}
out
}