1#![allow(clippy::cast_possible_truncation)]
23
24use std::collections::HashSet;
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{
31 BoxByteStream, BoxDeltaStream, Codec, EncodedRequest, extract_openai_rate_limit,
32 service_tier_str,
33};
34use crate::error::{Error, Result};
35use crate::ir::{
36 Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
37 ModelWarning, OutputStrategy, ReasoningEffort, RefusalReason, ResponseFormat, Role, StopReason,
38 ToolChoice, ToolKind, ToolResultContent, Usage,
39};
40use crate::rate_limit::RateLimitSnapshot;
41use crate::stream::StreamDelta;
42
/// Context-window size advertised by the codec's `capabilities` for every model id.
const DEFAULT_MAX_CONTEXT_TOKENS: u32 = 128_000;

/// Codec for the OpenAI Chat Completions API (`POST /v1/chat/completions`).
///
/// The type carries no state: it is a unit struct, so `new()`, `default()`
/// and copies are all interchangeable.
#[derive(Clone, Copy, Debug, Default)]
pub struct OpenAiChatCodec;

impl OpenAiChatCodec {
    /// Returns the (stateless) codec; identical to `OpenAiChatCodec::default()`.
    pub const fn new() -> Self {
        Self
    }
}
55
56impl Codec for OpenAiChatCodec {
57 fn name(&self) -> &'static str {
58 "openai-chat"
59 }
60
61 fn capabilities(&self, _model: &str) -> Capabilities {
62 Capabilities {
63 streaming: true,
64 tools: true,
65 multimodal_image: true,
66 multimodal_audio: true,
67 multimodal_video: false,
68 multimodal_document: true,
69 system_prompt: true,
70 structured_output: true,
71 prompt_caching: true,
72 thinking: false,
73 citations: true,
74 web_search: false,
75 computer_use: false,
76 max_context_tokens: DEFAULT_MAX_CONTEXT_TOKENS,
77 }
78 }
79
80 fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
81 let (body, warnings) = build_body(request, false)?;
82 finalize_request(&body, warnings)
83 }
84
85 fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
86 let (body, warnings) = build_body(request, true)?;
87 let mut encoded = finalize_request(&body, warnings)?;
88 encoded.headers.insert(
89 http::header::ACCEPT,
90 http::HeaderValue::from_static("text/event-stream"),
91 );
92 Ok(encoded.into_streaming())
93 }
94
95 fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
96 let raw: Value = super::codec::parse_response_body(body, "OpenAI Chat")?;
97 let mut warnings = warnings_in;
98 let id = str_field(&raw, "id").to_owned();
99 let model = str_field(&raw, "model").to_owned();
100 let usage = decode_usage(raw.get("usage"));
101 let (content, stop_reason) = decode_choice(&raw, &mut warnings);
102 Ok(ModelResponse {
103 id,
104 model,
105 stop_reason,
106 content,
107 usage,
108 rate_limit: None,
109 warnings,
110 provider_echoes: Vec::new(),
111 })
112 }
113
114 fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
115 extract_openai_rate_limit(headers)
116 }
117
118 fn decode_stream<'a>(
119 &'a self,
120 bytes: BoxByteStream<'a>,
121 warnings_in: Vec<ModelWarning>,
122 ) -> BoxDeltaStream<'a> {
123 Box::pin(stream_openai_chat(bytes, warnings_in))
124 }
125}
126
127fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
130 if request.messages.is_empty() && request.system.is_empty() {
131 return Err(Error::invalid_request(
132 "OpenAI Chat requires at least one message",
133 ));
134 }
135 let mut warnings = Vec::with_capacity(1);
138 let messages = encode_messages(request, &mut warnings);
139
140 let mut body = Map::new();
141 body.insert("model".into(), Value::String(request.model.clone()));
142 body.insert("messages".into(), Value::Array(messages));
143 if let Some(n) = request.max_tokens {
144 body.insert("max_tokens".into(), json!(n));
145 }
146 if let Some(t) = request.temperature {
147 body.insert("temperature".into(), json!(t));
148 }
149 if let Some(p) = request.top_p {
150 body.insert("top_p".into(), json!(p));
151 }
152 if request.top_k.is_some() {
153 warnings.push(ModelWarning::LossyEncode {
154 field: "top_k".into(),
155 detail: "OpenAI Chat Completions has no top_k parameter — setting dropped".into(),
156 });
157 }
158 if !request.stop_sequences.is_empty() {
159 body.insert("stop".into(), json!(request.stop_sequences));
160 }
161 if !request.tools.is_empty() {
162 body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
163 body.insert(
164 "tool_choice".into(),
165 encode_tool_choice(&request.tool_choice),
166 );
167 }
168 if let Some(format) = &request.response_format {
169 encode_openai_chat_structured_output(format, &mut body, &mut warnings)?;
170 }
171 if streaming {
172 body.insert("stream".into(), Value::Bool(true));
173 body.insert("stream_options".into(), json!({ "include_usage": true }));
174 }
175 apply_provider_extensions(request, &mut body, &mut warnings);
176 Ok((Value::Object(body), warnings))
177}
178
179fn apply_provider_extensions(
184 request: &ModelRequest,
185 body: &mut Map<String, Value>,
186 warnings: &mut Vec<ModelWarning>,
187) {
188 let ext = &request.provider_extensions;
189 if let Some(parallel) = request.parallel_tool_calls {
190 body.insert("parallel_tool_calls".into(), json!(parallel));
191 }
192 if let Some(seed) = request.seed {
193 body.insert("seed".into(), json!(seed));
194 }
195 if let Some(user) = &request.end_user_id {
196 body.insert("user".into(), Value::String(user.clone()));
197 }
198 if let Some(openai_chat) = &ext.openai_chat {
199 if let Some(key) = &openai_chat.cache_key {
200 body.insert("prompt_cache_key".into(), Value::String(key.clone()));
201 }
202 if let Some(tier) = openai_chat.service_tier {
203 body.insert(
204 "service_tier".into(),
205 Value::String(service_tier_str(tier).into()),
206 );
207 }
208 }
209 if let Some(effort) = &request.reasoning_effort {
210 if is_openai_reasoning_model(&request.model) {
215 let effort_str = encode_chat_reasoning_effort(effort, warnings);
216 body.insert("reasoning_effort".into(), Value::String(effort_str));
217 } else {
218 warnings.push(ModelWarning::LossyEncode {
219 field: "reasoning_effort".into(),
220 detail: "OpenAI Chat Completions accepts `reasoning_effort` only on reasoning \
221 models (o1 / o3 / o4 / gpt-5); current model is non-reasoning — \
222 field dropped. Use OpenAiResponsesCodec for the full reasoning surface."
223 .into(),
224 });
225 }
226 }
227 if ext.anthropic.is_some() {
228 warnings.push(ModelWarning::ProviderExtensionIgnored {
229 vendor: "anthropic".into(),
230 });
231 }
232 if ext.openai_responses.is_some() {
233 warnings.push(ModelWarning::ProviderExtensionIgnored {
234 vendor: "openai_responses".into(),
235 });
236 }
237 if ext.gemini.is_some() {
238 warnings.push(ModelWarning::ProviderExtensionIgnored {
239 vendor: "gemini".into(),
240 });
241 }
242 if ext.bedrock.is_some() {
243 warnings.push(ModelWarning::ProviderExtensionIgnored {
244 vendor: "bedrock".into(),
245 });
246 }
247}
248
/// Reports whether `model` belongs to an OpenAI reasoning family (o1, o3, o4,
/// gpt-5), i.e. a model that accepts `reasoning_effort` on Chat Completions.
///
/// The check is a case-sensitive prefix match on the model id.
fn is_openai_reasoning_model(model: &str) -> bool {
    const REASONING_PREFIXES: [&str; 4] = ["o1", "o3", "o4", "gpt-5"];
    REASONING_PREFIXES
        .iter()
        .any(|&prefix| model.starts_with(prefix))
}
259
260fn encode_chat_reasoning_effort(
267 effort: &ReasoningEffort,
268 warnings: &mut Vec<ModelWarning>,
269) -> String {
270 match effort {
271 ReasoningEffort::Off => "none".to_owned(),
272 ReasoningEffort::Minimal => "minimal".to_owned(),
273 ReasoningEffort::Low => "low".to_owned(),
274 ReasoningEffort::Medium => "medium".to_owned(),
275 ReasoningEffort::High => "high".to_owned(),
276 ReasoningEffort::Auto => {
277 warnings.push(ModelWarning::LossyEncode {
278 field: "reasoning_effort".into(),
279 detail: "OpenAI Chat has no `Auto` bucket — snapped to `medium`".into(),
280 });
281 "medium".to_owned()
282 }
283 ReasoningEffort::VendorSpecific(literal) => literal.clone(),
284 }
285}
286
287fn encode_openai_chat_structured_output(
292 format: &ResponseFormat,
293 body: &mut Map<String, Value>,
294 warnings: &mut Vec<ModelWarning>,
295) -> Result<()> {
296 let stripped = crate::LlmFacingSchema::strip(&format.json_schema.schema);
302 let strategy = match format.strategy {
303 OutputStrategy::Auto | OutputStrategy::Native => OutputStrategy::Native,
304 explicit => explicit,
305 };
306 match strategy {
307 OutputStrategy::Native => {
308 if let Err(err) = format.strict_preflight() {
309 warnings.push(ModelWarning::LossyEncode {
310 field: "response_format.json_schema".into(),
311 detail: err.to_string(),
312 });
313 }
314 body.insert(
315 "response_format".into(),
316 json!({
317 "type": "json_schema",
318 "json_schema": {
319 "name": format.json_schema.name,
320 "schema": stripped,
321 "strict": format.strict,
322 }
323 }),
324 );
325 }
326 OutputStrategy::Tool => {
327 let tool_name = format.json_schema.name.clone();
328 let synthetic_tool = json!({
329 "type": "function",
330 "function": {
331 "name": tool_name,
332 "description": format!(
333 "Emit the response as a JSON object matching the {tool_name} schema."
334 ),
335 "parameters": stripped,
336 "strict": format.strict,
337 }
338 });
339 let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
340 if let Value::Array(arr) = tools {
341 arr.insert(0, synthetic_tool);
342 }
343 body.insert(
344 "tool_choice".into(),
345 json!({
346 "type": "function",
347 "function": { "name": format.json_schema.name },
348 }),
349 );
350 }
351 OutputStrategy::Prompted => {
352 return Err(Error::invalid_request(
353 "OutputStrategy::Prompted is deferred to entelix 1.1; use \
354 OutputStrategy::Native or OutputStrategy::Tool",
355 ));
356 }
357 OutputStrategy::Auto => unreachable!("Auto resolved above"),
358 }
359 Ok(())
360}
361
362fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
363 let bytes = serde_json::to_vec(body)?;
364 let mut encoded = EncodedRequest::post_json("/v1/chat/completions", Bytes::from(bytes));
365 encoded.warnings = warnings;
366 Ok(encoded)
367}
368
369fn encode_messages(request: &ModelRequest, warnings: &mut Vec<ModelWarning>) -> Vec<Value> {
372 let mut out: Vec<Value> = Vec::new();
373 if !request.system.is_empty() {
374 if request.system.any_cached() {
379 warnings.push(ModelWarning::LossyEncode {
380 field: "system.cache_control".into(),
381 detail: "OpenAI Chat has no native prompt-cache control; \
382 block text is concatenated and the cache directive \
383 is dropped"
384 .into(),
385 });
386 }
387 out.push(json!({
388 "role": "system",
389 "content": request.system.concat_text(),
390 }));
391 }
392 for (idx, msg) in request.messages.iter().enumerate() {
393 match msg.role {
394 Role::System => {
395 let text = collect_text(&msg.content, warnings, idx);
396 out.push(json!({ "role": "system", "content": text }));
397 }
398 Role::User => {
399 out.push(json!({
400 "role": "user",
401 "content": encode_user_content(&msg.content, warnings, idx),
402 }));
403 }
404 Role::Assistant => {
405 out.push(encode_assistant_message(&msg.content, warnings, idx));
406 }
407 Role::Tool => {
408 for (part_idx, part) in msg.content.iter().enumerate() {
412 if let ContentPart::ToolResult {
413 tool_use_id,
414 content,
415 is_error,
416 ..
417 } = part
418 {
419 let body_str = match content {
420 ToolResultContent::Text(t) => t.clone(),
421 ToolResultContent::Json(v) => v.to_string(),
422 };
423 let mut entry = Map::new();
424 entry.insert("role".into(), Value::String("tool".into()));
425 entry.insert("tool_call_id".into(), Value::String(tool_use_id.clone()));
426 entry.insert("content".into(), Value::String(body_str));
427 if *is_error {
428 warnings.push(ModelWarning::LossyEncode {
429 field: format!("messages[{idx}].content[{part_idx}].is_error"),
430 detail: "OpenAI Chat has no tool_result is_error flag — \
431 carrying via content text"
432 .into(),
433 });
434 }
435 out.push(Value::Object(entry));
436 } else {
437 warnings.push(ModelWarning::LossyEncode {
438 field: format!("messages[{idx}].content[{part_idx}]"),
439 detail: "non-tool_result part on Role::Tool dropped".into(),
440 });
441 }
442 }
443 }
444 }
445 }
446 out
447}
448
449fn encode_user_content(
450 parts: &[ContentPart],
451 warnings: &mut Vec<ModelWarning>,
452 msg_idx: usize,
453) -> Value {
454 if parts.iter().all(|p| matches!(p, ContentPart::Text { .. })) {
456 let mut text = String::new();
457 for part in parts {
458 if let ContentPart::Text { text: t, .. } = part {
459 text.push_str(t);
460 }
461 }
462 return Value::String(text);
463 }
464 let mut arr = Vec::new();
466 for (part_idx, part) in parts.iter().enumerate() {
467 let path = || format!("messages[{msg_idx}].content[{part_idx}]");
468 match part {
469 ContentPart::Text { text, .. } => {
470 arr.push(json!({ "type": "text", "text": text }));
471 }
472 ContentPart::Image { source, .. } => {
473 arr.push(json!({
474 "type": "image_url",
475 "image_url": { "url": media_to_url_chat(source) },
476 }));
477 }
478 ContentPart::Audio { source, .. } => {
479 if let MediaSource::Base64 { media_type, data } = source {
481 let format = audio_format_from_mime(media_type);
482 arr.push(json!({
483 "type": "input_audio",
484 "input_audio": { "data": data, "format": format },
485 }));
486 } else {
487 warnings.push(ModelWarning::LossyEncode {
488 field: path(),
489 detail: "OpenAI Chat input_audio requires base64 source; URL/FileId \
490 audio dropped"
491 .into(),
492 });
493 }
494 }
495 ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
496 field: path(),
497 detail: "OpenAI Chat does not accept video inputs; block dropped".into(),
498 }),
499 ContentPart::Document { source, name, .. } => {
500 if let MediaSource::FileId { id, .. } = source {
503 let mut o = Map::new();
504 o.insert("type".into(), Value::String("file".into()));
505 let mut file_obj = Map::new();
506 file_obj.insert("file_id".into(), Value::String(id.clone()));
507 if let Some(n) = name {
508 file_obj.insert("filename".into(), Value::String(n.clone()));
509 }
510 o.insert("file".into(), Value::Object(file_obj));
511 arr.push(Value::Object(o));
512 } else {
513 warnings.push(ModelWarning::LossyEncode {
514 field: path(),
515 detail: "OpenAI Chat document input requires Files-API FileId source; \
516 inline document dropped"
517 .into(),
518 });
519 }
520 }
521 ContentPart::Thinking { .. } => warnings.push(ModelWarning::LossyEncode {
522 field: path(),
523 detail: "OpenAI Chat does not accept thinking blocks on input; block dropped"
524 .into(),
525 }),
526 ContentPart::Citation { .. } => warnings.push(ModelWarning::LossyEncode {
527 field: path(),
528 detail: "OpenAI Chat does not echo citations on input; block dropped".into(),
529 }),
530 ContentPart::ToolUse { .. } | ContentPart::ToolResult { .. } => {
531 warnings.push(ModelWarning::LossyEncode {
532 field: path(),
533 detail: "tool_use / tool_result not allowed on user role for OpenAI Chat; \
534 move to assistant or tool role"
535 .into(),
536 });
537 }
538 ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
539 warnings.push(ModelWarning::LossyEncode {
540 field: path(),
541 detail: "OpenAI Chat does not accept assistant-produced image / audio output \
542 as input — block dropped"
543 .into(),
544 });
545 }
546 ContentPart::RedactedThinking { .. } => {
547 warnings.push(ModelWarning::LossyEncode {
548 field: path(),
549 detail: "OpenAI Chat does not accept redacted_thinking blocks; block dropped"
550 .into(),
551 });
552 }
553 }
554 }
555 Value::Array(arr)
556}
557
558fn media_to_url_chat(source: &MediaSource) -> String {
559 match source {
560 MediaSource::Url { url, .. } => url.clone(),
561 MediaSource::Base64 { media_type, data } => format!("data:{media_type};base64,{data}"),
562 MediaSource::FileId { id, .. } => id.clone(),
563 }
564}
565
/// Maps an audio MIME type to the `input_audio.format` token used by OpenAI Chat.
///
/// Only the media-type essence is compared. Parameters after `;` are ignored,
/// surrounding whitespace is trimmed, and case is folded, because MIME type
/// names are case-insensitive. So `audio/mpeg; codecs=mp3` and `Audio/MPEG`
/// both map to `"mp3"` instead of falling through to the default.
/// Unrecognised types fall back to `"wav"`.
fn audio_format_from_mime(mime: &str) -> &'static str {
    let essence = mime
        .split(';')
        .next()
        .unwrap_or(mime)
        .trim()
        .to_ascii_lowercase();
    match essence.as_str() {
        "audio/mp3" | "audio/mpeg" => "mp3",
        "audio/aac" => "aac",
        "audio/flac" => "flac",
        "audio/ogg" | "audio/opus" => "opus",
        _ => "wav",
    }
}
578
579fn encode_assistant_message(
580 parts: &[ContentPart],
581 warnings: &mut Vec<ModelWarning>,
582 msg_idx: usize,
583) -> Value {
584 let mut text_buf = String::new();
585 let mut tool_calls = Vec::new();
586 for (part_idx, part) in parts.iter().enumerate() {
587 match part {
588 ContentPart::Text { text, .. } => text_buf.push_str(text),
589 ContentPart::ToolUse {
590 id, name, input, ..
591 } => {
592 tool_calls.push(json!({
593 "id": id,
594 "type": "function",
595 "function": {
596 "name": name,
597 "arguments": input.to_string(),
598 },
599 }));
600 }
601 ContentPart::Citation { snippet, .. } => text_buf.push_str(snippet),
606 other => {
607 warnings.push(ModelWarning::LossyEncode {
608 field: format!("messages[{msg_idx}].content[{part_idx}]"),
609 detail: format!(
610 "{} not supported on assistant role for OpenAI Chat — dropped",
611 debug_part_kind(other)
612 ),
613 });
614 }
615 }
616 }
617 let mut entry = Map::new();
618 entry.insert("role".into(), Value::String("assistant".into()));
619 entry.insert(
620 "content".into(),
621 if text_buf.is_empty() {
622 Value::Null
623 } else {
624 Value::String(text_buf)
625 },
626 );
627 if !tool_calls.is_empty() {
628 entry.insert("tool_calls".into(), Value::Array(tool_calls));
629 }
630 Value::Object(entry)
631}
632
633fn collect_text(parts: &[ContentPart], warnings: &mut Vec<ModelWarning>, msg_idx: usize) -> String {
634 let mut text = String::new();
635 let mut lossy = false;
636 for part in parts {
637 match part {
638 ContentPart::Text { text: t, .. } => text.push_str(t),
639 _ => lossy = true,
640 }
641 }
642 if lossy {
643 warnings.push(ModelWarning::LossyEncode {
644 field: format!("messages[{msg_idx}].content"),
645 detail: "non-text parts dropped from system message".into(),
646 });
647 }
648 text
649}
650
651const fn debug_part_kind(part: &ContentPart) -> &'static str {
652 match part {
653 ContentPart::Text { .. } => "text",
654 ContentPart::Image { .. } => "image",
655 ContentPart::Audio { .. } => "audio",
656 ContentPart::Video { .. } => "video",
657 ContentPart::Document { .. } => "document",
658 ContentPart::Thinking { .. } => "thinking",
659 ContentPart::Citation { .. } => "citation",
660 ContentPart::ToolUse { .. } => "tool_use",
661 ContentPart::ToolResult { .. } => "tool_result",
662 ContentPart::ImageOutput { .. } => "image_output",
663 ContentPart::AudioOutput { .. } => "audio_output",
664 ContentPart::RedactedThinking { .. } => "redacted_thinking",
665 }
666}
667
668fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
669 let mut arr = Vec::with_capacity(tools.len());
670 for (idx, t) in tools.iter().enumerate() {
671 match &t.kind {
672 ToolKind::Function { input_schema } => arr.push(json!({
673 "type": "function",
674 "function": {
675 "name": t.name,
676 "description": t.description,
677 "parameters": input_schema,
678 },
679 })),
680 ToolKind::WebSearch { .. }
683 | ToolKind::Computer { .. }
684 | ToolKind::TextEditor
685 | ToolKind::Bash
686 | ToolKind::CodeExecution
687 | ToolKind::FileSearch { .. }
688 | ToolKind::CodeInterpreter
689 | ToolKind::ImageGeneration
690 | ToolKind::McpConnector { .. }
691 | ToolKind::Memory => warnings.push(ModelWarning::LossyEncode {
692 field: format!("tools[{idx}]"),
693 detail: "OpenAI Chat Completions advertises only function tools — \
694 vendor built-ins (web_search, computer, file_search, …) \
695 live on the Responses API; tool dropped"
696 .into(),
697 }),
698 }
699 }
700 Value::Array(arr)
701}
702
703fn encode_tool_choice(choice: &ToolChoice) -> Value {
704 match choice {
705 ToolChoice::Auto => Value::String("auto".into()),
706 ToolChoice::Required => Value::String("required".into()),
707 ToolChoice::None => Value::String("none".into()),
708 ToolChoice::Specific { name } => json!({
709 "type": "function",
710 "function": { "name": name },
711 }),
712 }
713}
714
715fn decode_choice(raw: &Value, warnings: &mut Vec<ModelWarning>) -> (Vec<ContentPart>, StopReason) {
718 let choice = raw
719 .get("choices")
720 .and_then(Value::as_array)
721 .and_then(|a| a.first())
722 .cloned()
723 .unwrap_or(Value::Null); let message = choice.get("message").unwrap_or(&Value::Null); let content = decode_assistant_message(message, warnings);
726 let stop_reason = decode_finish_reason(
727 choice.get("finish_reason").and_then(Value::as_str),
728 warnings,
729 );
730 (content, stop_reason)
731}
732
733fn decode_assistant_message(message: &Value, warnings: &mut Vec<ModelWarning>) -> Vec<ContentPart> {
734 let mut parts: Vec<ContentPart> = Vec::new();
735 let text = message
736 .get("content")
737 .and_then(Value::as_str)
738 .unwrap_or_default(); if let Some(annotations) = message.get("annotations").and_then(Value::as_array) {
744 for ann in annotations {
745 if ann.get("type").and_then(Value::as_str) == Some("url_citation")
746 && let Some(uc) = ann.get("url_citation")
747 {
748 parts.push(ContentPart::Citation {
749 snippet: text.to_owned(),
750 source: CitationSource::Url {
751 url: str_field(uc, "url").to_owned(),
752 title: uc.get("title").and_then(Value::as_str).map(str::to_owned),
753 },
754 cache_control: None,
755 provider_echoes: Vec::new(),
756 });
757 }
758 }
759 }
760 if !text.is_empty() {
761 parts.push(ContentPart::text(text));
762 }
763 if let Some(tool_calls) = message.get("tool_calls").and_then(Value::as_array) {
764 for (idx, call) in tool_calls.iter().enumerate() {
765 let id = str_field(call, "id").to_owned();
766 let function = call.get("function").unwrap_or(&Value::Null); let name = str_field(function, "name").to_owned();
768 let arguments = function
769 .get("arguments")
770 .and_then(Value::as_str)
771 .unwrap_or("{}"); let input = if let Ok(v) = serde_json::from_str::<Value>(arguments) {
777 v
778 } else {
779 warnings.push(ModelWarning::LossyEncode {
780 field: format!("choices[0].message.tool_calls[{idx}].function.arguments"),
781 detail: "tool arguments not valid JSON; preserved as raw string".into(),
782 });
783 Value::String(arguments.to_owned())
784 };
785 parts.push(ContentPart::ToolUse {
786 id,
787 name,
788 input,
789 provider_echoes: Vec::new(),
790 });
791 }
792 }
793 parts
794}
795
796fn decode_finish_reason(reason: Option<&str>, warnings: &mut Vec<ModelWarning>) -> StopReason {
797 match reason {
798 Some("stop") => StopReason::EndTurn,
799 Some("length") => StopReason::MaxTokens,
800 Some("tool_calls" | "function_call") => StopReason::ToolUse,
801 Some("content_filter") => StopReason::Refusal {
802 reason: RefusalReason::Safety,
803 },
804 Some(other) => {
805 warnings.push(ModelWarning::UnknownStopReason {
806 raw: other.to_owned(),
807 });
808 StopReason::Other {
809 raw: other.to_owned(),
810 }
811 }
812 None => {
813 warnings.push(ModelWarning::LossyEncode {
818 field: "finish_reason".into(),
819 detail: "OpenAI Chat response carried no finish_reason — \
820 IR records `Other{raw:\"missing\"}`"
821 .into(),
822 });
823 StopReason::Other {
824 raw: "missing".to_owned(),
825 }
826 }
827 }
828}
829
830fn decode_usage(usage: Option<&Value>) -> Usage {
831 Usage {
832 input_tokens: u_field(usage, "prompt_tokens"),
833 output_tokens: u_field(usage, "completion_tokens"),
834 cached_input_tokens: u_field_nested(usage, &["prompt_tokens_details", "cached_tokens"]),
835 cache_creation_input_tokens: 0,
836 reasoning_tokens: u_field_nested(usage, &["completion_tokens_details", "reasoning_tokens"]),
837 safety_ratings: Vec::new(),
838 }
839}
840
841fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
842 v.get(key).and_then(Value::as_str).unwrap_or("") }
844
845fn u_field(v: Option<&Value>, key: &str) -> u32 {
846 v.and_then(|inner| inner.get(key))
847 .and_then(Value::as_u64)
848 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
850
851fn u_field_nested(v: Option<&Value>, path: &[&str]) -> u32 {
852 let Some(mut cursor) = v else {
853 return 0;
854 };
855 for segment in path {
856 let Some(next) = cursor.get(*segment) else {
857 return 0;
858 };
859 cursor = next;
860 }
861 cursor
862 .as_u64()
863 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
865
866#[allow(tail_expr_drop_order, clippy::too_many_lines)]
869fn stream_openai_chat(
870 bytes: BoxByteStream<'_>,
871 warnings_in: Vec<ModelWarning>,
872) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
873 async_stream::stream! {
874 let mut bytes = bytes;
875 let mut buf: Vec<u8> = Vec::new();
876 let mut tool_indices_open: HashSet<u64> = HashSet::new();
878 let mut current_tool_index: Option<u64> = None;
879 let mut started = false;
880 let mut last_stop = StopReason::EndTurn;
881 let mut warnings_emitted = false;
882
883 while let Some(chunk) = bytes.next().await {
884 match chunk {
885 Ok(b) => buf.extend_from_slice(&b),
886 Err(e) => {
887 yield Err(e);
888 return;
889 }
890 }
891 if !warnings_emitted {
892 warnings_emitted = true;
893 for w in &warnings_in {
894 yield Ok(StreamDelta::Warning(w.clone()));
895 }
896 }
897 while let Some(pos) = find_double_newline(&buf) {
898 let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
899 let Ok(frame_str) = std::str::from_utf8(&frame) else {
900 continue;
901 };
902 let Some(payload) = parse_sse_data(frame_str) else {
903 continue;
904 };
905 if payload.trim() == "[DONE]" {
906 if current_tool_index.take().is_some() {
907 yield Ok(StreamDelta::ToolUseStop);
908 }
909 yield Ok(StreamDelta::Stop {
910 stop_reason: last_stop.clone(),
911 });
912 return;
913 }
914 let Ok(event) = serde_json::from_str::<Value>(&payload) else {
915 yield Err(Error::invalid_request(format!(
916 "OpenAI Chat stream: malformed chunk: {payload}"
917 )));
918 return;
919 };
920 if !started {
921 started = true;
922 let id = str_field(&event, "id").to_owned();
923 let model = str_field(&event, "model").to_owned();
924 yield Ok(StreamDelta::Start {
925 id,
926 model,
927 provider_echoes: Vec::new(),
928 });
929 }
930 if let Some(usage) = event.get("usage").filter(|v| !v.is_null()) {
931 yield Ok(StreamDelta::Usage(decode_usage(Some(usage))));
932 }
933 let Some(choice) = event
934 .get("choices")
935 .and_then(Value::as_array)
936 .and_then(|a| a.first())
937 else {
938 continue;
939 };
940 if let Some(reason) = choice.get("finish_reason").and_then(Value::as_str) {
941 last_stop = decode_finish_reason(Some(reason), &mut Vec::new());
942 }
943 let Some(delta) = choice.get("delta") else {
944 continue;
945 };
946 if let Some(text) = delta.get("content").and_then(Value::as_str)
947 && !text.is_empty()
948 {
949 if current_tool_index.take().is_some() {
950 yield Ok(StreamDelta::ToolUseStop);
951 }
952 yield Ok(StreamDelta::TextDelta {
953 text: text.to_owned(),
954 provider_echoes: Vec::new(),
955 });
956 }
957 if let Some(tool_calls) = delta.get("tool_calls").and_then(Value::as_array) {
958 for call in tool_calls {
959 let idx = if let Some(n) = call.get("index").and_then(Value::as_u64) {
960 n
961 } else {
962 yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
963 field: "stream.delta.tool_calls[].index".into(),
964 detail: "OpenAI Chat stream tool_call missing spec-mandated 'index' field; falling back to slot 0 (mirrors anthropic streaming idx handling)".into(),
965 }));
966 0
967 };
968 let function = call.get("function");
969 let name = function
970 .and_then(|f| f.get("name"))
971 .and_then(Value::as_str)
972 .unwrap_or(""); let arguments = function
974 .and_then(|f| f.get("arguments"))
975 .and_then(Value::as_str)
976 .unwrap_or(""); let id = call
978 .get("id")
979 .and_then(Value::as_str)
980 .unwrap_or("") .to_owned();
982 if tool_indices_open.insert(idx) {
983 if let Some(prev) = current_tool_index.take()
985 && prev != idx
986 {
987 yield Ok(StreamDelta::ToolUseStop);
988 }
989 yield Ok(StreamDelta::ToolUseStart {
990 id,
991 name: name.to_owned(),
992 provider_echoes: Vec::new(),
993 });
994 current_tool_index = Some(idx);
995 }
996 if !arguments.is_empty() {
997 yield Ok(StreamDelta::ToolUseInputDelta {
998 partial_json: arguments.to_owned(),
999 });
1000 }
1001 }
1002 }
1003 }
1004 }
1005 }
1006}
1007
/// Returns the byte offset of the earliest SSE frame delimiter in `buf`.
///
/// The delimiter is either `\n\n` or `\r\n\r\n`; the offset is where the
/// delimiter begins. Returns `None` when neither occurs.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    let lf_end = buf.windows(2).position(|w| w == b"\n\n");
    let crlf_end = buf.windows(4).position(|w| w == b"\r\n\r\n");
    lf_end.into_iter().chain(crlf_end).min()
}
1018
/// Extracts the `data:` payload of one SSE frame.
///
/// Each `data:` line contributes its value, with at most one leading space
/// removed. Multiple `data:` lines are joined with `\n`. All other lines
/// (`event:`, `id:`, comments) are ignored. Returns `None` when the frame
/// has no `data:` line.
fn parse_sse_data(frame: &str) -> Option<String> {
    let data_lines: Vec<&str> = frame
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|value| value.strip_prefix(' ').unwrap_or(value))
        .collect();
    if data_lines.is_empty() {
        None
    } else {
        Some(data_lines.join("\n"))
    }
}