Skip to main content

nemo_flow/codec/
openai_responses.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Built-in codec for the OpenAI Responses API.
5//!
6//! Implements [`LlmCodec`] (request decode/encode) and [`LlmResponseCodec`]
7//! (response decode) for the OpenAI Responses API format.
8//!
9//! The Responses API differs significantly from Chat Completions:
10//! - **Response**: Heterogeneous `output` array (message, function_call, reasoning)
11//!   instead of `choices[0].message`.
12//! - **Finish reason**: Derived from `status` + `incomplete_details.reason`
13//!   instead of `finish_reason` field.
14//! - **Request**: Uses `input` (string or array) instead of `messages`, and
15//!   `instructions` (top-level) instead of system message.
16//! - **Max tokens**: `max_output_tokens` instead of `max_tokens`.
17
18use serde::Deserialize;
19
20use crate::api::llm::LlmRequest;
21use crate::error::{FlowError, Result};
22use crate::json::Json;
23
24use super::request::{
25    AnnotatedLlmRequest, GenerationParams, Message, MessageContent, ToolChoice, ToolDefinition,
26};
27use super::response::{
28    AnnotatedLlmResponse, ApiSpecificResponse, FinishReason, ResponseToolCall, Usage,
29};
30use super::traits::{LlmCodec, LlmResponseCodec};
31
32// ---------------------------------------------------------------------------
33// Public codec struct
34// ---------------------------------------------------------------------------
35
/// Stateless codec for the OpenAI Responses API wire format.
///
/// The type carries no data; all behavior lives in its [`LlmCodec`] and
/// [`LlmResponseCodec`] implementations.
pub struct OpenAIResponsesCodec;
38
39// ---------------------------------------------------------------------------
40// Private intermediate serde structs for response decode
41// ---------------------------------------------------------------------------
42
43#[derive(Deserialize)]
44struct RawResponsesResponse {
45    id: Option<String>,
46    model: Option<String>,
47    status: Option<String>,
48    output: Option<Vec<Json>>,
49    usage: Option<RawResponsesUsage>,
50    incomplete_details: Option<Json>,
51    #[serde(flatten)]
52    extra: serde_json::Map<String, Json>,
53}
54
55#[derive(Deserialize)]
56struct RawResponsesUsage {
57    input_tokens: Option<u64>,
58    output_tokens: Option<u64>,
59    total_tokens: Option<u64>,
60    input_tokens_details: Option<RawInputTokensDetails>,
61}
62
63#[derive(Deserialize)]
64struct RawInputTokensDetails {
65    cached_tokens: Option<u64>,
66}
67
68// ---------------------------------------------------------------------------
69// Helper functions
70// ---------------------------------------------------------------------------
71
72/// Map Responses API `status` + `incomplete_details` to normalized [`FinishReason`].
73fn map_responses_finish_reason(
74    status: Option<&str>,
75    incomplete_details: Option<&Json>,
76) -> Option<FinishReason> {
77    let incomplete_reason = incomplete_details
78        .and_then(|d| d.get("reason"))
79        .and_then(|r| r.as_str());
80
81    match status {
82        Some("completed") => Some(FinishReason::Complete),
83        Some("incomplete") => match incomplete_reason {
84            Some("max_output_tokens") => Some(FinishReason::Length),
85            Some("content_filter") => Some(FinishReason::ContentFilter),
86            Some(other) => Some(FinishReason::Unknown(other.to_string())),
87            None => Some(FinishReason::Unknown("incomplete".to_string())),
88        },
89        Some(other) => Some(FinishReason::Unknown(other.to_string())),
90        None => None,
91    }
92}
93
94/// Parse OpenAI tool call arguments from JSON string to [`Json`] value.
95///
96/// Falls back to [`Json::String`] if parsing fails (malformed model output).
97fn parse_arguments(arguments: &str) -> Json {
98    serde_json::from_str(arguments).unwrap_or_else(|_| Json::String(arguments.to_string()))
99}
100
/// Top-level request keys that `decode` maps onto dedicated
/// [`AnnotatedLlmRequest`] fields.
///
/// Any request key not listed here is copied verbatim into `extra`.
const MODELED_REQUEST_KEYS: &[&str] = &[
    // Conversation content.
    "input",
    "instructions",
    // Model selection and sampling parameters.
    "model",
    "max_output_tokens",
    "temperature",
    "top_p",
    // Tool configuration.
    "tools",
    "tool_choice",
];
112
113/// Helper to construct a [`Json`] number from an `f64`.
114fn json_f64(v: f64) -> Json {
115    serde_json::Number::from_f64(v)
116        .map(Json::Number)
117        .unwrap_or(Json::Null)
118}
119
120fn collect_output_parts(items: Option<&[Json]>) -> (Vec<String>, Vec<ResponseToolCall>) {
121    let mut text_parts = Vec::new();
122    let mut tool_calls = Vec::new();
123
124    if let Some(items) = items {
125        for item in items {
126            collect_output_item(item, &mut text_parts, &mut tool_calls);
127        }
128    }
129
130    (text_parts, tool_calls)
131}
132
133fn collect_output_item(
134    item: &Json,
135    text_parts: &mut Vec<String>,
136    tool_calls: &mut Vec<ResponseToolCall>,
137) {
138    match item
139        .get("type")
140        .and_then(|value| value.as_str())
141        .unwrap_or("")
142    {
143        "message" => collect_message_text_parts(item, text_parts),
144        "function_call" => tool_calls.push(parse_function_call(item)),
145        _ => {}
146    }
147}
148
149fn collect_message_text_parts(item: &Json, text_parts: &mut Vec<String>) {
150    let Some(content) = item.get("content").and_then(|value| value.as_array()) else {
151        return;
152    };
153
154    for block in content {
155        if let Some(text) = output_text_block(block) {
156            text_parts.push(text);
157        }
158    }
159}
160
161fn output_text_block(block: &Json) -> Option<String> {
162    (block.get("type").and_then(|value| value.as_str()) == Some("output_text"))
163        .then(|| block.get("text").and_then(|value| value.as_str()))
164        .flatten()
165        .map(str::to_string)
166}
167
168fn parse_function_call(item: &Json) -> ResponseToolCall {
169    ResponseToolCall {
170        id: item
171            .get("call_id")
172            .and_then(|value| value.as_str())
173            .unwrap_or("")
174            .to_string(),
175        name: item
176            .get("name")
177            .and_then(|value| value.as_str())
178            .unwrap_or("")
179            .to_string(),
180        arguments: item
181            .get("arguments")
182            .and_then(|value| value.as_str())
183            .map(parse_arguments)
184            .unwrap_or(Json::Object(serde_json::Map::new())),
185    }
186}
187
188fn message_from_text_parts(text_parts: Vec<String>) -> Option<MessageContent> {
189    match text_parts.as_slice() {
190        [] => None,
191        [text] => Some(MessageContent::Text(text.clone())),
192        _ => Some(MessageContent::Text(text_parts.join("\n"))),
193    }
194}
195
/// Wrap a vector in `Some`, mapping an empty vector to `None`.
fn optional_vec<T>(items: Vec<T>) -> Option<Vec<T>> {
    if items.is_empty() {
        None
    } else {
        Some(items)
    }
}
199
200fn split_system_and_input_messages(messages: &[Message]) -> (Option<String>, Vec<&Message>) {
201    let mut system_text = None;
202    let mut input_messages = Vec::new();
203
204    for msg in messages {
205        match msg {
206            Message::System { content, .. } => {
207                if let MessageContent::Text(text) = content {
208                    system_text = Some(text.clone());
209                }
210            }
211            other => input_messages.push(other),
212        }
213    }
214
215    (system_text, input_messages)
216}
217
218fn set_or_remove_string(obj: &mut serde_json::Map<String, Json>, key: &str, value: Option<String>) {
219    if let Some(value) = value {
220        obj.insert(key.into(), Json::String(value));
221    } else {
222        obj.remove(key);
223    }
224}
225
226fn insert_serialized<T: serde::Serialize>(
227    obj: &mut serde_json::Map<String, Json>,
228    key: &str,
229    value: &T,
230    context: &str,
231) -> Result<()> {
232    let json = serde_json::to_value(value)
233        .map_err(|e| FlowError::Internal(format!("OpenAI Responses {context} encode: {e}")))?;
234    obj.insert(key.into(), json);
235    Ok(())
236}
237
238fn overlay_generation_params(obj: &mut serde_json::Map<String, Json>, params: &GenerationParams) {
239    if let Some(temp) = params.temperature {
240        obj.insert("temperature".into(), json_f64(temp));
241    }
242    if let Some(top_p) = params.top_p {
243        obj.insert("top_p".into(), json_f64(top_p));
244    }
245    if let Some(max_tokens) = params.max_tokens {
246        obj.insert("max_output_tokens".into(), Json::from(max_tokens));
247        obj.remove("max_tokens");
248    }
249}
250
251// ---------------------------------------------------------------------------
252// LlmResponseCodec implementation
253// ---------------------------------------------------------------------------
254
255impl LlmResponseCodec for OpenAIResponsesCodec {
256    fn decode_response(&self, response: &Json) -> Result<AnnotatedLlmResponse> {
257        let raw: RawResponsesResponse = serde_json::from_value(response.clone())
258            .map_err(|e| FlowError::Internal(format!("OpenAI Responses response decode: {e}")))?;
259
260        let all_output_items = raw.output.clone();
261        let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref());
262        let message = message_from_text_parts(text_parts);
263        let tool_calls = optional_vec(tool_calls);
264
265        // Map finish reason from status + incomplete_details.
266        let finish_reason =
267            map_responses_finish_reason(raw.status.as_deref(), raw.incomplete_details.as_ref());
268
269        // Map usage.
270        let usage = raw.usage.map(|u| Usage {
271            prompt_tokens: u.input_tokens,
272            completion_tokens: u.output_tokens,
273            total_tokens: u.total_tokens,
274            cache_read_tokens: u.input_tokens_details.and_then(|d| d.cached_tokens),
275            cache_write_tokens: None,
276        });
277
278        // Build API-specific fields.
279        let api_specific = Some(ApiSpecificResponse::OpenAIResponses {
280            output_items: all_output_items,
281            status: raw.status,
282            incomplete_details: raw.incomplete_details,
283        });
284
285        Ok(AnnotatedLlmResponse {
286            id: raw.id,
287            model: raw.model,
288            message,
289            tool_calls,
290            finish_reason,
291            usage,
292            api_specific,
293            extra: raw.extra,
294        })
295    }
296}
297
298// ---------------------------------------------------------------------------
299// LlmCodec implementation
300// ---------------------------------------------------------------------------
301
302impl LlmCodec for OpenAIResponsesCodec {
303    fn decode(&self, request: &LlmRequest) -> Result<AnnotatedLlmRequest> {
304        let obj = request
305            .content
306            .as_object()
307            .ok_or_else(|| FlowError::Internal("request content is not an object".into()))?;
308
309        let mut messages: Vec<Message> = Vec::new();
310
311        // Extract instructions -> system message (first).
312        if let Some(instructions) = obj.get("instructions").and_then(|v| v.as_str()) {
313            messages.push(Message::System {
314                content: MessageContent::Text(instructions.to_string()),
315                name: None,
316            });
317        }
318
319        // Extract input.
320        if let Some(input) = obj.get("input") {
321            if let Some(s) = input.as_str() {
322                // Input is a simple string -> single User message.
323                messages.push(Message::User {
324                    content: MessageContent::Text(s.to_string()),
325                    name: None,
326                });
327            } else if input.is_array() {
328                // Input is an array of message items.
329                let input_messages: Vec<Message> =
330                    serde_json::from_value(input.clone()).unwrap_or_default();
331                messages.extend(input_messages);
332            }
333        }
334
335        // Extract model.
336        let model = obj.get("model").and_then(|v| v.as_str()).map(String::from);
337
338        // Extract generation params.
339        let temperature = obj.get("temperature").and_then(|v| v.as_f64());
340        let top_p = obj.get("top_p").and_then(|v| v.as_f64());
341        let max_tokens = obj.get("max_output_tokens").and_then(|v| v.as_u64());
342        // Responses API does not support stop sequences.
343
344        let params = if temperature.is_some() || max_tokens.is_some() || top_p.is_some() {
345            Some(GenerationParams {
346                temperature,
347                max_tokens,
348                top_p,
349                stop: None,
350            })
351        } else {
352            None
353        };
354
355        // Extract tools.
356        let tools: Option<Vec<ToolDefinition>> = obj
357            .get("tools")
358            .map(|v| serde_json::from_value(v.clone()))
359            .transpose()
360            .map_err(|e| FlowError::Internal(format!("OpenAI Responses tools decode: {e}")))?;
361
362        // Extract tool_choice.
363        let tool_choice: Option<ToolChoice> = obj
364            .get("tool_choice")
365            .map(|v| serde_json::from_value(v.clone()))
366            .transpose()
367            .map_err(|e| {
368                FlowError::Internal(format!("OpenAI Responses tool_choice decode: {e}"))
369            })?;
370
371        // Collect extra fields (keys not in MODELED_REQUEST_KEYS).
372        let extra: serde_json::Map<String, Json> = obj
373            .iter()
374            .filter(|(k, _)| !MODELED_REQUEST_KEYS.contains(&k.as_str()))
375            .map(|(k, v)| (k.clone(), v.clone()))
376            .collect();
377
378        Ok(AnnotatedLlmRequest {
379            messages,
380            model,
381            params,
382            tools,
383            tool_choice,
384            extra,
385        })
386    }
387
388    fn encode(&self, annotated: &AnnotatedLlmRequest, original: &LlmRequest) -> Result<LlmRequest> {
389        let mut content = original.content.clone();
390        let obj = content
391            .as_object_mut()
392            .ok_or_else(|| FlowError::Internal("original content is not an object".into()))?;
393
394        let (system_text, input_messages) = split_system_and_input_messages(&annotated.messages);
395        set_or_remove_string(obj, "instructions", system_text);
396        insert_serialized(obj, "input", &input_messages, "input")?;
397
398        // Overlay model if present.
399        if let Some(ref model) = annotated.model {
400            obj.insert("model".into(), Json::String(model.clone()));
401        }
402
403        // Overlay generation params.
404        if let Some(ref params) = annotated.params {
405            overlay_generation_params(obj, params);
406        }
407
408        // Overlay tools if present.
409        if let Some(ref tools) = annotated.tools {
410            insert_serialized(obj, "tools", tools, "tools")?;
411        }
412
413        // Overlay tool_choice if present.
414        if let Some(ref tool_choice) = annotated.tool_choice {
415            insert_serialized(obj, "tool_choice", tool_choice, "tool_choice")?;
416        }
417
418        // Merge extra fields back.
419        for (k, v) in &annotated.extra {
420            obj.insert(k.clone(), v.clone());
421        }
422
423        Ok(LlmRequest {
424            headers: original.headers.clone(),
425            content,
426        })
427    }
428}
429
430// ---------------------------------------------------------------------------
431// Tests
432// ---------------------------------------------------------------------------
433
434#[cfg(test)]
435#[path = "../../tests/unit/codec/openai_responses_tests.rs"]
436mod tests;