Skip to main content

orchard/client/
responses.rs

1//! OpenAI Responses API surface for Orchard.
2//!
3//! This module maps PIE state-events carried over IPC into typed Responses API events.
4
5use std::collections::{BTreeMap, HashMap};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use rand::{distributions::Alphanumeric, Rng};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use tokio::sync::mpsc;
12
13use super::{tool_choice_to_string, Client, ClientError, Result};
14use crate::formatter::multimodal::{build_multimodal_layout, build_multimodal_messages};
15use crate::ipc::client::{ResponseDelta, ResponseStateEvent};
16use crate::ipc::serialization::PromptPayload;
17
18const RESPONSE_ID_PREFIX: &str = "resp_";
19const MESSAGE_ID_PREFIX: &str = "msg_";
20const FUNCTION_CALL_ID_PREFIX: &str = "fc_";
21const TOOL_CALL_ID_PREFIX: &str = "call_";
22
23fn current_timestamp() -> i64 {
24    SystemTime::now()
25        .duration_since(UNIX_EPOCH)
26        .map(|d| d.as_secs() as i64)
27        .unwrap_or(0)
28}
29
30fn generate_id(prefix: &str) -> String {
31    let random: String = rand::thread_rng()
32        .sample_iter(&Alphanumeric)
33        .take(22)
34        .map(char::from)
35        .collect();
36    format!("{}{}", prefix, random)
37}
38
39fn generate_response_id() -> String {
40    generate_id(RESPONSE_ID_PREFIX)
41}
42
43fn generate_message_id() -> String {
44    generate_id(MESSAGE_ID_PREFIX)
45}
46
47fn generate_function_call_id() -> String {
48    generate_id(FUNCTION_CALL_ID_PREFIX)
49}
50
51fn generate_tool_call_id() -> String {
52    generate_id(TOOL_CALL_ID_PREFIX)
53}
54
55fn finish_reason_to_incomplete(reason: Option<&str>) -> Option<IncompleteDetails> {
56    let normalized = reason.unwrap_or_default().to_lowercase();
57    if matches!(
58        normalized.as_str(),
59        "length" | "max_tokens" | "max_output_tokens"
60    ) {
61        Some(IncompleteDetails {
62            reason: "max_output_tokens".to_string(),
63        })
64    } else if normalized == "content_filter" {
65        Some(IncompleteDetails {
66            reason: "content_filter".to_string(),
67        })
68    } else {
69        None
70    }
71}
72
73fn value_to_string(value: &Value) -> String {
74    match value {
75        Value::String(text) => text.clone(),
76        _ => value.to_string(),
77    }
78}
79
80#[derive(Debug, Deserialize)]
81struct CompletedToolCallValue {
82    name: String,
83    #[serde(default = "default_tool_call_arguments")]
84    arguments: Value,
85}
86
87fn default_tool_call_arguments() -> Value {
88    Value::Object(Default::default())
89}
90
91fn parse_tool_call_completion_value(value: &Value) -> Option<(String, String)> {
92    let structured_value = match value {
93        Value::String(text) => serde_json::from_str::<Value>(text).ok()?,
94        Value::Object(_) => value.clone(),
95        _ => return None,
96    };
97
98    let tool_call: CompletedToolCallValue = serde_json::from_value(structured_value).ok()?;
99    Some((tool_call.name, tool_call.arguments.to_string()))
100}
101
102fn normalize_response_tool_schema(tool: &Value) -> Value {
103    let Some(obj) = tool.as_object() else {
104        return tool.clone();
105    };
106
107    let type_name = obj.get("type").and_then(Value::as_str);
108    let name = obj.get("name").and_then(Value::as_str);
109    let parameters = obj.get("parameters");
110
111    if type_name != Some("function") || name.is_none() || parameters.is_none() {
112        return tool.clone();
113    }
114
115    let name = name.unwrap_or_default();
116    let description = obj
117        .get("description")
118        .and_then(Value::as_str)
119        .unwrap_or(name);
120    let strict = obj.get("strict").and_then(Value::as_bool).unwrap_or(true);
121    let parameters = parameters
122        .cloned()
123        .unwrap_or(Value::Object(Default::default()));
124
125    serde_json::json!({
126        "name": name,
127        "type": "object",
128        "description": description,
129        "properties": {
130            "name": {"const": name},
131            "arguments": parameters,
132        },
133        "strict": strict,
134        "required": ["name", "arguments"],
135    })
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[serde(untagged)]
140pub enum ResponsesInput {
141    Text(String),
142    Items(Vec<ResponseInputItem>),
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
146#[serde(tag = "type", rename_all = "snake_case")]
147pub enum ResponseInputItem {
148    Message {
149        role: String,
150        content: Value,
151        #[serde(default, skip_serializing_if = "Option::is_none")]
152        tool_calls: Option<Vec<Value>>,
153        #[serde(default, skip_serializing_if = "Option::is_none")]
154        tool_call_id: Option<String>,
155    },
156    FunctionCall {
157        call_id: String,
158        name: String,
159        arguments: String,
160    },
161    FunctionCallOutput {
162        call_id: String,
163        output: String,
164    },
165    Reasoning {
166        #[serde(default, skip_serializing_if = "Option::is_none")]
167        summary: Option<Vec<Value>>,
168        #[serde(default, skip_serializing_if = "Option::is_none")]
169        encrypted_content: Option<String>,
170    },
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct ResponsesRequest {
175    pub input: ResponsesInput,
176    #[serde(default)]
177    pub stream: bool,
178    #[serde(default)]
179    pub instructions: Option<String>,
180    #[serde(default)]
181    pub temperature: Option<f64>,
182    #[serde(default)]
183    pub top_p: Option<f64>,
184    #[serde(default)]
185    pub top_k: Option<i32>,
186    #[serde(default)]
187    pub min_p: Option<f64>,
188    #[serde(default)]
189    pub frequency_penalty: Option<f64>,
190    #[serde(default)]
191    pub presence_penalty: Option<f64>,
192    #[serde(default)]
193    pub max_output_tokens: Option<i32>,
194    #[serde(default)]
195    pub top_logprobs: Option<i32>,
196    #[serde(default)]
197    pub tools: Vec<Value>,
198    #[serde(default)]
199    pub tool_choice: Option<Value>,
200    #[serde(default)]
201    pub max_tool_calls: Option<i32>,
202    #[serde(default)]
203    pub text: Option<Value>,
204    #[serde(default)]
205    pub reasoning_effort: Option<String>,
206    #[serde(default)]
207    pub metadata: Option<HashMap<String, String>>,
208    #[serde(default)]
209    pub parallel_tool_calls: bool,
210}
211
212impl ResponsesRequest {
213    pub fn from_text(text: impl Into<String>) -> Self {
214        Self {
215            input: ResponsesInput::Text(text.into()),
216            stream: false,
217            instructions: None,
218            temperature: None,
219            top_p: None,
220            top_k: None,
221            min_p: None,
222            frequency_penalty: None,
223            presence_penalty: None,
224            max_output_tokens: None,
225            top_logprobs: None,
226            tools: Vec::new(),
227            tool_choice: None,
228            max_tool_calls: None,
229            text: None,
230            reasoning_effort: None,
231            metadata: None,
232            parallel_tool_calls: false,
233        }
234    }
235
236    fn to_messages(&self) -> Vec<HashMap<String, Value>> {
237        match &self.input {
238            ResponsesInput::Text(text) => {
239                let mut message = HashMap::new();
240                message.insert("role".to_string(), Value::String("user".to_string()));
241                message.insert("content".to_string(), Value::String(text.clone()));
242                vec![message]
243            }
244            ResponsesInput::Items(items) => {
245                let mut messages = Vec::new();
246                for item in items {
247                    match item {
248                        ResponseInputItem::Message {
249                            role,
250                            content,
251                            tool_calls,
252                            tool_call_id,
253                        } => {
254                            let mut message = HashMap::new();
255                            message.insert("role".to_string(), Value::String(role.clone()));
256                            message.insert("content".to_string(), content.clone());
257                            if let Some(calls) = tool_calls {
258                                message
259                                    .insert("tool_calls".to_string(), Value::Array(calls.clone()));
260                            }
261                            if let Some(call_id) = tool_call_id {
262                                message.insert(
263                                    "tool_call_id".to_string(),
264                                    Value::String(call_id.clone()),
265                                );
266                            }
267                            messages.push(message);
268                        }
269                        ResponseInputItem::FunctionCall {
270                            call_id,
271                            name,
272                            arguments,
273                        } => {
274                            let mut message = HashMap::new();
275                            message
276                                .insert("role".to_string(), Value::String("assistant".to_string()));
277                            message.insert("content".to_string(), Value::String(String::new()));
278                            message.insert(
279                                "tool_calls".to_string(),
280                                Value::Array(vec![serde_json::json!({
281                                    "id": call_id,
282                                    "type": "function",
283                                    "function": {
284                                        "name": name,
285                                        "arguments": arguments,
286                                    }
287                                })]),
288                            );
289                            messages.push(message);
290                        }
291                        ResponseInputItem::FunctionCallOutput { call_id, output } => {
292                            let mut message = HashMap::new();
293                            message.insert("role".to_string(), Value::String("tool".to_string()));
294                            message.insert("content".to_string(), Value::String(output.clone()));
295                            message
296                                .insert("tool_call_id".to_string(), Value::String(call_id.clone()));
297                            messages.push(message);
298                        }
299                        ResponseInputItem::Reasoning { .. } => {
300                            // Reasoning items are not directly representable in template messages.
301                        }
302                    }
303                }
304                messages
305            }
306        }
307    }
308}
309
310#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
311#[serde(rename_all = "snake_case")]
312#[derive(Default)]
313pub enum OutputStatus {
314    #[default]
315    Completed,
316    Incomplete,
317    InProgress,
318    Failed,
319}
320
321#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
322pub struct OutputTextContent {
323    #[serde(rename = "type")]
324    pub content_type: String,
325    pub text: String,
326    #[serde(default)]
327    pub annotations: Vec<Value>,
328    #[serde(default)]
329    pub logprobs: Vec<Value>,
330}
331
332impl OutputTextContent {
333    pub fn new(text: impl Into<String>) -> Self {
334        Self {
335            content_type: "output_text".to_string(),
336            text: text.into(),
337            annotations: Vec::new(),
338            logprobs: Vec::new(),
339        }
340    }
341}
342
343#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
344pub struct ReasoningContent {
345    #[serde(rename = "type")]
346    pub content_type: String,
347    pub text: String,
348}
349
350impl ReasoningContent {
351    pub fn new(text: impl Into<String>) -> Self {
352        Self {
353            content_type: "reasoning_text".to_string(),
354            text: text.into(),
355        }
356    }
357}
358
359#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
360pub struct ReasoningSummaryTextContent {
361    #[serde(rename = "type")]
362    pub content_type: String,
363    pub text: String,
364}
365
366#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
367pub struct OutputMessage {
368    #[serde(rename = "type")]
369    pub output_type: String,
370    pub id: String,
371    pub status: OutputStatus,
372    pub role: String,
373    pub content: Vec<OutputTextContent>,
374}
375
376#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
377pub struct OutputFunctionCall {
378    #[serde(rename = "type")]
379    pub output_type: String,
380    pub id: String,
381    pub call_id: String,
382    pub name: String,
383    pub arguments: String,
384    #[serde(default, skip_serializing_if = "Option::is_none")]
385    pub metadata: Option<Value>,
386    pub status: OutputStatus,
387}
388
389#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
390pub struct OutputReasoning {
391    #[serde(rename = "type")]
392    pub output_type: String,
393    pub id: String,
394    #[serde(default)]
395    pub status: OutputStatus,
396    #[serde(default)]
397    pub summary: Vec<ReasoningSummaryTextContent>,
398    #[serde(default)]
399    pub content: Vec<ReasoningContent>,
400    #[serde(default, skip_serializing_if = "Option::is_none")]
401    pub encrypted_content: Option<String>,
402}
403
404#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
405#[serde(untagged)]
406pub enum ResponseOutputItem {
407    Message(OutputMessage),
408    FunctionCall(OutputFunctionCall),
409    Reasoning(OutputReasoning),
410}
411
412impl ResponseOutputItem {
413    pub fn item_type(&self) -> &'static str {
414        match self {
415            Self::Message(_) => "message",
416            Self::FunctionCall(_) => "function_call",
417            Self::Reasoning(_) => "reasoning",
418        }
419    }
420}
421
422#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
423pub struct IncompleteDetails {
424    pub reason: String,
425}
426
427#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
428pub struct ResponseError {
429    pub code: String,
430    pub message: String,
431}
432
433#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
434pub struct InputTokensDetails {
435    pub cached_tokens: u32,
436}
437
438#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
439pub struct OutputTokensDetails {
440    pub reasoning_tokens: u32,
441}
442
443#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
444pub struct ResponseUsage {
445    pub input_tokens: u32,
446    pub output_tokens: u32,
447    pub total_tokens: u32,
448    #[serde(default, skip_serializing_if = "Option::is_none")]
449    pub input_tokens_details: Option<InputTokensDetails>,
450    #[serde(default, skip_serializing_if = "Option::is_none")]
451    pub output_tokens_details: Option<OutputTokensDetails>,
452}
453
454#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
455pub struct ResponseObject {
456    pub id: String,
457    pub object: String,
458    pub created_at: i64,
459    #[serde(default, skip_serializing_if = "Option::is_none")]
460    pub completed_at: Option<i64>,
461    pub status: OutputStatus,
462    #[serde(default, skip_serializing_if = "Option::is_none")]
463    pub incomplete_details: Option<IncompleteDetails>,
464    #[serde(default, skip_serializing_if = "Option::is_none")]
465    pub error: Option<ResponseError>,
466    pub model: String,
467    pub output: Vec<ResponseOutputItem>,
468    #[serde(default, skip_serializing_if = "Option::is_none")]
469    pub usage: Option<ResponseUsage>,
470    #[serde(default, skip_serializing_if = "Option::is_none")]
471    pub metadata: Option<HashMap<String, String>>,
472    pub parallel_tool_calls: bool,
473    #[serde(default, skip_serializing_if = "Option::is_none")]
474    pub temperature: Option<f64>,
475    #[serde(default, skip_serializing_if = "Option::is_none")]
476    pub top_p: Option<f64>,
477    #[serde(default, skip_serializing_if = "Option::is_none")]
478    pub presence_penalty: Option<f64>,
479    #[serde(default, skip_serializing_if = "Option::is_none")]
480    pub frequency_penalty: Option<f64>,
481    #[serde(default, skip_serializing_if = "Option::is_none")]
482    pub top_k: Option<i32>,
483    #[serde(default, skip_serializing_if = "Option::is_none")]
484    pub min_p: Option<f64>,
485    #[serde(default, skip_serializing_if = "Option::is_none")]
486    pub instructions: Option<String>,
487    #[serde(default, skip_serializing_if = "Option::is_none")]
488    pub max_output_tokens: Option<i32>,
489    #[serde(default, skip_serializing_if = "Option::is_none")]
490    pub top_logprobs: Option<i32>,
491    #[serde(default, skip_serializing_if = "Option::is_none")]
492    pub tool_choice: Option<Value>,
493    #[serde(default)]
494    pub tools: Vec<Value>,
495    #[serde(default, skip_serializing_if = "Option::is_none")]
496    pub max_tool_calls: Option<i32>,
497    #[serde(default, skip_serializing_if = "Option::is_none")]
498    pub text: Option<Value>,
499}
500
501#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
502pub struct ResponseSnapshot {
503    pub id: String,
504    pub object: String,
505    pub created_at: i64,
506    #[serde(default, skip_serializing_if = "Option::is_none")]
507    pub completed_at: Option<i64>,
508    pub status: OutputStatus,
509    #[serde(default, skip_serializing_if = "Option::is_none")]
510    pub incomplete_details: Option<IncompleteDetails>,
511    pub model: String,
512    pub output: Vec<ResponseOutputItem>,
513    #[serde(default, skip_serializing_if = "Option::is_none")]
514    pub usage: Option<ResponseUsage>,
515}
516
517#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
518pub enum ResponseContentPart {
519    OutputText(OutputTextContent),
520    Reasoning(ReasoningContent),
521}
522
523#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
524pub struct ResponseCreatedEvent {
525    pub sequence_number: u64,
526    pub response: ResponseSnapshot,
527}
528
529#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
530pub struct ResponseInProgressEvent {
531    pub sequence_number: u64,
532    pub response: ResponseSnapshot,
533}
534
535#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
536pub struct ResponseCompletedEvent {
537    pub sequence_number: u64,
538    pub response: ResponseSnapshot,
539}
540
541#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
542pub struct ResponseFailedEvent {
543    pub sequence_number: u64,
544    pub response: ResponseSnapshot,
545}
546
547#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
548pub struct ResponseIncompleteEvent {
549    pub sequence_number: u64,
550    pub response: ResponseSnapshot,
551}
552
553#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
554pub struct OutputItemAddedEvent {
555    pub sequence_number: u64,
556    pub output_index: u32,
557    pub item: ResponseOutputItem,
558}
559
560#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
561pub struct OutputItemDoneEvent {
562    pub sequence_number: u64,
563    pub output_index: u32,
564    pub item: ResponseOutputItem,
565}
566
567#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
568pub struct ContentPartAddedEvent {
569    pub sequence_number: u64,
570    pub item_id: String,
571    pub output_index: u32,
572    pub content_index: u32,
573    pub part: ResponseContentPart,
574}
575
576#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
577pub struct ContentPartDoneEvent {
578    pub sequence_number: u64,
579    pub item_id: String,
580    pub output_index: u32,
581    pub content_index: u32,
582    pub part: ResponseContentPart,
583}
584
585#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
586pub struct OutputTextDeltaEvent {
587    pub sequence_number: u64,
588    pub item_id: String,
589    pub output_index: u32,
590    pub content_index: u32,
591    pub delta: String,
592    #[serde(default)]
593    pub logprobs: Vec<Value>,
594}
595
596#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
597pub struct OutputTextDoneEvent {
598    pub sequence_number: u64,
599    pub item_id: String,
600    pub output_index: u32,
601    pub content_index: u32,
602    pub text: String,
603    #[serde(default)]
604    pub logprobs: Vec<Value>,
605}
606
607#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
608pub struct FunctionCallArgumentsDeltaEvent {
609    pub sequence_number: u64,
610    pub item_id: String,
611    pub output_index: u32,
612    pub delta: String,
613}
614
615#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
616pub struct FunctionCallArgumentsDoneEvent {
617    pub sequence_number: u64,
618    pub item_id: String,
619    pub output_index: u32,
620    pub arguments: String,
621}
622
623#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
624pub struct ReasoningDeltaEvent {
625    pub sequence_number: u64,
626    pub item_id: String,
627    pub output_index: u32,
628    pub content_index: u32,
629    pub delta: String,
630}
631
632#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
633pub struct ReasoningDoneEvent {
634    pub sequence_number: u64,
635    pub item_id: String,
636    pub output_index: u32,
637    pub content_index: u32,
638    pub text: String,
639}
640
641#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
642pub struct ReasoningSummaryTextDeltaEvent {
643    pub sequence_number: u64,
644    pub item_id: String,
645    pub output_index: u32,
646    pub summary_index: u32,
647    pub delta: String,
648}
649
650#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
651pub struct ReasoningSummaryTextDoneEvent {
652    pub sequence_number: u64,
653    pub item_id: String,
654    pub output_index: u32,
655    pub summary_index: u32,
656    pub text: String,
657}
658
659#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
660pub struct StreamErrorDetail {
661    pub message: String,
662    #[serde(rename = "type")]
663    pub error_type: String,
664}
665
666#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
667pub struct StreamErrorEvent {
668    pub sequence_number: u64,
669    pub error: StreamErrorDetail,
670}
671
672#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
673pub enum ResponseEvent {
674    ResponseCreated(ResponseCreatedEvent),
675    ResponseInProgress(ResponseInProgressEvent),
676    ResponseCompleted(ResponseCompletedEvent),
677    ResponseFailed(ResponseFailedEvent),
678    ResponseIncomplete(ResponseIncompleteEvent),
679    OutputItemAdded(OutputItemAddedEvent),
680    OutputItemDone(OutputItemDoneEvent),
681    ContentPartAdded(ContentPartAddedEvent),
682    ContentPartDone(ContentPartDoneEvent),
683    OutputTextDelta(OutputTextDeltaEvent),
684    OutputTextDone(OutputTextDoneEvent),
685    FunctionCallArgumentsDelta(FunctionCallArgumentsDeltaEvent),
686    FunctionCallArgumentsDone(FunctionCallArgumentsDoneEvent),
687    ReasoningDelta(ReasoningDeltaEvent),
688    ReasoningDone(ReasoningDoneEvent),
689    ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaEvent),
690    ReasoningSummaryTextDone(ReasoningSummaryTextDoneEvent),
691    Error(StreamErrorEvent),
692    Done,
693}
694
695impl ResponseEvent {
696    pub fn event_type(&self) -> &'static str {
697        match self {
698            Self::ResponseCreated(_) => "response.created",
699            Self::ResponseInProgress(_) => "response.in_progress",
700            Self::ResponseCompleted(_) => "response.completed",
701            Self::ResponseFailed(_) => "response.failed",
702            Self::ResponseIncomplete(_) => "response.incomplete",
703            Self::OutputItemAdded(_) => "response.output_item.added",
704            Self::OutputItemDone(_) => "response.output_item.done",
705            Self::ContentPartAdded(_) => "response.content_part.added",
706            Self::ContentPartDone(_) => "response.content_part.done",
707            Self::OutputTextDelta(_) => "response.output_text.delta",
708            Self::OutputTextDone(_) => "response.output_text.done",
709            Self::FunctionCallArgumentsDelta(_) => "response.function_call_arguments.delta",
710            Self::FunctionCallArgumentsDone(_) => "response.function_call_arguments.done",
711            Self::ReasoningDelta(_) => "response.reasoning.delta",
712            Self::ReasoningDone(_) => "response.reasoning.done",
713            Self::ReasoningSummaryTextDelta(_) => "response.reasoning_summary_text.delta",
714            Self::ReasoningSummaryTextDone(_) => "response.reasoning_summary_text.done",
715            Self::Error(_) => "error",
716            Self::Done => "done",
717        }
718    }
719}
720
721pub enum ResponsesResult {
722    Complete(Box<ResponseObject>),
723    Stream(mpsc::Receiver<ResponseEvent>),
724}
725
726#[derive(Debug, Clone)]
727struct StreamingOutputItem {
728    item_id: String,
729    item_type: String,
730    call_id: Option<String>,
731    function_name: Option<String>,
732    accumulated_content: String,
733    accumulated_arguments: String,
734    status: OutputStatus,
735}
736
737impl StreamingOutputItem {
738    fn new(item_type: &str) -> Self {
739        let item_id = match item_type {
740            "tool_call" => generate_function_call_id(),
741            "reasoning" => generate_id("reasoning_"),
742            _ => generate_message_id(),
743        };
744
745        Self {
746            item_id,
747            item_type: item_type.to_string(),
748            call_id: None,
749            function_name: None,
750            accumulated_content: String::new(),
751            accumulated_arguments: String::new(),
752            status: OutputStatus::InProgress,
753        }
754    }
755
756    fn to_skeleton(&self) -> ResponseOutputItem {
757        match self.item_type.as_str() {
758            "tool_call" => ResponseOutputItem::FunctionCall(OutputFunctionCall {
759                output_type: "function_call".to_string(),
760                id: self.item_id.clone(),
761                call_id: self.call_id.clone().unwrap_or_else(generate_tool_call_id),
762                name: self.function_name.clone().unwrap_or_default(),
763                arguments: String::new(),
764                metadata: None,
765                status: OutputStatus::InProgress,
766            }),
767            "reasoning" => ResponseOutputItem::Reasoning(OutputReasoning {
768                output_type: "reasoning".to_string(),
769                id: self.item_id.clone(),
770                status: OutputStatus::InProgress,
771                summary: Vec::new(),
772                content: Vec::new(),
773                encrypted_content: None,
774            }),
775            _ => ResponseOutputItem::Message(OutputMessage {
776                output_type: "message".to_string(),
777                id: self.item_id.clone(),
778                status: OutputStatus::InProgress,
779                role: "assistant".to_string(),
780                content: Vec::new(),
781            }),
782        }
783    }
784
785    fn to_completed(&self) -> ResponseOutputItem {
786        match self.item_type.as_str() {
787            "tool_call" => ResponseOutputItem::FunctionCall(OutputFunctionCall {
788                output_type: "function_call".to_string(),
789                id: self.item_id.clone(),
790                call_id: self.call_id.clone().unwrap_or_else(generate_tool_call_id),
791                name: self.function_name.clone().unwrap_or_default(),
792                arguments: self.accumulated_arguments.clone(),
793                metadata: None,
794                status: OutputStatus::Completed,
795            }),
796            "reasoning" => ResponseOutputItem::Reasoning(OutputReasoning {
797                output_type: "reasoning".to_string(),
798                id: self.item_id.clone(),
799                status: OutputStatus::Completed,
800                summary: Vec::new(),
801                content: if self.accumulated_content.is_empty() {
802                    Vec::new()
803                } else {
804                    vec![ReasoningContent::new(self.accumulated_content.clone())]
805                },
806                encrypted_content: None,
807            }),
808            _ => ResponseOutputItem::Message(OutputMessage {
809                output_type: "message".to_string(),
810                id: self.item_id.clone(),
811                status: OutputStatus::Completed,
812                role: "assistant".to_string(),
813                content: if self.accumulated_content.is_empty() {
814                    Vec::new()
815                } else {
816                    vec![OutputTextContent::new(self.accumulated_content.clone())]
817                },
818            }),
819        }
820    }
821}
822
823#[derive(Debug, Clone)]
824struct ResponseStreamState {
825    response_id: String,
826    model: String,
827    created_at: i64,
828    completed_at: Option<i64>,
829    items: BTreeMap<u32, StreamingOutputItem>,
830    sequence_number: u64,
831    status: OutputStatus,
832    incomplete_details: Option<IncompleteDetails>,
833    usage: Option<ResponseUsage>,
834}
835
836impl ResponseStreamState {
837    fn new(response_id: String, model: String) -> Self {
838        Self {
839            response_id,
840            model,
841            created_at: current_timestamp(),
842            completed_at: None,
843            items: BTreeMap::new(),
844            sequence_number: 0,
845            status: OutputStatus::InProgress,
846            incomplete_details: None,
847            usage: None,
848        }
849    }
850
851    fn next_sequence_number(&mut self) -> u64 {
852        let current = self.sequence_number;
853        self.sequence_number = self.sequence_number.saturating_add(1);
854        current
855    }
856
857    fn get_or_create_item(
858        &mut self,
859        output_index: u32,
860        item_type: &str,
861        identifier: &str,
862    ) -> &mut StreamingOutputItem {
863        self.items.entry(output_index).or_insert_with(|| {
864            let mut item = StreamingOutputItem::new(item_type);
865            if item_type == "tool_call" {
866                item.call_id = Some(generate_tool_call_id());
867                if !identifier.is_empty() {
868                    item.function_name =
869                        Some(identifier.trim_start_matches("tool_call:").to_string());
870                }
871            }
872            item
873        })
874    }
875
876    fn snapshot(&self) -> ResponseSnapshot {
877        let output = self
878            .items
879            .values()
880            .map(|item| {
881                if item.status == OutputStatus::Completed {
882                    item.to_completed()
883                } else {
884                    item.to_skeleton()
885                }
886            })
887            .collect::<Vec<_>>();
888
889        ResponseSnapshot {
890            id: self.response_id.clone(),
891            object: "response".to_string(),
892            created_at: self.created_at,
893            completed_at: self.completed_at,
894            status: self.status,
895            incomplete_details: self.incomplete_details.clone(),
896            model: self.model.clone(),
897            output,
898            usage: self.usage.clone(),
899        }
900    }
901}
902
903#[derive(Debug, Clone)]
904struct AggregatedOutputItem {
905    item_type: String,
906    content: String,
907    arguments: String,
908    identifier: String,
909    function_name: String,
910}
911
912fn process_state_event_for_output(
913    event: &ResponseStateEvent,
914    output_items: &mut BTreeMap<u32, AggregatedOutputItem>,
915) {
916    let output_index = event.output_index;
917    let item_type = if event.item_type.is_empty() {
918        "message"
919    } else {
920        event.item_type.as_str()
921    };
922
923    let item = output_items
924        .entry(output_index)
925        .or_insert_with(|| AggregatedOutputItem {
926            item_type: item_type.to_string(),
927            content: String::new(),
928            arguments: String::new(),
929            identifier: event.identifier.clone(),
930            function_name: if item_type == "tool_call" {
931                event
932                    .identifier
933                    .trim_start_matches("tool_call:")
934                    .to_string()
935            } else {
936                String::new()
937            },
938        });
939
940    if item.item_type == "tool_call" && event.identifier == "arguments" {
941        if event.event_type == "content_delta" {
942            item.arguments.push_str(&event.delta);
943        } else if event.event_type == "item_completed" {
944            if let Some(value) = &event.value {
945                item.arguments = value_to_string(value);
946            }
947        }
948        return;
949    }
950
951    if event.event_type == "content_delta" {
952        item.content.push_str(&event.delta);
953    } else if event.event_type == "item_completed" {
954        if item.item_type == "tool_call" {
955            item.identifier = event.identifier.clone();
956            if let Some(value) = &event.value {
957                if let Some((function_name, arguments)) = parse_tool_call_completion_value(value) {
958                    item.function_name = function_name;
959                    item.arguments = arguments;
960                }
961            }
962            if item.function_name.is_empty() {
963                item.function_name = event
964                    .identifier
965                    .trim_start_matches("tool_call:")
966                    .to_string();
967            }
968        } else if let Some(value) = &event.value {
969            item.content = value_to_string(value);
970        }
971    }
972}
973
974fn build_output_items(
975    output_items: &BTreeMap<u32, AggregatedOutputItem>,
976) -> Vec<ResponseOutputItem> {
977    let mut output = Vec::new();
978    for item in output_items.values() {
979        match item.item_type.as_str() {
980            "tool_call" => {
981                output.push(ResponseOutputItem::FunctionCall(OutputFunctionCall {
982                    output_type: "function_call".to_string(),
983                    id: generate_function_call_id(),
984                    call_id: generate_tool_call_id(),
985                    name: item.function_name.clone(),
986                    arguments: item.arguments.clone(),
987                    metadata: None,
988                    status: OutputStatus::Completed,
989                }));
990            }
991            "reasoning" => {
992                output.push(ResponseOutputItem::Reasoning(OutputReasoning {
993                    output_type: "reasoning".to_string(),
994                    id: generate_id("reasoning_"),
995                    status: OutputStatus::Completed,
996                    summary: Vec::new(),
997                    content: if item.content.is_empty() {
998                        Vec::new()
999                    } else {
1000                        vec![ReasoningContent::new(item.content.clone())]
1001                    },
1002                    encrypted_content: None,
1003                }));
1004            }
1005            _ => {
1006                output.push(ResponseOutputItem::Message(OutputMessage {
1007                    output_type: "message".to_string(),
1008                    id: generate_message_id(),
1009                    status: OutputStatus::Completed,
1010                    role: "assistant".to_string(),
1011                    content: if item.content.is_empty() {
1012                        Vec::new()
1013                    } else {
1014                        vec![OutputTextContent::new(item.content.clone())]
1015                    },
1016                }));
1017            }
1018        }
1019    }
1020
1021    if output.is_empty() {
1022        output.push(ResponseOutputItem::Message(OutputMessage {
1023            output_type: "message".to_string(),
1024            id: generate_message_id(),
1025            status: OutputStatus::Completed,
1026            role: "assistant".to_string(),
1027            content: Vec::new(),
1028        }));
1029    }
1030
1031    output
1032}
1033
1034fn update_usage_from_delta(delta: &ResponseDelta, usage: &mut ResponseUsage) {
1035    if let Some(prompt_tokens) = delta.prompt_token_count {
1036        usage.input_tokens = usage.input_tokens.max(prompt_tokens);
1037    }
1038    if let Some(generation_len) = delta.generation_len {
1039        usage.output_tokens = usage.output_tokens.max(generation_len);
1040    }
1041    if let Some(cached_tokens) = delta.cached_token_count {
1042        usage.input_tokens_details = Some(InputTokensDetails { cached_tokens });
1043    }
1044    if let Some(reasoning_tokens) = delta.reasoning_tokens {
1045        usage.output_tokens_details = Some(OutputTokensDetails { reasoning_tokens });
1046    }
1047    usage.total_tokens = usage.input_tokens + usage.output_tokens;
1048}
1049
1050fn process_state_event_for_streaming(
1051    event: &ResponseStateEvent,
1052    stream_state: &mut ResponseStreamState,
1053    events: &mut Vec<ResponseEvent>,
1054) {
1055    let item_type = if event.item_type.is_empty() {
1056        "message"
1057    } else {
1058        event.item_type.as_str()
1059    };
1060
1061    let output_index = event.output_index;
1062    let identifier = event.identifier.clone();
1063
1064    if item_type == "tool_call" && identifier == "arguments" {
1065        if event.event_type == "content_delta" {
1066            let item_id = {
1067                let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1068                item.accumulated_arguments.push_str(&event.delta);
1069                item.item_id.clone()
1070            };
1071            let sequence_number = stream_state.next_sequence_number();
1072            events.push(ResponseEvent::FunctionCallArgumentsDelta(
1073                FunctionCallArgumentsDeltaEvent {
1074                    sequence_number,
1075                    item_id,
1076                    output_index,
1077                    delta: event.delta.clone(),
1078                },
1079            ));
1080        } else if event.event_type == "item_completed" {
1081            if let Some(value) = &event.value {
1082                let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1083                item.accumulated_arguments = value_to_string(value);
1084            }
1085        }
1086        return;
1087    }
1088
1089    if event.event_type == "item_started" {
1090        let (item_id, skeleton_item) = {
1091            let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1092            (item.item_id.clone(), item.to_skeleton())
1093        };
1094        let sequence_number = stream_state.next_sequence_number();
1095        events.push(ResponseEvent::OutputItemAdded(OutputItemAddedEvent {
1096            sequence_number,
1097            output_index,
1098            item: skeleton_item,
1099        }));
1100
1101        if item_type == "message" {
1102            let sequence_number = stream_state.next_sequence_number();
1103            events.push(ResponseEvent::ContentPartAdded(ContentPartAddedEvent {
1104                sequence_number,
1105                item_id,
1106                output_index,
1107                content_index: 0,
1108                part: ResponseContentPart::OutputText(OutputTextContent::new("")),
1109            }));
1110        } else if item_type == "reasoning" {
1111            let sequence_number = stream_state.next_sequence_number();
1112            events.push(ResponseEvent::ContentPartAdded(ContentPartAddedEvent {
1113                sequence_number,
1114                item_id,
1115                output_index,
1116                content_index: 0,
1117                part: ResponseContentPart::Reasoning(ReasoningContent::new("")),
1118            }));
1119        }
1120        return;
1121    }
1122
1123    if event.event_type == "content_delta" {
1124        let item_id = {
1125            let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1126            item.accumulated_content.push_str(&event.delta);
1127            item.item_id.clone()
1128        };
1129        if item_type == "message" {
1130            let sequence_number = stream_state.next_sequence_number();
1131            events.push(ResponseEvent::OutputTextDelta(OutputTextDeltaEvent {
1132                sequence_number,
1133                item_id,
1134                output_index,
1135                content_index: 0,
1136                delta: event.delta.clone(),
1137                logprobs: Vec::new(),
1138            }));
1139        } else if item_type == "reasoning" {
1140            let sequence_number = stream_state.next_sequence_number();
1141            events.push(ResponseEvent::ReasoningDelta(ReasoningDeltaEvent {
1142                sequence_number,
1143                item_id,
1144                output_index,
1145                content_index: 0,
1146                delta: event.delta.clone(),
1147            }));
1148        }
1149        return;
1150    }
1151
1152    if event.event_type == "item_completed" {
1153        let (item_id, accumulated_content, completed_item) = {
1154            let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1155            if let Some(value) = &event.value {
1156                if item_type == "tool_call" {
1157                    if let Some((function_name, arguments)) =
1158                        parse_tool_call_completion_value(value)
1159                    {
1160                        item.function_name = Some(function_name);
1161                        item.accumulated_arguments = arguments;
1162                    }
1163                } else {
1164                    item.accumulated_content = value_to_string(value);
1165                }
1166            }
1167            item.status = OutputStatus::Completed;
1168            if item_type == "tool_call" && item.function_name.is_none() {
1169                item.function_name = Some(identifier.trim_start_matches("tool_call:").to_string());
1170            }
1171            (
1172                item.item_id.clone(),
1173                item.accumulated_content.clone(),
1174                item.to_completed(),
1175            )
1176        };
1177
1178        if item_type == "message" {
1179            let sequence_number = stream_state.next_sequence_number();
1180            events.push(ResponseEvent::OutputTextDone(OutputTextDoneEvent {
1181                sequence_number,
1182                item_id: item_id.clone(),
1183                output_index,
1184                content_index: 0,
1185                text: accumulated_content.clone(),
1186                logprobs: Vec::new(),
1187            }));
1188
1189            let sequence_number = stream_state.next_sequence_number();
1190            events.push(ResponseEvent::ContentPartDone(ContentPartDoneEvent {
1191                sequence_number,
1192                item_id: item_id.clone(),
1193                output_index,
1194                content_index: 0,
1195                part: ResponseContentPart::OutputText(OutputTextContent::new(accumulated_content)),
1196            }));
1197        } else if item_type == "reasoning" {
1198            let sequence_number = stream_state.next_sequence_number();
1199            events.push(ResponseEvent::ReasoningDone(ReasoningDoneEvent {
1200                sequence_number,
1201                item_id,
1202                output_index,
1203                content_index: 0,
1204                text: accumulated_content,
1205            }));
1206        } else if item_type == "tool_call" {
1207            let arguments = {
1208                let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1209                item.accumulated_arguments.clone()
1210            };
1211            let sequence_number = stream_state.next_sequence_number();
1212            events.push(ResponseEvent::FunctionCallArgumentsDone(
1213                FunctionCallArgumentsDoneEvent {
1214                    sequence_number,
1215                    item_id: item_id.clone(),
1216                    output_index,
1217                    arguments,
1218                },
1219            ));
1220        }
1221
1222        let sequence_number = stream_state.next_sequence_number();
1223        events.push(ResponseEvent::OutputItemDone(OutputItemDoneEvent {
1224            sequence_number,
1225            output_index,
1226            item: completed_item,
1227        }));
1228    }
1229}
1230
1231fn emit_stream_fallback_item_done(
1232    stream_state: &mut ResponseStreamState,
1233    events: &mut Vec<ResponseEvent>,
1234) {
1235    let indexes = stream_state
1236        .items
1237        .iter()
1238        .filter_map(|(index, item)| {
1239            if item.status != OutputStatus::Completed {
1240                Some(*index)
1241            } else {
1242                None
1243            }
1244        })
1245        .collect::<Vec<_>>();
1246
1247    for output_index in indexes {
1248        if let Some(item) = stream_state.items.get_mut(&output_index) {
1249            item.status = OutputStatus::Completed;
1250            let item_type = item.item_type.clone();
1251            let item_id = item.item_id.clone();
1252            let accumulated_content = item.accumulated_content.clone();
1253            let accumulated_arguments = item.accumulated_arguments.clone();
1254            let completed_item = item.to_completed();
1255
1256            if item_type == "message" {
1257                let sequence_number = stream_state.next_sequence_number();
1258                events.push(ResponseEvent::OutputTextDone(OutputTextDoneEvent {
1259                    sequence_number,
1260                    item_id: item_id.clone(),
1261                    output_index,
1262                    content_index: 0,
1263                    text: accumulated_content.clone(),
1264                    logprobs: Vec::new(),
1265                }));
1266                let sequence_number = stream_state.next_sequence_number();
1267                events.push(ResponseEvent::ContentPartDone(ContentPartDoneEvent {
1268                    sequence_number,
1269                    item_id: item_id.clone(),
1270                    output_index,
1271                    content_index: 0,
1272                    part: ResponseContentPart::OutputText(OutputTextContent::new(
1273                        accumulated_content,
1274                    )),
1275                }));
1276            } else if item_type == "tool_call" {
1277                let sequence_number = stream_state.next_sequence_number();
1278                events.push(ResponseEvent::FunctionCallArgumentsDone(
1279                    FunctionCallArgumentsDoneEvent {
1280                        sequence_number,
1281                        item_id: item_id.clone(),
1282                        output_index,
1283                        arguments: accumulated_arguments,
1284                    },
1285                ));
1286            } else if item_type == "reasoning" {
1287                let sequence_number = stream_state.next_sequence_number();
1288                events.push(ResponseEvent::ReasoningDone(ReasoningDoneEvent {
1289                    sequence_number,
1290                    item_id: item_id.clone(),
1291                    output_index,
1292                    content_index: 0,
1293                    text: accumulated_content,
1294                }));
1295            }
1296
1297            let sequence_number = stream_state.next_sequence_number();
1298            events.push(ResponseEvent::OutputItemDone(OutputItemDoneEvent {
1299                sequence_number,
1300                output_index,
1301                item: completed_item,
1302            }));
1303        }
1304    }
1305}
1306
1307async fn stream_response_events(
1308    mut delta_rx: mpsc::UnboundedReceiver<ResponseDelta>,
1309    event_tx: mpsc::Sender<ResponseEvent>,
1310    response_id: String,
1311    model: String,
1312) {
1313    let mut stream_state = ResponseStreamState::new(response_id, model);
1314
1315    if event_tx
1316        .send(ResponseEvent::ResponseCreated(ResponseCreatedEvent {
1317            sequence_number: stream_state.next_sequence_number(),
1318            response: stream_state.snapshot(),
1319        }))
1320        .await
1321        .is_err()
1322    {
1323        return;
1324    }
1325
1326    if event_tx
1327        .send(ResponseEvent::ResponseInProgress(ResponseInProgressEvent {
1328            sequence_number: stream_state.next_sequence_number(),
1329            response: stream_state.snapshot(),
1330        }))
1331        .await
1332        .is_err()
1333    {
1334        return;
1335    }
1336
1337    let mut error_detail: Option<String> = None;
1338    let mut finish_reason: Option<String> = None;
1339    let mut usage = ResponseUsage::default();
1340
1341    while let Some(delta) = delta_rx.recv().await {
1342        if let Some(error) = &delta.error {
1343            error_detail = Some(error.clone());
1344            break;
1345        }
1346
1347        let mut mapped_events = Vec::new();
1348        for event in &delta.state_events {
1349            process_state_event_for_streaming(event, &mut stream_state, &mut mapped_events);
1350        }
1351
1352        for event in mapped_events {
1353            if event_tx.send(event).await.is_err() {
1354                return;
1355            }
1356        }
1357
1358        update_usage_from_delta(&delta, &mut usage);
1359        if usage.total_tokens > 0 {
1360            stream_state.usage = Some(usage.clone());
1361        }
1362
1363        if let Some(reason) = &delta.finish_reason {
1364            finish_reason = Some(reason.to_lowercase());
1365        }
1366
1367        if delta.is_final_delta {
1368            break;
1369        }
1370    }
1371
1372    let mut completion_events = Vec::new();
1373    emit_stream_fallback_item_done(&mut stream_state, &mut completion_events);
1374    for event in completion_events {
1375        if event_tx.send(event).await.is_err() {
1376            return;
1377        }
1378    }
1379
1380    if let Some(detail) = error_detail {
1381        stream_state.status = OutputStatus::Failed;
1382
1383        if event_tx
1384            .send(ResponseEvent::ResponseFailed(ResponseFailedEvent {
1385                sequence_number: stream_state.next_sequence_number(),
1386                response: stream_state.snapshot(),
1387            }))
1388            .await
1389            .is_err()
1390        {
1391            return;
1392        }
1393
1394        if event_tx
1395            .send(ResponseEvent::Error(StreamErrorEvent {
1396                sequence_number: stream_state.next_sequence_number(),
1397                error: StreamErrorDetail {
1398                    message: detail,
1399                    error_type: "inference_error".to_string(),
1400                },
1401            }))
1402            .await
1403            .is_err()
1404        {
1405            return;
1406        }
1407    } else {
1408        stream_state.completed_at = Some(current_timestamp());
1409
1410        if let Some(incomplete_details) = finish_reason_to_incomplete(finish_reason.as_deref()) {
1411            stream_state.status = OutputStatus::Incomplete;
1412            stream_state.incomplete_details = Some(incomplete_details);
1413            if event_tx
1414                .send(ResponseEvent::ResponseIncomplete(ResponseIncompleteEvent {
1415                    sequence_number: stream_state.next_sequence_number(),
1416                    response: stream_state.snapshot(),
1417                }))
1418                .await
1419                .is_err()
1420            {
1421                return;
1422            }
1423        } else {
1424            stream_state.status = OutputStatus::Completed;
1425            if event_tx
1426                .send(ResponseEvent::ResponseCompleted(ResponseCompletedEvent {
1427                    sequence_number: stream_state.next_sequence_number(),
1428                    response: stream_state.snapshot(),
1429                }))
1430                .await
1431                .is_err()
1432            {
1433                return;
1434            }
1435        }
1436    }
1437
1438    let _ = event_tx.send(ResponseEvent::Done).await;
1439}
1440
1441async fn gather_non_streaming_response(
1442    mut delta_rx: mpsc::UnboundedReceiver<ResponseDelta>,
1443    model: &str,
1444    request: &ResponsesRequest,
1445) -> Result<ResponseObject> {
1446    let created_at = current_timestamp();
1447    let mut completed_at: Option<i64> = None;
1448    let mut output_items: BTreeMap<u32, AggregatedOutputItem> = BTreeMap::new();
1449    let mut fallback_content = String::new();
1450    let mut usage = ResponseUsage::default();
1451    let mut error_detail: Option<String> = None;
1452    let mut finish_reason: Option<String> = None;
1453
1454    while let Some(delta) = delta_rx.recv().await {
1455        if let Some(error) = &delta.error {
1456            error_detail = Some(error.clone());
1457        }
1458
1459        if let Some(content) = &delta.content {
1460            fallback_content.push_str(content);
1461        }
1462
1463        for event in &delta.state_events {
1464            process_state_event_for_output(event, &mut output_items);
1465        }
1466
1467        update_usage_from_delta(&delta, &mut usage);
1468
1469        if let Some(reason) = &delta.finish_reason {
1470            finish_reason = Some(reason.to_lowercase());
1471        }
1472
1473        if delta.is_final_delta {
1474            completed_at = Some(current_timestamp());
1475            break;
1476        }
1477    }
1478
1479    if let Some(error) = error_detail {
1480        return Err(ClientError::RequestFailed(error));
1481    }
1482
1483    let incomplete_details = finish_reason_to_incomplete(finish_reason.as_deref());
1484    let output = if output_items.is_empty() && !fallback_content.is_empty() {
1485        vec![ResponseOutputItem::Message(OutputMessage {
1486            output_type: "message".to_string(),
1487            id: generate_message_id(),
1488            status: OutputStatus::Completed,
1489            role: "assistant".to_string(),
1490            content: vec![OutputTextContent::new(fallback_content)],
1491        })]
1492    } else {
1493        build_output_items(&output_items)
1494    };
1495
1496    Ok(ResponseObject {
1497        id: generate_response_id(),
1498        object: "response".to_string(),
1499        created_at,
1500        completed_at,
1501        status: if incomplete_details.is_some() {
1502            OutputStatus::Incomplete
1503        } else {
1504            OutputStatus::Completed
1505        },
1506        incomplete_details,
1507        error: None,
1508        model: model.to_string(),
1509        output,
1510        usage: Some(usage),
1511        metadata: request.metadata.clone(),
1512        parallel_tool_calls: request.parallel_tool_calls,
1513        temperature: request.temperature,
1514        top_p: request.top_p,
1515        presence_penalty: request.presence_penalty,
1516        frequency_penalty: request.frequency_penalty,
1517        top_k: request.top_k,
1518        min_p: request.min_p,
1519        instructions: request.instructions.clone(),
1520        max_output_tokens: request.max_output_tokens,
1521        top_logprobs: request.top_logprobs,
1522        tool_choice: request.tool_choice.clone(),
1523        tools: request.tools.clone(),
1524        max_tool_calls: request.max_tool_calls,
1525        text: request.text.clone(),
1526    })
1527}
1528
1529impl Client {
1530    /// Perform an asynchronous Responses API request.
1531    pub async fn aresponses(
1532        &self,
1533        model_id: &str,
1534        request: ResponsesRequest,
1535    ) -> Result<ResponsesResult> {
1536        let info = self.registry.ensure_loaded(model_id).await?;
1537        let formatter = info.require_formatter()?;
1538        let request_id = self.ipc.next_request_id();
1539        tracing::debug!(
1540            request_id,
1541            model_id = %model_id,
1542            stream = request.stream,
1543            "Building responses request"
1544        );
1545        let messages = request.to_messages();
1546        tracing::trace!(
1547            request_id,
1548            model_id = %model_id,
1549            messages = ?messages,
1550            "Responses messages before multimodal expansion"
1551        );
1552        let reasoning_flag = request.reasoning_effort.is_some();
1553
1554        let (messages_for_template, image_buffers, capabilities, content_order) =
1555            build_multimodal_messages(formatter, &messages, request.instructions.as_deref())
1556                .map_err(|e| ClientError::Multimodal(e.to_string()))?;
1557
1558        if messages_for_template.is_empty() {
1559            return Err(ClientError::RequestFailed(
1560                "Response request must include at least one content segment.".into(),
1561            ));
1562        }
1563        tracing::trace!(
1564            request_id,
1565            model_id = %model_id,
1566            messages_for_template = ?messages_for_template,
1567            "Responses messages after multimodal expansion"
1568        );
1569
1570        let tool_schemas = request
1571            .tools
1572            .iter()
1573            .map(normalize_response_tool_schema)
1574            .collect::<Vec<_>>();
1575        let tool_schemas_json = if tool_schemas.is_empty() {
1576            String::new()
1577        } else {
1578            serde_json::to_string(&tool_schemas).unwrap_or_default()
1579        };
1580        let template_tools = (!tool_schemas.is_empty()).then_some(tool_schemas.as_slice());
1581
1582        let prompt_text = formatter
1583            .apply_template_with_tools(
1584                &messages_for_template,
1585                true,
1586                reasoning_flag,
1587                None,
1588                template_tools,
1589            )
1590            .map_err(|e| ClientError::Formatter(e.to_string()))?;
1591
1592        let capability_placeholder = formatter.capability_placeholder_token();
1593
1594        let layout_segments = build_multimodal_layout(
1595            &prompt_text,
1596            &image_buffers,
1597            &capabilities,
1598            &content_order,
1599            formatter.image_placeholder_token(),
1600            formatter.should_clip_image_placeholder(),
1601            capability_placeholder,
1602        )
1603        .map_err(|e| ClientError::Multimodal(e.to_string()))?;
1604
1605        let final_prompt = formatter.strip_template_placeholders(&prompt_text);
1606        tracing::debug!(
1607            request_id,
1608            model_id = %model_id,
1609            prompt_chars = final_prompt.chars().count(),
1610            image_count = image_buffers.len(),
1611            capability_count = capabilities.len(),
1612            layout_segment_count = layout_segments.len(),
1613            "Prepared responses prompt payload"
1614        );
1615        tracing::trace!(
1616            request_id,
1617            model_id = %model_id,
1618            prompt = %format!("\n{}", final_prompt),
1619            "Responses prompt sent to PIE"
1620        );
1621
1622        let response_format_json = request
1623            .text
1624            .as_ref()
1625            .map(|text| text.get("format").cloned().unwrap_or_else(|| text.clone()))
1626            .map(|response_format| serde_json::to_string(&response_format).unwrap_or_default())
1627            .unwrap_or_default();
1628
1629        let rng_seed = rand::thread_rng().gen::<u64>();
1630        let prompt_payload = PromptPayload {
1631            prompt: final_prompt,
1632            image_buffers,
1633            capabilities: super::convert_capabilities(&capabilities),
1634            layout: super::convert_layout(&layout_segments),
1635            max_generated_tokens: request.max_output_tokens.unwrap_or(0),
1636            temperature: request.temperature.unwrap_or(1.0),
1637            top_p: request.top_p.unwrap_or(1.0),
1638            top_k: request.top_k.unwrap_or(-1),
1639            min_p: request.min_p.unwrap_or(0.0),
1640            rng_seed,
1641            stop_sequences: Vec::new(),
1642            num_candidates: 1,
1643            best_of: Some(1),
1644            final_candidates: Some(1),
1645            frequency_penalty: request.frequency_penalty.unwrap_or(0.0),
1646            presence_penalty: request.presence_penalty.unwrap_or(0.0),
1647            repetition_penalty: 1.0,
1648            repetition_context_size: 60,
1649            top_logprobs: request.top_logprobs.unwrap_or(0),
1650            logit_bias: HashMap::new(),
1651            tool_schemas_json,
1652            tool_calling_tokens: formatter.get_tool_calling_tokens().clone(),
1653            tool_choice: tool_choice_to_string(request.tool_choice.as_ref()),
1654            max_tool_calls: request.max_tool_calls.unwrap_or(0).max(0),
1655            response_format_json,
1656            task_name: None,
1657            reasoning_effort: request.reasoning_effort.clone(),
1658        };
1659        tracing::debug!(
1660            request_id,
1661            model_id = %model_id,
1662            stream = request.stream,
1663            "Dispatching responses request to PIE"
1664        );
1665        let (_batch_size, stream) = self.ipc.send_batch_request(
1666            request_id,
1667            model_id,
1668            &info.model_path,
1669            &[prompt_payload],
1670        )?;
1671
1672        if request.stream {
1673            let (event_tx, event_rx) = mpsc::channel(256);
1674            tokio::spawn(stream_response_events(
1675                stream,
1676                event_tx,
1677                generate_response_id(),
1678                model_id.to_string(),
1679            ));
1680            Ok(ResponsesResult::Stream(event_rx))
1681        } else {
1682            let response = gather_non_streaming_response(stream, model_id, &request).await?;
1683            Ok(ResponsesResult::Complete(Box::new(response)))
1684        }
1685    }
1686}
1687
1688#[cfg(test)]
1689mod tests {
1690    use super::*;
1691
1692    #[test]
1693    fn test_response_event_type_names() {
1694        let snapshot = ResponseSnapshot {
1695            id: "resp_1".to_string(),
1696            object: "response".to_string(),
1697            created_at: 1,
1698            completed_at: None,
1699            status: OutputStatus::InProgress,
1700            incomplete_details: None,
1701            model: "model".to_string(),
1702            output: Vec::new(),
1703            usage: None,
1704        };
1705
1706        let event = ResponseEvent::ResponseCreated(ResponseCreatedEvent {
1707            sequence_number: 0,
1708            response: snapshot,
1709        });
1710        assert_eq!(event.event_type(), "response.created");
1711        assert_eq!(ResponseEvent::Done.event_type(), "done");
1712    }
1713
1714    #[test]
1715    fn test_finish_reason_to_incomplete() {
1716        assert_eq!(
1717            finish_reason_to_incomplete(Some("length")),
1718            Some(IncompleteDetails {
1719                reason: "max_output_tokens".to_string(),
1720            })
1721        );
1722        assert_eq!(
1723            finish_reason_to_incomplete(Some("content_filter")),
1724            Some(IncompleteDetails {
1725                reason: "content_filter".to_string(),
1726            })
1727        );
1728        assert_eq!(finish_reason_to_incomplete(Some("stop")), None);
1729    }
1730
1731    #[test]
1732    fn test_request_input_conversion_string() {
1733        let request = ResponsesRequest::from_text("hello");
1734        let messages = request.to_messages();
1735        assert_eq!(messages.len(), 1);
1736        assert_eq!(
1737            messages[0].get("role").and_then(Value::as_str),
1738            Some("user")
1739        );
1740    }
1741
1742    #[test]
1743    fn test_request_input_conversion_tool_items() {
1744        let request = ResponsesRequest {
1745            input: ResponsesInput::Items(vec![
1746                ResponseInputItem::FunctionCall {
1747                    call_id: "call_1".to_string(),
1748                    name: "get_weather".to_string(),
1749                    arguments: "{\"location\":\"SF\"}".to_string(),
1750                },
1751                ResponseInputItem::FunctionCallOutput {
1752                    call_id: "call_1".to_string(),
1753                    output: "{\"temperature\":65}".to_string(),
1754                },
1755            ]),
1756            stream: false,
1757            instructions: None,
1758            temperature: None,
1759            top_p: None,
1760            top_k: None,
1761            min_p: None,
1762            frequency_penalty: None,
1763            presence_penalty: None,
1764            max_output_tokens: None,
1765            top_logprobs: None,
1766            tools: Vec::new(),
1767            tool_choice: None,
1768            max_tool_calls: None,
1769            text: None,
1770            reasoning_effort: None,
1771            metadata: None,
1772            parallel_tool_calls: false,
1773        };
1774
1775        let messages = request.to_messages();
1776        assert_eq!(messages.len(), 2);
1777        assert_eq!(
1778            messages[0].get("role").and_then(Value::as_str),
1779            Some("assistant")
1780        );
1781        assert_eq!(
1782            messages[1].get("role").and_then(Value::as_str),
1783            Some("tool")
1784        );
1785    }
1786
1787    #[test]
1788    fn test_response_delta_state_event_deserialization_shape() {
1789        let json = serde_json::json!({
1790            "request_id": 1,
1791            "is_final_delta": false,
1792            "state_events": [
1793                {
1794                    "event_type": "item_started",
1795                    "item_type": "message",
1796                    "output_index": 0,
1797                    "identifier": "",
1798                    "delta": ""
1799                }
1800            ]
1801        });
1802        let parsed: ResponseDelta = serde_json::from_value(json).expect("deserialize failed");
1803        assert_eq!(parsed.state_events.len(), 1);
1804        assert_eq!(parsed.state_events[0].event_type, "item_started");
1805    }
1806
1807    #[test]
1808    fn test_build_output_items_uses_structured_tool_call_completion_value() {
1809        let mut output_items = BTreeMap::new();
1810
1811        process_state_event_for_output(
1812            &ResponseStateEvent {
1813                event_type: "content_delta".to_string(),
1814                item_type: "tool_call".to_string(),
1815                output_index: 0,
1816                identifier: "arguments".to_string(),
1817                delta: r#"location="Tokyo", verbose=True"#.to_string(),
1818                value: None,
1819            },
1820            &mut output_items,
1821        );
1822        process_state_event_for_output(
1823            &ResponseStateEvent {
1824                event_type: "item_completed".to_string(),
1825                item_type: "tool_call".to_string(),
1826                output_index: 0,
1827                identifier: "tool_call:get_weather".to_string(),
1828                delta: String::new(),
1829                value: Some(Value::String(
1830                    serde_json::json!({
1831                        "name": "get_weather",
1832                        "arguments": {
1833                            "location": "Tokyo",
1834                            "verbose": true,
1835                            "limit": null,
1836                        },
1837                    })
1838                    .to_string(),
1839                )),
1840            },
1841            &mut output_items,
1842        );
1843
1844        let output = build_output_items(&output_items);
1845        let ResponseOutputItem::FunctionCall(call) = &output[0] else {
1846            panic!("expected function call output");
1847        };
1848
1849        assert_eq!(call.name, "get_weather");
1850        assert_eq!(
1851            serde_json::from_str::<Value>(&call.arguments).expect("valid JSON arguments"),
1852            serde_json::json!({
1853                "location": "Tokyo",
1854                "verbose": true,
1855                "limit": null,
1856            })
1857        );
1858    }
1859
1860    #[test]
1861    fn test_streaming_tool_call_done_uses_structured_completion_value() {
1862        let mut stream_state = ResponseStreamState::new("resp_1".to_string(), "model".to_string());
1863        let mut events = Vec::new();
1864
1865        process_state_event_for_streaming(
1866            &ResponseStateEvent {
1867                event_type: "item_started".to_string(),
1868                item_type: "tool_call".to_string(),
1869                output_index: 0,
1870                identifier: "tool_call:get_weather".to_string(),
1871                delta: String::new(),
1872                value: None,
1873            },
1874            &mut stream_state,
1875            &mut events,
1876        );
1877        process_state_event_for_streaming(
1878            &ResponseStateEvent {
1879                event_type: "content_delta".to_string(),
1880                item_type: "tool_call".to_string(),
1881                output_index: 0,
1882                identifier: "arguments".to_string(),
1883                delta: r#"location="Tokyo", verbose=True"#.to_string(),
1884                value: None,
1885            },
1886            &mut stream_state,
1887            &mut events,
1888        );
1889        process_state_event_for_streaming(
1890            &ResponseStateEvent {
1891                event_type: "item_completed".to_string(),
1892                item_type: "tool_call".to_string(),
1893                output_index: 0,
1894                identifier: "arguments".to_string(),
1895                delta: String::new(),
1896                value: Some(Value::String(
1897                    r#"location="Tokyo", verbose=True"#.to_string(),
1898                )),
1899            },
1900            &mut stream_state,
1901            &mut events,
1902        );
1903        process_state_event_for_streaming(
1904            &ResponseStateEvent {
1905                event_type: "item_completed".to_string(),
1906                item_type: "tool_call".to_string(),
1907                output_index: 0,
1908                identifier: "tool_call:get_weather".to_string(),
1909                delta: String::new(),
1910                value: Some(Value::String(
1911                    serde_json::json!({
1912                        "name": "get_weather",
1913                        "arguments": {
1914                            "location": "Tokyo",
1915                            "verbose": true,
1916                        },
1917                    })
1918                    .to_string(),
1919                )),
1920            },
1921            &mut stream_state,
1922            &mut events,
1923        );
1924
1925        let argument_done_events = events
1926            .iter()
1927            .filter_map(|event| match event {
1928                ResponseEvent::FunctionCallArgumentsDone(done) => Some(done),
1929                _ => None,
1930            })
1931            .collect::<Vec<_>>();
1932        assert_eq!(argument_done_events.len(), 1);
1933        assert_eq!(
1934            serde_json::from_str::<Value>(&argument_done_events[0].arguments)
1935                .expect("valid JSON arguments"),
1936            serde_json::json!({
1937                "location": "Tokyo",
1938                "verbose": true,
1939            })
1940        );
1941
1942        let completed_calls = events
1943            .iter()
1944            .filter_map(|event| match event {
1945                ResponseEvent::OutputItemDone(done) => match &done.item {
1946                    ResponseOutputItem::FunctionCall(call) => Some(call),
1947                    _ => None,
1948                },
1949                _ => None,
1950            })
1951            .collect::<Vec<_>>();
1952        assert_eq!(completed_calls.len(), 1);
1953        assert_eq!(completed_calls[0].name, "get_weather");
1954        assert_eq!(
1955            serde_json::from_str::<Value>(&completed_calls[0].arguments)
1956                .expect("valid JSON arguments"),
1957            serde_json::json!({
1958                "location": "Tokyo",
1959                "verbose": true,
1960            })
1961        );
1962    }
1963}