1#![allow(clippy::cast_possible_truncation)]
23
24use std::collections::HashSet;
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{
31 BoxByteStream, BoxDeltaStream, Codec, EncodedRequest, extract_openai_rate_limit,
32 service_tier_str,
33};
34use crate::error::{Error, Result};
35use crate::ir::{
36 Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
37 ModelWarning, OutputStrategy, ReasoningEffort, RefusalReason, ResponseFormat, Role, StopReason,
38 ToolChoice, ToolKind, ToolResultContent, Usage,
39};
40use crate::rate_limit::RateLimitSnapshot;
41use crate::stream::StreamDelta;
42
/// Context-window size (in tokens) reported by `OpenAiChatCodec::capabilities`.
///
/// The same value is reported for every model: `capabilities` ignores its
/// model argument.
const DEFAULT_MAX_CONTEXT_TOKENS: u32 = 128_000;
44
/// Codec for the OpenAI Chat Completions wire format (`POST /v1/chat/completions`).
///
/// The codec holds no state. All behaviour lives in its `Codec` impl, so the
/// value is a zero-sized marker that is free to copy or default-construct.
#[derive(Clone, Copy, Debug, Default)]
pub struct OpenAiChatCodec;

impl OpenAiChatCodec {
    /// Creates the codec; equivalent to `OpenAiChatCodec::default()` and
    /// usable in `const` contexts.
    pub const fn new() -> Self {
        OpenAiChatCodec
    }
}
55
56impl Codec for OpenAiChatCodec {
57 fn name(&self) -> &'static str {
58 "openai-chat"
59 }
60
61 fn capabilities(&self, _model: &str) -> Capabilities {
62 Capabilities {
63 streaming: true,
64 tools: true,
65 multimodal_image: true,
66 multimodal_audio: true,
67 multimodal_video: false,
68 multimodal_document: true,
69 system_prompt: true,
70 structured_output: true,
71 prompt_caching: true,
72 thinking: false,
73 citations: true,
74 web_search: false,
75 computer_use: false,
76 max_context_tokens: DEFAULT_MAX_CONTEXT_TOKENS,
77 }
78 }
79
80 fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
81 let (body, warnings) = build_body(request, false)?;
82 finalize_request(&body, warnings)
83 }
84
85 fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
86 let (body, warnings) = build_body(request, true)?;
87 let mut encoded = finalize_request(&body, warnings)?;
88 encoded.headers.insert(
89 http::header::ACCEPT,
90 http::HeaderValue::from_static("text/event-stream"),
91 );
92 Ok(encoded.into_streaming())
93 }
94
95 fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
96 let raw: Value = super::codec::parse_response_body(body, "OpenAI Chat")?;
97 let mut warnings = warnings_in;
98 let id = str_field(&raw, "id").to_owned();
99 let model = str_field(&raw, "model").to_owned();
100 let usage = decode_usage(raw.get("usage"));
101 let (content, stop_reason) = decode_choice(&raw, &mut warnings);
102 Ok(ModelResponse {
103 id,
104 model,
105 stop_reason,
106 content,
107 usage,
108 rate_limit: None,
109 warnings,
110 provider_echoes: Vec::new(),
111 })
112 }
113
114 fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
115 extract_openai_rate_limit(headers)
116 }
117
118 fn decode_stream<'a>(
119 &'a self,
120 bytes: BoxByteStream<'a>,
121 warnings_in: Vec<ModelWarning>,
122 ) -> BoxDeltaStream<'a> {
123 Box::pin(stream_openai_chat(bytes, warnings_in))
124 }
125}
126
127fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
130 if request.messages.is_empty() && request.system.is_empty() {
131 return Err(Error::invalid_request(
132 "OpenAI Chat requires at least one message",
133 ));
134 }
135 let mut warnings = Vec::with_capacity(1);
138 let messages = encode_messages(request, &mut warnings);
139
140 let mut body = Map::new();
141 body.insert("model".into(), Value::String(request.model.clone()));
142 body.insert("messages".into(), Value::Array(messages));
143 if let Some(n) = request.max_tokens {
144 body.insert("max_tokens".into(), json!(n));
145 }
146 if let Some(t) = request.temperature {
147 body.insert("temperature".into(), json!(t));
148 }
149 if let Some(p) = request.top_p {
150 body.insert("top_p".into(), json!(p));
151 }
152 if request.top_k.is_some() {
153 warnings.push(ModelWarning::LossyEncode {
154 field: "top_k".into(),
155 detail: "OpenAI Chat Completions has no top_k parameter — setting dropped".into(),
156 });
157 }
158 if !request.stop_sequences.is_empty() {
159 body.insert("stop".into(), json!(request.stop_sequences));
160 }
161 if !request.tools.is_empty() {
162 body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
163 body.insert(
164 "tool_choice".into(),
165 encode_tool_choice(&request.tool_choice),
166 );
167 }
168 if let Some(format) = &request.response_format {
169 encode_openai_chat_structured_output(format, &mut body, &mut warnings)?;
170 }
171 if streaming {
172 body.insert("stream".into(), Value::Bool(true));
173 body.insert("stream_options".into(), json!({ "include_usage": true }));
174 }
175 apply_provider_extensions(request, &mut body, &mut warnings);
176 Ok((Value::Object(body), warnings))
177}
178
179fn apply_provider_extensions(
184 request: &ModelRequest,
185 body: &mut Map<String, Value>,
186 warnings: &mut Vec<ModelWarning>,
187) {
188 let ext = &request.provider_extensions;
189 if let Some(parallel) = request.parallel_tool_calls {
190 body.insert("parallel_tool_calls".into(), json!(parallel));
191 }
192 if let Some(seed) = request.seed {
193 body.insert("seed".into(), json!(seed));
194 }
195 if let Some(user) = &request.end_user_id {
196 body.insert("user".into(), Value::String(user.clone()));
197 }
198 if let Some(openai_chat) = &ext.openai_chat {
199 if let Some(key) = &openai_chat.cache_key {
200 body.insert("prompt_cache_key".into(), Value::String(key.clone()));
201 }
202 if let Some(tier) = openai_chat.service_tier {
203 body.insert(
204 "service_tier".into(),
205 Value::String(service_tier_str(tier).into()),
206 );
207 }
208 }
209 if let Some(effort) = &request.reasoning_effort {
210 if is_openai_reasoning_model(&request.model) {
215 let effort_str = encode_chat_reasoning_effort(effort, warnings);
216 body.insert("reasoning_effort".into(), Value::String(effort_str));
217 } else {
218 warnings.push(ModelWarning::LossyEncode {
219 field: "reasoning_effort".into(),
220 detail: "OpenAI Chat Completions accepts `reasoning_effort` only on reasoning \
221 models (o1 / o3 / o4 / gpt-5); current model is non-reasoning — \
222 field dropped. Use OpenAiResponsesCodec for the full reasoning surface."
223 .into(),
224 });
225 }
226 }
227 if ext.anthropic.is_some() {
228 warnings.push(ModelWarning::ProviderExtensionIgnored {
229 vendor: "anthropic".into(),
230 });
231 }
232 if ext.openai_responses.is_some() {
233 warnings.push(ModelWarning::ProviderExtensionIgnored {
234 vendor: "openai_responses".into(),
235 });
236 }
237 if ext.gemini.is_some() {
238 warnings.push(ModelWarning::ProviderExtensionIgnored {
239 vendor: "gemini".into(),
240 });
241 }
242 if ext.bedrock.is_some() {
243 warnings.push(ModelWarning::ProviderExtensionIgnored {
244 vendor: "bedrock".into(),
245 });
246 }
247}
248
/// Returns `true` when `model` names an OpenAI reasoning family, detected by
/// name prefix only: `o1`, `o3`, `o4` or `gpt-5`.
fn is_openai_reasoning_model(model: &str) -> bool {
    const REASONING_PREFIXES: [&str; 4] = ["o1", "o3", "o4", "gpt-5"];
    REASONING_PREFIXES
        .iter()
        .any(|prefix| model.starts_with(*prefix))
}
259
260fn encode_chat_reasoning_effort(
267 effort: &ReasoningEffort,
268 warnings: &mut Vec<ModelWarning>,
269) -> String {
270 match effort {
271 ReasoningEffort::Off => "none".to_owned(),
272 ReasoningEffort::Minimal => "minimal".to_owned(),
273 ReasoningEffort::Low => "low".to_owned(),
274 ReasoningEffort::Medium => "medium".to_owned(),
275 ReasoningEffort::High => "high".to_owned(),
276 ReasoningEffort::Auto => {
277 warnings.push(ModelWarning::LossyEncode {
278 field: "reasoning_effort".into(),
279 detail: "OpenAI Chat has no `Auto` bucket — snapped to `medium`".into(),
280 });
281 "medium".to_owned()
282 }
283 ReasoningEffort::VendorSpecific(literal) => literal.clone(),
284 }
285}
286
287fn encode_openai_chat_structured_output(
292 format: &ResponseFormat,
293 body: &mut Map<String, Value>,
294 warnings: &mut Vec<ModelWarning>,
295) -> Result<()> {
296 let strategy = match format.strategy {
297 OutputStrategy::Auto | OutputStrategy::Native => OutputStrategy::Native,
298 explicit => explicit,
299 };
300 match strategy {
301 OutputStrategy::Native => {
302 if let Err(err) = format.strict_preflight() {
303 warnings.push(ModelWarning::LossyEncode {
304 field: "response_format.json_schema".into(),
305 detail: err.to_string(),
306 });
307 }
308 body.insert(
309 "response_format".into(),
310 json!({
311 "type": "json_schema",
312 "json_schema": {
313 "name": format.json_schema.name,
314 "schema": format.json_schema.schema,
315 "strict": format.strict,
316 }
317 }),
318 );
319 }
320 OutputStrategy::Tool => {
321 let tool_name = format.json_schema.name.clone();
322 let synthetic_tool = json!({
323 "type": "function",
324 "function": {
325 "name": tool_name,
326 "description": format!(
327 "Emit the response as a JSON object matching the {tool_name} schema."
328 ),
329 "parameters": format.json_schema.schema.clone(),
330 "strict": format.strict,
331 }
332 });
333 let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
334 if let Value::Array(arr) = tools {
335 arr.insert(0, synthetic_tool);
336 }
337 body.insert(
338 "tool_choice".into(),
339 json!({
340 "type": "function",
341 "function": { "name": format.json_schema.name },
342 }),
343 );
344 }
345 OutputStrategy::Prompted => {
346 return Err(Error::invalid_request(
347 "OutputStrategy::Prompted is deferred to entelix 1.1; use \
348 OutputStrategy::Native or OutputStrategy::Tool",
349 ));
350 }
351 OutputStrategy::Auto => unreachable!("Auto resolved above"),
352 }
353 Ok(())
354}
355
356fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
357 let bytes = serde_json::to_vec(body)?;
358 let mut encoded = EncodedRequest::post_json("/v1/chat/completions", Bytes::from(bytes));
359 encoded.warnings = warnings;
360 Ok(encoded)
361}
362
363fn encode_messages(request: &ModelRequest, warnings: &mut Vec<ModelWarning>) -> Vec<Value> {
366 let mut out: Vec<Value> = Vec::new();
367 if !request.system.is_empty() {
368 if request.system.any_cached() {
373 warnings.push(ModelWarning::LossyEncode {
374 field: "system.cache_control".into(),
375 detail: "OpenAI Chat has no native prompt-cache control; \
376 block text is concatenated and the cache directive \
377 is dropped"
378 .into(),
379 });
380 }
381 out.push(json!({
382 "role": "system",
383 "content": request.system.concat_text(),
384 }));
385 }
386 for (idx, msg) in request.messages.iter().enumerate() {
387 match msg.role {
388 Role::System => {
389 let text = collect_text(&msg.content, warnings, idx);
390 out.push(json!({ "role": "system", "content": text }));
391 }
392 Role::User => {
393 out.push(json!({
394 "role": "user",
395 "content": encode_user_content(&msg.content, warnings, idx),
396 }));
397 }
398 Role::Assistant => {
399 out.push(encode_assistant_message(&msg.content, warnings, idx));
400 }
401 Role::Tool => {
402 for (part_idx, part) in msg.content.iter().enumerate() {
406 if let ContentPart::ToolResult {
407 tool_use_id,
408 content,
409 is_error,
410 ..
411 } = part
412 {
413 let body_str = match content {
414 ToolResultContent::Text(t) => t.clone(),
415 ToolResultContent::Json(v) => v.to_string(),
416 };
417 let mut entry = Map::new();
418 entry.insert("role".into(), Value::String("tool".into()));
419 entry.insert("tool_call_id".into(), Value::String(tool_use_id.clone()));
420 entry.insert("content".into(), Value::String(body_str));
421 if *is_error {
422 warnings.push(ModelWarning::LossyEncode {
423 field: format!("messages[{idx}].content[{part_idx}].is_error"),
424 detail: "OpenAI Chat has no tool_result is_error flag — \
425 carrying via content text"
426 .into(),
427 });
428 }
429 out.push(Value::Object(entry));
430 } else {
431 warnings.push(ModelWarning::LossyEncode {
432 field: format!("messages[{idx}].content[{part_idx}]"),
433 detail: "non-tool_result part on Role::Tool dropped".into(),
434 });
435 }
436 }
437 }
438 }
439 }
440 out
441}
442
443fn encode_user_content(
444 parts: &[ContentPart],
445 warnings: &mut Vec<ModelWarning>,
446 msg_idx: usize,
447) -> Value {
448 if parts.iter().all(|p| matches!(p, ContentPart::Text { .. })) {
450 let mut text = String::new();
451 for part in parts {
452 if let ContentPart::Text { text: t, .. } = part {
453 text.push_str(t);
454 }
455 }
456 return Value::String(text);
457 }
458 let mut arr = Vec::new();
460 for (part_idx, part) in parts.iter().enumerate() {
461 let path = || format!("messages[{msg_idx}].content[{part_idx}]");
462 match part {
463 ContentPart::Text { text, .. } => {
464 arr.push(json!({ "type": "text", "text": text }));
465 }
466 ContentPart::Image { source, .. } => {
467 arr.push(json!({
468 "type": "image_url",
469 "image_url": { "url": media_to_url_chat(source) },
470 }));
471 }
472 ContentPart::Audio { source, .. } => {
473 if let MediaSource::Base64 { media_type, data } = source {
475 let format = audio_format_from_mime(media_type);
476 arr.push(json!({
477 "type": "input_audio",
478 "input_audio": { "data": data, "format": format },
479 }));
480 } else {
481 warnings.push(ModelWarning::LossyEncode {
482 field: path(),
483 detail: "OpenAI Chat input_audio requires base64 source; URL/FileId \
484 audio dropped"
485 .into(),
486 });
487 }
488 }
489 ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
490 field: path(),
491 detail: "OpenAI Chat does not accept video inputs; block dropped".into(),
492 }),
493 ContentPart::Document { source, name, .. } => {
494 if let MediaSource::FileId { id, .. } = source {
497 let mut o = Map::new();
498 o.insert("type".into(), Value::String("file".into()));
499 let mut file_obj = Map::new();
500 file_obj.insert("file_id".into(), Value::String(id.clone()));
501 if let Some(n) = name {
502 file_obj.insert("filename".into(), Value::String(n.clone()));
503 }
504 o.insert("file".into(), Value::Object(file_obj));
505 arr.push(Value::Object(o));
506 } else {
507 warnings.push(ModelWarning::LossyEncode {
508 field: path(),
509 detail: "OpenAI Chat document input requires Files-API FileId source; \
510 inline document dropped"
511 .into(),
512 });
513 }
514 }
515 ContentPart::Thinking { .. } => warnings.push(ModelWarning::LossyEncode {
516 field: path(),
517 detail: "OpenAI Chat does not accept thinking blocks on input; block dropped"
518 .into(),
519 }),
520 ContentPart::Citation { .. } => warnings.push(ModelWarning::LossyEncode {
521 field: path(),
522 detail: "OpenAI Chat does not echo citations on input; block dropped".into(),
523 }),
524 ContentPart::ToolUse { .. } | ContentPart::ToolResult { .. } => {
525 warnings.push(ModelWarning::LossyEncode {
526 field: path(),
527 detail: "tool_use / tool_result not allowed on user role for OpenAI Chat; \
528 move to assistant or tool role"
529 .into(),
530 });
531 }
532 ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
533 warnings.push(ModelWarning::LossyEncode {
534 field: path(),
535 detail: "OpenAI Chat does not accept assistant-produced image / audio output \
536 as input — block dropped"
537 .into(),
538 });
539 }
540 ContentPart::RedactedThinking { .. } => {
541 warnings.push(ModelWarning::LossyEncode {
542 field: path(),
543 detail: "OpenAI Chat does not accept redacted_thinking blocks; block dropped"
544 .into(),
545 });
546 }
547 }
548 }
549 Value::Array(arr)
550}
551
552fn media_to_url_chat(source: &MediaSource) -> String {
553 match source {
554 MediaSource::Url { url, .. } => url.clone(),
555 MediaSource::Base64 { media_type, data } => format!("data:{media_type};base64,{data}"),
556 MediaSource::FileId { id, .. } => id.clone(),
557 }
558}
559
/// Maps an audio MIME type to the `input_audio.format` token.
///
/// Only the MIME essence is compared: parameters after `;` are ignored,
/// surrounding whitespace is trimmed, and matching is ASCII case-insensitive.
/// So `audio/MPEG` and `audio/ogg; codecs=opus` resolve correctly instead of
/// falling through to the default. Unrecognised types default to `"wav"`.
///
/// NOTE(review): the Chat Completions `input_audio` docs list only `wav` and
/// `mp3`. Confirm the `aac` / `flac` / `opus` tokens are accepted.
fn audio_format_from_mime(mime: &str) -> &'static str {
    let essence = mime
        .split(';')
        .next()
        .unwrap_or("")
        .trim()
        .to_ascii_lowercase();
    match essence.as_str() {
        "audio/mp3" | "audio/mpeg" => "mp3",
        "audio/aac" => "aac",
        "audio/flac" => "flac",
        "audio/ogg" | "audio/opus" => "opus",
        _ => "wav",
    }
}
572
573fn encode_assistant_message(
574 parts: &[ContentPart],
575 warnings: &mut Vec<ModelWarning>,
576 msg_idx: usize,
577) -> Value {
578 let mut text_buf = String::new();
579 let mut tool_calls = Vec::new();
580 for (part_idx, part) in parts.iter().enumerate() {
581 match part {
582 ContentPart::Text { text, .. } => text_buf.push_str(text),
583 ContentPart::ToolUse {
584 id, name, input, ..
585 } => {
586 tool_calls.push(json!({
587 "id": id,
588 "type": "function",
589 "function": {
590 "name": name,
591 "arguments": input.to_string(),
592 },
593 }));
594 }
595 ContentPart::Citation { snippet, .. } => text_buf.push_str(snippet),
600 other => {
601 warnings.push(ModelWarning::LossyEncode {
602 field: format!("messages[{msg_idx}].content[{part_idx}]"),
603 detail: format!(
604 "{} not supported on assistant role for OpenAI Chat — dropped",
605 debug_part_kind(other)
606 ),
607 });
608 }
609 }
610 }
611 let mut entry = Map::new();
612 entry.insert("role".into(), Value::String("assistant".into()));
613 entry.insert(
614 "content".into(),
615 if text_buf.is_empty() {
616 Value::Null
617 } else {
618 Value::String(text_buf)
619 },
620 );
621 if !tool_calls.is_empty() {
622 entry.insert("tool_calls".into(), Value::Array(tool_calls));
623 }
624 Value::Object(entry)
625}
626
627fn collect_text(parts: &[ContentPart], warnings: &mut Vec<ModelWarning>, msg_idx: usize) -> String {
628 let mut text = String::new();
629 let mut lossy = false;
630 for part in parts {
631 match part {
632 ContentPart::Text { text: t, .. } => text.push_str(t),
633 _ => lossy = true,
634 }
635 }
636 if lossy {
637 warnings.push(ModelWarning::LossyEncode {
638 field: format!("messages[{msg_idx}].content"),
639 detail: "non-text parts dropped from system message".into(),
640 });
641 }
642 text
643}
644
645const fn debug_part_kind(part: &ContentPart) -> &'static str {
646 match part {
647 ContentPart::Text { .. } => "text",
648 ContentPart::Image { .. } => "image",
649 ContentPart::Audio { .. } => "audio",
650 ContentPart::Video { .. } => "video",
651 ContentPart::Document { .. } => "document",
652 ContentPart::Thinking { .. } => "thinking",
653 ContentPart::Citation { .. } => "citation",
654 ContentPart::ToolUse { .. } => "tool_use",
655 ContentPart::ToolResult { .. } => "tool_result",
656 ContentPart::ImageOutput { .. } => "image_output",
657 ContentPart::AudioOutput { .. } => "audio_output",
658 ContentPart::RedactedThinking { .. } => "redacted_thinking",
659 }
660}
661
662fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
663 let mut arr = Vec::with_capacity(tools.len());
664 for (idx, t) in tools.iter().enumerate() {
665 match &t.kind {
666 ToolKind::Function { input_schema } => arr.push(json!({
667 "type": "function",
668 "function": {
669 "name": t.name,
670 "description": t.description,
671 "parameters": input_schema,
672 },
673 })),
674 ToolKind::WebSearch { .. }
677 | ToolKind::Computer { .. }
678 | ToolKind::TextEditor
679 | ToolKind::Bash
680 | ToolKind::CodeExecution
681 | ToolKind::FileSearch { .. }
682 | ToolKind::CodeInterpreter
683 | ToolKind::ImageGeneration
684 | ToolKind::McpConnector { .. }
685 | ToolKind::Memory => warnings.push(ModelWarning::LossyEncode {
686 field: format!("tools[{idx}]"),
687 detail: "OpenAI Chat Completions advertises only function tools — \
688 vendor built-ins (web_search, computer, file_search, …) \
689 live on the Responses API; tool dropped"
690 .into(),
691 }),
692 }
693 }
694 Value::Array(arr)
695}
696
697fn encode_tool_choice(choice: &ToolChoice) -> Value {
698 match choice {
699 ToolChoice::Auto => Value::String("auto".into()),
700 ToolChoice::Required => Value::String("required".into()),
701 ToolChoice::None => Value::String("none".into()),
702 ToolChoice::Specific { name } => json!({
703 "type": "function",
704 "function": { "name": name },
705 }),
706 }
707}
708
709fn decode_choice(raw: &Value, warnings: &mut Vec<ModelWarning>) -> (Vec<ContentPart>, StopReason) {
712 let choice = raw
713 .get("choices")
714 .and_then(Value::as_array)
715 .and_then(|a| a.first())
716 .cloned()
717 .unwrap_or(Value::Null); let message = choice.get("message").unwrap_or(&Value::Null); let content = decode_assistant_message(message, warnings);
720 let stop_reason = decode_finish_reason(
721 choice.get("finish_reason").and_then(Value::as_str),
722 warnings,
723 );
724 (content, stop_reason)
725}
726
727fn decode_assistant_message(message: &Value, warnings: &mut Vec<ModelWarning>) -> Vec<ContentPart> {
728 let mut parts: Vec<ContentPart> = Vec::new();
729 let text = message
730 .get("content")
731 .and_then(Value::as_str)
732 .unwrap_or_default(); if let Some(annotations) = message.get("annotations").and_then(Value::as_array) {
738 for ann in annotations {
739 if ann.get("type").and_then(Value::as_str) == Some("url_citation")
740 && let Some(uc) = ann.get("url_citation")
741 {
742 parts.push(ContentPart::Citation {
743 snippet: text.to_owned(),
744 source: CitationSource::Url {
745 url: str_field(uc, "url").to_owned(),
746 title: uc.get("title").and_then(Value::as_str).map(str::to_owned),
747 },
748 cache_control: None,
749 provider_echoes: Vec::new(),
750 });
751 }
752 }
753 }
754 if !text.is_empty() {
755 parts.push(ContentPart::text(text));
756 }
757 if let Some(tool_calls) = message.get("tool_calls").and_then(Value::as_array) {
758 for (idx, call) in tool_calls.iter().enumerate() {
759 let id = str_field(call, "id").to_owned();
760 let function = call.get("function").unwrap_or(&Value::Null); let name = str_field(function, "name").to_owned();
762 let arguments = function
763 .get("arguments")
764 .and_then(Value::as_str)
765 .unwrap_or("{}"); let input = if let Ok(v) = serde_json::from_str::<Value>(arguments) {
771 v
772 } else {
773 warnings.push(ModelWarning::LossyEncode {
774 field: format!("choices[0].message.tool_calls[{idx}].function.arguments"),
775 detail: "tool arguments not valid JSON; preserved as raw string".into(),
776 });
777 Value::String(arguments.to_owned())
778 };
779 parts.push(ContentPart::ToolUse {
780 id,
781 name,
782 input,
783 provider_echoes: Vec::new(),
784 });
785 }
786 }
787 parts
788}
789
790fn decode_finish_reason(reason: Option<&str>, warnings: &mut Vec<ModelWarning>) -> StopReason {
791 match reason {
792 Some("stop") => StopReason::EndTurn,
793 Some("length") => StopReason::MaxTokens,
794 Some("tool_calls" | "function_call") => StopReason::ToolUse,
795 Some("content_filter") => StopReason::Refusal {
796 reason: RefusalReason::Safety,
797 },
798 Some(other) => {
799 warnings.push(ModelWarning::UnknownStopReason {
800 raw: other.to_owned(),
801 });
802 StopReason::Other {
803 raw: other.to_owned(),
804 }
805 }
806 None => {
807 warnings.push(ModelWarning::LossyEncode {
812 field: "finish_reason".into(),
813 detail: "OpenAI Chat response carried no finish_reason — \
814 IR records `Other{raw:\"missing\"}`"
815 .into(),
816 });
817 StopReason::Other {
818 raw: "missing".to_owned(),
819 }
820 }
821 }
822}
823
824fn decode_usage(usage: Option<&Value>) -> Usage {
825 Usage {
826 input_tokens: u_field(usage, "prompt_tokens"),
827 output_tokens: u_field(usage, "completion_tokens"),
828 cached_input_tokens: u_field_nested(usage, &["prompt_tokens_details", "cached_tokens"]),
829 cache_creation_input_tokens: 0,
830 reasoning_tokens: u_field_nested(usage, &["completion_tokens_details", "reasoning_tokens"]),
831 safety_ratings: Vec::new(),
832 }
833}
834
835fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
836 v.get(key).and_then(Value::as_str).unwrap_or("") }
838
839fn u_field(v: Option<&Value>, key: &str) -> u32 {
840 v.and_then(|inner| inner.get(key))
841 .and_then(Value::as_u64)
842 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
844
845fn u_field_nested(v: Option<&Value>, path: &[&str]) -> u32 {
846 let Some(mut cursor) = v else {
847 return 0;
848 };
849 for segment in path {
850 let Some(next) = cursor.get(*segment) else {
851 return 0;
852 };
853 cursor = next;
854 }
855 cursor
856 .as_u64()
857 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
859
/// Turns an OpenAI Chat Completions server-sent-event byte stream into IR
/// [`StreamDelta`]s.
///
/// Emission order, as implemented below:
/// - every warning in `warnings_in`, once, after the first byte chunk is
///   received successfully;
/// - `Start`, built from the `id` / `model` of the first parsed JSON event;
/// - `Usage` for any event carrying a non-null `usage` object;
/// - `TextDelta` for each non-empty `delta.content`;
/// - `ToolUseStart` the first time a `tool_calls[].index` is seen, then
///   `ToolUseInputDelta` for each non-empty `function.arguments` fragment;
/// - `ToolUseStop` when text or a newly seen tool index follows an open tool
///   call, and at `[DONE]`;
/// - `Stop` when the `[DONE]` sentinel arrives, carrying the last decoded
///   `finish_reason` (default `EndTurn`). The stream then ends.
///
/// A transport error, or a `data:` payload that is not valid JSON, yields one
/// `Err` and ends the stream. Frames that are not UTF-8 or carry no `data:`
/// line are skipped silently.
///
/// NOTE(review): if the byte stream ends without a `[DONE]` frame, the outer
/// loop simply exits, so no `Stop` (or closing `ToolUseStop`) is emitted.
/// Confirm that callers treat this as truncation.
// NOTE(review): these `allow`s have no recorded rationale. Presumably
// `tail_expr_drop_order` is triggered by the `stream!` generator and
// `too_many_lines` by the single-state-machine layout. Confirm.
#[allow(tail_expr_drop_order, clippy::too_many_lines)]
fn stream_openai_chat(
    bytes: BoxByteStream<'_>,
    warnings_in: Vec<ModelWarning>,
) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
    async_stream::stream! {
        let mut bytes = bytes;
        // Bytes received but not yet split into complete SSE frames.
        let mut buf: Vec<u8> = Vec::new();
        // Every tool_call `index` for which a ToolUseStart has been emitted.
        let mut tool_indices_open: HashSet<u64> = HashSet::new();
        // The tool call currently receiving argument fragments, if any.
        let mut current_tool_index: Option<u64> = None;
        let mut started = false;
        // Latest finish_reason seen; reported in the final Stop at `[DONE]`.
        let mut last_stop = StopReason::EndTurn;
        let mut warnings_emitted = false;

        while let Some(chunk) = bytes.next().await {
            match chunk {
                Ok(b) => buf.extend_from_slice(&b),
                Err(e) => {
                    yield Err(e);
                    return;
                }
            }
            // Caller-supplied warnings are replayed exactly once, ahead of any delta.
            if !warnings_emitted {
                warnings_emitted = true;
                for w in &warnings_in {
                    yield Ok(StreamDelta::Warning(w.clone()));
                }
            }
            // Drain every complete frame currently buffered.
            while let Some(pos) = find_double_newline(&buf) {
                // Drains through the first line break of the separator. For a
                // "\r\n\r\n" separator, one "\r\n" stays at the head of `buf`
                // and becomes a leading blank line of the next frame, which
                // `parse_sse_data` ignores.
                let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
                let Ok(frame_str) = std::str::from_utf8(&frame) else {
                    continue;
                };
                let Some(payload) = parse_sse_data(frame_str) else {
                    continue;
                };
                // End-of-stream sentinel: close any open tool call, then report Stop.
                if payload.trim() == "[DONE]" {
                    if current_tool_index.take().is_some() {
                        yield Ok(StreamDelta::ToolUseStop);
                    }
                    yield Ok(StreamDelta::Stop {
                        stop_reason: last_stop.clone(),
                    });
                    return;
                }
                let Ok(event) = serde_json::from_str::<Value>(&payload) else {
                    yield Err(Error::invalid_request(format!(
                        "OpenAI Chat stream: malformed chunk: {payload}"
                    )));
                    return;
                };
                if !started {
                    started = true;
                    let id = str_field(&event, "id").to_owned();
                    let model = str_field(&event, "model").to_owned();
                    yield Ok(StreamDelta::Start {
                        id,
                        model,
                        provider_echoes: Vec::new(),
                    });
                }
                // Usage may arrive on a chunk with no choices (e.g. the final
                // chunk), so it is checked before `choices`.
                if let Some(usage) = event.get("usage").filter(|v| !v.is_null()) {
                    yield Ok(StreamDelta::Usage(decode_usage(Some(usage))));
                }
                let Some(choice) = event
                    .get("choices")
                    .and_then(Value::as_array)
                    .and_then(|a| a.first())
                else {
                    continue;
                };
                // Warnings from decoding the reason go to a throwaway Vec, so an
                // unknown finish_reason is not surfaced as a stream warning.
                if let Some(reason) = choice.get("finish_reason").and_then(Value::as_str) {
                    last_stop = decode_finish_reason(Some(reason), &mut Vec::new());
                }
                let Some(delta) = choice.get("delta") else {
                    continue;
                };
                if let Some(text) = delta.get("content").and_then(Value::as_str)
                    && !text.is_empty()
                {
                    // Text after a tool call closes that tool call.
                    if current_tool_index.take().is_some() {
                        yield Ok(StreamDelta::ToolUseStop);
                    }
                    yield Ok(StreamDelta::TextDelta {
                        text: text.to_owned(),
                        provider_echoes: Vec::new(),
                    });
                }
                if let Some(tool_calls) = delta.get("tool_calls").and_then(Value::as_array) {
                    for call in tool_calls {
                        let idx = if let Some(n) = call.get("index").and_then(Value::as_u64) {
                            n
                        } else {
                            yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
                                field: "stream.delta.tool_calls[].index".into(),
                                detail: "OpenAI Chat stream tool_call missing spec-mandated 'index' field; falling back to slot 0 (mirrors anthropic streaming idx handling)".into(),
                            }));
                            0
                        };
                        let function = call.get("function");
                        let name = function
                            .and_then(|f| f.get("name"))
                            .and_then(Value::as_str)
                            .unwrap_or("");
                        let arguments = function
                            .and_then(|f| f.get("arguments"))
                            .and_then(Value::as_str)
                            .unwrap_or("");
                        let id = call
                            .get("id")
                            .and_then(Value::as_str)
                            .unwrap_or("")
                            .to_owned();
                        // First sighting of this index: close the previous tool
                        // call (if any) and open a new one.
                        if tool_indices_open.insert(idx) {
                            if let Some(prev) = current_tool_index.take()
                                && prev != idx
                            {
                                yield Ok(StreamDelta::ToolUseStop);
                            }
                            yield Ok(StreamDelta::ToolUseStart {
                                id,
                                name: name.to_owned(),
                                provider_echoes: Vec::new(),
                            });
                            current_tool_index = Some(idx);
                        }
                        // Argument fragments are forwarded verbatim as partial JSON.
                        if !arguments.is_empty() {
                            yield Ok(StreamDelta::ToolUseInputDelta {
                                partial_json: arguments.to_owned(),
                            });
                        }
                    }
                }
            }
        }
    }
}
1001
/// Returns the byte offset of the earliest SSE frame separator in `buf`.
///
/// The separator is either `"\n\n"` or `"\r\n\r\n"`; the offset is that of
/// the separator's first byte. Returns `None` if neither occurs.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    let bare = buf.windows(2).position(|pair| pair == b"\n\n");
    let windows_style = buf.windows(4).position(|quad| quad == b"\r\n\r\n");
    [bare, windows_style].into_iter().flatten().min()
}
1012
/// Extracts the `data` payload from a single SSE frame.
///
/// Every `data:` line contributes its value, with one optional leading space
/// removed. Multiple data lines are joined with `'\n'`. Returns `None` when
/// the frame has no `data:` line; other fields and comments are ignored.
fn parse_sse_data(frame: &str) -> Option<String> {
    let data_lines: Vec<&str> = frame
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|value| value.strip_prefix(' ').unwrap_or(value))
        .collect();
    (!data_lines.is_empty()).then(|| data_lines.join("\n"))
}