1use serde::Deserialize;
19
20use crate::api::llm::LlmRequest;
21use crate::error::{FlowError, Result};
22use crate::json::Json;
23
24use super::request::{
25 AnnotatedLlmRequest, GenerationParams, Message, MessageContent, ToolChoice, ToolChoiceFunction,
26 ToolChoiceFunctionName, ToolDefinition,
27};
28use super::response::{
29 AnnotatedLlmResponse, ApiSpecificResponse, FinishReason, ResponseToolCall, Usage,
30};
31use super::traits::{LlmCodec, LlmResponseCodec};
32
/// Stateless codec for the OpenAI Responses API wire format.
///
/// Implements [`LlmCodec`] (request decode/encode) and [`LlmResponseCodec`]
/// (non-streaming response decode) for Responses API JSON bodies.
pub struct OpenAIResponsesCodec;
39
/// Top-level shape of a non-streaming Responses API response body.
///
/// Every field is optional so partial payloads still decode; top-level keys
/// not listed here are captured verbatim in `extra`.
#[derive(Deserialize)]
struct RawResponsesResponse {
    id: Option<String>,
    model: Option<String>,
    /// Response status string, e.g. `"completed"` or `"incomplete"`; mapped
    /// to a finish reason by `map_responses_finish_reason`.
    status: Option<String>,
    /// Raw output items (`message`, `function_call`, ...), kept as JSON.
    output: Option<Vec<Json>>,
    usage: Option<RawResponsesUsage>,
    /// Read for its `reason` string when `status == "incomplete"`.
    incomplete_details: Option<Json>,
    previous_response_id: Option<String>,
    store: Option<bool>,
    service_tier: Option<String>,
    truncation: Option<Json>,
    reasoning: Option<Json>,
    /// All remaining top-level keys, preserved verbatim.
    #[serde(flatten)]
    extra: serde_json::Map<String, Json>,
}
60
/// `usage` block of a Responses API response.
#[derive(Deserialize)]
struct RawResponsesUsage {
    /// Mapped to `Usage::prompt_tokens`.
    input_tokens: Option<u64>,
    /// Mapped to `Usage::completion_tokens`.
    output_tokens: Option<u64>,
    total_tokens: Option<u64>,
    input_tokens_details: Option<RawInputTokensDetails>,
    output_tokens_details: Option<RawOutputTokensDetails>,
}
69
/// `usage.input_tokens_details`: `cached_tokens` feeds
/// `Usage::cache_read_tokens`; the whole object (including unmodeled keys)
/// is re-emitted through `input_tokens_details_to_json`.
#[derive(Deserialize, Clone)]
struct RawInputTokensDetails {
    cached_tokens: Option<u64>,
    /// Unmodeled detail keys, preserved verbatim.
    #[serde(flatten)]
    extra: serde_json::Map<String, Json>,
}
76
/// `usage.output_tokens_details`: `reasoning_tokens` and any unmodeled keys
/// are re-emitted through `output_tokens_details_to_json`.
#[derive(Deserialize, Clone)]
struct RawOutputTokensDetails {
    reasoning_tokens: Option<u64>,
    /// Unmodeled detail keys, preserved verbatim.
    #[serde(flatten)]
    extra: serde_json::Map<String, Json>,
}
83
84fn map_responses_finish_reason(
90 status: Option<&str>,
91 incomplete_details: Option<&Json>,
92) -> Option<FinishReason> {
93 let incomplete_reason = incomplete_details
94 .and_then(|d| d.get("reason"))
95 .and_then(|r| r.as_str());
96
97 match status {
98 Some("completed") => Some(FinishReason::Complete),
99 Some("incomplete") => match incomplete_reason {
100 Some("max_output_tokens") => Some(FinishReason::Length),
101 Some("content_filter") => Some(FinishReason::ContentFilter),
102 Some(other) => Some(FinishReason::Unknown(other.to_string())),
103 None => Some(FinishReason::Unknown("incomplete".to_string())),
104 },
105 Some(other) => Some(FinishReason::Unknown(other.to_string())),
106 None => None,
107 }
108}
109
110fn parse_arguments(arguments: &str) -> Json {
114 serde_json::from_str(arguments).unwrap_or_else(|_| Json::String(arguments.to_string()))
115}
116
117fn input_tokens_details_to_json(details: &RawInputTokensDetails) -> Json {
118 let mut obj = serde_json::Map::new();
119 if let Some(cached_tokens) = details.cached_tokens {
120 obj.insert("cached_tokens".into(), Json::from(cached_tokens));
121 }
122 obj.extend(details.extra.clone());
123 Json::Object(obj)
124}
125
126fn output_tokens_details_to_json(details: &RawOutputTokensDetails) -> Json {
127 let mut obj = serde_json::Map::new();
128 if let Some(reasoning_tokens) = details.reasoning_tokens {
129 obj.insert("reasoning_tokens".into(), Json::from(reasoning_tokens));
130 }
131 obj.extend(details.extra.clone());
132 Json::Object(obj)
133}
134
/// Top-level request keys that `decode` reads into dedicated
/// [`AnnotatedLlmRequest`] fields (or into `messages` / `params`).
///
/// Every other top-level key is copied into `AnnotatedLlmRequest::extra` so it
/// is written back by `encode`.
const MODELED_REQUEST_KEYS: &[&str] = &[
    "input",
    "instructions",
    "model",
    "max_output_tokens",
    "temperature",
    "top_p",
    "tools",
    "tool_choice",
    "store",
    "previous_response_id",
    "truncation",
    "reasoning",
    "include",
    "user",
    "metadata",
    "service_tier",
    "parallel_tool_calls",
    "max_tool_calls",
    "top_logprobs",
    "stream",
];
/// `extra` key under which `decode` stashes an `input` array it could not
/// parse as messages; `encode` writes that value back verbatim as `input` and
/// never forwards this key itself.
const UNPARSED_INPUT_ITEMS_KEY: &str = "_openai_responses_unparsed_input_items";
159
160fn json_f64(v: f64) -> Json {
162 serde_json::Number::from_f64(v)
163 .map(Json::Number)
164 .unwrap_or(Json::Null)
165}
166
167fn collect_output_parts(items: Option<&[Json]>) -> (Vec<String>, Vec<ResponseToolCall>) {
168 let mut text_parts = Vec::new();
169 let mut tool_calls = Vec::new();
170
171 if let Some(items) = items {
172 for item in items {
173 collect_output_item(item, &mut text_parts, &mut tool_calls);
174 }
175 }
176
177 (text_parts, tool_calls)
178}
179
180fn collect_output_item(
181 item: &Json,
182 text_parts: &mut Vec<String>,
183 tool_calls: &mut Vec<ResponseToolCall>,
184) {
185 match item
186 .get("type")
187 .and_then(|value| value.as_str())
188 .unwrap_or("")
189 {
190 "message" => collect_message_text_parts(item, text_parts),
191 "function_call" => tool_calls.push(parse_function_call(item)),
192 _ => {}
193 }
194}
195
196fn collect_message_text_parts(item: &Json, text_parts: &mut Vec<String>) {
197 let Some(content) = item.get("content").and_then(|value| value.as_array()) else {
198 return;
199 };
200
201 for block in content {
202 if let Some(text) = output_text_block(block) {
203 text_parts.push(text);
204 }
205 }
206}
207
208fn output_text_block(block: &Json) -> Option<String> {
209 (block.get("type").and_then(|value| value.as_str()) == Some("output_text"))
210 .then(|| block.get("text").and_then(|value| value.as_str()))
211 .flatten()
212 .map(str::to_string)
213}
214
215fn parse_function_call(item: &Json) -> ResponseToolCall {
216 ResponseToolCall {
217 id: item
218 .get("call_id")
219 .and_then(|value| value.as_str())
220 .unwrap_or("")
221 .to_string(),
222 name: item
223 .get("name")
224 .and_then(|value| value.as_str())
225 .unwrap_or("")
226 .to_string(),
227 arguments: item
228 .get("arguments")
229 .and_then(|value| value.as_str())
230 .map(parse_arguments)
231 .unwrap_or(Json::Object(serde_json::Map::new())),
232 }
233}
234
235fn message_from_text_parts(text_parts: Vec<String>) -> Option<MessageContent> {
236 match text_parts.as_slice() {
237 [] => None,
238 [text] => Some(MessageContent::Text(text.clone())),
239 _ => Some(MessageContent::Text(text_parts.join("\n"))),
240 }
241}
242
/// Returns `None` for an empty vector and `Some(items)` otherwise.
fn optional_vec<T>(items: Vec<T>) -> Option<Vec<T>> {
    if items.is_empty() {
        None
    } else {
        Some(items)
    }
}
246
247fn split_system_and_input_messages(messages: &[Message]) -> (Option<String>, Vec<&Message>) {
248 let mut system_text = None;
249 let mut input_messages = Vec::new();
250
251 for msg in messages {
252 match msg {
253 Message::System { content, .. } => {
254 if let MessageContent::Text(text) = content {
255 system_text = Some(text.clone());
256 }
257 }
258 other => input_messages.push(other),
259 }
260 }
261
262 (system_text, input_messages)
263}
264
265fn set_or_remove_string(obj: &mut serde_json::Map<String, Json>, key: &str, value: Option<String>) {
266 if let Some(value) = value {
267 obj.insert(key.into(), Json::String(value));
268 } else {
269 obj.remove(key);
270 }
271}
272
273fn insert_serialized<T: serde::Serialize>(
274 obj: &mut serde_json::Map<String, Json>,
275 key: &str,
276 value: &T,
277 context: &str,
278) -> Result<()> {
279 let json = serde_json::to_value(value)
280 .map_err(|e| FlowError::Internal(format!("OpenAI Responses {context} encode: {e}")))?;
281 obj.insert(key.into(), json);
282 Ok(())
283}
284
285fn overlay_generation_params(obj: &mut serde_json::Map<String, Json>, params: &GenerationParams) {
286 if let Some(temp) = params.temperature {
287 obj.insert("temperature".into(), json_f64(temp));
288 }
289 if let Some(top_p) = params.top_p {
290 obj.insert("top_p".into(), json_f64(top_p));
291 }
292 if let Some(max_tokens) = params.max_tokens {
293 obj.insert("max_output_tokens".into(), Json::from(max_tokens));
294 obj.remove("max_tokens");
295 }
296}
297
298fn encode_openai_responses_input(
299 obj: &mut serde_json::Map<String, Json>,
300 annotated: &AnnotatedLlmRequest,
301) -> Result<()> {
302 let (system_text, input_messages) = split_system_and_input_messages(&annotated.messages);
303 set_or_remove_string(obj, "instructions", system_text);
304 if let Some(raw_input_items) = annotated.extra.get(UNPARSED_INPUT_ITEMS_KEY) {
305 obj.insert("input".into(), raw_input_items.clone());
306 } else {
307 insert_serialized(obj, "input", &input_messages, "input")?;
308 }
309 Ok(())
310}
311
312fn encode_openai_responses_tools(
313 obj: &mut serde_json::Map<String, Json>,
314 annotated: &AnnotatedLlmRequest,
315) -> Result<()> {
316 if let Some(ref tools) = annotated.tools {
317 insert_serialized(obj, "tools", tools, "tools")?;
318 }
319 if let Some(ref tool_choice) = annotated.tool_choice {
320 insert_serialized(obj, "tool_choice", tool_choice, "tool_choice")?;
321 }
322 Ok(())
323}
324
325fn overlay_openai_responses_fields(
326 obj: &mut serde_json::Map<String, Json>,
327 annotated: &AnnotatedLlmRequest,
328) {
329 if let Some(ref model) = annotated.model {
330 obj.insert("model".into(), Json::String(model.clone()));
331 }
332 overlay_openai_responses_json_fields(obj, annotated);
333 overlay_openai_responses_string_fields(obj, annotated);
334 overlay_openai_responses_bool_fields(obj, annotated);
335 overlay_openai_responses_u64_fields(obj, annotated);
336}
337
338fn overlay_openai_responses_json_fields(
339 obj: &mut serde_json::Map<String, Json>,
340 annotated: &AnnotatedLlmRequest,
341) {
342 for (key, value) in [
343 ("truncation", &annotated.truncation),
344 ("reasoning", &annotated.reasoning),
345 ("include", &annotated.include),
346 ("metadata", &annotated.metadata),
347 ] {
348 if let Some(value) = value {
349 obj.insert(key.into(), value.clone());
350 }
351 }
352}
353
354fn overlay_openai_responses_string_fields(
355 obj: &mut serde_json::Map<String, Json>,
356 annotated: &AnnotatedLlmRequest,
357) {
358 for (key, value) in [
359 ("previous_response_id", &annotated.previous_response_id),
360 ("user", &annotated.user),
361 ("service_tier", &annotated.service_tier),
362 ] {
363 if let Some(value) = value {
364 obj.insert(key.into(), Json::String(value.clone()));
365 }
366 }
367}
368
369fn overlay_openai_responses_bool_fields(
370 obj: &mut serde_json::Map<String, Json>,
371 annotated: &AnnotatedLlmRequest,
372) {
373 for (key, value) in [
374 ("store", annotated.store),
375 ("parallel_tool_calls", annotated.parallel_tool_calls),
376 ("stream", annotated.stream),
377 ] {
378 if let Some(value) = value {
379 obj.insert(key.into(), Json::Bool(value));
380 }
381 }
382}
383
384fn overlay_openai_responses_u64_fields(
385 obj: &mut serde_json::Map<String, Json>,
386 annotated: &AnnotatedLlmRequest,
387) {
388 for (key, value) in [
389 ("max_output_tokens", annotated.max_output_tokens),
390 ("max_tool_calls", annotated.max_tool_calls),
391 ("top_logprobs", annotated.top_logprobs),
392 ] {
393 if let Some(value) = value {
394 obj.insert(key.into(), Json::from(value));
395 }
396 }
397}
398
399fn merge_openai_responses_extra_fields(
400 obj: &mut serde_json::Map<String, Json>,
401 extra: &serde_json::Map<String, Json>,
402) {
403 for (k, v) in extra {
404 if k != UNPARSED_INPUT_ITEMS_KEY {
405 obj.insert(k.clone(), v.clone());
406 }
407 }
408}
409
410fn decode_openai_or_anthropic_tool_choice(value: &Json) -> Option<ToolChoice> {
411 if let Ok(parsed) = serde_json::from_value::<ToolChoice>(value.clone()) {
412 return Some(parsed);
413 }
414
415 let obj = value.as_object()?;
416 match obj.get("type").and_then(|v| v.as_str()) {
417 Some("auto") => Some(ToolChoice::Auto),
418 Some("any") => Some(ToolChoice::Required),
419 Some("none") => Some(ToolChoice::None),
420 Some("tool") => {
421 let name = obj.get("name").and_then(|v| v.as_str())?.to_string();
422 Some(ToolChoice::Specific(ToolChoiceFunction {
423 choice_type: "function".to_string(),
424 function: ToolChoiceFunctionName { name },
425 }))
426 }
427 _ => None,
428 }
429}
430
431fn decode_openai_or_anthropic_parallel_tool_calls(
432 obj: &serde_json::Map<String, Json>,
433) -> Option<bool> {
434 if let Some(value) = obj.get("parallel_tool_calls").and_then(|v| v.as_bool()) {
435 return Some(value);
436 }
437 let tool_choice = obj.get("tool_choice")?.as_object()?;
438 tool_choice
439 .get("disable_parallel_tool_use")
440 .and_then(|v| v.as_bool())
441 .map(|disabled| !disabled)
442}
443
444impl LlmResponseCodec for OpenAIResponsesCodec {
449 fn decode_response(&self, response: &Json) -> Result<AnnotatedLlmResponse> {
450 let raw: RawResponsesResponse = serde_json::from_value(response.clone())
451 .map_err(|e| FlowError::Internal(format!("OpenAI Responses response decode: {e}")))?;
452
453 let all_output_items = raw.output.clone();
454 let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref());
455 let message = message_from_text_parts(text_parts);
456 let tool_calls = optional_vec(tool_calls);
457
458 let finish_reason =
460 map_responses_finish_reason(raw.status.as_deref(), raw.incomplete_details.as_ref());
461
462 let input_tokens_details = raw.usage.as_ref().and_then(|u| {
463 u.input_tokens_details
464 .as_ref()
465 .map(input_tokens_details_to_json)
466 });
467 let output_tokens_details = raw.usage.as_ref().and_then(|u| {
468 u.output_tokens_details
469 .as_ref()
470 .map(output_tokens_details_to_json)
471 });
472
473 let usage = raw.usage.map(|u| Usage {
475 prompt_tokens: u.input_tokens,
476 completion_tokens: u.output_tokens,
477 total_tokens: u.total_tokens,
478 cache_read_tokens: u
479 .input_tokens_details
480 .as_ref()
481 .and_then(|d| d.cached_tokens),
482 cache_write_tokens: None,
483 });
484
485 let api_specific = Some(ApiSpecificResponse::OpenAIResponses {
487 output_items: all_output_items,
488 status: raw.status,
489 incomplete_details: raw.incomplete_details,
490 previous_response_id: raw.previous_response_id,
491 store: raw.store,
492 service_tier: raw.service_tier,
493 truncation: raw.truncation,
494 reasoning: raw.reasoning,
495 input_tokens_details,
496 output_tokens_details,
497 });
498
499 Ok(AnnotatedLlmResponse {
500 id: raw.id,
501 model: raw.model,
502 message,
503 tool_calls,
504 finish_reason,
505 usage,
506 api_specific,
507 extra: raw.extra,
508 })
509 }
510}
511
512impl LlmCodec for OpenAIResponsesCodec {
517 fn decode(&self, request: &LlmRequest) -> Result<AnnotatedLlmRequest> {
518 let obj = request
519 .content
520 .as_object()
521 .ok_or_else(|| FlowError::Internal("request content is not an object".into()))?;
522
523 let mut messages: Vec<Message> = Vec::new();
524 let mut preserved_unparsed_input: Option<Json> = None;
525
526 if let Some(instructions) = obj.get("instructions").and_then(|v| v.as_str()) {
528 messages.push(Message::System {
529 content: MessageContent::Text(instructions.to_string()),
530 name: None,
531 });
532 }
533
534 if let Some(input) = obj.get("input") {
536 if let Some(s) = input.as_str() {
537 messages.push(Message::User {
539 content: MessageContent::Text(s.to_string()),
540 name: None,
541 });
542 } else if input.is_array() {
543 match serde_json::from_value::<Vec<Message>>(input.clone()) {
545 Ok(input_messages) => messages.extend(input_messages),
546 Err(_) => {
547 preserved_unparsed_input = Some(input.clone());
549 }
550 }
551 }
552 }
553
554 let model = obj.get("model").and_then(|v| v.as_str()).map(String::from);
556
557 let temperature = obj.get("temperature").and_then(|v| v.as_f64());
559 let top_p = obj.get("top_p").and_then(|v| v.as_f64());
560 let max_tokens = obj.get("max_output_tokens").and_then(|v| v.as_u64());
561 let params = if temperature.is_some() || max_tokens.is_some() || top_p.is_some() {
564 Some(GenerationParams {
565 temperature,
566 max_tokens,
567 top_p,
568 stop: None,
569 })
570 } else {
571 None
572 };
573
574 let tools: Option<Vec<ToolDefinition>> = obj
576 .get("tools")
577 .map(|v| serde_json::from_value(v.clone()))
578 .transpose()
579 .map_err(|e| FlowError::Internal(format!("OpenAI Responses tools decode: {e}")))?;
580
581 let tool_choice: Option<ToolChoice> = obj
583 .get("tool_choice")
584 .and_then(decode_openai_or_anthropic_tool_choice);
585
586 let mut extra: serde_json::Map<String, Json> = obj
588 .iter()
589 .filter(|(k, _)| !MODELED_REQUEST_KEYS.contains(&k.as_str()))
590 .map(|(k, v)| (k.clone(), v.clone()))
591 .collect();
592 if let Some(input_items) = preserved_unparsed_input {
593 extra.insert(UNPARSED_INPUT_ITEMS_KEY.into(), input_items);
594 }
595
596 Ok(AnnotatedLlmRequest {
597 messages,
598 model,
599 params,
600 tools,
601 tool_choice,
602 store: obj.get("store").and_then(|v| v.as_bool()),
603 previous_response_id: obj
604 .get("previous_response_id")
605 .and_then(|v| v.as_str())
606 .map(String::from),
607 truncation: obj.get("truncation").cloned(),
608 reasoning: obj.get("reasoning").cloned(),
609 include: obj.get("include").cloned(),
610 user: obj.get("user").and_then(|v| v.as_str()).map(String::from),
611 metadata: obj.get("metadata").cloned(),
612 service_tier: obj
613 .get("service_tier")
614 .and_then(|v| v.as_str())
615 .map(String::from),
616 parallel_tool_calls: decode_openai_or_anthropic_parallel_tool_calls(obj),
617 max_output_tokens: obj.get("max_output_tokens").and_then(|v| v.as_u64()),
618 max_tool_calls: obj.get("max_tool_calls").and_then(|v| v.as_u64()),
619 top_logprobs: obj.get("top_logprobs").and_then(|v| v.as_u64()),
620 stream: obj.get("stream").and_then(|v| v.as_bool()),
621 extra,
622 })
623 }
624
625 fn encode(&self, annotated: &AnnotatedLlmRequest, original: &LlmRequest) -> Result<LlmRequest> {
626 let mut content = original.content.clone();
627 let obj = content
628 .as_object_mut()
629 .ok_or_else(|| FlowError::Internal("original content is not an object".into()))?;
630
631 encode_openai_responses_input(obj, annotated)?;
632 if let Some(ref params) = annotated.params {
633 overlay_generation_params(obj, params);
634 }
635 encode_openai_responses_tools(obj, annotated)?;
636 overlay_openai_responses_fields(obj, annotated);
637 merge_openai_responses_extra_fields(obj, &annotated.extra);
638
639 Ok(LlmRequest {
640 headers: original.headers.clone(),
641 content,
642 })
643 }
644}
645
/// Streaming codec for the OpenAI Responses API.
///
/// The collector and finalizer closures it hands out share one
/// `OpenAIResponsesStreamingState` behind an `Arc<Mutex<..>>`. The collector
/// records stream events into that state. The finalizer takes the accumulated
/// state, leaving a default one behind, and builds a single response body.
pub struct OpenAIResponsesStreamingCodec {
    state: std::sync::Arc<std::sync::Mutex<OpenAIResponsesStreamingState>>,
}
681
682impl OpenAIResponsesStreamingCodec {
683 pub fn new() -> Self {
685 Self {
686 state: std::sync::Arc::new(std::sync::Mutex::new(
687 OpenAIResponsesStreamingState::default(),
688 )),
689 }
690 }
691}
692
impl Default for OpenAIResponsesStreamingCodec {
    /// Equivalent to [`OpenAIResponsesStreamingCodec::new`].
    fn default() -> Self {
        Self::new()
    }
}
698
699impl super::streaming::StreamingCodec for OpenAIResponsesStreamingCodec {
700 fn collector(&self) -> crate::api::runtime::LlmCollectorFn {
701 let state = std::sync::Arc::clone(&self.state);
702 Box::new(move |event: Json| -> Result<()> {
703 let mut guard = state
704 .lock()
705 .unwrap_or_else(|poisoned| poisoned.into_inner());
706 guard.observe(&event);
707 Ok(())
708 })
709 }
710
711 fn finalizer(&self) -> crate::api::runtime::LlmFinalizerFn {
712 let state = std::sync::Arc::clone(&self.state);
713 Box::new(move || -> Json {
714 let mut guard = state
715 .lock()
716 .unwrap_or_else(|poisoned| poisoned.into_inner());
717 std::mem::take(&mut *guard).finalize()
718 })
719 }
720}
721
/// Accumulated state for one streamed Responses API call.
#[derive(Debug, Default)]
struct OpenAIResponsesStreamingState {
    /// Most recent `response` object carried by a lifecycle event
    /// (`response.created`, `.in_progress`, `.completed`, `.failed`,
    /// `.incomplete`).
    response: Option<serde_json::Map<String, Json>>,
    /// Latest item seen per `output_index` from `response.output_item.added`
    /// / `.done`. The `BTreeMap` keeps them in index order.
    items: std::collections::BTreeMap<usize, Json>,
}
732
733impl OpenAIResponsesStreamingState {
734 fn observe(&mut self, event: &Json) {
735 let event_type = event.get("type").and_then(Json::as_str).unwrap_or("");
736 match event_type {
737 "response.created"
738 | "response.in_progress"
739 | "response.completed"
740 | "response.failed"
741 | "response.incomplete" => self.observe_response_snapshot(event),
742 "response.output_item.added" | "response.output_item.done" => {
743 self.observe_output_item(event);
744 }
745 _ => {}
749 }
750 }
751
752 fn observe_response_snapshot(&mut self, event: &Json) {
753 let Some(response) = event.get("response") else {
754 return;
755 };
756 if let Json::Object(map) = response {
757 self.response = Some(map.clone());
758 }
759 }
760
761 fn observe_output_item(&mut self, event: &Json) {
762 let Some(index) = event.get("output_index").and_then(Json::as_u64) else {
763 return;
764 };
765 let Some(item) = event.get("item") else {
766 return;
767 };
768 self.items.insert(index as usize, item.clone());
769 }
770
771 fn finalize(self) -> Json {
772 let mut output = self.response.unwrap_or_default();
773 let snapshot_output_empty = output
778 .get("output")
779 .and_then(Json::as_array)
780 .map(|arr| arr.is_empty())
781 .unwrap_or(true);
782 if snapshot_output_empty && !self.items.is_empty() {
783 let items: Vec<Json> = self.items.into_values().collect();
784 output.insert("output".to_string(), Json::Array(items));
785 }
786 Json::Object(output)
787 }
788}
789
// Unit tests live in a separate file, pulled in via `#[path]` for test builds only.
#[cfg(test)]
#[path = "../../tests/unit/codec/openai_responses_tests.rs"]
mod tests;