1use serde::Deserialize;
19
20use crate::api::llm::LlmRequest;
21use crate::error::{FlowError, Result};
22use crate::json::Json;
23
24use super::request::{
25 AnnotatedLlmRequest, GenerationParams, Message, MessageContent, ToolChoice, ToolDefinition,
26};
27use super::response::{
28 AnnotatedLlmResponse, ApiSpecificResponse, FinishReason, ResponseToolCall, Usage,
29};
30use super::traits::{LlmCodec, LlmResponseCodec};
31
/// Codec for OpenAI Responses request and response payloads.
///
/// Stateless unit type: the `LlmCodec` and `LlmResponseCodec` implementations in this
/// file translate between raw JSON and the annotated request/response types.
pub struct OpenAIResponsesCodec;
38
/// Raw shape of a response body as consumed by `decode_response`.
///
/// Every modeled field is an `Option`, so an absent key deserializes to `None`
/// instead of failing the whole decode.
#[derive(Deserialize)]
struct RawResponsesResponse {
    id: Option<String>,
    model: Option<String>,
    /// Fed to `map_responses_finish_reason` to derive the finish reason.
    status: Option<String>,
    /// Output items kept as untyped JSON; they are inspected by their `"type"` key.
    output: Option<Vec<Json>>,
    usage: Option<RawResponsesUsage>,
    /// Its `"reason"` entry is consulted when `status` is `"incomplete"`.
    incomplete_details: Option<Json>,
    /// Every top-level key not named above, captured through `flatten`.
    #[serde(flatten)]
    extra: serde_json::Map<String, Json>,
}
54
/// Raw `usage` object of a response; all counters are optional.
///
/// `decode_response` maps `input_tokens` / `output_tokens` / `total_tokens` onto the
/// `prompt_tokens` / `completion_tokens` / `total_tokens` fields of `Usage`.
#[derive(Deserialize)]
struct RawResponsesUsage {
    input_tokens: Option<u64>,
    output_tokens: Option<u64>,
    total_tokens: Option<u64>,
    /// Source of `Usage::cache_read_tokens` (via `cached_tokens`).
    input_tokens_details: Option<RawInputTokensDetails>,
}
62
/// Raw `usage.input_tokens_details` object; only `cached_tokens` is read.
#[derive(Deserialize)]
struct RawInputTokensDetails {
    cached_tokens: Option<u64>,
}
67
68fn map_responses_finish_reason(
74 status: Option<&str>,
75 incomplete_details: Option<&Json>,
76) -> Option<FinishReason> {
77 let incomplete_reason = incomplete_details
78 .and_then(|d| d.get("reason"))
79 .and_then(|r| r.as_str());
80
81 match status {
82 Some("completed") => Some(FinishReason::Complete),
83 Some("incomplete") => match incomplete_reason {
84 Some("max_output_tokens") => Some(FinishReason::Length),
85 Some("content_filter") => Some(FinishReason::ContentFilter),
86 Some(other) => Some(FinishReason::Unknown(other.to_string())),
87 None => Some(FinishReason::Unknown("incomplete".to_string())),
88 },
89 Some(other) => Some(FinishReason::Unknown(other.to_string())),
90 None => None,
91 }
92}
93
94fn parse_arguments(arguments: &str) -> Json {
98 serde_json::from_str(arguments).unwrap_or_else(|_| Json::String(arguments.to_string()))
99}
100
/// Top-level request keys that `decode` lifts into dedicated `AnnotatedLlmRequest`
/// fields. Every other key of the request object is copied into `extra`.
const MODELED_REQUEST_KEYS: &[&str] = &[
    "input",
    "instructions",
    "model",
    "max_output_tokens",
    "temperature",
    "top_p",
    "tools",
    "tool_choice",
];
112
113fn json_f64(v: f64) -> Json {
115 serde_json::Number::from_f64(v)
116 .map(Json::Number)
117 .unwrap_or(Json::Null)
118}
119
120fn collect_output_parts(items: Option<&[Json]>) -> (Vec<String>, Vec<ResponseToolCall>) {
121 let mut text_parts = Vec::new();
122 let mut tool_calls = Vec::new();
123
124 if let Some(items) = items {
125 for item in items {
126 collect_output_item(item, &mut text_parts, &mut tool_calls);
127 }
128 }
129
130 (text_parts, tool_calls)
131}
132
133fn collect_output_item(
134 item: &Json,
135 text_parts: &mut Vec<String>,
136 tool_calls: &mut Vec<ResponseToolCall>,
137) {
138 match item
139 .get("type")
140 .and_then(|value| value.as_str())
141 .unwrap_or("")
142 {
143 "message" => collect_message_text_parts(item, text_parts),
144 "function_call" => tool_calls.push(parse_function_call(item)),
145 _ => {}
146 }
147}
148
149fn collect_message_text_parts(item: &Json, text_parts: &mut Vec<String>) {
150 let Some(content) = item.get("content").and_then(|value| value.as_array()) else {
151 return;
152 };
153
154 for block in content {
155 if let Some(text) = output_text_block(block) {
156 text_parts.push(text);
157 }
158 }
159}
160
161fn output_text_block(block: &Json) -> Option<String> {
162 (block.get("type").and_then(|value| value.as_str()) == Some("output_text"))
163 .then(|| block.get("text").and_then(|value| value.as_str()))
164 .flatten()
165 .map(str::to_string)
166}
167
168fn parse_function_call(item: &Json) -> ResponseToolCall {
169 ResponseToolCall {
170 id: item
171 .get("call_id")
172 .and_then(|value| value.as_str())
173 .unwrap_or("")
174 .to_string(),
175 name: item
176 .get("name")
177 .and_then(|value| value.as_str())
178 .unwrap_or("")
179 .to_string(),
180 arguments: item
181 .get("arguments")
182 .and_then(|value| value.as_str())
183 .map(parse_arguments)
184 .unwrap_or(Json::Object(serde_json::Map::new())),
185 }
186}
187
188fn message_from_text_parts(text_parts: Vec<String>) -> Option<MessageContent> {
189 match text_parts.as_slice() {
190 [] => None,
191 [text] => Some(MessageContent::Text(text.clone())),
192 _ => Some(MessageContent::Text(text_parts.join("\n"))),
193 }
194}
195
/// Turns an empty vector into `None` and wraps a non-empty one in `Some`.
fn optional_vec<T>(items: Vec<T>) -> Option<Vec<T>> {
    if items.is_empty() {
        None
    } else {
        Some(items)
    }
}
199
200fn split_system_and_input_messages(messages: &[Message]) -> (Option<String>, Vec<&Message>) {
201 let mut system_text = None;
202 let mut input_messages = Vec::new();
203
204 for msg in messages {
205 match msg {
206 Message::System { content, .. } => {
207 if let MessageContent::Text(text) = content {
208 system_text = Some(text.clone());
209 }
210 }
211 other => input_messages.push(other),
212 }
213 }
214
215 (system_text, input_messages)
216}
217
218fn set_or_remove_string(obj: &mut serde_json::Map<String, Json>, key: &str, value: Option<String>) {
219 if let Some(value) = value {
220 obj.insert(key.into(), Json::String(value));
221 } else {
222 obj.remove(key);
223 }
224}
225
226fn insert_serialized<T: serde::Serialize>(
227 obj: &mut serde_json::Map<String, Json>,
228 key: &str,
229 value: &T,
230 context: &str,
231) -> Result<()> {
232 let json = serde_json::to_value(value)
233 .map_err(|e| FlowError::Internal(format!("OpenAI Responses {context} encode: {e}")))?;
234 obj.insert(key.into(), json);
235 Ok(())
236}
237
238fn overlay_generation_params(obj: &mut serde_json::Map<String, Json>, params: &GenerationParams) {
239 if let Some(temp) = params.temperature {
240 obj.insert("temperature".into(), json_f64(temp));
241 }
242 if let Some(top_p) = params.top_p {
243 obj.insert("top_p".into(), json_f64(top_p));
244 }
245 if let Some(max_tokens) = params.max_tokens {
246 obj.insert("max_output_tokens".into(), Json::from(max_tokens));
247 obj.remove("max_tokens");
248 }
249}
250
251impl LlmResponseCodec for OpenAIResponsesCodec {
256 fn decode_response(&self, response: &Json) -> Result<AnnotatedLlmResponse> {
257 let raw: RawResponsesResponse = serde_json::from_value(response.clone())
258 .map_err(|e| FlowError::Internal(format!("OpenAI Responses response decode: {e}")))?;
259
260 let all_output_items = raw.output.clone();
261 let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref());
262 let message = message_from_text_parts(text_parts);
263 let tool_calls = optional_vec(tool_calls);
264
265 let finish_reason =
267 map_responses_finish_reason(raw.status.as_deref(), raw.incomplete_details.as_ref());
268
269 let usage = raw.usage.map(|u| Usage {
271 prompt_tokens: u.input_tokens,
272 completion_tokens: u.output_tokens,
273 total_tokens: u.total_tokens,
274 cache_read_tokens: u.input_tokens_details.and_then(|d| d.cached_tokens),
275 cache_write_tokens: None,
276 });
277
278 let api_specific = Some(ApiSpecificResponse::OpenAIResponses {
280 output_items: all_output_items,
281 status: raw.status,
282 incomplete_details: raw.incomplete_details,
283 });
284
285 Ok(AnnotatedLlmResponse {
286 id: raw.id,
287 model: raw.model,
288 message,
289 tool_calls,
290 finish_reason,
291 usage,
292 api_specific,
293 extra: raw.extra,
294 })
295 }
296}
297
298impl LlmCodec for OpenAIResponsesCodec {
303 fn decode(&self, request: &LlmRequest) -> Result<AnnotatedLlmRequest> {
304 let obj = request
305 .content
306 .as_object()
307 .ok_or_else(|| FlowError::Internal("request content is not an object".into()))?;
308
309 let mut messages: Vec<Message> = Vec::new();
310
311 if let Some(instructions) = obj.get("instructions").and_then(|v| v.as_str()) {
313 messages.push(Message::System {
314 content: MessageContent::Text(instructions.to_string()),
315 name: None,
316 });
317 }
318
319 if let Some(input) = obj.get("input") {
321 if let Some(s) = input.as_str() {
322 messages.push(Message::User {
324 content: MessageContent::Text(s.to_string()),
325 name: None,
326 });
327 } else if input.is_array() {
328 let input_messages: Vec<Message> =
330 serde_json::from_value(input.clone()).unwrap_or_default();
331 messages.extend(input_messages);
332 }
333 }
334
335 let model = obj.get("model").and_then(|v| v.as_str()).map(String::from);
337
338 let temperature = obj.get("temperature").and_then(|v| v.as_f64());
340 let top_p = obj.get("top_p").and_then(|v| v.as_f64());
341 let max_tokens = obj.get("max_output_tokens").and_then(|v| v.as_u64());
342 let params = if temperature.is_some() || max_tokens.is_some() || top_p.is_some() {
345 Some(GenerationParams {
346 temperature,
347 max_tokens,
348 top_p,
349 stop: None,
350 })
351 } else {
352 None
353 };
354
355 let tools: Option<Vec<ToolDefinition>> = obj
357 .get("tools")
358 .map(|v| serde_json::from_value(v.clone()))
359 .transpose()
360 .map_err(|e| FlowError::Internal(format!("OpenAI Responses tools decode: {e}")))?;
361
362 let tool_choice: Option<ToolChoice> = obj
364 .get("tool_choice")
365 .map(|v| serde_json::from_value(v.clone()))
366 .transpose()
367 .map_err(|e| {
368 FlowError::Internal(format!("OpenAI Responses tool_choice decode: {e}"))
369 })?;
370
371 let extra: serde_json::Map<String, Json> = obj
373 .iter()
374 .filter(|(k, _)| !MODELED_REQUEST_KEYS.contains(&k.as_str()))
375 .map(|(k, v)| (k.clone(), v.clone()))
376 .collect();
377
378 Ok(AnnotatedLlmRequest {
379 messages,
380 model,
381 params,
382 tools,
383 tool_choice,
384 extra,
385 })
386 }
387
388 fn encode(&self, annotated: &AnnotatedLlmRequest, original: &LlmRequest) -> Result<LlmRequest> {
389 let mut content = original.content.clone();
390 let obj = content
391 .as_object_mut()
392 .ok_or_else(|| FlowError::Internal("original content is not an object".into()))?;
393
394 let (system_text, input_messages) = split_system_and_input_messages(&annotated.messages);
395 set_or_remove_string(obj, "instructions", system_text);
396 insert_serialized(obj, "input", &input_messages, "input")?;
397
398 if let Some(ref model) = annotated.model {
400 obj.insert("model".into(), Json::String(model.clone()));
401 }
402
403 if let Some(ref params) = annotated.params {
405 overlay_generation_params(obj, params);
406 }
407
408 if let Some(ref tools) = annotated.tools {
410 insert_serialized(obj, "tools", tools, "tools")?;
411 }
412
413 if let Some(ref tool_choice) = annotated.tool_choice {
415 insert_serialized(obj, "tool_choice", tool_choice, "tool_choice")?;
416 }
417
418 for (k, v) in &annotated.extra {
420 obj.insert(k.clone(), v.clone());
421 }
422
423 Ok(LlmRequest {
424 headers: original.headers.clone(),
425 content,
426 })
427 }
428}
429
// Unit tests live in a separate file and are attached to this module via `#[path]`;
// they are only compiled for test builds.
#[cfg(test)]
#[path = "../../tests/unit/codec/openai_responses_tests.rs"]
mod tests;