Skip to main content

nemo_flow/codec/
openai_responses.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Built-in codec for the OpenAI Responses API.
5//!
6//! Implements [`LlmCodec`] (request decode/encode) and [`LlmResponseCodec`]
7//! (response decode) for the OpenAI Responses API format.
8//!
9//! The Responses API differs significantly from Chat Completions:
10//! - **Response**: Heterogeneous `output` array (message, function_call, reasoning)
11//!   instead of `choices[0].message`.
12//! - **Finish reason**: Derived from `status` + `incomplete_details.reason`
13//!   instead of `finish_reason` field.
14//! - **Request**: Uses `input` (string or array) instead of `messages`, and
15//!   `instructions` (top-level) instead of system message.
16//! - **Max tokens**: `max_output_tokens` instead of `max_tokens`.
17
18use serde::Deserialize;
19
20use crate::api::llm::LlmRequest;
21use crate::error::{FlowError, Result};
22use crate::json::Json;
23
24use super::request::{
25    AnnotatedLlmRequest, GenerationParams, Message, MessageContent, ToolChoice, ToolChoiceFunction,
26    ToolChoiceFunctionName, ToolDefinition,
27};
28use super::response::{
29    AnnotatedLlmResponse, ApiSpecificResponse, FinishReason, ResponseToolCall, Usage,
30};
31use super::traits::{LlmCodec, LlmResponseCodec};
32
33// ---------------------------------------------------------------------------
34// Public codec struct
35// ---------------------------------------------------------------------------
36
/// Stateless built-in codec for the OpenAI Responses API wire format.
///
/// Handles request decode/encode through [`LlmCodec`] and non-streaming response decode through
/// [`LlmResponseCodec`]. See [`OpenAIResponsesStreamingCodec`] for the SSE counterpart.
pub struct OpenAIResponsesCodec;
39
40// ---------------------------------------------------------------------------
41// Private intermediate serde structs for response decode
42// ---------------------------------------------------------------------------
43
44#[derive(Deserialize)]
45struct RawResponsesResponse {
46    id: Option<String>,
47    model: Option<String>,
48    status: Option<String>,
49    output: Option<Vec<Json>>,
50    usage: Option<RawResponsesUsage>,
51    incomplete_details: Option<Json>,
52    previous_response_id: Option<String>,
53    store: Option<bool>,
54    service_tier: Option<String>,
55    truncation: Option<Json>,
56    reasoning: Option<Json>,
57    #[serde(flatten)]
58    extra: serde_json::Map<String, Json>,
59}
60
61#[derive(Deserialize)]
62struct RawResponsesUsage {
63    input_tokens: Option<u64>,
64    output_tokens: Option<u64>,
65    total_tokens: Option<u64>,
66    input_tokens_details: Option<RawInputTokensDetails>,
67    output_tokens_details: Option<RawOutputTokensDetails>,
68}
69
70#[derive(Deserialize, Clone)]
71struct RawInputTokensDetails {
72    cached_tokens: Option<u64>,
73    #[serde(flatten)]
74    extra: serde_json::Map<String, Json>,
75}
76
77#[derive(Deserialize, Clone)]
78struct RawOutputTokensDetails {
79    reasoning_tokens: Option<u64>,
80    #[serde(flatten)]
81    extra: serde_json::Map<String, Json>,
82}
83
84// ---------------------------------------------------------------------------
85// Helper functions
86// ---------------------------------------------------------------------------
87
88/// Map Responses API `status` + `incomplete_details` to normalized [`FinishReason`].
89fn map_responses_finish_reason(
90    status: Option<&str>,
91    incomplete_details: Option<&Json>,
92) -> Option<FinishReason> {
93    let incomplete_reason = incomplete_details
94        .and_then(|d| d.get("reason"))
95        .and_then(|r| r.as_str());
96
97    match status {
98        Some("completed") => Some(FinishReason::Complete),
99        Some("incomplete") => match incomplete_reason {
100            Some("max_output_tokens") => Some(FinishReason::Length),
101            Some("content_filter") => Some(FinishReason::ContentFilter),
102            Some(other) => Some(FinishReason::Unknown(other.to_string())),
103            None => Some(FinishReason::Unknown("incomplete".to_string())),
104        },
105        Some(other) => Some(FinishReason::Unknown(other.to_string())),
106        None => None,
107    }
108}
109
110/// Parse OpenAI tool call arguments from JSON string to [`Json`] value.
111///
112/// Falls back to [`Json::String`] if parsing fails (malformed model output).
113fn parse_arguments(arguments: &str) -> Json {
114    serde_json::from_str(arguments).unwrap_or_else(|_| Json::String(arguments.to_string()))
115}
116
117fn input_tokens_details_to_json(details: &RawInputTokensDetails) -> Json {
118    let mut obj = serde_json::Map::new();
119    if let Some(cached_tokens) = details.cached_tokens {
120        obj.insert("cached_tokens".into(), Json::from(cached_tokens));
121    }
122    obj.extend(details.extra.clone());
123    Json::Object(obj)
124}
125
126fn output_tokens_details_to_json(details: &RawOutputTokensDetails) -> Json {
127    let mut obj = serde_json::Map::new();
128    if let Some(reasoning_tokens) = details.reasoning_tokens {
129        obj.insert("reasoning_tokens".into(), Json::from(reasoning_tokens));
130    }
131    obj.extend(details.extra.clone());
132    Json::Object(obj)
133}
134
/// Top-level request keys that [`AnnotatedLlmRequest`] represents explicitly.
///
/// These are represented as messages, generation params, tools, or dedicated fields. `decode`
/// leaves them out of `extra`, so `extra` only carries keys without a dedicated home.
const MODELED_REQUEST_KEYS: &[&str] = &[
    // Conversation content.
    "input",
    "instructions",
    "model",
    // Generation parameters.
    "max_output_tokens",
    "temperature",
    "top_p",
    // Tooling.
    "tools",
    "tool_choice",
    // Responses-specific request options.
    "store",
    "previous_response_id",
    "truncation",
    "reasoning",
    "include",
    "user",
    "metadata",
    "service_tier",
    "parallel_tool_calls",
    "max_tool_calls",
    "top_logprobs",
    "stream",
];
/// `extra` key under which `decode` stashes an `input` array it could not parse into messages,
/// so that `encode` can write the original array back unchanged.
const UNPARSED_INPUT_ITEMS_KEY: &str = "_openai_responses_unparsed_input_items";
159
160/// Helper to construct a [`Json`] number from an `f64`.
161fn json_f64(v: f64) -> Json {
162    serde_json::Number::from_f64(v)
163        .map(Json::Number)
164        .unwrap_or(Json::Null)
165}
166
167fn collect_output_parts(items: Option<&[Json]>) -> (Vec<String>, Vec<ResponseToolCall>) {
168    let mut text_parts = Vec::new();
169    let mut tool_calls = Vec::new();
170
171    if let Some(items) = items {
172        for item in items {
173            collect_output_item(item, &mut text_parts, &mut tool_calls);
174        }
175    }
176
177    (text_parts, tool_calls)
178}
179
180fn collect_output_item(
181    item: &Json,
182    text_parts: &mut Vec<String>,
183    tool_calls: &mut Vec<ResponseToolCall>,
184) {
185    match item
186        .get("type")
187        .and_then(|value| value.as_str())
188        .unwrap_or("")
189    {
190        "message" => collect_message_text_parts(item, text_parts),
191        "function_call" => tool_calls.push(parse_function_call(item)),
192        _ => {}
193    }
194}
195
196fn collect_message_text_parts(item: &Json, text_parts: &mut Vec<String>) {
197    let Some(content) = item.get("content").and_then(|value| value.as_array()) else {
198        return;
199    };
200
201    for block in content {
202        if let Some(text) = output_text_block(block) {
203            text_parts.push(text);
204        }
205    }
206}
207
208fn output_text_block(block: &Json) -> Option<String> {
209    (block.get("type").and_then(|value| value.as_str()) == Some("output_text"))
210        .then(|| block.get("text").and_then(|value| value.as_str()))
211        .flatten()
212        .map(str::to_string)
213}
214
215fn parse_function_call(item: &Json) -> ResponseToolCall {
216    ResponseToolCall {
217        id: item
218            .get("call_id")
219            .and_then(|value| value.as_str())
220            .unwrap_or("")
221            .to_string(),
222        name: item
223            .get("name")
224            .and_then(|value| value.as_str())
225            .unwrap_or("")
226            .to_string(),
227        arguments: item
228            .get("arguments")
229            .and_then(|value| value.as_str())
230            .map(parse_arguments)
231            .unwrap_or(Json::Object(serde_json::Map::new())),
232    }
233}
234
235fn message_from_text_parts(text_parts: Vec<String>) -> Option<MessageContent> {
236    match text_parts.as_slice() {
237        [] => None,
238        [text] => Some(MessageContent::Text(text.clone())),
239        _ => Some(MessageContent::Text(text_parts.join("\n"))),
240    }
241}
242
/// Collapse an empty vector to `None`; a non-empty one is returned as `Some(items)`.
fn optional_vec<T>(items: Vec<T>) -> Option<Vec<T>> {
    if items.is_empty() {
        None
    } else {
        Some(items)
    }
}
246
247fn split_system_and_input_messages(messages: &[Message]) -> (Option<String>, Vec<&Message>) {
248    let mut system_text = None;
249    let mut input_messages = Vec::new();
250
251    for msg in messages {
252        match msg {
253            Message::System { content, .. } => {
254                if let MessageContent::Text(text) = content {
255                    system_text = Some(text.clone());
256                }
257            }
258            other => input_messages.push(other),
259        }
260    }
261
262    (system_text, input_messages)
263}
264
265fn set_or_remove_string(obj: &mut serde_json::Map<String, Json>, key: &str, value: Option<String>) {
266    if let Some(value) = value {
267        obj.insert(key.into(), Json::String(value));
268    } else {
269        obj.remove(key);
270    }
271}
272
273fn insert_serialized<T: serde::Serialize>(
274    obj: &mut serde_json::Map<String, Json>,
275    key: &str,
276    value: &T,
277    context: &str,
278) -> Result<()> {
279    let json = serde_json::to_value(value)
280        .map_err(|e| FlowError::Internal(format!("OpenAI Responses {context} encode: {e}")))?;
281    obj.insert(key.into(), json);
282    Ok(())
283}
284
285fn overlay_generation_params(obj: &mut serde_json::Map<String, Json>, params: &GenerationParams) {
286    if let Some(temp) = params.temperature {
287        obj.insert("temperature".into(), json_f64(temp));
288    }
289    if let Some(top_p) = params.top_p {
290        obj.insert("top_p".into(), json_f64(top_p));
291    }
292    if let Some(max_tokens) = params.max_tokens {
293        obj.insert("max_output_tokens".into(), Json::from(max_tokens));
294        obj.remove("max_tokens");
295    }
296}
297
298fn encode_openai_responses_input(
299    obj: &mut serde_json::Map<String, Json>,
300    annotated: &AnnotatedLlmRequest,
301) -> Result<()> {
302    let (system_text, input_messages) = split_system_and_input_messages(&annotated.messages);
303    set_or_remove_string(obj, "instructions", system_text);
304    if let Some(raw_input_items) = annotated.extra.get(UNPARSED_INPUT_ITEMS_KEY) {
305        obj.insert("input".into(), raw_input_items.clone());
306    } else {
307        insert_serialized(obj, "input", &input_messages, "input")?;
308    }
309    Ok(())
310}
311
312fn encode_openai_responses_tools(
313    obj: &mut serde_json::Map<String, Json>,
314    annotated: &AnnotatedLlmRequest,
315) -> Result<()> {
316    if let Some(ref tools) = annotated.tools {
317        insert_serialized(obj, "tools", tools, "tools")?;
318    }
319    if let Some(ref tool_choice) = annotated.tool_choice {
320        insert_serialized(obj, "tool_choice", tool_choice, "tool_choice")?;
321    }
322    Ok(())
323}
324
325fn overlay_openai_responses_fields(
326    obj: &mut serde_json::Map<String, Json>,
327    annotated: &AnnotatedLlmRequest,
328) {
329    if let Some(ref model) = annotated.model {
330        obj.insert("model".into(), Json::String(model.clone()));
331    }
332    overlay_openai_responses_json_fields(obj, annotated);
333    overlay_openai_responses_string_fields(obj, annotated);
334    overlay_openai_responses_bool_fields(obj, annotated);
335    overlay_openai_responses_u64_fields(obj, annotated);
336}
337
338fn overlay_openai_responses_json_fields(
339    obj: &mut serde_json::Map<String, Json>,
340    annotated: &AnnotatedLlmRequest,
341) {
342    for (key, value) in [
343        ("truncation", &annotated.truncation),
344        ("reasoning", &annotated.reasoning),
345        ("include", &annotated.include),
346        ("metadata", &annotated.metadata),
347    ] {
348        if let Some(value) = value {
349            obj.insert(key.into(), value.clone());
350        }
351    }
352}
353
354fn overlay_openai_responses_string_fields(
355    obj: &mut serde_json::Map<String, Json>,
356    annotated: &AnnotatedLlmRequest,
357) {
358    for (key, value) in [
359        ("previous_response_id", &annotated.previous_response_id),
360        ("user", &annotated.user),
361        ("service_tier", &annotated.service_tier),
362    ] {
363        if let Some(value) = value {
364            obj.insert(key.into(), Json::String(value.clone()));
365        }
366    }
367}
368
369fn overlay_openai_responses_bool_fields(
370    obj: &mut serde_json::Map<String, Json>,
371    annotated: &AnnotatedLlmRequest,
372) {
373    for (key, value) in [
374        ("store", annotated.store),
375        ("parallel_tool_calls", annotated.parallel_tool_calls),
376        ("stream", annotated.stream),
377    ] {
378        if let Some(value) = value {
379            obj.insert(key.into(), Json::Bool(value));
380        }
381    }
382}
383
384fn overlay_openai_responses_u64_fields(
385    obj: &mut serde_json::Map<String, Json>,
386    annotated: &AnnotatedLlmRequest,
387) {
388    for (key, value) in [
389        ("max_output_tokens", annotated.max_output_tokens),
390        ("max_tool_calls", annotated.max_tool_calls),
391        ("top_logprobs", annotated.top_logprobs),
392    ] {
393        if let Some(value) = value {
394            obj.insert(key.into(), Json::from(value));
395        }
396    }
397}
398
399fn merge_openai_responses_extra_fields(
400    obj: &mut serde_json::Map<String, Json>,
401    extra: &serde_json::Map<String, Json>,
402) {
403    for (k, v) in extra {
404        if k != UNPARSED_INPUT_ITEMS_KEY {
405            obj.insert(k.clone(), v.clone());
406        }
407    }
408}
409
410fn decode_openai_or_anthropic_tool_choice(value: &Json) -> Option<ToolChoice> {
411    if let Ok(parsed) = serde_json::from_value::<ToolChoice>(value.clone()) {
412        return Some(parsed);
413    }
414
415    let obj = value.as_object()?;
416    match obj.get("type").and_then(|v| v.as_str()) {
417        Some("auto") => Some(ToolChoice::Auto),
418        Some("any") => Some(ToolChoice::Required),
419        Some("none") => Some(ToolChoice::None),
420        Some("tool") => {
421            let name = obj.get("name").and_then(|v| v.as_str())?.to_string();
422            Some(ToolChoice::Specific(ToolChoiceFunction {
423                choice_type: "function".to_string(),
424                function: ToolChoiceFunctionName { name },
425            }))
426        }
427        _ => None,
428    }
429}
430
431fn decode_openai_or_anthropic_parallel_tool_calls(
432    obj: &serde_json::Map<String, Json>,
433) -> Option<bool> {
434    if let Some(value) = obj.get("parallel_tool_calls").and_then(|v| v.as_bool()) {
435        return Some(value);
436    }
437    let tool_choice = obj.get("tool_choice")?.as_object()?;
438    tool_choice
439        .get("disable_parallel_tool_use")
440        .and_then(|v| v.as_bool())
441        .map(|disabled| !disabled)
442}
443
444// ---------------------------------------------------------------------------
445// LlmResponseCodec implementation
446// ---------------------------------------------------------------------------
447
448impl LlmResponseCodec for OpenAIResponsesCodec {
449    fn decode_response(&self, response: &Json) -> Result<AnnotatedLlmResponse> {
450        let raw: RawResponsesResponse = serde_json::from_value(response.clone())
451            .map_err(|e| FlowError::Internal(format!("OpenAI Responses response decode: {e}")))?;
452
453        let all_output_items = raw.output.clone();
454        let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref());
455        let message = message_from_text_parts(text_parts);
456        let tool_calls = optional_vec(tool_calls);
457
458        // Map finish reason from status + incomplete_details.
459        let finish_reason =
460            map_responses_finish_reason(raw.status.as_deref(), raw.incomplete_details.as_ref());
461
462        let input_tokens_details = raw.usage.as_ref().and_then(|u| {
463            u.input_tokens_details
464                .as_ref()
465                .map(input_tokens_details_to_json)
466        });
467        let output_tokens_details = raw.usage.as_ref().and_then(|u| {
468            u.output_tokens_details
469                .as_ref()
470                .map(output_tokens_details_to_json)
471        });
472
473        // Map usage.
474        let usage = raw.usage.map(|u| Usage {
475            prompt_tokens: u.input_tokens,
476            completion_tokens: u.output_tokens,
477            total_tokens: u.total_tokens,
478            cache_read_tokens: u
479                .input_tokens_details
480                .as_ref()
481                .and_then(|d| d.cached_tokens),
482            cache_write_tokens: None,
483        });
484
485        // Build API-specific fields.
486        let api_specific = Some(ApiSpecificResponse::OpenAIResponses {
487            output_items: all_output_items,
488            status: raw.status,
489            incomplete_details: raw.incomplete_details,
490            previous_response_id: raw.previous_response_id,
491            store: raw.store,
492            service_tier: raw.service_tier,
493            truncation: raw.truncation,
494            reasoning: raw.reasoning,
495            input_tokens_details,
496            output_tokens_details,
497        });
498
499        Ok(AnnotatedLlmResponse {
500            id: raw.id,
501            model: raw.model,
502            message,
503            tool_calls,
504            finish_reason,
505            usage,
506            api_specific,
507            extra: raw.extra,
508        })
509    }
510}
511
512// ---------------------------------------------------------------------------
513// LlmCodec implementation
514// ---------------------------------------------------------------------------
515
516impl LlmCodec for OpenAIResponsesCodec {
517    fn decode(&self, request: &LlmRequest) -> Result<AnnotatedLlmRequest> {
518        let obj = request
519            .content
520            .as_object()
521            .ok_or_else(|| FlowError::Internal("request content is not an object".into()))?;
522
523        let mut messages: Vec<Message> = Vec::new();
524        let mut preserved_unparsed_input: Option<Json> = None;
525
526        // Extract instructions -> system message (first).
527        if let Some(instructions) = obj.get("instructions").and_then(|v| v.as_str()) {
528            messages.push(Message::System {
529                content: MessageContent::Text(instructions.to_string()),
530                name: None,
531            });
532        }
533
534        // Extract input.
535        if let Some(input) = obj.get("input") {
536            if let Some(s) = input.as_str() {
537                // Input is a simple string -> single User message.
538                messages.push(Message::User {
539                    content: MessageContent::Text(s.to_string()),
540                    name: None,
541                });
542            } else if input.is_array() {
543                // Strict-first parse to avoid partial normalized state.
544                match serde_json::from_value::<Vec<Message>>(input.clone()) {
545                    Ok(input_messages) => messages.extend(input_messages),
546                    Err(_) => {
547                        // Preserve full original array for lossless handling.
548                        preserved_unparsed_input = Some(input.clone());
549                    }
550                }
551            }
552        }
553
554        // Extract model.
555        let model = obj.get("model").and_then(|v| v.as_str()).map(String::from);
556
557        // Extract generation params.
558        let temperature = obj.get("temperature").and_then(|v| v.as_f64());
559        let top_p = obj.get("top_p").and_then(|v| v.as_f64());
560        let max_tokens = obj.get("max_output_tokens").and_then(|v| v.as_u64());
561        // Responses API does not support stop sequences.
562
563        let params = if temperature.is_some() || max_tokens.is_some() || top_p.is_some() {
564            Some(GenerationParams {
565                temperature,
566                max_tokens,
567                top_p,
568                stop: None,
569            })
570        } else {
571            None
572        };
573
574        // Extract tools.
575        let tools: Option<Vec<ToolDefinition>> = obj
576            .get("tools")
577            .map(|v| serde_json::from_value(v.clone()))
578            .transpose()
579            .map_err(|e| FlowError::Internal(format!("OpenAI Responses tools decode: {e}")))?;
580
581        // Extract tool_choice.
582        let tool_choice: Option<ToolChoice> = obj
583            .get("tool_choice")
584            .and_then(decode_openai_or_anthropic_tool_choice);
585
586        // Collect extra fields (keys not in MODELED_REQUEST_KEYS).
587        let mut extra: serde_json::Map<String, Json> = obj
588            .iter()
589            .filter(|(k, _)| !MODELED_REQUEST_KEYS.contains(&k.as_str()))
590            .map(|(k, v)| (k.clone(), v.clone()))
591            .collect();
592        if let Some(input_items) = preserved_unparsed_input {
593            extra.insert(UNPARSED_INPUT_ITEMS_KEY.into(), input_items);
594        }
595
596        Ok(AnnotatedLlmRequest {
597            messages,
598            model,
599            params,
600            tools,
601            tool_choice,
602            store: obj.get("store").and_then(|v| v.as_bool()),
603            previous_response_id: obj
604                .get("previous_response_id")
605                .and_then(|v| v.as_str())
606                .map(String::from),
607            truncation: obj.get("truncation").cloned(),
608            reasoning: obj.get("reasoning").cloned(),
609            include: obj.get("include").cloned(),
610            user: obj.get("user").and_then(|v| v.as_str()).map(String::from),
611            metadata: obj.get("metadata").cloned(),
612            service_tier: obj
613                .get("service_tier")
614                .and_then(|v| v.as_str())
615                .map(String::from),
616            parallel_tool_calls: decode_openai_or_anthropic_parallel_tool_calls(obj),
617            max_output_tokens: obj.get("max_output_tokens").and_then(|v| v.as_u64()),
618            max_tool_calls: obj.get("max_tool_calls").and_then(|v| v.as_u64()),
619            top_logprobs: obj.get("top_logprobs").and_then(|v| v.as_u64()),
620            stream: obj.get("stream").and_then(|v| v.as_bool()),
621            extra,
622        })
623    }
624
625    fn encode(&self, annotated: &AnnotatedLlmRequest, original: &LlmRequest) -> Result<LlmRequest> {
626        let mut content = original.content.clone();
627        let obj = content
628            .as_object_mut()
629            .ok_or_else(|| FlowError::Internal("original content is not an object".into()))?;
630
631        encode_openai_responses_input(obj, annotated)?;
632        if let Some(ref params) = annotated.params {
633            overlay_generation_params(obj, params);
634        }
635        encode_openai_responses_tools(obj, annotated)?;
636        overlay_openai_responses_fields(obj, annotated);
637        merge_openai_responses_extra_fields(obj, &annotated.extra);
638
639        Ok(LlmRequest {
640            headers: original.headers.clone(),
641            content,
642        })
643    }
644}
645
646// ---------------------------------------------------------------------------
647// Streaming codec
648// ---------------------------------------------------------------------------
649
650/// Streaming counterpart to [`OpenAIResponsesCodec`].
651///
652/// Replays the OpenAI Responses SSE event sequence into the same JSON shape the API returns for a
653/// non-streaming request (`{id, model, status, output, usage, incomplete_details, ...}`). Once
654/// finalized, the assembled JSON can be fed back through [`OpenAIResponsesCodec::decode_response`]
655/// to produce the canonical [`AnnotatedLlmResponse`].
656///
657/// # Strategy
658///
659/// The Responses API is a relatively forgiving streaming target because every event carries
660/// either the full `response` snapshot (`response.created`, `response.in_progress`,
661/// `response.completed`, `response.failed`, `response.incomplete`) or the final-state output item
662/// (`response.output_item.done`). We:
663///
664/// 1. Track the latest `response` snapshot — terminal events (`completed`/`failed`/`incomplete`)
665///    typically carry the complete state including `output`, so we prefer those when present.
666/// 2. Track output items by `output_index` — `output_item.done` events deliver the final per-item
667///    state, used as a fallback when the terminal `response.output` is missing or empty.
668/// 3. Per-token `output_text.delta` and `function_call_arguments.delta` events are ignored
669///    because their content is redelivered in the matching `output_item.done` event. Skipping
670///    deltas keeps the codec resilient to schema additions and avoids double-accumulation.
671///
672/// Internal state lives behind `Arc<Mutex<...>>` so the `&self`-produced collector and finalizer
673/// closures share access. Each instance is single-use because [`LlmFinalizerFn`] consumes the
674/// finalize step.
675///
676/// [`AnnotatedLlmResponse`]: crate::codec::response::AnnotatedLlmResponse
677/// [`LlmFinalizerFn`]: crate::api::runtime::LlmFinalizerFn
678pub struct OpenAIResponsesStreamingCodec {
679    state: std::sync::Arc<std::sync::Mutex<OpenAIResponsesStreamingState>>,
680}
681
682impl OpenAIResponsesStreamingCodec {
683    /// Creates a fresh streaming codec with empty accumulator state.
684    pub fn new() -> Self {
685        Self {
686            state: std::sync::Arc::new(std::sync::Mutex::new(
687                OpenAIResponsesStreamingState::default(),
688            )),
689        }
690    }
691}
692
693impl Default for OpenAIResponsesStreamingCodec {
694    fn default() -> Self {
695        Self::new()
696    }
697}
698
699impl super::streaming::StreamingCodec for OpenAIResponsesStreamingCodec {
700    fn collector(&self) -> crate::api::runtime::LlmCollectorFn {
701        let state = std::sync::Arc::clone(&self.state);
702        Box::new(move |event: Json| -> Result<()> {
703            let mut guard = state
704                .lock()
705                .unwrap_or_else(|poisoned| poisoned.into_inner());
706            guard.observe(&event);
707            Ok(())
708        })
709    }
710
711    fn finalizer(&self) -> crate::api::runtime::LlmFinalizerFn {
712        let state = std::sync::Arc::clone(&self.state);
713        Box::new(move || -> Json {
714            let mut guard = state
715                .lock()
716                .unwrap_or_else(|poisoned| poisoned.into_inner());
717            std::mem::take(&mut *guard).finalize()
718        })
719    }
720}
721
722#[derive(Debug, Default)]
723struct OpenAIResponsesStreamingState {
724    /// Latest `response` snapshot from any event that carries one. Last write wins, so terminal
725    /// events with the complete state will end up here when they fire.
726    response: Option<serde_json::Map<String, Json>>,
727    /// Items keyed by `output_index`. Captured from `response.output_item.added` (initial) and
728    /// replaced on `response.output_item.done` (final). Used as a fallback for `output` when the
729    /// terminal `response` snapshot lacks it.
730    items: std::collections::BTreeMap<usize, Json>,
731}
732
733impl OpenAIResponsesStreamingState {
734    fn observe(&mut self, event: &Json) {
735        let event_type = event.get("type").and_then(Json::as_str).unwrap_or("");
736        match event_type {
737            "response.created"
738            | "response.in_progress"
739            | "response.completed"
740            | "response.failed"
741            | "response.incomplete" => self.observe_response_snapshot(event),
742            "response.output_item.added" | "response.output_item.done" => {
743                self.observe_output_item(event);
744            }
745            // response.output_text.delta, response.function_call_arguments.delta,
746            // response.content_part.added/done — content is redelivered in output_item.done, so we
747            // don't accumulate deltas. Unknown events are ignored.
748            _ => {}
749        }
750    }
751
752    fn observe_response_snapshot(&mut self, event: &Json) {
753        let Some(response) = event.get("response") else {
754            return;
755        };
756        if let Json::Object(map) = response {
757            self.response = Some(map.clone());
758        }
759    }
760
761    fn observe_output_item(&mut self, event: &Json) {
762        let Some(index) = event.get("output_index").and_then(Json::as_u64) else {
763            return;
764        };
765        let Some(item) = event.get("item") else {
766            return;
767        };
768        self.items.insert(index as usize, item.clone());
769    }
770
771    fn finalize(self) -> Json {
772        let mut output = self.response.unwrap_or_default();
773        // If the latest snapshot lacked `output` (or has an empty array because it came from an
774        // early `response.created` event), backfill from per-item accumulator. Terminal events
775        // typically carry the complete output, so this branch is a safety net for truncated
776        // streams or schemas that drop output from terminal events.
777        let snapshot_output_empty = output
778            .get("output")
779            .and_then(Json::as_array)
780            .map(|arr| arr.is_empty())
781            .unwrap_or(true);
782        if snapshot_output_empty && !self.items.is_empty() {
783            let items: Vec<Json> = self.items.into_values().collect();
784            output.insert("output".to_string(), Json::Array(items));
785        }
786        Json::Object(output)
787    }
788}
789
790// ---------------------------------------------------------------------------
791// Tests
792// ---------------------------------------------------------------------------
793
// Unit tests live in a separate file under `tests/unit/codec/`. They are compiled (in test
// builds only) as a child module of this one, so they may use this file's private items.
#[cfg(test)]
#[path = "../../tests/unit/codec/openai_responses_tests.rs"]
mod tests;